• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

RabbitMQ在SpringBoot的高级应用(2)

武飞扬头像
流殇꧂
帮助1

过期时间

        1.单独的设置队列的存活时间,队列中的所有消息的过期时间一样

  1.  
    @Bean//创建交换机
  2.  
    public DirectExchange ttlQueueExchange(){
  3.  
    // 交换机名称 是否持久化 是否自动删除
  4.  
    return new DirectExchange("ttl_queue_log",true,false);
  5.  
    }
  6.  
     
  7.  
    @Bean//创建队列
  8.  
    public Queue ttlQueue(){
  9.  
    Map<String, Object> map = new HashMap<>();
  10.  
    map.put("x-message-ttl",30000);//表示当前队列的存活时间
  11.  
    map.put("x-max-length",1000);//最大容量
  12.  
    map.put("x-min-length",0);//最小容量
  13.  
    //1.队列名称 2.是否持久化 3.是否唯一绑定某一个交换机 4.是否自动删除 5.给当前的队列配置初始化参数(存活时间,最大容量,最小容量)
  14.  
    return new Queue("tt_queue",true,false,false,map);
  15.  
    }
  16.  
     
  17.  
    //将队列和交换机进行绑定
  18.  
    @Bean
  19.  
    public Binding bingCreate(){ //绑定的路由键
  20.  
    return BindingBuilder.bind(ttlQueue()).to(ttlQueueExchange()).with("ttl_queue_key");
  21.  
    }
学新通
  1.  
    @Test//队列的过期时间
  2.  
    public void ttlQueue(){
  3.  
    re.convertAndSend("ttl_queue_log","ttl_queue_key","这是过期的消息");
  4.  
    }

        消息队列是在接收消息的30s才会过期,当然,这个时间也可以在创建队列的时候更改

学新通            在队列没有失效之前可以看到队列里的消息,过期后查看就是空数据

学新通

学新通

 2. 对每一条消息单独设置过期时间

  1.  
    @Bean
  2.  
    public DirectExchange ttlMessageExchange(){
  3.  
    return new DirectExchange("ttl_message_exchange",true,false);
  4.  
    }
  5.  
    @Bean
  6.  
    public Queue ttlMessage(){
  7.  
    return new Queue("ttl_message",true,false,false,null);
  8.  
    }
  9.  
    @Bean
  10.  
    public Binding bindingTtl(){
  11.  
    return BindingBuilder.bind(ttlMessage()).to(ttlMessageExchange()).with("mk");
  12.  
    }
        注意到这个包:import org.springframework.amqp.core.*;

                在发送消息的时候设置消息的过期时间

  1.  
    @Test//消息的过期时间 消息
  2.  
    public void ttlQueueMessage(){
  3.  
    MessageProperties messageProperties = new MessageProperties();
  4.  
    messageProperties.setExpiration("30000");//设置存活时间 ms
  5.  
    Message message = new Message("你好,这是消息的过期时间".getBytes(),messageProperties);
  6.  
    re.convertAndSend("ttl_message_exchange","mk",message);
  7.  
    }

         在设置过期时间内我们可以在ttl_message队列中查看到消息,等超过该时间也会查询不到消息学新通

学新通

