• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

RocketMQ5.0.0消息消费<二> _ 消息队列负载均衡机制

武飞扬头像
爱我所爱0505
帮助1

目录

一、消费队列负载均衡概览

二、消费队列负载均衡实现

1. 负载均衡UML

2. 启动RebalanceService线程

3. PUSH模式负载均衡

三、负载均衡策略

四、参考资料


一、消费队列负载均衡概览

        RocketMQ默认一个主题下有4个消费队列,集群模式下同一消费组内要求每个消费队列在同一时刻只能被一个消费者消费。那么集群模式下多个消费者是如何负载主题的多个消费队列呢?并且如果有新的消费者加入时,消费队列又会如何重新分布呢?

        RocketMQ消费端每20s周期执行一次消费队列重新负载,每次进行队列重新负载时会从Broker实时查询当前消费组内所有消费者,并且对消息队列、消费者列表进行排序,这样新加入的消费者就会在队列重新分布时分配到消费队列从而消费消息。如下所示,是消息拉取与消费队列负载均衡的交互图。

学新通
消息拉取与消费队列负载均衡的交互流程

二、消费队列负载均衡实现

1. 负载均衡UML

学新通

2. 启动RebalanceService线程

        参考《RocketMQ5.0.0消息消费<一> _ PUSH模式的消息拉取》
章节,消费者启动时,当前消费者添加到MQClientInstance#consumerTable属性中,并启动MQClientInstance实例。启动MQClientInstance实例时,会启动org.apache.rocketmq.client.impl.consumer.RebalanceService消费队列负载均衡服务线程。下图所示是该线程run()调用链。

学新通

        以下代码是MQClientInstance维护整个JVM的所有生产者和消费者的属性。

  1.  
    // 生产者容器
  2.  
    private final ConcurrentMap<String/* 生产组 */, MQProducerInner> producerTable = new ConcurrentHashMap<>();
  3.  
    // 消费者容器
  4.  
    private final ConcurrentMap<String/* 消费组 */, MQConsumerInner> consumerTable = new ConcurrentHashMap<>();

        org.apache.rocketmq.client.impl.consumer.RebalanceService#run()周期20s执行负载均衡任务。-Drocketmq.client.rebalance. waitlnterval参数修改执行周期,默认20s

  1.  
    @Override
  2.  
    public void run() {
  3.  
    log.info(this.getServiceName() " service started");
  4.  
     
  5.  
    while (!this.isStopped()) {
  6.  
    // 线程等待20s
  7.  
    this.waitForRunning(waitInterval);
  8.  
    // topic下消费队列的负载均衡
  9.  
    this.mqClientFactory.doRebalance();
  10.  
    }
  11.  
     
  12.  
    log.info(this.getServiceName() " service end");
  13.  
    }

        org.apache.rocketmq.client.impl.factory.MQClientInstance#doRebalance方法遍历MQClientInstance实例中所有消费组下消费者。每一个消费者DefaultMQPushConsumerImpl拥有一个org.apache.rocketmq.client.impl.consumer.RebalanceImpl对象(实现负载均衡),给每个消费者找到一个消费队列(重新负载)

  1.  
    // 消费队列负载均衡
  2.  
    public void doRebalance() {
  3.  
    for (Map.Entry<String/* 消费组 */, MQConsumerInner> entry : this.consumerTable.entrySet()) {
  4.  
    // 获取消费者
  5.  
    MQConsumerInner impl = entry.getValue();
  6.  
    if (impl != null) {
  7.  
    try {
  8.  
    // 消费者负载均衡
  9.  
    impl.doRebalance();
  10.  
    } catch (Throwable e) {
  11.  
    log.error("doRebalance exception", e);
  12.  
    }
  13.  
    }
  14.  
    }
  15.  
    }
学新通

3. PUSH模式负载均衡

        org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#doRebalance是PUSH模式的负载均衡的入口方法,其调用链如下。

