盒子
盒子
文章目录
  1. 1.官方版NSQ存在的问题
  2. 2.有赞版本的NSQ
    1. 2.1.源码目录结构
    2. 2.2.nsqd的启动
      1. 2.2.1.TCP监听
      2. 2.2.2.HTTP监听
    3. 2.3.提交消息到Topic
      1. 2.3.1.消费channel的数据同步处理
      2. 2.3.2.internalPUB方法
      3. 2.3.3.internalPubAsync函数
      4. 2.3.4.internalPubLoop方法
    4. 2.4.支持数据副本和高可用
      1. 2.4.1.元数据存储
      2. 2.4.2.Leader选举
      3. 2.4.3.从代码角度进行大致梳理
    5. 2.5.数据副本同步和动态ISR
      1. 2.5.1.从代码角度进行大致梳理
    6. 2.6.严格顺序消费
      1. 2.6.1.存在的问题
      2. 2.6.2.解决方式
    7. 2.7.消息轨迹查询系统
    8. 2.8.其他特性
  3. 3.参考资料
  4. 4.结语

有赞版本 NSQ 源码解析

1.官方版NSQ存在的问题

  在暑期实习期间,我粗略学习了一下 KafkaNSQHippo,并对它们的基础组件或者特性做了总结,但忽略了很重要的一点:把 KafkaNSQ 进行对比,看看各自的优缺点。这部分的学习,对后来我完成组里事件中心的 Agent 组件起到了不小的帮助。

存储Message的额外消耗

Kafka 官方版本NSQ 有赞版本NSQ
至少26字节 24字节 16字节

主要特性

\ NSQ Kafka
数据安全 内存存储,超出Channel容量后,磁盘存储,不支持热备 磁盘存储,多节点互备
依赖 JDK、Zookeeper
性能 十万级 百万级
刷盘机制 异步刷盘 异步刷盘
投递语义 至少一次 准确一次
消息顺序 不保证有序 支持有条件的有序
配置 不到五十项 数百个配置项
异常排查 源码量小,排查容易 源码量巨大,排查困难

  其实,KafkaNSQ 各有优势,本身都是非常优秀的产品。但是,NSQ 作为分布式消息中间件最致命的是节点异常情况下的数据丢失问题。

  这里推荐一篇官方版本 NSQ 源码相关的文章,小米信息部技术团队的走进 NSQ 源码细节,可以先看官方版的实现,也可以先看本文,再和官方版对比一下,加深感受。

2.有赞版本的NSQ

  据有赞团队自己的官方博客详细介绍了为什么要重新设计 NSQ,主要有以下几点原因:

  • 部署不容易;
  • 异常丢失;
  • 难以追踪消息状态;
  • 难以使用的历史消息;
  • 难以按顺序接收消息;
  • 难以在线调试。

  最终,有赞团队给出了如下的设计架构,可以看出他们借鉴了不少 Kafka 的设计思路:

2.1.源码目录结构

  从有赞版本NSQGithub主页下载源码,可以看到如下的目录结构:

  从上面的目录可以很清晰的感觉几个关键组件 nsqdnsqlookupdnsqadmin,而解决异常丢失的关键点,肯定是 nsqd,所以不妨从它如何启动开始看起。

  apps 实际上是各个程序的入口;consistence 目录应该是有赞为实现可靠的一致性,而进行改造的代码;internal 应该是 nsqd 需要用到的 HTTP 或 TCP 的一些基础封装;Jepsen 则是对分布式测试的支持。而 nsqdnsqdserver 的区别是,前者更加专注于 NSQ 功能的实现,后者则是 HTTP 或 TCP 接入层面的一些预处理。其他目录则基本不是代码文件存放的目录,做其他用途。

  目录结构是阅读源码后,自行总结 + 猜测得出的大致用途,如有偏差、错误或者遗漏,还请前辈们指出。

2.2.nsqd的启动

  用于启动的源码,位于 /apps 目录下,具体到 nsqd 就是位于 /apps/nsqd 目录下的 nsqd.go 文件中。其Main函数如下,NsqdServer 正是封装在 program 结构体中:

1
2
3
4
5
6
7
8
func main() {
defer glog.Flush()
prg := &program{}
if err := svc.Run(prg, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGINT); err != nil {
log.Panic(err)
}
log.Println("app exited.")
}

  nsqd 的初始化、启动、停止,运用了 go-svc 作为基础组件,据 go-svcGithub介绍,它可以与Linux配合使用的Windows Service包装器。而 program 实现了如下的接口:

