• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

七 、RabbitMQ高级特性

武飞扬头像
牵兔散步的萝卜
帮助1

7.1 消息的可靠投递

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式

  • confirm 确认模式
  • return 退回模式

rabbitmq 整个消息投递的路径为:

producer—>rabbitmq broker—>exchange—>queue—>consumer

l消息从 producer 到 exchange 则会返回一个 confirmCallback 。

l消息从 exchange–>queue 投递失败则会返回一个 returnCallback 。

我们将利用这两个 callback 控制消息的可靠性投递

7.1.1 提供者代码实现

① 创建项目 rabbitmq-producer-spring

② 添加pom文件

  1.  
    <**dependencies**>
  2.  
     
  3.  
    <**dependency**>
  4.  
     
  5.  
    <**groupId**>org.springframework</**groupId**>
  6.  
     
  7.  
    <**artifactId**>spring-context</**artifactId**>
  8.  
     
  9.  
    <**version**>5.1.7.RELEASE</**version**>
  10.  
     
  11.  
    </**dependency**>
  12.  
     
  13.  
     
  14.  
     
  15.  
    <**dependency**>
  16.  
     
  17.  
    <**groupId**>org.springframework.amqp</**groupId**>
  18.  
     
  19.  
    <**artifactId**>spring-rabbit</**artifactId**>
  20.  
     
  21.  
    <**version**>2.1.8.RELEASE</**version**>
  22.  
     
  23.  
    </**dependency**>
  24.  
     
  25.  
     
  26.  
     
  27.  
    <**dependency**>
  28.  
     
  29.  
    <**groupId**>junit</**groupId**>
  30.  
     
  31.  
    <**artifactId**>junit</**artifactId**>
  32.  
     
  33.  
    <**version**>4.12</**version**>
  34.  
     
  35.  
    </**dependency**>
  36.  
     
  37.  
     
  38.  
     
  39.  
    <**dependency**>
  40.  
     
  41.  
    <**groupId**>org.springframework</**groupId**>
  42.  
     
  43.  
    <**artifactId**>spring-test</**artifactId**>
  44.  
     
  45.  
    <**version**>5.1.7.RELEASE</**version**>
  46.  
     
  47.  
    </**dependency**>
  48.  
     
  49.  
    </**dependencies**>
  50.  
     
  51.  
     
  52.  
     
  53.  
    <**build**>
  54.  
     
  55.  
    <**plugins**>
  56.  
     
  57.  
    <**plugin**>
  58.  
     
  59.  
    <**groupId**>org.apache.maven.plugins</**groupId**>
  60.  
     
  61.  
    <**artifactId**>maven-compiler-plugin</**artifactId**>
  62.  
     
  63.  
    <**version**>3.8.0</**version**>
  64.  
     
  65.  
    <**configuration**>
  66.  
     
  67.  
    <**source**>1.8</**source**>
  68.  
     
  69.  
    <**target**>1.8</**target**>
  70.  
     
  71.  
    </**configuration**>
  72.  
     
  73.  
    </**plugin**>
  74.  
     
  75.  
    </**plugins**>
  76.  
     
  77.  
    </**build**>
学新通
  1. 在resource 文件夹下面添加 配置文件 rabbitmq.properties

rabbitmq.host=192.168.197.129

rabbitmq.port=5672

rabbitmq.username=admin

rabbitmq.password=admin

rabbitmq.virtual-host=/

④ 在resource 文件夹下面添加 配置文件 spring-rabbitmq-producer.xml

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="Index of /schema/beans"

  1.  
    **xmlns:xsi**="http://www.w3.org/2001/XMLSchema-instance"
  2.  
     
  3.  
    **xmlns:context**="http://www.springframework.org/schema/context"
  4.  
     
  5.  
    **xmlns:rabbit**="http://www.springframework.org/schema/rabbit"
  6.  
     
  7.  
    **xsi:schemaLocation**="http://www.springframework.org/schema/beans
  8.  
     
  9.  
    http://www.springframework.org/schema/beans/spring-beans.xsd
  10.  
     
  11.  
    http://www.springframework.org/schema/context
  12.  
     
  13.  
    https://www.springframework.org/schema/context/spring-context.xsd
  14.  
     
  15.  
    http://www.springframework.org/schema/rabbit
  16.  
     
  17.  
    http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
  18.  
     
  19.  
    <!--加载配置文件-->
  20.  
     
  21.  
    <**context:property-placeholder** **location**="classpath:rabbitmq.properties"/>
  22.  
     
  23.  
     
  24.  
     
  25.  
    <!-- 定义rabbitmq connectionFactory
  26.  
     
  27.  
    确认模式开启:publisher-confirms="true"
  28.  
     
  29.  
    -->
  30.  
     
  31.  
    <**rabbit:connection-factory** **id**="connectionFactory" **host**="${rabbitmq.host}"
  32.  
     
  33.  
    **port**="${rabbitmq.port}"
  34.  
     
  35.  
    **username**="${rabbitmq.username}"
  36.  
     
  37.  
    **password**="${rabbitmq.password}"
  38.  
     
  39.  
    **virtual-host**="${rabbitmq.virtual-host}"
  40.  
     
  41.  
    **publisher-confirms**="true"
  42.  
     
  43.  
    **publisher-returns**="true"
  44.  
     
  45.  
    />
  46.  
     
  47.  
    <!--定义管理交换机、队列-->
  48.  
     
  49.  
    <**rabbit:admin** **connection-factory**="connectionFactory"/>
  50.  
     
  51.  
     
  52.  
     
  53.  
    <!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
  54.  
     
  55.  
    <**rabbit:template** **id**="rabbitTemplate" **connection-factory**="connectionFactory"/>
  56.  
     
  57.  
     
  58.  
     
  59.  
    <!--消息可靠性投递(生产端)-->
  60.  
     
  61.  
    <**rabbit:queue** **id**="test_queue_confirm" **name**="test_queue_confirm"></**rabbit:queue**>
  62.  
     
  63.  
    <**rabbit:direct-exchange** **name**="test_exchange_confirm">
  64.  
     
  65.  
    <**rabbit:bindings**>
  66.  
     
  67.  
    <**rabbit:binding** **queue**="test_queue_confirm" **key**="confirm"></**rabbit:binding**>
  68.  
     
  69.  
    </**rabbit:bindings**>
  70.  
     
  71.  
    </**rabbit:direct-exchange**>
  72.  
     
学新通

</beans>

⑤ 创建测试类 , 添加确认模式

package com.atguigu;

import org.junit.Test;

import org.junit.runner.RunWith;

import org.springframework.amqp.AmqpException;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.core.MessagePostProcessor;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.test.context.ContextConfiguration;

import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringJUnit4ClassRunner.class)

@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")

public class ProducerTest {

  1.  
    @Autowired
  2.  
     
  3.  
    **private** RabbitTemplate rabbitTemplate;
  4.  
     
  5.  
    /**
  6.  
     
  7.  
    * 确认模式:
  8.  
     
  9.  
    * 步骤:
  10.  
     
  11.  
    * 1. 确认模式开启:ConnectionFactory中开启publisher-confirms="true"
  12.  
     
  13.  
    * 2. 在rabbitTemplate定义ConfirmCallBack回调函数
  14.  
     
  15.  
    */
  16.  
     
  17.  
    @Test
  18.  
     
  19.  
    **public** **void** **testConfirm**() {
  20.  
     
  21.  
    //2. 定义回调
  22.  
     
  23.  
    rabbitTemplate.setConfirmCallback(**new** RabbitTemplate.ConfirmCallback() {
  24.  
     
  25.  
    @Override
  26.  
     
  27.  
    **public** **void** **confirm**(CorrelationData correlationData, **boolean** ack, String cause) {
  28.  
     
  29.  
    **if** (ack){
  30.  
     
  31.  
    //接收成功
  32.  
     
  33.  
    System.out.println("接收成功消息" cause);
  34.  
     
  35.  
    }**else** {
  36.  
     
  37.  
    //接收失败
  38.  
     
  39.  
    System.out.println("接收失败消息" cause);
  40.  
     
  41.  
    //做一些处理,让消息再次发送。
  42.  
     
  43.  
    }
  44.  
     
  45.  
    }
  46.  
     
  47.  
    });
  48.  
     
  49.  
     
  50.  
     
  51.  
    //3. 发送消息
  52.  
     
  53.  
    rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm....");//成功
  54.  
     
  55.  
    //rabbitTemplate.convertAndSend("test_exchange_confirm000", "confirm", "message confirm....");//失败
  56.  
     
  57.  
    }
