Cross-cluster node identity conflict in raft clusters built with hashicorp/raft

Problem scenario

I implemented a raft cluster in Go using the github.com/hashicorp/raft module and ran into a problem in the following scenario:

For the test I start the following two raft clusters. The cluster names and the node/IP assignments are listed below; both clusters are initialized via the BootstrapCluster method:

Cluster1 BootstrapCluster servers:

- node1: {raft.ServerID: c1-node1, raft.ServerAddress: 192.168.100.1:7000}
- node2: {raft.ServerID: c1-node2, raft.ServerAddress: 192.168.100.2:7000}
- node3: {raft.ServerID: c1-node3, raft.ServerAddress: 192.168.100.3:7000}

Cluster2 BootstrapCluster servers:

- node3: {raft.ServerID: c2-node3, raft.ServerAddress: 192.168.100.3:7000}
- node4: {raft.ServerID: c2-node4, raft.ServerAddress: 192.168.100.4:7000}
- node5: {raft.ServerID: c2-node5, raft.ServerAddress: 192.168.100.5:7000}

其中,"node3"的地址会存在2个集群中。

  1. "node1","node2"按照"Cluster1"启动:

sudo ./raft_svr -cluster 'c1-node1,127.0.0.1,800;c1-node2,127.0.0.2,800;c1-node3,127.0.0.3,800' -id c1-node1

sudo ./raft_svr -cluster 'c1-node1,127.0.0.1,800;c1-node2,127.0.0.2,800;c1-node3,127.0.0.3,800' -id c1-node2

  1. "node3","node4","node5"先按照"Cluster2"启动:

sudo ./raft_svr -cluster 'c2-node3,127.0.0.3,800;c2-node4,127.0.0.4,800;c2-node5,127.0.0.5,800' -id c2-node3

sudo ./raft_svr -cluster 'c2-node3,127.0.0.3,800;c2-node4,127.0.0.4,800;c2-node5,127.0.0.5,800' -id c2-node4

sudo ./raft_svr -cluster 'c2-node3,127.0.0.3,800;c2-node4,127.0.0.4,800;c2-node5,127.0.0.5,800' -id c2-node5

Once both sets of processes are running, "node3" keeps flipping between "Cluster1" and "Cluster2": one moment it belongs to "Cluster1", the next moment to "Cluster2".

INFO[0170] current state:Follower, leader address:127.0.0.5:800, servers:[{Suffrage:Voter ID:c2-node3 Address:127.0.0.3:800} {Suffrage:Voter ID:c2-node4 Address:127.0.0.4:800} {Suffrage:Voter ID:c2-node5 Address:127.0.0.5:800}], last contact:2025-05-14 15:35:53.330867 +0800 CST m=+169.779019126
INFO[0171] current state:Follower, leader address:127.0.0.1:800, servers:[{Suffrage:Voter ID:c2-node3 Address:127.0.0.3:800} {Suffrage:Voter ID:c2-node4 Address:127.0.0.4:800} {Suffrage:Voter ID:c2-node5 Address:127.0.0.5:800}], last contact:2025-05-14 15:35:54.308388 +0800 CST m=+170.756576126

My code is as follows:

package main

import (
	"flag"
	"fmt"
	"io"
	"net"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/hashicorp/raft"
	log "github.com/sirupsen/logrus"
)

type raftCluster struct {
	localRaftID     raft.ServerID
	servers         map[raft.ServerID]raft.ServerAddress // raftID : raftAddressPort
	raft            *raft.Raft
	electionTimeout time.Duration
}

func (r *raftCluster) Start() error {
	config := raft.DefaultConfig()
	config.HeartbeatTimeout = 2000 * time.Millisecond
	config.ElectionTimeout = 5000 * time.Millisecond
	config.CommitTimeout = 2000 * time.Millisecond
	config.LeaderLeaseTimeout = 2000 * time.Millisecond
	config.LocalID = r.localRaftID
	config.LogOutput = log.StandardLogger().Out

	r.electionTimeout = config.ElectionTimeout * time.Duration(len(r.servers)*2)

	localAddressPort := string(r.servers[r.localRaftID])
	tcpAddr, err := net.ResolveTCPAddr("tcp", localAddressPort)
	if err != nil {
		return fmt.Errorf("resolve tcp address %s, %v", localAddressPort, err)
	}
	transport, err := raft.NewTCPTransport(localAddressPort, tcpAddr, 2, 10*time.Second, log.StandardLogger().Out)
	if err != nil {
		return fmt.Errorf("fail to create tcp transport, localAddressPort:%s, tcpAddr:%v, %v",
			localAddressPort, tcpAddr, err)
	}

	// In-memory stores are sufficient for this reproduction.
	snapshots := raft.NewInmemSnapshotStore()
	logStore := raft.NewInmemStore()
	stableStore := raft.NewInmemStore()
	fm := NewFsm()
	r.raft, err = raft.NewRaft(config, fm, logStore, stableStore, snapshots, transport)
	if err != nil {
		return fmt.Errorf("create raft error, %v", err)
	}

	// Every node bootstraps with the same full member list.
	var configuration raft.Configuration
	for sID, addr := range r.servers {
		server := raft.Server{
			ID:      sID,
			Address: addr,
		}
		configuration.Servers = append(configuration.Servers, server)
	}
	err = r.raft.BootstrapCluster(configuration).Error()
	if err != nil {
		return fmt.Errorf("raft bootstrap failed, conf:%v, %v", configuration, err)
	}
	log.Infof("bootstrap cluster as config: %v", configuration)

	return nil
}

func (r *raftCluster) checkLeaderState() {
	ticker := time.NewTicker(time.Second)
	for {
		select {
		case leader := <-r.raft.LeaderCh():
			log.Infof("im leader:%v, state:%s, leader address:%s", leader, r.raft.State(), r.raft.Leader())

		case <-ticker.C:
			verifyErr := r.raft.VerifyLeader().Error()
			servers := r.raft.GetConfiguration().Configuration().Servers
			switch verifyErr {
			case nil:
				log.Infof("im leader, servers:%v", servers)
			case raft.ErrNotLeader:
				// check cluster leader
				log.Infof("current state:%v, servers:%+v, leader address:%v, last contact:%v",
					r.raft.State(), servers, r.raft.Leader(), r.raft.LastContact())
			}
		}
	}
}

func main() {
	var (
		clusters = flag.String("cluster", "",
			"cluster node address, fmt: ID,IP,Port;ID,IP,Port")
		clusterId = flag.String("id", "", "cluster id")
	)
	flag.Parse()

	if *clusterId == "" {
		log.Infof("cluster id missing")
		os.Exit(1)
	}

	servers := make(map[raft.ServerID]raft.ServerAddress)
	for _, cluster := range strings.Split(*clusters, ";") {
		info := strings.Split(cluster, ",")
		var (
			nid   string
			nip   net.IP
			nport int
			err   error
		)
		switch {
		case len(info) == 3:
			nid = info[0]
			nip = net.ParseIP(info[1])
			if nip == nil {
				log.Infof("cluster %s ip %s parse failed", cluster, info[1])
				os.Exit(1)
			}
			nport, err = strconv.Atoi(info[2])
			if err != nil {
				log.Infof("cluster %s port %s parse failed, %v", cluster, info[2], err)
				os.Exit(1)
			}
		default:
			log.Infof("cluster args value is bad format")
			os.Exit(1)
		}
		log.Infof("cluster node id:%s, ip:%v, port:%d", nid, nip, nport)
		addr := net.TCPAddr{IP: nip, Port: nport}
		servers[raft.ServerID(nid)] = raft.ServerAddress(addr.String())
	}

	r := raftCluster{
		localRaftID: raft.ServerID(*clusterId),
		servers:     servers,
	}
	err := r.Start()
	if err != nil {
		log.Infof("raft cluster start failed, %v", err)
		os.Exit(1)
	}
	r.checkLeaderState()
}

// SimpleFsm implements a simple no-op raft.FSM.
type SimpleFsm struct {
	db database
}

func NewFsm() *SimpleFsm {
	fsm := &SimpleFsm{
		db: NewDatabase(),
	}
	return fsm
}

func (f *SimpleFsm) Apply(l *raft.Log) interface{} {
	return nil
}

func (f *SimpleFsm) Snapshot() (raft.FSMSnapshot, error) {
	return &f.db, nil
}

func (f *SimpleFsm) Restore(io.ReadCloser) error {
	return nil
}

type database struct{}

func NewDatabase() database {
	return database{}
}

func (d *database) Get(key string) string {
	return "not implemented"
}

func (d *database) Set(key, value string) {}

func (d *database) Persist(sink raft.SnapshotSink) error {
	_, _ = sink.Write([]byte{})
	_ = sink.Close()
	return nil
}

func (d *database) Release() {}

Reproduction steps

  1. Compile the code.

  2. Add the extra loopback IP addresses the test needs. On macOS, for example (on Linux the whole 127.0.0.0/8 range is typically routed to lo by default, so this step can usually be skipped):

sudo ifconfig lo0 alias 127.0.0.2 up
sudo ifconfig lo0 alias 127.0.0.3 up
sudo ifconfig lo0 alias 127.0.0.4 up
sudo ifconfig lo0 alias 127.0.0.5 up
  3. Start the raft processes of both clusters. First, start "node1" and "node2" as "Cluster1":

sudo ./raft_svr -cluster 'c1-node1,127.0.0.1,800;c1-node2,127.0.0.2,800;c1-node3,127.0.0.3,800' -id c1-node1

sudo ./raft_svr -cluster 'c1-node1,127.0.0.1,800;c1-node2,127.0.0.2,800;c1-node3,127.0.0.3,800' -id c1-node2

  1. "node3","node4","node5"先按照"Cluster2"启动:

sudo ./raft_svr -cluster 'c2-node3,127.0.0.3,800;c2-node4,127.0.0.4,800;c2-node5,127.0.0.5,800' -id c2-node3

sudo ./raft_svr -cluster 'c2-node3,127.0.0.3,800;c2-node4,127.0.0.4,800;c2-node5,127.0.0.5,800' -id c2-node4

sudo ./raft_svr -cluster 'c2-node3,127.0.0.3,800;c2-node4,127.0.0.4,800;c2-node5,127.0.0.5,800' -id c2-node5

Problem analysis

I tested several versions of hashicorp/raft and they all behave the same way. Analyzing the current latest release, v1.7.3, the cause appears to be the following:

  1. After startup, every node initializes itself via BootstrapCluster and the clusters begin electing. On node3 the following log line appears, showing that during the election phase node3 can tell it does not belong to Cluster1:

[WARN]  raft: rejecting appendEntries request since node is not in configuration: from=c1-node1

  2. But once Cluster1 elects a leader, node3 may become a member of Cluster1 anyway, because Cluster1's leader keeps pushing log entries to its configured nodes via heartbeats, and during that exchange:
    1. A follower does not check whether the leader making the request is a member of its own cluster.
    2. A follower only checks whether the term/log position carried by the request is newer than its local state; if it is, the follower accepts and stores the entries and records the requesting node as its cluster's leader.
    3. Likewise, once Cluster2 elects a leader, that leader also keeps sending heartbeat/log requests to node3. As a result, node3 belongs to Cluster1 one moment and to Cluster2 the next (see the sketch after this list).
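To make the missing check concrete, here is a minimal, self-contained Go model of the two acceptance rules (my own simplification for illustration; the toy types and names are not hashicorp/raft's): the stock rule looks only at terms, while the patched rule shown in the next section also verifies that the sender belongs to the local configuration.

package main

import "fmt"

// configuration is a toy stand-in for raft.Configuration.
type configuration struct{ serverIDs []string }

// inConfiguration reports whether id is a member of the configuration.
func inConfiguration(c configuration, id string) bool {
	for _, s := range c.serverIDs {
		if s == id {
			return true
		}
	}
	return false
}

// acceptStock models the unpatched follower check: only terms are
// compared, the identity of the requesting leader is never inspected.
func acceptStock(reqTerm, localTerm uint64) bool {
	return reqTerm >= localTerm
}

// acceptPatched models the fixed check: requests from nodes outside the
// local configuration are rejected even when their term is newer.
func acceptPatched(reqTerm, localTerm uint64, c configuration, leaderID string) bool {
	if reqTerm < localTerm {
		return false
	}
	if len(c.serverIDs) > 0 && !inConfiguration(c, leaderID) {
		return false
	}
	return true
}

func main() {
	// node3's view of its own cluster (Cluster2).
	c2 := configuration{serverIDs: []string{"c2-node3", "c2-node4", "c2-node5"}}
	// A heartbeat arrives from Cluster1's leader carrying a newer term:
	fmt.Println(acceptStock(7, 3))                   // true  -> node3 adopts the foreign leader
	fmt.Println(acceptPatched(7, 3, c2, "c1-node1")) // false -> the foreign leader is rejected
}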

The hole in this process lies in the code path where a raft node accepts log entries and switches its leader: the function func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) at raft.go:L1440 in the hashicorp/raft module.

Modifying that function to also validate the ID of the requesting leader avoids the problem:

	// Ignore an older term
	if a.Term < r.getCurrentTerm() {
		return
	}

	// yzc add: this is the check we insert into appendEntries. Note that after
	// rejecting we must also return an error, otherwise the other cluster will
	// keep re-electing.
	if len(r.configurations.latest.Servers) > 0 && !inConfiguration(r.configurations.latest, ServerID(a.ID)) {
		r.logger.Warn("rejecting appendEntries request since node is not in configuration",
			"from", ServerID(a.ID))
		// Return an error; without it the other cluster's leader would assume
		// its own log is stale and trigger a new election.
		rpcErr = fmt.Errorf("node is not in configuration")
		return
	}

	// Increase the term if we see a newer one, also transition to follower
	// if we ever get an appendEntries call
	if a.Term > r.getCurrentTerm() || (r.getState() != Follower && !r.candidateFromLeadershipTransfer.Load()) {
		// Ensure transition to follower
		r.setState(Follower)
		r.setCurrentTerm(a.Term)
		resp.Term = a.Term
	}
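Here a.ID is the sender's ServerID taken from the RPC header that hashicorp/raft requests carry, and inConfiguration is the membership helper the library already provides in configuration.go, so the patch introduces no new state. The important design choice is setting rpcErr rather than silently dropping the request: the foreign leader then sees an explicit failure and backs off, instead of interpreting the missing acknowledgment as its own log being stale and triggering another election.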

Verifying the fix

After recompiling and rerunning, node3 now stays in Cluster2, and the following log line appears:

[WARN]  raft: rejecting appendEntries request since node is not in configuration: from=c1-node1 

In the logs of Cluster1's leader, we can see its heartbeats to node3 failing:

[DEBUG] raft: failed to contact: server-id=c1-node3 time=1m29.121143167s
[ERROR] raft: failed to heartbeat to: peer=127.0.0.3:800 backoff time=1s error="node is not in configuration"

Caveat

Note that this modification has not been reviewed or accepted upstream and may have other unintended effects; evaluate it yourself before using it.
