nsq read notes
nsqd
topicMap map[string]*Topic
clientMap map[id]*Client
初始化过程
- new
- load meta data
- persist meta data
- main
- listen tcp & http port, start server
- start queueScanLoop, lookupLoop, statsLoop
有新连接到达时,会生成 connection 的 client,保存到 nsqd 的 clientMap 中,同时启动 goroutine 负责 client 的 messagePump。conn 所在的 goroutine 之后会负责读客户端的命令,并执行以及返回结果。可执行命令如下:
- IDENTIFY: 表名身份
- FIN: 在 client 绑定的 channel 上,完成一条消息(调用 channel.FinishMessage),同时会更新当前 client 的 metrics 信息,以及状态
- RDY: 更新 client ready count
- REQ: 在 client 绑定的 channel 上,把一条消息重新送入队列(调用 channel.RequeueMessage),同时更新 metrics 和状态。这条消息会被放入 defered 队列,延后执行
- PUB: 往某个 client 有权限的 topic 发送一条消息(调用 topic.PutMessage),更新 metrics 状态
- MPUB: 和 PUB 一样,不过接收多条消息
- DPUB: 和 PUB 一样,不过会被放入 defered 队列
- NOP: 最简单,啥也不干
- TOUCH: 在 client 绑定的 channel 上,重置一条消息的过期时间(调用 channel.TouchMessage)
- SUB: 将 client 绑定到 channel 上,如果 topic 和 channel 任一个属于 “”, 且 topic 或 channel 正在关闭, client 会不断重试绑定操作。
- CLS: 关闭连接
- AUTH: 授权
client 有多种状态:init, disconnect, connect, subscribe, closing ,状态迁移由一系列命令执行顺序决定。除此外还有 ready 状态,client 通过 RDY 更新了自己的 ready count,表示 client 最多同时处理多少条消息,如果 inflight count >= ready count,则未 ready,需要等待。
client 使用 SUB 进入 subscribe 模式,该模式只能进入一次,进入后 messagePumb 会接收到 subEvent,然后从对应 channel 中读取 message 发送到 client 里。
topic
messageChan chan *Message
backendChan BackendChan
channelsMap map[string]Channels
topic 创建流程
- new topic, save to topicMap
- lookup each lookupd, get all channels in topic $TOPIC
- skip “#ephemeral” and create channels
- start topic messagePump
delete channel
- remove from topic channelsMap
- mark channel deleted
- if left channels is zero, and topic is ephemeral, delete topic self
put messages
- try put message into memory message channel
- fallthrough into backend queue, most case into disk, but ephemeral just ignore
- update message count
message pump
- read message from memory message channel
- else read from backend message
- else update channel status
- copy memory into each channels in current topic
- if message is defered, put into channels defered
- else put into normal channels
channel
clients map[string]Consumer
backend BackendQueue
memoryChan chan *Message
deferedMessages map[MessageID]*Message
defredPQ PriorityQueue
inFlightMessages map[MessageID]*Message
inFlightPQ PriorityQueue
channel put message 和 topic put message 类似。put defered message 会把 message 放在deferedMessages 中,并加入 deferedPQ 中。如果 defered 时间到了,使用正常流程 put。clients 提供了 Add 和 Remove 接口,但管理职责不是 channel 的。
nsqlookupd
nsqlookupd 数据组织方式
{
{"client", "", ""} => {
"127.0.0.1:9490" => Producer{"127.0.0.1:8081"},
"127.0.0.1:9491" => Producer{"127.0.0.1:8081"},
},
{"channel", "topic_a", "channel_a"} => {
"ip1" => Producer{"addr"},
},
{"topic", "topic_a", ""} => {
"ip1" => Producer{"addr_1"},
"ip2" => Producer{"addr2"},
},
}
nsqd <-> nsqlookupd 交互
- connect: send “ V1”
- ping: send “PING “
- nsqlookupd update peer info’s lastUpdate
- response “OK”
- identify: send “IDENTIFY LEN(data) data”
- remote addr as id
- load broadcase address, tcp port, http port, version
- update peer info’s lastUpdate
- add producer to db: Registration{“client”} => PeerInfo{id}
- response {tcp_port, http_port, version, hostname, broadcast_address}
- register: send “REGISTER TOPIC [CHANNEL]”
- read topic and channel name
- if channel name exists:
- add producer to db: Registration{“channel”, $TOPIC, $CHANNEL} => PeerInfo{id}
- add producer to db: Registration{“topic”, $TOPIC, “”} => PeerInfo{id}
- response “OK”
- unregister: send “UNREGISTER TOPIC [CHANNEL]”
- read topic and channel name
- if channel name exists:
- remove producer from db: Registration{“channel”, $TOPIC, $CHANNEL}
- remove registration for channel has suffix “#ephemeral” if left producer is zero
- else:
- find all registrations of channel of $TOPIC
- remove all channels of current peer
- remove producer form db: Registration{“topic”, $TOPIC, “”}
- response “OK”
nsqlookupd support http request
- GET /lookup?topic=topic_name
{ "channels": ["channel1"], "producers": [{ }], }
- GET /topics
- GET /channels?topic=topic_name
- GET /nodes
- POST /topic/create?topic=
- POST /topic/delete?topic=
- POST /channel/create?topic=topic&channel=channel
- POST /channel/delete?topic=&channel=
- POST /topic/tombstone?topic=topic_name&node=node_id