0%

zookeeper源码学习-FastLeaderElection

负责leader选举相关的逻辑

选举流程

  1. 先判断节点当前的状态,以及是否需要停止
  2. 从接收队列里面取出来一个投票,然后校验是否合法
  3. 如果选举周期不一致,比较目标投票以及自己投票的信息, 然后选择更新自身的投票信息然后通知别的节点
  4. 如果选举周期一致,将选票信息存入到recvset中,以接收到的节点的id为key,根据节点投票的信息生成一个vote当做value。当循环结束的时候,recvset中会存储所有的参与者的id,以及他们的投票信息
  5. 根据recvset和自己的投票leader信息获取到一个投票追踪器,里面会对各个投票信息进行统计,以投票的leader id为key,投票的节点放入到一个set中,然后判断该set的大小是否已经超过了参与者的半数
  6. 如果已经达成了quorum协议,会再尝试从接收队列中取投票,看看是否有新的投票出来,如果有新的进来,会重新开始循环。如果没有,则修改目标leader的状态,然后退出选举

相关变量

1
2
3
4
5
6
7
8
9
10
// Connection manager; the workers use it to send (toSend) and poll (pollRecvQueue) election messages.
QuorumCnxManager manager;
// Vote tracker stored by setPeerState() when this peer becomes LEADING; WorkerReceiver later passes it to self.leader.
private SyncedLearnerTracker leadingVoteSet;
// Outgoing messages; polled by WorkerSender.
LinkedBlockingQueue<ToSend> sendqueue;
// Decoded incoming notifications; offered by WorkerReceiver and polled by lookForLeader().
LinkedBlockingQueue<Notification> recvqueue;
// The local quorum peer this election object works for.
QuorumPeer self;
// Holder of the sender/receiver workers and their threads.
Messenger messenger;
AtomicLong logicalclock = new AtomicLong(); // counter identifying the current election instance; can also be read as the election round (epoch)
// Current proposal: leader id, its zxid and its peer epoch. All three are written together by updateProposal().
long proposedLeader;
long proposedZxid;
long proposedEpoch;

构造函数

1
2
3
4
5
/**
 * Builds the election object for the given peer.
 *
 * @param self    the local quorum peer
 * @param manager connection manager used for exchanging election messages
 */
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {
    this.manager = manager;
    this.stop = false;
    // Remaining state (proposal, queues, messenger) is initialised in starter().
    starter(self, manager);
}

starter

1
2
3
4
5
6
7
8
9
/**
 * Initialises the election state: remembers the local peer, resets the proposal,
 * creates the send/receive queues and finally builds the Messenger.
 *
 * @param self    the local quorum peer
 * @param manager connection manager handed to the Messenger
 */
private void starter(QuorumPeer self, QuorumCnxManager manager) {
    // Must be set before the Messenger is built: its constructor reads self.getId().
    this.self = self;

    // No leader proposed yet, so both the leader id and the zxid start at -1.
    this.proposedZxid = -1;
    this.proposedLeader = -1;

    // Two queues: one for received notifications, one for messages to send.
    this.recvqueue = new LinkedBlockingQueue<Notification>();
    this.sendqueue = new LinkedBlockingQueue<ToSend>();

    this.messenger = new Messenger(manager);
}

Messenger 启动通信的线程(都是单线程操作)

1
2
3
4
5
6
7
8
9
10
11
12
Messenger(QuorumCnxManager manager) {

this.ws = new WorkerSender(manager);

this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]");
this.wsThread.setDaemon(true);

this.wr = new WorkerReceiver(manager);

this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]");
this.wrThread.setDaemon(true);
}

WorkerSender

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
 * Sender worker: keeps taking messages off sendqueue and hands each one,
 * serialised by buildMsg, to the connection manager.
 */
class WorkerSender extends ZooKeeperThread {

    /** Set to true to make run() leave its loop. */
    volatile boolean stop;
    /** Connection manager that performs the actual send. */
    QuorumCnxManager manager;

    WorkerSender(QuorumCnxManager manager) {
        super("WorkerSender");
        this.manager = manager;
        this.stop = false;
    }

