• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

springboot整合rabbitMQ和手动ack确认

武飞扬头像
Mckzxs
帮助1

springboot版2.2.2

  1.  
    <?xml version="1.0" encoding="UTF-8"?>
  2.  
    <project xmlns="http://maven.apache.org/POM/4.0.0"
  3.  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.  
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5.  
    <modelVersion>4.0.0</modelVersion>
  6.  
     
  7.  
    <groupId>com.ex</groupId>
  8.  
    <artifactId>tmap2</artifactId>
  9.  
    <version>1.0-SNAPSHOT</version>
  10.  
     
  11.  
    <parent>
  12.  
    <groupId>org.springframework.boot</groupId>
  13.  
    <artifactId>spring-boot-starter-parent</artifactId>
  14.  
    <version>2.2.2.RELEASE</version>
  15.  
    <relativePath/> <!-- lookup parent from repository -->
  16.  
    </parent>
  17.  
    <dependencies>
  18.  
    <dependency>
  19.  
    <groupId>org.springframework.boot</groupId>
  20.  
    <artifactId>spring-boot-starter-web</artifactId>
  21.  
    </dependency>
  22.  
     
  23.  
    <!-- <dependency>-->
  24.  
    <!-- <groupId>io.agora.rtm</groupId>-->
  25.  
    <!-- <artifactId>agora-rtm-sdk</artifactId>-->
  26.  
    <!-- <version>1.3</version>-->
  27.  
    <!-- </dependency>-->
  28.  
    <dependency>
  29.  
    <groupId>org.springframework.boot</groupId>
  30.  
    <artifactId>spring-boot-starter-amqp</artifactId>
  31.  
    </dependency>
  32.  
    <dependency>
  33.  
    <groupId>org.projectlombok</groupId>
  34.  
    <artifactId>lombok</artifactId>
  35.  
    </dependency>
  36.  
    <dependency>
  37.  
    <groupId>org.springframework.boot</groupId>
  38.  
    <artifactId>spring-boot-starter-test</artifactId>
  39.  
    <scope>test</scope>
  40.  
    </dependency>
  41.  
     
  42.  
    </dependencies>
  43.  
    </project>
学新通

application.yml
 

  1.  
    spring:
  2.  
    rabbitmq:
  3.  
    host: localhost
  4.  
    port: 5672
  5.  
    username: admin
  6.  
    password: 123456
  7.  
    virtual-host: /ems
  8.  
    template:
  9.  
    retry: #重试,消息发送失败会重试
  10.  
    enabled: true # 开启重试
  11.  
    initial-interval: 10000ms #第一次十秒重试
  12.  
    max-interval: 80000ms #最后一次是八秒重试
  13.  
    multiplier: 2 #重试翻倍率
  14.  
    publisher-confirms: true #发送者开启 confirm 确认机制
  15.  
    publisher-returns: true # 发送者开启 return 确认机制
学新通




