0%

zookeeper源码学习-QuorumCnxManager

原本的代码很长,只记录了个别在看代码时候有用的一些

里面会启动两个线程,一个发送线程QuorumConnectionReqThread还有一个接受线程QuorumConnectionReceiverThread。
在zk启动的时候,会在QuorumConfig里面读到所有的节点信息,然后生成一个Quorum验证器,这时候已经包含了所有的节点信息。
然后是一些节点数据或者投票的发送,已经接到返回时候的相关处理,这个没有仔细研究,有兴趣的可以去看下QuorumCnxManager的源码

消息的发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void toSend(Long sid, ByteBuffer b) {
/*
* If sending message to myself, then simply enqueue it (loopback).
*/
if (this.mySid == sid) { //如果当前的消息需要分给自己,那么就直接把它放入到接收队列中
b.position(0);
addToRecvQueue(new Message(b.duplicate(), sid));
/*
* Otherwise send to the corresponding thread to send.
*/
} else {
/*
* Start a new connection if doesn't have one already.
*/
BlockingQueue<ByteBuffer> bq = queueSendMap.computeIfAbsent(sid, serverId -> new CircularBlockingQueue<>(SEND_CAPACITY));
addToSendQueue(bq, b);
connectOne(sid);
}
}

建立连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
synchronized void connectOne(long sid) {
if (senderWorkerMap.get(sid) != null) {
LOG.debug("There is a connection already for server {}", sid);
if (self.isMultiAddressEnabled() && self.isMultiAddressReachabilityCheckEnabled()) {
// since ZOOKEEPER-3188 we can use multiple election addresses to reach a server. It is possible, that the
// one we are using is already dead and we need to clean-up, so when we will create a new connection
// then we will choose an other one, which is actually reachable
senderWorkerMap.get(sid).asyncValidateIfSocketIsStillReachable();
}
return;
}
synchronized (self.QV_LOCK) {
boolean knownId = false;
// Resolve hostname for the remote server before attempting to
// connect in case the underlying ip address has changed.
self.recreateSocketAddresses(sid);
Map<Long, QuorumPeer.QuorumServer> lastCommittedView = self.getView();
QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
Map<Long, QuorumPeer.QuorumServer> lastProposedView = lastSeenQV.getAllMembers();
if (lastCommittedView.containsKey(sid)) {
knownId = true;
LOG.debug("Server {} knows {} already, it is in the lastCommittedView", self.getId(), sid);
if (connectOne(sid, lastCommittedView.get(sid).electionAddr)) {
return;
}
}
if (lastSeenQV != null
&& lastProposedView.containsKey(sid)
&& (!knownId
|| (lastProposedView.get(sid).electionAddr != lastCommittedView.get(sid).electionAddr))) {
knownId = true;
LOG.debug("Server {} knows {} already, it is in the lastProposedView", self.getId(), sid);

if (connectOne(sid, lastProposedView.get(sid).electionAddr)) {
return;
}
}
if (!knownId) {
LOG.warn("Invalid server id: {} ", sid);
}
}
}