学新通

}

运行程序

⑥ 添加回退模式

package com.atguigu;

import org.junit.Test;

import org.junit.runner.RunWith;

import org.springframework.amqp.AmqpException;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.core.MessagePostProcessor;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.test.context.ContextConfiguration;

import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringJUnit4ClassRunner.class)

@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")

public class ProducerTest {

  1.  
    @Autowired
  2.  
     
  3.  
    **private** RabbitTemplate rabbitTemplate;
  4.  
     
  5.  
     
  6.  
     
  7.  
    /**
  8.  
     
  9.  
    * 回退模式: 当消息发送给Exchange后,Exchange路由到Queue失败时 才会执行 ReturnCallBack
  10.  
     
  11.  
    * 步骤:
  12.  
     
  13.  
    * 1. 开启回退模式:publisher-returns="true"
  14.  
     
  15.  
    * 2. 设置ReturnCallBack
  16.  
     
  17.  
    * 3. 设置Exchange处理消息的模式:
  18.  
     
  19.  
    * 1). 如果消息没有路由到Queue,则丢弃消息(默认)
  20.  
     
  21.  
    * 2). 如果消息没有路由到Queue,返回给消息发送方ReturnCallBack
  22.  
     
  23.  
    * rabbitTemplate.setMandatory(true);
  24.  
     
  25.  
    */
  26.  
     
  27.  
    @Test
  28.  
     
  29.  
    **public** **void** **testReturn**() {
  30.  
     
  31.  
     
  32.  
     
  33.  
    //设置交换机处理失败消息的模式
  34.  
     
  35.  
    rabbitTemplate.setMandatory(**true**);
  36.  
     
  37.  
     
  38.  
     
  39.  
    //2.设置ReturnCallBack
  40.  
     
  41.  
    rabbitTemplate.setReturnCallback(**new** RabbitTemplate.ReturnCallback() {
  42.  
     
  43.  
    /**
  44.  
     
  45.  
    * @param message 消息对象
  46.  
     
  47.  
    * @param replyCode 错误码
  48.  
     
  49.  
    * @param replyText 错误信息
  50.  
     
  51.  
    * @param exchange 交换机
  52.  
     
  53.  
    * @param routingKey 路由键
  54.  
     
  55.  
    */
  56.  
     
  57.  
    @Override
  58.  
     
  59.  
    **public** **void** **returnedMessage**(Message message, **int** replyCode,String replyText,String exchange,String routingKey) {
  60.  
     
  61.  
    System.out.println("return 执行了....");
  62.  
     
  63.  
     
  64.  
     
  65.  
    System.out.println(message);
  66.  
     
  67.  
    System.out.println(replyCode);
  68.  
     
  69.  
    System.out.println(replyText);
  70.  
     
  71.  
    System.out.println(exchange);
  72.  
     
  73.  
    System.out.println(routingKey);
  74.  
     
  75.  
     
  76.  
     
  77.  
    //处理
  78.  
     
  79.  
    }
  80.  
     
  81.  
    });
  82.  
     
  83.  
     
  84.  
     
  85.  
    //3. 发送消息
  86.  
     
  87.  
    rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm....");
  88.  
     
  89.  
    // rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm11", "message confirm....");
  90.  
     
  91.  
    }
学新通

}