1
2
3
4
5
6
7
8
9
10
11
12
type Service interface {
// Init is called before the program/service is started and after it's
// determined if the program is running as a Windows Service.
Init(Environment) error
// Start is called after Init. This method must be non-blocking.
Start() error
// Stop is called in response to os.Interrupt, os.Kill, or when a
// Windows Service is stopped.
Stop() error
}

  Main函数中调用的 Run 方法,正是位于依赖库的 svc\svc_windows.go 中。它将传入的 Service 封装到 windowsService 中,并调用 Init 方法,判断是否以Windows窗口方式运行,然后进行 run 方法,这才是启动的核心:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func Run(service Service, sig ...os.Signal) error {
...
ws := &windowsService{
i: service,
isInteractive: interactive,
signals: sig,
}
if err = service.Init(ws); err != nil {
return err
}
return ws.run()
}

  run 方法源码如下,区别两种运行方式。其中,以Windows窗口方式运行时,调用 svcRun,其本质是调用依赖库的 windows\svc\service.gofunc Run(name string, handler Handler) error 函数。但两者最终的核心都是调用 windowsService 中封装的 ServiceStart 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (ws *windowsService) run() error {
ws.setError(nil)
if ws.IsWindowsService() {
runErr := svcRun(ws.Name, ws)
...
return nil
}
err := ws.i.Start()
...
err = ws.i.Stop()
return err
}

  由于如何调用到 Start 方法和 Stop 方法不是本次学习的重点,就不做更加详细的展示了。启动 nsqd 的核心在于 Start 方法,其代码如下。这里提一句,在 NewNsqdServer 函数中,有一行代码很重要 s.ctx.nsqd.SetPubLoop(s.ctx.internalPubLoop),记住它,后面会再次讲到。

1
2
3
4
5
6
7
8
9
10
11
func (p *program) Start() error {
// 配置文件、命令参数等解析
...
// 构造NsqdServer实例
nsqd, nsqdServer, err := nsqdserver.NewNsqdServer(opts)
...
// 核心方法
nsqdServer.Main()
p.nsqdServer = nsqdServer
return nil
}

  接下来,看看 Main 方法具体做了什么。其实,Main 方法就是监控TCP、HTTP、HTTPS的指定端口,完成 nsqd 的启动。nsqd 主要在2个TCP端口监听,一个给客户端,另一个是 HTTP API。同时,它也能在第三个端口监听 HTTPS。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (s *NsqdServer) Main() {
...
if s.ctx.nsqdCoord != nil {
// 检查集群元数据,并加载Disk数据等操作
err := s.ctx.nsqdCoord.Start()
...
}
...
tcpListener, err := net.Listen("tcp", opts.TCPAddress)
...
s.waitGroup.Wrap(func() {
protocol.TCPServer(s.tcpListener, tcpServer)
...
})
if s.ctx.GetTlsConfig() != nil && opts.HTTPSAddress != "" {
...
httpsServer := newHTTPServer(s.ctx, true, true)
s.waitGroup.Wrap(func() {
http_api.Serve(s.httpsListener, httpsServer, "HTTPS", opts.Logger)
})
}
...
httpServer := newHTTPServer(s.ctx, false, opts.TLSRequired == TLSRequired)
s.waitGroup.Wrap(func() {
http_api.Serve(s.httpListener, httpServer, "HTTP", opts.Logger)
})
...
}

2.2.1.TCP监听

  TCPServer 函数跟之前分析过的很多服务启动类似,都是在一个循环里不断 Accept 接受连接,并调用 tcpServer 实现的 Handle 接口进行处理。源码位于 internal\protocol\tcp_server.go 文件中,如下:

1
2
3
4
5
6
7
func TCPServer(listener net.Listener, handler TCPHandler) {
for {
clientConn, err := listener.Accept()
...
go handler.Handle(clientConn)
}
}

2.2.2.HTTP监听

  newHTTPServer 的流程也类似,先注册URL和对应的回调函数。源码位于 nsqdserver\http.go,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func newHTTPServer(ctx *context, tlsEnabled bool, tlsRequired bool) *httpServer {
...
router := httprouter.New()
...
s := &httpServer{
ctx: ctx,
tlsEnabled: tlsEnabled,
tlsRequired: tlsRequired,
router: router,
}
// 注册URL和对应的回调
router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))
...
return s
}

  然后调用 Serve 函数,创建 Server 对象,并调用其 Serve 方法,完成监听并接受请求。源码位于 internal\http_api\http_server.go,如下:

1
2
3
4
5
6
7
8
9
10
11
func Serve(listener net.Listener, handler http.Handler, proto string, l levellogger.Logger) {
...
server := &http.Server{
Handler: handler,
ErrorLog: log.New(logWriter{l}, "", 0),
ReadTimeout: 10 * time.Second,
WriteTimeout: 60 * time.Second,
}
err := server.Serve(listener)
...
}

  Server结构体的 Serve 方法是 Golang 标准库的代码,也是在一个循环里不断 Accept 客户端的请求。源码如下:

1
2
3
4
5
6
7
8
9
10
func (srv *Server) Serve(l net.Listener) error {
...
for {
rw, err := l.Accept()
...
c := srv.newConn(rw)
c.setState(c.rwc, StateNew) // before Serve can return
go c.serve(connCtx)
}
}

2.3.提交消息到Topic

  由于 NSQ 支持 HTTP 提交消息,所以选择从 HTTP 的 URL 为起点,寻找提交消息的具体业务逻辑,应该是一个更直观、更有效的方式。这里参考官方给出的 HTTP API 文档,可以看到提交消息到Topic对应的 URL 是 /pub。相应的可以在 nsqdserver\http.gonewHTTPServer 函数中找到 URL 对应的回调函数 doPUB

  同时,有赞版本的 NSQ 新增了 /pub_ext/pubtrace,以上三者本质上都是调用了 internalPUB 方法,应该是用以支持有赞团队提到的 消息轨迹查询系统

2.3.1.消费channel的数据同步处理

  官方版本之所以要从topic复制所有消息到每个channel,是因为使用的是golang里面的chan这种机制,只有复制才能保证每个消费组的数据互相独立。

  有赞版本则是将数据先进行落盘,所以不再需要做这种数据复制的操作,只需要记录每个channel已经同步的数据位移和每个channel的消费位移即可。这样所有的channel引用的是同一份topic磁盘数据,每个channel维护自己的独立位移信息即可。

优点:

  • 节省了数据复制的操作,提高了性能,也保证了各个channel之间的独立性。
  • 从流程上看,有赞版本把topic的写入和读取流程分离开了,topic本身只负责写入数据,channel仅负责读取数据。

  官方版 NSQ 的设计目标之一就是要限定保持在内存中的消息数。它通过 DiskQueue 透明地将溢出的消息写入到磁盘上。所以有上图的 full 之后再落盘数据。接下来,先以单机的提交消息到Topic为例,看看代码层面如何实现落盘的。

2.3.2.internalPUB方法

  先来看看 internalPUB 方法具体做了什么。该方法将 req.Body 的内容拷贝到缓存中,主节点按照是否为异步提交的策略,选择不同的提交方式,并根据需要跟踪相关信息;从节点则不执行这部分逻辑。源码依旧位于 nsqdserver\http.go,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func (s *httpServer) internalPUB(w http.ResponseWriter, req *http.Request, ps httprouter.Params, enableTrace bool, pubExt bool) (interface{}, error) {
// 消息长度校验
...
// 解析查询参数,通过Topic Name和分区查询到具体的Topic实例
params, topic, err := s.getExistingTopicFromQuery(req)
...
// 内存拷贝
n, err := io.CopyN(b, io.LimitReader(req.Body, readMax), int64(req.ContentLength))
body := b.Bytes()[:req.ContentLength]
...
if s.ctx.checkForMasterWrite(topic.GetTopicName(), topic.GetTopicPart()) {
// 主节点
// 解析traceId
...
//check if request is PUB_WITH_EXT
...
if asyncAction {
// 异步批量提交消息
err = internalPubAsync(nil, body, topic, extContent)
} else {
// 同步提交单条消息
id, offset, rawSize, _, err = s.ctx.PutMessage(topic, body, extContent, traceID)
}
...
// 根据traceId记录相关信息
...
if needTraceRsp {
return struct {
Status string `json:"status"`
ID uint64 `json:"id"`
TraceID string `json:"trace_id"`
QueueOffset uint64 `json:"queue_offset"`
DataRawSize uint32 `json:"rawsize"`
}{"OK", uint64(id), traceIDStr, uint64(offset), uint32(rawSize)}, nil
} else {
return "OK", nil
}
} else {
// 从节点,不写消息
...
return nil, http_api.Err{400, FailedOnNotLeader}
}
}

2.3.3.internalPubAsync函数

  这就来到了有赞团队在How we redesigned the NSQ - NSQ重塑之详细设计一文中提到的关键函数——internalPubAsync 函数,该函数位于 nsqdserver\protocol_v2.go

  在服务端引入常见的Group commit组提交方式,将多个消息一次性提交,减少IO操作。这样不管是因为刷盘还是因为数据副本同步的延迟,都会把这段时间的积累的多个消息作为一组一次性写入,这样就大大减少了需要操作的写入次数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 客户端pub的数据写入临时chan后, 在chan上面等待结果异步返回