    /**
     * Polls sendqueue (waiting up to 3 seconds per attempt) until stop is set or
     * the thread is interrupted, sending every message that is retrieved.
     */
    public void run() {
        try {
            while (!stop) {
                ToSend pending = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                if (pending != null) {
                    process(pending);
                }
            }
        } catch (InterruptedException e) {
            // An interrupt ends the loop; fall through to the shutdown log line.
        }
        LOG.info("WorkerSender is down");
    }

    /**
     * Called by run() once there is a new message to send.
     *
     * @param m message to send
     */
    void process(ToSend m) {
        // Serialise the vote, then pass it to the manager addressed to m.sid.
        manager.toSend(
            m.sid,
            buildMsg(m.state.ordinal(), m.leader, m.zxid, m.electionEpoch, m.peerEpoch, m.configData));
    }

}

toSend是QuorumCnxManager中的方法,传送门

BuildMsg

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
 * Serialises a notification into a byte buffer.
 *
 * Layout, in write order: state (int), leader (long), zxid (long), electionEpoch (long),
 * epoch (long), Notification.CURRENTVERSION (int), configData length (int), configData bytes.
 * The buffer is returned as written, i.e. it is not flipped and its position is at the end.
 *
 * @param state         sender state as an int
 * @param leader        proposed leader id
 * @param zxid          zxid of the proposed leader
 * @param electionEpoch election round of the sender
 * @param epoch         peer epoch of the proposed leader
 * @param configData    serialised configuration appended after the fixed header
 * @return array-backed buffer of exactly 44 + configData.length bytes
 */
static ByteBuffer buildMsg(int state, long leader, long zxid, long electionEpoch, long epoch, byte[] configData) {
    // Fixed header: 3 ints (state, version, config length) + 4 longs = 44 bytes.
    final int headerLength = 44;
    final ByteBuffer packet = ByteBuffer.wrap(new byte[headerLength + configData.length]);

    // A freshly wrapped buffer already starts at position 0 with limit == capacity.
    packet.putInt(state)
        .putLong(leader)
        .putLong(zxid)
        .putLong(electionEpoch)
        .putLong(epoch)
        .putInt(Notification.CURRENTVERSION)
        .putInt(configData.length)
        .put(configData);

    return packet;
}

WorkerReceiver 用来接收别的节点发来的投票

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
/**
 * Receiver worker: polls raw messages from the connection manager, decodes each one
 * into a Notification and then, depending on the sender and on the local peer state,
 * either queues it on recvqueue for the election loop or answers the sender by
 * offering a ToSend on sendqueue.
 */