7.1.2 消息的可靠投递小结

  • 设置 ConnectionFactory的publisher-confirms="true" 开启 确认模式。
  • 使用 rabbitTemplate.setConfirmCallback 设置回调函数。当消息发送到 exchange 后回调 confirm 方法。在方法中判断 ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。
  • 设置 ConnectionFactory 的 publisher-returns="true" 开启 退回模式。
  • 使用 rabbitTemplate.setReturnCallback 设置退回函数,当消息从exchange 路由到 queue 失败后,如果设置了 rabbitTemplate.setMandatory(true) 参数,则会将消息退回给 producer并执行回调函数returnedMessage

7.1.3 Consumer Ack

ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。

有二种确认方式:

  • 自动确认:acknowledge=“none” 默认
  • 手动确认:acknowledge=“manual”

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。

如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

① 创建项目 rabbitmq-consumer-spring

② 添加pom文件

  1.  
    <**dependencies**>
  2.  
     
  3.  
    <**dependency**>
  4.  
     
  5.  
    <**groupId**>org.springframework</**groupId**>
  6.  
     
  7.  
    <**artifactId**>spring-context</**artifactId**>
  8.  
     
  9.  
    <**version**>5.1.7.RELEASE</**version**>
  10.  
     
  11.  
    </**dependency**>
  12.  
     
  13.  
     
  14.  
     
  15.  
    <**dependency**>
  16.  
     
  17.  
    <**groupId**>org.springframework.amqp</**groupId**>
  18.  
     
  19.  
    <**artifactId**>spring-rabbit</**artifactId**>
  20.  
     
  21.  
    <**version**>2.1.8.RELEASE</**version**>
  22.  
     
  23.  
    </**dependency**>
  24.  
     
  25.  
     
  26.  
     
  27.  
    <**dependency**>
  28.  
     
  29.  
    <**groupId**>junit</**groupId**>
  30.  
     
  31.  
    <**artifactId**>junit</**artifactId**>
  32.  
     
  33.  
    <**version**>4.12</**version**>
  34.  
     
  35.  
    </**dependency**>
  36.  
     
  37.  
     
  38.  
     
  39.  
    <**dependency**>
  40.  
     
  41.  
    <**groupId**>org.springframework</**groupId**>
  42.  
     
  43.  
    <**artifactId**>spring-test</**artifactId**>
  44.  
     
  45.  
    <**version**>5.1.7.RELEASE</**version**>
  46.  
     
  47.  
    </**dependency**>
  48.  
     
  49.  
    </**dependencies**>
  50.  
     
  51.  
     
  52.  
     
  53.  
    <**build**>
  54.  
     
  55.  
    <**plugins**>
  56.  
     
  57.  
    <**plugin**>
  58.  
     
  59.  
    <**groupId**>org.apache.maven.plugins</**groupId**>
  60.  
     
  61.  
    <**artifactId**>maven-compiler-plugin</**artifactId**>
  62.  
     
  63.  
    <**version**>3.8.0</**version**>
  64.  
     
  65.  
    <**configuration**>
  66.  
     
  67.  
    <**source**>1.8</**source**>
  68.  
     
  69.  
    <**target**>1.8</**target**>
  70.  
     
  71.  
    </**configuration**>
  72.  
     
  73.  
    </**plugin**>
  74.  
     
  75.  
    </**plugins**>
  76.  
     
  77.  
    </**build**>
学新通

③ 在 resource 文件夹下面新建 rabbitmq.properties 文件 和 spring-rabbitmq-consumer.xml 文件

rabbitmq.properties 文件

rabbitmq.host=192.168.197.129

rabbitmq.port=5672

rabbitmq.username=admin

rabbitmq.password=admin

rabbitmq.virtual-host=/

spring-rabbitmq-consumer.xml 文件

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="Index of /schema/beans"

  1.  
    **xmlns:xsi**="http://www.w3.org/2001/XMLSchema-instance"
  2.  
     
  3.  
    **xmlns:context**="http://www.springframework.org/schema/context"
  4.  
     
  5.  
    **xmlns:rabbit**="http://www.springframework.org/schema/rabbit"
  6.  
     
  7.  
    **xsi:schemaLocation**="http://www.springframework.org/schema/beans
  8.  
     
  9.  
    http://www.springframework.org/schema/beans/spring-beans.xsd
  10.  
     
  11.  
    http://www.springframework.org/schema/context
  12.  
     
  13.  
    https://www.springframework.org/schema/context/spring-context.xsd
  14.  
     
  15.  
    http://www.springframework.org/schema/rabbit
  16.  
     
  17.  
    http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
  18.  
     
  19.  
    <!--加载配置文件-->
  20.  
     
  21.  
    <**context:property-placeholder** **location**="classpath:rabbitmq.properties"/>
  22.  
     
  23.  
     
  24.  
     
  25.  
    <!-- 定义rabbitmq connectionFactory -->
  26.  
     
  27.  
    <**rabbit:connection-factory** **id**="connectionFactory" **host**="${rabbitmq.host}"
  28.  
     
  29.  
    **port**="${rabbitmq.port}"
  30.  
     
  31.  
    **username**="${rabbitmq.username}"
  32.  
     
  33.  
    **password**="${rabbitmq.password}"
  34.  
     
  35.  
    **virtual-host**="${rabbitmq.virtual-host}"/>
  36.  
     
  37.  
     
  38.  
     
  39.  
    <**context:component-scan** **base-package**="com.atguigu.listener" />
  40.  
     
  41.  
     
  42.  
     
  43.  
    <!--定义监听器容器
  44.  
     
  45.  
    acknowledge="manual":手动签收
  46.  
     
  47.  
    -->
  48.  
     
  49.  
    <**rabbit:listener-container** **connection-factory**="connectionFactory" **acknowledge**="manual">
  50.  
     
  51.  
    <**rabbit:listener** **ref**="ackListener" **queue-names**="test_queue_confirm"></**rabbit:listener**>
  52.  
     
  53.  
    </**rabbit:listener-container**>
  54.  
     
学新通

</beans>

7.1.3.1 自动确认

④ 添加监听器

package com.atguigu.listener;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.core.MessageListener;

import org.springframework.stereotype.Component;

@Component

public class AckListener implements MessageListener {

  1.  
    @Override
  2.  
     
  3.  
    **public** **void** **onMessage**(Message message) {
  4.  
     
  5.  
    System.out.println(**new** String(message.getBody()));
  6.  
     
  7.  
    }

}

7.1.3.2 手动确认

④ 添加监听器

package com.atguigu.listener;

import com.rabbitmq.client.Channel;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;

import org.springframework.stereotype.Component;

import java.io.IOException;

/**

  • Consumer ACK机制:

    1. 设置手动签收。acknowledge="manual"
    1. 让监听器类实现ChannelAwareMessageListener接口
    1. 如果消息成功处理,则调用channel的 basicAck()签收
    1. 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer

*/

