• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

rocketmq kafaka rabbitmq 三大消息中间件防消息丢失

武飞扬头像
juejin
帮助216

rocketmq方案

消息整体处理过程

这里我们将消息的整体处理阶段分为3个阶段进行分析:

  • Producer发送消息阶段。
  • Broker处理消息阶段。
  • Consumer消费消息阶段。

Producer发送消息阶段

发送消息阶段涉及到Producer到broker的网络通信,因此丢失消息的几率一定会有,那RocketMQ在此阶段用了哪些手段保证消息不丢失了(或者说降低丢失的可能性)。

手段一:提供SYNC的发送消息方式,等待broker处理结果。

RocketMQ提供了3种发送消息方式,分别是:

//同步发送
SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,
    InterruptedException;


//异步发送
void send(final Message msg, final SendCallback sendCallback) throws MQClientException,
    RemotingException, InterruptedException;


//oneway发送(不等结果)
void sendOneway(final Message msg) throws MQClientException, RemotingException,
    InterruptedException 
  • 同步发送:Producer 向 broker 发送消息,阻塞当前线程等待 broker 响应 发送结果。
  • 异步发送:Producer 首先构建一个向 broker发送消息的任务,把该任务提交给线程池,等执行完该任务时,回调用户自定义的回调函数,执行处理结果。
  • Oneway发送:Oneway 方式只负责发送请求,不等待应答,Producer 只负责把请求发出去,而不处理响应结果。

我们在调用producer.send方法时,不指定回调方法,则默认采用同步发送消息的方式,这也是丢失几率最小的一种发送方式。

手段二:发送消息如果失败或者超时,则重新发送。

发送重试源码如下,本质其实就是一个for循环,当发送消息发生异常的时候重新循环发送。默认重试3次,重试次数可以通过producer指定。

rocketmq中 重试的源码如下

//只有同步发送的方式才会重试
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1   this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
 //for循环来重试
 for (; times < timesTotal; times  ) {
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                    try {
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        //不同的发送消息方式
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }
                                return sendResult;
                            default:
                                break;
                        }
                    } catch (RemotingException e) {
                        continue;
                    } catch (MQClientException e) {
                        continue;
                    } catch (MQBrokerException e) {
                        exception = e;
                    }
                } 
            } 

手段三:broker提供多master模式,即使某台broker宕机了,保证消息可以投递到另外一台正常的broker上。

如果broker只有一个节点,则broker宕机了,即使producer有重试机制,也没用,因此利用多主模式,当某台broker宕机了,换一台broker进行投递。

总结

producer消息发送方式虽然有3种,但为了减小丢失消息的可能性尽量采用同步的发送方式,同步等待发送结果,利用同步发送 重试机制 多个master节点,尽可能减小消息丢失的可能性。Broker处理消息阶段

手段四:提供同步刷盘的策略

public enum FlushDiskType {
    SYNC_FLUSH, //同步刷盘
    ASYNC_FLUSH//异步刷盘(默认)
}

我们知道,当消息投递到broker之后,会先存到page cache,然后根据broker设置的刷盘策略是否立即刷盘,也就是如果刷盘策略为异步,broker并不会等待消息落盘就会返回producer成功,也就是说当broker所在的服务器突然宕机,则会丢失部分页的消息。关于broker的处理过程可以参考我之前的文章:RocketMQ Broker端处理消息过程分析

手段五:提供主从模式,同时主从支持同步双写

即使broker设置了同步刷盘,如果主broker磁盘损坏,也是会导致消息丢失。因此可以给broker指定slave,同时设置master为SYNC_MASTER,然后将slave设置为同步刷盘策略。此模式下,producer每发送一条消息,都会等消息投递到master和slave都落盘成功了,broker才会当作消息投递成功,保证休息不丢失。

总结

在broker端,消息丢失的可能性主要在于刷盘策略和同步机制。RocketMQ默认broker的刷盘策略为异步刷盘,如果有主从,同步策略也默认的是异步同步,这样子可以提高broker处理消息的效率,但是会有丢失的可能性。因此可以通过同步刷盘策略 同步slave策略 主从的方式解决丢失消息的可能。

