SOFAJRaft Source Code Reading (5): A First Look at RheaKV

SOFAJRaft-RheaKV is an embedded, distributed, highly available, strongly consistent KV storage library built on SOFAJRaft and RocksDB. A SOFAJRaft-RheaKV cluster has three core components: PD, Store, and Region.
@Author: Akai-yuan
@Updated: 2023/2/3

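1. Architecture design

The SOFAJRaft-RheaKV storage library consists of three core components: PD, Store, and Region. Typical use cases are lightweight state/metadata storage, cluster synchronization, and distributed lock services.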

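(1) Scheduling

PD is the global central control node. It is responsible for scheduling the whole cluster, generating Region IDs, and maintaining the RegionRouteTable routing table. One PD server can manage multiple clusters, which are isolated from each other by clusterId. The PD server has to be deployed separately. Many scenarios do not need self-management, so RheaKV also allows PD to be disabled: set the fake option of PlacementDriverOptions to true. PD usually schedules a Region through the response to that Region's heartbeat. After the Region has carried out the instruction, PD receives the Region's changed information in the next heartbeat and updates its routing and state tables.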

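(2) Storage

A Store is a physical storage node in the cluster and contains one or more Regions. Usually one Node is responsible for one Store. A Store can be seen as a container of Regions that holds multiple data shards. The Store actively reports a StoreHeartbeatRequest heartbeat to PD, which handles it in handleStoreHeartbeat. The heartbeat carries basic information about the Store, such as how many Regions it holds and which Regions have their Leader on this Store.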

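(3) Data

A Region is the smallest KV data unit and can be understood as a data partition or shard. Each Region covers a left-closed, right-open interval [startKey, endKey) and can split and relocate replicas automatically based on metrics such as request traffic, load, and data size. A Region corresponds to an actual data range inside a Store. Each Region has multiple replicas, each stored on a different Store; together they form a Raft Group, and data is synchronized to all nodes of the group through Raft log replication. The Leader of a Region actively reports a RegionHeartbeatRequest heartbeat to PD, which handles it in handleRegionHeartbeat. PD detects whether a Region has changed through the Region's Epoch.
To make the roles of PD, Store, and Region clearer, here is an official diagram:
[Figure: official RheaKV architecture diagram (PD, Store, Region)]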

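2. Initialization

We start from the RheaKV part of the JRaft-Example module, with the main method of com.alipay.sofa.jraft.example.rheakv.Server1. It does the following:

  1. Builds two option objects, PlacementDriverOptions and StoreEngineOptions.
  2. Builds RheaKVStoreOptions, attaches the PD options and the store engine options to it, and sets the cluster name, whether parallel compression is enabled, and the server IP:port list ("127.0.0.1:8181,127.0.0.1:8182,127.0.0.1:8183").
  3. Creates a Node and starts it.
  4. Registers a shutdown hook for graceful shutdown. (The author analyzed shutdown hooks in an earlier post; see SOFAJRaft Source Code Reading (3): How ShutdownHook Shuts Down Gracefully.)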
    public static void main(final String[] args) {
        final PlacementDriverOptions pdOpts = PlacementDriverOptionsConfigured.newConfigured()
            .withFake(true) // use a fake pd
            .config();
        final StoreEngineOptions storeOpts = StoreEngineOptionsConfigured.newConfigured() //
            // StoreEngine supports two storage implementations: MemoryDB and RocksDB
            .withStorageType(StorageType.RocksDB)
            .withRocksDBOptions(RocksDBOptionsConfigured.newConfigured().withDbPath(Configs.DB_PATH).config())
            .withRaftDataPath(Configs.RAFT_DATA_PATH)
            .withServerAddress(new Endpoint("127.0.0.1", 8181))
            .config();
        final RheaKVStoreOptions opts = RheaKVStoreOptionsConfigured.newConfigured() //
            .withClusterName(Configs.CLUSTER_NAME) //
            .withUseParallelCompress(true) //
            .withInitialServerList(Configs.ALL_NODE_ADDRESSES)
            .withStoreEngineOptions(storeOpts) //
            .withPlacementDriverOptions(pdOpts) //
            .config();
        System.out.println(opts);
        final Node node = new Node(opts);
        node.start();
        Runtime.getRuntime().addShutdownHook(new Thread(node::stop));
        System.out.println("server1 start OK");
    }

About the implementation of Node:
It holds a RheaKVStoreOptions and a RheaKVStore.

    public class Node {

        private final RheaKVStoreOptions options;

        private RheaKVStore              rheaKVStore;

        public Node(RheaKVStoreOptions options) {
            this.options = options;
        }

        public void start() {
            this.rheaKVStore = new DefaultRheaKVStore();
            this.rheaKVStore.init(this.options);
        }

        public void stop() {
            this.rheaKVStore.shutdown();
        }

        public RheaKVStore getRheaKVStore() {
            return rheaKVStore;
        }
    }

(1) DefaultRheaKVStore initialization

As the Node code shows, start() creates a DefaultRheaKVStore and then calls its init method to initialize it.

    public synchronized boolean init(final RheaKVStoreOptions opts) {
        // Return early if the store has already been started
        if (this.started) {
            LOG.info("[DefaultRheaKVStore] already started.");
            return true;
        }
        DescriberManager.getInstance().addDescriber(RouteTable.getInstance());
        this.opts = opts;
        // Initialize PD according to PlacementDriverOptions
        final PlacementDriverOptions pdOpts = opts.getPlacementDriverOptions();
        final String clusterName = opts.getClusterName();
        Requires.requireNonNull(pdOpts, "opts.placementDriverOptions");
        Requires.requireNonNull(clusterName, "opts.clusterName");
        if (Strings.isBlank(pdOpts.getInitialServerList())) {
            // If blank, inherit the parent's value
            pdOpts.setInitialServerList(opts.getInitialServerList());
        }
        if (pdOpts.isFake()) {
            // PD is disabled: create a FakePlacementDriverClient
            this.pdClient = new FakePlacementDriverClient(opts.getClusterId(), clusterName);
        } else {
            // PD is enabled: create a RemotePlacementDriverClient
            this.pdClient = new RemotePlacementDriverClient(opts.getClusterId(), clusterName);
        }
        // Initialize the FakePlacementDriverClient/RemotePlacementDriverClient
        if (!this.pdClient.init(pdOpts)) {
            LOG.error("Fail to init [PlacementDriverClient].");
            return false;
        }
        // Initialize the compression strategy
        ZipStrategyManager.init(opts);
        // Initialize the store engine
        final StoreEngineOptions stOpts = opts.getStoreEngineOptions();
        if (stOpts != null) {
            stOpts.setInitialServerList(opts.getInitialServerList());
            this.storeEngine = new StoreEngine(this.pdClient, this.stateListenerContainer);
            if (!this.storeEngine.init(stOpts)) {
                LOG.error("Fail to init [StoreEngine].");
                return false;
            }
        }
        // Get the IP and port of the current node
        final Endpoint selfEndpoint = this.storeEngine == null ? null : this.storeEngine.getSelfEndpoint();
        final RpcOptions rpcOpts = opts.getRpcOptions();
        Requires.requireNonNull(rpcOpts, "opts.rpcOptions");
        // Create an RpcService and override getLeader so the local RegionEngine is consulted first
        this.rheaKVRpcService = new DefaultRheaKVRpcService(this.pdClient, selfEndpoint) {

            @Override
            public Endpoint getLeader(final long regionId, final boolean forceRefresh, final long timeoutMillis) {
                final Endpoint leader = getLeaderByRegionEngine(regionId);
                if (leader != null) {
                    return leader;
                }
                return super.getLeader(regionId, forceRefresh, timeoutMillis);
            }
        };
        if (!this.rheaKVRpcService.init(rpcOpts)) {
            LOG.error("Fail to init [RheaKVRpcService].");
            return false;
        }
        // Number of failover retries, 2 by default
        this.failoverRetries = opts.getFailoverRetries();
        // 5000 by default
        this.futureTimeoutMillis = opts.getFutureTimeoutMillis();
        // Whether to read only from the leader, true by default
        this.onlyLeaderRead = opts.isOnlyLeaderRead();
        // Initialize kvDispatcher; the option is true by default
        if (opts.isUseParallelKVExecutor()) {
            final int numWorkers = Utils.cpus();
            // Multiply by 16
            final int bufSize = numWorkers << 4;
            final String name = "parallel-kv-executor";
            final ThreadFactory threadFactory = Constants.THREAD_AFFINITY_ENABLED
                    ? new AffinityNamedThreadFactory(name, true) : new NamedThreadFactory(name, true);
            // Initialize the dispatcher
            this.kvDispatcher = new TaskDispatcher(bufSize, numWorkers, WaitStrategyType.LITE_BLOCKING_WAIT, threadFactory);
        }
        this.batchingOpts = opts.getBatchingOptions();
        // true by default
        if (this.batchingOpts.isAllowBatching()) {
            this.getBatching = new GetBatching(KeyEvent::new, "get_batching",
                    new GetBatchingHandler("get", false));
            this.getBatchingOnlySafe = new GetBatching(KeyEvent::new, "get_batching_only_safe",
                    new GetBatchingHandler("get_only_safe", true));
            this.putBatching = new PutBatching(KVEvent::new, "put_batching",
                    new PutBatchingHandler("put"));
        }
        LOG.info("[DefaultRheaKVStore] start successfully, options: {}.", opts);
        return this.started = true;
    }
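The overridden getLeader in the init method follows a "look locally first, then fall back" pattern. The following is a minimal sketch of that pattern only, using plain Java. BaseLeaderService, withLocalLeaders, and the addresses are hypothetical names invented for illustration and are not part of RheaKV; the local map stands in for whatever getLeaderByRegionEngine consults.

```java
import java.util.HashMap;
import java.util.Map;

public class LocalFirstLeaderLookup {

    /** Hypothetical base service: every call goes to a (simulated) remote source. */
    static class BaseLeaderService {
        int remoteLookups = 0;

        String getLeader(long regionId) {
            remoteLookups++;
            return "10.0.0.9:8181"; // pretend this answer came from a remote lookup
        }
    }

    /** Wraps the base service so that locally known leaders are returned without a remote call. */
    static BaseLeaderService withLocalLeaders(Map<Long, String> localLeaders) {
        return new BaseLeaderService() {
            @Override
            String getLeader(long regionId) {
                String local = localLeaders.get(regionId);
                // Fall back to the base implementation only when nothing is known locally
                return local != null ? local : super.getLeader(regionId);
            }
        };
    }

    public static void main(String[] args) {
        Map<Long, String> local = new HashMap<>();
        local.put(1L, "127.0.0.1:8181");
        BaseLeaderService svc = withLocalLeaders(local);
        System.out.println(svc.getLeader(1L)); // answered from the local map
        System.out.println(svc.getLeader(2L)); // answered by the fallback
    }
}
```

The point of the anonymous subclass is that callers keep using the same service type while the cheaper local answer is preferred whenever it exists.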

(2) StoreEngine initialization

Some of this code mirrors what DefaultRheaKVStore's init method does and is not explained again; the rest is annotated with comments in the code below.

    public synchronized boolean init(final StoreEngineOptions opts) {
        if (this.started) {
            LOG.info("[StoreEngine] already started.");
            return true;
        }
        DescriberManager.getInstance().addDescriber(this);
        this.storeOpts = Requires.requireNonNull(opts, "opts");
        Endpoint serverAddress = Requires.requireNonNull(opts.getServerAddress(), "opts.serverAddress");
        // Get the IP and port
        final int port = serverAddress.getPort();
        final String ip = serverAddress.getIp();
        // If the IP is null or the wildcard address, use the local host name as the server address
        if (ip == null || Utils.IP_ANY.equals(ip)) {
            serverAddress = new Endpoint(NetUtil.getLocalCanonicalHostName(), port);
            opts.setServerAddress(serverAddress);
        }
        // Get the metrics report period
        final long metricsReportPeriod = opts.getMetricsReportPeriod();
        // Initialize the RegionEngineOptions
        List<RegionEngineOptions> rOptsList = opts.getRegionEngineOptionsList();
        // If no RegionEngineOptions were given, create a default one
        if (rOptsList == null || rOptsList.isEmpty()) {
            // -1 region
            final RegionEngineOptions rOpts = new RegionEngineOptions();
            rOpts.setRegionId(Constants.DEFAULT_REGION_ID);
            rOptsList = Lists.newArrayList();
            rOptsList.add(rOpts);
            opts.setRegionEngineOptionsList(rOptsList);
        }
        // Get the cluster name
        final String clusterName = this.pdClient.getClusterName();
        // Fill in the parameters of every RegionEngineOptions in rOptsList
        for (final RegionEngineOptions rOpts : rOptsList) {
            // The RaftGroupId is clusterName + "-" + regionId
            rOpts.setRaftGroupId(JRaftHelper.getJRaftGroupId(clusterName, rOpts.getRegionId()));
            rOpts.setServerAddress(serverAddress);
            if (Strings.isBlank(rOpts.getInitialServerList())) {
                // if blank, extends parent's value
                rOpts.setInitialServerList(opts.getInitialServerList());
            }
            if (rOpts.getNodeOptions() == null) {
                // copy common node options
                rOpts.setNodeOptions(opts.getCommonNodeOptions() == null ? new NodeOptions() : opts
                    .getCommonNodeOptions().copy());
            }
            // If the region has no metrics report period of its own, inherit the store's value
            if (rOpts.getMetricsReportPeriod() <= 0 && metricsReportPeriod > 0) {
                // extends store opts
                rOpts.setMetricsReportPeriod(metricsReportPeriod);
            }
        }
        // Initialize the Store and the regions inside it
        final Store store = this.pdClient.getStoreMetadata(opts);
        if (store == null || store.getRegions() == null || store.getRegions().isEmpty()) {
            LOG.error("Empty store metadata: {}.", store);
            return false;
        }
        this.storeId = store.getId();
        this.partRocksDBOptions = SystemPropertyUtil.getBoolean(PART_ROCKSDB_OPTIONS_KEY, false);
        // Initialize the executors
        if (this.readIndexExecutor == null) {
            this.readIndexExecutor = StoreEngineHelper.createReadIndexExecutor(opts.getReadIndexCoreThreads());
        }
        if (this.raftStateTrigger == null) {
            this.raftStateTrigger = StoreEngineHelper.createRaftStateTrigger(opts.getLeaderStateTriggerCoreThreads());
        }
        if (this.snapshotExecutor == null) {
            this.snapshotExecutor = StoreEngineHelper.createSnapshotExecutor(opts.getSnapshotCoreThreads(),
                opts.getSnapshotMaxThreads());
        }
        // init rpc executors
        final boolean useSharedRpcExecutor = opts.isUseSharedRpcExecutor();
        // Initialize the RPC executors that run the RpcServer's processors
        if (!useSharedRpcExecutor) {
            if (this.cliRpcExecutor == null) {
                this.cliRpcExecutor = StoreEngineHelper.createCliRpcExecutor(opts.getCliRpcCoreThreads());
            }
            if (this.raftRpcExecutor == null) {
                this.raftRpcExecutor = StoreEngineHelper.createRaftRpcExecutor(opts.getRaftRpcCoreThreads());
            }
            if (this.kvRpcExecutor == null) {
                this.kvRpcExecutor = StoreEngineHelper.createKvRpcExecutor(opts.getKvRpcCoreThreads());
            }
        }
        // Start the metric reporters
        startMetricReporters(metricsReportPeriod);
        // Initialize the rpcServer that other services call
        this.rpcServer = RaftRpcServerFactory.createRaftRpcServer(serverAddress, this.raftRpcExecutor,
            this.cliRpcExecutor);
        // Register the KV request processors on the server
        StoreEngineHelper.addKvStoreRequestProcessor(this.rpcServer, this);
        if (!this.rpcServer.init(null)) {
            LOG.error("Fail to init [RpcServer].");
            return false;
        }
        // init db store
        // Choose the db implementation according to the storage type
        if (!initRawKVStore(opts)) {
            return false;
        }
        if (this.rawKVStore instanceof Describer) {
            DescriberManager.getInstance().addDescriber((Describer) this.rawKVStore);
        }
        // init all region engine
        // Initialize a RegionEngine for every region
        if (!initAllRegionEngine(opts, store)) {
            LOG.error("Fail to init all [RegionEngine].");
            return false;
        }
        // heartbeat sender
        // If PD is enabled (self-managed cluster), initialize the heartbeat sender
        if (this.pdClient instanceof RemotePlacementDriverClient) {
            HeartbeatOptions heartbeatOpts = opts.getHeartbeatOptions();
            if (heartbeatOpts == null) {
                heartbeatOpts = new HeartbeatOptions();
            }
            this.heartbeatSender = new HeartbeatSender(this);
            if (!this.heartbeatSender.init(heartbeatOpts)) {
                LOG.error("Fail to init [HeartbeatSender].");
                return false;
            }
        }
        this.startTime = System.currentTimeMillis();
        LOG.info("[StoreEngine] start successfully: {}.", this);
        return this.started = true;
    }

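(3) RegionEngineOptions initialization

The StoreEngine initialization above also initializes the RegionEngineOptions. Here is that part of the code on its own:

  1. First, check whether opts.getRegionEngineOptionsList() is null or empty; if so, create a single default RegionEngineOptions.
  2. Iterate over every RegionEngineOptions in the list and set its parameters (RaftGroupId, ServerAddress, InitialServerList, NodeOptions, MetricsReportPeriod).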
        // init region options
        List<RegionEngineOptions> rOptsList = opts.getRegionEngineOptionsList();
        if (rOptsList == null || rOptsList.isEmpty()) {
            // -1 region
            final RegionEngineOptions rOpts = new RegionEngineOptions();
            rOpts.setRegionId(Constants.DEFAULT_REGION_ID);
            rOptsList = Lists.newArrayList();
            rOptsList.add(rOpts);
            opts.setRegionEngineOptionsList(rOptsList);
        }
        final String clusterName = this.pdClient.getClusterName();
        for (final RegionEngineOptions rOpts : rOptsList) {
            rOpts.setRaftGroupId(JRaftHelper.getJRaftGroupId(clusterName, rOpts.getRegionId()));
            rOpts.setServerAddress(serverAddress);
            if (Strings.isBlank(rOpts.getInitialServerList())) {
                // if blank, extends parent's value
                rOpts.setInitialServerList(opts.getInitialServerList());
            }
            if (rOpts.getNodeOptions() == null) {
                // copy common node options
                rOpts.setNodeOptions(opts.getCommonNodeOptions() == null ? new NodeOptions() : opts
                    .getCommonNodeOptions().copy());
            }
            if (rOpts.getMetricsReportPeriod() <= 0 && metricsReportPeriod > 0) {
                // extends store opts
                rOpts.setMetricsReportPeriod(metricsReportPeriod);
            }
        }
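The "create a default if empty, then inherit blank values from the store-level options" logic can be sketched without any JRaft types. The class below is a simplified illustration under stated assumptions: RegionOpts and fillDefaults are hypothetical names, only three fields are modeled, and the default region ID of -1 is taken from the "-1 region" comment in the code.

```java
import java.util.ArrayList;
import java.util.List;

public class RegionOptionsDefaults {

    /** Hypothetical, trimmed-down stand-in for RegionEngineOptions. */
    static class RegionOpts {
        final long regionId;
        String raftGroupId;
        String initialServerList;
        long metricsReportPeriod;

        RegionOpts(long regionId) {
            this.regionId = regionId;
        }
    }

    /** Fills per-region options from store-level values where the region leaves them unset. */
    static List<RegionOpts> fillDefaults(List<RegionOpts> regions, String clusterName,
                                         String storeServerList, long storeMetricsPeriod) {
        List<RegionOpts> result = (regions == null) ? new ArrayList<>() : regions;
        if (result.isEmpty()) {
            result.add(new RegionOpts(-1L)); // single default region, ID -1
        }
        for (RegionOpts r : result) {
            // Group ID is cluster name + "-" + region ID, as described in the text
            r.raftGroupId = clusterName + "-" + r.regionId;
            if (r.initialServerList == null || r.initialServerList.trim().isEmpty()) {
                r.initialServerList = storeServerList; // inherit the parent's value
            }
            if (r.metricsReportPeriod <= 0 && storeMetricsPeriod > 0) {
                r.metricsReportPeriod = storeMetricsPeriod; // inherit the store's period
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<RegionOpts> filled = fillDefaults(null, "rhea_example",
            "127.0.0.1:8181,127.0.0.1:8182,127.0.0.1:8183", 30);
        for (RegionOpts r : filled) {
            System.out.println(r.raftGroupId + " -> " + r.initialServerList);
        }
    }
}
```

With the default region ID, the group ID contains two consecutive hyphens because the negative sign of -1 follows the separator.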

(4) Store initialization

The Store is initialized by calling the pdClient's getStoreMetadata method:

final Store store = this.pdClient.getStoreMetadata(opts); 

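When FakePlacementDriverClient#getStoreMetadata is called:

  1. It fetches the RegionEngineOptions list initialized earlier.
  2. It creates a Region list with the same capacity as the RegionEngineOptions list, because one Store contains multiple Regions, as the diagram earlier shows.
  3. It calls getLocalRegionMetadata for each element of the RegionEngineOptions list and adds the result to the Region list.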
    public Store getStoreMetadata(final StoreEngineOptions opts) {
        final Store store = new Store();
        final List<RegionEngineOptions> rOptsList = opts.getRegionEngineOptionsList();
        final List<Region> regionList = Lists.newArrayListWithCapacity(rOptsList.size());
        store.setId(-1);
        store.setEndpoint(opts.getServerAddress());
        for (final RegionEngineOptions rOpts : rOptsList) {
            regionList.add(getLocalRegionMetadata(rOpts));
        }
        store.setRegions(regionList);
        return store;
    }

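Now let's look at AbstractPlacementDriverClient#getLocalRegionMetadata:

  1. It checks that the regionId is within the allowed range.
  2. It sets the key range (left-closed, right-open).
  3. It converts initialServerList into peer objects.
  4. It adds the Region to the regionRouteTable routing table.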
    protected Region getLocalRegionMetadata(final RegionEngineOptions opts) {
        final long regionId = Requires.requireNonNull(opts.getRegionId(), "opts.regionId");
        Requires.requireTrue(regionId >= Region.MIN_ID_WITH_MANUAL_CONF, "opts.regionId must >= "
                                                                         + Region.MIN_ID_WITH_MANUAL_CONF);
        Requires.requireTrue(regionId < Region.MAX_ID_WITH_MANUAL_CONF, "opts.regionId must < "
                                                                        + Region.MAX_ID_WITH_MANUAL_CONF);
        final byte[] startKey = opts.getStartKeyBytes();
        final byte[] endKey = opts.getEndKeyBytes();
        final String initialServerList = opts.getInitialServerList();
        final Region region = new Region();
        final Configuration conf = new Configuration();
        // region
        region.setId(regionId);
        region.setStartKey(startKey);
        region.setEndKey(endKey);
        region.setRegionEpoch(new RegionEpoch(-1, -1));
        // peers
        Requires.requireTrue(Strings.isNotBlank(initialServerList), "opts.initialServerList is blank");
        conf.parse(initialServerList);
        region.setPeers(JRaftHelper.toPeerList(conf.listPeers()));
        this.regionRouteTable.addOrUpdateRegion(region);
        return region;
    }
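Step 3 turns a string such as "127.0.0.1:8181,127.0.0.1:8182,127.0.0.1:8183" into a list of peers. The sketch below shows one way to do that for the plain ip:port form only. ServerListParser and Peer are hypothetical names, and the real Configuration#parse may accept richer formats that this sketch does not attempt to handle.

```java
import java.util.ArrayList;
import java.util.List;

public class ServerListParser {

    /** Hypothetical minimal peer: just an IP (or host name) and a port. */
    static final class Peer {
        final String ip;
        final int port;

        Peer(String ip, int port) {
            this.ip = ip;
            this.port = port;
        }

        @Override
        public String toString() {
            return ip + ":" + port;
        }
    }

    /** Parses a comma-separated "ip:port" list; rejects blank input and malformed items. */
    static List<Peer> parse(String serverList) {
        if (serverList == null || serverList.trim().isEmpty()) {
            throw new IllegalArgumentException("initialServerList is blank");
        }
        List<Peer> peers = new ArrayList<>();
        for (String item : serverList.split(",")) {
            String s = item.trim();
            int idx = s.lastIndexOf(':');
            if (idx <= 0 || idx == s.length() - 1) {
                throw new IllegalArgumentException("Malformed peer: " + s);
            }
            peers.add(new Peer(s.substring(0, idx), Integer.parseInt(s.substring(idx + 1))));
        }
        return peers;
    }

    public static void main(String[] args) {
        System.out.println(parse("127.0.0.1:8181,127.0.0.1:8182,127.0.0.1:8183"));
    }
}
```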

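RegionEpoch involves two version numbers:
(1) confVer: the configuration version, incremented when a peer is added or removed
(2) version: the Region version, incremented when the Region splits or merges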

    public RegionEpoch(long confVer, long version) {
        this.confVer = confVer;
        this.version = version;
    }
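To illustrate how two counters are enough to notice that a Region changed, here is a small self-contained sketch. Epoch, afterPeerChange, afterSplitOrMerge, and differsFrom are hypothetical names chosen for this example; how PD actually compares epochs is not shown in the code above, so treat this as an assumption about the idea rather than the real comparison logic.

```java
public class RegionEpochSketch {

    /** Hypothetical, immutable stand-in for RegionEpoch. */
    static final class Epoch {
        final long confVer; // bumped when a peer is added or removed
        final long version; // bumped when the region splits or merges

        Epoch(long confVer, long version) {
            this.confVer = confVer;
            this.version = version;
        }

        Epoch afterPeerChange() {
            return new Epoch(confVer + 1, version);
        }

        Epoch afterSplitOrMerge() {
            return new Epoch(confVer, version + 1);
        }

        /** True if either counter differs from a previously seen epoch. */
        boolean differsFrom(Epoch seen) {
            return confVer != seen.confVer || version != seen.version;
        }
    }

    public static void main(String[] args) {
        Epoch initial = new Epoch(-1, -1); // same initial values as in getLocalRegionMetadata
        Epoch afterSplit = initial.afterSplitOrMerge();
        System.out.println(afterSplit.differsFrom(initial));
    }
}
```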

About RegionRouteTable#addOrUpdateRegion:

    public void addOrUpdateRegion(final Region region) {
        Requires.requireNonNull(region, "region");
        Requires.requireNonNull(region.getRegionEpoch(), "regionEpoch");
        final long regionId = region.getId();
        final byte[] startKey = BytesUtil.nullToEmpty(region.getStartKey());
        final StampedLock stampedLock = this.stampedLock;
        final long stamp = stampedLock.writeLock();
        try {
            this.regionTable.put(regionId, region.copy());
            this.rangeTable.put(startKey, regionId);
        } finally {
            stampedLock.unlockWrite(stamp);
        }
    }

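Let's look at a few fields of RegionRouteTable:

  • keyBytesComparator: a LexicographicByteArrayComparator, which orders byte arrays lexicographically
  • stampedLock: a StampedLock, which supports optimistic reads in addition to read and write locks and often performs better than a ReentrantReadWriteLock when reads dominate
  • rangeTable: a TreeMap (an implementation of NavigableMap) sorted by keyBytesComparator; its entries are <startKey, regionId>
  • regionTable: a HashMap; its entries are <regionId, region>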
    private static final Comparator<byte[]>  keyBytesComparator = BytesUtil.getDefaultByteArrayComparator();
    private final StampedLock                stampedLock        = new StampedLock();
    private final NavigableMap<byte[], Long> rangeTable         = new TreeMap<>(keyBytesComparator);
    private final Map<Long, Region>          regionTable        = Maps.newHashMap();
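Keeping start keys in a sorted map makes it possible to find the Region for a key with a floor lookup: the Region whose start key is the greatest one not larger than the key. The sketch below shows that idea with the same kinds of fields. RouteTableSketch, addRegion, and findRegionId are hypothetical names, the comparator is a hand-written unsigned lexicographic comparison, and the end key is deliberately not checked, so this is not the actual RegionRouteTable lookup.

```java
import java.nio.charset.StandardCharsets;
import java.util.Comparator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.locks.StampedLock;

public class RouteTableSketch {

    /** Unsigned lexicographic byte-array comparison; a shorter prefix sorts first. */
    static final Comparator<byte[]> KEY_COMPARATOR = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int d = (a[i] & 0xff) - (b[i] & 0xff);
            if (d != 0) {
                return d;
            }
        }
        return a.length - b.length;
    };

    private final StampedLock lock = new StampedLock();
    // startKey -> regionId, sorted by start key
    private final NavigableMap<byte[], Long> rangeTable = new TreeMap<>(KEY_COMPARATOR);

    void addRegion(long regionId, String startKey) {
        long stamp = lock.writeLock();
        try {
            rangeTable.put(startKey.getBytes(StandardCharsets.UTF_8), regionId);
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    /** Returns the region whose start key is the greatest one <= key, or null if none. */
    Long findRegionId(String key) {
        long stamp = lock.readLock();
        try {
            Map.Entry<byte[], Long> e = rangeTable.floorEntry(key.getBytes(StandardCharsets.UTF_8));
            return e == null ? null : e.getValue();
        } finally {
            lock.unlockRead(stamp);
        }
    }

    public static void main(String[] args) {
        RouteTableSketch table = new RouteTableSketch();
        table.addRegion(1L, "");  // an empty start key covers the beginning of the key space
        table.addRegion(2L, "m"); // second region starts at "m"
        System.out.println(table.findRegionId("apple"));
        System.out.println(table.findRegionId("zebra"));
    }
}
```

Because each interval is left-closed, a key equal to a Region's start key belongs to that Region, which is exactly what floorEntry returns.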

(5) RegionEngine initialization

StoreEngine#initAllRegionEngine

    private boolean initAllRegionEngine(final StoreEngineOptions opts, final Store store) {
        Requires.requireNonNull(opts, "opts");
        Requires.requireNonNull(store, "store");
        // Get the base directory
        String baseRaftDataPath = opts.getRaftDataPath();
        if (Strings.isNotBlank(baseRaftDataPath)) {
            try {
                FileUtils.forceMkdir(new File(baseRaftDataPath));
            } catch (final Throwable t) {
                LOG.error("Fail to make dir for raftDataPath: {}.", baseRaftDataPath);
                return false;
            }
        } else {
            baseRaftDataPath = "";
        }
        final Endpoint serverAddress = opts.getServerAddress();
        // Get the RegionEngineOptions and the regions
        final List<RegionEngineOptions> rOptsList = opts.getRegionEngineOptionsList();
        final List<Region> regionList = store.getRegions();
        Requires.requireTrue(rOptsList.size() == regionList.size());
        for (int i = 0; i < rOptsList.size(); i++) {
            final RegionEngineOptions rOpts = rOptsList.get(i);
            // Skip regions whose initial server list does not contain this server
            if (!inConfiguration(rOpts.getServerAddress().toString(), rOpts.getInitialServerList())) {
                continue;
            }
            final Region region = regionList.get(i);
            // If the region has no raft data path, derive one from the base directory
            if (Strings.isBlank(rOpts.getRaftDataPath())) {
                final String childPath = "raft_data_region_" + region.getId() + "_" + serverAddress.getPort();
                rOpts.setRaftDataPath(Paths.get(baseRaftDataPath, childPath).toString());
            }
            Requires.requireNonNull(region.getRegionEpoch(), "regionEpoch");
            // Initialize a RegionEngine for the Region
            final RegionEngine engine = new RegionEngine(region, this);
            if (engine.init(rOpts)) {
                // Each RegionKVService corresponds to one Region and only handles requests within that Region
                final RegionKVService regionKVService = new DefaultRegionKVService(engine);
                registerRegionKVService(regionKVService);
                // Put the engine into the ConcurrentMap<Long, RegionEngine> regionEngineTable
                this.regionEngineTable.put(region.getId(), engine);
            } else {
                LOG.error("Fail to init [RegionEngine: {}].", region);
                return false;
            }
        }
        return true;
    }

RegionEngine#init

    public synchronized boolean init(final RegionEngineOptions opts) {
        if (this.started) {
            LOG.info("[RegionEngine: {}] already started.", this.region);
            return true;
        }
        this.regionOpts = Requires.requireNonNull(opts, "opts");
        // Create the state machine
        this.fsm = new KVStoreStateMachine(this.region, this.storeEngine);

        // node options
        NodeOptions nodeOpts = opts.getNodeOptions();
        if (nodeOpts == null) {
            nodeOpts = new NodeOptions();
        }
        // If the metrics report period is greater than zero, enable metrics
        final long metricsReportPeriod = opts.getMetricsReportPeriod();
        if (metricsReportPeriod > 0) {
            // metricsReportPeriod > 0 means enable metrics
            nodeOpts.setEnableMetrics(true);
        }
        final Configuration initialConf = new Configuration();
        if (!initialConf.parse(opts.getInitialServerList())) {
            LOG.error("Fail to parse initial configuration {}.", opts.getInitialServerList());
            return false;
        }
        // Set the initial cluster configuration
        nodeOpts.setInitialConf(initialConf);
        nodeOpts.setFsm(this.fsm);
        // Set up the log, meta, and snapshot paths
        final String raftDataPath = opts.getRaftDataPath();
        try {
            FileUtils.forceMkdir(new File(raftDataPath));
        } catch (final Throwable t) {
            LOG.error("Fail to make dir for raftDataPath {}.", raftDataPath);
            return false;
        }
        if (Strings.isBlank(nodeOpts.getLogUri())) {
            final Path logUri = Paths.get(raftDataPath, "log");
            nodeOpts.setLogUri(logUri.toString());
        }
        if (Strings.isBlank(nodeOpts.getRaftMetaUri())) {
            final Path meteUri = Paths.get(raftDataPath, "meta");
            nodeOpts.setRaftMetaUri(meteUri.toString());
        }
        if (Strings.isBlank(nodeOpts.getSnapshotUri())) {
            final Path snapshotUri = Paths.get(raftDataPath, "snapshot");
            nodeOpts.setSnapshotUri(snapshotUri.toString());
        }
        LOG.info("[RegionEngine: {}], log uri: {}, raft meta uri: {}, snapshot uri: {}.", this.region,
            nodeOpts.getLogUri(), nodeOpts.getRaftMetaUri(), nodeOpts.getSnapshotUri());
        final Endpoint serverAddress = opts.getServerAddress();
        final PeerId serverId = new PeerId(serverAddress, 0);
        final RpcServer rpcServer = this.storeEngine.getRpcServer();
        this.raftGroupService = new RaftGroupService(opts.getRaftGroupId(), serverId, nodeOpts, rpcServer, true);
        // Start the raft group service, which initializes the node
        this.node = this.raftGroupService.start(false);
        RouteTable.getInstance().updateConfiguration(this.raftGroupService.getGroupId(), nodeOpts.getInitialConf());
        if (this.node != null) {
            final RawKVStore rawKVStore = this.storeEngine.getRawKVStore();
            final Executor readIndexExecutor = this.storeEngine.getReadIndexExecutor();
            // RaftRawKVStore is RheaKV's RawKVStore implementation on top of the Raft replicated state machine KVStoreStateMachine
            // It is RheaKV's entry point into Raft; the Raft flow starts here
            this.raftRawKVStore = new RaftRawKVStore(this.node, rawKVStore, readIndexExecutor);
            // Intercepts requests to record metrics
            this.metricsRawKVStore = new MetricsRawKVStore(this.region.getId(), this.raftRawKVStore);
            // metrics config
            if (this.regionMetricsReporter == null && metricsReportPeriod > 0) {
                final MetricRegistry metricRegistry = this.node.getNodeMetrics().getMetricRegistry();
                if (metricRegistry != null) {
                    final ScheduledExecutorService scheduler = this.storeEngine.getMetricsScheduler();
                    // start raft node metrics reporter
                    this.regionMetricsReporter = Slf4jReporter.forRegistry(metricRegistry) //
                        .prefixedWith("region_" + this.region.getId()) //
                        .withLoggingLevel(Slf4jReporter.LoggingLevel.INFO) //
                        .outputTo(LOG) //
                        .scheduleOn(scheduler) //
                        .shutdownExecutorOnStop(scheduler != null) //
                        .build();
                    this.regionMetricsReporter.start(metricsReportPeriod, TimeUnit.SECONDS);
                }
            }
            this.started = true;
            LOG.info("[RegionEngine] start successfully: {}.", this);
        }
        return this.started;
    }
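Taken together, initAllRegionEngine and RegionEngine#init derive an on-disk layout from the base raft data path when no explicit paths are configured. The sketch below reproduces only that path arithmetic with java.nio.file. RaftPathLayout and its methods are hypothetical helper names, and "rhea_raft" is a made-up base directory used for illustration.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class RaftPathLayout {

    /** Per-region directory: <base>/raft_data_region_<regionId>_<port>. */
    static Path regionDir(String baseRaftDataPath, long regionId, int port) {
        String base = (baseRaftDataPath == null || baseRaftDataPath.trim().isEmpty()) ? "" : baseRaftDataPath;
        return Paths.get(base, "raft_data_region_" + regionId + "_" + port);
    }

    /** Default locations used when the node options leave the URIs blank. */
    static Path logUri(Path regionDir) {
        return regionDir.resolve("log");
    }

    static Path metaUri(Path regionDir) {
        return regionDir.resolve("meta");
    }

    static Path snapshotUri(Path regionDir) {
        return regionDir.resolve("snapshot");
    }

    public static void main(String[] args) {
        Path dir = regionDir("rhea_raft", -1L, 8181); // hypothetical base directory
        System.out.println(logUri(dir));
        System.out.println(metaUri(dir));
        System.out.println(snapshotUri(dir));
    }
}
```

Including the port in the directory name keeps the data of several servers apart when they run on the same machine, as the three example servers do.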
