springboot整合rabbitMQ和手动ack确认
springboot版2.2.2
-
-
<project xmlns="http://maven.apache.org/POM/4.0.0"
-
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
<modelVersion>4.0.0</modelVersion>
-
-
<groupId>com.ex</groupId>
-
<artifactId>tmap2</artifactId>
-
<version>1.0-SNAPSHOT</version>
-
-
<parent>
-
<groupId>org.springframework.boot</groupId>
-
<artifactId>spring-boot-starter-parent</artifactId>
-
<version>2.2.2.RELEASE</version>
-
<relativePath/> <!-- lookup parent from repository -->
-
</parent>
-
<dependencies>
-
<dependency>
-
<groupId>org.springframework.boot</groupId>
-
<artifactId>spring-boot-starter-web</artifactId>
-
</dependency>
-
-
<!-- <dependency>-->
-
<!-- <groupId>io.agora.rtm</groupId>-->
-
<!-- <artifactId>agora-rtm-sdk</artifactId>-->
-
<!-- <version>1.3</version>-->
-
<!-- </dependency>-->
-
<dependency>
-
<groupId>org.springframework.boot</groupId>
-
<artifactId>spring-boot-starter-amqp</artifactId>
-
</dependency>
-
<dependency>
-
<groupId>org.projectlombok</groupId>
-
<artifactId>lombok</artifactId>
-
</dependency>
-
<dependency>
-
<groupId>org.springframework.boot</groupId>
-
<artifactId>spring-boot-starter-test</artifactId>
-
<scope>test</scope>
-
</dependency>
-
-
</dependencies>
-
</project>
application.yml
-
spring:
-
rabbitmq:
-
host: localhost
-
port: 5672
-
username: admin
-
password: 123456
-
virtual-host: /ems
-
template:
-
retry: #重试,消息发送失败会重试
-
enabled: true # 开启重试
-
initial-interval: 10000ms #第一次十秒重试
-
max-interval: 80000ms #最后一次是八秒重试
-
multiplier: 2 #重试翻倍率
-
publisher-confirms: true #发送者开启 confirm 确认机制
-
publisher-returns: true # 发送者开启 return 确认机制
RabbitCallbackConfig.java
-
import lombok.extern.slf4j.Slf4j;
-
import org.springframework.amqp.core.Message;
-
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
-
import org.springframework.amqp.rabbit.connection.CorrelationData;
-
import org.springframework.amqp.rabbit.core.RabbitTemplate;
-
import org.springframework.context.annotation.Bean;
-
import org.springframework.context.annotation.Configuration;
-
-
-
-
public class RabbitCallbackConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
-
-
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
-
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
-
//rabbitTemplate发送消息json转换配置
-
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
-
rabbitTemplate.setMandatory(true);
-
rabbitTemplate.setConfirmCallback(this);
-
rabbitTemplate.setReturnCallback(this);
-
return rabbitTemplate;
-
}
-
/**
-
* 配置接收消息json转换为对象
-
* @return
-
*/
-
-
public MessageConverter jsonMessageConverter(){
-
return new Jackson2JsonMessageConverter();
-
}
-
-
// 下边这样写也可以
-
// @Autowired
-
// private RabbitTemplate rabbitTemplate;
-
// @PostConstruct
-
// public void init() {
-
// rabbitTemplate.setMandatory(true);
-
// rabbitTemplate.setReturnCallback(this);
-
// rabbitTemplate.setConfirmCallback(this);
-
// }
-
-
-
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
-
if (!ack) {
-
log.error("confirm==>发送到broker.exchange失败\r\n"
-
"correlationData={}\r\n" "ack={}\r\n" "cause={}",
-
correlationData, ack, cause);
-
} else {
-
log.info("confirm==>发送到broker.exchange成功\r\n"
-
"correlationData={}\r\n" "ack={}\r\n" "cause={}",
-
correlationData, ack, cause);
-
}
-
}
-
-
-
public void returnedMessage(Message message, int replyCode, String replyText,
-
String exchange, String routingKey) {
-
log.info("returnedMessage==> \r\n" "message={}\r\n" "replyCode={}\r\n"
-
"replyText={}\r\n" "exchange={}\r\n" "routingKey={}",
-
message, replyCode, replyText, exchange, routingKey);
-
}
-
}
WorkCustomer.java
-
import com.rabbitmq.client.Channel;
-
import lombok.SneakyThrows;
-
import lombok.extern.slf4j.Slf4j;
-
import org.springframework.amqp.rabbit.annotation.Exchange;
-
import org.springframework.amqp.rabbit.annotation.Queue;
-
import org.springframework.amqp.rabbit.annotation.QueueBinding;
-
import org.springframework.amqp.rabbit.annotation.RabbitListener;
-
import org.springframework.amqp.support.AmqpHeaders;
-
import org.springframework.messaging.handler.annotation.Header;
-
import org.springframework.stereotype.Component;
-
-
import java.io.IOException;
-
-
-
public class WorkCustomer {
-
-
-
-
-
public void receive12(Student student, Channel channel,long deliveryTag){
-
try {
-
log.info("message: {}",student);
-
// 处理实际业务
-
// 制造异常
-
// int wrongNumber = 1/0;
-
// 无异常,确认消息消费成功
-
channel.basicAck(deliveryTag, true);
-
}catch (IOException | ArithmeticException exception) {
-
log.error("处理消息发生异常", exception);
-
// 有异常,将消息返回给Queue里,第三个参数requeue可以直接看出来,是否返回到Queue中
-
channel.basicNack(deliveryTag, true, true);
-
}
-
-
}
-
-
-
//-----------------工作模式---------------------------------------
-
// 生产端没有指定交换机只有routingKey和Object。
-
//消费方产生work队列,放在默认的交换机(AMQP default)上。
-
//而默认的交换机有一个特点,只要你的routerKey的名字与这个
-
//交换机的队列有相同的名字,他就会自动路由上。
-
//生产端routingKey 叫work ,消费端生产work队列。
-
//他们就路由上了
-
-
public void receive1(String message){
-
System.out.println("work message1 = " message);
-
}
-
-
-
-
public void receive2(String message){
-
System.out.println("work message2 = " message);
-
}
-
-
//-------------------------广播模式--------------------------------------------------
-
-
-
-
-
public void fanout1(String message) {
-
System.out.println("message1 = " message);
-
}
-
-
-
-
-
-
public void fanout2(String message) {
-
System.out.println("message2 = " message);
-
}
-
//-------------------------路由模式--------------------------------------------------
-
-
-
-
-
-
-
-
public void receive1221(String message) {
-
System.out.println("message1 = " message);
-
}
-
-
-
-
-
-
-
-
-
-
public void receive11(String message) {
-
System.out.println("message1 = " message);
-
}
-
-
-
-
-
-
-
-
public void receive22(String message) {
-
System.out.println("message2 = " message);
-
}
-
//-------------------------Topic 订阅模型(动态路由模型)--------------------------------------------------
-
-
-
-
-
-
-
-
public void receive111(String message){
-
System.out.println("message1 = " message);
-
}
-
-
-
-
-
-
-
-
-
public void receive222(String message){
-
System.out.println("message2 = " message);
-
}
-
-
-
-
/**
-
* 监听Queue的时候,直接获取消息体.
-
* 在注解上开启手动确认, 必须是ackMode的大写.
-
* 在进行消息确认的时候,要带上Rabbit MQ Server发送过来头上的tag,可以通过@Header注解获取delivery tag,
-
* @param firstTopicQueueMessage 消息体
-
* @param channel Broker和Consumer建立的channel
-
* @param tag 消息头中的tag
-
*/
-
// @RabbitListener(queues = "${queue.topic.first}", ackMode = "MANUAL")
-
// @RabbitHandler
-
// @SneakyThrows
-
// public void receiveFirstTopicQueueMessage(String firstTopicQueueMessage, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
-
// log.info("This is firstTopicQueue received message: {}", firstTopicQueueMessage);
-
// try {
-
// // 处理实际业务
-
// TimeUnit.SECONDS.sleep(5);
-
// // 制造异常
-
// // int wrongNumber = 1/0;
-
// // 无异常,确认消息消费成功
-
// channel.basicAck(tag, true);
-
// }catch (IOException | ArithmeticException exception) {
-
// log.error("处理消息发生异常", exception);
-
// // 有异常,将消息返回给Queue里,第三个参数requeue可以直接看出来,是否返回到Queue中
-
// channel.basicNack(tag, true, true);
-
// }
-
// }
-
-
}
TTest.java
-
-
import org.junit.Test;
-
import org.junit.runner.RunWith;
-
import org.springframework.amqp.rabbit.core.RabbitTemplate;
-
import org.springframework.beans.factory.annotation.Autowired;
-
import org.springframework.boot.test.context.SpringBootTest;
-
import org.springframework.test.context.junit4.SpringRunner;
-
-
-
-
public class TTest {
-
-
-
private RabbitTemplate rabbitTemplate;
-
-
//5.2 第一种hello world模型使用
-
-
public void test1() {
-
Student student = new Student();
-
student.setName("小明");
-
student.setAge("18");
-
student.setAddress("杭州");
-
rabbitTemplate.convertAndSend("hello", student);
-
// 生产端没有指定交换机只有routingKey和Object。
-
//消费方产生hello队列,放在默认的交换机(AMQP default)上。
-
//而默认的交换机有一个特点,只要你的routerKey的名字与这个
-
//交换机的队列有相同的名字,他就会自动路由上。
-
//生产端routingKey 叫hello ,消费端生产hello队列。
-
//他们就路由上了
-
}
-
-
//5.3 第二种work模型使用
-
-
public void test2() {
-
for (int i = 0; i < 10; i ) {
-
rabbitTemplate.convertAndSend("work", "hello work!");
-
// 生产端没有指定交换机只有routingKey和Object。
-
//消费方产生work队列,放在默认的交换机(AMQP default)上。
-
//而默认的交换机有一个特点,只要你的routerKey的名字与这个
-
//交换机的队列有相同的名字,他就会自动路由上。
-
//生产端routingKey 叫work ,消费端生产work队列。
-
//他们就路由上了
-
}
-
}
-
// 5.4 Fanout 广播模型
-
-
-
public void test3() {
-
rabbitTemplate.convertAndSend("logs", "", "这是日志广播"); // 参数1为交换机,参数2为路由key,“”表示为任意路由,参数3为消息内容
-
}
-
-
//5.5 Route 路由模型
-
-
public void contextLoads() {
-
// rabbitTemplate.convertAndSend("directs", "error", "error 的日志信息");
-
rabbitTemplate.convertAndSend("directs", "info", "info 的日志信息");
-
}
-
-
//5.6 Topic 订阅模型(动态路由模型)
-
-
public void contextLoads1() {
-
rabbitTemplate.convertAndSend("topics", "user.save.findAll", "user.save.findAll 的消息");
-
}
-
}
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgfjhba
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
怎样阻止微信小程序自动打开
PHP中文网 06-13 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01