@Component

public class AckListener implements ChannelAwareMessageListener {

  1.  
    @Override
  2.  
     
  3.  
    **public** **void** **onMessage**(Message message, Channel channel) **throws** Exception {
  4.  
     
  5.  
    Thread.sleep(1000);
  6.  
     
  7.  
    // 获取消息传递标记
  8.  
     
  9.  
    **long** deliveryTag = message.getMessageProperties().getDeliveryTag();
  10.  
     
  11.  
    **try** {
  12.  
     
  13.  
    // ① 接收消息
  14.  
     
  15.  
    System.out.println(**new** String(message.getBody()));
  16.  
     
  17.  
    // ② 处理业务逻辑
  18.  
     
  19.  
    System.out.println("处理业务逻辑");
  20.  
     
  21.  
    **int** i = 3/0;//出现错误
  22.  
     
  23.  
    // ③ 手动签收
  24.  
     
  25.  
    /**
  26.  
     
  27.  
    * 第一个参数:表示收到的标签
  28.  
     
  29.  
    * 第二个参数:如果为true表示可以签收所有的消息
  30.  
     
  31.  
    */
  32.  
     
  33.  
    channel.basicAck(deliveryTag,**true**);
  34.  
     
  35.  
    } **catch** (Exception e) {
  36.  
     
  37.  
    e.printStackTrace();
  38.  
     
  39.  
    // ④ 拒绝签收
  40.  
     
  41.  
    /*
  42.  
     
  43.  
    第三个参数:requeue:重回队列。
  44.  
     
  45.  
    设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
  46.  
     
  47.  
    */
  48.  
     
  49.  
    channel.basicNack(deliveryTag,**true**,**true**);
  50.  
     
  51.  
    }
  52.  
     
  53.  
    }
学新通

}

⑤ 添加测试类

package com.atguigu;

import org.junit.Test;

import org.junit.runner.RunWith;

import org.springframework.test.context.ContextConfiguration;

import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)

@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")

public class ConsumerTest {

  1.  
    @Test
  2.  
     
  3.  
    **public** **void** **test**(){
  4.  
     
  5.  
    **while** (**true**){
  6.  
     
  7.  
     
  8.  
     
  9.  
    }
  10.  
     
  11.  
    }
  12.  
    }

运行测试类 ,会一直监听消息 ,查看后台 http://192.168.137.118:15672/#/queues 

学新通

 当程序报错,程序会拒绝签收,直到修改错误,修改上面的监听器,注释 除 0 错误 ,重新运行程序学新通

Consumer Ack 小结

在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认

如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,true);方法确认签收消息

如果出现异常,则在catch中调用 basicNack,拒绝消息,让MQ重新发送消息。

7.2 消费端限流

学新通

① 在项目 rabbitmq-consumer-spring ,新建 com.atguigu.listener.QosListener

package com.atguigu.listener;

import com.rabbitmq.client.Channel;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;

/**

  • Consumer 限流机制

    1. 确保消息被确认。不确认是不继续处理其他消息的
    1. listener-container配置属性
  •  prefetch = 1,表示消费端每次从mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉取下一条消息。
    

*/

@Component

public class QosListener implements ChannelAwareMessageListener {

  1.  
    @Override
  2.  
     
  3.  
    **public** **void** **onMessage**(Message message, Channel channel) **throws** Exception {
  4.  
     
  5.  
    //1.获取消息
  6.  
     
  7.  
    System.out.println(**new** String(message.getBody()));
  8.  
     
  9.  
    }

}

② 修改spring-rabbitmq-consumer.xml 配置文件

 学新通

运行消费者,等待消息….

③ 在项目 rabbitmq-producer-spring , ProducerTest 测试方法

package com.atguigu;

import org.junit.Test;

import org.junit.runner.RunWith;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.test.context.ContextConfiguration;

import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)

@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")

public class ProducerTest {

  1.  
    @Test
  2.  
     
  3.  
    **public** **void** **testSend**() {
  4.  
     
  5.  
    **for** (**int** i = 0; i < 10; i ) {
  6.  
     
  7.  
    rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message confirm");
  8.  
     
  9.  
    }
  10.  
     
  11.  
    }

}

运行提供者

学新通

④ 查看后台 ,有 9 条消息待消费 ,有 1 条消息未确认

学新通 

 ⑤ 修改消费者配置文件 ,去掉 prefetch="1" 会发现一次就可以消费所有消息

学新通

运行消费者测试类 ConsumerTest

 学新通

 ⑥ 修改 QosListener , 添加手动签收方法 ,这样就可以确认消费限流

