• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

rocketmq的消费者源码解读二消费负载均衡

武飞扬头像
orcharddd_real
帮助1

6)负载均衡

首先要弄清楚一个问题,啥是集群消费,这里假设单台机器对应一个jvm进程,且只包含一个clientId,其实一个jvm进程,可以包含多个clientId,多个clientId意味着多个客户端实例MQClientInstance;
多台机器都订阅了同一个主题,但该主题又分配在了不同broker的queue中,所以需要将每个主题对应的这些不同broker上的queue收集到集合set中,再将该set平均分配给不同机器;
负载均衡指将messageQueue分配给不同的clientId,通常一个clientId对应一台机器;这和生产者发消息很相似,生产者是轮询选择一个messageQueue发送;
集群中每个机器代码都一样,订阅同一个topic的所有机器(clientId不同)都是一样的业务处理逻辑,所以每个机器分别处理一部分消息,最终目的都是入库落表;

6.1)rebalanceService线程的run方法

调用countDownLatch的await方法,进行超时阻塞,时间默认20s,可配置;醒来后执行MQClientInstance#doRebalance方法,其中会遍历consumerTable,执行每个消费者组下的RebalanceImpl#doRebalance方法,其中又会遍历subscriptionInner表,对每个topic执行RebalanceImpl#rebalanceByTopic方法;

6.1.1)获取set

根据topic从topicSubscriptionInfoTable中获取set;

6.1.2)MQClientInstance#findConsumerIdList

根据topic——>topicRouteData——>List,最后随机选择一个主节点broker,向该broker发送rpc同步请求查询订阅了当前topic的所有clientId,请求码为RequestCode.GET_CONSUMER_LIST_BY_GROUP;

6.1.3)调用AllocateMessageQueueAveragely#allocate方法

默认调用AllocateMessageQueueAveragely#allocate方法,将set平均分配给订阅了当前topic的所有clientId,返回当前clientId分配到的所有messageQueue;

6.1.4)RebalanceImpl#updateProcessQueueTableInRebalance

processQueueTable集合中每个messageQueue和processQueue是一对一对应的关系,从messageQueue中拉取到的消息,在消费者本地会放入到processQueue中缓存,再取出来消费;当前clientId订阅此topic主题会对应多个mq即set,也会构建相应多个pq;最新分配给当前clientId的一组mq,可能和上一次分配的mq不同,新增的mq要新建processQueue实例,减少的mq要设置相应pq为不可用;

6.1.4.1)遍历processQueueTable

6.1.4.1.1)pqTable中的mq在当前最新分配的set中存在
若存在,则判断当前mq对应的pq是否超过最大空闲时间,默认120s;若超过120秒当前pq没有被消费,则也会执行RebalancePushImpl#removeUnnecessaryMessageQueue方法;若是并发集群消费,则只执行下边的persist方法和removeOffset方法,若是集群顺序消费,则还要执行unlockDelay方法;
6.1.4.1.1.1)RemoteBrokerOffsetStore#persisit
从offsetTable中取出当前mq对应的进度,调用RemoteBrokerOffsetStore#updateConsumeOffsetToBroker方法,向broker发送rpc请求,将当前mq的消费进度存进broker端,请求码为UPDATE_CONSUMER_OFFSET,请求头包括topic,消费者组名,当前mq对应的queueId,当前mq的消费进度offset;
6.1.4.1.1.2)RemoteBrokerOffsetStore#removeOffset
将当前mq从offsetTable中移除;
6.1.4.1.1.3)RebalancePushImpl#unlockDelay
若是顺序消费且是集群模式,则先拿到pq的lock,接着执行RebalancePushImpl#unlockDelay方法,其中又会执行RebalanceImpl#unlock方法,其中又会执行MQClientAPIImpl#unlockBatchMQ方法,向broker发送rpc请求,请求码为UNLOCK_BATCH_MQ,请求broker端释放当前mq的分布式锁,由于顺序消费,需要先获取当前mq的分布式锁,所以若当前mq这次没有分配给当前clientId,则需要释放当前mq的分布式锁;这里可以选择发送单步或者同步请求;
6.1.4.1.1.4)broker端释放mq的分布式锁
根据请求码找到AdminBrokerProcessor#unlockBatchMQ,其中又会执行RebalanceLockManager#unlockBatch方法,将从mqLockTable表中删除<mq,lockEntry>,mqLockTable表中key为消费者组名,value为chm<MessageQueue,LockEntry>;这里删除的就是value;关于mq分布式锁的具体细节,在顺序消费时,再分析;删除后,客户端这边拿到返回结果,并且释放pq的lock;
removeUnnecessaryMessageQueue方法执行结束并返回true后,会 删除processQueueTable中的当前mq对应的键值对<mq,pq>;
6.1.4.1.2)pqTable中的mq在当前最新分配的set中不存在
!mqSet.contains(mq),该条件成立表示当前mq在这次被分配出去了;此时需要再次执行RebalancePushImpl#removeUnnecessaryMessageQueue方法,和上边processQueue超时没使用,逻辑是一样的;
removeUnnecessaryMessageQueue方法执行结束并返回true后,会 删除processQueueTable中的当前mq对应的键值对<mq,pq>;