func internalPubAsync(clientTimer *time.Timer, msgBody []byte, topic *nsqd.Topic, extContent ext.IExtContent) error {
if topic.Exiting() {
return nsqd.ErrExiting
}
info := &nsqd.PubInfo{
Done: make(chan struct{}),
MsgBody: msgBody,
ExtContent: extContent,
StartPub: time.Now(),
}
select {
case topic.GetWaitChan() <- info:
default:
if clientTimer == nil {
clientTimer = time.NewTimer(pubWaitTimeout)
} else {
clientTimer.Reset(pubWaitTimeout)
}
select {
case topic.GetWaitChan() <- info:
case <-topic.QuitChan():
nsqd.NsqLogger().Infof("topic %v put messages failed at exiting", topic.GetFullName())
return nsqd.ErrExiting
case <-clientTimer.C:
nsqd.NsqLogger().Infof("topic %v put messages timeout ", topic.GetFullName())
topic.IncrPubFailed()
incrServerPubFailed()
return ErrPubToWaitTimeout
}
}
<-info.Done
return info.Err
}

2.3.4.internalPubLoop方法

  通过 topic.GetWaitChan(),可以找到 PubInfo 结构体被发送到了哪里。可以看到,数据被发送到了 internalPubLoop 方法。

  组提交循环,在chan上面等待异步提交写入请求,并尝试一次提交所有等待中的请求。请求完成后,通过关闭对应请求的chan来通知客户端结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
func (c *context) internalPubLoop(topic *nsqd.Topic) {
messages := make([]*nsqd.Message, 0, 100)
pubInfoList := make([]*nsqd.PubInfo, 0, 100)
...
defer func() {
// 循环退出后,剩余未落盘的消息,返回相应的错误信息
done := false
for !done {
select {
case info := <-topic.GetWaitChan():
pubInfoList = append(pubInfoList, info)
default:
done = true
}
}
nsqd.NsqLogger().Logf("quit pub loop for topic: %v, left: %v ", topic.GetFullName(), len(pubInfoList))
for _, info := range pubInfoList {
info.Err = nsqd.ErrExiting
close(info.Done)
}
}()
quitChan := topic.QuitChan()
infoChan := topic.GetWaitChan()
for {
select {
case <-quitChan:
return
case info := <-infoChan:
// 参数校验
...
if !topic.IsExt() {
messages = append(messages, nsqd.NewMessage(0, info.MsgBody))
} else {
messages = append(messages, nsqd.NewMessageWithExt(0, info.MsgBody, info.ExtContent.ExtVersion(), info.ExtContent.GetBytes()))
}
pubInfoList = append(pubInfoList, info)
default:
if len(pubInfoList) == 0 {
// 这段代码几乎与case info := <-infoChan相同
...
continue
}
var retErr error
if c.checkForMasterWrite(topicName, partition) {
s := time.Now()
_, _, _, err := c.PutMessages(topic, messages)
// 错误处理、统计耗时等
...
} else {
// 从节点不进行落盘操作
...
}
// 错误处理
...
// 请求完成后, 通过关闭对应请求的chan来通知客户端结果
for _, info := range pubInfoList {
info.Err = retErr
close(info.Done)
}
pubInfoList = pubInfoList[:0]
messages = messages[:0]
}
}
}

  上述方法是如何运行起来的,可以参考 nsqd\topic.go 文件,是在New出一个新的Topic时,将该方法作为 Topic 结构体的成员变量,并起一个协程调用该方法。

  可以看到,上述方法只是接受数据,再存到数组中,并统一进行落盘处理,而具体的落盘方法是 PutMessages 方法。通过 Goland 的代码转跳,可以很容易地找到最终的刷盘代码,是位于 nsqd\diskqueue_writer.gofunc (d *diskQueueWriter) writeOne(...) (...)。可以看到文件名,基本上跟图片上的 Topic Disk Queue 对应上了。

2.4.支持数据副本和高可用

  这部分做了类似Kafka的设计,将每个topic的数据节点副本元信息写入etcd,然后通过etcd选举出每个topic的leader节点。选举的topic的leader节点负责自己topic的数据副本同步,其他follower节点从leader节点同步topic数据。

2.4.1.元数据存储

  一些少量的元数据存储在etcd,保证整个集群的元数据的一致性。

  具体包括每个topic的配置信息,副本节点的分布,选举出来的leader节点信息,nsqd数据节点的配置信息以及nsqlookupd数据查询节点的配置信息。元数据树结构图如下:

2.4.2.Leader选举

  改造后的nsq架构,每个topic需要一个leader节点负责处理读写请求和数据同步。为了保证每个节点的负载趋于均衡,我们通过nsqlookupd来选择合适的topic的leader节点,并通知给所有副本进行leader确认。leader节点会尝试从etcd获取topic对应的leader锁确认leader有效。

  当某个节点失效时,会触发etcd的watch事件,从而触发nsqlookupd重新选择其他存活的节点作为topic的新leader,完成leader的快速切换。客户端如果此时正在写入老的leader也会触发失败重试,并获取最新的leader节点,完成自动HA。选举具体流程如下:

2.4.3.从代码角度进行大致梳理

  在 nsqlookupd 启动的时候,会启动一个名为 NsqLookupCoordinator 的实例,由它进行 Leader 节点的选举工作。可以看到它的 Start 方法在 NSQLookupdMain 方法中被调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (l *NSQLookupd) Main() {
...
if l.opts.RPCPort != "" {
...
err = l.coordinator.Start()
...
} else {
...
}
...
httpListener, err := net.Listen("tcp", l.opts.HTTPAddress)
...
httpServer := newHTTPServer(ctx)
...
}

  下述方法的源码均在 consistence\nsqlookup_coordinator.go 中,在 NsqLookupCoordinatorStart 方法中,启动一个协程去处理 Leader 选举的工作。代码如下:

1
2
3
4
5
6
func (nlcoord *NsqLookupCoordinator) Start() error {
...
go nlcoord.handleLeadership()
...
return nil
}

  在 NsqLookupCoordinatorhandleLeadership 方法中,主要就是循环监听 Leader 节点的变化,特别是在 Leader 发生改变时,发出 Leader 改变的通知。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (nlcoord *NsqLookupCoordinator) handleLeadership() {
...
for {
select {
case l, ok := <-lookupdLeaderChan:
if !ok {
coordLog.Warningf("leader chan closed.")
return
}
if l == nil {
coordLog.Warningln("leader is lost.")
continue
}
if l.GetID() != nlcoord.leaderNode.GetID() ||
l.Epoch != nlcoord.leaderNode.Epoch {
nlcoord.leaderNode = *l
...
nlcoord.notifyLeaderChanged(nlcoord.nsqdMonitorChan)
}
if nlcoord.leaderNode.GetID() == "" {
coordLog.Warningln("leader is missing.")
}
case <-ticker.C:
// reload topics to cache, used for query from client
_, err := nlcoord.leadership.ScanTopics()
...
}
}
}

  可以看到上述代码中,调用了 notifyLeaderChanged 方法,其中又调用了 checkTopics,实际上它也是调用 doCheckTopics 进行处理。而 doCheckTopics 方法中,有一些重要的东西。通过源码本身的注释,可以看到其中有一句,如果 Leader 宕机了,就需要先选举一个新的 Leader ,这里应该就是选举的算法了。而根据之前的选举具体流程图来看,该方法应该是主干部分。其源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
func (nlcoord *NsqLookupCoordinator) doCheckTopics(monitorChan chan struct{}, failedInfo *TopicNameInfo,
waitingMigrateTopic map[string]map[int]time.Time, lostLeaderSessions map[string]bool, fullCheck bool) {
...
for _, t := range topics {
...
// move the failed node from ISR to catchup
coordErr := nlcoord.handleRemoveFailedISRNodes(failedNodes, &topicInfo)
...
// if leader of topic is down, we need elect new leader first
if _, ok := currentNodes[t.Leader]; !ok {
...
coordErr := nlcoord.handleTopicLeaderElection(&topicInfo, aliveNodes, aliveEpoch, false)
...
continue
} else {
// check topic leader session key.
...
for retry < 3 {
retry++
leaderSession, err = nlcoord.leadership.GetTopicLeaderSession(t.Name, t.Partition)
if err != nil {
...
// notify the nsqd node to acquire the leader session.
nlcoord.notifyISRTopicMetaInfo(&topicInfo)
nlcoord.notifyAcquireTopicLeader(&topicInfo)
...
continue
} else {
break
}
}
...
if leaderSession.LeaderNode == nil || leaderSession.Session == "" {
...
nlcoord.notifyISRTopicMetaInfo(&topicInfo)
nlcoord.notifyAcquireTopicLeader(&topicInfo)
continue
}
if leaderSession.LeaderNode.ID != t.Leader {
...
nlcoord.notifyISRTopicMetaInfo(&topicInfo)
nlcoord.notifyAcquireTopicLeader(&topicInfo)
continue
}
}
...
}
...
}

  handleTopicLeaderElection 方法的源码就不在这里列出,继续按照流程图进行梳理。可以看到上面的代码里调用了 notifyAcquireTopicLeader 方法,其中以回调的方式,调用 sendAcquireTopicLeaderToNsqd,最终调用到 NsqdRpcClientNotifyAcquireTopicLeader 方法完成通知 Leader 节点获取 Leader 锁。NsqdRpcClient 以 RPC 的方式,调用 Leader 节点的 NotifyAcquireTopicLeader方法。NotifyAcquireTopicLeader 源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (self *NsqdCoordRpcServer) NotifyAcquireTopicLeader(rpcTopicReq *RpcAcquireTopicLeaderReq) *CoordErr {
...
if err := self.checkLookupForWrite(rpcTopicReq.LookupdEpoch); err != nil {
...
}
...
topicCoord, err := self.nsqdCoord.getTopicCoord(rpcTopicReq.TopicName, rpcTopicReq.TopicPartition)
...
tcData := topicCoord.GetData()
...
err = self.nsqdCoord.notifyAcquireTopicLeader(tcData)
...
return &ret
}

  可以看到上述代码是根据 TopicName 和 TopicPartition 查询到 NsqdCoordinator,实际上是调用它的 acquireTopicLeader 方法去获取 Leader 锁。源码位于 consistence\nsqd_coordinator.go,如下:

1
2
3
4
5
6
7
8
9
10
11
12
// New leader should acquire the leader lock and check if success to avoid multi leader session
func (ncoord *NsqdCoordinator) acquireTopicLeader(topicInfo *TopicPartitionMetaInfo) *CoordErr {
...
err := ncoord.leadership.AcquireTopicLeader(topicInfo.Name, topicInfo.Partition, &ncoord.myNode, topicInfo.Epoch)
if err != nil {
coordLog.Infof("failed to acquire leader for topic(%v): %v", topicInfo.Name, err)
return &CoordErr{err.Error(), RpcNoErr, CoordElectionErr}
}
coordLog.Infof("acquiring leader for topic(%v) success", topicInfo.Name)
return nil
}

  AcquireTopicLeader 的源码位于 consistence\nsqd_node_etcd.go,尝试从 ETCD 中读取 Leader 锁,如果不存在,则尝试创建。如果创建失败,就表示获锁失败;如果创建成功,则表示成为 Leader 节点。源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func (nem *NsqdEtcdMgr) AcquireTopicLeader(topic string, partition int, nodeData *NsqdNodeInfo, epoch EpochType) error {
topicLeaderSession := &TopicLeaderSession{
...
}
valueB, err := json.Marshal(topicLeaderSession)
...
topicKey := nem.createTopicLeaderPath(topic, partition)
rsp, err := nem.client.Get(topicKey, false, false)
if err != nil {
if client.IsKeyNotFound(err) {
// 无锁,则尝试创建
coordLog.Infof("try to acquire topic leader session [%s]", topicKey)
rsp, err = nem.client.Create(topicKey, string(valueB), 0)
if err != nil {
// 创建失败
coordLog.Infof("acquire topic leader session [%s] failed: %v", topicKey, err)
return err
}
// 创建成功,成为Leader
coordLog.Infof("acquire topic leader [%s] success: %v", topicKey, string(valueB))
return nil
} else {
// 异常
...
return err
}
}
if rsp.Node.Value == string(valueB) {
// 已经成为Leader
coordLog.Infof("get topic leader with the same [%s] ", topicKey)
return nil
}
// 已经有Leader节点(非当前节点)
coordLog.Infof("get topic leader [%s] failed, lock exist value[%s]", topicKey, rsp.Node.Value)
return ErrKeyAlreadyExist
}

2.5.数据副本同步和动态ISR

  每个topic选举出来的leader节点负责同步数据到所有副本。为了支持副本节点的动态变化,参考了Kafka的ISR(In synced replica)的设计。和Kafka不同的是,有赞版本用push模式,不是pull的模式,来保证数据的同步复制,避免数据同步不一致。

  因此,数据写入首先由leader节点发起,并且同步到所有ISR副本节点成功后,才返回给客户端。如果同步ISR节点失败,则尝试动态调整ISR并重试直到成功为止。重试的过程中会检查leader的有效性,以及是否重复提交等条件。写入流程和ISR的动态调整流程如图所示:

2.5.1.从代码角度进行大致梳理

  从之前对 internalPUB 的分析可以看到,同步消息是使用 PutMessage 提交的;而从对 internalPubLoop 的分析可以看到,异步消息是使用 PutMessages 提交的。

  两段代码都位于 nsqdserver\context.go 中,以后者为例,可以看到单机使用 TopicPutMessages;而集群模式时,则使用 NsqdCoordinatorPutMessagesToCluster 方法。源码如下:

1
2
3
4
5
6
7
func (c *context) PutMessages(topic *nsqd.Topic, msgs []*nsqd.Message) (nsqd.MessageID, nsqd.BackendOffset, int32, error) {
if c.nsqdCoord == nil {
id, offset, rawSize, _, _, err := topic.PutMessages(msgs)
return id, offset, rawSize, err
}
return c.nsqdCoord.PutMessagesToCluster(topic, msgs)
}

  NsqdCoordinatorPutMessagesToCluster 方法位于 consistence\nsqd_coordinator_cluster_write.go 文件中,可以看到其中定义了多个回调方法,例如 doLocalWritedoLocalExitdoLocalCommitdoLocalRollbackdoRefreshdoSlaveSynchandleSyncResult,基本都可以在上图中找到一个对应的步骤。源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func (ncoord *NsqdCoordinator) PutMessagesToCluster(topic *nsqd.Topic,
msgs []*nsqd.Message) (nsqd.MessageID, nsqd.BackendOffset, int32, error) {
...
doLocalWrite := func(d *coordData) *CoordErr {
...
}
doLocalExit := func(err *CoordErr) {
...
}
doLocalCommit := func() error {
...
}
doLocalRollback := func() {
...
}
doRefresh := func(d *coordData) *CoordErr {
...
}
doSlaveSync := func(c *NsqdRpcClient, nodeID string, tcData *coordData) *CoordErr {
if isTestSlaveTimeout() {
return NewCoordErr("timeout test for slave sync", CoordNetErr)
}
// should retry if failed, and the slave should keep the last success write to avoid the duplicated
putErr := c.PutMessages(&tcData.topicLeaderSession, &tcData.topicInfo, commitLog, msgs)
...
return putErr
}
handleSyncResult := func(successNum int, tcData *coordData) bool {
...
}
// 以上为定义回调,这里是执行具体的逻辑
clusterErr := ncoord.doSyncOpToCluster(true, coord, doLocalWrite, doLocalExit, doLocalCommit, doLocalRollback, doRefresh, doSlaveSync, handleSyncResult)
...
return nsqd.MessageID(commitLog.LogID), nsqd.BackendOffset(commitLog.MsgOffset), commitLog.MsgSize, err
}

  doSyncOpToCluster 方法同样位于 consistence\nsqd_coordinator_cluster_write.go 文件中,由于代码比较长,这里就不列出了,其中的业务逻辑跟上图基本是一致的。

  接下来具体看看主从节点是如何进行数据同步的。在上述方法的 doSlaveSync 变量中,可以看到回调函数是调用了根据 nodeId 找到的 NsqdRpcClientPutMessages 方法,这其实是一个 gRPC 的客户端方法,进而可以找到对应的 Server 名为 nsqdCoordGRpcServer,对应的方法是位于 consistence\coord_grpc_server.go 文件中的 PutMessages 方法。

  PutMessages 方法会解析 RPC 请求里的数据,并调用 putMessagesOnSlave 方法,去构建回调函数。源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (s *nsqdCoordGRpcServer) PutMessages(ctx context.Context, req *pb.RpcPutMessages) (*pb.CoordErr, error) {
...
tc, err := s.nsqdCoord.checkWriteForGRpcCall(req.TopicData)
...
// do local pub message
commitData := fromPbCommitLogData(req.LogData)
var msgs []*nsqd.Message
for _, pbm := range req.TopicMessage {
var msg nsqd.Message
...
msgs = append(msgs, &msg)
}
err = s.nsqdCoord.putMessagesOnSlave(tc, commitData, msgs)
...
return &coordErr, nil
}

  重新回到 consistence\nsqd_coordinator_cluster_write.go 文件中,putMessagesOnSlave 方法主要是定义了回调函数,例如 checkDupOnSlavedoLocalWriteOnSlavedoLocalCommitdoLocalExit。源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (ncoord *NsqdCoordinator) putMessagesOnSlave(coord *TopicCoordinator, logData CommitLogData, msgs []*nsqd.Message) *CoordErr {
...
checkDupOnSlave := func(tc *coordData) bool {
...
}
doLocalWriteOnSlave := func(tc *coordData) *CoordErr {
// 写入从节点
...
topic, localErr = ncoord.localNsqd.GetExistingTopic(topicName, partition)
...
topic.Lock()
...
queueEnd, localErr = topic.PutMessagesOnReplica(msgs, nsqd.BackendOffset(logData.MsgOffset), int64(logData.MsgSize))
...
topic.Unlock()
...
}
doLocalCommit := func() error {
...
}
doLocalExit := func(err *CoordErr) {
...
}
return ncoord.doWriteOpOnSlave(coord, checkDupOnSlave, doLocalWriteOnSlave, doLocalCommit, doLocalExit)
}

  同样,doWriteOpOnSlave 方法,由于代码比较长,这里就不列出了,其中的业务逻辑跟上图基本是一致的。

  更多细节,可以参考How we redesigned the NSQ - NSQ重塑之详细设计数据副本同步和动态ISR 部分。

  到这里为止,基本上算是解决了官方版本 NSQ 的最大问题——节点异常情况下的数据丢失的问题,可以将不同分区复制到不同节点,且每个分区单独选举 Leader 节点,

