rocketmq kafaka rabbitmq 三大消息中间件防消息丢失
rocketmq方案
消息整体处理过程
这里我们将消息的整体处理阶段分为3个阶段进行分析:
- Producer发送消息阶段。
- Broker处理消息阶段。
- Consumer消费消息阶段。
Producer发送消息阶段
发送消息阶段涉及到Producer到broker的网络通信,因此丢失消息的几率一定会有,那RocketMQ在此阶段用了哪些手段保证消息不丢失了(或者说降低丢失的可能性)。
手段一:提供SYNC的发送消息方式,等待broker处理结果。
RocketMQ提供了3种发送消息方式,分别是:
//同步发送
SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException;
//异步发送
void send(final Message msg, final SendCallback sendCallback) throws MQClientException,
RemotingException, InterruptedException;
//oneway发送(不等结果)
void sendOneway(final Message msg) throws MQClientException, RemotingException,
InterruptedException
- 同步发送:Producer 向 broker 发送消息,阻塞当前线程等待 broker 响应 发送结果。
- 异步发送:Producer 首先构建一个向 broker发送消息的任务,把该任务提交给线程池,等执行完该任务时,回调用户自定义的回调函数,执行处理结果。
- Oneway发送:Oneway 方式只负责发送请求,不等待应答,Producer 只负责把请求发出去,而不处理响应结果。
我们在调用producer.send方法时,不指定回调方法,则默认采用同步发送消息的方式,这也是丢失几率最小的一种发送方式。
手段二:发送消息如果失败或者超时,则重新发送。
发送重试源码如下,本质其实就是一个for循环,当发送消息发生异常的时候重新循环发送。默认重试3次,重试次数可以通过producer指定。
rocketmq中 重试的源码如下
//只有同步发送的方式才会重试
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
//for循环来重试
for (; times < timesTotal; times ) {
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
try {
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
//不同的发送消息方式
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
} catch (RemotingException e) {
continue;
} catch (MQClientException e) {
continue;
} catch (MQBrokerException e) {
exception = e;
}
}
}
手段三:broker提供多master模式,即使某台broker宕机了,保证消息可以投递到另外一台正常的broker上。
如果broker只有一个节点,则broker宕机了,即使producer有重试机制,也没用,因此利用多主模式,当某台broker宕机了,换一台broker进行投递。
总结
producer消息发送方式虽然有3种,但为了减小丢失消息的可能性尽量采用同步的发送方式,同步等待发送结果,利用同步发送 重试机制 多个master节点,尽可能减小消息丢失的可能性。Broker处理消息阶段
手段四:提供同步刷盘的策略
public enum FlushDiskType {
SYNC_FLUSH, //同步刷盘
ASYNC_FLUSH//异步刷盘(默认)
}
我们知道,当消息投递到broker之后,会先存到page cache,然后根据broker设置的刷盘策略是否立即刷盘,也就是如果刷盘策略为异步,broker并不会等待消息落盘就会返回producer成功,也就是说当broker所在的服务器突然宕机,则会丢失部分页的消息。关于broker的处理过程可以参考我之前的文章:RocketMQ Broker端处理消息过程分析
手段五:提供主从模式,同时主从支持同步双写
即使broker设置了同步刷盘,如果主broker磁盘损坏,也是会导致消息丢失。因此可以给broker指定slave,同时设置master为SYNC_MASTER,然后将slave设置为同步刷盘策略。此模式下,producer每发送一条消息,都会等消息投递到master和slave都落盘成功了,broker才会当作消息投递成功,保证休息不丢失。
总结
在broker端,消息丢失的可能性主要在于刷盘策略和同步机制。RocketMQ默认broker的刷盘策略为异步刷盘,如果有主从,同步策略也默认的是异步同步,这样子可以提高broker处理消息的效率,但是会有丢失的可能性。因此可以通过同步刷盘策略 同步slave策略 主从的方式解决丢失消息的可能。
Consumer消费消息阶段
手段六:consumer默认提供的是At least Once机制
从producer投递消息到broker,即使前面这些过程保证了消息正常持久化,但如果consumer消费消息没有消费到也不能理解为消息绝对的可靠。因此RockerMQ默认提供了At least Once机制保证消息可靠消费。何为At least Once?
Consumer先pull 消息到本地,消费完成后,才向服务器返回ack。通常消费消息的ack机制一般分为两种思路
- 先提交后消费;
- 先消费,消费成功后再提交;
思路一可以解决重复消费的问题但是会丢失消息,因此Rocket默认实现的是思路二,由各自consumer业务方保证幂等来解决重复消费问题。
手段七:消费消息重试机制
当消费消息失败了,如果不提供重试消息的能力,则也不能算完全的可靠消费,因此RocketMQ本身提供了重新消费消息的能力。
总结
consumer端要保证消费消息的可靠性,主要通过At least Once 消费重试机制保证。
存储结构
rocketmq 存储文件主要包含三类,
- commitlog(一个):所有消息都存储在commitlog中。
- commitqueue(每个队列一个):消息到达commitlog后,异步发送到commitqueue中。
- indexfile :消息索引文件,存储消息的key与offset的关系。
kafka 方案
kafka支持3种消息投递语义:
- At most once——最多一次,消息可能会丢失,但不会重复
- At least once——最少一次,消息不会丢失,可能会重复
- Exactly once——只且一次,消息不丢失不重复,只且消费一次。
但是整体的消息投递语义需要Producer端和Consumer端两者来保证。
Producer 消息生产者端
一个场景例子:当producer向broker发送一条消息,这时网络出错了,producer无法得知broker是否接受到了这条消息。网络出错可能是发生在消息传递的过程中,也可能发生在broker已经接受到了消息,并返回ack给producer的过程中。
这时,producer只能进行重发,消息可能会重复,但是保证了at least once。0.11.0的版本通过给每个producer一个唯一ID,并且在每条消息中生成一个sequence num,这样就能对消息去重,达到producer端的exactly once。
这里还涉及到producer端的acks设置和broker端的副本数量,以及min.insync.replicas的设置。比如producer端的acks设置如下:
- acks=0 //消息发了就发了,不等任何响应就认为消息发送成功
- acks=1 //leader分片写消息成功就返回响应给producer
- acks=all(-1) //当acks=all,min.insync.replicas=2,就要求INSRNC列表中必须要有2个副本都写成功,才返回响应给producer,如果INSRNC中已同步副本数量不足2,就会报异常,如果没有2个副本写成功,也会报异常,消息就会认为没有写成功。
Broker 消息接收端
上文说过acks=1,表示当leader分片副本写消息成功就返回响应给producer,此时认为消息发送成功。
如果leader写成功单马上挂了,还没有将这个写成功的消息同步给其他的分片副本,那么这个分片此时的ISR列表为空,如果unclean.leader.election.enable=true,就会发生log truncation(日志截取),同样会发生消息丢失。
如果unclean.leader.election.enable=false,那么这个分片上的服务就不可用了,producer向这个分片发消息就会抛异常。
所以我们设置min.insync.replicas=2,unclean.leader.election.enable=false,producer端的acks=all,这样发送成功的消息就绝不会丢失。
Consumer 消息消费者端
所有分片的副本都有自己的log文件(保存消息)和相同的offset值。当consumer没挂的时候,offset直接保存在内存中,如果挂了,就会发生负载均衡,需要consumer group中另外的consumer来接管并继续消费。
consumer消费消息的方式有以下2种;
-
consumer读取消息,保存offset,然后处理消息。现在假设一个场景:保存offset成功,但是消息处理失败,consumer又挂了,这时来接管的consumer就只能从上次保存的offset继续消费,这种情况下就有可能丢消息,但是保证了at most once语义。consumer读取消息,处理消息,处理成功,保存offset。
-
如果消息处理成功,但是在保存offset时,consumer挂了,这时来接管的consumer也只能从上一次保存的offset开始消费,这时消息就会被重复消费,也就是保证了at least once语义。以上这些机制的保证都不是直接一个配置可以解决的,而是你的consumer代码来完成的,只是一个处理顺序先后问题。
第一种对应的代码:
List<String> messages = consumer.poll();consumer.commitOffset();processMsg(messages);
第二种对应的代码:
List<String> messages = consumer.poll();processMsg(messages);consumer.commitOffset();Exactly Once实现原理
下面详细说说exactly once的实现原理。
Producer端的消息幂等性保证
每个Producer在初始化的时候都会被分配一个唯一的PID,Producer向指定的Topic的特定Partition发送的消息都携带一个sequence number(简称seqNum),从零开始的单调递增的。
Broker会将Topic-Partition对应的seqNum在内存中维护,每次接受到Producer的消息都会进行校验;只有seqNum比上次提交的seqNum刚好大一,才被认为是合法的。比它大的,说明消息有丢失;比它小的,说明消息重复发送了。
以上说的这个只是针对单个Producer在一个session内的情况,假设Producer挂了,又重新启动一个Producer被而且分配了另外一个PID,这样就不能达到防重的目的了,所以kafka又引进了Transactional Guarantees(事务性保证)。
Transactional Guarantees 事务性保证
kafka的事务性保证说的是:同时向多个TopicPartitions发送消息,要么都成功,要么都失败。
为什么搞这么个东西出来?我想了下有可能是这种例子:用户定了一张机票,付款成功之后,订单的状态改了,飞机座位也被占了,这样相当于是2条消息,那么保证这个事务性就是:向订单状态的Topic和飞机座位的Topic分别发送一条消息,这样就需要kafka的这种事务性保证。
这种功能可以使得consumer offset的提交(也是向broker产生消息)和producer的发送消息绑定在一起。用户需要提供一个唯一的全局性TransactionalId,这样就能将PID和TransactionalId映射起来,就能解决producer挂掉后跨session的问题,应该是将之前PID的TransactionalId赋值给新的producer。
Consumer端
以上的事务性保证只是针对的producer端,对consumer端无法保证,有以下原因:
- 压实类型的topics,有些事务消息可能被新版本的producer重写事务可能跨坐2个log segments,这时旧的segments可能被删除,就会丢消息
- 消费者可能寻址到事务中任意一点,也会丢失一些初始化的消息
- 消费者可能不会同时从所有的参与事务的TopicPartitions分片中消费消息
- 如果是消费kafka中的topic,并且将结果写回到kafka中另外的topic,可以将消息处理后结果的保存和offset的保存绑定为一个事务,这时就能保证消息的处理和offset的提交要么都成功,要么都失败。
如果是将处理消息后的结果保存到外部系统,这时就要用到两阶段提交(tow-phase commit),但是这样做很麻烦,较好的方式是offset自己管理,将它和消息的结果保存到同一个地方,整体上进行绑定,可以参考Kafka Connect中HDFS的例子。
rabbitmq 方案
持久化
- RabbitMQ持久化分为Exchange、Queue、Message
- Exchange 和 Queue 持久化 指持久化Exchange、Queue 元数据,持久化的是自身,服务宕机,Exchange 和 Queue 自身就没有了
- Message 持久化 顾名思义 把每一条消息体持久化,服务宕机,消息不丢失
镜像队列
rabbitmq的队列(queue)镜像,指master node 在接受到请求后,会同步到其他节点上,以此来保证高可用。在confirm模式下,具体过程如下clientpublisher 发送消息–> master node接到消息–> master node 将消息持久化到磁盘 –> 将消息异步发送给其他节点–>master 将ack 返回给client publisher。
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/taneijj
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
怎样阻止微信小程序自动打开
PHP中文网 06-13 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
photoshop蒙版画笔没反应怎么办
PHP中文网 06-24