publicvoidtoSend(Long sid, ByteBuffer b){ /* * If sending message to myself, then simply enqueue it (loopback). */ if (this.mySid == sid) { //如果当前的消息需要分给自己,那么就直接把它放入到接收队列中 b.position(0); addToRecvQueue(new Message(b.duplicate(), sid)); /* * Otherwise send to the corresponding thread to send. */ } else { /* * Start a new connection if doesn't have one already. */ BlockingQueue<ByteBuffer> bq = queueSendMap.computeIfAbsent(sid, serverId -> new CircularBlockingQueue<>(SEND_CAPACITY)); addToSendQueue(bq, b); connectOne(sid); } }
synchronizedvoidconnectOne(long sid){ if (senderWorkerMap.get(sid) != null) { LOG.debug("There is a connection already for server {}", sid); if (self.isMultiAddressEnabled() && self.isMultiAddressReachabilityCheckEnabled()) { // since ZOOKEEPER-3188 we can use multiple election addresses to reach a server. It is possible, that the // one we are using is already dead and we need to clean-up, so when we will create a new connection // then we will choose an other one, which is actually reachable senderWorkerMap.get(sid).asyncValidateIfSocketIsStillReachable(); } return; } synchronized (self.QV_LOCK) { boolean knownId = false; // Resolve hostname for the remote server before attempting to // connect in case the underlying ip address has changed. self.recreateSocketAddresses(sid); Map<Long, QuorumPeer.QuorumServer> lastCommittedView = self.getView(); QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier(); Map<Long, QuorumPeer.QuorumServer> lastProposedView = lastSeenQV.getAllMembers(); if (lastCommittedView.containsKey(sid)) { knownId = true; LOG.debug("Server {} knows {} already, it is in the lastCommittedView", self.getId(), sid); if (connectOne(sid, lastCommittedView.get(sid).electionAddr)) { return; } } if (lastSeenQV != null && lastProposedView.containsKey(sid) && (!knownId || (lastProposedView.get(sid).electionAddr != lastCommittedView.get(sid).electionAddr))) { knownId = true; LOG.debug("Server {} knows {} already, it is in the lastProposedView", self.getId(), sid);
if (connectOne(sid, lastProposedView.get(sid).electionAddr)) { return; } } if (!knownId) { LOG.warn("Invalid server id: {} ", sid); } } }