学新通

        每个消费者DefaultMQPushConsumerImpl拥有一个RebalanceImpl对象,其中org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance方法是对消费者的所有订阅主题进行负载均衡,即:消费者的所有订阅主题重新分配一个或多个消费队列来进行消费。其代码如下。注意事项:

  • Map<String/* topic */, SubscriptionData> subTable:获取当前消费者订阅的主题信息;
  • rebalanceByTopic():每个主题进行重新负载均衡
  1.  
    /**
  2.  
    * 对消费者订阅的每个topic进行消费队列重新负载
  3.  
    * step1:获取消费者订阅的主题信息,注意:消费者可以订阅多个主题
  4.  
    * step2:遍历消费者的每个topic
  5.  
    * step3:消费者订阅的topic进行消费队列重新负载
  6.  
    * {@link RebalanceImpl#rebalanceByTopic(String, boolean)}
  7.  
    * @param isOrder 是否顺序消息
  8.  
    * @return true所有topic重新负载成功
  9.  
    */
  10.  
    public boolean doRebalance(final boolean isOrder) {
  11.  
    boolean balanced = true;
  12.  
    // 获取消费者订阅的主题信息,注意:消费者可以订阅多个主题
  13.  
    Map<String/* topic */, SubscriptionData> subTable = this.getSubscriptionInner();
  14.  
    if (subTable != null) {
  15.  
    // 遍历消费者的每个topic
  16.  
    for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
  17.  
    final String topic = entry.getKey();
  18.  
    try {
  19.  
    if (!clientRebalance(topic) && tryQueryAssignment(topic)) {
  20.  
    balanced = this.getRebalanceResultFromBroker(topic, isOrder);
  21.  
    } else {
  22.  
    // 消费者订阅的topic进行消费队列重新负载
  23.  
    balanced = this.rebalanceByTopic(topic, isOrder);
  24.  
    }
  25.  
    } catch (Throwable e) {
  26.  
    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
  27.  
    log.warn("rebalance Exception", e);
  28.  
    balanced = false;
  29.  
    }
  30.  
    }
  31.  
    }
  32.  
    }
  33.  
     
  34.  
    this.truncateMessageQueueNotMyTopic();
  35.  
     
  36.  
    return balanced;
  37.  
    }
