RocketMQ—Producer五路由队列选择
前言
路由队列选择的作用在于发送消息时可以指定发送到某个broker队列,或均衡发送到broker队列,其作用就是在于选择合适的队列进行消息发送。
目前客户端队列选择分为三种方式:
- 第一种:可根据MessageQueueSelector的实现或自扩展实现选择队列;
- 第二种:未开启Broker故障延迟机制(sendLatencyFaultEnable:false),会采用默认轮训机制(默认是此种实现方式);
- 第三种:开启Broker故障延迟机制(sendLatencyFaultEnable:true),会根据brokerName的可用性选择队列发送。
接下来我们就以这三种方式展开讨论。
一、队列选择
MessageQueueSelector方式队列选择在了解MessageQueueSelector的方式进行队列选择时,我们先回顾下MQProducer接口:里面有多个方法签名带参数MessageQueueSelector,其实就是表明使用此种方式选择消息队列需要显示穿参数才能使用;用下面这个接口方法进行举例分析:
-
SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)
-
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
接下来我们直接看内部实现源码如何实现的:
DefaultMQProducerImpl#sendSelectImpl
-
private SendResult sendSelectImpl(
-
Message msg, MessageQueueSelector selector,
-
Object arg, final CommunicationMode communicationMode,
-
final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException,
-
MQBrokerException, InterruptedException {
-
long beginStartTime = System.currentTimeMillis();
-
this.makeSureStateOK(); // 状态检测
-
Validators.checkMessage(msg, this.defaultMQProducer); // 消息验证
-
-
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
-
if (topicPublishInfo != null && topicPublishInfo.ok()) {
-
MessageQueue mq = null;
-
try {
-
mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg); // 1-队列选择
-
} catch (Throwable e) {
-
throw new MQClientException("select message queue throwed exception.", e);
-
}
-
-
long costTime = System.currentTimeMillis() - beginStartTime;
-
if (timeout < costTime) {
-
throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
-
}
-
if (mq != null) {
-
return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime); // 熟悉的配方,前面已经分析
-
} else {
-
throw new MQClientException("select message queue return null.", null); // 异常抛出
-
}
-
}
-
throw new MQClientException("No route info for this topic, " msg.getTopic(), null);
-
}
分析:
selector.select(topicPublishInfo.getMessageQueueList(), msg, arg) 队列选择;
其实现有以下三种:
- SelectMessageQueueByHash(hash)
- SelectMessageQueueByMachineRoom(机器随机)
- SelectMessageQueueByRandom(队列随机)
当然自己也可以定制化扩展,你说简单不简单?我们可简单查看其中之一的实现源码:
SelectMessageQueueByRandom
-
public class SelectMessageQueueByRandom implements MessageQueueSelector {
-
private Random random = new Random(System.currentTimeMillis());
-
-
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
-
int value = random.nextInt(mqs.size());
-
return mqs.get(value);
-
}
-
}
二、轮训机制
未开启Broker故障延迟机制(sendLatencyFaultEnable:false),会采用默认轮训机制;
来来来,我们直接上大餐,看到下面你就明白原来默认机制是如此的简单哈。
以下方法为入口:
DefaultMQProducerImpl#selectOneMessageQueue
-
/**
-
* 选择一个消息队列, lastBrokerName 就是上 一 次选择的执行发送消息失败的 Broker。第一次执行消息队列选择时, lastBrokerName 为 null
-
* @param tpInfo
-
* @param lastBrokerName
-
* @return
-
*/
-
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
-
return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
-
}
重头戏:
MQFaultStrategy#selectOneMessageQueue
-
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
-
if (this.sendLatencyFaultEnable) {
-
...下面分析...
-
return tpInfo.selectOneMessageQueue();
-
}
-
return tpInfo.selectOneMessageQueue(lastBrokerName);
-
}
备注:
TopicPublishInfo 熟悉不熟悉哈?可以翻翻文章(路由动态更新)selectOneMessageQueue源码注释分析,可以简单理解为队列轮询。
三、队列发送
开启Broker故障延迟机制(sendLatencyFaultEnable:true),进行选择队列发送。
3.1 发送延迟故障
如果发送延迟故障打开[sendLatencyFaultEnable:true],则发送时会统计发送耗时和失败[updateFaultItem],当某个broker节点发送失败和发送耗时较长,则在一段时间内不再选择该broker[selectOneMessageQueue]
3.2 流程图
简单流程图片如下:
描述:
- LatencyFaultToleranceImpl包含一个Map(key:brokerName,value:FaultItem可用信息):ConcurrentHashMap faultItemTable
- FaultItem 的数据结构如下:
-
class FaultItem implements Comparable<FaultItem> {
-
//条目唯一键,这里为 brokerName。
-
private final String name;
-
//本次消息发送延迟 。(消耗时间)
-
private volatile long currentLatency;
-
//故障规避开始时间 = 发生的时间 notAvailableDuration 。(故障恢复时间)
-
private volatile long startTimestamp;
-
-
.....
-
}
备注:
FaultItem包含currentLatency发送耗时,brokerName节点名称,startTimestamp时间戳后broker可用。
3.3 元数据映射
从MQFaultStrategy可以看出:发送延时一可用性延时~元数据映射
-
private final static InternalLogger log = ClientLogger.getLog();
-
private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
-
//启用 Broker故障延迟机制 ,默认不启用
-
private boolean sendLatencyFaultEnable = false;
-
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
-
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
简单总结如下:
- latencyMax数组:50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L 对应:发送耗时50ms 100ms 550ms 1s 2s 3s 15s
- notAvailableDuration数组:0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L 对应:可用延时 0s 0s 30s 60s 2min 3min 10min
故可以理解为:
- 发送耗时50和100毫秒,则当前broker延迟0秒
- 发送耗时550和1000毫秒,则当前broker延迟30秒和60秒
- 发送耗时2秒,则当前broker延迟2min
- 发送耗时3秒,则当前broker延迟3min
- 发送耗时15秒,则当前broker延迟10min6
- 如果发送失败,则直接延迟10min
3.4 源码分析:
1:选择队列
MQFaultStrategy#selectOneMessageQueue
-
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
-
if (this.sendLatencyFaultEnable) { // 1> 默认false,等于true相当于开启
-
try {
-
int index = tpInfo.getSendWhichQueue().getAndIncrement();
-
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i ) {
-
int pos = Math.abs(index ) % tpInfo.getMessageQueueList().size();
-
if (pos < 0)
-
pos = 0;
-
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
-
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { // 2> 判断可用性-时间判断(存储了所有发送消息失败过的broker)
-
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
-
return mq; // 3> 说明找到了可用的MessageQueue直接返回
-
}
-
}
-
//4> 尝试从规避的 Broker 中选择一个可用的 Broker(shuffle),如果没有找到,将返回 null。--
-
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
-
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
-
if (writeQueueNums > 0) {
-
final MessageQueue mq = tpInfo.selectOneMessageQueue();
-
if (notBestBroker != null) {
-
mq.setBrokerName(notBestBroker);
-
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
-
}
-
return mq;
-
} else {
-
latencyFaultTolerance.remove(notBestBroker);
-
}
-
} catch (Exception e) {
-
log.error("Error occurred when selecting message queue", e);
-
}
-
//5>兜底选择队列
-
return tpInfo.selectOneMessageQueue();
-
}
-
return tpInfo.selectOneMessageQueue(lastBrokerName);
-
}
备注:
此处主要是通过判断brokerName是否可用、不可用则该brokerName所有queue不可能、继续找下一个brokerName、如果找不到则排序shuffle找一个可用的。如果最终找不到则调用TopicPublishInfo.selectOneMessageQueue兜底选择一个队列返回。
2:更新broker的可用性
(根据发送延时换算可用性延时):updateFaultItem#updateFaultItem
-
/**
-
* 如果 isolation为 true,则使用 30s作为 computeNotAvailableDuration方法的参数;
-
* 如果 isolation为 false,则使用本次消息发送时延作为 computeNotAvailableDuration方法的参数,
-
* 那 computeNotAvailableDuration 的作用 是 计算因本次消息发送故障需要 将 Broker 规避的时长,
-
* @param brokerName
-
* @param currentLatency
-
* @param isolation =true 表示10分钟后可用,参见MQFaultStrategy的数组元数据
-
*/
-
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
-
if (this.sendLatencyFaultEnable) {
-
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
-
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
-
}
-
}
-
-
private long computeNotAvailableDuration(final long currentLatency) {
-
for (int i = latencyMax.length - 1; i >= 0; i--) {
-
if (currentLatency >= latencyMax[i])
-
return this.notAvailableDuration[i]; // 计算可用性延时
-
}
-
-
return 0;
-
}
备注:
此处逻辑相对简单、小知识点:记住:isolation=true的特殊情况
3:更新broker可用性延时
LatencyFaultToleranceImpl#faultItemTable
-
/**
-
* 根据 broker名称从 缓存表中获 取 Faultitem,如果找到则更新 Faultltem,否则创 建 Faultltem。
-
* 1) currentLatency、 startTimeStamp被volatile修饰。
-
* 2) startTimeStamp 为当前系统时间加上需要规避的时长 。startTimeStamp 是 判断 broker当前是否可用的直接一句,请看 Faultltem#isAvailable方法。
-
* @param name brokerName
-
* @param currentLatency 消息发送故障延迟时间 。
-
* @param notAvailableDuration 不可用持续时辰, 在这个时间内, Broker 将被规避 。
-
*/
-
-
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
-
FaultItem old = this.faultItemTable.get(name);
-
if (null == old) {
-
final FaultItem faultItem = new FaultItem(name);
-
faultItem.setCurrentLatency(currentLatency);
-
faultItem.setStartTimestamp(System.currentTimeMillis() notAvailableDuration);
-
-
old = this.faultItemTable.putIfAbsent(name, faultItem);
-
if (old != null) {
-
old.setCurrentLatency(currentLatency);
-
old.setStartTimestamp(System.currentTimeMillis() notAvailableDuration);//计算新的可用性延时
-
}
-
} else {
-
old.setCurrentLatency(currentLatency);
-
old.setStartTimestamp(System.currentTimeMillis() notAvailableDuration); //计算新的可用性延时
-
}
-
}
备注:
此处逻辑就是更新计算FaultItem的StartTimestamp
四、结论
- 通过本文分析我们已经清楚消息发送流程队列选择的三种方式,由于发送消息流程过程中不能动态切换此三种方式,故每种选择队列方式建议根据实际情况进行选择使用;
- 至此Producer的核心流程源码已经分析完、建议有兴趣可以回顾历史文章。
程序员的核心竞争力其实还是技术,因此对技术还是要不断的学习,关注 “IT巅峰技术” 公众号 ,该公众号内容定位:中高级开发、架构师、中层管理人员等中高端岗位服务的,除了技术交流外还有很多架构思想和实战案例。
作者是 《 消息中间件 RocketMQ 技术内幕》 一书作者,同时也是 “RocketMQ 上海社区”联合创始人,曾就职于拼多多、德邦等公司,现任上市快递公司架构负责人,主要负责开发框架的搭建、中间件相关技术的二次开发和运维管理、混合云及基础服务平台的建设。
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhghgaeg
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13