• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

RabbitMQ养成记 10.高级特性死信队列,延迟队列

武飞扬头像
vcaml7717
帮助1

死信队列(DLX)

这个概念 在其他MQ产品里面也是有的,只不过在Rabbitmq中稍微特殊一点
什么叫私信队列呢? 就是当消息成为 dead message之后,可以重新发到另外一台交换机,这个交换机就是DLX。

学新通

注意这里的有翻译歧义, 这里的DLX 指的是 交换机 ,而不是一个队列。

消息成为死信队列的三种情况

  • 队列的消息长度 到达限制。

  • 消费者拒收消息,

            channel.basicNack(deliveryTag,true,false);
            
            //这里拒收消息
            /**
             * deliveryTag 标识
             * requeue 是否打回原队列如果为false 则进入死信队列
             */
            channel.basicReject(deliveryTag,false);
        }
  • 存在消息过期设置 消息超时未消费(就是上一篇中的TTL)

我们拿这个TTL举例:

首先你在发送端 需要有2个队列 一个正常队列 一个私信队列
然后你在创建延时队列的时候 要注明 这个队列的消息一旦超时 那么送到哪个死信交换机的队列里面去:

@Configuration
public class RabbitMQConfig {
    public static final String EXCHANGE_NAME = "boot_topic_exchange";
    public static final String QUEUE_NAME_TTL = "boot_queue_ttl";

    public static final String DEAD_EXCHANGE_NAME = "dead_topic_exchange";
    public static final String DEAD_QUEUE_NAME = "dead_queue";

    //正常交换机

    @Bean("bootExchange")
    public Exchange bootExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }


    @Bean("bootQueue")
    public Queue bootQueue(){
        Map<String, Object> map = new HashMap<>();
        // 设置TTL
        map.put("x-message-ttl", 20000);
        // 设置死信的目的交换机
        map.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
        // 设置死信交给目的交换机时的路由键
        map.put("x-dead-letter-routing-key", "boot.111");
        return QueueBuilder.durable(QUEUE_NAME_TTL).withArguments(map).build();
    }


    @Bean
    public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange){
       return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
    }

    //死信交换机
    @Bean("deadExchange")
    public Exchange deadExchange(){
        return ExchangeBuilder.topicExchange(DEAD_EXCHANGE_NAME).durable(true).build();
    }

    @Bean("deadQueue")
    public Queue deadQueue(){
        return QueueBuilder.durable(DEAD_QUEUE_NAME).build();
    }

    @Bean
    public Binding bindDeadQueueExchange(@Qualifier("deadQueue") Queue queue,@Qualifier("deadExchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("boot.*").noargs();
    }


}
学新通

好了 我们发送消息 往正常队列里面发:
学新通

看超时之后 它进来了:
学新通


·这是超时的消息进入DLX的情况
再动动手 我们实践一下 消费者拒收之后 进入死信的情况:

我们监听正常的队列 然后接受到消息之后
我们故意写一个bug 触发exception
调用basicNack方法 打回这个消息 让它 进入dlx
这里千万要注意:basicNack第三个参数要为false 才会进入dlx

如果为true 那它就会打回到正常队列 然后不停的给你发

    @RabbitListener(queues = "boot_queue_ttl")
    public void ListenerQueue(Message message,
                              @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
                              Channel channel) throws IOException {
        try {
            System.out.println("收到消息为" new String(message.getBody()));

            System.out.println("处理业务逻辑。。");

//            int i =3/0; //手动制造一个bug 让程序进入异常

            channel.basicAck(deliveryTag,true);

        }catch (Exception e){
            //拒绝签收
            //这里的第三个参数表示 如果为true 消息重回到queue broker会重新发送该消息给消费端
            System.out.println(" error 处理业务逻辑失败 消息打回 进入私信队列");
            channel.basicNack(deliveryTag,true,false);
}
学新通

学新通

ok我们实践了两种 第三种 超过队列长度的很简单 你设一个队列长度max=10 然后for循环往里面塞进去>10 超过的就会进入dlx 这种很简单
有兴趣的好兄弟 可以自己去试一下啦
新手一定要多动手


延迟队列

什么是延迟队列 就是交换机收到消息之后 它不急着发 它等。
等到你想要它发给消费者方的时候 你再发。

假设你正在开发一个电子商务网站,并且你希望为用户提供一个订单确认后一段时间自动取消的功能。当用户下单后,订单将被发送到延迟队列中,设置一个特定的延迟时间,例如30分钟。如果在30分钟内用户未支付订单,系统将自动取消该订单。

但是问题来了 rabbitmq 中并没有提供现成的延迟队列实现,

需要我们自己去实现:

怎么实现呢 如果你理解熟悉了 前面的TTL 和死信队列之后
这个就非常非常简单了

看上面那个我们死信队列里面那个例子:
发送端向正常队列里面发数据 超时未消费。 消息就进入了死信队列,

那就很简单了 你把消费的监听队列 换成死信队列 就ok了

他就是一个延迟队列了

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhfjjckg
系列文章
更多 icon
同类精品
更多 icon
继续加载