学新通

        org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic方法是对每个主题进行重新负载均衡的核心逻辑,如下代码所示。 这里介绍集群模式下负载均衡,注意事项:

  • MQClientInstance#findConsumerIdList():从Broker上获取所有订阅该topic且同属一个消费组的所有消费者ID。
  • 对消费队列、消费者ID集合排序:原因是同一个消费组内视图一致,确保同一个消费队列不会被多个消费者分配
  • AllocateMessageQueueStrategy#allocate:根据均衡策略,获取当前消费者的消息队列。
  • RebalanceImpl#updateProcessQueueTableInRebalance:重新负载后,消费者对应的分配后的消息队列是否变化: 新增、删除(其他消费者占用)
  1.  
    /**
  2.  
    * 消费者订阅的topic进行消费队列重新负载
  3.  
    * 集群模式下的步骤:
  4.  
    * step1:从主题订阅信息缓存表(topicSubscribeInfoTable)中获取当前topic的消费队列
  5.  
    * step2:从Broker上获取所有订阅该topic 同属一个消费组 的所有消费者ID
  6.  
    * step3:对消费队列、消费者ID排序,很重要,原因是:同一个消费组内视图一致,确保同一个消费队列不会被多个消费者分配
  7.  
    * step4:根据均衡策略,获取当前消费者的消息队列
  8.  
    * {@link AllocateMessageQueueStrategy#allocate}
  9.  
    * step5:消费者对应的分配消息队列是否变化: 新增、删除(其他消费者占用)
  10.  
    * {@link RebalanceImpl#updateProcessQueueTableInRebalance}
  11.  
    * @param topic 主题
  12.  
    * @param isOrder 是否是顺序消息
  13.  
    * @return true重新分配消息队列成功
  14.  
    */
  15.  
    private boolean rebalanceByTopic(final String topic, final boolean isOrder) {
  16.  
    boolean balanced = true;
  17.  
    switch (messageModel) {
  18.  
    case BROADCASTING: {
  19.  
    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
  20.  
    if (mqSet != null) {
  21.  
    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
  22.  
    if (changed) {
  23.  
    this.messageQueueChanged(topic, mqSet, mqSet);
  24.  
    log.info("messageQueueChanged {} {} {} {}", consumerGroup, topic, mqSet, mqSet);
  25.  
    }
  26.  
     
  27.  
    balanced = mqSet.equals(getWorkingMessageQueue(topic));
  28.  
    } else {
  29.  
    this.messageQueueChanged(topic, Collections.<MessageQueue>emptySet(), Collections.<MessageQueue>emptySet());
  30.  
    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
  31.  
    }
  32.  
    break;
  33.  
    }
  34.  
    case CLUSTERING: {
  35.  
    // 从主题订阅信息缓存表中获取当前topic的消费队列
  36.  
    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
  37.  
    // 从Broker上获取所有订阅该topic 同属一个消费组 的所有消费者ID
  38.  
    List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
  39.  
    if (null == mqSet) {
  40.  
    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
  41.  
    this.messageQueueChanged(topic, Collections.<MessageQueue>emptySet(), Collections.<MessageQueue>emptySet());
  42.  
    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
  43.  
    }
  44.  
    }
  45.  
     
  46.  
    if (null == cidAll) {
  47.  
    log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
  48.  
    }
  49.  
     
  50.  
    if (mqSet != null && cidAll != null) {
  51.  
    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
  52.  
    mqAll.addAll(mqSet);
  53.  
     
  54.  
    /*
  55.  
    消费队列、消费者ID排序很重要:同一个消费组内视图一致,确保同一个消费队列不会被多个消费者分配
  56.  
    */
  57.  
    // 消费队列排序
  58.  
    Collections.sort(mqAll);
  59.  
    // 消费者ID排序
  60.  
    Collections.sort(cidAll);
  61.  
     
  62.  
    // 均衡策略
  63.  
    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
  64.  
     
  65.  
    List<MessageQueue> allocateResult = null;
  66.  
    try {
  67.  
    // 根据均衡策略,获取当前消费者的消息队列
  68.  
    allocateResult = strategy.allocate(
  69.  
    this.consumerGroup,
  70.  
    this.mQClientFactory.getClientId(), // 当前消费者ID
  71.  
    mqAll,
  72.  
    cidAll);
  73.  
    } catch (Throwable e) {
  74.  
    log.error("allocate message queue exception. strategy name: {}, ex: {}", strategy.getName(), e);
  75.  
    return false;
  76.  
    }
  77.  
     
  78.  
    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
  79.  
    if (allocateResult != null) {
  80.  
    allocateResultSet.addAll(allocateResult);
  81.  
    }
  82.  
     
  83.  
    // 消费者对应的分配消息队列是否变化: 新增、删除(其他消费者占用)
  84.  
    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
  85.  
    if (changed) {
  86.  
    log.info(
  87.  
    "client rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
  88.  
    strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
  89.  
    allocateResultSet.size(), allocateResultSet);
  90.  
    this.messageQueueChanged(topic, mqSet, allocateResultSet);
  91.  
    }
  92.  
     
  93.  
    balanced = allocateResultSet.equals(getWorkingMessageQueue(topic));
  94.  
    }
  95.  
    break;
  96.  
    }
  97.  
    default:
  98.  
    break;
  99.  
    }
  100.  
     
  101.  
    return balanced;
  102.  
    }
