Fabric笔记

#1 简单描述Fabric数据上链过程

##1.1、模拟

client发起交易请求,请求被发送至endorsers,模拟进行这些交易,会产生一个read set和一个write set记录这个交易的影响,模拟完成后,endorser对read set和write set进行签名并将其一起返回给client

如果client收到的read set和write set是一致的(可能存在恶意endorser或者智能合约存在不确定的算法导致出现不一致),那么client就会生成一个真正的交易请求,包含read set、write set和对应的签名,并将这个请求发送给ordering service。

##1.2、排序

ordering service对来自client的交易进行排序,需要注意的是这里并不检查交易的内容,默认按照交易到达的顺序进行排序,ordering service将交易排序后打包成block,发送给网络中的peers,这里不保证所有的peer同时收到这个block,但保证收到的block的顺序是一致的(使用gossip协议)。

##1.3、验证
当peer收到block后,就开始验证阶段。

验证阶段主要包括两个检查:

####Endorsement Policy检查

检查交易是否满足endorsement policy以及是否包含有效的签名,否则说明交易可能被client或者恶意peer篡改过,直接丢弃。

####交易冲突检查

检查交易之间是否存在冲突,也就是是否读脏数据的问题(某个交易在读取ledger之前,ledger被前一个交易改变了),如果存在就丢弃该交易。
两次检查都通过的话就可以进入commit阶段了。

##1.4、上链
peer将block添加到链上,注意这里是所有的交易(有效的和无效的)都加进来了。然后根据有效的交易改变当前的ledger状态

1
https://hyperledger-fabric-zh-cn.readthedocs.io/zh/latest/docs/Ledger.html#transaction-flow

#2 Fabric如何动态增加组织、节点

调用k8s创建同一个namespace下containter–pod

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func (service *channelService) updateChannelConfig(request module.PutChannelOrganizationRequest) error {
namespace := request.OrderNamespace
clientset, err := kubernetes.NewKubernetesClient()
if err != nil {
return err
}
dep, err := clientset.AppsV1beta1().Deployments(namespace).Get(CliDepName, v1.GetOptions{})
if err != nil {
return err
}

podLabels := dep.Spec.Selector.MatchLabels
podSelector := pkglabels.FormatLabels(podLabels)
podOpts := v1.ListOptions{
LabelSelector: podSelector,
}
podList, err := clientset.CoreV1().Pods(namespace).List(podOpts)
if err != nil {
return err
}

if len(podList.Items) == 0 {
return errors.New("can't find cli pod")
}

podName := podList.Items[0].Name
containerName := podList.Items[0].Spec.Containers[0].Name
peerOrgs := strings.Join(request.PeerOrgs, ",")
command := fmt.Sprintf("/opt/gopath/src/github.com/hyperledger/fabric/peer/scripts/addorg.sh %s %s %s %s", request.ChannelName, request.OrderNamespace, request.OrgName, peerOrgs)
glog.Infof("ContainerExec namespace :%s", namespace)
glog.Infof("ContainerExec podName :%s", podName)
glog.Infof("ContainerExec containerName :%s", containerName)
glog.Infof("ContainerExec command :%s", command)
stdout, stderr, err := kubernetes.ContainerExec(namespace, podName, containerName, command)
if err != nil {
glog.Errorf("Invoke ContainerExec occur a error. It is %s", err)
return err
}
glog.Info("stdout ==>", stdout)
glog.Info("stderr ==>", stderr)
return nil
}

将节点加入通道

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

