• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

12-RabbitMQ高级特性-Consumer ACK

武飞扬头像
海洋的渔夫
帮助1

12-RabbitMQ高级特性-Consumer ACK

Consumer Ack

ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。

有三种确认方式:

  • 自动确认:acknowledge="none"

  • 手动确认:acknowledge="manual"

  • 根据异常情况确认:acknowledge="auto",(这种方式使用麻烦,不作讲解)

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

案例

1. 创建工程

创建一个空的 maven 工程 rabbitmq-consumer-spring:

学新通

2. 添加依赖

修改pom.xml文件内容为如下:

  1.  
    <?xml version="1.0" encoding="UTF-8"?>
  2.  
    <project xmlns="http://maven.apache.org/POM/4.0.0"
  3.  
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.  
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5.  
        <modelVersion>4.0.0</modelVersion>
  6.  
     
  7.  
        <groupId>com.lijw</groupId>
  8.  
        <artifactId>rabbitmq-consumer-spring</artifactId>
  9.  
        <version>1.0-SNAPSHOT</version>
  10.  
     
  11.  
        <dependencies>
  12.  
            <dependency>
  13.  
                <groupId>org.springframework</groupId>
  14.  
                <artifactId>spring-context</artifactId>
  15.  
                <version>5.1.7.RELEASE</version>
  16.  
            </dependency>
  17.  
     
  18.  
            <dependency>
  19.  
                <groupId>org.springframework.amqp</groupId>
  20.  
                <artifactId>spring-rabbit</artifactId>
  21.  
                <version>2.1.8.RELEASE</version>
  22.  
            </dependency>
  23.  
     
  24.  
            <dependency>
  25.  
                <groupId>junit</groupId>
  26.  
                <artifactId>junit</artifactId>
  27.  
                <version>4.12</version>
  28.  
            </dependency>
  29.  
     
  30.  
            <dependency>
  31.  
                <groupId>org.springframework</groupId>
  32.  
                <artifactId>spring-test</artifactId>
  33.  
                <version>5.1.7.RELEASE</version>
  34.  
            </dependency>
  35.  
        </dependencies>
  36.  
    </project>
学新通

3. 配置整合

1.创建rabbitmq.properties连接参数等配置文件;

  1.  
    rabbitmq.host=127.0.0.1
  2.  
    rabbitmq.port=5672
  3.  
    rabbitmq.username=libai
  4.  
    rabbitmq.password=libai
  5.  
    rabbitmq.virtual-host=/test

2.创建 spring-rabbitmq-consumer.xml 整合配置文件;

  1.  
    <?xml version="1.0" encoding="UTF-8"?>
  2.  
    <beans xmlns="http://www.springframework.org/schema/beans"
  3.  
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.  
           xmlns:context="http://www.springframework.org/schema/context"
  5.  
           xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  6.  
           xsi:schemaLocation="http://www.springframework.org/schema/beans
  7.  
           http://www.springframework.org/schema/beans/spring-beans.xsd
  8.  
           http://www.springframework.org/schema/context
  9.  
           https://www.springframework.org/schema/context/spring-context.xsd
  10.  
           http://www.springframework.org/schema/rabbit
  11.  
           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
  12.  
        <!--加载配置文件-->
  13.  
        <context:property-placeholder location="classpath:rabbitmq.properties"/>
  14.  
     
  15.  
        <!-- 定义rabbitmq connectionFactory -->
  16.  
        <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
  17.  
                                   port="${rabbitmq.port}"
  18.  
                                   username="${rabbitmq.username}"
  19.  
                                   password="${rabbitmq.password}"
  20.  
                                   virtual-host="${rabbitmq.virtual-host}"/>
  21.  
     
  22.  
     
  23.  
    </beans>
学新通
  1. 配置扫描监听器的Bean

后续我们写的消费者都是监听接口的实现类,需要将其生成 Bean,所以我们可以将其写到一个 包下,配置 spring 扫描:

学新通
  1.  
    <!-- 定义扫描监听器类   -->
  2.  
    <context:component-scan base-package="com.lijw.listener" />

4. 编写监听器

实现 MessageListener 类,则可以接收到消息:

学新通
  1.  
    package com.lijw.listener;
  2.  
     
  3.  
    import org.springframework.amqp.core.Message;
  4.  
    import org.springframework.amqp.core.MessageListener;
  5.  
    import org.springframework.stereotype.Component;
  6.  
     
  7.  
    /**
  8.  
     * @author Aron.li
  9.  
     * @date 2022/3/4 23:36
  10.  
     */
  11.  
    @Component
  12.  
    public class AckListener implements MessageListener {
  13.  
        @Override
  14.  
        public void onMessage(Message message) {
  15.  
            String msg = new String(message.getBody());
  16.  
            System.out.printf("接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n",
  17.  
                    message.getMessageProperties().getReceivedExchange(),
  18.  
                    message.getMessageProperties().getReceivedRoutingKey(),
  19.  
                    message.getMessageProperties().getConsumerQueue(),
  20.  
                    msg);
  21.  
        }
  22.  
    }