RabbitCallbackConfig.java

  1.  
    import lombok.extern.slf4j.Slf4j;
  2.  
    import org.springframework.amqp.core.Message;
  3.  
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  4.  
    import org.springframework.amqp.rabbit.connection.CorrelationData;
  5.  
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6.  
    import org.springframework.context.annotation.Bean;
  7.  
    import org.springframework.context.annotation.Configuration;
  8.  
     
  9.  
    @Slf4j
  10.  
    @Configuration
  11.  
    public class RabbitCallbackConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
  12.  
    @Bean
  13.  
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
  14.  
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  15.  
    //rabbitTemplate发送消息json转换配置
  16.  
    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
  17.  
    rabbitTemplate.setMandatory(true);
  18.  
    rabbitTemplate.setConfirmCallback(this);
  19.  
    rabbitTemplate.setReturnCallback(this);
  20.  
    return rabbitTemplate;
  21.  
    }
  22.  
    /**
  23.  
    * 配置接收消息json转换为对象
  24.  
    * @return
  25.  
    */
  26.  
    @Bean
  27.  
    public MessageConverter jsonMessageConverter(){
  28.  
    return new Jackson2JsonMessageConverter();
  29.  
    }
  30.  
     
  31.  
    // 下边这样写也可以
  32.  
    // @Autowired
  33.  
    // private RabbitTemplate rabbitTemplate;
  34.  
    // @PostConstruct
  35.  
    // public void init() {
  36.  
    // rabbitTemplate.setMandatory(true);
  37.  
    // rabbitTemplate.setReturnCallback(this);
  38.  
    // rabbitTemplate.setConfirmCallback(this);
  39.  
    // }
  40.  
     
  41.  
    @Override
  42.  
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  43.  
    if (!ack) {
  44.  
    log.error("confirm==>发送到broker.exchange失败\r\n"
  45.  
    "correlationData={}\r\n" "ack={}\r\n" "cause={}",
  46.  
    correlationData, ack, cause);
  47.  
    } else {
  48.  
    log.info("confirm==>发送到broker.exchange成功\r\n"
  49.  
    "correlationData={}\r\n" "ack={}\r\n" "cause={}",
  50.  
    correlationData, ack, cause);
  51.  
    }
  52.  
    }
  53.  
     
  54.  
    @Override
  55.  
    public void returnedMessage(Message message, int replyCode, String replyText,
  56.  
    String exchange, String routingKey) {
  57.  
    log.info("returnedMessage==> \r\n" "message={}\r\n" "replyCode={}\r\n"
  58.  
    "replyText={}\r\n" "exchange={}\r\n" "routingKey={}",
  59.  
    message, replyCode, replyText, exchange, routingKey);
  60.  
    }
  61.  
    }
学新通