fabric网络中创建通道
mspId, _ := db.GetMspIdByPeerId(peer)
fabricClient, err := context.GetServer().GetClient(joinChl.NetId, "Admin", mspId)
if err != nil {
log.Logger.Errorf("GetClient error: %v", err)
c.JSON(http.StatusInternalServerError, gin.H{
"err": err.Error(),
})
}
// Resmgmt client
clientProvider := fabricClient.GetClientProvider()
resmgmtClient, err := resmgmt.New(clientProvider)
ctx, err := clientProvider()
log.Logger.Debugf("join channel resmgmtClient.ctx = %+v, %v", ctx, err)
err = resmgmtClient.JoinChannel(channelName, resmgmt.WithTargetURLs(peerURL), resmgmt.WithOrdererURL(ordererURL))
if err != nil {
log.Logger.Errorf("JoinChannel error :%v ", err)
c.JSON(http.StatusInternalServerError, gin.H{
"err": err.Error(),
})
return
}
log.Logger.Info("JoinChannel SUCCESS!")
1
2
3
4
5
6
7
8
9
10
11
//RefreshClient
for _, peerOrg := range PeerOrgs {
err = context.GetServer().RefreshClient(joinChl.NetId, "Admin", fmt.Sprintf("%s%s", strings.ToUpper(peerOrg.Name[0:1]), peerOrg.Name[1:]))
if err != nil {
log.Logger.Errorln("RefreshClient error:", err)
c.JSON(http.StatusInternalServerError, gin.H{
"err": err.Error(),
})
return
}
}

#3 如果一个peer节点服务器损坏,数据如何恢复

// 如果CA等数据还在,再次加入节点,会自动同步区块

#4 描述Fabric的创世区块的作用,世界状态的作用

创世区块:创世区块文件,需要手动用官方提供的工具configtxgen生成,创建语句为configtxgen -profile TwoOrgsOrdererGenesis -channelID byfn-sys-channel -outputBlock ./channel-artifacts/genesis.block,创世区块的主要作用是定义了系统通道的共识算法,准入的组织名称,策略等信息

世界状态:世界状态代表了所有账本状态当前的值。这个世界状态非常有用,因为程序通常需要某个账本的当前状态值,并且总是很容易就能获取到。你不需要遍历整个区块链去计算账本当前的状态的值(余额),你可以直接从世界状态获取

#5 Fabric中一个区块的数据结构

区块头 包含区块高度,上一个区块的哈希值,本区块的哈希值

交易数据集合,封装了打包的交易集合

区块元数据,封装了如下4个元数据索引项

1
2
3
4
5
6
7
BlockMetadataIndex_SIGNATURES:区块签名;

BlockMetadataIndex_LAST_CONFIG:最新配置区块的区块号;

BlockMetadataIndex_TRANSACTIONS_FILTER:最新交易过滤器,封装了交易数据集合Data中所有交易对应的交易验证码,标识其交易的有效性。

BlockMetadataIndex_ORDERER:Orderer配置信息,如Kafka共识组件的初始化参数。

#6 描述CA 公私钥及常见的加密算法

##Fabric-CA

1
2
3
4
5

https://www.cnblogs.com/kaixinyufeng/p/9803413.html


https://blog.csdn.net/greedystar/article/details/80344984

##公私钥及常见的加密算法

1
2
3
4

https://www.cnblogs.com/charlesblc/p/6130433.html

https://blog.csdn.net/u011531425/article/details/89161392

#7 常用的共识算法

1
https://blog.csdn.net/s_lisheng/article/details/78022645

#8 Merkie tree、GHOST协议、分叉

##Merkie tree

1
https://www.cnblogs.com/fengzhiwu/p/5524324.html

##GHOST协议

1
https://blog.csdn.net/t46414704152abc/article/details/81191804

##分叉

1
2


#9 描述下http协议,session,cookie在其中的作用

1
2
3
https://blog.csdn.net/astro_boy/article/details/84054090

http://m.elecfans.com/article/596778.html

#10 联盟链,Fabric比较其他联盟链的优势

1
http://www.elecfans.com/blockchain/800061.html

#11 链码的事件通知

Elasticsearch 经典资料汇总

中文资料

https://es.xiaoleilu.com

https://github.com/elasticsearch-cn/elasticsearch-definitive-guide

https://www.elastic.co/guide/cn/elasticsearch/guide/current/index.html

Elasticsearch Client Golang SDK

https://github.com/olivere/elastic/tree/v6.2.11

深度分页问题

https://www.elastic.co/guide/en/elasticsearch/reference/6.3/search-request-search-after.html

https://blog.csdn.net/u011228889/article/details/79760167