学新通

5.配置 监听器类 与 队列 绑定

学新通
  1.  
    <!--  定义监听器与队列的绑定  -->
  2.  
    <rabbit:listener-container connection-factory="connectionFactory" >
  3.  
        <rabbit:listener ref="ackListener" queue-names="test_queue_confirm"/>
  4.  
    </rabbit:listener-container>

6.测试 接收消息

配置好之后,下面我们来启动服务,验证一下能否接收到消息。

需要我们编写一个测试类,启动加载 spring 的相关配置:

学新通
  1.  
    package com.lijw;
  2.  
     
  3.  
    import org.junit.Test;
  4.  
    import org.junit.runner.RunWith;
  5.  
    import org.springframework.test.context.ContextConfiguration;
  6.  
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
  7.  
     
  8.  
    /**
  9.  
     * @author Aron.li
  10.  
     * @date 2022/3/4 23:41
  11.  
     */
  12.  
    @RunWith(SpringJUnit4ClassRunner.class)
  13.  
    @ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
  14.  
    public class ConsumerTest {
  15.  
     
  16.  
        @Test
  17.  
        public void test01() {
  18.  
            while (true) {
  19.  
     
  20.  
            }
  21.  
        }
  22.  
     
  23.  
    }
学新通

执行测试方法,开启加载 spring 框架,接收消息如下:

学新通

说明现在监听器已经正常工作了,那么下一步我们就要开始来写 Consumer Ack 的功能了。

7. 配置消息接收 手动确认:acknowledge="manual"

学新通
  1.  
    <!--  定义监听器与队列的绑定  -->
  2.  
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
  3.  
        <rabbit:listener ref="ackListener" queue-names="test_queue_confirm"/>
  4.  
    </rabbit:listener-container>

8.改写监听器,实现 ChannelAwareMessageListener 接口

在消息手动确认中,我们需要使用 channel.basicAck() channel.basicNack() 进行确认,那么就需要有 channel 提供我们调用。

而原来的 MessageListener 接口没有提供 channel ,我们可以实现 MessageListener 的子接口 ChannelAwareMessageListener

学新通

改写监听器如下:

学新通
  1.  
    package com.lijw.listener;
  2.  
     
  3.  
    import com.rabbitmq.client.Channel;
  4.  
    import org.springframework.amqp.core.Message;
  5.  
    import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
  6.  
    import org.springframework.stereotype.Component;
  7.  
     
  8.  
     
  9.  
    /**
  10.  
     * Consumer ACK机制:
  11.  
     * 1. 设置手动签收。acknowledge="manual"
  12.  
     * 2. 让监听器类实现ChannelAwareMessageListener接口
  13.  
     * 3. 如果消息成功处理,则调用channel的 basicAck()签收
  14.  
     * 4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer
  15.  
     *
  16.  
     * @author Aron.li
  17.  
     * @date 2022/3/4 23:36
  18.  
     */
  19.  
    @Component
  20.  
    public class AckListener implements ChannelAwareMessageListener {
  21.  
     
  22.  
        @Override
  23.  
        public void onMessage(Message message, Channel channel) throws Exception {
  24.  
            //1. 获取传递的标签,用于消息确认
  25.  
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
  26.  
     
  27.  
            try {
  28.  
                //2. 接收消息
  29.  
                System.out.println(new String(message.getBody()));
  30.  
     
  31.  
                //3. 处理业务逻辑
  32.  
                System.out.println("处理业务逻辑...");
  33.  
    //            int i = 3 / 0; // 产生异常
  34.  
                //4. 手动签收
  35.  
                channel.basicAck(deliveryTag, true);
  36.  
            } catch (Exception e) {
  37.  
                //e.printStackTrace();
  38.  
     
  39.  
                //5. 拒绝签收
  40.  
                /*
  41.  
                第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
  42.  
                 */
  43.  
                channel.basicNack(deliveryTag, truetrue); // 一次性可以拒绝多条消息
  44.  
                //channel.basicReject(deliveryTag,true); // 一次性只能拒绝一条消息,了解即可
  45.  
            }
  46.  
        }
  47.  
     
  48.  
    }
学新通

9.测试 拒绝重发消息

首先我们正常启动监听器,并且生产者发送消息:

学新通

下面我们在处理业务逻辑的位置 编写一个异常代码,如下:

学新通

可以看到只要没有签收成功,就可以让消息不断重发,直到我们解决了异常,正常签收消息为止。

Consumer Ack 小结

  • 在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认

  • 如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息

  • 如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。

消息可靠性总结

  • 持久化

    • exchange要持久化

    • queue要持久化

    • message要持久化

  • 生产方确认Confirm

  • 消费方确认Ack

  • Broker高可用

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhggkbge
系列文章
更多 icon
同类精品
更多 icon
继续加载