• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

RocketMQ : Rebalance

武飞扬头像
陈少平
帮助3

1、本节关键字

每隔 20s Rebalance、重复消费、默认的平均分配队列算法、ConsumerFromWhere

2、概述

Consumer 概述,我们知道,1个消费组下是会存在多个消费者的,并且可能会有新增消费者,或者新增队列,因此 RocketMQ 需要能够动态的为消费者分配队列。在 RocketMQ 中,这个过程称为 Rebalance

RocketMQ 为了实现动态分配,每隔 20s 执行一次 Rebalance

下面主要讲解 CLUSTERING 模式的 Rebalance 过程。

3、Rebalance 过程

Rebalance 主要就干 2 件事情:(每隔 20s 执行一次)

  1. 根据分配队列算法,重新为消费者分配队列

  2. 分配完队列后,根据 Consumer 设置的 consumeFromWhere 获取一个拉取消息起始偏移量 offset(实际上设置该参数并没有用,RocketMQ 非常坑爹的地方)

  3. 生成 PullRequest, 并将 offset 写入 nextOffset 属性中, 以唤醒拉取消息线程。

需要注意的是 Rebalance 会导致消费者重复消费, 在讲解消息端消费时,会说明该问题。

4、分配队列算法

在创建 Consumer 的时候,如果没有手动指定分配队列算法,那么 RocketMQ 会使用平均分配算法。

如下是启动时,手动指定队列分配算法


DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("topic", "group");

consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragelyByCircle());

consumer.start();

4.1、平均分配算法

对应的实现类:AllocateMessageQueueAveragely。平均分配算法为默认的分配算法。

消费者 分配队列
Consumer1 queueId1, queueId2, queueId3
Consumer2 queueId4, queueId5, queueId6
Consumer3 queueId7, queueId8, queueId9

4.2、轮询分配算法

对应实现类:AllocateMessageQueueAveragelyByCircle

消费者 分配队列
Consumer1 queueId1, queueId4, queueId7
Consumer2 queueId2, queueId5, queueId8
Consumer3 queueId3, queueId6, queueId9

4.3、机房近端优先分配算法

这个是 RocketMQ 的官方注释 An allocate strategy proxy for based on machine room nearside priority

对应实现类 AllocateMachineRoomNearby

我们需要手动实现 AllocateMachineRoomNearby.MachineRoomResolver 告诉 RocketMQ 哪些队列属于哪个 broker;哪些 consumer 属于哪个 broker

则该算法会优先 匹配 同属于 Broker 下的 队列和消费者;如果不匹配,则将队列分配给其他的 消费者

学新通


// 我们需要告诉 RocketMQ 如何 哪些队列属于哪个 broker; 哪些 consumer 属于哪个 broker

AllocateMachineRoomNearby.MachineRoomResolver machineRoomResolver = new AllocateMachineRoomNearby.MachineRoomResolver() {

    @Override 
    public String brokerDeployIn(MessageQueue messageQueue) {
        return messageQueue.getBrokerName().split("-")[0];
    }

    @Override 
    public String consumerDeployIn(String clientID) {
        return clientID.split("-")[0];
    }

};

AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMachineRoomNearby(new AllocateMessageQueueAveragely(), machineRoomResolver);

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("topic", "group");

consumer.setAllocateMessageQueueStrategy(allocateMessageQueueStrategy);

consumer.start();

4.4、按照机房分配算法

对应实现类:AllocateMessageQueueByMachineRoom


AllocateMessageQueueByMachineRoom strategy = new AllocateMessageQueueByMachineRoom();

Set<String> consumeridcs = new HashSet<>();

consumeridcs.add("broker-a");

strategy.setConsumeridcs(consumeridcs);

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TBW102", "please_rename_unique_group_name_4");

consumer.setAllocateMessageQueueStrategy(strategy);

算法逻辑:

  1. 将消费者分配到指定的机房

  2. 先平均分配,如果有多余,则从第一个 consumer 依次分配

学新通

5、FAQ

Q: 在不新增消费者、队列的前提下,每次 Rebalance 后,每个消费者都会被分配到原先的队列吗?

A: 只要保证所有的消费者 消息队列顺序、消费者列表顺序、分配消息队列策略 一致,就可以保证每次 Rebalance 时,消费者都会固定消费原先分配的队列。RocketMQ 在为每个消费者分配队列时,会对 消息队列、消费者列表 进行排序。

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhekkjch
系列文章
更多 icon
同类精品
更多 icon
继续加载