死信队列

        死信队列和普通的队列一样,只是用来独特的信息,比如:被拒绝接收的消息,未被处理的过期消息,超过最大的存储的消息

        1.编写核心配置文件application.properties

  1.  
    #开启手动签收(手动ACK)
  2.  
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
  3.  
    # 设置消息被拒绝后,不在重新入队
  4.  
    spring.rabbitmq.listener.simple.default-requeue-rejected=false
  5.  
    # 设置消费者需要手动确认消息
  6.  
    spring.rabbitmq.listener.direct.acknowledge-mode=manual

             2.创建交换机和队列并绑定

  1.  
    @Bean//创建一个死信交换机
  2.  
    public DirectExchange dealExchange(){
  3.  
    return new DirectExchange("dead_letter_exchange",true,false,null);
  4.  
    }
  5.  
    @Bean//创建死信队列
  6.  
    public Queue dealQueue(){
  7.  
    return new Queue("dead_letter",true,false,false,null);
  8.  
    }
  9.  
    @Bean//绑定死信队列和死信消息
  10.  
    public Binding bindDead(){
  11.  
    return BindingBuilder.bind(dealQueue()).to(dealExchange()).with("dead_log");
  12.  
    }
  13.  
     
  14.  
    @Bean//创建业务层交换机
  15.  
    public DirectExchange businessExchange(){
  16.  
    return new DirectExchange("business_exchange",true,false,null);
  17.  
    }
  18.  
     
  19.  
    @Bean//普通的队列
  20.  
    public Queue testQueue(){
  21.  
    HashMap<String, Object> map = new HashMap<>();
  22.  
    map.put("x-dead-letter-exchange","dead_letter_exchange");//配置死信交换机
  23.  
    map.put("x-dead-letter-routing-key","dead_log");//设置死信交换机和绑定队列之间的路由键
  24.  
    return new Queue("test_queue",true,false,false,map);
  25.  
    }
  26.  
    @Bean//绑定业务处理机和普通队列
  27.  
    public Binding testExchange(){
  28.  
    return BindingBuilder.bind(testQueue()).to(businessExchange()).with("b_refuse");
  29.  
    }
  30.  
     
  31.  
    @Bean//创建过期消息队列
  32.  
    public Queue testttlQueue(){
  33.  
    HashMap<String, Object> map = new HashMap<>();
  34.  
    map.put("x-dead-letter-exchange","dead_letter_exchange");
  35.  
    map.put("x-dead-letter-routing-key","dead_log");
  36.  
    map.put("x-message-ttl",30000);
  37.  
    return new Queue("test_ttl",true,false,false,map );
  38.  
    }
  39.  
    @Bean//绑定业务处理机和过期消息队列
  40.  
    public Binding testttl(){
  41.  
    return BindingBuilder.bind(testttlQueue()).to(businessExchange()).with("d_ttl");
  42.  
    }
  43.  
    @Bean//溢出队列
  44.  
    public Queue testMaxQueue(){
  45.  
    HashMap<String, Object> map = new HashMap<>();
  46.  
    map.put("x-dead-letter-exchange","dead_letter_exchange");
  47.  
    map.put("x-dead-letter-routing-key","dead_log");
  48.  
    map.put("x-max-length",3);
  49.  
    return new Queue("test_max",true,false,false,map );
  50.  
    }
  51.  
    @Bean//绑定业务处理机和溢出队列
  52.  
    public Binding testMax(){
  53.  
    return BindingBuilder.bind(testMaxQueue()).to(businessExchange()).with("t_max");
  54.  
    }
学新通

           3.发送消息

  1.  
    @Test//死信队列的消息 拒收
  2.  
    public void refuseMessage(){
  3.  
    re.convertAndSend("business_exchange","b_refuse","死信队列:消息被拒收");
  4.  
    }
  5.  
    @Test//死信队列的消息 过期
  6.  
    public void refuseMessage1(){
  7.  
    re.convertAndSend("business_exchange","d_ttl","死信队列:过期的消息");
  8.  
    }
  9.  
    @Test//死信队列的消息 溢出
  10.  
    public void refuseMessage2(){
  11.  
    for (int i = 1; i < 6; i ) {
  12.  
    re.convertAndSend("business_exchange","t_max","死信队列:溢出的消息" i);
  13.  
    }
  14.  
    }

         4.处理消息

  1.  
    @RabbitListener(queues = "test_queue")
  2.  
    public void refuseConsumer(String msg,Message messagem,Channel channel) throws IOException {
  3.  
    channel.basicNack(messagem.getMessageProperties().getDeliveryTag(),false,false);
  4.  
    System.out.println("消息被拒收:" msg);
  5.  
    }

             dead_letter死信队列,test_queue拒收队列 , test_ttl过期队列,test_max最大队列

          1拒收消息,我们将消息发送到 test_queue队列,但是消息被拒收,消息就会出现在死信队列中,所以我们死信队列中的信息为1,而拒绝队列为0

学新通学新通

         2.过期消息,在消息没有过期的时候消息在test_ttl队列中,等到有效期结束后就会进入到死信队列

学新通

 学新通

         3.消息溢出

学新通

延迟队列

        在队列中的消息是不需要立即消费的,需要等待一段时间时候才会取出消费,通过死信队列进行中转

        1.创建队列并绑定死信队列

  1.  
    @Bean//创建延迟消息队列
  2.  
    public Queue testDelayQueue(){
  3.  
    HashMap<String, Object> map = new HashMap<>();
  4.  
    map.put("x-dead-letter-exchange","dead_letter_exchange");
  5.  
    map.put("x-dead-letter-routing-key","dead_log");
  6.  
    map.put("x-message-ttl",30000);
  7.  
    return new Queue("test_deal",true,false,false,map );
  8.  
    }
  9.  
    @Bean//绑定业务处理机和过期延迟队列
  10.  
    public Binding testdel(){
  11.  
    return BindingBuilder.bind(testDelayQueue()).to(businessExchange()).with("d_deal");
  12.  
    }

        2.发送消息

  1.  
    @Test//延迟消息
  2.  
    public void dealMessage(){
  3.  
    re.convertAndSend("business_exchange","d_deal","延迟的消息:生于小满,小满为安" );
  4.  
    }

        3.处理消息

  1.  
    @RabbitListener(queues = "dead_letter")//监听死信队列
  2.  
    public void deadLetter(Message message,Channel channel){
  3.  
    byte[] body = message.getBody();
  4.  
     
  5.  
    System.out.println("延迟消息:" new String(body));
  6.  
    }

学新通

 学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhghabef
系列文章
更多 icon
同类精品
更多 icon
继续加载