6.1.4.2)遍历新分配的set集合

!this.processQueueTable.containsKey(mq)成立表示当前mq是新分配到当前clientId的;针对该mq,需要做几件事:
6.1.4.2.1)顺序消费时,先获取当前mq分布式锁
若是顺序消费则执行this.lock方法尝试获取当前mq的分布式锁,获取锁成功则继续执行,否则判断下一个mq;
6.1.4.2.2)RebalancePushImpl#removeDirtyOffset
删除当前mq在offsetTable上的进度,相当于清空原有记录;消费端本地的记录是旧的记录,需要从broker端拉取当前mq最新的记录;
6.1.4.2.3)实例化ProcessQueue
执行processQueue的无参构造方法;每创建一个ProcessQueue实例,其实例变量msgTreeMap,consumingMsgOrderlyTreeMap,lockTreeMap均会创建一个各自实例;
6.1.4.2.4)RebalancePushImpl#computePullFromWhere
默认的消费起点是COSUME_FROM_FIRST_OFFSET,接着调用RemoteBrokerOffsetStore#readOffset方法,这里选择向broker发送rpc请求,拉取当前mq的消费进度;请求码是QUERY_CONSUMER_OFFSET,这是个同步调用;拿到进度值后更新客户端本地的offsetTable表;
6.1.4.2.5)更新processQueueTable
将当前mq与pq,放入processQueueTable中;
6.1.4.2.6)实例化PullRequest
创建一个PullRequest实例,放入pullRequestList中;新建PullRequest实例,封装了消费者组,mq,pq,nextOffset;
6.1.4.2.7)RebalancePushImpl#dispatchPullRequest
结束set的遍历回来到这里,接着遍历pullRequestList,最终执行pullRequestQueue.put(pullRequest);

6.1.4.3)关于分配messageQueue给消费者的其他细节:

可以将pullRequest看成是一个通道,通道连接broker中队列与消费者,每次消费结束,更新通道中消费进度,再次进行消费;
最开始一个消费组内有4个消费者,id分别为0,1,2,3;他们同时订阅了相同的主题A,A路由在7个messageQueue上,所属在不同的broker上;若按照平均分配策略,则为2 2 2 1分配给4个消费者,当id为3的0的消费者退出了,此时需要对剩下消费者重新排序为0,1,2,重新分配messageQueue,则为3 2 2;所以每次消费前,一般都会拿到最新的路由信息,再去找messageQueue;
rebalanceService线程启动后,每隔20s会去broker拉取当前clientId订阅的topic包括的messageQueue,并重新分配,检查正在消费的messageQueue是否有变化,检查方式是比较最新的分配与缓存的分配,若新增了,则创建pullRequest,加入pullRequestQueue中,在一个jvm进程中,同一个消费组,同一个队列只会存在一个pullRequest对象;该对象中封装了待拉取的messageQueue,messageQueue的下次消费偏移量等;这样pullmessageService线程就知道从哪个messageQueue哪个位置拉取消息了;重平衡线程可以看成是为拉取消息线程提供一些信息,告诉从哪儿拉取;当拉取消息线程知道从哪儿拉取后,每次异步拉取到消息后,又会将更新后的pullRequest加入pullRequestQueue中,让阻塞的拉取消息线程继续执行,实现了循环拉取,准实时;

6.2)broker端唤醒客户端rebalanceSevice线程

当broker端的channel通道有事件发生时,会通知到消费者,即broker会向客户端发送一个rpc请求,请求唤醒rebalanceService线程;客户端收到请求后,会调用ClientRemotingProcessor#notifyConsumerIdsChanged方法,其中又会调用MQClientInstance#rebalanceImmediately方法,其中会执行countDownLatch的countDown方法唤醒rebalanceService线程;

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgebhjj
系列文章
更多 icon
同类精品
更多 icon
继续加载