12-RabbitMQ高级特性-Consumer ACK
12-RabbitMQ高级特性-Consumer ACK
Consumer Ack
ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。
有三种确认方式:
-
自动确认:acknowledge="none"
-
手动确认:acknowledge="manual"
-
根据异常情况确认:acknowledge="auto",(这种方式使用麻烦,不作讲解)
“其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。
”
案例
1. 创建工程
创建一个空的 maven 工程 rabbitmq-consumer-spring:
2. 添加依赖
修改pom.xml文件内容为如下:
-
<?xml version="1.0" encoding="UTF-8"?>
-
<project xmlns="http://maven.apache.org/POM/4.0.0"
-
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
<modelVersion>4.0.0</modelVersion>
-
-
<groupId>com.lijw</groupId>
-
<artifactId>rabbitmq-consumer-spring</artifactId>
-
<version>1.0-SNAPSHOT</version>
-
-
<dependencies>
-
<dependency>
-
<groupId>org.springframework</groupId>
-
<artifactId>spring-context</artifactId>
-
<version>5.1.7.RELEASE</version>
-
</dependency>
-
-
<dependency>
-
<groupId>org.springframework.amqp</groupId>
-
<artifactId>spring-rabbit</artifactId>
-
<version>2.1.8.RELEASE</version>
-
</dependency>
-
-
<dependency>
-
<groupId>junit</groupId>
-
<artifactId>junit</artifactId>
-
<version>4.12</version>
-
</dependency>
-
-
<dependency>
-
<groupId>org.springframework</groupId>
-
<artifactId>spring-test</artifactId>
-
<version>5.1.7.RELEASE</version>
-
</dependency>
-
</dependencies>
-
</project>
3. 配置整合
1.创建rabbitmq.properties
连接参数等配置文件;
-
rabbitmq.host=127.0.0.1
-
rabbitmq.port=5672
-
rabbitmq.username=libai
-
rabbitmq.password=libai
-
rabbitmq.virtual-host=/test
2.创建 spring-rabbitmq-consumer.xml
整合配置文件;
-
<?xml version="1.0" encoding="UTF-8"?>
-
<beans xmlns="http://www.springframework.org/schema/beans"
-
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-
xmlns:context="http://www.springframework.org/schema/context"
-
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
-
xsi:schemaLocation="http://www.springframework.org/schema/beans
-
http://www.springframework.org/schema/beans/spring-beans.xsd
-
http://www.springframework.org/schema/context
-
https://www.springframework.org/schema/context/spring-context.xsd
-
http://www.springframework.org/schema/rabbit
-
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
-
<!--加载配置文件-->
-
<context:property-placeholder location="classpath:rabbitmq.properties"/>
-
-
<!-- 定义rabbitmq connectionFactory -->
-
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
-
port="${rabbitmq.port}"
-
username="${rabbitmq.username}"
-
password="${rabbitmq.password}"
-
virtual-host="${rabbitmq.virtual-host}"/>
-
-
-
</beans>
-
配置扫描监听器的Bean
后续我们写的消费者都是监听接口的实现类,需要将其生成 Bean,所以我们可以将其写到一个 包下,配置 spring 扫描:
-
<!-- 定义扫描监听器类 -->
-
<context:component-scan base-package="com.lijw.listener" />
4. 编写监听器
实现 MessageListener 类,则可以接收到消息:
-
package com.lijw.listener;
-
-
import org.springframework.amqp.core.Message;
-
import org.springframework.amqp.core.MessageListener;
-
import org.springframework.stereotype.Component;
-
-
/**
-
* @author Aron.li
-
* @date 2022/3/4 23:36
-
*/
-
@Component
-
public class AckListener implements MessageListener {
-
@Override
-
public void onMessage(Message message) {
-
String msg = new String(message.getBody());
-
System.out.printf("接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n",
-
message.getMessageProperties().getReceivedExchange(),
-
message.getMessageProperties().getReceivedRoutingKey(),
-
message.getMessageProperties().getConsumerQueue(),
-
msg);
-
}
-
}
5.配置 监听器类 与 队列 绑定
-
<!-- 定义监听器与队列的绑定 -->
-
<rabbit:listener-container connection-factory="connectionFactory" >
-
<rabbit:listener ref="ackListener" queue-names="test_queue_confirm"/>
-
</rabbit:listener-container>
6.测试 接收消息
配置好之后,下面我们来启动服务,验证一下能否接收到消息。
需要我们编写一个测试类,启动加载 spring 的相关配置:
-
package com.lijw;
-
-
import org.junit.Test;
-
import org.junit.runner.RunWith;
-
import org.springframework.test.context.ContextConfiguration;
-
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
-
-
/**
-
* @author Aron.li
-
* @date 2022/3/4 23:41
-
*/
-
@RunWith(SpringJUnit4ClassRunner.class)
-
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
-
public class ConsumerTest {
-
-
@Test
-
public void test01() {
-
while (true) {
-
-
}
-
}
-
-
}
执行测试方法,开启加载 spring 框架,接收消息如下:
说明现在监听器已经正常工作了,那么下一步我们就要开始来写 Consumer Ack 的功能了。
7. 配置消息接收 手动确认:acknowledge="manual"
-
<!-- 定义监听器与队列的绑定 -->
-
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
-
<rabbit:listener ref="ackListener" queue-names="test_queue_confirm"/>
-
</rabbit:listener-container>
8.改写监听器,实现 ChannelAwareMessageListener 接口
在消息手动确认中,我们需要使用 channel.basicAck()
channel.basicNack()
进行确认,那么就需要有 channel 提供我们调用。
而原来的 MessageListener 接口没有提供 channel ,我们可以实现 MessageListener 的子接口 ChannelAwareMessageListener
改写监听器如下:
-
package com.lijw.listener;
-
-
import com.rabbitmq.client.Channel;
-
import org.springframework.amqp.core.Message;
-
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
-
import org.springframework.stereotype.Component;
-
-
-
/**
-
* Consumer ACK机制:
-
* 1. 设置手动签收。acknowledge="manual"
-
* 2. 让监听器类实现ChannelAwareMessageListener接口
-
* 3. 如果消息成功处理,则调用channel的 basicAck()签收
-
* 4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer
-
*
-
* @author Aron.li
-
* @date 2022/3/4 23:36
-
*/
-
@Component
-
public class AckListener implements ChannelAwareMessageListener {
-
-
@Override
-
public void onMessage(Message message, Channel channel) throws Exception {
-
//1. 获取传递的标签,用于消息确认
-
long deliveryTag = message.getMessageProperties().getDeliveryTag();
-
-
try {
-
//2. 接收消息
-
System.out.println(new String(message.getBody()));
-
-
//3. 处理业务逻辑
-
System.out.println("处理业务逻辑...");
-
// int i = 3 / 0; // 产生异常
-
//4. 手动签收
-
channel.basicAck(deliveryTag, true);
-
} catch (Exception e) {
-
//e.printStackTrace();
-
-
//5. 拒绝签收
-
/*
-
第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
-
*/
-
channel.basicNack(deliveryTag, true, true); // 一次性可以拒绝多条消息
-
//channel.basicReject(deliveryTag,true); // 一次性只能拒绝一条消息,了解即可
-
}
-
}
-
-
}
9.测试 拒绝重发消息
首先我们正常启动监听器,并且生产者发送消息:
下面我们在处理业务逻辑的位置 编写一个异常代码,如下:
可以看到只要没有签收成功,就可以让消息不断重发,直到我们解决了异常,正常签收消息为止。
Consumer Ack 小结
-
在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认
-
如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息
-
如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。
消息可靠性总结
-
持久化
-
exchange要持久化
-
queue要持久化
-
message要持久化
-
-
生产方确认Confirm
-
消费方确认Ack
-
Broker高可用
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhggkbge
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13