Kafka 3.0 Source Code Notes (5): The Leader Election Process in a Kafka Server Cluster
1. The Kafka cluster election process
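In "Kafka 3.0 Source Code Notes (1): The Network Communication Architecture of the Kafka Server" I mentioned that in KRaft mode the cluster metadata is managed by the Controller quorum itself. In a distributed environment this necessarily involves interaction between the quorum nodes, including leader election and metadata replication. The state transitions involved in the election are shown in the figure below, and the key requests exchanged are:
- Vote: sent by a Candidate node to ask the other nodes to vote for it
- BeginQuorumEpoch: sent by the Leader node to tell the other nodes who the current Leader is
- EndQuorumEpoch: sent by the current Leader when it steps down, which triggers a new election
- Fetch: sent by a Follower to replicate the Leader's log; Fetch requests also let the Follower check that the Leader is still alive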
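From the perspective of maintaining cluster metadata, every node in a Kafka cluster holds one of the following three roles:
- Leader: the primary node of the whole Kafka cluster. It is one of the nodes that have the controller role and are listed in controller.quorum.voters, and it handles reads and writes of the metadata
- Follower (Voter): a follower with voting rights. It is a node that has the controller role and is listed in controller.quorum.voters. It replicates cluster metadata from the Leader and serves part of the metadata read requests coming from Follower (Observer) nodes
- Follower (Observer): a follower without voting rights, which replicates metadata from the Leader or a Follower (Voter). Two kinds of nodes fall into this category:
  - Nodes that have only the broker role. Note that the broker modules react to cluster metadata changes (for example by creating partitions) and handle reads and writes of message data
  - Nodes that have the controller role but are not listed in controller.quorum.voters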
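To make the voter/observer split concrete, here is a minimal sketch, assuming the usual `id@host:port` entry format of controller.quorum.voters. The class `QuorumRoleSketch` and its methods are hypothetical helpers written for this note, not Kafka classes. The Leader is not a separate configuration: it is whichever voter wins the election at runtime.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical helper (not part of Kafka): classifies a node as voter or observer. */
final class QuorumRoleSketch {
    enum Role { VOTER, OBSERVER }

    /** Parses "id@host:port,id@host:port" and keeps only the voter ids. */
    static Set<Integer> parseVoterIds(String quorumVoters) {
        Set<Integer> ids = new LinkedHashSet<>();
        for (String entry : quorumVoters.split(",")) {
            String trimmed = entry.trim();
            int at = trimmed.indexOf('@');
            if (at <= 0) {
                throw new IllegalArgumentException("Expected id@host:port but got: " + entry);
            }
            ids.add(Integer.parseInt(trimmed.substring(0, at)));
        }
        return ids;
    }

    /** A node votes only if it has the controller role AND is listed as a voter. */
    static Role classify(int nodeId, boolean hasControllerRole, Set<Integer> voterIds) {
        return hasControllerRole && voterIds.contains(nodeId) ? Role.VOTER : Role.OBSERVER;
    }

    public static void main(String[] args) {
        Set<Integer> voters = parseVoterIds("1@localhost:9093,2@localhost:9094,3@localhost:9095");
        System.out.println(classify(1, true, voters));  // controller listed as voter
        System.out.println(classify(4, true, voters));  // controller not listed
        System.out.println(classify(5, false, voters)); // broker-only node
    }
}
```

The host names and ports above are placeholder values chosen for the example.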
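2. Source code analysis of the Kafka cluster election
The interaction between the Controller quorum nodes relies on the KafkaRaftManager component, which coordinates the quorum state. This article uses leader election as the entry point and splits the analysis into the following parts:
- Initialization of the KafkaRaftManager component
- Startup of the KafkaRaftManager component
- The main flow of leader election
- Handling of election deadlocks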
2.1 Initialization of KafkaRaftManager
1. The KafkaRaftManager component is initialized while the KafkaRaftServer instance is being created. The key steps of its initialization are:
   - Call RaftManager.scala#buildNetworkChannel() to create the underlying network communication component KafkaNetworkChannel
   - Call RaftManager.scala#buildRaftClient() to create the quorum client KafkaRaftClient
   - Create the RaftIoThread thread, which continuously processes requests and responses for the local network client

    class KafkaRaftManager[T](
      metaProperties: MetaProperties,
      config: KafkaConfig,
      recordSerde: RecordSerde[T],
      topicPartition: TopicPartition,
      topicId: Uuid,
      time: Time,
      metrics: Metrics,
      threadNamePrefixOpt: Option[String],
      val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]]
    ) extends RaftManager[T] with Logging {

      private val raftConfig = new RaftConfig(config)
      private val threadNamePrefix = threadNamePrefixOpt.getOrElse("kafka-raft")
      private val logContext = new LogContext(s"[RaftManager nodeId=${config.nodeId}] ")
      this.logIdent = logContext.logPrefix()

      private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix + "-scheduler")
      scheduler.startup()

      private val dataDir = createDataDir()
      override val replicatedLog: ReplicatedLog = buildMetadataLog()
      private val netChannel = buildNetworkChannel()
      override val client: KafkaRaftClient[T] = buildRaftClient()
      private val raftIoThread = new RaftIoThread(client, threadNamePrefix)

      ...
    }
2. RaftManager.scala#buildNetworkChannel() boils down to two steps:
   - Call RaftManager.scala#buildNetworkClient() to create the network client. This method creates a NetworkClient instance that handles the underlying network connections
   - Use that NetworkClient instance to create a KafkaNetworkChannel object, which gives the upper layers an entry point for sending quorum requests

    private def buildNetworkChannel(): KafkaNetworkChannel = {
      val netClient = buildNetworkClient()
      new KafkaNetworkChannel(time, netClient, config.quorumRequestTimeoutMs, threadNamePrefix)
    }

    private def buildNetworkClient(): NetworkClient = {
      val controllerListenerName = new ListenerName(config.controllerListenerNames.head)
      val controllerSecurityProtocol = config.listenerSecurityProtocolMap.getOrElse(
        controllerListenerName,
        SecurityProtocol.forName(controllerListenerName.value())
      )
      val channelBuilder = ChannelBuilders.clientChannelBuilder(
        controllerSecurityProtocol,
        JaasContext.Type.SERVER,
        config,
        controllerListenerName,
        config.saslMechanismControllerProtocol,
        time,
        config.saslInterBrokerHandshakeRequestEnable,
        logContext
      )

      val metricGroupPrefix = "raft-channel"
      val collectPerConnectionMetrics = false

      val selector = new Selector(
        NetworkReceive.UNLIMITED,
        config.connectionsMaxIdleMs,
        metrics,
        time,
        metricGroupPrefix,
        Map.empty[String, String].asJava,
        collectPerConnectionMetrics,
        channelBuilder,
        logContext
      )

      val clientId = s"raft-client-${config.nodeId}"
      val maxInflightRequestsPerConnection = 1
      val reconnectBackoffMs = 50
      val reconnectBackoffMsMs = 500
      val discoverBrokerVersions = true

      new NetworkClient(
        selector,
        new ManualMetadataUpdater(),
        clientId,
        maxInflightRequestsPerConnection,
        reconnectBackoffMs,
        reconnectBackoffMsMs,
        Selectable.USE_DEFAULT_BUFFER_SIZE,
        config.socketReceiveBufferBytes,
        config.quorumRequestTimeoutMs,
        config.connectionSetupTimeoutMs,
        config.connectionSetupTimeoutMaxMs,
        time,
        discoverBrokerVersions,
        new ApiVersions,
        logContext
      )
    }
3. Key components are created together with the KafkaNetworkChannel instance, including the request sending thread RaftSendThread. RaftSendThread extends InterBrokerSendThread; its purpose is analyzed later.

    class KafkaNetworkChannel(
      time: Time,
      client: KafkaClient,
      requestTimeoutMs: Int,
      threadNamePrefix: String
    ) extends NetworkChannel with Logging {
      import KafkaNetworkChannel._

      type ResponseHandler = AbstractResponse => Unit

      private val correlationIdCounter = new AtomicInteger(0)
      private val endpoints = mutable.HashMap.empty[Int, Node]

      private val requestThread = new RaftSendThread(
        name = threadNamePrefix + "-outbound-request-thread",
        networkClient = client,
        requestTimeoutMs = requestTimeoutMs,
        time = time,
        isInterruptible = false
      )
    }
4. Back to sub-step 2 of step 1 in this section: RaftManager.scala#buildRaftClient() does two main things:
   - Create the KafkaRaftClient quorum client instance
   - Call KafkaRaftClient.java#initialize() to initialize the quorum state of the current node

    private def buildRaftClient(): KafkaRaftClient[T] = {
      val expirationTimer = new SystemTimer("raft-expiration-executor")
      val expirationService = new TimingWheelExpirationService(expirationTimer)
      val quorumStateStore = new FileBasedStateStore(new File(dataDir, "quorum-state"))

      // Only nodes with the controller role carry a node id into the quorum
      val nodeId = if (config.processRoles.contains(ControllerRole)) {
        OptionalInt.of(config.nodeId)
      } else {
        OptionalInt.empty()
      }

      val client = new KafkaRaftClient(
        recordSerde,
        netChannel,
        replicatedLog,
        quorumStateStore,
        time,
        metrics,
        expirationService,
        logContext,
        metaProperties.clusterId,
        nodeId,
        raftConfig
      )
      client.initialize()
      client
    }
5. The KafkaRaftClient constructor is shown below. The points worth noting are:
   - RaftConfig.java#quorumVoterIds() returns the ids of the voting nodes configured in the controller.quorum.voters property
   - A QuorumState instance is created to represent this node's own role state in the quorum
   - KafkaNetworkChannel.scala#updateEndpoint() registers the connection addresses of the voting nodes with the component that sends quorum requests

    KafkaRaftClient(
        RecordSerde<T> serde,
        NetworkChannel channel,
        RaftMessageQueue messageQueue,
        ReplicatedLog log,
        QuorumStateStore quorumStateStore,
        MemoryPool memoryPool,
        Time time,
        Metrics metrics,
        ExpirationService expirationService,
        int fetchMaxWaitMs,
        String clusterId,
        OptionalInt nodeId,
        LogContext logContext,
        Random random,
        RaftConfig raftConfig
    ) {
        this.serde = serde;
        this.channel = channel;
        this.messageQueue = messageQueue;
        this.log = log;
        this.memoryPool = memoryPool;
        this.fetchPurgatory = new ThresholdPurgatory<>(expirationService);
        this.appendPurgatory = new ThresholdPurgatory<>(expirationService);
        this.time = time;
        this.clusterId = clusterId;
        this.fetchMaxWaitMs = fetchMaxWaitMs;
        this.logger = logContext.logger(KafkaRaftClient.class);
        this.random = random;
        this.raftConfig = raftConfig;
        this.snapshotCleaner = new RaftMetadataLogCleanerManager(logger, time, 60000, log::maybeClean);
        Set<Integer> quorumVoterIds = raftConfig.quorumVoterIds();
        this.requestManager = new RequestManager(quorumVoterIds, raftConfig.retryBackoffMs(),
            raftConfig.requestTimeoutMs(), random);
        this.quorum = new QuorumState(
            nodeId,
            quorumVoterIds,
            raftConfig.electionTimeoutMs(),
            raftConfig.fetchTimeoutMs(),
            quorumStateStore,
            time,
            logContext,
            random);
        this.kafkaRaftMetrics = new KafkaRaftMetrics(metrics, "raft", quorum);
        kafkaRaftMetrics.updateNumUnknownVoterConnections(quorum.remoteVoters().size());

        // Update the voter endpoints with what's in RaftConfig
        Map<Integer, RaftConfig.AddressSpec> voterAddresses = raftConfig.quorumVoterConnections();
        voterAddresses.entrySet().stream()
            .filter(e -> e.getValue() instanceof RaftConfig.InetAddressSpec)
            .forEach(e -> this.channel.updateEndpoint(e.getKey(), (RaftConfig.InetAddressSpec) e.getValue()));
    }
6. Back to sub-step 2 of step 4 in this section: the core of KafkaRaftClient.java#initialize() is the call to QuorumState.java#initialize(), which initializes the role state of the current node in the quorum.

    public void initialize() {
        quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch()));

        long currentTimeMs = time.milliseconds();
        if (quorum.isLeader()) {
            throw new IllegalStateException("Voter cannot initialize as a Leader");
        } else if (quorum.isCandidate()) {
            onBecomeCandidate(currentTimeMs);
        } else if (quorum.isFollower()) {
            onBecomeFollower(currentTimeMs);
        }

        // When there is only a single voter, become candidate immediately
        if (quorum.isVoter()
            && quorum.remoteVoters().isEmpty()
            && !quorum.isCandidate()) {
            transitionToCandidate(currentTimeMs);
        }
    }
7. QuorumState.java#initialize() is fairly long, but only two points matter:
   - It first calls QuorumStateStore#readElectionState() to read the election state recorded in the local quorum-state file, which mainly covers the node restart case
   - It then initializes the node's quorum state from that election state. A node starting for the first time is placed in the UnattachedState state

    public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IllegalStateException {
        // We initialize in whatever state we were in on shutdown. If we were a leader
        // or candidate, probably an election was held, but we will find out about it
        // when we send Vote or BeginEpoch requests.
        ElectionState election;
        try {
            election = store.readElectionState();
            if (election == null) {
                election = ElectionState.withUnknownLeader(0, voters);
            }
        } catch (final UncheckedIOException e) {
            // For exceptions during state file loading (missing or not readable),
            // we could assume the file is corrupted already and should be cleaned up.
            log.warn("Clearing local quorum state store after error loading state {}",
                store.toString(), e);
            store.clear();
            election = ElectionState.withUnknownLeader(0, voters);
        }

        final EpochState initialState;
        if (!election.voters().isEmpty() && !voters.equals(election.voters())) {
            throw new IllegalStateException("Configured voter set: " + voters
                + " is different from the voter set read from the state file: " + election.voters()
                + ". Check if the quorum configuration is up to date, "
                + "or wipe out the local state file if necessary");
        } else if (election.hasVoted() && !isVoter()) {
            String localIdDescription = localId.isPresent() ?
                localId.getAsInt() + " is not a voter" :
                "is undefined";
            throw new IllegalStateException("Initialized quorum state " + election
                + " with a voted candidate, which indicates this node was previously "
                + " a voter, but the local id " + localIdDescription);
        } else if (election.epoch < logEndOffsetAndEpoch.epoch) {
            log.warn("Epoch from quorum-state file is {}, which is " +
                "smaller than last written epoch {} in the log",
                election.epoch, logEndOffsetAndEpoch.epoch);
            initialState = new UnattachedState(
                time, logEndOffsetAndEpoch.epoch, voters, Optional.empty(),
                randomElectionTimeoutMs(), logContext
            );
        } else if (localId.isPresent() && election.isLeader(localId.getAsInt())) {
            // If we were previously a leader, then we will start out as resigned
            // in the same epoch. This serves two purposes:
            // 1. It ensures that we cannot vote for another leader in the same epoch.
            // 2. It protects the invariant that each record is uniquely identified by
            //    offset and epoch, which might otherwise be violated if unflushed data
            //    is lost after restarting.
            initialState = new ResignedState(
                time, localId.getAsInt(), election.epoch, voters,
                randomElectionTimeoutMs(), Collections.emptyList(), logContext
            );
        } else if (localId.isPresent() && election.isVotedCandidate(localId.getAsInt())) {
            initialState = new CandidateState(
                time, localId.getAsInt(), election.epoch, voters, Optional.empty(),
                1, randomElectionTimeoutMs(), logContext
            );
        } else if (election.hasVoted()) {
            initialState = new VotedState(
                time, election.epoch, election.votedId(), voters, Optional.empty(),
                randomElectionTimeoutMs(), logContext
            );
        } else if (election.hasLeader()) {
            initialState = new FollowerState(
                time, election.epoch, election.leaderId(), voters, Optional.empty(),
                fetchTimeoutMs, logContext
            );
        } else {
            initialState = new UnattachedState(
                time, election.epoch, voters, Optional.empty(),
                randomElectionTimeoutMs(), logContext
            );
        }

        transitionTo(initialState);
    }
8. This essentially completes the instantiation of the KafkaRaftClient object. Back to sub-step 3 of step 1 in this section: the definition of RaftIoThread is simple. Its core method doWork() just calls KafkaRaftClient.java#poll(), which is analyzed in detail later. This concludes the initialization of the KafkaRaftManager component.

    class RaftIoThread(
      client: KafkaRaftClient[_],
      threadNamePrefix: String
    ) extends ShutdownableThread(
      name = threadNamePrefix + "-io-thread",
      isInterruptible = false
    ) {
      override def doWork(): Unit = {
        client.poll()
      }
      ......
    }
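The branch order in QuorumState.java#initialize() shown earlier in this section can be condensed into a small decision function. The following is a simplified sketch, not the Kafka implementation: `InitialStateSketch` and its parameters are hypothetical, and the two consistency checks that throw IllegalStateException are deliberately left out.

```java
import java.util.OptionalInt;

/** Hypothetical condensation of the state selection performed at startup. */
final class InitialStateSketch {
    enum State { UNATTACHED, RESIGNED, CANDIDATE, VOTED, FOLLOWER }

    static State initialState(OptionalInt localId,
                              int electionEpoch,
                              OptionalInt leaderId,
                              OptionalInt votedId,
                              int logEpoch) {
        // The stored election state is older than the log: ignore it.
        if (electionEpoch < logEpoch) return State.UNATTACHED;
        // We were the leader before the restart: come back as resigned.
        if (localId.isPresent() && leaderId.equals(localId)) return State.RESIGNED;
        // We had voted for ourselves: resume as candidate.
        if (localId.isPresent() && votedId.equals(localId)) return State.CANDIDATE;
        // We had voted for someone else.
        if (votedId.isPresent()) return State.VOTED;
        // We knew a leader.
        if (leaderId.isPresent()) return State.FOLLOWER;
        // Nothing recorded, for example on the very first start.
        return State.UNATTACHED;
    }

    public static void main(String[] args) {
        OptionalInt none = OptionalInt.empty();
        System.out.println(initialState(OptionalInt.of(1), 0, none, none, 0));
        System.out.println(initialState(OptionalInt.of(1), 3, OptionalInt.of(1), none, 3));
    }
}
```

The first call in `main` models a node with an empty quorum-state file, which is the case the election walkthrough below starts from.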
2.2 Startup of KafkaRaftManager
1. The KafkaRaftManager component is started while the KafkaRaftServer instance is starting up. The key points of KafkaRaftManager.scala#startup() are:
   - Call KafkaNetworkChannel.scala#start() to start the component that sends quorum requests
   - Call RaftIoThread.scala#start() to start the thread that continuously processes requests and responses for the local network client

    def startup(): Unit = {
      // Update the voter endpoints (if valid) with what's in RaftConfig
      val voterAddresses: util.Map[Integer, AddressSpec] = controllerQuorumVotersFuture.get()
      for (voterAddressEntry <- voterAddresses.entrySet.asScala) {
        voterAddressEntry.getValue match {
          case spec: InetAddressSpec =>
            netChannel.updateEndpoint(voterAddressEntry.getKey, spec)
          case _: UnknownAddressSpec =>
            logger.info(s"Skipping channel update for destination ID: ${voterAddressEntry.getKey} " +
              s"because of non-routable endpoint: ${NON_ROUTABLE_ADDRESS.toString}")
          case invalid: AddressSpec =>
            logger.warn(s"Unexpected address spec (type: ${invalid.getClass}) for channel update for " +
              s"destination ID: ${voterAddressEntry.getKey}")
        }
      }
      netChannel.start()
      raftIoThread.start()
    }
2. KafkaNetworkChannel.scala#start() simply starts the RaftSendThread thread. This thread class extends InterBrokerSendThread, which in turn extends ShutdownableThread. Its working mechanism is analyzed below.

    def start(): Unit = {
      requestThread.start()
    }
3. RaftIoThread.scala#start() is the important one: in terms of flow, it is the point where the whole KafkaRaftManager component starts working. RaftIoThread indirectly extends Thread, so once the thread starts, RaftIoThread.scala#run() is invoked, and that method is provided by the parent class as ShutdownableThread.scala#run().
   As the code shows, the core logic of ShutdownableThread.scala#run() is to call the subclass method RaftIoThread.scala#doWork() in a loop.

    override def run(): Unit = {
      isStarted = true
      info("Starting")
      try {
        while (isRunning)
          doWork()
      } catch {
        case e: FatalExitError =>
          shutdownInitiated.countDown()
          shutdownComplete.countDown()
          info("Stopped")
          Exit.exit(e.statusCode())
        case e: Throwable =>
          if (isRunning)
            error("Error due to", e)
      } finally {
        shutdownComplete.countDown()
      }
      info("Stopped")
    }
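4. RaftIoThread.scala#doWork() is very short. Its only important logic is to call KafkaRaftClient.java#poll(), which starts the interaction between quorum nodes. This concludes the startup of the core components.

    override def doWork(): Unit = {
      client.poll()
    }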
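The run loop pattern used by ShutdownableThread (call doWork() repeatedly until shutdown is requested, then signal completion) can be sketched in plain Java. This is an illustration only: `LoopThreadSketch` and `CountingIoThread` are hypothetical names, and the real class additionally handles FatalExitError, logging, and interruption.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, stripped-down analogue of a shutdownable worker thread. */
abstract class LoopThreadSketch extends Thread {
    private volatile boolean running = true;
    private final CountDownLatch shutdownComplete = new CountDownLatch(1);

    LoopThreadSketch(String name) {
        super(name);
    }

    /** One unit of work; subclasses plug their logic in here. */
    protected abstract void doWork();

    void initiateShutdown() {
        running = false;
    }

    void awaitShutdown() {
        try {
            shutdownComplete.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for shutdown", e);
        }
    }

    @Override
    public void run() {
        try {
            while (running) {
                doWork();
            }
        } finally {
            // Always signal completion, even if doWork() threw.
            shutdownComplete.countDown();
        }
    }
}

/** Stands in for RaftIoThread: each doWork() call represents one poll(). */
final class CountingIoThread extends LoopThreadSketch {
    final AtomicInteger polls = new AtomicInteger();
    private final int limit;

    CountingIoThread(int limit) {
        super("sketch-io-thread");
        this.limit = limit;
    }

    @Override
    protected void doWork() {
        if (polls.incrementAndGet() >= limit) {
            initiateShutdown(); // stop after a fixed number of iterations
        }
    }

    public static void main(String[] args) {
        CountingIoThread thread = new CountingIoThread(3);
        thread.start();
        thread.awaitShutdown();
        System.out.println("polls = " + thread.polls.get());
    }
}
```

Keeping the loop in the base class means a subclass such as RaftIoThread only has to describe a single iteration, which is why its doWork() is a one-liner.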
2.3 The leader election flow
1. The source of KafkaRaftClient.java#poll() is easy to follow. Its core steps are:
   - First call KafkaRaftClient.java#pollCurrentState(), which takes the next action according to the node's current state and drives the quorum state transitions
   - Poll the message queue via messageQueue.poll() and pass any message taken from it to KafkaRaftClient.java#handleInboundMessage(). Note that the queue only ever holds two kinds of messages: RaftRequest.Inbound requests from other nodes, and RaftResponse.Inbound responses that other nodes returned for this node's requests

    public void poll() {
        pollListeners();

        long currentTimeMs = time.milliseconds();
        if (maybeCompleteShutdown(currentTimeMs)) {
            return;
        }

        long pollStateTimeoutMs = pollCurrentState(currentTimeMs);
        long cleaningTimeoutMs = snapshotCleaner.maybeClean(currentTimeMs);
        long pollTimeoutMs = Math.min(pollStateTimeoutMs, cleaningTimeoutMs);

        kafkaRaftMetrics.updatePollStart(currentTimeMs);

        RaftMessage message = messageQueue.poll(pollTimeoutMs);

        currentTimeMs = time.milliseconds();
        kafkaRaftMetrics.updatePollEnd(currentTimeMs);

        if (message != null) {
            handleInboundMessage(message, currentTimeMs);
        }
    }
2. KafkaRaftClient.java#pollCurrentState() is central to the node's state transitions. As mentioned above, a node that starts for the first time is in the UnattachedState state, so here KafkaRaftClient.java#pollUnattached() is called.

    private long pollCurrentState(long currentTimeMs) {
        if (quorum.isLeader()) {
            return pollLeader(currentTimeMs);
        } else if (quorum.isCandidate()) {
            return pollCandidate(currentTimeMs);
        } else if (quorum.isFollower()) {
            return pollFollower(currentTimeMs);
        } else if (quorum.isVoted()) {
            return pollVoted(currentTimeMs);
        } else if (quorum.isUnattached()) {
            return pollUnattached(currentTimeMs);
        } else if (quorum.isResigned()) {
            return pollResigned(currentTimeMs);
        } else {
            throw new IllegalStateException("Unexpected quorum state " + quorum);
        }
    }
3. The key logic of KafkaRaftClient.java#pollUnattached() is:
   - First check whether the current node has the Controller role and is in the list of voting nodes configured by controller.quorum.voters. If so, call KafkaRaftClient.java#pollUnattachedAsVoter()
   - KafkaRaftClient.java#pollUnattachedAsVoter() is very simple. Its core is to call UnattachedState#hasElectionTimeoutExpired() to check whether the node's election timeout has expired. If it has, KafkaRaftClient.java#transitionToCandidate() switches the node to the candidate role, and during that transition the node votes for itself

    private long pollUnattached(long currentTimeMs) {
        UnattachedState state = quorum.unattachedStateOrThrow();
        if (quorum.isVoter()) {
            return pollUnattachedAsVoter(state, currentTimeMs);
        } else {
            return pollUnattachedAsObserver(state, currentTimeMs);
        }
    }

    private long pollUnattachedAsVoter(UnattachedState state, long currentTimeMs) {
        GracefulShutdown shutdown = this.shutdown.get();
        if (shutdown != null) {
            // If shutting down, then remain in this state until either the
            // shutdown completes or an epoch bump forces another state transition
            return shutdown.remainingTimeMs();
        } else if (state.hasElectionTimeoutExpired(currentTimeMs)) {
            transitionToCandidate(currentTimeMs);
            return 0L;
        } else {
            return state.remainingElectionTimeMs(currentTimeMs);
        }
    }
4. Once the node has become a candidate, the next call to KafkaRaftClient.java#poll() ends up in KafkaRaftClient.java#pollCandidate(). Leaving aside its exceptional branches for now, let us look at how KafkaRaftClient.java#maybeSendVoteRequests() sends Vote requests to the other nodes.

    private long pollCandidate(long currentTimeMs) {
        CandidateState state = quorum.candidateStateOrThrow();
        GracefulShutdown shutdown = this.shutdown.get();

        if (shutdown != null) {
            // If we happen to shutdown while we are a candidate, we will continue
            // with the current election until one of the following conditions is met:
            //  1) we are elected as leader (which allows us to resign)
            //  2) another leader is elected
            //  3) the shutdown timer expires
            long minRequestBackoffMs = maybeSendVoteRequests(state, currentTimeMs);
            return Math.min(shutdown.remainingTimeMs(), minRequestBackoffMs);
        } else if (state.isBackingOff()) {
            if (state.isBackoffComplete(currentTimeMs)) {
                logger.info("Re-elect as candidate after election backoff has completed");
                transitionToCandidate(currentTimeMs);
                return 0L;
            }
            return state.remainingBackoffMs(currentTimeMs);
        } else if (state.hasElectionTimeoutExpired(currentTimeMs)) {
            long backoffDurationMs = binaryExponentialElectionBackoffMs(state.retries());
            logger.debug("Election has timed out, backing off for {}ms before becoming a candidate again",
                backoffDurationMs);
            state.startBackingOff(currentTimeMs, backoffDurationMs);
            return backoffDurationMs;
        } else {
            long minRequestBackoffMs = maybeSendVoteRequests(state, currentTimeMs);
            return Math.min(minRequestBackoffMs, state.remainingElectionTimeMs(currentTimeMs));
        }
    }
5. The source of KafkaRaftClient.java#maybeSendVoteRequests() is shown below. After several layers of checks, a Vote request is sent to every voting node whose vote has not been recorded yet (state.unrecordedVoters()). The core logic of the method that is finally reached, maybeSendRequest(), is:
   - First create a RaftRequest.Outbound object and register a callback that runs when the request completes. The callback makes sure there is a RaftResponse.Inbound object (building an error response if the request failed) and puts it, via messageQueue.add(), into the message queue mentioned in sub-step 2 of step 1 in this section
   - Call KafkaNetworkChannel.scala#send() to put the outbound request into the send queue

    private long maybeSendVoteRequests(
        CandidateState state,
        long currentTimeMs
    ) {
        // Continue sending Vote requests as long as we still have a chance to win the election
        if (!state.isVoteRejected()) {
            return maybeSendRequests(
                currentTimeMs,
                state.unrecordedVoters(),
                this::buildVoteRequest
            );
        }
        return Long.MAX_VALUE;
    }

    private long maybeSendRequests(
        long currentTimeMs,
        Set<Integer> destinationIds,
        Supplier<ApiMessage> requestSupplier
    ) {
        long minBackoffMs = Long.MAX_VALUE;
        for (Integer destinationId : destinationIds) {
            long backoffMs = maybeSendRequest(currentTimeMs, destinationId, requestSupplier);
            if (backoffMs < minBackoffMs) {
                minBackoffMs = backoffMs;
            }
        }
        return minBackoffMs;
    }

    private long maybeSendRequest(
        long currentTimeMs,
        int destinationId,
        Supplier<ApiMessage> requestSupplier
    ) {
        ConnectionState connection = requestManager.getOrCreate(destinationId);

        if (connection.isBackingOff(currentTimeMs)) {
            long remainingBackoffMs = connection.remainingBackoffMs(currentTimeMs);
            logger.debug("Connection for {} is backing off for {} ms", destinationId, remainingBackoffMs);
            return remainingBackoffMs;
        }

        if (connection.isReady(currentTimeMs)) {
            int correlationId = channel.newCorrelationId();
            ApiMessage request = requestSupplier.get();

            RaftRequest.Outbound requestMessage = new RaftRequest.Outbound(
                correlationId,
                request,
                destinationId,
                currentTimeMs
            );

            requestMessage.completion.whenComplete((response, exception) -> {
                if (exception != null) {
                    ApiKeys api = ApiKeys.forId(request.apiKey());
                    Errors error = Errors.forException(exception);
                    ApiMessage errorResponse = RaftUtil.errorResponse(api, error);

                    response = new RaftResponse.Inbound(
                        correlationId,
                        errorResponse,
                        destinationId
                    );
                }

                messageQueue.add(response);
            });

            channel.send(requestMessage);
            logger.trace("Sent outbound request: {}", requestMessage);
            connection.onRequestSent(correlationId, currentTimeMs);
            return Long.MAX_VALUE;
        }

        return connection.remainingRequestTimeMs(currentTimeMs);
    }
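The completion callback registered in maybeSendRequest() is what turns an asynchronous network result into an ordinary message for the single-threaded poll() loop. A minimal sketch of that hand-off, using only java.util.concurrent, might look as follows. `CompletionCallbackSketch` and its nested `Response` class are hypothetical stand-ins for the Raft request and response types.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of "complete a future, enqueue the response for the poll loop". */
final class CompletionCallbackSketch {
    /** Stand-in for an inbound response. */
    static final class Response {
        final int correlationId;
        final String body;

        Response(int correlationId, String body) {
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    private final BlockingQueue<Response> messageQueue = new LinkedBlockingQueue<>();

    /** Registers the callback and returns the future the network layer will complete. */
    CompletableFuture<Response> send(int correlationId) {
        CompletableFuture<Response> completion = new CompletableFuture<>();
        completion.whenComplete((response, exception) -> {
            // On failure, synthesize an error response so the poll loop
            // still sees exactly one message per request.
            Response inbound = exception != null
                ? new Response(correlationId, "ERROR: " + exception.getMessage())
                : response;
            messageQueue.add(inbound);
        });
        return completion;
    }

    /** What the poll loop does: wait up to timeoutMs for the next message, or return null. */
    Response poll(long timeoutMs) {
        try {
            return messageQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        CompletionCallbackSketch sketch = new CompletionCallbackSketch();
        sketch.send(1).complete(new Response(1, "vote granted"));
        sketch.send(2).completeExceptionally(new RuntimeException("disconnected"));
        System.out.println(sketch.poll(10).body);
        System.out.println(sketch.poll(10).body);
    }
}
```

Because every outcome ends up in the same queue, the handling code only has to deal with messages and never with threads or futures directly.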
6. KafkaNetworkChannel.scala#send() is implemented as follows. Its core action is to call RaftSendThread.scala#sendRequest() to put the request into the send queue. Note that an asynchronous completion callback is set here: once the request completes and the peer's response is available, the callbacks chain back up to sub-step 1 of step 5 in this section.

    override def send(request: RaftRequest.Outbound): Unit = {
      def completeFuture(message: ApiMessage): Unit = {
        val response = new RaftResponse.Inbound(
          request.correlationId,
          message,
          request.destinationId
        )
        request.completion.complete(response)
      }

      def onComplete(clientResponse: ClientResponse): Unit = {
        val response = if (clientResponse.versionMismatch != null) {
          error(s"Request $request failed due to unsupported version error",
            clientResponse.versionMismatch)
          errorResponse(request.data, Errors.UNSUPPORTED_VERSION)
        } else if (clientResponse.authenticationException != null) {
          // For now we treat authentication errors as retriable. We use the
          // `NETWORK_EXCEPTION` error code for lack of a good alternative.
          // Note that `BrokerToControllerChannelManager` will still log the
          // authentication errors so that users have a chance to fix the problem.
          error(s"Request $request failed due to authentication error",
            clientResponse.authenticationException)
          errorResponse(request.data, Errors.NETWORK_EXCEPTION)
        } else if (clientResponse.wasDisconnected()) {
          errorResponse(request.data, Errors.BROKER_NOT_AVAILABLE)
        } else {
          clientResponse.responseBody.data
        }
        completeFuture(response)
      }

      endpoints.get(request.destinationId) match {
        case Some(node) =>
          requestThread.sendRequest(RequestAndCompletionHandler(
            request.createdTimeMs,
            destination = node,
            request = buildRequest(request.data),
            handler = onComplete
          ))

        case None =>
          completeFuture(errorResponse(request.data, Errors.BROKER_NOT_AVAILABLE))
      }
    }
7. RaftSendThread.scala#sendRequest() merely enqueues the request. Step 2 of section 2.2 described the inheritance chain of RaftSendThread, so we know that once this thread is running, its core logic lives in InterBrokerSendThread.scala#doWork().

    def sendRequest(request: RequestAndCompletionHandler): Unit = {
      queue.add(request)
      wakeup()
    }
8. InterBrokerSendThread.scala#doWork() essentially triggers InterBrokerSendThread.scala#pollOnce(), whose core is:
   - Call InterBrokerSendThread.scala#drainGeneratedRequests() to build the request objects required by the protocol
   - Call InterBrokerSendThread.scala#sendRequests() to hand the requests to the send buffer of the underlying network client NetworkClient
   - Call NetworkClient.java#poll() to trigger the actual sending and receiving of network data

    override def doWork(): Unit = {
      pollOnce(Long.MaxValue)
    }

    protected def pollOnce(maxTimeoutMs: Long): Unit = {
      try {
        drainGeneratedRequests()
        var now = time.milliseconds()
        val timeout = sendRequests(now, maxTimeoutMs)
        networkClient.poll(timeout, now)
        now = time.milliseconds()
        checkDisconnects(now)
        failExpiredRequests(now)
        unsentRequests.clean()
      } catch {
        case _: DisconnectException if !networkClient.active() =>
          // DisconnectException is expected when NetworkClient#initiateClose is called
        case e: FatalExitError => throw e
        case t: Throwable =>
          error(s"unhandled exception caught in InterBrokerSendThread", t)
          // rethrow any unhandled exceptions as FatalExitError so the JVM will be terminated
          // as we will be in an unknown state with potentially some requests dropped and not
          // being able to make progress. Known and expected Errors should have been appropriately
          // dealt with already.
          throw new FatalExitError()
      }
    }
9. InterBrokerSendThread.scala#drainGeneratedRequests() has two key steps:
   - Call the subclass method RaftSendThread.scala#generateRequests() to move the requests waiting in the upper-layer send queue into a Buffer object
   - Call NetworkClient.java#newClientRequest() to convert each upper-layer request into a network client request object ClientRequest and store it in the collection of unsent requests

    private def drainGeneratedRequests(): Unit = {
      generateRequests().foreach { request =>
        unsentRequests.put(request.destination,
          networkClient.newClientRequest(
            request.destination.idString,
            request.request,
            request.creationTimeMs,
            true,
            requestTimeoutMs,
            request.handler
          ))
      }
    }
10. RaftSendThread.scala#generateRequests() is straightforward: it drains the queue into a buffer and returns it, so no further explanation is needed.

    def generateRequests(): Iterable[RequestAndCompletionHandler] = {
      val buffer = mutable.Buffer[RequestAndCompletionHandler]()
      while (true) {
        val request = queue.poll()
        if (request == null) {
          return buffer
        } else {
          buffer += request
        }
      }
      buffer
    }
11. Back to sub-step 2 of step 8 in this section: InterBrokerSendThread.scala#sendRequests() takes requests out of the collection of unsent requests and stores them in the network client's send buffer via NetworkClient.java#send(). I analyzed the underlying network client in detail in "Kafka 3.0 Source Code Notes (3): Source Code Analysis of the Core Flow of the Kafka Consumer", so I will not repeat it here.

    private def sendRequests(now: Long, maxTimeoutMs: Long): Long = {
      var pollTimeout = maxTimeoutMs
      for (node <- unsentRequests.nodes.asScala) {
        val requestIterator = unsentRequests.requestIterator(node)
        while (requestIterator.hasNext) {
          val request = requestIterator.next
          if (networkClient.ready(node, now)) {
            networkClient.send(request, now)
            requestIterator.remove()
          } else
            pollTimeout = Math.min(pollTimeout, networkClient.connectionDelay(node, now))
        }
      }
      pollTimeout
    }
12. At this point the Vote request has been sent. After the peer node receives it and dispatches it through several layers, KafkaRaftClient.java#handleVoteRequest() is triggered. Its core is:
   - First validate the parameters carried by the request. If they are invalid, return a response immediately
   - Then compare the epoch carried by the request with the local epoch. If the local epoch is smaller, the current node has fallen behind and must switch to the UnattachedState state. Note that this is an important foundation of the backoff mechanism
   - Finally compare the log end offset carried by the request with the local one, and decide the vote based on the node's current role state. If the node grants the vote and is itself in the UnattachedState state, it switches to the VotedState state

    private VoteResponseData handleVoteRequest(
        RaftRequest.Inbound requestMetadata
    ) {
        VoteRequestData request = (VoteRequestData) requestMetadata.data;

        if (!hasValidClusterId(request.clusterId())) {
            return new VoteResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code());
        }

        if (!hasValidTopicPartition(request, log.topicPartition())) {
            // Until we support multi-raft, we treat individual topic partition mismatches as invalid requests
            return new VoteResponseData().setErrorCode(Errors.INVALID_REQUEST.code());
        }

        VoteRequestData.PartitionData partitionRequest =
            request.topics().get(0).partitions().get(0);

        int candidateId = partitionRequest.candidateId();
        int candidateEpoch = partitionRequest.candidateEpoch();

        int lastEpoch = partitionRequest.lastOffsetEpoch();
        long lastEpochEndOffset = partitionRequest.lastOffset();
        if (lastEpochEndOffset < 0 || lastEpoch < 0 || lastEpoch >= candidateEpoch) {
            return buildVoteResponse(Errors.INVALID_REQUEST, false);
        }

        Optional<Errors> errorOpt = validateVoterOnlyRequest(candidateId, candidateEpoch);
        if (errorOpt.isPresent()) {
            return buildVoteResponse(errorOpt.get(), false);
        }

        if (candidateEpoch > quorum.epoch()) {
            transitionToUnattached(candidateEpoch);
        }

        OffsetAndEpoch lastEpochEndOffsetAndEpoch = new OffsetAndEpoch(lastEpochEndOffset, lastEpoch);
        boolean voteGranted = quorum.canGrantVote(candidateId, lastEpochEndOffsetAndEpoch.compareTo(endOffset()) >= 0);

        if (voteGranted && quorum.isUnattached()) {
            transitionToVoted(candidateId, candidateEpoch);
        }

        logger.info("Vote request {} with epoch {} is {}", request, candidateEpoch, voteGranted ? "granted" : "rejected");
        return buildVoteResponse(Errors.NONE, voteGranted);
    }
13. After the peer node has handled the Vote request, its response is put into the message queue by the completion callback mentioned in step 5 of this section, and eventually triggers KafkaRaftClient.java#handleVoteResponse() on the current node. The core flow of this method is:
   - If the peer granted the vote, call CandidateState#recordGrantedVote() to tally it, then call KafkaRaftClient.java#maybeTransitionToLeader() to check whether the votes received exceed half of the voting nodes. If so, the current node switches to the LeaderState state and becomes the Leader of the cluster
   - If the peer rejected the vote, the rejection is recorded as well, and the node checks whether so many voters have rejected it that a majority is no longer reachable. If so, it calls CandidateState#startBackingOff() and backs off, which avoids an election deadlock between multiple candidates

    private boolean handleVoteResponse(
        RaftResponse.Inbound responseMetadata,
        long currentTimeMs
    ) {
        int remoteNodeId = responseMetadata.sourceId();
        VoteResponseData response = (VoteResponseData) responseMetadata.data;
        Errors topLevelError = Errors.forCode(response.errorCode());
        if (topLevelError != Errors.NONE) {
            return handleTopLevelError(topLevelError, responseMetadata);
        }

        if (!hasValidTopicPartition(response, log.topicPartition())) {
            return false;
        }

        VoteResponseData.PartitionData partitionResponse =
            response.topics().get(0).partitions().get(0);

        Errors error = Errors.forCode(partitionResponse.errorCode());
        OptionalInt responseLeaderId = optionalLeaderId(partitionResponse.leaderId());
        int responseEpoch = partitionResponse.leaderEpoch();

        Optional<Boolean> handled = maybeHandleCommonResponse(
            error, responseLeaderId, responseEpoch, currentTimeMs);
        if (handled.isPresent()) {
            return handled.get();
        } else if (error == Errors.NONE) {
            if (quorum.isLeader()) {
                logger.debug("Ignoring vote response {} since we already became leader for epoch {}",
                    partitionResponse, quorum.epoch());
            } else if (quorum.isCandidate()) {
                CandidateState state = quorum.candidateStateOrThrow();
                if (partitionResponse.voteGranted()) {
                    state.recordGrantedVote(remoteNodeId);
                    maybeTransitionToLeader(state, currentTimeMs);
                } else {
                    state.recordRejectedVote(remoteNodeId);

                    // If our vote is rejected, we go immediately to the random backoff. This
                    // ensures that we are not stuck waiting for the election timeout when the
                    // vote has become gridlocked.
                    if (state.isVoteRejected() && !state.isBackingOff()) {
                        logger.info("Insufficient remaining votes to become leader (rejected by {}). " +
                            "We will backoff before retrying election again", state.rejectingVoters());

                        state.startBackingOff(
                            currentTimeMs,
                            binaryExponentialElectionBackoffMs(state.retries())
                        );
                    }
                }
            } else {
                logger.debug("Ignoring vote response {} since we are no longer a candidate in epoch {}",
                    partitionResponse, quorum.epoch());
            }
            return true;
        } else {
            return handleUnexpectedError(error, responseMetadata);
        }
    }
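The tallying that decides between "become Leader" and "back off" can be illustrated with a small sketch. `VoteTallySketch` is a hypothetical class, not CandidateState itself; it assumes the usual majority rule (more than half of the voters) and treats the election as lost once granted votes plus outstanding votes can no longer reach that majority.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical vote tally for one candidate in one epoch. */
final class VoteTallySketch {
    private final Set<Integer> voters;
    private final Set<Integer> granted = new HashSet<>();
    private final Set<Integer> rejected = new HashSet<>();

    VoteTallySketch(int localId, Set<Integer> voters) {
        if (!voters.contains(localId)) {
            throw new IllegalArgumentException("Local id " + localId + " is not a voter");
        }
        this.voters = new HashSet<>(voters);
        granted.add(localId); // a candidate always votes for itself
    }

    private int majoritySize() {
        return voters.size() / 2 + 1;
    }

    private int numUnrecorded() {
        return voters.size() - granted.size() - rejected.size();
    }

    void recordGrantedVote(int remoteId) {
        checkVoter(remoteId);
        if (!rejected.contains(remoteId)) granted.add(remoteId);
    }

    void recordRejectedVote(int remoteId) {
        checkVoter(remoteId);
        if (!granted.contains(remoteId)) rejected.add(remoteId);
    }

    /** True once a majority of voters has granted the vote. */
    boolean isVoteGranted() {
        return granted.size() >= majoritySize();
    }

    /** True once a majority can no longer be reached, even if all outstanding voters agree. */
    boolean isVoteRejected() {
        return granted.size() + numUnrecorded() < majoritySize();
    }

    private void checkVoter(int id) {
        if (!voters.contains(id)) {
            throw new IllegalArgumentException("Node " + id + " is not a voter");
        }
    }

    public static void main(String[] args) {
        VoteTallySketch tally = new VoteTallySketch(1, Set.of(1, 2, 3));
        tally.recordGrantedVote(2);
        System.out.println("elected: " + tally.isVoteGranted());
    }
}
```

With three voters, the candidate's own vote plus one granted vote is already a majority, which is why a single positive response can be enough to become Leader.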
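2.4 Handling election deadlocks: the backoff mechanism
Imagine that several voting nodes in the cluster start at the same time, all of them for the first time. Quite likely all of them switch to the candidate state. Even when they then receive Vote requests from other candidates, they will not vote for them, so the election is stuck in a deadlock where nobody can win. Kafka introduces a backoff mechanism to handle this situation; the rough flow is shown in the figure below:
The core of the backoff mechanism is to pick a random backoff timeout, bounded by the controller.quorum.election.backoff.max.ms setting. KafkaRaftClient.java#pollCandidate() checks whether a candidate is backing off, and a candidate in the backoff state no longer sends Vote requests. Once its backoff timeout expires, the candidate that leaves the backoff state first starts a new round of Vote requests. The epoch carried by these requests has been incremented by one, so the other candidates that receive them find their own epoch is behind and fall back to the UnattachedState state, from which they can grant the vote. The election deadlock is resolved.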
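A randomized binary exponential backoff of the kind invoked by binaryExponentialElectionBackoffMs() can be sketched as below. This is an illustration under assumptions, not the Kafka source: the class name, the 50 ms base unit, and the cap of 20 on the exponent are choices made for this example, and 1000 ms is used only as a sample value for controller.quorum.election.backoff.max.ms.

```java
import java.util.Random;

/** Hypothetical randomized exponential backoff for repeated election attempts. */
final class ElectionBackoffSketch {
    // Assumed base unit for one backoff slot, in milliseconds.
    static final int RETRY_BACKOFF_BASE_MS = 50;

    /**
     * Picks a random number of slots in [0, 2^retries) and converts it to
     * milliseconds, never exceeding maxBackoffMs.
     */
    static int backoffMs(int retries, int maxBackoffMs, Random random) {
        if (retries <= 0) {
            throw new IllegalArgumentException("Retries " + retries + " should be larger than zero");
        }
        // Cap the exponent so the multiplication below cannot overflow an int.
        int exponent = Math.min(20, retries - 1);
        int slots = 2 << exponent; // 2, 4, 8, ... possible slots
        return Math.min(RETRY_BACKOFF_BASE_MS * random.nextInt(slots), maxBackoffMs);
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int retries = 1; retries <= 5; retries++) {
            System.out.println("retry " + retries + " -> " + backoffMs(retries, 1000, random) + " ms");
        }
    }
}
```

The randomness is what breaks the symmetry: candidates that collided once are unlikely to pick the same delay again, so one of them usually wakes up first and wins the next round.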