2.6.严格顺序消费

2.6.1.存在的问题

  默认的消费模式,是允许多个客户端链接同时消费topic下的同一个分区的不同消息的,这样可以使用最小的分区数来达到较高的并发消费能力。

优点:

  • 避免了像Kafka那样为了提高消费能力,创建过多的分区,从而避免了过多分区数带来的磁盘随机IO问题。

缺点:

  • 带来乱序投递的问题。

2.6.2.解决方式

  对于需要保证消息严格按照生产写入的顺序进行投递的场景,有赞引入了顺序投递的特性。

  • 在生产方也支持按照业务定制id进行分区hash的生产能力,从而保证从生产到消费整条链路是按照分区内有序的方式进行消息流转的。
  • 消息生产方会根据业务的分区id将同样的id投递到同一个topic分区,保证相同id的数据的顺序一致性。
  • 而在投递时,会调整并发消费策略,保证同一时刻只会投递一条消息进行消费,等待上一次ack成功后才继续投递下一条消息。
  • 同时,为了避免leader切换时影响消息的顺序性,ack的消息还会同步到所有副本才算成功。

  然而也会存在跟Kafka相似的缺点,单个分区的消费并发能力不如乱序消费,主要取决于消费业务本身的处理能力,为了提高更高的顺序消费并发能力,需要更多的分区数。

2.7.消息轨迹查询系统

  之前的NSQ系统由于消息在chan里面流转一遍就没了,很难事后进行数据追查,也没有一个很好的方式去跟踪一条消息在从生产到被消费的各种中间状态。

  为了满足业务方时不时的来排查业务消息状态的需求,有赞改造后的NSQ也支持动态的开启这种消息轨迹排查功能。为了支持消息轨迹查询,有赞团队做了如下几个工作:

  • 能通过消息id定位到磁盘上的消息内容;
  • 支持传入业务traceId, 并将业务traceId和nsq的消息id关联起来;
  • 记录一条消息的所有中间态变化信息;
  • 将消息的状态变迁信息同步到ES搜索系统;
  • nsqadmin提供方便的统一查询入口;
  • 支持针对topic和channel级别的动态跟踪开关以减少资源消耗。

  通过这一套轨迹查询系统,在业务方需要排查问题的时候,就能非常快速的找到异常消息的轨迹从而快速的定位业务问题了。特别是在排查顺序消费的业务场景时,经常能发现业务潜在的并发误用问题。

2.8.其他特性

  参考How we redesigned the NSQ - NSQ重塑之详细设计,可以看到有赞版本的NSQ还有 自动数据平衡分区支持改造消费channel处理重试和延时消费消费数据预读优化重放历史数据 等特点。

  另外,参考How we redesigned the NSQ - 其他特性及未来计划,还可以看到 NSQ拓展消息格式的设计nsq迁移工具 等。

3.参考资料

为何要抛弃Kafka,选择NSQ!
走进 NSQ 源码细节
官方文档 - 组件 - NSQD
How we redesigned the NSQ - Overview
How we redesigned the NSQ - NSQ重塑之详细设计
How we redesigned the NSQ - 其他特性及未来计划

4.结语

  这是在实习过程中学习到的新知识,且全部是开源数据或者资料,因此分享出来,希望我和大家都有所收获。最终,这部分的学习也成为了我实习答辩的重要材料,对我帮助非常大。并帮助我完成了事件中心的 Agent 落盘部分的代码,用自己的学习帮助到了团队。

参考资料已经在文中列出,这里不再一一列举。

转载说明

转载请注明出处,无偿提供。

支持一下
感谢大佬们的支持