Consumer消费消息阶段

 

手段六:consumer默认提供的是At least Once机制

从producer投递消息到broker,即使前面这些过程保证了消息正常持久化,但如果consumer消费消息没有消费到也不能理解为消息绝对的可靠。因此RockerMQ默认提供了At least Once机制保证消息可靠消费。何为At least Once?

Consumer先pull 消息到本地,消费完成后,才向服务器返回ack。通常消费消息的ack机制一般分为两种思路

  1. 先提交后消费;
  2. 先消费,消费成功后再提交;

思路一可以解决重复消费的问题但是会丢失消息,因此Rocket默认实现的是思路二,由各自consumer业务方保证幂等来解决重复消费问题。

手段七:消费消息重试机制

当消费消息失败了,如果不提供重试消息的能力,则也不能算完全的可靠消费,因此RocketMQ本身提供了重新消费消息的能力。

总结

consumer端要保证消费消息的可靠性,主要通过At least Once 消费重试机制保证。

存储结构

rocketmq 存储文件主要包含三类,

  1. commitlog(一个):所有消息都存储在commitlog中。
  2. commitqueue(每个队列一个):消息到达commitlog后,异步发送到commitqueue中。
  3. indexfile :消息索引文件,存储消息的key与offset的关系。

kafka 方案

kafka支持3种消息投递语义:

  1. At most once——最多一次,消息可能会丢失,但不会重复
  2. At least once——最少一次,消息不会丢失,可能会重复
  3. Exactly once——只且一次,消息不丢失不重复,只且消费一次。

但是整体的消息投递语义需要Producer端和Consumer端两者来保证。

Producer 消息生产者端

一个场景例子:当producer向broker发送一条消息,这时网络出错了,producer无法得知broker是否接受到了这条消息。网络出错可能是发生在消息传递的过程中,也可能发生在broker已经接受到了消息,并返回ack给producer的过程中。

这时,producer只能进行重发,消息可能会重复,但是保证了at least once。0.11.0的版本通过给每个producer一个唯一ID,并且在每条消息中生成一个sequence num,这样就能对消息去重,达到producer端的exactly once。

这里还涉及到producer端的acks设置和broker端的副本数量,以及min.insync.replicas的设置。比如producer端的acks设置如下:

  • acks=0 //消息发了就发了,不等任何响应就认为消息发送成功
  • acks=1 //leader分片写消息成功就返回响应给producer
  • acks=all(-1) //当acks=all,min.insync.replicas=2,就要求INSRNC列表中必须要有2个副本都写成功,才返回响应给producer,如果INSRNC中已同步副本数量不足2,就会报异常,如果没有2个副本写成功,也会报异常,消息就会认为没有写成功。

Broker 消息接收端

上文说过acks=1,表示当leader分片副本写消息成功就返回响应给producer,此时认为消息发送成功。

如果leader写成功单马上挂了,还没有将这个写成功的消息同步给其他的分片副本,那么这个分片此时的ISR列表为空,如果unclean.leader.election.enable=true,就会发生log truncation(日志截取),同样会发生消息丢失。

如果unclean.leader.election.enable=false,那么这个分片上的服务就不可用了,producer向这个分片发消息就会抛异常。

所以我们设置min.insync.replicas=2,unclean.leader.election.enable=false,producer端的acks=all,这样发送成功的消息就绝不会丢失。

Consumer 消息消费者端

所有分片的副本都有自己的log文件(保存消息)和相同的offset值。当consumer没挂的时候,offset直接保存在内存中,如果挂了,就会发生负载均衡,需要consumer group中另外的consumer来接管并继续消费。