学新通

运行程序

⑦ 消费端限流小结

  • 在 rabbit:listener-container中配置 prefetch 属性设置消费端一次拉取多少条消息
  • 消费端的必须确认才会继续处理其他消息。

7.3 TTL

TTL 全称 Time To Live(存活时间/过期时间)。

当消息到达存活时间后,还没有被消费,会被自动清除。

RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

 学新通

7.3.1 控制后台演示消息过期

① 修改管理后台界面,增加队列

参数:表示过期时间,单位毫秒 ,10000表示10秒

学新通

 ② 增加交换机

学新通

 ③ 绑定队列

学新通

④ 发送消息

Delivery mode:2-Persistent表示需要进行持久化

学新通

 ⑤ 查看消息,可以看到消息,但十秒之后,消息自动消失,因为我们设置了十秒消息过期学新通

7.3.2 代码实现

7.3.2.1 队列统一过期

修改 rabbitmq-producer-spring 项目的 配置文件 spring-rabbitmq-producer.xml

学新通

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="Index of /schema/beans"

  1.  
    **xmlns:xsi**="http://www.w3.org/2001/XMLSchema-instance"
  2.  
     
  3.  
    **xmlns:context**="http://www.springframework.org/schema/context"
  4.  
     
  5.  
    **xmlns:rabbit**="http://www.springframework.org/schema/rabbit"
  6.  
     
  7.  
    **xsi:schemaLocation**="http://www.springframework.org/schema/beans
  8.  
     
  9.  
    http://www.springframework.org/schema/beans/spring-beans.xsd
  10.  
     
  11.  
    http://www.springframework.org/schema/context
  12.  
     
  13.  
    https://www.springframework.org/schema/context/spring-context.xsd
  14.  
     
  15.  
    http://www.springframework.org/schema/rabbit
  16.  
     
  17.  
    http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
  18.  
     
  19.  
    <!--加载配置文件-->
  20.  
     
  21.  
    <**context:property-placeholder** **location**="classpath:rabbitmq.properties"/>
  22.  
     
  23.  
     
  24.  
     
  25.  
    <!-- 定义rabbitmq connectionFactory -->
  26.  
     
  27.  
    <**rabbit:connection-factory** **id**="connectionFactory" **host**="${rabbitmq.host}"
  28.  
     
  29.  
    **port**="${rabbitmq.port}"
  30.  
     
  31.  
    **username**="${rabbitmq.username}"
  32.  
     
  33.  
    **password**="${rabbitmq.password}"
  34.  
     
  35.  
    **virtual-host**="${rabbitmq.virtual-host}"
  36.  
     
  37.  
    **publisher-confirms**="true"
  38.  
     
  39.  
    **publisher-returns**="true"/>
  40.  
     
  41.  
    <!--定义管理交换机、队列-->
  42.  
     
  43.  
    <**rabbit:admin** **connection-factory**="connectionFactory"/>
  44.  
     
  45.  
     
  46.  
     
  47.  
    <!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
  48.  
     
  49.  
    <**rabbit:template** **id**="rabbitTemplate" **connection-factory**="connectionFactory"/>
  50.  
     
  51.  
     
  52.  
     
  53.  
    <!--消息可靠性投递(生产端)-->
  54.  
     
  55.  
    <**rabbit:queue** **id**="test_queue_confirm" **name**="test_queue_confirm"></**rabbit:queue**>
  56.  
     
  57.  
    <**rabbit:direct-exchange** **name**="test_exchange_confirm">
  58.  
     
  59.  
    <**rabbit:bindings**>
  60.  
     
  61.  
    <**rabbit:binding** **queue**="test_queue_confirm" **key**="confirm"></**rabbit:binding**>
  62.  
     
  63.  
    </**rabbit:bindings**>
  64.  
     
  65.  
    </**rabbit:direct-exchange**>
  66.  
     
  67.  
     
  68.  
     
  69.  
    <!--TTL 队列-->
  70.  
     
  71.  
    <**rabbit:queue** **name**="test_queue_ttl" **id**="test_queue_ttl">
  72.  
     
  73.  
    <!--设置queue的参数-->
  74.  
     
  75.  
    <**rabbit:queue-arguments**>
  76.  
     
  77.  
    <!--
  78.  
     
  79.  
    设置x-message-ttl队列的过期时间
  80.  
     
  81.  
    默认情况下value-type的类型是String类型,但时间的类型是number类型,所以需要设置成integer类型
  82.  
     
  83.  
    -->
  84.  
     
  85.  
    <**entry** **key**="x-message-ttl" **value**="10000" **value-type**="java.lang.Integer"></**entry**>
  86.  
     
  87.  
    </**rabbit:queue-arguments**>
  88.  
     
  89.  
    </**rabbit:queue**>
  90.  
     
  91.  
     
  92.  
     
  93.  
    <!--设置交换机-->
  94.  
     
  95.  
    <**rabbit:topic-exchange** **name**="test_exchange_ttl">
  96.  
     
  97.  
    <!--交换机绑定队列-->
  98.  
     
  99.  
    <**rabbit:bindings**>
  100.  
     
  101.  
    <**rabbit:binding** **pattern**="ttl.#" **queue**="test_queue_ttl"></**rabbit:binding**>
  102.  
     
  103.  
    </**rabbit:bindings**>
  104.  
     
  105.  
    </**rabbit:topic-exchange**>
学新通

</beans>

在测试类 ProducerTest 中,添加测试方法,发送消息

@Test

public void testTTL() {

  1.  
    **for** (**int** i = 0; i < 10; i ) {
  2.  
     
  3.  
    rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.hehe","message ttl");
  4.  
     
  5.  
    }

}

查看控制台,发现有10条消息,十秒之后自动过期

学新通

7.3.2.2 消息过期

  1.  
    /**
  2.  
     
  3.  
    * TTL:过期时间
  4.  
     
  5.  
    * 1. 队列统一过期
  6.  
     
  7.  
    * 2. 消息单独过期
  8.  
     
  9.  
    * 如果设置了消息的过期时间,也设置了队列的过期时间,它**以时间****短****的为准**。
  10.  
     
  11.  
    */
  12.  
     
  13.  
    @Test
  14.  
     
  15.  
    **public** **void** **testMessageTtl**() {
  16.  
     
  17.  
    // 消息后处理对象,设置一些消息的参数信息
  18.  
     
  19.  
    MessagePostProcessor messagePostProcessor = **new** MessagePostProcessor() {
  20.  
     
  21.  
     
  22.  
     
  23.  
    @Override
  24.  
     
  25.  
    **public** Message **postProcessMessage**(Message message) **throws** AmqpException {
  26.  
     
  27.  
    //1.设置message的信息
  28.  
     
  29.  
    // 第二个方法:消息的过期时间 ,5秒之后过期
  30.  
     
  31.  
    message.getMessageProperties().setExpiration("5000");
  32.  
     
  33.  
    //2.返回该消息
  34.  
     
  35.  
    **return** message;
  36.  
     
  37.  
    }
  38.  
     
  39.  
    };
  40.  
     
  41.  
     
  42.  
     
  43.  
    //消息单独过期
  44.  
     
  45.  
    rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.hehe","message ttl....",messagePostProcessor);
  46.  
     
  47.  
    }