class WorkerReceiver extends ZooKeeperThread {

// Loop flag for run(); the receive loop ends once this becomes true.
volatile boolean stop;
// Connection manager whose receive queue is polled in run().
QuorumCnxManager manager;

WorkerReceiver(QuorumCnxManager manager) {
super("WorkerReceiver");
this.stop = false;
this.manager = manager;
}

/**
 * Receive loop. Each iteration waits up to 3 seconds for a message, validates its
 * size, parses the fields (honouring the older 28-byte and 40-byte layouts), and
 * dispatches it. Malformed messages are logged and skipped.
 */
public void run() {

Message response;
while (!stop) {
// Sleeps on receive
try {
response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
if (response == null) {
continue;
}

final int capacity = response.buffer.capacity();

// The current protocol and two previous generations all send at least 28 bytes
if (capacity < 28) {
LOG.error("Got a short response from server {}: {}", response.sid, capacity);
continue;
}

// this is the backwardCompatibility mode in place before ZK-107
// It is for a version of the protocol in which we didn't send peer epoch
// With peer epoch and version the message became 40 bytes
boolean backCompatibility28 = (capacity == 28);

// this is the backwardCompatibility mode for no version information
boolean backCompatibility40 = (capacity == 40);

// Reset position/limit so the fields below are read from the start of the buffer.
response.buffer.clear();

// Instantiate Notification and set its attributes
Notification n = new Notification();

// Fields present in every layout: state, leader, zxid, election epoch.
int rstate = response.buffer.getInt();
long rleader = response.buffer.getLong();
long rzxid = response.buffer.getLong();
long relectionEpoch = response.buffer.getLong();
long rpeerepoch;

int version = 0x0;
QuorumVerifier rqv = null;

try {
if (!backCompatibility28) {
rpeerepoch = response.buffer.getLong();
if (!backCompatibility40) {
/*
* Version added in 3.4.6
*/

version = response.buffer.getInt();
} else {
// NOTE(review): this branch is taken when capacity == 40 (bytes); the "36 bits" wording
// in the log message does not match that check - confirm whether it is intentional.
LOG.info("Backward compatibility mode (36 bits), server id: {}", response.sid);
}
} else {
// 28-byte layout carries no peer epoch, so it is derived from the zxid instead.
// NOTE(review): the check above is on a byte count, while the message says "bits".
LOG.info("Backward compatibility mode (28 bits), server id: {}", response.sid);
rpeerepoch = ZxidUtils.getEpochFromZxid(rzxid);
}

// check if we have a version that includes config. If so extract config info from message.
if (version > 0x1) {
int configLength = response.buffer.getInt();

// we want to avoid errors caused by the allocation of a byte array with negative length
// (causing NegativeArraySizeException) or huge length (causing e.g. OutOfMemoryError)
if (configLength < 0 || configLength > capacity) {
throw new IOException(String.format("Invalid configLength in notification message! sid=%d, capacity=%d, version=%d, configLength=%d",
response.sid, capacity, version, configLength));
}

byte[] b = new byte[configLength];
response.buffer.get(b);

// The received config is parsed and compared against the local one while holding the lock on self.
synchronized (self) {
try {
// NOTE(review): new String(byte[]) decodes with the platform default charset - confirm that
// this matches the encoding used by the sender.
rqv = self.configFromString(new String(b));
QuorumVerifier curQV = self.getQuorumVerifier();
// Only a strictly newer config version is acted upon.
if (rqv.getVersion() > curQV.getVersion()) {
LOG.info("{} Received version: {} my version: {}",
self.getId(),
Long.toHexString(rqv.getVersion()),
Long.toHexString(self.getQuorumVerifier().getVersion()));
if (self.getPeerState() == ServerState.LOOKING) {
LOG.debug("Invoking processReconfig(), state: {}", self.getServerState());
self.processReconfig(rqv, null, null, false);
if (!rqv.equals(curQV)) {
LOG.info("restarting leader election");
self.shuttingDownLE = true;
self.getElectionAlg().shutdown();

// Leaves the while (!stop) receive loop, so this worker finishes.
break;
}
} else {
LOG.debug("Skip processReconfig(), state: {}", self.getServerState());
}
}
} catch (IOException | ConfigException e) {
// NOTE(review): the exception itself is not passed to the logger, only the sender id.
LOG.error("Something went wrong while processing config received from {}", response.sid);
}
}
} else {
LOG.info("Backward compatibility mode (before reconfig), server id: {}", response.sid);
}
} catch (BufferUnderflowException | IOException e) {
// Truncated or invalid message: log it and move on to the next one.
LOG.warn("Skipping the processing of a partial / malformed response message sent by sid={} (message length: {})",
response.sid, capacity, e);
continue;
}
/*
* If it is from a non-voting server (such as an observer or
* a non-voting follower), respond right away.
*/
if (!validVoter(response.sid)) {
// Reply with this peer's current vote, the local logical clock and the local config.
Vote current = self.getCurrentVote();
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(
ToSend.mType.notification,
current.getId(),
current.getZxid(),
logicalclock.get(),
self.getPeerState(),
response.sid,
current.getPeerEpoch(),
qv.toString().getBytes());

sendqueue.offer(notmsg);
} else {
// Receive new message
LOG.debug("Receive new notification message. My id = {}", self.getId());

// State of peer that sent this message
QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
// Map the integer read from the message to a ServerState; unknown values drop the message.
switch (rstate) {
case 0:
ackstate = QuorumPeer.ServerState.LOOKING;
break;
case 1:
ackstate = QuorumPeer.ServerState.FOLLOWING;
break;
case 2:
ackstate = QuorumPeer.ServerState.LEADING;
break;
case 3:
ackstate = QuorumPeer.ServerState.OBSERVING;
break;
default:
// continue applies to the enclosing while loop: skip to the next message.
continue;
}

n.leader = rleader;
n.zxid = rzxid;
n.electionEpoch = relectionEpoch;
n.state = ackstate;
n.sid = response.sid;
n.peerEpoch = rpeerepoch;
n.version = version;
n.qv = rqv;
/*
* Print notification info
*/
LOG.info(
"Notification: my state:{}; n.sid:{}, n.state:{}, n.leader:{}, n.round:0x{}, "
+ "n.peerEpoch:0x{}, n.zxid:0x{}, message format version:0x{}, n.config version:0x{}",
self.getPeerState(),
n.sid,
n.state,
n.leader,
Long.toHexString(n.electionEpoch),
Long.toHexString(n.peerEpoch),
Long.toHexString(n.zxid),
Long.toHexString(n.version),
(n.qv != null ? (Long.toHexString(n.qv.getVersion())) : "0"));

/*
* If this server is looking, then send proposed leader
*/

if (self.getPeerState() == QuorumPeer.ServerState.LOOKING) {
// Hand the notification to the election loop, which polls recvqueue.
recvqueue.offer(n);

/*
* Send a notification back if the peer that sent this
* message is also looking and its logical clock is
* lagging behind.
*/
if ((ackstate == QuorumPeer.ServerState.LOOKING)
&& (n.electionEpoch < logicalclock.get())) {
Vote v = getVote();
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(
ToSend.mType.notification,
v.getId(),
v.getZxid(),
logicalclock.get(),
self.getPeerState(),
response.sid,
v.getPeerEpoch(),
qv.toString().getBytes());
sendqueue.offer(notmsg);
}
} else {
/*
* If this server is not looking, but the one that sent the ack
* is looking, then send back what it believes to be the leader.
*/
Vote current = self.getCurrentVote();
if (ackstate == QuorumPeer.ServerState.LOOKING) {
if (self.leader != null) {
// Hand the saved leading vote set to the leader object once, then clear the local reference.
if (leadingVoteSet != null) {
self.leader.setLeadingVoteSet(leadingVoteSet);
leadingVoteSet = null;
}
self.leader.reportLookingSid(response.sid);
}

LOG.debug(
"Sending new notification. My id ={} recipient={} zxid=0x{} leader={} config version = {}",
self.getId(),
response.sid,
Long.toHexString(current.getZxid()),
current.getId(),
Long.toHexString(self.getQuorumVerifier().getVersion()));

// Unlike the replies above, this one carries the election epoch stored in the current vote
// rather than logicalclock.
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(
ToSend.mType.notification,
current.getId(),
current.getZxid(),
current.getElectionEpoch(),
self.getPeerState(),
response.sid,
current.getPeerEpoch(),
qv.toString().getBytes());
sendqueue.offer(notmsg);
}
}
}
} catch (InterruptedException e) {
// NOTE(review): the interrupt is only logged; the loop keeps running until stop is set and
// the thread's interrupt flag is not restored - confirm this is intended.
LOG.warn("Interrupted Exception while waiting for new message", e);
}
}
LOG.info("WorkerReceiver is down");
}

}

