• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

RabbitMQ幂等性、优先级队列、惰性队列

武飞扬头像
星悦糖
帮助1

目录

1、幂等性

1.1 消息重复消费

1.2 消费端的幂等性保障

2、优先级队列

2.1 使用场景

2.2 设置优先级队列(在web页面添加)

2.3 设置优先级队列(声明队列的时候添加优先级)

2.4 实例代码

3、惰性队列

3.1 使用场景

3.2 队列的两种模式

3.3 声明方式

3.4 内存开销对比

1、幂等性

        什么是幂等性?用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。

        拿支付来说,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条【显然这是有问题的】。

        在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误立即回滚。

1.1 消息重复消费

        消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断, 故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。

解决思路:

        MQ 消费者的幂等性的解决一般使用全局 ID 或者写个唯一标识比如时间戳 或者 UUID 或者可按自己的规则生成一个全局唯一 id,每次消费消息时用该 id 先判断该消息是否已消费过。

        也可以理解为验证码,只能输入一次,再次重新输入会刷新验证码,原来的验证码失效。

1.2 消费端的幂等性保障

        在海量订单生成的业务高峰期,生产端有可能就会重复发送了消息,这时候消费端就要实现幂等性,这就意味着我们的消息永远不会被消费多次,即使我们收到了一样的消息。

业界主流的幂等性有两种操作:

  • 唯一 ID 指纹码机制,利用数据库主键去重

        指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个 id 是否存在数据库中,优势就是实现简单就一个拼接,然后查询判断是否重复;劣势就是在高并发时,如果是单个数据库就会有写入性能瓶颈,当然也可以采用分库分表提升性能,不过并不推荐使用这种方式。

  • Redis 的原子性(推荐)

        利用 redis 执行 setnx 命令,天然具有幂等性。从而实现不重复消费。

2、优先级队列

2.1 使用场景

        举个例子,客户在某宝下的订单,某宝会及时将订单推送给我们,如果在用户设定的时间内未付款那么就会给用户推送一条短信提醒。

        但是,对于某宝来说,肯定是要分大客户和小客户的,比如像苹果,小米这样大商家一年能创造很大的利润;所以理应当然的,他们的订单必须得到优先处理。所以订单量大了后采用 RabbitMQ 进行改造和优化,如果发现是大客户的订单就赋予一个相对比较高的优先级, 否则就是默认优先级。

学新通

2.2 设置优先级队列(在web页面添加)

设置队列的最大优先级 最大可以设置到 255,官网推荐 1-10,如果设置太高比较吃内存和 CPU。

学新通

2.3 设置优先级队列(声明队列的时候添加优先级)

设置队列的最大优先级 最大可以设置到 255,官网推荐 1-10,如果设置太高比较吃内存和 CPU。

  1.  
    Map<String, Object> params = new HashMap();
  2.  
    // 优先级为 10
  3.  
    params.put("x-max-priority", 10);
  4.  
    channel.queueDeclare("hello", true, false, false, params);

注意事项:

        (1)队列需要设置为优先级队列,消息需要设置消息的优先级,只有这两个都设置了才能进行优先级排序。

        (2)消费者需要等待消息已经发送到队列中才去消费,因为这样才有机会对消息进行排序。

2.4 实例代码

        生产者发送10个消息,如果消息为 info5,则优先级是最高的,当消费者从队列获取消息的时候,优先获取 info5 消息。

1、生产者代码

  1.  
    public class PriorityProducer {
  2.  
     
  3.  
    private static final String QUEUE_NAME = "priority_queue";
  4.  
     
  5.  
    public static void main(String[] args) throws Exception {
  6.  
    Channel channel = RabbitMQUtils.getChannel();
  7.  
     
  8.  
    //给消息赋予一个priority属性
  9.  
    AMQP.BasicProperties properties =
  10.  
    new AMQP.BasicProperties().builder().priority(5).build();
  11.  
     
  12.  
    //设置队列的最大优先级 最大可以设置到255 官网推荐1-10 如果设置太高比较吃内存和CPU
  13.  
    Map<String, Object> params = new HashMap<>();
  14.  
    params.put("x-max-priority",10);
  15.  
    channel.queueDeclare(QUEUE_NAME,true,false,false,params);
  16.  
     
  17.  
    for (int i = 1; i < 11; i ) {
  18.  
    String message = "info" i;
  19.  
    if(i==5){
  20.  
    channel.basicPublish("",QUEUE_NAME,properties,message.getBytes());
  21.  
    }else {
  22.  
    channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
  23.  
    }
  24.  
    System.out.println("消息发送完成:" message);
  25.  
    }
  26.  
    }
  27.  
    }

学新通

2、消费者代码

  1.  
    public class PriorityConsumer {
  2.  
     
  3.  
    private final static String QUEUE_NAME = "priority_queue";
  4.  
     
  5.  
    public static void main(String[] args) throws Exception {
  6.  
    Channel channel = RabbitMQUtils.getChannel();
  7.  
     
  8.  
    //推送消息如何进行消费的接口回调
  9.  
    DeliverCallback deliverCallback = (consumerTag, delivery) ->{
  10.  
    String message = new String(delivery.getBody());
  11.  
    System.out.println("消费的消息: " message);
  12.  
    };
  13.  
     
  14.  
    //取消消费的一个回调接口 如在消费的时候队列被删除掉了
  15.  
    CancelCallback cancelCallback = (consumerTag) ->{
  16.  
    System.out.println("消息消费被中断");
  17.  
    };
  18.  
     
  19.  
    channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
  20.  
    }
  21.  
    }

3、效果演示

info 5 的优先级为 5,优先级最高。消费者消费信息效果如图:

学新通

3、惰性队列

学新通

3.1 使用场景

        RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。

        默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当 RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。虽然 RabbitMQ 的开发者们一直在升级相关的算法, 但是效果始终不太理想,尤其是在消息量特别大的时候。

3.2 队列的两种模式

        队列具备两种模式:default 和 lazy。默认的为 default 模式,在 3.6.0 之前的版本无需做任何变更。lazy 模式即为惰性队列的模式,可以通过调用 channel.queueDeclare 方法的时候在参数中设置,也可以通过 Policy 的方式设置,如果一个队列同时使用这两种方式设置的话,那么 Policy 的方式具备更高的优先级。如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的。

3.3 声明方式

        在队列声明的时候可以通过 x-queue-mode 参数来设置队列的模式,取值为 default 和 lazy。下面示例中演示了一个惰性队列的声明细节:

  1.  
    Map<String, Object> args = new HashMap<String, Object>();
  2.  
    args.put("x-queue-mode", "lazy");
  3.  
    channel.queueDeclare("myqueue", false, false, false, args);

        也可以在 Web 页面添加队列时,选择 Lazy mode:

学新通

3.4 内存开销对比

学新通

        在发送 1 百万条消息,每条消息大概占 1KB 的情况下,普通队列占用内存是 1.2GB,而惰性队列仅仅占用 1.5MB。

        两种模式各有优缺点,适合的才是最好的,真正应用时要根据业务场景进行选择。

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhggkhcc
系列文章
更多 icon
同类精品
更多 icon
继续加载