Raft-实现指北-领导选举
和其他一致性算法相比,Raft 使用一种更强的领导能力形式。比如,日志条目只从领导者发送给其他的服务器。在选举上,Raft 算法使用一个随机计时器来选举领导者,这种方式只是在任何一致性算法都必须实现的心跳机制上增加了一点机制。[1]
Raft 把时间分割任意长度的任期(term),并使用连续整数标记,每个任期都从一次选举开始。每次选举有一个或多个候选人参选,如果一个候选人赢得选举,其就会在该任期充当领导人的职责。某些情况下会出现选票瓜分的现象,那么该任期无法选出领导人,所以进入下一期选举,其中 Raft 通过随机计时来保证选举成功。[1]
设计
实现领导人选举之前,先看到上一节提到的纯函数式的状态机,将 Raft 实现成为一个无副作用的纯函数状态机。Raft 算法可以看作一个角色状态机,通过其他节点传递的消息、计时器、客户端的提交请求和快照等输入消息,从一个状态转移到另一个状态、或修改部分内部状态并返回一个发送给外部的信息。
(state, message) -> state machine -> message
state machine 是一个纯函数式的状态机,负责处理消息,并将改动写入到 state 里,然后返回给外部的消息。
根据 Raft 论文,一个 state 几部分组成:
状态 | 所有服务器上持久存在的 |
---|---|
currentTerm | 服务器最后一次知道的任期号(初始化为 0,持续递增) |
votedFor | 在当前获得选票的候选人的 Id |
log[] | 日志条目集;每一个条目包含一个用户状态机执行的指令,和收到时的任期号 |
状态 | 所有服务器上经常变的 |
---|---|
commitIndex | 已知的最大的已经被提交的日志条目的索引值 |
lastApplied | 最后被应用到状态机的日志条目索引值(初始化为 0,持续递增) |
状态 | 在领导人里经常改变的 (选举后重新初始化) |
---|---|
nextIndex[] | 对于每一个服务器,需要发送给他的下一个日志条目的索引值(初始化为领导人最后索引值加一) |
matchIndex[] | 对于每一个服务器,已经复制给他的日志的最高索引值 |
实现大体是类似的,不过以后需要加入流量控制、成员加入退出等,所以将 state 中保存的其他服务器的信息抽象一下:
type node struct {
id uint64
nextIdx uint64
matched uint64
}
id
表示其他服务器在 Raft 中的唯一 ID,nextIdx
与 matched
分别是 nextIndex
数组和 matchIndex
数据中第 id
个元素。
Raft 通过超时来驱动心跳和选举,一共由两种超时:1、心跳超时,领导人定期给跟随者发送心跳信息宣布自己的领导权;2、选举超时,超时时间是随机选择的。
整个 state 如下:
type core struct {
id uint64
leaderId uint64
state StateRole
term uint64
vote uint64
log *LogHolder
nodes []node
timeElapsed int
randomizedElectionTimtout int
electionTimeout int
heartbeatTimeout int
}
id
是 Raft weiyiqueding d term
和 vote
分别是 currentTerm
和 voteFor
的实现,而日志由应用负责持久化。lastApplied
和 commitIndex
由 log
负责管理。nodes
表示该 Raft 集群的其他服务器的状态。timeElapsed
表示从时间累积,randomizedElectionTimeout
表示随机生成的选举超时阈值,每次转为跟随者、候选人状态时都会改变。heartbeatTimeout
表示领导人两次心跳的间隔。electionTimeout
用来表示领导人选举超时基准,其用于计算randomizedElectionTimeout
,使用公式:$electionTimeout + rand() \% electionTimeout$ 计算得到。另外还有 leaderId
和 state
状态,leaderId
表示当前领导人的 ID,state
则是 Raft 目前所处的角色。
此外,还需要设计消息结构作为外部应用、服务器和 Raft 状态机进行数据交换。
type Message struct {
From uint64
To uint64
MsgType MessageType
Term uint64
Index uint64
LogIndex uint64
LogTerm uint64
Reject uint64
}
该结构不仅仅用于发起请求,也用于状态机返回数据,所以需要 Reject
字段表示拒绝请求,比如拒绝给某个候选人投票。
最后,状态机需要返回消息给消息发送者,由于希望将 Raft 设计为一个纯函数式状态机,消息的接受发送交给了应用处理,所以还得提供一个 Application
接口,供 Raft 和应用交互。
type Application interface {
send(msg *raftpd.Message)
}
当应用接收到消息后,将其输入到 Raft 状态机,处理完后,调用 send
发送回复消息,并保存信息到机中。整个状态机由消息驱动,所以 Raft 接口如下:
type Raft interface {
Step(msg *raftpd.Message)
Periodic(millsSinceLastPeriod int)
}
当应用接受到外部传递的消息后,调用 Step
驱动状态机改变状态。Raft 中通过超时进行心跳或选举,外部应用需要通过某个固定的定时源隔一段时间调用 Periodic
驱动状态机进行心跳、选举等。
选举过程
系统进行初始化时,每个节点都处于跟随者状态,由于没有领导人定期广播心跳,所以一段时间后部分跟随者成为候选人并进行下一届选举。
当某个候选人获得了超过半数的投票后,成为领导人,并向所有节点广播自己成功的信息。当候选人接收到其他候选人成为领导人的信息后,一届只能选出一个领导人(选举安全特性),该候选人退回到跟随者的状态,并投票给该领导人。
如果到了下一个选举超时,仍然没有候选人成为领导人,就会跳过这一届,开始下一届的领导人选举。
PreVote
由于选举安全特性的限制,成员会忽略已经过期的信息时,并返回自己所在的任期,用于发送者更新自己。由于这一特性的存在,在一个存在网络延迟的网络中,某个节点由于延迟进入了选举,而实际上大多数节点都能接收到领导人的心跳,也会进入选举。Raft 原论文 9.6 节中提出了 Prevote 算法:在选举前可以选询问其他节点是否愿意参与选举,如果节点能够感知到领导人的心跳,那么它就不会参与选举,否则参与选举。只有过半的节点参与选举,才能开始下一届领导人选举。
领导人选举实现
加入了 PreVote 算法后,Raft 的状态变为 4 个:领导人、跟随者,候选人,预候选人。此时的外部事件为:选举、心跳超时;以及:
- MsgPreVoteRequest
- MsgPreVoteResponse
- MsgVoteRequest
- MsgVoteResponse
- MsgAppendRequest
MsgAppendRequest
这种消息类型发生在某个节点成竞选成功后向其他节点宣示领导权,在选举过程中也由该种类型在节点间传递。比如 PreVote 阶段正常的跟随者能够接收到领导人的心跳;又或者新晋领导人首次对外宣誓领导权。
func (c *core) Periodic(millsSinceLastPeriod int) {
c.timeElapsed += millsSinceLastPeriod
log.Debugf("%d periodic %d, time elapsed %d", c.id, millsSinceLastPeriod, c.timeElapsed)
if c.state.IsLeader() {
if c.heartbeatTick <= c.timeElapsed {
c.broadcastAppend()
c.timeElapsed = 0
}
} else if c.randomizedElectionTick <= c.timeElapsed {
if len(c.nodes) > 1 {
c.campaign(campaignPreCandidate)
}
}
}
每次应用程序调用 Periodic
时,Raft 判断是否为 leader,是判断 timeElapsed
是否超过 heartbeatTick
,然后向其他节点发送追加日志(心跳)信息,并清空 timElapsed
;如果不是领导人,且已经超过随机生成的选举超时,那么状态转移到预候选人同时开始 PreVote 阶段。
除此之外,PreVote 算法还需要记录其他节点对某次预选举请求的响应状态,所以在 node 结构中添加字段标记:
type voteState int
const (
voteNone voteState = iota
voteReject
voteGranted
)
type node struct {
...
vote voteState
}
状态间转换
继续之前需要看看 Raft 状态机的状态转换是如何实现的:
func (c *core) resetRandomizedElectionTimeout() {
c.randomizedElectionTick =
c.electionTick + rand.Intn(c.electionTick)
}
func (c *core) reset(term uint64) {
if c.term != term {
c.term = term
c.vote = InvalidId
}
c.leaderId = InvalidId
c.timeElapsed = 0
c.resetRandomizedElectionTimeout()
}
func (c *core) becomeFollower(term, leaderId uint64) {
c.reset(term)
c.leaderId = leaderId
c.state = FOLLOWER
c.vote = leaderId
log.Infof("%v become follower at %d", c.id, c.term)
}
func (c *core) becomeLeader() {
utils.Assert(c.state == CANDIDATE, "invalid translation [%v => Leader]", c.state)
c.reset(c.term)
c.leaderId = c.id
c.state = LEADER
c.vote = c.id
log.Infof("%v become leader at %d", c.id, c.term)
}
func (c *core) becomeCandidate() {
utils.Assert(c.state != LEADER, "invalid translation [Leader => Candidate]")
c.reset(c.term + 1)
c.vote = c.id
c.state = CANDIDATE
for i := 0; i < len(c.nodes); i++ {
node := &c.nodes[i]
node.resetVoteState()
}
log.Infof("%v become candidate at %d", c.id, c.term)
}
func (c *core) becomePreCandidate() {
c.reset(c.term)
c.state = PRE_CANDIDATE
for i := 0; i < len(c.nodes); i++ {
node := &c.nodes[i]
node.resetVoteState()
}
// Becoming a pre-candidate changes our state,
// but doesn't change anything else. In particular it does not increase
// currentTerm or change votedFor.
log.Infof("%x became pre-candidate at term %d", c.id, c.term)
}
首先看到 reset
,它是负责在 Raft 状态转换过程中重置部分状态。reset
中第一步是根据任期是否改变决定重置 vote
和 term
信息;除此之外还重置了 leaderId
以及超时相关的两个属性: timeElapsed
和 randomizedElectionTimeout
。
进入预选举的节点在被大多数节点拒绝后会回退到跟随者的状态,因此在 becomePreCandidate
中除了重置基础状态外,仅仅修改了 state
属性和重置其他节点的投票情况。特别需要注意的是不能修改 currentTerm
和 votedFor
。
如果预选举的节点获得了半数的节点参选支持,就会进入候选人状态,因此任期加一,同时给自己投票。
此外,当领导者和跟随者在发送心跳或接收到领导人的通知后,都需要重置 timeElappsed
,因此将 becomeLeader
和 becomeFollower
设计为重入只会影响到 timeElapsed
和 randomizedElectinTimeout
属性。
PreCampaign
竞选时首先调用 campaign
给其他节点发送 MsgPreVoteRequest
请求。
func (c *core) campaign(ct campaignState) {
utils.Assert(c.state != LEADER,
"invalid translation [Leader => PreCandidate/Candidate]")
msg := raftpd.Message{}
msg.LogIndex = c.log.lastIndex()
msg.LogTerm = c.log.lastTerm()
if ct == campaignPreCandidate {
msg.Term = c.term + 1
msg.MsgType = raftpd.MsgPreVoteRequest
c.becomePreCandidate()
} else {
msg.Term = c.term
msg.MsgType = raftpd.MsgVoteRequest
c.becomeCandidate()
}
for i := 0; i < len(c.nodes); i++ {
node := &c.nodes[i]
msg.To = node.id
log.Infof("%x [term: %d, index: %d] send %v request to %x at term %d",
c.id, c.log.lastTerm(), c.log.lastIndex(), msg.MsgType, msg.To, c.term)
c.send(&msg)
}
}
PreVote 要求某个节点只有在长时间未和领导人交换心跳时才参与选举。同时参与选举要求候选人的日志必须是最新的(领导人完全特性)。所以在接收到其他节点发送的 MsgPreVoteRequest
时,1、如果在一个选举超时内(注意:electionTimeout
)有和领导交换过一次心跳;2、或者候选人的任期号小于自身的任期号;3、或者候选人的日志不是最新的都拒绝参加选举。否则回复参加选举。
func (c *core) handlePreVote(msg *raftpd.Message) {
reply := raftpd.Message{}
reply.To = msg.From
reply.MsgType = raftpd.MsgPreVoteResponse
// Reply false if last AppendEntries call was received less than election timeout ago.
// Reply false if term < currentTerm.
// Reply false if candidate's log isn't at least as uptodate as receiver's log.
if (c.leaderId != InvalidId && c.timeElapsed < c.electionTick) ||
(msg.Term < c.term) ||
!c.log.IsUpToDate(msg.LogIndex, msg.LogTerm) {
reply.Reject = false
} else {
reply.Reject = true
}
c.send(&reply)
}
注意:实际上在处理远程信息时,如果接收到了过期信息,会直接丢弃(后面有讲),不会进入 handlePreVote
函数,所以上面第二点实际上永远为假。
预候选人接收到其他节点回复的信息时:
func (c *core) handleVoteResponse(msg *raftpd.Message) {
if msg.Reject {
log.Infof("%x received %v rejection from %x at term %d",
c.id, msg.MsgType, msg.From, c.term)
} else {
log.Infof("%x received %v from %x at term %s",
c.id, msg.MsgType, msg.From, msg.Term)
}
node := c.getNodeById(msg.From)
node.updateVoteState(msg.Reject)
count := c.voteStateCount(voteGranted)
if count >= c.quorum() {
if msg.MsgType == raftpd.MsgVoteResponse {
c.becomeLeader()
c.broadcastVictory()
} else {
c.campaign(campaignCandidate)
}
return
}
// return to follower state if it receives vote denial from a majority
count = c.voteStateCount(voteReject)
if count >= c.quorum() {
c.becomeFollower(msg.Term, InvalidId)
}
}
更新某个 node 对此次请求的投票情况,并判断支持者和反对者人数,如果支持者人数过半,那么调用 campaign(campaignCandidate)
进入候选人状态。如果反对者人数过半,那么节点会回退到跟随者的状态。
Campaign
在候选人一方,选举过程使用了相同的函数,不同的是跟随者对候选人的处理。只有当候选人未投票或者上一次投给了该候选人,候选人才能获得跟随者的选票。
func (c *core) handleVote(msg *raftpd.Message) {
reply := raftpd.Message{}
reply.To = msg.From
reply.MsgType = raftpd.MsgVoteResponse
// no vote or vote for candidate, and log is at least as up-to-date as receiver's.
if c.vote == InvalidId || c.vote == msg.From ||
c.log.IsUpToDate(msg.LogIndex, msg.LogTerm) {
reply.Reject = false
} else {
reply.Reject = true
}
c.send(&reply)
}
处理过时消息
根据 Raft 论文中将 term 用作逻辑时间,判断过期的消息。在论文图 2 中提到如果接收到来自高任期的消息,应该回退到跟随者状态;接收到过时消息,直接忽略。所以代码可以写为:
if msg.Term < c.term {
c.reject(msg)
} if msg.Term > c.term {
c.becomeFollower()
}
实际上可能某个节点成为候选人后,又重新连接到网络中。此时发起投票会导致其他节点增大任期,因此对投票相关的消息做特殊处理。
func (c *core) Step(msg *raftpd.Message) {
if msg.Term < c.term {
c.reject(msg)
} else if msg.Term > c.term {
if msg.MsgType == raftpd.MsgPreVoteRequest {
} else if msg.MsgType == raftpd.MsgPreVoteResponse && msg.Reject {
} else {
c.becomeFollower(msg.Term, leaderId)
}
}
switch msg.MsgType {
case raftpd.MsgPreVoteRequest:
c.handlePreVote(msg)
case raftpd.MsgVoteRequest:
c.handleVote(msg)
default:
c.dispatch(msg)
}
}
Raft 算法虽然更易于理解,但是实现并不简单。就以上述代码为例,Raft 如果没有 PreVote 机制,那么重新上线的候选人会通过 c.reject(msg)
强制让候选人参与选举。加入 PreVote 机制也不能完全避免这种情况。如果一个节点成为了候选人,此时领导人重新上线,那么候选人仍然会强制发起一次选举[3]。