更新投票

更新自己的相关投票信息

1
2
3
4
5
6
7
8
9
10
11
12
/**
 * Replaces this peer's current proposal with the given values, logging the
 * old and new leader/zxid at debug level before the fields are overwritten.
 *
 * @param leader id of the newly proposed leader
 * @param zxid   zxid of the newly proposed leader
 * @param epoch  peer epoch of the newly proposed leader
 */
synchronized void updateProposal(long leader, long zxid, long epoch) {
    final String newZxidHex = Long.toHexString(zxid);
    final String oldZxidHex = Long.toHexString(proposedZxid);
    LOG.debug(
        "Updating proposal: {} (newleader), 0x{} (newzxid), {} (oldleader), 0x{} (oldzxid)",
        leader,
        newZxidHex,
        proposedLeader,
        oldZxidHex);

    this.proposedLeader = leader;
    this.proposedZxid = zxid;
    this.proposedEpoch = epoch;
}

ZXID相关

zxid表示的是当前节点处理的事务的id,在投票的时候,用来表示当前节点处理的最后一条事务的id,如果当前节点的类型是参与者,在返回数据的时候会先判断当前的文件数据库是否初始化完成,如果没有,会进行初始化。然后从本地的文件中读到最后一条事务的id

1
2
3
4
5
6
7
/**
 * Returns the zxid this peer starts an election with.
 *
 * @return the peer's last logged zxid when it is a PARTICIPANT, otherwise Long.MIN_VALUE
 */
private long getInitLastLoggedZxid() {
    // Anything other than a participant reports the smallest possible zxid.
    if (self.getLearnerType() != LearnerType.PARTICIPANT) {
        return Long.MIN_VALUE;
    }
    return self.getLastLoggedZxid();
}

