RabbitMQ在SpringBoot的高级应用(2)
过期时间
1.单独的设置队列的存活时间,队列中的所有消息的过期时间一样
@Bean//创建交换机 public DirectExchange ttlQueueExchange(){ // 交换机名称 是否持久化 是否自动删除 return new DirectExchange("ttl_queue_log",true,false); } @Bean//创建队列 public Queue ttlQueue(){ Map<String, Object> map = new HashMap<>(); map.put("x-message-ttl",30000);//表示当前队列的存活时间 map.put("x-max-length",1000);//最大容量 map.put("x-min-length",0);//最小容量 //1.队列名称 2.是否持久化 3.是否唯一绑定某一个交换机 4.是否自动删除 5.给当前的队列配置初始化参数(存活时间,最大容量,最小容量) return new Queue("tt_queue",true,false,false,map); } //将队列和交换机进行绑定 @Bean public Binding bingCreate(){ //绑定的路由键 return BindingBuilder.bind(ttlQueue()).to(ttlQueueExchange()).with("ttl_queue_key"); }
//队列的过期时间 public void ttlQueue(){ re.convertAndSend("ttl_queue_log","ttl_queue_key","这是过期的消息"); }消息队列是在接收消息的30s才会过期,当然,这个时间也可以在创建队列的时候更改
在队列没有失效之前可以看到队列里的消息,过期后查看就是空数据
2. 对每一条消息单独设置过期时间
public DirectExchange ttlMessageExchange(){ return new DirectExchange("ttl_message_exchange",true,false); } public Queue ttlMessage(){ return new Queue("ttl_message",true,false,false,null); } public Binding bindingTtl(){ return BindingBuilder.bind(ttlMessage()).to(ttlMessageExchange()).with("mk"); }注意到这个包:import org.springframework.amqp.core.*;
在发送消息的时候设置消息的过期时间
//消息的过期时间 消息 public void ttlQueueMessage(){ MessageProperties messageProperties = new MessageProperties(); messageProperties.setExpiration("30000");//设置存活时间 ms Message message = new Message("你好,这是消息的过期时间".getBytes(),messageProperties); re.convertAndSend("ttl_message_exchange","mk",message); }在设置过期时间内我们可以在ttl_message队列中查看到消息,等超过该时间也会查询不到消息
死信队列
死信队列和普通的队列一样,只是用来独特的信息,比如:被拒绝接收的消息,未被处理的过期消息,超过最大的存储的消息
1.编写核心配置文件application.properties
spring.rabbitmq.listener.simple.acknowledge-mode=manual spring.rabbitmq.listener.simple.default-requeue-rejected=false spring.rabbitmq.listener.direct.acknowledge-mode=manual2.创建交换机和队列并绑定
//创建一个死信交换机 public DirectExchange dealExchange(){ return new DirectExchange("dead_letter_exchange",true,false,null); } //创建死信队列 public Queue dealQueue(){ return new Queue("dead_letter",true,false,false,null); } //绑定死信队列和死信消息 public Binding bindDead(){ return BindingBuilder.bind(dealQueue()).to(dealExchange()).with("dead_log"); } //创建业务层交换机 public DirectExchange businessExchange(){ return new DirectExchange("business_exchange",true,false,null); } //普通的队列 public Queue testQueue(){ HashMap<String, Object> map = new HashMap<>(); map.put("x-dead-letter-exchange","dead_letter_exchange");//配置死信交换机 map.put("x-dead-letter-routing-key","dead_log");//设置死信交换机和绑定队列之间的路由键 return new Queue("test_queue",true,false,false,map); } //绑定业务处理机和普通队列 public Binding testExchange(){ return BindingBuilder.bind(testQueue()).to(businessExchange()).with("b_refuse"); } //创建过期消息队列 public Queue testttlQueue(){ HashMap<String, Object> map = new HashMap<>(); map.put("x-dead-letter-exchange","dead_letter_exchange"); map.put("x-dead-letter-routing-key","dead_log"); map.put("x-message-ttl",30000); return new Queue("test_ttl",true,false,false,map ); } //绑定业务处理机和过期消息队列 public Binding testttl(){ return BindingBuilder.bind(testttlQueue()).to(businessExchange()).with("d_ttl"); } //溢出队列 public Queue testMaxQueue(){ HashMap<String, Object> map = new HashMap<>(); map.put("x-dead-letter-exchange","dead_letter_exchange"); map.put("x-dead-letter-routing-key","dead_log"); map.put("x-max-length",3); return new Queue("test_max",true,false,false,map ); } //绑定业务处理机和溢出队列 public Binding testMax(){ return BindingBuilder.bind(testMaxQueue()).to(businessExchange()).with("t_max"); }3.发送消息
//死信队列的消息 拒收 public void refuseMessage(){ re.convertAndSend("business_exchange","b_refuse","死信队列:消息被拒收"); } //死信队列的消息 过期 public void refuseMessage1(){ re.convertAndSend("business_exchange","d_ttl","死信队列:过期的消息"); } //死信队列的消息 溢出 public void refuseMessage2(){ for (int i = 1; i < 6; i ) { re.convertAndSend("business_exchange","t_max","死信队列:溢出的消息" i); } }4.处理消息
public void refuseConsumer(String msg,Message messagem,Channel channel) throws IOException { channel.basicNack(messagem.getMessageProperties().getDeliveryTag(),false,false); System.out.println("消息被拒收:" msg); }dead_letter死信队列,test_queue拒收队列 , test_ttl过期队列,test_max最大队列
1拒收消息,我们将消息发送到 test_queue队列,但是消息被拒收,消息就会出现在死信队列中,所以我们死信队列中的信息为1,而拒绝队列为0
2.过期消息,在消息没有过期的时候消息在test_ttl队列中,等到有效期结束后就会进入到死信队列
3.消息溢出
延迟队列
在队列中的消息是不需要立即消费的,需要等待一段时间时候才会取出消费,通过死信队列进行中转
1.创建队列并绑定死信队列
//创建延迟消息队列 public Queue testDelayQueue(){ HashMap<String, Object> map = new HashMap<>(); map.put("x-dead-letter-exchange","dead_letter_exchange"); map.put("x-dead-letter-routing-key","dead_log"); map.put("x-message-ttl",30000); return new Queue("test_deal",true,false,false,map ); } //绑定业务处理机和过期延迟队列 public Binding testdel(){ return BindingBuilder.bind(testDelayQueue()).to(businessExchange()).with("d_deal"); }2.发送消息
//延迟消息 public void dealMessage(){ re.convertAndSend("business_exchange","d_deal","延迟的消息:生于小满,小满为安" ); }3.处理消息
"dead_letter")//监听死信队列(queues = public void deadLetter(Message message,Channel channel){ byte[] body = message.getBody(); System.out.println("延迟消息:" new String(body)); }
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhghabef
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13