学新通

        org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance重新分配后消费队列集合与上次负载的分配集合是否改变(新增或删除)来重新拉取消息。如下代码所示。

  • 删除(消费队列分配给其他消费者):暂停消费并移除,且持久化待移除消费队列的消费进度。
  • 新增(缓存表没有的消费队列):

                step1:删除内存中该消费队列的消费进度;

                step2:创建broker的消费队列;

                step3:从磁盘中获取该消费队列的消费进度(若进度<0时,则根据配置矫正消费进度),创建拉取消息请求。

  • 新增消费队列:重新创建拉取请求PullRequest加入到PullMessageService线程中,唤醒该线程拉取消息RebalanceImpl#dispatchPullRequest。
  • 若是顺序消息:是局部顺序消息,尝试向Broker请求锁定该消费队列,锁定失败延迟时则重新负载。
  1.  
    /**
  2.  
    * 消费者对应的分配消息队列是否变化
  3.  
    * step1:消费队列缓存表中不在本次均衡分配的消费队列时,则暂停消费并移除,且持久化待移除消费队列的消费进度;
  4.  
    * step2:本次均衡分配的消费队列不在消费队列缓存表中,则新增:
  5.  
    * 1):删除内存中该消费队列的消费进度;
  6.  
    * 2):创建broker的消费队列;
  7.  
    * 3):从磁盘中获取该消费队列的消费进度(若进度<0时,则根据配置矫正消费进度),创建拉取消息请求
  8.  
    * {@link RebalanceImpl#computePullFromWhere}
  9.  
    * step3: 新增消费队列,则创建{@link PullRequest}加入到{@link PullMessageService},唤醒该线程拉取消息
  10.  
    * {@link RebalanceImpl#dispatchPullRequest}
  11.  
    * step4:顺序消息时,则尝试向Broker请求锁定该消费队列,锁定失败延迟重新负载
  12.  
    * @param topic 主题
  13.  
    * @param mqSet 本次均衡分配的消费队列
  14.  
    * @param isOrder 是否顺序
  15.  
    * @return true变化;false未改变
  16.  
    */
  17.  
    private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
  18.  
    final boolean isOrder) {
  19.  
    boolean changed = false;
  20.  
     
  21.  
    // drop process queues no longer belong me 当前消费队列不在分配队列中
  22.  
    HashMap<MessageQueue, ProcessQueue> removeQueueMap = new HashMap<MessageQueue, ProcessQueue>(this.processQueueTable.size());
  23.  
    // 遍历当前消费队列缓存表
  24.  
    Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
  25.  
    while (it.hasNext()) {
  26.  
    Entry<MessageQueue, ProcessQueue> next = it.next();
  27.  
    MessageQueue mq = next.getKey();
  28.  
    ProcessQueue pq = next.getValue();
  29.  
     
  30.  
    // 是该topic的消费队列
  31.  
    if (mq.getTopic().equals(topic)) {
  32.  
    // 当前消费队列不在现有的分配消息队列中,则暂停消费、废弃当前消费队列并移除(分配给其他消费者)
  33.  
    if (!mqSet.contains(mq)) {
  34.  
    pq.setDropped(true);
  35.  
    removeQueueMap.put(mq, pq);
  36.  
    } else if (pq.isPullExpired() && this.consumeType() == ConsumeType.CONSUME_PASSIVELY) {
  37.  
    pq.setDropped(true);
  38.  
    removeQueueMap.put(mq, pq);
  39.  
    log.error("[BUG]doRebalance, {}, try remove unnecessary mq, {}, because pull is pause, so try to fixed it",
  40.  
    consumerGroup, mq);
  41.  
    }
  42.  
    }
  43.  
    }
  44.  
     
  45.  
    // remove message queues no longer belong me 移除不在分配的消费队列
  46.  
    for (Entry<MessageQueue, ProcessQueue> entry : removeQueueMap.entrySet()) {
  47.  
    MessageQueue mq = entry.getKey();
  48.  
    ProcessQueue pq = entry.getValue();
  49.  
     
  50.  
    /*
  51.  
    判断是否将{@link MessageQueue}、{@link ProcessQueue}缓存表中移除
  52.  
    a. 持久化待移除的{@link MessageQueue}消费进度;
  53.  
    b. 顺序消息时,需先解锁队列
  54.  
    */
  55.  
    if (this.removeUnnecessaryMessageQueue(mq, pq)) {
  56.  
    this.processQueueTable.remove(mq);
  57.  
    changed = true;
  58.  
    log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
  59.  
    }
  60.  
    }
  61.  
     
  62.  
    // add new message queue 遍历本次负载均衡分配的消费队列,缓存表中没有,则新增的消费队列
  63.  
    boolean allMQLocked = true; // 消费队列是否有锁定(顺序消息使用)
  64.  
    List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
  65.  
    for (MessageQueue mq : mqSet) {
  66.  
    // 新增的消费队列
  67.  
    if (!this.processQueueTable.containsKey(mq)) {
  68.  
    // 若是顺序消息,则尝试向Broker请求锁定该消费队列,锁定失败延迟重新负载
  69.  
    if (isOrder && !this.lock(mq)) {
  70.  
    log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
  71.  
    allMQLocked = false;
  72.  
    continue;
  73.  
    }
  74.  
     
  75.  
    // 删除内存中该消费队列的消费进度
  76.  
    this.removeDirtyOffset(mq);
  77.  
    // 创建broker的消费队列
  78.  
    ProcessQueue pq = createProcessQueue(topic);
  79.  
    pq.setLocked(true);
  80.  
    // 从磁盘中获取该消费队列的消费进度(若进度<0时,则根据配置矫正消费进度),创建拉取消息请求
  81.  
    long nextOffset = this.computePullFromWhere(mq);
  82.  
    if (nextOffset >= 0) {
  83.  
    ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
  84.  
    if (pre != null) {
  85.  
    log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
  86.  
    } else {
  87.  
    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
  88.  
    // 创建拉取消息请求
  89.  
    PullRequest pullRequest = new PullRequest();
  90.  
    pullRequest.setConsumerGroup(consumerGroup);
  91.  
    pullRequest.setNextOffset(nextOffset);
  92.  
    pullRequest.setMessageQueue(mq);
  93.  
    pullRequest.setProcessQueue(pq);
  94.  
    pullRequestList.add(pullRequest);
  95.  
    changed = true;
  96.  
    }
  97.  
    } else {
  98.  
    log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
  99.  
    }
  100.  
    }
  101.  
     
  102.  
    }
  103.  
     
  104.  
    // 锁定消费队列失败,延迟重新负载
  105.  
    if (!allMQLocked) {
  106.  
    mQClientFactory.rebalanceLater(500);
  107.  
    }
  108.  
     
  109.  
    // 将拉取消息对象{@link PullRequest}加入到{@link PullMessageService},唤醒该线程拉取消息
  110.  
    this.dispatchPullRequest(pullRequestList, 500);
  111.  
     
  112.  
    return changed;
  113.  
    }