PeerEpoch相关

获取节点周期的时候,如果节点的类型是参与者,返回自己的周期,否则就返回long的最小值。自己本身的周期,在刚启动的时候,会从本地文件中进行读取

在设置节点状态的时候,如果提议的leader是自己,则修改节点状态为leading,否则根据类型来判断,如果是参与者,则改为FOLLOWING状态,否则改为观察状态

如果当前是leader,则把leadingVoteSet也改为voteSet

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * Returns the peer epoch this peer starts an election with.
 *
 * @return the peer's current epoch when it is a PARTICIPANT, otherwise Long.MIN_VALUE
 * @throws RuntimeException if reading the current epoch fails with an IOException;
 *         the original IOException is attached as the cause
 */
private long getPeerEpoch() {
    if (self.getLearnerType() != LearnerType.PARTICIPANT) {
        return Long.MIN_VALUE;
    }
    try {
        return self.getCurrentEpoch();
    } catch (IOException e) {
        // Keep the same message as before, but chain the IOException as the cause so its
        // type and full stack trace remain available to whoever handles the failure.
        throw new RuntimeException(e.getMessage(), e);
    }
}

/**
 * Moves the local peer out of the election into its new role. The peer becomes
 * LEADING when the proposed leader is itself; otherwise the state returned by
 * learningState() is used. When the resulting state is LEADING, the given vote
 * tracker is remembered in leadingVoteSet.
 *
 * @param proposedLeader id of the leader that was agreed on
 * @param voteSet        tracker holding the votes that led to the decision
 */
private void setPeerState(long proposedLeader, SyncedLearnerTracker voteSet) {
    final ServerState nextState;
    if (proposedLeader == self.getId()) {
        nextState = ServerState.LEADING;
    } else {
        nextState = learningState();
    }

    self.setPeerState(nextState);

    if (nextState == ServerState.LEADING) {
        leadingVoteSet = voteSet;
    }
}

检查是否leader

根据投票集合和目标投票,判断是否leader

如果选出来的leader不在之前的投票节点列表中,或者选出来leader的状态不是leading状态,或者如果选中了当前节点是leader,但是选举周期跟目标选票不一致,则返回false

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * Verifies that the given leader can be accepted.
 *
 * If the leader is this peer, the only requirement is that the election epoch equals
 * the local logical clock. If the leader is another peer, a vote from that peer must
 * be present in {@code votes} and must report the LEADING state.
 *
 * @param votes         votes received so far, keyed by sender id
 * @param leader        id of the candidate leader
 * @param electionEpoch election epoch the candidate was chosen in
 * @return true when the checks above pass
 */
protected boolean checkLeader(Map<Long, Vote> votes, long leader, long electionEpoch) {
    // Case 1: everyone thinks I am the leader - only the election round has to match.
    if (leader == self.getId()) {
        return logicalclock.get() == electionEpoch;
    }

    // Case 2: someone else is the leader - it must have told us itself that it is leading.
    final Vote fromLeader = votes.get(leader);
    return fromLeader != null && fromLeader.getState() == ServerState.LEADING;
}

getVoteTracker 获取到投票追踪器,用来判断是否结束选举

传入一个需要比较的投票,

首先会把当前的验证器加入到qvset。 QuorumVerifier和lastSeenQuorumVerifier都是在启动的时候,QuorumPeer创建出来的

如果lastSeenQuorumVerifier不为空,并且版本比当前的大,也将lastSeenQuorumVerifier加入到追踪器中,然后在比较的时候进行对比

vote中存储的是每个sid对应的投票ack信息。

因为votes传入的是recvset,而且recvset中存储的是每个id对应的投票信息,在进行tracker的创建的时候,会根据传入的选票信息和目标的选票信息,来判断某个sid是否获取到了足够的ack数量,进而退出选举

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
 * Builds a tracker for the given vote. The tracker is seeded with the current quorum
 * verifier, plus the last-seen verifier when that one exists and has a higher version.
 * Every sender in {@code votes} whose vote equals {@code vote} is then added as an ack.
 *
 * @param votes votes received so far, keyed by sender id
 * @param vote  the vote to count supporters for
 * @return the populated tracker
 */