WorkCustomer.java

  1.  
    import com.rabbitmq.client.Channel;
  2.  
    import lombok.SneakyThrows;
  3.  
    import lombok.extern.slf4j.Slf4j;
  4.  
    import org.springframework.amqp.rabbit.annotation.Exchange;
  5.  
    import org.springframework.amqp.rabbit.annotation.Queue;
  6.  
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
  7.  
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
  8.  
    import org.springframework.amqp.support.AmqpHeaders;
  9.  
    import org.springframework.messaging.handler.annotation.Header;
  10.  
    import org.springframework.stereotype.Component;
  11.  
     
  12.  
    import java.io.IOException;
  13.  
    @Slf4j
  14.  
    @Component
  15.  
    public class WorkCustomer {
  16.  
     
  17.  
     
  18.  
    @RabbitListener(queuesToDeclare = @Queue(value = "hello",declare = "true"),ackMode ="MANUAL" )
  19.  
    @SneakyThrows
  20.  
    public void receive12(Student student, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag){
  21.  
    try {
  22.  
    log.info("message: {}",student);
  23.  
    // 处理实际业务
  24.  
    // 制造异常
  25.  
    // int wrongNumber = 1/0;
  26.  
    // 无异常,确认消息消费成功
  27.  
    channel.basicAck(deliveryTag, true);
  28.  
    }catch (IOException | ArithmeticException exception) {
  29.  
    log.error("处理消息发生异常", exception);
  30.  
    // 有异常,将消息返回给Queue里,第三个参数requeue可以直接看出来,是否返回到Queue中
  31.  
    channel.basicNack(deliveryTag, true, true);
  32.  
    }
  33.  
     
  34.  
    }
  35.  
     
  36.  
     
  37.  
    //-----------------工作模式---------------------------------------
  38.  
    // 生产端没有指定交换机只有routingKey和Object。
  39.  
    //消费方产生work队列,放在默认的交换机(AMQP default)上。
  40.  
    //而默认的交换机有一个特点,只要你的routerKey的名字与这个
  41.  
    //交换机的队列有相同的名字,他就会自动路由上。
  42.  
    //生产端routingKey 叫work ,消费端生产work队列。
  43.  
    //他们就路由上了
  44.  
    @RabbitListener(queuesToDeclare = @Queue("work"))
  45.  
    public void receive1(String message){
  46.  
    System.out.println("work message1 = " message);
  47.  
    }
  48.  
     
  49.  
     
  50.  
    @RabbitListener(queuesToDeclare = @Queue("work"))
  51.  
    public void receive2(String message){
  52.  
    System.out.println("work message2 = " message);
  53.  
    }
  54.  
     
  55.  
    //-------------------------广播模式--------------------------------------------------
  56.  
    @RabbitListener(bindings = @QueueBinding(
  57.  
    value = @Queue, // 创建临时队列
  58.  
    exchange = @Exchange(name = "logs", type = "fanout")
  59.  
    ))
  60.  
    public void fanout1(String message) {
  61.  
    System.out.println("message1 = " message);
  62.  
    }
  63.  
     
  64.  
    @RabbitListener(bindings = @QueueBinding(
  65.  
    value = @Queue, // 创建临时队列
  66.  
    exchange = @Exchange(name = "logs", type = "fanout")
  67.  
    ))
  68.  
    public void fanout2(String message) {
  69.  
    System.out.println("message2 = " message);
  70.  
    }
  71.  
    //-------------------------路由模式--------------------------------------------------
  72.  
     
  73.  
    @RabbitListener(bindings = {
  74.  
    @QueueBinding(
  75.  
    value = @Queue(value = "infoQue",declare = "true"), // 创建info队列,declare默认队列持久化
  76.  
    key = {"info"}, // 路由key
  77.  
    exchange = @Exchange(type = "direct", name = "directs")
  78.  
    )})
  79.  
    public void receive1221(String message) {
  80.  
    System.out.println("message1 = " message);
  81.  
    }
  82.  
     
  83.  
     
  84.  
     
  85.  
    @RabbitListener(bindings = {
  86.  
    @QueueBinding(
  87.  
    value = @Queue(value = "info||error"), // 创建临时队列
  88.  
    key = {"info", "error"}, // 路由key
  89.  
    exchange = @Exchange(type = "direct", name = "directs")
  90.  
    )})
  91.  
    public void receive11(String message) {
  92.  
    System.out.println("message1 = " message);
  93.  
    }
  94.  
     
  95.  
    @RabbitListener(bindings = {
  96.  
    @QueueBinding(
  97.  
    value = @Queue,
  98.  
    key = {"error"},
  99.  
    exchange = @Exchange(type = "direct", name = "directs")
  100.  
    )})
  101.  
    public void receive22(String message) {
  102.  
    System.out.println("message2 = " message);
  103.  
    }
  104.  
    //-------------------------Topic 订阅模型(动态路由模型)--------------------------------------------------
  105.  
    @RabbitListener(bindings = {
  106.  
    @QueueBinding(
  107.  
    value = @Queue,
  108.  
    key = {"user.*"},
  109.  
    exchange = @Exchange(type = "topic",name = "topics")
  110.  
    )
  111.  
    })
  112.  
    public void receive111(String message){
  113.  
    System.out.println("message1 = " message);
  114.  
    }
  115.  
     
  116.  
    @RabbitListener(bindings = {
  117.  
    @QueueBinding(
  118.  
    value = @Queue,
  119.  
    key = {"user.#"},
  120.  
    exchange = @Exchange(type = "topic",name = "topics")
  121.  
    )
  122.  
    })
  123.  
    public void receive222(String message){
  124.  
    System.out.println("message2 = " message);
  125.  
    }
  126.  
     
  127.  
     
  128.  
     
  129.  
    /**
  130.  
    * 监听Queue的时候,直接获取消息体.
  131.  
    * 在注解上开启手动确认, 必须是ackMode的大写.
  132.  
    * 在进行消息确认的时候,要带上Rabbit MQ Server发送过来头上的tag,可以通过@Header注解获取delivery tag,
  133.  
    * @param firstTopicQueueMessage 消息体
  134.  
    * @param channel Broker和Consumer建立的channel
  135.  
    * @param tag 消息头中的tag
  136.  
    */
  137.  
    // @RabbitListener(queues = "${queue.topic.first}", ackMode = "MANUAL")
  138.  
    // @RabbitHandler
  139.  
    // @SneakyThrows
  140.  
    // public void receiveFirstTopicQueueMessage(String firstTopicQueueMessage, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
  141.  
    // log.info("This is firstTopicQueue received message: {}", firstTopicQueueMessage);
  142.  
    // try {
  143.  
    // // 处理实际业务
  144.  
    // TimeUnit.SECONDS.sleep(5);
  145.  
    // // 制造异常
  146.  
    // // int wrongNumber = 1/0;
  147.  
    // // 无异常,确认消息消费成功
  148.  
    // channel.basicAck(tag, true);
  149.  
    // }catch (IOException | ArithmeticException exception) {
  150.  
    // log.error("处理消息发生异常", exception);
  151.  
    // // 有异常,将消息返回给Queue里,第三个参数requeue可以直接看出来,是否返回到Queue中
  152.  
    // channel.basicNack(tag, true, true);
  153.  
    // }
  154.  
    // }
  155.  
     
  156.  
    }