学新通

        根据RebalanceImpl#updateProcessQueueTableInRebalance来判定消费者对应的分配到的消息队列是否变化(新增或删除)时,若是新增,则先删除内存消费进度,再从Broker端获取该消费队列的消费进度;若是删除,持久化消费进度同时删除旧的消费队列。 

a. 删除操作

        org.apache.rocketmq.client.impl.consumer.RebalanceImpl#removeUnnecessaryMessageQueue负载均衡时删除未分配的消费队列,其调用链如下。

学新通

b. 新增操作

        先删除该消费队列旧的内存消费进度,执行方法RebalanceImpl#removeDirtyOffset,其调用链如下。

学新通

        再从Broker磁盘获取该消费队列消费进度,执行RebalanceImpl#computePullFromWhere,其调用链如下。 

学新通

三、负载均衡策略

        org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy是消费队列负载均衡策略的接口,其有6个实现类,UML图如下。其中:

  • AllocateMessageQueueAveragely:平均分配算法(默认),如:8个消息消费队列q1、q2、q3、q4、q5、q6、q7、q8,有3个消费者c1、c2、c3,则分配如下:

                c1:q1、q2、q3

                c2:q4、q5、q6

                c3:q7、q8

  • AllocateMessageQueueAveragelyByCircle:平均轮询算法,如:8个消息消费队列q1、q2、q3、q4、q5、q6、q7、q8,有3个消费者c1、c2、c3,则分配如下:

                c1:q1、q4、q7

                c2:q2、q5、q8

                c3:q3、q6

学新通

四、参考资料

https://www.cnblogs.com/alisystemsoftware/p/16935521.html

消费者负载均衡 | RocketMQ

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhggbfhg
系列文章
更多 icon
同类精品
更多 icon
继续加载