• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

RocketMQ—Producer五路由队列选择

武飞扬头像
IT巅峰技术
帮助1

前言

路由队列选择的作用在于发送消息时可以指定发送到某个broker队列,或均衡发送到broker队列,其作用就是在于选择合适的队列进行消息发送。

目前客户端队列选择分为三种方式:

  • 第一种:可根据MessageQueueSelector的实现或自扩展实现选择队列;
  • 第二种:未开启Broker故障延迟机制(sendLatencyFaultEnable:false),会采用默认轮训机制(默认是此种实现方式);
  • 第三种:开启Broker故障延迟机制(sendLatencyFaultEnable:true),会根据brokerName的可用性选择队列发送。

接下来我们就以这三种方式展开讨论。

一、队列选择

MessageQueueSelector方式队列选择在了解MessageQueueSelector的方式进行队列选择时,我们先回顾下MQProducer接口:里面有多个方法签名带参数MessageQueueSelector,其实就是表明使用此种方式选择消息队列需要显示穿参数才能使用;用下面这个接口方法进行举例分析:

  1.  
    SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)
  2.  
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException;

接下来我们直接看内部实现源码如何实现的:

DefaultMQProducerImpl#sendSelectImpl

  1.  
    private SendResult sendSelectImpl(
  2.  
    Message msg, MessageQueueSelector selector,
  3.  
    Object arg, final CommunicationMode communicationMode,
  4.  
    final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException,
  5.  
    MQBrokerException, InterruptedException {
  6.  
    long beginStartTime = System.currentTimeMillis();
  7.  
    this.makeSureStateOK(); // 状态检测
  8.  
    Validators.checkMessage(msg, this.defaultMQProducer); // 消息验证
  9.  
     
  10.  
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
  11.  
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
  12.  
    MessageQueue mq = null;
  13.  
    try {
  14.  
    mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg); // 1-队列选择
  15.  
    } catch (Throwable e) {
  16.  
    throw new MQClientException("select message queue throwed exception.", e);
  17.  
    }
  18.  
     
  19.  
    long costTime = System.currentTimeMillis() - beginStartTime;
  20.  
    if (timeout < costTime) {
  21.  
    throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
  22.  
    }
  23.  
    if (mq != null) {
  24.  
    return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime); // 熟悉的配方,前面已经分析
  25.  
    } else {
  26.  
    throw new MQClientException("select message queue return null.", null); // 异常抛出
  27.  
    }
  28.  
    }
  29.  
    throw new MQClientException("No route info for this topic, " msg.getTopic(), null);
  30.  
    }
学新通

分析:

selector.select(topicPublishInfo.getMessageQueueList(), msg, arg) 队列选择;

其实现有以下三种:

  • SelectMessageQueueByHash(hash)
  • SelectMessageQueueByMachineRoom(机器随机)
  • SelectMessageQueueByRandom(队列随机)

当然自己也可以定制化扩展,你说简单不简单?我们可简单查看其中之一的实现源码:

SelectMessageQueueByRandom

  1.  
    public class SelectMessageQueueByRandom implements MessageQueueSelector {
  2.  
    private Random random = new Random(System.currentTimeMillis());
  3.  
    @Override
  4.  
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
  5.  
    int value = random.nextInt(mqs.size());
  6.  
    return mqs.get(value);
  7.  
    }
  8.  
    }

二、轮训机制

未开启Broker故障延迟机制(sendLatencyFaultEnable:false),会采用默认轮训机制;

来来来,我们直接上大餐,看到下面你就明白原来默认机制是如此的简单哈。

以下方法为入口:

DefaultMQProducerImpl#selectOneMessageQueue

  1.  
    /**
  2.  
    * 选择一个消息队列, lastBrokerName 就是上 一 次选择的执行发送消息失败的 Broker。第一次执行消息队列选择时, lastBrokerName 为 null
  3.  
    * @param tpInfo
  4.  
    * @param lastBrokerName
  5.  
    * @return
  6.  
    */
  7.  
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
  8.  
    return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
  9.  
    }

重头戏:

MQFaultStrategy#selectOneMessageQueue

  1.  
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
  2.  
    if (this.sendLatencyFaultEnable) {
  3.  
    ...下面分析...
  4.  
    return tpInfo.selectOneMessageQueue();
  5.  
    }
  6.  
    return tpInfo.selectOneMessageQueue(lastBrokerName);
  7.  
    }

备注:

TopicPublishInfo 熟悉不熟悉哈?可以翻翻文章(路由动态更新)selectOneMessageQueue源码注释分析,可以简单理解为队列轮询。

三、队列发送

开启Broker故障延迟机制(sendLatencyFaultEnable:true),进行选择队列发送。

3.1 发送延迟故障

如果发送延迟故障打开[sendLatencyFaultEnable:true],则发送时会统计发送耗时和失败[updateFaultItem],当某个broker节点发送失败和发送耗时较长,则在一段时间内不再选择该broker[selectOneMessageQueue]

3.2 流程图

简单流程图片如下:

学新通

 描述:

  1. LatencyFaultToleranceImpl包含一个Map(key:brokerName,value:FaultItem可用信息):ConcurrentHashMap faultItemTable
  2. FaultItem 的数据结构如下:
  1.  
    class FaultItem implements Comparable<FaultItem> {
  2.  
    //条目唯一键,这里为 brokerName。
  3.  
    private final String name;
  4.  
    //本次消息发送延迟 。(消耗时间)
  5.  
    private volatile long currentLatency;
  6.  
    //故障规避开始时间 = 发生的时间 notAvailableDuration 。(故障恢复时间)
  7.  
    private volatile long startTimestamp;
  8.  
     
  9.  
    .....
  10.  
    }

备注:

FaultItem包含currentLatency发送耗时,brokerName节点名称,startTimestamp时间戳后broker可用。

3.3 元数据映射

从MQFaultStrategy可以看出:发送延时一可用性延时~元数据映射

  1.  
    private final static InternalLogger log = ClientLogger.getLog();
  2.  
    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
  3.  
    //启用 Broker故障延迟机制 ,默认不启用
  4.  
    private boolean sendLatencyFaultEnable = false;
  5.  
    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
  6.  
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

简单总结如下:

  • latencyMax数组:50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L 对应:发送耗时50ms 100ms 550ms 1s 2s 3s 15s
  • notAvailableDuration数组:0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L 对应:可用延时 0s 0s 30s 60s 2min 3min 10min

故可以理解为:

  1. 发送耗时50和100毫秒,则当前broker延迟0秒
  2. 发送耗时550和1000毫秒,则当前broker延迟30秒和60秒
  3. 发送耗时2秒,则当前broker延迟2min
  4. 发送耗时3秒,则当前broker延迟3min
  5. 发送耗时15秒,则当前broker延迟10min6
  6. 如果发送失败,则直接延迟10min

3.4 源码分析:

1:选择队列

MQFaultStrategy#selectOneMessageQueue

  1.  
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
  2.  
    if (this.sendLatencyFaultEnable) { // 1> 默认false,等于true相当于开启
  3.  
    try {
  4.  
    int index = tpInfo.getSendWhichQueue().getAndIncrement();
  5.  
    for (int i = 0; i < tpInfo.getMessageQueueList().size(); i ) {
  6.  
    int pos = Math.abs(index ) % tpInfo.getMessageQueueList().size();
  7.  
    if (pos < 0)
  8.  
    pos = 0;
  9.  
    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
  10.  
    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { // 2> 判断可用性-时间判断(存储了所有发送消息失败过的broker)
  11.  
    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
  12.  
    return mq; // 3> 说明找到了可用的MessageQueue直接返回
  13.  
    }
  14.  
    }
  15.  
    //4> 尝试从规避的 Broker 中选择一个可用的 Broker(shuffle),如果没有找到,将返回 null。--
  16.  
    final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
  17.  
    int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
  18.  
    if (writeQueueNums > 0) {
  19.  
    final MessageQueue mq = tpInfo.selectOneMessageQueue();
  20.  
    if (notBestBroker != null) {
  21.  
    mq.setBrokerName(notBestBroker);
  22.  
    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
  23.  
    }
  24.  
    return mq;
  25.  
    } else {
  26.  
    latencyFaultTolerance.remove(notBestBroker);
  27.  
    }
  28.  
    } catch (Exception e) {
  29.  
    log.error("Error occurred when selecting message queue", e);
  30.  
    }
  31.  
    //5>兜底选择队列
  32.  
    return tpInfo.selectOneMessageQueue();
  33.  
    }
  34.  
    return tpInfo.selectOneMessageQueue(lastBrokerName);
  35.  
    }
学新通

备注:

此处主要是通过判断brokerName是否可用、不可用则该brokerName所有queue不可能、继续找下一个brokerName、如果找不到则排序shuffle找一个可用的。如果最终找不到则调用TopicPublishInfo.selectOneMessageQueue兜底选择一个队列返回。

2:更新broker的可用性

(根据发送延时换算可用性延时):updateFaultItem#updateFaultItem

  1.  
    /**
  2.  
    * 如果 isolation为 true,则使用 30s作为 computeNotAvailableDuration方法的参数;
  3.  
    * 如果 isolation为 false,则使用本次消息发送时延作为 computeNotAvailableDuration方法的参数,
  4.  
    * 那 computeNotAvailableDuration 的作用 是 计算因本次消息发送故障需要 将 Broker 规避的时长,
  5.  
    * @param brokerName
  6.  
    * @param currentLatency
  7.  
    * @param isolation =true 表示10分钟后可用,参见MQFaultStrategy的数组元数据
  8.  
    */
  9.  
    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
  10.  
    if (this.sendLatencyFaultEnable) {
  11.  
    long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
  12.  
    this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
  13.  
    }
  14.  
    }
  15.  
     
  16.  
    private long computeNotAvailableDuration(final long currentLatency) {
  17.  
    for (int i = latencyMax.length - 1; i >= 0; i--) {
  18.  
    if (currentLatency >= latencyMax[i])
  19.  
    return this.notAvailableDuration[i]; // 计算可用性延时
  20.  
    }
  21.  
     
  22.  
    return 0;
  23.  
    }
学新通

备注:

此处逻辑相对简单、小知识点:记住:isolation=true的特殊情况

3:更新broker可用性延时

LatencyFaultToleranceImpl#faultItemTable

  1.  
    /**
  2.  
    * 根据 broker名称从 缓存表中获 取 Faultitem,如果找到则更新 Faultltem,否则创 建 Faultltem。
  3.  
    * 1) currentLatency、 startTimeStamp被volatile修饰。
  4.  
    * 2) startTimeStamp 为当前系统时间加上需要规避的时长 。startTimeStamp 是 判断 broker当前是否可用的直接一句,请看 Faultltem#isAvailable方法。
  5.  
    * @param name brokerName
  6.  
    * @param currentLatency 消息发送故障延迟时间 。
  7.  
    * @param notAvailableDuration 不可用持续时辰, 在这个时间内, Broker 将被规避 。
  8.  
    */
  9.  
    @Override
  10.  
    public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
  11.  
    FaultItem old = this.faultItemTable.get(name);
  12.  
    if (null == old) {
  13.  
    final FaultItem faultItem = new FaultItem(name);
  14.  
    faultItem.setCurrentLatency(currentLatency);
  15.  
    faultItem.setStartTimestamp(System.currentTimeMillis() notAvailableDuration);
  16.  
     
  17.  
    old = this.faultItemTable.putIfAbsent(name, faultItem);
  18.  
    if (old != null) {
  19.  
    old.setCurrentLatency(currentLatency);
  20.  
    old.setStartTimestamp(System.currentTimeMillis() notAvailableDuration);//计算新的可用性延时
  21.  
    }
  22.  
    } else {
  23.  
    old.setCurrentLatency(currentLatency);
  24.  
    old.setStartTimestamp(System.currentTimeMillis() notAvailableDuration); //计算新的可用性延时
  25.  
    }
  26.  
    }
学新通

备注:

此处逻辑就是更新计算FaultItem的StartTimestamp

四、结论

  1. 通过本文分析我们已经清楚消息发送流程队列选择的三种方式,由于发送消息流程过程中不能动态切换此三种方式,故每种选择队列方式建议根据实际情况进行选择使用;
  2. 至此Producer的核心流程源码已经分析完、建议有兴趣可以回顾历史文章。

程序员的核心竞争力其实还是技术,因此对技术还是要不断的学习,关注 “IT巅峰技术” 公众号 ,该公众号内容定位:中高级开发、架构师、中层管理人员等中高端岗位服务的,除了技术交流外还有很多架构思想和实战案例。

作者是 《 消息中间件 RocketMQ 技术内幕》 一书作者,同时也是 “RocketMQ 上海社区”联合创始人,曾就职于拼多多、德邦等公司,现任上市快递公司架构负责人,主要负责开发框架的搭建、中间件相关技术的二次开发和运维管理、混合云及基础服务平台的建设。

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhghgaeg
系列文章
更多 icon
同类精品
更多 icon
继续加载