consumer消费消息的方式有以下2种;

  • consumer读取消息,保存offset,然后处理消息。现在假设一个场景:保存offset成功,但是消息处理失败,consumer又挂了,这时来接管的consumer就只能从上次保存的offset继续消费,这种情况下就有可能丢消息,但是保证了at most once语义。consumer读取消息,处理消息,处理成功,保存offset。

  • 如果消息处理成功,但是在保存offset时,consumer挂了,这时来接管的consumer也只能从上一次保存的offset开始消费,这时消息就会被重复消费,也就是保证了at least once语义。以上这些机制的保证都不是直接一个配置可以解决的,而是你的consumer代码来完成的,只是一个处理顺序先后问题。

第一种对应的代码:

List<String> messages = consumer.poll();consumer.commitOffset();processMsg(messages);

第二种对应的代码:

List<String> messages = consumer.poll();processMsg(messages);consumer.commitOffset();Exactly Once实现原理

下面详细说说exactly once的实现原理。

Producer端的消息幂等性保证

每个Producer在初始化的时候都会被分配一个唯一的PID,Producer向指定的Topic的特定Partition发送的消息都携带一个sequence number(简称seqNum),从零开始的单调递增的。

Broker会将Topic-Partition对应的seqNum在内存中维护,每次接受到Producer的消息都会进行校验;只有seqNum比上次提交的seqNum刚好大一,才被认为是合法的。比它大的,说明消息有丢失;比它小的,说明消息重复发送了。

以上说的这个只是针对单个Producer在一个session内的情况,假设Producer挂了,又重新启动一个Producer被而且分配了另外一个PID,这样就不能达到防重的目的了,所以kafka又引进了Transactional Guarantees(事务性保证)。

Transactional Guarantees 事务性保证

kafka的事务性保证说的是:同时向多个TopicPartitions发送消息,要么都成功,要么都失败。

为什么搞这么个东西出来?我想了下有可能是这种例子:用户定了一张机票,付款成功之后,订单的状态改了,飞机座位也被占了,这样相当于是2条消息,那么保证这个事务性就是:向订单状态的Topic和飞机座位的Topic分别发送一条消息,这样就需要kafka的这种事务性保证。

这种功能可以使得consumer offset的提交(也是向broker产生消息)和producer的发送消息绑定在一起。用户需要提供一个唯一的全局性TransactionalId,这样就能将PID和TransactionalId映射起来,就能解决producer挂掉后跨session的问题,应该是将之前PID的TransactionalId赋值给新的producer。

Consumer端

以上的事务性保证只是针对的producer端,对consumer端无法保证,有以下原因:

  • 压实类型的topics,有些事务消息可能被新版本的producer重写事务可能跨坐2个log segments,这时旧的segments可能被删除,就会丢消息
  • 消费者可能寻址到事务中任意一点,也会丢失一些初始化的消息
  • 消费者可能不会同时从所有的参与事务的TopicPartitions分片中消费消息
  • 如果是消费kafka中的topic,并且将结果写回到kafka中另外的topic,可以将消息处理后结果的保存和offset的保存绑定为一个事务,这时就能保证消息的处理和offset的提交要么都成功,要么都失败。

如果是将处理消息后的结果保存到外部系统,这时就要用到两阶段提交(tow-phase commit),但是这样做很麻烦,较好的方式是offset自己管理,将它和消息的结果保存到同一个地方,整体上进行绑定,可以参考Kafka Connect中HDFS的例子。

rabbitmq 方案

持久化

  • RabbitMQ持久化分为Exchange、Queue、Message
  • Exchange 和 Queue 持久化 指持久化Exchange、Queue 元数据,持久化的是自身,服务宕机,Exchange 和 Queue 自身就没有了
  • Message 持久化 顾名思义 把每一条消息体持久化,服务宕机,消息不丢失

镜像队列

rabbitmq的队列(queue)镜像,指master node 在接受到请求后,会同步到其他节点上,以此来保证高可用。在confirm模式下,具体过程如下clientpublisher 发送消息–> master node接到消息–> master node 将消息持久化到磁盘 –> 将消息异步发送给其他节点–>master 将ack 返回给client publisher。

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/taneijj
系列文章
更多 icon
同类精品
更多 icon
继续加载