学新通

运行程序,查看后台管理系统

学新通

7.4 死信队列

死信队列,英文缩写:DLX 。DeadLetter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

什么是死信队列

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信,自然就有了死信队列;

学新通

消息成为死信的三种情况:

  1. 队列消息数量到达限制;比如队列最大只能存储10条消息,而发了11条消息,根据先进先出,最先发的消息会进入死信队列。
  2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
  3. 原队列存在消息过期设置,消息到达超时时间未被消费;

死信的处理方式

死信的产生既然不可避免,那么就需要从实际的业务角度和场景出发,对这些死信进行后续的处理,常见的处理方式大致有下面几种,

① 丢弃,如果不是很重要,可以选择丢弃

② 记录死信入库,然后做后续的业务分析或处理

③ 通过死信队列,由负责监听死信的应用程序进行处理

综合来看,更常用的做法是第三种,即通过死信队列,将产生的死信通过程序的配置路由到指定的死信队列,然后应用监听死信队列,对接收到的死信做后续的处理,

队列绑定死信交换机:

给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key

学新通

7.4.1 过期时间代码实现

修改生产者项目的配置文件 spring-rabbitmq-producer.xml ,增加如下代码

<!--

  1.  
    死信队列:
  2.  
     
  3.  
    1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
  4.  
     
  5.  
    2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
  6.  
     
  7.  
    3. 正常队列绑定死信交换机
  8.  
     
  9.  
    设置两个参数:
  10.  
     
  11.  
    * x-dead-letter-exchange:死信交换机名称
  12.  
     
  13.  
    * x-dead-letter-routing-key:发送给死信交换机的routingkey

-->

  1.  
    <!--
  2.  
     
  3.  
    1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
  4.  
     
  5.  
    -->
  6.  
     
  7.  
    <**rabbit:queue** **name**="test_queue_dlx" **id**="test_queue_dlx">
  8.  
     
  9.  
    <!--3. 正常队列绑定死信交换机-->
  10.  
     
  11.  
    <**rabbit:queue-arguments**>
  12.  
     
  13.  
    <!--3.1 x-dead-letter-exchange:死信交换机名称-->
  14.  
     
  15.  
    <**entry** **key**="x-dead-letter-exchange" **value**="exchange_dlx"/>
  16.  
     
  17.  
    <!--3.2 x-dead-letter-routing-key:发送给死信交换机的routingkey-->
  18.  
     
  19.  
    <**entry** **key**="x-dead-letter-routing-key" **value**="dlx.hehe"></**entry**>
  20.  
     
  21.  
    <!--4.1 设置队列的过期时间 ttl-->
  22.  
     
  23.  
    <**entry** **key**="x-message-ttl" **value**="10000" **value-type**="java.lang.Integer"/>
  24.  
     
  25.  
    <!--4.2 设置队列的长度限制 max-length -->
  26.  
     
  27.  
    <**entry** **key**="x-max-length" **value**="10" **value-type**="java.lang.Integer"/>
  28.  
     
  29.  
    </**rabbit:queue-arguments**>
  30.  
     
  31.  
    </**rabbit:queue**>
  32.  
     
  33.  
    <!--正常交换机-->
  34.  
     
  35.  
    <**rabbit:topic-exchange** **name**="test_exchange_dlx">
  36.  
     
  37.  
    <**rabbit:bindings**>
  38.  
     
  39.  
    <**rabbit:binding** **pattern**="test.dlx.#" **queue**="test_queue_dlx"></**rabbit:binding**>
  40.  
     
  41.  
    </**rabbit:bindings**>
  42.  
     
  43.  
    </**rabbit:topic-exchange**>
  44.  
     
  45.  
    <!--
  46.  
     
  47.  
    2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
学新通

-->

  1.  
    <**rabbit:queue** **name**="queue_dlx" **id**="queue_dlx"></**rabbit:queue**>
  2.  
     
  3.  
    <**rabbit:topic-exchange** **name**="exchange_dlx">
  4.  
     
  5.  
    <**rabbit:bindings**>
  6.  
     
  7.  
    <**rabbit:binding** **pattern**="dlx.#" **queue**="queue_dlx"></**rabbit:binding**>
  8.  
     
  9.  
    </**rabbit:bindings**>
  10.  
     
  11.  
    </**rabbit:topic-exchange**>

在测试类中,添加如下方法,进行测试

  1.  
    /**
  2.  
     
  3.  
    * 发送测试死信消息:
  4.  
     
  5.  
    * 1. 过期时间
  6.  
     
  7.  
    * 2. 长度限制
  8.  
     
  9.  
    * 3. 消息拒收
  10.  
     
  11.  
    */
  12.  
     
  13.  
    @Test
  14.  
     
  15.  
    **public** **void** **testDlx**(){
  16.  
     
  17.  
    //1. 测试过期时间,死信消息
  18.  
     
  19.  
    rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");
  20.  
     
  21.  
    }