protected SyncedLearnerTracker getVoteTracker(Map<Long, Vote> votes, Vote vote) {
    final SyncedLearnerTracker tracker = new SyncedLearnerTracker();

    final QuorumVerifier currentVerifier = self.getQuorumVerifier();
    tracker.addQuorumVerifier(currentVerifier);

    final QuorumVerifier lastSeenVerifier = self.getLastSeenQuorumVerifier();
    final boolean lastSeenIsNewer =
        lastSeenVerifier != null && lastSeenVerifier.getVersion() > currentVerifier.getVersion();
    if (lastSeenIsNewer) {
        tracker.addQuorumVerifier(lastSeenVerifier);
    }

    /*
     * First make the views consistent. Sometimes peers will have different
     * zxids for a server depending on timing.
     */
    for (Map.Entry<Long, Vote> received : votes.entrySet()) {
        final Long senderId = received.getKey();
        final Vote senderVote = received.getValue();
        // Vote overrides equals(), so this compares vote contents rather than identity.
        if (vote.equals(senderVote)) {
            // Per the original note, addAck checks that the peer belongs to the voting members.
            tracker.addAck(senderId);
        }
    }

    return tracker;
}

LookForLeader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
/**
 * Runs one leader election round. The peer first proposes itself, broadcasts that
 * proposal, and then keeps processing notifications from recvqueue while its state is
 * LOOKING and stop is not set. Notifications from LOOKING peers may change the local
 * proposal; notifications from FOLLOWING/LEADING peers may let this peer join an
 * already established leader.
 *
 * @return the final vote once a leader has been decided, or null when the loop ends
 *         without a decision (peer state no longer LOOKING, or stop set)
 * @throws InterruptedException if interrupted while waiting on the receive queue
 */
public Vote lookForLeader() throws InterruptedException {
// Best-effort JMX registration; a failure is logged and the election continues.
try {
self.jmxLeaderElectionBean = new LeaderElectionBean();
MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
self.jmxLeaderElectionBean = null;
}

self.start_fle = Time.currentElapsedTime(); // record the time at which this election started
try {
/* The votes from the current leader election are stored in recvset. In other words, a vote v is in recvset
* if v.electionEpoch == logicalclock. The current participant uses recvset to deduce on whether a majority
* of participants has voted for it.
*/
Map<Long, Vote> recvset = new HashMap<Long, Vote>(); // votes of the current round: a vote whose epoch equals the local logical clock is stored in recvset
// the current participant uses it to work out which participant has gathered most of the votes

/*
* The votes from previous leader elections, as well as the votes from the current leader election are
* stored in outofelection. Note that notifications in a LOOKING state are not stored in outofelection.
* Only FOLLOWING or LEADING notifications are stored in outofelection. The current participant could use
* outofelection to learn which participant is the leader if it arrives late (i.e., higher logicalclock than
* the electionEpoch of the received notifications) in a leader election.
*/
Map<Long, Vote> outofelection = new HashMap<Long, Vote>(); // votes from earlier elections; a peer that joins late can use outofelection to learn who the leader is

// Current poll timeout; doubled (up to maxNotificationInterval) each time a poll returns nothing.
int notTimeout = minNotificationInterval;

synchronized (this) {
logicalclock.incrementAndGet(); // advance the local election epoch
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); // reset the proposal: vote for ourselves first, using our own id
}

LOG.info(
"New election. My id = {}, proposed zxid=0x{}",
self.getId(),
Long.toHexString(proposedZxid));
sendNotifications(); // tell all peers about our new proposal

SyncedLearnerTracker voteSet;

/*
* Loop in which we exchange notifications until we find a leader
*/

while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) { // keep going while we are still LOOKING and no stop has been requested
/*
* Remove next notification from queue, times out after 2 times
* the termination time
*/
Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS); // take one notification from the receive queue