学新通

TTest.java

  1.  
     
  2.  
    import org.junit.Test;
  3.  
    import org.junit.runner.RunWith;
  4.  
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5.  
    import org.springframework.beans.factory.annotation.Autowired;
  6.  
    import org.springframework.boot.test.context.SpringBootTest;
  7.  
    import org.springframework.test.context.junit4.SpringRunner;
  8.  
     
  9.  
    @RunWith(SpringRunner.class)
  10.  
    @SpringBootTest
  11.  
    public class TTest {
  12.  
     
  13.  
    @Autowired
  14.  
    private RabbitTemplate rabbitTemplate;
  15.  
     
  16.  
    //5.2 第一种hello world模型使用
  17.  
    @Test
  18.  
    public void test1() {
  19.  
    Student student = new Student();
  20.  
    student.setName("小明");
  21.  
    student.setAge("18");
  22.  
    student.setAddress("杭州");
  23.  
    rabbitTemplate.convertAndSend("hello", student);
  24.  
    // 生产端没有指定交换机只有routingKey和Object。
  25.  
    //消费方产生hello队列,放在默认的交换机(AMQP default)上。
  26.  
    //而默认的交换机有一个特点,只要你的routerKey的名字与这个
  27.  
    //交换机的队列有相同的名字,他就会自动路由上。
  28.  
    //生产端routingKey 叫hello ,消费端生产hello队列。
  29.  
    //他们就路由上了
  30.  
    }
  31.  
     
  32.  
    //5.3 第二种work模型使用
  33.  
    @Test
  34.  
    public void test2() {
  35.  
    for (int i = 0; i < 10; i ) {
  36.  
    rabbitTemplate.convertAndSend("work", "hello work!");
  37.  
    // 生产端没有指定交换机只有routingKey和Object。
  38.  
    //消费方产生work队列,放在默认的交换机(AMQP default)上。
  39.  
    //而默认的交换机有一个特点,只要你的routerKey的名字与这个
  40.  
    //交换机的队列有相同的名字,他就会自动路由上。
  41.  
    //生产端routingKey 叫work ,消费端生产work队列。
  42.  
    //他们就路由上了
  43.  
    }
  44.  
    }
  45.  
    // 5.4 Fanout 广播模型
  46.  
     
  47.  
    @Test
  48.  
    public void test3() {
  49.  
    rabbitTemplate.convertAndSend("logs", "", "这是日志广播"); // 参数1为交换机,参数2为路由key,“”表示为任意路由,参数3为消息内容
  50.  
    }
  51.  
     
  52.  
    //5.5 Route 路由模型
  53.  
    @Test
  54.  
    public void contextLoads() {
  55.  
    // rabbitTemplate.convertAndSend("directs", "error", "error 的日志信息");
  56.  
    rabbitTemplate.convertAndSend("directs", "info", "info 的日志信息");
  57.  
    }
  58.  
     
  59.  
    //5.6 Topic 订阅模型(动态路由模型)
  60.  
    @Test
  61.  
    public void contextLoads1() {
  62.  
    rabbitTemplate.convertAndSend("topics", "user.save.findAll", "user.save.findAll 的消息");
  63.  
    }
  64.  
    }
学新通

学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgfjhba
系列文章
更多 icon
同类精品
更多 icon
继续加载