
背景
为什么要做 6.248 的 lab2 呢?主要有两个原因,熟悉 go 语言和学习 raft 协议。早在一年前就关注了 6.248 ,由于各种原因一直没有去做 lab,趁现在比较空闲,正好了解一桩心事。
做为 go 语言的小白,在开始之前花了两天时间熟悉了 go 的语法和并发相关的语法之后就开动了。本文记录了实现过程中的思路和一些细节。
选举
2A 的要求是实现 Leader 的选举,整体流程还是比较简单。由于语言的不熟悉,刚开始的时候实现还是比较慢,踩了几个坑之后还算顺利完成了。
角色定义
Raft 中有三个角色,跟随者、候选者、领导者。每个角色有自己的职责,如 follower 进行心跳超时检测、候选者进行选举、Leader 负责复制日志和发送心跳。所以很自然的想法是定义raft 角色的三个状态,根据当前角色的做自己的事情,伪代码如下
1 | // 定义集群状态 |
1 | func (rf *Raft) run() { |
心跳定时检测
在论文中 follower 会定时检测是否与 leader 失联,这一部分的实现关键是 follower 与 leader 超时是随机的。使用 rand.Seed(time.Now().UnixNano())
设置随机因子,定时任务通过 time.Sleep()
实现,而不是 time.Ticker
这也是在实验 hint 中提到。伪代码如下
1 | for rf.killed() == false { |
请求投票 RPC
服务与 leader 心跳超时之后转换为 candidate,开始进行 leader 的选举,直到选举或发现新的 leader 选举流程才会结束。这部分关键的是实现 RequestVote
RPC 接口,功能需要按照 Figure 2 中严格实现,以下介绍实现过程中一些需要注意的地方。
所有 RPC 请求的参数和返回值要大写,否则会序列化失败;
变量共享并发问题
下面代码模拟并发向其他服务发送投票请求的情况,但这段代码是有问题的。在于 变量 i 在新的 goroutine 读取的时候已经被主 goroutine 给修改,导致将请求无法发送给所有的服务。1
2
3
4
5
6
7
8for i := range rf.peers {
if i != rf.me {
go func() {
// 向服务 i 发送投票请求
rf.sendAppendEntries(i, args, reply)
}()
}
}需要使用局部变量传入,避免出现跨 goroutine 之间共享变量的情况。
1
2
3
4
5
6
7for i := range rf.peers {
if i != rf.me {
go func(serverIndex int) {
rf.sendAppendEntries(serverIndex, args, reply)
}(i)
}
}RPC 超时
需要注意的一点,所有的 RPC 请求都有可能超时和乱序,这里先只考虑超时的问题,乱序处理后面会提到。因为存在超时问题,在选举过程中不能等待所有的 RPC 请求返回之后再做处理。正确的方式应该为在发起 RPC 请求之前启动一个超时器,如果超时器已经生效但是还没有选举出 leader 将开启下一轮的选举。选举终止
在论文中提到过,一轮选举会有三种情况- 获取半数以上节点,自身当选为 leader;
- 其他节点晋升为 leader;
- 选举超时
那么在代码层面怎么实现这三个 case 呢?可以使用sync.Cond
或 select 来实现,本人采用的是用 select 来实现,使用如下:1
2
3
4
5
6
7
8select {
case <-selfLeader:
// become leader
case term := <-otherLeader:
// change to follower
case <-timeoutChan:
// next vote
}
附加日志
日志复制实现 2B 的内容,个人认为是lab 中最难的部分,论文中任何细节都不能放过,否则即使 2B 的 case 通过了,在 2C 中大概率会失败,魔鬼在细节中。
日志结构
1
2
3
4
5type Log struct {
Term int
Index int
Cmd interface{}
}日志包含三个部分 Term、Index、Cmd。日志使用 slice 存储,正常情况下
log[n].Index = n
似乎可以使用 slice 的下标代替 Index 属性。但是在 2D 快照实现中会截断日志,此时log[n].Index != n
。还有一个建议是日志从 log[1] 开始,在 2D 的时候用 log[0] 存放快照的 Term 和 Index。日志应用到状态机
日志 commit 之后需要应用到状态机,lab 中通过将日志发送到 channel
ApplyMsg
实现。当lastApplied < commitIndex
时候开始应用到状态机。日志覆盖原则
leader 通过覆盖日志的形式让 follower 的日志保持一致。如果 follower 存在的日志与 leader 发送的日志发生了冲突,follower会删除该日志及其之后的日志。
这里的 if 相当关键,如果 leader 发送的日志在follower 上已经存在但是并没有冲突,就不要覆盖日志,应用未存在的新日志即可。matchIndex 与 nextIndex
在网络正常的情况下
matchIndex[n] = nextIndex[n]-1
,但是不能用 nextIndex 替代 matchIndex。nextIndex 表示发送下一条日志索引是可以回退的,而 matchIndex 表示已经复制到 follower 上最高的日志索引在同一个任期内是不能回退的。使用 matchIndex 来确认 commitIndex 的位置。快速跳过 next index
论文中提到一个优化策略,当 follower 与 leader 日志冲突过多的时候,可以通过 follower 返回冲突日志的 term 和该 term 对应最小的index,leader 可以通过这些信息一次请求可以跨过一个 term 来快速解决日志冲突。
不能提交之前任期的日志
这是 Figure 8 所提到的内容,leader 只能提交当前任期日志不能提交之前任期的日志。在提交日志的时候除了日志应用到大多数节点之外,还需要
log[N].term == currentTerm
成立。
持久化
对于 2C 的内容,需要将 Figure 2 中提到的 currentTerm、votedFor、log[] 这三个属性在变动之后都要进行持久化。本身不难,主要是 case 会比 2B 更严格,测试需要多跑几次,可以借助脚本 go-test-many.sh 进行批量测试。
RPC 乱序处理
- 对于所有 RPC 请求都适用,如果请求响应之后需要判断 currentTerm == arg.trem 并且服务角色没有变动才进行下一步修改;
- 附加日志 RPC 响应成功更改 nextIndex 时需要判断 nextIndex > matchIndex,因为可能是过期的响应返回;
- 附加日志接收端的乱序的处理,参考上文提到的
日志覆盖原则
;
总结
前前后后花了半个月时间总算完成了 lab,总的来说收获很大,主要在对 raft 认知上,go 语言掌握依然很烂。以前看过 raft 的论文,只是了解大体的流程,真正实现的时候才发现有很多细节每一个 if 、and 都相当关键。在实现之前先阅读学生导读可以少踩很多坑。