/*
* Sends more notifications if haven't received enough.
* Otherwise processes new notification.
*/
if (n == null) {
if (manager.haveDelivered()) { // if the manager reports that everything queued has been delivered, send the notifications again
sendNotifications();
} else {
manager.connectAll(); // otherwise (presumably connections are down) try to reconnect to all peers
}

/*
* Exponential backoff
*/
int tmpTimeOut = notTimeout * 2;
notTimeout = Math.min(tmpTimeOut, maxNotificationInterval);
LOG.info("Notification time out: {}", notTimeout);
} else if (validVoter(n.sid) && validVoter(n.leader)) { // both the sender and the leader it votes for must be valid voters
/*
* Only proceed if the vote comes from a replica in the current or next
* voting view for a replica in the current or next voting view.
*
*/
switch (n.state) {
case LOOKING:
if (getInitLastLoggedZxid() == -1) { // per the original note: our own zxid is -1 only at the very start, so this skips votes that arrive before initialization has finished
LOG.debug("Ignoring notification as our zxid is -1");
break;
}
if (n.zxid == -1) { // ignore votes whose sender reports a zxid of -1
LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
break;
}
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock.get()) { // the vote belongs to a newer election round: adopt that round and broadcast again
logicalclock.set(n.electionEpoch);
recvset.clear(); // discard the votes collected for the old round
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) { // compare the received vote with our own initial vote; per the original note there are three rules
updateProposal(n.leader, n.zxid, n.peerEpoch); // the rules, per the original note: higher epoch; equal epoch but higher zxid; equal epoch and zxid but higher sid
} else { // otherwise keep voting for ourselves
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
sendNotifications(); // broadcast the (possibly updated) proposal to all peers
} else if (n.electionEpoch < logicalclock.get()) {
// The vote is from an older round: log and ignore it.
LOG.debug(
"Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}",
Long.toHexString(n.electionEpoch),
Long.toHexString(logicalclock.get()));
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
// Same round and the received vote wins over our current proposal: adopt it and broadcast.
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}

LOG.debug(
"Adding vote: from={}, proposed leader={}, proposed zxid=0x{}, proposed election epoch=0x{}",
n.sid,
n.leader,
Long.toHexString(n.zxid),
Long.toHexString(n.electionEpoch));

// store the received vote locally, keyed by the sender id
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));

if (voteSet.hasAllQuorums()) { // the tracker reports that our current proposal has the required quorum(s)

// Verify if there is any change in the proposed leader
while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) { // check once more whether newer notifications have arrived in the meantime
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
// A better vote showed up: put it back so the outer loop processes it, and do not finish yet.
recvqueue.put(n);
break;
}
}

/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
if (n == null) {
setPeerState(proposedLeader, voteSet);
Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
case OBSERVING:
LOG.debug("Notification from observer: {}", n.sid);
break;
case FOLLOWING:
case LEADING: // the sender reports LEADING (FOLLOWING falls through to the same handling)
/*
* Consider all notifications from the same epoch
* together.
*/
if (n.electionEpoch == logicalclock.get()) {
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) {
setPeerState(n.leader, voteSet);
Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}

/*
* Before joining an established ensemble, verify that
* a majority are following the same leader.
*
* Note that the outofelection map also stores votes from the current leader election.
* See ZOOKEEPER-1732 for more information.
*/
outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));

if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
// Joining an established ensemble: adopt its election epoch together with the new peer state.
synchronized (this) {
logicalclock.set(n.electionEpoch);
setPeerState(n.leader, voteSet);
}
Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
LOG.warn("Notification state unrecoginized: {} (n.state), {}(n.sid)", n.state, n.sid);
break;
}
} else {
// The sender and/or the leader it voted for is not a valid voter: log which one and drop the vote.
if (!validVoter(n.leader)) {
LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
}
if (!validVoter(n.sid)) {
LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
}
}
}
// Loop ended without a decision (peer state changed or stop was set).
return null;
} finally {
// Always undo the JMX registration, whichever way the election ended.
try {
if (self.jmxLeaderElectionBean != null) {
MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
}
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
self.jmxLeaderElectionBean = null;
LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
}
}

Notification (为了提醒别的节点,例如更改的vote或者加入了选举)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
 * A vote notification exchanged between peers, e.g. when a peer changes its vote
 * or joins an election. Instances are plain field holders.
 */
public static class Notification {

    /** Format version, introduced in 3.4.6. */
    public static final int CURRENTVERSION = 0x2;

    /** Format version carried by this notification. */
    int version;

    /** Proposed leader. */
    long leader;

    /** zxid of the proposed leader. */
    long zxid;

    /** Epoch (election round). */
    long electionEpoch;

    /** Current state of sender. */
    QuorumPeer.ServerState state;

    /** Address (server id) of sender. */
    long sid;

    /** Quorum verifier parsed from the message, if any. */
    QuorumVerifier qv;

    /** Epoch of the proposed leader. */
    long peerEpoch;

}