学新通

运行测试,查看管理台界面

学新通

7.4.2 长度限制代码实现

修改测试类,添加测试方法

  1.  
    /**
  2.  
     
  3.  
    * 发送测试死信消息:
  4.  
     
  5.  
    * 1. 过期时间
  6.  
     
  7.  
    * 2. 长度限制
  8.  
     
  9.  
    * 3. 消息拒收
  10.  
     
  11.  
    */
  12.  
     
  13.  
    @Test
  14.  
     
  15.  
    **public** **void** **testDlx**(){
  16.  
     
  17.  
    //1. 测试过期时间,死信消息
  18.  
     
  19.  
    //rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");
  20.  
     
  21.  
     
  22.  
     
  23.  
    //2. 测试长度限制后,消息死信
  24.  
     
  25.  
    **for** (**int** i = 0; i < 20; i ) {
  26.  
     
  27.  
    rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");
  28.  
     
  29.  
    }
  30.  
     
  31.  
    }
学新通

运行测试方法,进行测试

学新通

7.4.3 测试消息拒收

在消费者工程 创建 com.atguigu.listener.DlxListener

package com.atguigu.listener;

import com.rabbitmq.client.Channel;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;

import org.springframework.stereotype.Component;

@Component

public class DlxListener implements ChannelAwareMessageListener {

  1.  
    @Override
  2.  
     
  3.  
    **public** **void** **onMessage**(Message message, Channel channel) **throws** Exception {
  4.  
     
  5.  
    **long** deliveryTag = message.getMessageProperties().getDeliveryTag();
  6.  
     
  7.  
     
  8.  
     
  9.  
    **try** {
  10.  
     
  11.  
    //1.接收转换消息
  12.  
     
  13.  
    System.out.println(**new** String(message.getBody()));
  14.  
     
  15.  
     
  16.  
     
  17.  
    //2. 处理业务逻辑
  18.  
     
  19.  
    System.out.println("处理业务逻辑...");
  20.  
     
  21.  
    **int** i = 3/0;//出现错误
  22.  
     
  23.  
    //3. 手动签收
  24.  
     
  25.  
    channel.basicAck(deliveryTag,**true**);
  26.  
     
  27.  
    } **catch** (Exception e) {
  28.  
     
  29.  
    //e.printStackTrace();
  30.  
     
  31.  
    System.out.println("出现异常,拒绝接受");
  32.  
     
  33.  
    //4.拒绝签收,不重回队列 requeue=false
  34.  
     
  35.  
    channel.basicNack(deliveryTag,**true**,**false**);
  36.  
     
  37.  
    }
  38.  
     
  39.  
    }
学新通

}

修改消费者配置文件 spring-rabbitmq-consumer.xml

学新通

  1.  
    <**rabbit:listener-container** **connection-factory**="connectionFactory" **acknowledge**="manual">
  2.  
     
  3.  
    <!--<rabbit:listener ref="ackListener" queue-names="test_queue_confirm"></rabbit:listener>-->
  4.  
     
  5.  
    <!--<rabbit:listener ref="qosListener" queue-names="test_queue_confirm"></rabbit:listener>-->
  6.  
     
  7.  
    <!--定义监听器,监听正常队列-->
  8.  
     
  9.  
    <**rabbit:listener** **ref**="dlxListener" **queue-names**="test_queue_dlx"></**rabbit:listener**>
  10.  
     
  11.  
    </**rabbit:listener-container**>

运行消费者测试类

修改生产者测试代码

  1.  
    /**
  2.  
     
  3.  
    * 发送测试死信消息
  4.  
     
  5.  
    * 1. 过期时间
  6.  
     
  7.  
    * 2. 长度限制
  8.  
     
  9.  
    * 3. 消息拒收
  10.  
     
  11.  
    */
  12.  
     
  13.  
    @Test
  14.  
     
  15.  
    **public** **void** **testDlx**(){
  16.  
     
  17.  
    //1. 测试过期时间,死信消息
  18.  
     
  19.  
    //rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");
  20.  
     
  21.  
     
  22.  
     
  23.  
    //2. 测试长度限制后,消息死信
学新通

// for (int i = 0; i < 20; i ) {

// rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");

// }

  1.  
    //3. 测试消息拒收
  2.  
     
  3.  
    rabbitTemplate.convertAndSend("test_exchange_dlx",
  4.  
     
  5.  
    "test.dlx.haha",
  6.  
     
  7.  
    "我是一条消息,我会死吗?");
  8.  
     
  9.  
    }

发送消息,运行程序,查看后台管理界面

死信队列小结

  1. 死信交换机和死信队列和普通的没有区别
  2. 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
  3. 消息成为死信的三种情况:
    • 队列消息长度(数量)到达限制;
    • 消费者拒接消费消息,并且不重回队列;
    • 原队列存在消息过期设置,消息到达超时时间未被消费;

7.5 延迟队列

延迟队列存储的对象肯定是对应的延时消息,所谓”延时消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。

场景:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行取消处理。这时就可以使用延时队列将订单信息发送到延时队列。

需求:

  1. 下单后,30分钟未支付,取消订单,回滚库存。
  2. 新用户注册成功30分钟后,发送短信问候。

实现方式:

  1. 延迟队列

学新通

 学新通

7.5.1 代码实现