https://blog.csdn.net/WangPing1223/article/details/79148244

Kubernetes上非常好用的Elasticsearch

https://github.com/pires/kubernetes-elasticsearch-cluster/tree/6.3.0

Elasticsearch安装

#Elasticsearch安装

安装Elasticsearch唯一的要求是安装官方新版的Java,地址:www.java.com

Install Elasticsearch

##1 Docker部署
Install Elasticsearch with Docker

1
2
3
docker pull docker.elastic.co/elasticsearch/elasticsearch:6.4.3

docker run -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:6.4.3

##2 Download and install the .tar.gz package

1
2
3
4
5
6
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.4.3.tar.gz
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.4.3.tar.gz.sha512
shasum -a 512 -c elasticsearch-6.4.3.tar.gz.sha512
tar -xzf elasticsearch-6.4.3.tar.gz
cd elasticsearch-6.4.3/
./bin/elasticsearch

异常

1
2
[root@JD elasticsearch]# shasum -a 512 -c elasticsearch-6.4.3.tar.gz.sha512 
-bash: shasum: command not found

centos解决:yum install perl-Digest-SHA

异常

1
2
3
4
5
6
7
[root@JD elasticsearch-6.4.3]# ./bin/elasticsearch
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000ca660000, 899284992, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 899284992 bytes for committing reserved memory.
# An error report file with more information is saved as:
# logs/hs_err_pid16061.log

ElasticJob

ElasticJob

来自官方的定义

ElasticJob

1
2
3
Elastic-Job is a distributed scheduling solution consisting of two separate projects, Lite and Cloud.

Elastic-Job-Lite is a lightweight, decentralized solution that provides distributed task sharding services. Elastic-Job-Cloud is a Mesos Framework which provides additional resource management, App distribution, process isolation and task aggregation features.

1、 基本概念

1.1. 分片概念

任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。

例如:有一个遍历数据库某张表的作业,现有2台服务器。为了快速的执行作业,那么每台服务器应执行作业的50%。 为满足此需求,可将作业分成2片,每台服务器执行1片。作业遍历数据的逻辑应为:服务器A遍历ID以奇数结尾的数据;服务器B遍历ID以偶数结尾的数据。 如果分成10片,则作业遍历数据的逻辑应为:每片分到的分片项应为ID%10,而服务器A被分配到分片项0,1,2,3,4;服务器B被分配到分片项5,6,7,8,9,直接的结果就是服务器A遍历ID以0-4结尾的数据;服务器B遍历ID以5-9结尾的数据。

1.2. 分片项与业务处理解耦

Elastic-Job并不直接提供数据处理的功能,框架只会将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与真实数据的对应关系。

1.3. 个性化参数的适用场景

个性化参数即shardingItemParameter,可以和分片项匹配对应关系,用于将分片项的数字转换为更加可读的业务代码。

例如:按照地区水平拆分数据库,数据库A是北京的数据;数据库B是上海的数据;数据库C是广州的数据。 如果仅按照分片项配置,开发者需要了解0表示北京;1表示上海;2表示广州。 合理使用个性化参数可以让代码更可读,如果配置为0=北京,1=上海,2=广州,那么代码中直接使用北京,上海,广州的枚举值即可完成分片项和业务逻辑的对应关系。

2、 核心理念

2.1. 分布式调度

Elastic-Job-Lite并无作业调度中心节点,而是基于部署作业框架的程序在到达相应时间点时各自触发调度。

注册中心仅用于作业注册和监控信息存储。而主作业节点仅用于处理分片和清理等功能。

2.2. 作业高可用

Elastic-Job-Lite提供最安全的方式执行作业。将分片总数设置为1,并使用多于1台的服务器执行作业,作业将会以1主n从的方式执行。

一旦执行作业的服务器崩溃,等待执行的服务器将会在下次作业启动时替补执行。开启失效转移功能效果更好,可以保证在本次作业执行时崩溃,备机立即启动替补执行。

2.3. 最大限度利用资源

Elastic-Job-Lite也提供最灵活的方式,最大限度的提高执行作业的吞吐量。将分片项设置为大于服务器的数量,最好是大于服务器倍数的数量,作业将会合理的利用分布式资源,动态的分配分片项。

例如:3台服务器,分成10片,则分片项分配结果为服务器A=0,1,2;服务器B=3,4,5;服务器C=6,7,8,9。 如果服务器C崩溃,则分片项分配结果为服务器A=0,1,2,3,4;服务器B=5,6,7,8,9。在不丢失分片项的情况下,最大限度的利用现有资源提高吞吐量。

3、 功能列表

  • 分布式调度协调
  • 弹性扩容缩容
  • 失效转移
  • 错过执行作业重触发
  • 作业分片一致性,保证同一分片在分布式环境中仅一个执行实例
  • 自诊断并修复分布式不稳定造成的问题
  • 支持并行调度
  • 支持作业生命周期操作
  • 丰富的作业类型
  • Spring整合以及命名空间提供
  • 运维平台

4、 快速入门

以springBoot为例

4.1、 引入maven依赖

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>${elastic-job.version}</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>${elastic-job.version}</version>
</dependency>

4.2、 作业开发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import com.dangdang.ddframe.job.api.ShardingContext;
import DynamicJobConfig;
import SimpleDeliveryJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

/**
* @Author: zy
* @Date: 2019/7/20/0020 10:55
*/
@Slf4j
@Component
public class InPolygonJob implements SimpleDeliveryJob {

@Value("${elastic-job.task.InPolygon.cron}")
private String cron;

@Value("${elastic-job.task.InPolygon.shardingTotalCount}")
private int shardingTotalCount;

@Value("${elastic-job.task.InPolygon.shardingItemParameters}")
private String shardingItemParameters;

@Value("${elastic-job.task.InPolygon.description}")
private String description;

private ApplicationContext applicationContext;
@Autowired
private ISyncInPolygonService syncInPolygonService;


@Override
public void execute(ShardingContext shardingContext) {
// do something
}

@Override
public void afterPropertiesSet() throws Exception {
DynamicJobConfig dynamicJobConfig = (DynamicJobConfig) applicationContext.getBean("dynamicJobConfig");
dynamicJobConfig.dynamicAddSimpleJobScheduler(this, cron, shardingTotalCount, shardingItemParameters, description);
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
1
2
3
4
5
6
7
8
9
10
11
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContextAware;

/**
* @Description :
* @author: zy
* @Date : 2019-06-14 13:59
*/
public interface SimpleDeliveryJob extends SimpleJob, InitializingBean, ApplicationContextAware {
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import javax.sql.DataSource;

/**
* @Description : 动态调度任务
* @author: zy
* @Date : 2019-06-12 18:11
*/
@Slf4j
@Component
public class DynamicJobConfig {

@Autowired
private ZookeeperRegistryCenter regCenter;

@Autowired
@Qualifier("taskDataSource")
private DataSource taskDataSource;

@Async("taskExecutor")
public void dynamicAddSimpleJobScheduler(SimpleJob simpleJob, String cron,
int shardingTotalCount, String shardingItemParameters, String description) {
new SpringJobScheduler(simpleJob, regCenter,
LiteJobConfigUtil.getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount,shardingItemParameters, description),
new JobEventRdbConfiguration(taskDataSource)).init();
}

}

5、 作业配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
elastic-job:
datasource:
driver-class-name: com.mysql.jdbc.Driver
username: root
password: root
url: jdbc:mysql://192.168.0.1:3306/elastic_job?autoReconnect=true&allowMultiQueries=true&useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false
type: com.alibaba.druid.pool.DruidDataSource
initialSize: 5
minIdle: 5
maxActive: 20
maxWait: 60000
timeBetweenEvictionRunsMillis: 60000
minEvictableIdleTimeMillis: 300000
validationQuery: SELECT 1 FROM DUAL
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
filters: stat,wall,log4j
logSlowSql: true

registry-center:
address: 192.168.0.2:2181
namespace: elastic-job

task:
test:
cron: 0/5 * * * * ?
#作业分片总数
shardingTotalCount: 3
shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
description: 描述

其他方式见开发指南