7.5.1.1 生产者

修改生产者代码 ,修改生产者配置文件 spring-rabbitmq-producer.xml

学新通

修改生产者,添加测试方法

  1.  
    @Test
  2.  
     
  3.  
    **public** **void** **testDelay**() **throws** InterruptedException {
  4.  
     
  5.  
    //1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息
  6.  
     
  7.  
    rabbitTemplate.convertAndSend("order_exchange",
  8.  
     
  9.  
    "order.msg","订单信息:id=1,time=2020年10月17日11:41:47");
  10.  
     
  11.  
     
  12.  
     
  13.  
    //2.打印倒计时10
  14.  
     
  15.  
    **for** (**int** i = 10; i > 0 ; i--) {
  16.  
     
  17.  
    System.out.println(i "...");
  18.  
     
  19.  
    Thread.sleep(1000);
  20.  
     
  21.  
    }
  22.  
     
  23.  
    }
学新通

运行程序创建订单延时队列

7.5.1.2 消费者

修改消费者项目,添加 com.atguigu.listener.OrderListener

消费者

package com.atguigu.listener;

import com.rabbitmq.client.Channel;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;

import org.springframework.stereotype.Component;

@Component

public class OrderListener implements ChannelAwareMessageListener {

  1.  
    @Override
  2.  
     
  3.  
    **public** **void** **onMessage**(Message message, Channel channel) **throws** Exception {
  4.  
     
  5.  
    **long** deliveryTag = message.getMessageProperties().getDeliveryTag();
  6.  
     
  7.  
     
  8.  
     
  9.  
    **try** {
  10.  
     
  11.  
    //1.接收转换消息
  12.  
     
  13.  
    System.out.println(**new** String(message.getBody()));
  14.  
     
  15.  
     
  16.  
     
  17.  
    //2. 处理业务逻辑
  18.  
     
  19.  
    System.out.println("处理业务逻辑...");
  20.  
     
  21.  
    System.out.println("根据订单id查询其状态...");
  22.  
     
  23.  
    System.out.println("判断状态是否为支付成功");
  24.  
     
  25.  
    System.out.println("取消订单,回滚库存....");
  26.  
     
  27.  
    //3. 手动签收
  28.  
     
  29.  
    channel.basicAck(deliveryTag,**true**);
  30.  
     
  31.  
    } **catch** (Exception e) {
  32.  
     
  33.  
    //e.printStackTrace();
  34.  
     
  35.  
    System.out.println("出现异常,拒绝接受");
  36.  
     
  37.  
    //4.拒绝签收,不重回队列 requeue=false
  38.  
     
  39.  
    channel.basicNack(deliveryTag,**true**,**false**);
  40.  
     
  41.  
    }
  42.  
     
  43.  
    }
学新通

}

修改消费者配置文件 spring-rabbitmq-consumer.xml

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="Index of /schema/beans"

  1.  
    **xmlns:xsi**="http://www.w3.org/2001/XMLSchema-instance"
  2.  
     
  3.  
    **xmlns:context**="http://www.springframework.org/schema/context"
  4.  
     
  5.  
    **xmlns:rabbit**="http://www.springframework.org/schema/rabbit"
  6.  
     
  7.  
    **xsi:schemaLocation**="http://www.springframework.org/schema/beans
  8.  
     
  9.  
    http://www.springframework.org/schema/beans/spring-beans.xsd
  10.  
     
  11.  
    http://www.springframework.org/schema/context
  12.  
     
  13.  
    https://www.springframework.org/schema/context/spring-context.xsd
  14.  
     
  15.  
    http://www.springframework.org/schema/rabbit
  16.  
     
  17.  
    http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
  18.  
     
  19.  
    <!--加载配置文件-->
  20.  
     
  21.  
    <**context:property-placeholder** **location**="classpath:rabbitmq.properties"/>
  22.  
     
  23.  
     
  24.  
     
  25.  
    <!-- 定义rabbitmq connectionFactory -->
  26.  
     
  27.  
    <**rabbit:connection-factory** **id**="connectionFactory" **host**="${rabbitmq.host}"
  28.  
     
  29.  
    **port**="${rabbitmq.port}"
  30.  
     
  31.  
    **username**="${rabbitmq.username}"
  32.  
     
  33.  
    **password**="${rabbitmq.password}"
  34.  
     
  35.  
    **virtual-host**="${rabbitmq.virtual-host}"/>
  36.  
     
  37.  
     
  38.  
     
  39.  
    <**context:component-scan** **base-package**="com.atguigu.listener" />
  40.  
     
  41.  
     
  42.  
     
  43.  
    <!--定义监听器容器
  44.  
     
  45.  
    acknowledge="manual":手动签收
  46.  
     
  47.  
    acknowledge="auto" 自动签收
  48.  
     
  49.  
    -->
  50.  
     
  51.  
    <**rabbit:listener-container** **connection-factory**="connectionFactory" **acknowledge**="manual">
  52.  
     
  53.  
     
  54.  
     
  55.  
    <!--<rabbit:listener ref="ackListener" queue-names="test_queue_confirm"></rabbit:listener>-->
  56.  
     
  57.  
    <!--<rabbit:listener ref="qosListener" queue-names="test_queue_confirm"></rabbit:listener>-->
  58.  
     
  59.  
    <!--定义监听器,监听正常队列-->
  60.  
     
  61.  
    <!--<rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"></rabbit:listener>-->
  62.  
     
  63.  
    <!--延迟队列效果实现: 一定要监听的是 死信队列!!!-->
  64.  
     
  65.  
    <**rabbit:listener** **ref**="orderListener" **queue-names**="order_queue_dlx"></**rabbit:listener**>
  66.  
     
  67.  
    </**rabbit:listener-container**>
学新通

</beans>

学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgbhabc
系列文章
更多 icon
同类精品
更多 icon
继续加载