• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

RabbitMQ消息间件在项目的使用

武飞扬头像
子非鱼呀
帮助1

1. RabbitMQ消息中间件

1.1 什么MQ?

MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。

思考: 原来服务与服务之间如何通信?

Openfeign 服务与服务之间直接调用。

学新通 我们也可以使用MQ完成系统与系统之间得调用。

学新通

1.2 MQ优点

1. 应用解耦

学新通学新通

 2. 异步提速

学新通

3. 削锋填谷  

学新通

1.3 MQ缺点

 学新通

1.4 如何选择MQ

学新通  

1.5 MQ得种类

rabbitMQ

kafka

RocketMQ

ActiveMQ

1.6 RabbitMQ

安装RabbitMQ <<详细内容看另一篇博客---RabbitMQ安装说明文档>>

1.7 概述端口号

学新通

 1.8 rabbit的工作原理

学新通

2. java程序连接RabbitMQ服务---maven项目

提供了5种模式。

        1.简单模式--Hello

        学新通

        2. 工作者模式--work queues

        学新通

        3.发布订阅模式---

        学新通

         4.路由模式--router

        学新通

        5.主题模式--topic

        学新通

 

2.1 准备工作

2.1.1 创建maven项目

学新通

2.1.2 添加依赖--在父工程下添加依赖

  1.  
    <dependencies>
  2.  
    <dependency>
  3.  
    <groupId>com.rabbitmq</groupId>
  4.  
    <artifactId>amqp-client</artifactId>
  5.  
    <version>5.14.2</version>
  6.  
    </dependency>
  7.  
    </dependencies>

2.1.3 启动rabbitmq

我这里rabbitmq安装在本地虚拟机上,直接开启虚拟机输入以下命令就可以进行测试了

centos6用这个命令:
/sbin/service rabbitmq-server restart

centos7用这个命令:
systemctl start rabbitmq-server

2.2 simple 简单模式

学新通

 P: 一个生产者

C: 一个消费者

Q: 队列

 生产者负责把消息发送到队列,消费者负责把队列的消息消费掉并确认消费

代码--生产者:

  1.  
    package com.wt.service;
  2.  
     
  3.  
    import com.rabbitmq.client.Channel;
  4.  
    import com.rabbitmq.client.Connection;
  5.  
    import com.rabbitmq.client.ConnectionFactory;
  6.  
     
  7.  
    import java.io.IOException;
  8.  
    import java.util.concurrent.TimeoutException;
  9.  
     
  10.  
    /**
  11.  
    * @Author wt
  12.  
    * @Date 2022/9/19 20:31
  13.  
    * @PackageName:com.wt.service
  14.  
    * @ClassName: Test01
  15.  
    * @Description: 简单模式
  16.  
    * @Version 1.0
  17.  
    */
  18.  
    public class Test01 {
  19.  
    public static void main(String[] args) throws IOException, TimeoutException {
  20.  
    /**
  21.  
    * 连接rabbitmq
  22.  
    */
  23.  
    ConnectionFactory factory = new ConnectionFactory();
  24.  
    //设置rabbilemMq服务器的地址 默认为localhost
  25.  
    factory.setHost("192.168.135.156");
  26.  
    //设置rabbitMQ的端口号 默认5672
  27.  
    factory.setPort(5672);
  28.  
    //设置账号和密码 默认guest
  29.  
    factory.setUsername("guest");
  30.  
    factory.setPassword("guest");
  31.  
    //设置虚拟主机名 默认 /
  32.  
    factory.setVirtualHost("/");
  33.  
     
  34.  
    //获取连接通道
  35.  
    Connection connection = factory.newConnection();
  36.  
    //获取channel信道
  37.  
    Channel channel = connection.createChannel();
  38.  
     
  39.  
    //创建队列
  40.  
    /**
  41.  
    * 如果该队列名不存在则自动创建,存在则不创建
  42.  
    * String queue ,队列名
  43.  
    * boolean durable ,是否持久化
  44.  
    * boolean exclusive (独占)声明队列同一时间只能包含一个连接,且该队列只有这一个被连接使用。
  45.  
    * boolean autoDelte ,是否自动删除
  46.  
    * Map<String,Object>arguments
  47.  
    */
  48.  
    channel.queueDeclare("simple_queue",true,false,false,null);
  49.  
     
  50.  
    /**
  51.  
    * String exchange ,把消息发给哪个交换机--简单模式没有交换机“”
  52.  
    * String routingKey ,消息绑定的路由key 如果为简单模式 默认写为队列名称
  53.  
    * BasicProperties props, 消息的属性
  54.  
    * byte[] body: 消息的内容
  55.  
    */
  56.  
    String msg = "{code:2000,name:张三,age:18}";
  57.  
    channel.basicPublish("","simple_queue",null,msg.getBytes());
  58.  
     
  59.  
    connection.close();
  60.  
    }
  61.  
    }
学新通

学新通 代码-消费者:

  1.  
    package com.wt.simple;
  2.  
     
  3.  
    import com.rabbitmq.client.*;
  4.  
     
  5.  
    import java.io.IOException;
  6.  
     
  7.  
    /**
  8.  
    * @Author wt
  9.  
    * @Date 2022/9/19 21:22
  10.  
    * @PackageName:com.wt.simple
  11.  
    * @ClassName: Customer
  12.  
    * @Description: 简单模式
  13.  
    * @Version 1.0
  14.  
    */
  15.  
    public class Customer {
  16.  
    public static void main(String[] args) throws Exception{
  17.  
    /**
  18.  
    * 连接rabbitmq
  19.  
    */
  20.  
    ConnectionFactory factory = new ConnectionFactory();
  21.  
    //设置rabbilemMq服务器的地址 默认为localhost
  22.  
    factory.setHost("192.168.135.156");
  23.  
    //设置rabbitMQ的端口号 默认5672
  24.  
    factory.setPort(5672);
  25.  
    //设置账号和密码 默认guest
  26.  
    factory.setUsername("guest");
  27.  
    factory.setPassword("guest");
  28.  
    //设置虚拟主机名 默认 /
  29.  
    factory.setVirtualHost("/");
  30.  
     
  31.  
    //获取连接通道
  32.  
    Connection connection = factory.newConnection();
  33.  
    //获取channel信道
  34.  
    Channel channel = connection.createChannel();
  35.  
     
  36.  
    //监听队列
  37.  
    /**
  38.  
    * String queue 监听的队列名称
  39.  
    * autoAck :是否自动确认消息
  40.  
    * Consumer callback: 监听到消息后触发的回调函数
  41.  
    */
  42.  
     
  43.  
    DefaultConsumer callback = new DefaultConsumer(channel){
  44.  
    //一旦有消息就会触发该方法
  45.  
    //body:表示消息的内容
  46.  
    @Override
  47.  
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  48.  
    System.out.println("接收的消息内容:" new String(body));
  49.  
    }
  50.  
    };
  51.  
     
  52.  
    channel.basicConsume("simple_queue",true,callback);
  53.  
     
  54.  
    }
  55.  
    }
学新通

不要关闭连接对象

2.3 woker模式

学新通

work模式与简单模式的区别是worker模式的一个队列对应多个消费者

P:生产者

C1:消费者1

C2:消费者2

Q: 队列

消费者1和消费者2属于竞争关系,一个消息只会被一个消费者消费

 代码---生产者:

  1.  
    package com.wt.work;
  2.  
     
  3.  
    import com.rabbitmq.client.Channel;
  4.  
    import com.rabbitmq.client.Connection;
  5.  
    import com.rabbitmq.client.ConnectionFactory;
  6.  
     
  7.  
    import java.io.IOException;
  8.  
    import java.util.concurrent.TimeoutException;
  9.  
     
  10.  
    /**
  11.  
    * @Author wt
  12.  
    * @Date 2022/9/19 21:38
  13.  
    * @PackageName:com.wt.service.test
  14.  
    * @ClassName: Work
  15.  
    * @Description: work模式
  16.  
    * @Version 1.0
  17.  
    */
  18.  
    public class WorkTest {
  19.  
    public static void main(String[] args) throws IOException, TimeoutException {
  20.  
    ConnectionFactory factory = new ConnectionFactory();
  21.  
    //设置rabbilemMq服务器的地址 默认为localhost
  22.  
    factory.setHost("192.168.135.156");
  23.  
    //设置rabbitMQ的端口号 默认5672
  24.  
    factory.setPort(5672);
  25.  
    //设置账号和密码 默认guest
  26.  
    factory.setUsername("guest");
  27.  
    factory.setPassword("guest");
  28.  
    //设置虚拟主机名 默认 /
  29.  
    factory.setVirtualHost("/");
  30.  
     
  31.  
    //获取连接通道
  32.  
    Connection connection = factory.newConnection();
  33.  
    //获取channel信道
  34.  
    Channel channel = connection.createChannel();
  35.  
    //创建队列
  36.  
    /**
  37.  
    * 如果该队列名不存在则自动创建,存在则不创建
  38.  
    * String queue ,队列名
  39.  
    * boolean durable ,是否持久化
  40.  
    * boolean exclusive (独占)声明队列同一时间只能包含一个连接,且该队列只有这一个被连接使用。
  41.  
    * boolean autoDelte ,是否自动删除
  42.  
    * Map<String,Object>arguments
  43.  
    */
  44.  
    channel.queueDeclare("work_queue",true,false,false,null);
  45.  
     
  46.  
    /**
  47.  
    * String exchange ,把消息发给哪个交换机--简单模式没有交换机“”
  48.  
    * String routingKey ,消息绑定的路由key 如果为简单模式 默认写为队列名称
  49.  
    * BasicProperties props, 消息的属性
  50.  
    * byte[] body: 消息的内容
  51.  
    */
  52.  
    for (int i=0;i<=10;i ){
  53.  
    String msg = "{code:2000,name:张三,age:18}" i;
  54.  
    channel.basicPublish("","work_queue",null,msg.getBytes());
  55.  
    }
  56.  
     
  57.  
     
  58.  
    connection.close();
  59.  
    }
  60.  
    }
学新通

代码--消费01:

  1.  
    package com.wt.work;
  2.  
     
  3.  
    import com.rabbitmq.client.*;
  4.  
     
  5.  
    import java.io.IOException;
  6.  
     
  7.  
    /**
  8.  
    * @Author wt
  9.  
    * @Date 2022/9/19 21:22
  10.  
    * @PackageName:com.wt.simple
  11.  
    * @ClassName: Customer
  12.  
    * @Description: worker-----消费者01
  13.  
    * @Version 1.0
  14.  
    */
  15.  
    public class CustomerWork01 {
  16.  
    public static void main(String[] args) throws Exception{
  17.  
    ConnectionFactory factory = new ConnectionFactory();
  18.  
    //设置rabbilemMq服务器的地址 默认为localhost
  19.  
    factory.setHost("192.168.135.156");
  20.  
    //设置rabbitMQ的端口号 默认5672
  21.  
    factory.setPort(5672);
  22.  
    //设置账号和密码 默认guest
  23.  
    factory.setUsername("guest");
  24.  
    factory.setPassword("guest");
  25.  
    //设置虚拟主机名 默认 /
  26.  
    factory.setVirtualHost("/");
  27.  
     
  28.  
    //获取连接通道
  29.  
    Connection connection = factory.newConnection();
  30.  
    //获取channel信道
  31.  
    Channel channel = connection.createChannel();
  32.  
     
  33.  
    //监听队列
  34.  
    /**
  35.  
    * String queue 监听的队列名称
  36.  
    * autoAck :是否自动确认消息
  37.  
    * Consumer callback: 监听到消息后触发的回调函数
  38.  
    */
  39.  
     
  40.  
    DefaultConsumer callback = new DefaultConsumer(channel){
  41.  
    //一旦有消息就会触发该方法
  42.  
    //body:表示消息的内容
  43.  
    @Override
  44.  
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  45.  
    System.out.println("接收的消息内容:" new String(body));
  46.  
    }
  47.  
    };
  48.  
     
  49.  
    channel.basicConsume("work_queue",true,callback);
  50.  
     
  51.  
    }
  52.  
    }
学新通

 代码--消费者02--和上面的消费者相同。

可以先提前开启两个消费者,然后在开启创造者,观察消息都被哪个消费者消费了,是否有重复消费

2.4 public/Subscribe发布订阅模式

学新通

public模式和worker模式的区别是在worker的基础上新增加了一个交换机x,生产者传输消息给交换机,交换机再将相应的信息发送给两个队列(两个队列接收的信息相同),两个队列分别对应的消费者

p: producter 生产者

x:exchange交换机

Q: 队列

C1和C2:消费者

生产者--代码:

  1.  
    package com.wt.publish;
  2.  
     
  3.  
    import com.rabbitmq.client.BuiltinExchangeType;
  4.  
    import com.rabbitmq.client.Channel;
  5.  
    import com.rabbitmq.client.Connection;
  6.  
    import com.rabbitmq.client.ConnectionFactory;
  7.  
     
  8.  
    import java.io.IOException;
  9.  
    import java.util.concurrent.TimeoutException;
  10.  
     
  11.  
    /**
  12.  
    * @Author wt
  13.  
    * @Date 2022/9/19 20:31
  14.  
    * @PackageName:com.wt.service
  15.  
    * @ClassName: Test01
  16.  
    * @Description: public
  17.  
    * @Version 1.0
  18.  
    */
  19.  
    public class PublishTest {
  20.  
    public static void main(String[] args) throws IOException, TimeoutException {
  21.  
    ConnectionFactory factory = new ConnectionFactory();
  22.  
    //设置rabbilemMq服务器的地址 默认为localhost
  23.  
    factory.setHost("192.168.135.156");
  24.  
    //设置rabbitMQ的端口号 默认5672
  25.  
    factory.setPort(5672);
  26.  
    //设置账号和密码 默认guest
  27.  
    factory.setUsername("guest");
  28.  
    factory.setPassword("guest");
  29.  
    //设置虚拟主机名 默认 /
  30.  
    factory.setVirtualHost("/");
  31.  
     
  32.  
    //获取连接通道
  33.  
    Connection connection = factory.newConnection();
  34.  
    //获取channel信道
  35.  
    Channel channel = connection.createChannel();
  36.  
    //创建队列
  37.  
    /**
  38.  
    * 如果该队列名不存在则自动创建,存在则不创建
  39.  
    * String queue ,队列名
  40.  
    * boolean durable ,是否持久化
  41.  
    * boolean exclusive (独占)声明队列同一时间只能包含一个连接,且该队列只有这一个被连接使用。
  42.  
    * boolean autoDelte ,是否自动删除
  43.  
    * Map<String,Object>arguments
  44.  
    */
  45.  
    channel.queueDeclare("publish_queue01",true,false,false,null);
  46.  
    channel.queueDeclare("publish_queue02",true,false,false,null);
  47.  
     
  48.  
    //创建交换机
  49.  
    /**
  50.  
    * String exchange,交换机的名称
  51.  
    * BuiltinExchangeType type,交换机的种类
  52.  
    * boolean durable:是否持久化
  53.  
    */
  54.  
    channel.exchangeDeclare("publish_queue", BuiltinExchangeType.FANOUT,true);
  55.  
     
  56.  
    //交换机和队列绑定
  57.  
    channel.queueBind("publish_queue01","publish_queue","");
  58.  
    channel.queueBind("publish_queue02","publish_queue","");
  59.  
     
  60.  
     
  61.  
     
  62.  
    /**
  63.  
    * String exchange ,把消息发给哪个交换机--简单模式没有交换机“”
  64.  
    * String routingKey ,消息绑定的路由key 如果为简单模式 默认写为队列名称
  65.  
    * BasicProperties props, 消息的属性
  66.  
    * byte[] body: 消息的内容
  67.  
    */
  68.  
    for (int i=0;i<=10;i ){
  69.  
    String msg = "{code:2000,name:张三,age:18}" i;
  70.  
    channel.basicPublish("publish_queue","",null,msg.getBytes());
  71.  
    }
  72.  
     
  73.  
    connection.close();
  74.  
    }
  75.  
    }
学新通

 消费者--01

  1.  
    package com.wt.publish;
  2.  
     
  3.  
    import com.rabbitmq.client.*;
  4.  
     
  5.  
    import java.io.IOException;
  6.  
     
  7.  
    /**
  8.  
    * @Author wt
  9.  
    * @Date 2022/9/19 21:22
  10.  
    * @PackageName:com.wt.simple
  11.  
    * @ClassName: Customer
  12.  
    * @Description: 消费者01---队列public01
  13.  
    * @Version 1.0
  14.  
    */
  15.  
    public class CustomerPublish01 {
  16.  
    public static void main(String[] args) throws Exception{
  17.  
    ConnectionFactory factory = new ConnectionFactory();
  18.  
    //设置rabbilemMq服务器的地址 默认为localhost
  19.  
    factory.setHost("192.168.135.156");
  20.  
    //设置rabbitMQ的端口号 默认5672
  21.  
    factory.setPort(5672);
  22.  
    //设置账号和密码 默认guest
  23.  
    factory.setUsername("guest");
  24.  
    factory.setPassword("guest");
  25.  
    //设置虚拟主机名 默认 /
  26.  
    factory.setVirtualHost("/");
  27.  
     
  28.  
    //获取连接通道
  29.  
    Connection connection = factory.newConnection();
  30.  
    //获取channel信道
  31.  
    Channel channel = connection.createChannel();
  32.  
     
  33.  
    //监听队列
  34.  
    /**
  35.  
    * String queue 监听的队列名称
  36.  
    * autoAck :是否自动确认消息
  37.  
    * Consumer callback: 监听到消息后触发的回调函数
  38.  
    */
  39.  
     
  40.  
    DefaultConsumer callback = new DefaultConsumer(channel){
  41.  
    //一旦有消息就会触发该方法
  42.  
    //body:表示消息的内容
  43.  
    @Override
  44.  
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  45.  
    System.out.println("接收的消息内容:" new String(body));
  46.  
    }
  47.  
    };
  48.  
     
  49.  
    channel.basicConsume("publish_queue01",true,callback);
  50.  
     
  51.  
    }
  52.  
    }
学新通

消费者02:

  1.  
    package com.wt.publish;
  2.  
     
  3.  
    import com.rabbitmq.client.*;
  4.  
     
  5.  
    import java.io.IOException;
  6.  
     
  7.  
    /**
  8.  
    * @Author wt
  9.  
    * @Date 2022/9/19 21:22
  10.  
    * @PackageName:com.wt.simple
  11.  
    * @ClassName: Customer
  12.  
    * @Description: 消费者02---队列public02
  13.  
    * @Version 1.0
  14.  
    */
  15.  
    public class CustomerPublish02 {
  16.  
    public static void main(String[] args) throws Exception{
  17.  
    ConnectionFactory factory = new ConnectionFactory();
  18.  
    //设置rabbilemMq服务器的地址 默认为localhost
  19.  
    factory.setHost("192.168.135.156");
  20.  
    //设置rabbitMQ的端口号 默认5672
  21.  
    factory.setPort(5672);
  22.  
    //设置账号和密码 默认guest
  23.  
    factory.setUsername("guest");
  24.  
    factory.setPassword("guest");
  25.  
    //设置虚拟主机名 默认 /
  26.  
    factory.setVirtualHost("/");
  27.  
     
  28.  
    //获取连接通道
  29.  
    Connection connection = factory.newConnection();
  30.  
    //获取channel信道
  31.  
    Channel channel = connection.createChannel();
  32.  
     
  33.  
    //监听队列
  34.  
    /**
  35.  
    * String queue 监听的队列名称
  36.  
    * autoAck :是否自动确认消息
  37.  
    * Consumer callback: 监听到消息后触发的回调函数
  38.  
    */
  39.  
     
  40.  
    DefaultConsumer callback = new DefaultConsumer(channel){
  41.  
    //一旦有消息就会触发该方法
  42.  
    //body:表示消息的内容
  43.  
    @Override
  44.  
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  45.  
    System.out.println("接收的消息内容:" new String(body));
  46.  
    }
  47.  
    };
  48.  
     
  49.  
    channel.basicConsume("publish_queue02",true,callback);
  50.  
     
  51.  
    }
  52.  
    }
学新通

2.5 router路由模式

学新通

 router和public的区别是在原先的基础上增加了路由,交换机通过路由来判断将消息发送给哪个队列,而不是像public一样两个队列都会接收到一模一样的消息,只有满足路由的队列才会收到消息

p:生产者

x: 交换机---Direct (路由模式)

c1和c2表示消费者:

Q:队列

生产者--代码:

  1.  
    package com.wt.router;
  2.  
     
  3.  
    import com.rabbitmq.client.BuiltinExchangeType;
  4.  
    import com.rabbitmq.client.Channel;
  5.  
    import com.rabbitmq.client.Connection;
  6.  
    import com.rabbitmq.client.ConnectionFactory;
  7.  
     
  8.  
    import java.io.IOException;
  9.  
    import java.util.concurrent.TimeoutException;
  10.  
     
  11.  
    /**
  12.  
    * @Author wt
  13.  
    * @Date 2022/9/20 14:33
  14.  
    * @PackageName:com.wt.router
  15.  
    * @ClassName: RouterTest
  16.  
    * @Description: router路由模式
  17.  
    * @Version 1.0
  18.  
    */
  19.  
    public class RouterTest {
  20.  
    public static void main(String[] args) throws IOException, TimeoutException {
  21.  
    ConnectionFactory factory = new ConnectionFactory();
  22.  
    //设置rabbilemMq服务器的地址 默认为localhost
  23.  
    factory.setHost("192.168.135.156");
  24.  
    //设置rabbitMQ的端口号 默认5672
  25.  
    factory.setPort(5672);
  26.  
    //设置账号和密码 默认guest
  27.  
    factory.setUsername("guest");
  28.  
    factory.setPassword("guest");
  29.  
    //设置虚拟主机名 默认 /
  30.  
    factory.setVirtualHost("/");
  31.  
     
  32.  
    //获取连接通道
  33.  
    Connection connection = factory.newConnection();
  34.  
    //获取channel信道
  35.  
    Channel channel = connection.createChannel();
  36.  
    //创建队列
  37.  
    /**
  38.  
    * 如果该队列名不存在则自动创建,存在则不创建
  39.  
    * String queue ,队列名
  40.  
    * boolean durable ,是否持久化
  41.  
    * boolean exclusive (独占)声明队列同一时间只能包含一个连接,且该队列只有这一个被连接使用。
  42.  
    * boolean autoDelte ,是否自动删除
  43.  
    * Map<String,Object>arguments
  44.  
    */
  45.  
    channel.queueDeclare("router_queue01",true,false,false,null);
  46.  
    channel.queueDeclare("router_queue02",true,false,false,null);
  47.  
     
  48.  
    //创建交换机
  49.  
    /**
  50.  
    * String exchange,交换机的名称
  51.  
    * BuiltinExchangeType type,交换机的种类
  52.  
    * boolean durable:是否持久化
  53.  
    */
  54.  
    channel.exchangeDeclare("router_queue", BuiltinExchangeType.DIRECT,true);
  55.  
     
  56.  
    //交换机和队列绑定
  57.  
    /**
  58.  
    * s:String queue,队列名 s1:String exchange,交换机名 s2:String routerkey 路由key
  59.  
    * 如果为发布订阅模式(public)则无需有路由key
  60.  
    */
  61.  
    channel.queueBind("router_queue01","router_queue","error");
  62.  
     
  63.  
    channel.queueBind("router_queue02","router_queue","error");
  64.  
    channel.queueBind("router_queue02","router_queue","info");
  65.  
    channel.queueBind("router_queue02","router_queue","warning");
  66.  
     
  67.  
     
  68.  
     
  69.  
    /**
  70.  
    * String exchange ,把消息发给哪个交换机--简单模式没有交换机“”
  71.  
    * String routingKey ,消息绑定的路由key 如果为简单模式 默认写为队列名称
  72.  
    * BasicProperties props, 消息的属性
  73.  
    * byte[] body: 消息的内容
  74.  
    */
  75.  
    String msg = "{code:2000,name:张三,age:18}";
  76.  
    channel.basicPublish("router_queue","info",null,msg.getBytes());
  77.  
    //channel.basicPublish("router_queue","lazy.orange.ss",null,msg.getBytes());
  78.  
     
  79.  
     
  80.  
    connection.close();
  81.  
    }
  82.  
    }
学新通

消费者01-代码:

  1.  
    package com.wt.router;
  2.  
     
  3.  
    import com.rabbitmq.client.*;
  4.  
     
  5.  
    import java.io.IOException;
  6.  
     
  7.  
    /**
  8.  
    * @Author wt
  9.  
    * @Date 2022/9/19 21:22
  10.  
    * @PackageName:com.wt.simple
  11.  
    * @ClassName: Customer
  12.  
    * @Description: router路由模式 ---消费者01
  13.  
    * @Version 1.0
  14.  
    */
  15.  
    public class CustomerRouter01 {
  16.  
    public static void main(String[] args) throws Exception{
  17.  
    ConnectionFactory factory = new ConnectionFactory();
  18.  
    //设置rabbilemMq服务器的地址 默认为localhost
  19.  
    factory.setHost("192.168.135.156");
  20.  
    //设置rabbitMQ的端口号 默认5672
  21.  
    factory.setPort(5672);
  22.  
    //设置账号和密码 默认guest
  23.  
    factory.setUsername("guest");
  24.  
    factory.setPassword("guest");
  25.  
    //设置虚拟主机名 默认 /
  26.  
    factory.setVirtualHost("/");
  27.  
     
  28.  
    //获取连接通道
  29.  
    Connection connection = factory.newConnection();
  30.  
    //获取channel信道
  31.  
    Channel channel = connection.createChannel();
  32.  
     
  33.  
    //监听队列
  34.  
    /**
  35.  
    * String queue 监听的队列名称
  36.  
    * autoAck :是否自动确认消息
  37.  
    * Consumer callback: 监听到消息后触发的回调函数
  38.  
    */
  39.  
     
  40.  
    DefaultConsumer callback = new DefaultConsumer(channel){
  41.  
    //一旦有消息就会触发该方法
  42.  
    //body:表示消息的内容
  43.  
    @Override
  44.  
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  45.  
    System.out.println("接收的消息内容:" new String(body));
  46.  
    }
  47.  
    };
  48.  
     
  49.  
    channel.basicConsume("router_queue01",true,callback);
  50.  
     
  51.  
    }
  52.  
    }
学新通

消费者02:

  1.  
    package com.wt.router;
  2.  
     
  3.  
    import com.rabbitmq.client.*;
  4.  
     
  5.  
    import java.io.IOException;
  6.  
     
  7.  
    /**
  8.  
    * @Author wt
  9.  
    * @Date 2022/9/19 21:22
  10.  
    * @PackageName:com.wt.simple
  11.  
    * @ClassName: Customer
  12.  
    * @Description: router路由模式 ---消费者01
  13.  
    * @Version 1.0
  14.  
    */
  15.  
    public class CustomerRouter02 {
  16.  
    public static void main(String[] args) throws Exception{
  17.  
    ConnectionFactory factory = new ConnectionFactory();
  18.  
    //设置rabbilemMq服务器的地址 默认为localhost
  19.  
    factory.setHost("192.168.135.156");
  20.  
    //设置rabbitMQ的端口号 默认5672
  21.  
    factory.setPort(5672);
  22.  
    //设置账号和密码 默认guest
  23.  
    factory.setUsername("guest");
  24.  
    factory.setPassword("guest");
  25.  
    //设置虚拟主机名 默认 /
  26.  
    factory.setVirtualHost("/");
  27.  
     
  28.  
    //获取连接通道
  29.  
    Connection connection = factory.newConnection();
  30.  
    //获取channel信道
  31.  
    Channel channel = connection.createChannel();
  32.  
     
  33.  
    //监听队列
  34.  
    /**
  35.  
    * String queue 监听的队列名称
  36.  
    * autoAck :是否自动确认消息
  37.  
    * Consumer callback: 监听到消息后触发的回调函数
  38.  
    */
  39.  
     
  40.  
    DefaultConsumer callback = new DefaultConsumer(channel){
  41.  
    //一旦有消息就会触发该方法
  42.  
    //body:表示消息的内容
  43.  
    @Override
  44.  
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  45.  
    System.out.println("接收的消息内容:" new String(body));
  46.  
    }
  47.  
    };
  48.  
     
  49.  
    channel.basicConsume("router_queue02",true,callback);
  50.  
     
  51.  
    }
  52.  
    }
学新通

2.6 主题模式--topic

学新通

topic模式和router模式的区别是将路由有指定内容变成了通配符 

*: 统配一个单词

: 统配零个或者n个单词  

生产者代码

  1.  
    package com.wt.topic;
  2.  
     
  3.  
    import com.rabbitmq.client.BuiltinExchangeType;
  4.  
    import com.rabbitmq.client.Channel;
  5.  
    import com.rabbitmq.client.Connection;
  6.  
    import com.rabbitmq.client.ConnectionFactory;
  7.  
     
  8.  
    import java.io.IOException;
  9.  
    import java.util.concurrent.TimeoutException;
  10.  
     
  11.  
    /**
  12.  
    * @Author wt
  13.  
    * @Date 2022/9/20 14:33
  14.  
    * @PackageName:com.wt.router
  15.  
    * @ClassName: RouterTest
  16.  
    * @Description: topic模式
  17.  
    * @Version 1.0
  18.  
    */
  19.  
    public class TopicTest {
  20.  
    public static void main(String[] args) throws IOException, TimeoutException {
  21.  
    ConnectionFactory factory = new ConnectionFactory();
  22.  
    //设置rabbilemMq服务器的地址 默认为localhost
  23.  
    factory.setHost("192.168.135.156");
  24.  
    //设置rabbitMQ的端口号 默认5672
  25.  
    factory.setPort(5672);
  26.  
    //设置账号和密码 默认guest
  27.  
    factory.setUsername("guest");
  28.  
    factory.setPassword("guest");
  29.  
    //设置虚拟主机名 默认 /
  30.  
    factory.setVirtualHost("/");
  31.  
     
  32.  
    //获取连接通道
  33.  
    Connection connection = factory.newConnection();
  34.  
    //获取channel信道
  35.  
    Channel channel = connection.createChannel();
  36.  
    //创建队列
  37.  
    /**
  38.  
    * 如果该队列名不存在则自动创建,存在则不创建
  39.  
    * String queue ,队列名
  40.  
    * boolean durable ,是否持久化
  41.  
    * boolean exclusive (独占)声明队列同一时间只能包含一个连接,且该队列只有这一个被连接使用。
  42.  
    * boolean autoDelte ,是否自动删除
  43.  
    * Map<String,Object>arguments: 其它参数
  44.  
    */
  45.  
    channel.queueDeclare("topic_queue01",true,false,false,null);
  46.  
    channel.queueDeclare("topic_queue02",true,false,false,null);
  47.  
     
  48.  
     
  49.  
    //创建交换机
  50.  
    /**
  51.  
    * String exchange,交换机的名称
  52.  
    * BuiltinExchangeType type,交换机的种类
  53.  
    * boolean durable:是否持久化
  54.  
    */
  55.  
    channel.exchangeDeclare("topic_queue", BuiltinExchangeType.TOPIC,true);
  56.  
     
  57.  
    //交换机和队列绑定
  58.  
    /**
  59.  
    * String queue,队列名 String exchange,交换机名 String routerkey 路由key 如果为发布订阅模式则
  60.  
    * 无需有路由key
  61.  
    */
  62.  
    channel.queueBind("topic_queue01","topic_queue","*.orange.*");
  63.  
     
  64.  
    channel.queueBind("topic_queue02","topic_queue","*.*.rabbit");
  65.  
    channel.queueBind("topic_queue02","topic_queue","lazy.#");
  66.  
     
  67.  
     
  68.  
     
  69.  
     
  70.  
    //发送消息到队列
  71.  
    /**
  72.  
    * String exchange ,把消息发给哪个交换机--简单模式没有交换机“”
  73.  
    * String routingKey ,消息绑定的路由key 如果为简单模式 默认写为队列名称
  74.  
    * BasicProperties props, 消息的属性
  75.  
    * byte[] body: 消息的内容
  76.  
    */
  77.  
    String msg = "{code:2000,name:张三,age:18}";
  78.  
    channel.basicPublish("topic_queue","lazy.orange.rabbit",null,msg.getBytes());
  79.  
    //channel.basicPublish("router_queue","lazy.orange.ss",null,msg.getBytes());
  80.  
     
  81.  
     
  82.  
    connection.close();
  83.  
    }
  84.  
    }
学新通

消费者01:

  1.  
    package com.wt.topic;
  2.  
     
  3.  
    import com.rabbitmq.client.*;
  4.  
     
  5.  
    import java.io.IOException;
  6.  
     
  7.  
    /**
  8.  
    * @Author wt
  9.  
    * @Date 2022/9/19 21:22
  10.  
    * @PackageName:com.wt.simple
  11.  
    * @ClassName: Customer
  12.  
    * @Description: topic模式-------消费者01
  13.  
    * @Version 1.0
  14.  
    */
  15.  
    public class CustomerTopic01 {
  16.  
    public static void main(String[] args) throws Exception{
  17.  
    ConnectionFactory factory = new ConnectionFactory();
  18.  
    //设置rabbilemMq服务器的地址 默认为localhost
  19.  
    factory.setHost("192.168.135.156");
  20.  
    //设置rabbitMQ的端口号 默认5672
  21.  
    factory.setPort(5672);
  22.  
    //设置账号和密码 默认guest
  23.  
    factory.setUsername("guest");
  24.  
    factory.setPassword("guest");
  25.  
    //设置虚拟主机名 默认 /
  26.  
    factory.setVirtualHost("/");
  27.  
     
  28.  
    //获取连接通道
  29.  
    Connection connection = factory.newConnection();
  30.  
    //获取channel信道
  31.  
    Channel channel = connection.createChannel();
  32.  
     
  33.  
    //监听队列
  34.  
    /**
  35.  
    * String queue 监听的队列名称
  36.  
    * autoAck :是否自动确认消息
  37.  
    * Consumer callback: 监听到消息后触发的回调函数
  38.  
    */
  39.  
     
  40.  
    DefaultConsumer callback = new DefaultConsumer(channel){
  41.  
    //一旦有消息就会触发该方法
  42.  
    //body:表示消息的内容
  43.  
    @Override
  44.  
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  45.  
    System.out.println("接收的消息内容:" new String(body));
  46.  
    }
  47.  
    };
  48.  
     
  49.  
    channel.basicConsume("topic_queue01",true,callback);
  50.  
     
  51.  
    }
  52.  
    }
学新通

消费者02:

  1.  
    package com.wt.topic;
  2.  
     
  3.  
    import com.rabbitmq.client.*;
  4.  
     
  5.  
    import java.io.IOException;
  6.  
     
  7.  
    /**
  8.  
    * @Author wt
  9.  
    * @Date 2022/9/19 21:22
  10.  
    * @PackageName:com.wt.simple
  11.  
    * @ClassName: Customer
  12.  
    * @Description: TODO
  13.  
    * @Version 1.0
  14.  
    */
  15.  
    public class CustomerTopic02 {
  16.  
    public static void main(String[] args) throws Exception{
  17.  
    ConnectionFactory factory = new ConnectionFactory();
  18.  
    //设置rabbilemMq服务器的地址 默认为localhost
  19.  
    factory.setHost("192.168.135.156");
  20.  
    //设置rabbitMQ的端口号 默认5672
  21.  
    factory.setPort(5672);
  22.  
    //设置账号和密码 默认guest
  23.  
    factory.setUsername("guest");
  24.  
    factory.setPassword("guest");
  25.  
    //设置虚拟主机名 默认 /
  26.  
    factory.setVirtualHost("/");
  27.  
     
  28.  
    //获取连接通道
  29.  
    Connection connection = factory.newConnection();
  30.  
    //获取channel信道
  31.  
    Channel channel = connection.createChannel();
  32.  
     
  33.  
    //监听队列
  34.  
    /**
  35.  
    * String queue 监听的队列名称
  36.  
    * autoAck :是否自动确认消息
  37.  
    * Consumer callback: 监听到消息后触发的回调函数
  38.  
    */
  39.  
     
  40.  
    DefaultConsumer callback = new DefaultConsumer(channel){
  41.  
    //一旦有消息就会触发该方法
  42.  
    //body:表示消息的内容
  43.  
    @Override
  44.  
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  45.  
    System.out.println("接收的消息内容:" new String(body));
  46.  
    }
  47.  
    };
  48.  
     
  49.  
    channel.basicConsume("topic_queue02",true,callback);
  50.  
     
  51.  
    }
  52.  
    }
学新通

3. springboot整合rabbitMQ

3.1 准备工作

3.1.1 创建如下的springboot项目

学新通

 3.1.2 添加依赖

  1.  
    <dependency>
  2.  
    <groupId>org.springframework.boot</groupId>
  3.  
    <artifactId>spring-boot-starter-amqp</artifactId>
  4.  
    </dependency>

3.1.3 添加配置文件application.properties

生产者微服务和消费者微服务都要添加

  1.  
    server.port=9999
  2.  
     
  3.  
    #rabbitMQ的配置
  4.  
    spring.rabbitmq.host=192.168.135.156
  5.  
    spring.rabbitmq.port=5672
  6.  
    #账号密码和host默认为以下配置,如果没有修改可以不写
  7.  
    #spring.rabbitmq.username=guest
  8.  
    #spring.rabbitmq.password=guest
  9.  
    #spring.rabbitmq.virtual-host=/

3.2 springboot的测试

队列我们这次直接在rabbitmq 的图形化界面创建 ,并且给队列创建exchange交换机

创建普通队列test01和test02,创建方式入下图

学新通

 创建交换机testX学新通

 点击testX进入创建好的交换机testX并配置相关内容学新通

 学新通

3.2.1 测试开始

使用product工具类发送消息到队列

  1.  
    @SpringBootTest
  2.  
    public class ProductTest {
  3.  
    //springboot集成了rabbitMQ 提供了一个工具类 ,该类封装了消息的发送
  4.  
    @Autowired
  5.  
    private RabbitTemplate rabbitTemplate;
  6.  
     
  7.  
    /**
  8.  
    * 给交换机为testX,路由为a的队列发送消息hello springboot
  9.  
    */
  10.  
    @Test
  11.  
    public void test01(){
  12.  
    rabbitTemplate.convertAndSend("testX","a","hello springboot");
  13.  
    }
  14.  
    }

消费者

  1.  
    package com.wt.rabbitmq;
  2.  
     
  3.  
    import com.rabbitmq.client.Channel;
  4.  
    import org.springframework.amqp.core.Message;
  5.  
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6.  
    import org.springframework.stereotype.Component;
  7.  
     
  8.  
    @Component
  9.  
    public class MyListener {
  10.  
     
  11.  
     
  12.  
    /**
  13.  
    * @RabbitListener:队列监听,queues:队列名
  14.  
    */
  15.  
    @RabbitListener(queues = "test02")
  16.  
    public void test(Map<String,Object> msg){
  17.  
    System.out.println(msg);
  18.  
    //运行相关的业务处理
  19.  
    }
  20.  
    }
学新通

3.3 如何确保消息的可靠性

首先确定消息可能在哪些位置丢失---不同的位置可以有不同的解决方案。

学新通

3.3.1 保证消息从生产者到交换机

1. comfirm确认机制

 该模式必须在生产者的application.properties配置文件中开启手动确认机制

#开启确认机制
spring.rabbitmq.publisher-confirm-type=correlated
  1.  
    //保证消息从生产者到交换机
  2.  
    //测试确认机制
  3.  
     
  4.  
    /**
  5.  
    * 1,手动开启确认机制spring.rabbitmq.publisher-confirm-type=correlated
  6.  
    * 2.为rabbitTemplate设置确认回调函数
  7.  
    */
  8.  
    @Test
  9.  
    public void testConfirm(){
  10.  
    //为rabbitTemplate设置确认回调函数
  11.  
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){
  12.  
    //不管是否到大交换机,都会触发该方法
  13.  
    @Override
  14.  
    public void confirm(CorrelationData correlationData, boolean b, String s) {
  15.  
    System.out.println("b~~~~~~~~~~~" b);
  16.  
    if (b==false){
  17.  
    System.out.println("消息发送失败,开启回滚");
  18.  
    }
  19.  
    }
  20.  
    });
  21.  
    //故意设置一个不存在的交换机
  22.  
    rabbitTemplate.convertAndSend("testX2","a","Heollo Simple2");
  23.  
    }
学新通

3.3.2 保证消息可以从交换机到队列

returning机制: 如果消息无法到达队列,则会触发returning机制。如果能从交换机到队列则不会触发returning机制。

默认rabbitMQ不开启该机制。

 该模式必须在生产者的application.properties配置文件中开启手动returning机制  

#开启returning机制
spring.rabbitmq.publisher-returns=true
  1.  
    **
  2.  
    * 1.开启returning机制
  3.  
    * 2.为rabbitTemplate设置returning回调函数
  4.  
    */
  5.  
    @Test
  6.  
    public void testReturning(){
  7.  
    rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
  8.  
    //该方法只有从交换机到队列失败时才会触发
  9.  
    @Override
  10.  
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
  11.  
    System.out.println("~~~~~~~~~~~~~~~~~~~~");
  12.  
    }
  13.  
    });
  14.  
    //故意写一个不存在的队列测试是否能执行
  15.  
    rabbitTemplate.convertAndSend("testX","a","Hello Springboot3");
  16.  
    }
学新通

3.3.3 如何保证消息在队列

  1. 队列持久化--->

  2. 搭建rabbitmq集群--保证高可用

3.3.4 消费者可靠的消费消息

  1. 在消费者的配置文件中修改为手动确认模式

 #开启消息确认auto :自动确认  manual:手动确认  none:不确认
 spring.rabbitmq.listener.simple.acknowledge-mode=manual

 当业务处理完毕后在确认消息给队列让其删除该消息

  1.  
    @RabbitListener(queues = "test01")
  2.  
    public void test02(Message message, Channel channel){
  3.  
    byte[] body = message.getBody();
  4.  
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
  5.  
    System.out.println("接收到的消息" new String(body));
  6.  
     
  7.  
    try{
  8.  
    System.out.println("处理业务代码");
  9.  
    //可以故意抛出一个异常来测试是否会继续发送
  10.  
    // int i = 10/0;
  11.  
    System.out.println("业务处理完毕");
  12.  
    //创建手动确认---队列会把消息溢出
  13.  
    /**
  14.  
    * long deliveryTag 消息的标记
  15.  
    * boolean multiple: 是否把之前没有确认的消息也确认掉
  16.  
    */
  17.  
    //手动确认,需要在配置文件中进行开启
  18.  
    channel.basicAck(deliveryTag,true);
  19.  
    }catch (Exception e){
  20.  
    //出现异常让队列再发一次
  21.  
    //boolean requeue: true继续发给我 false还是直接丢掉
  22.  
    try {
  23.  
    channel.basicNack(deliveryTag,true,true);
  24.  
     
  25.  
    }catch (Exception exception){
  26.  
    e.printStackTrace();
  27.  
    }
  28.  
    }
  29.  
     
  30.  
    }
学新通

如何保证消息的可靠性。

  1. 设置confirm和returning机制

  2. 设置队列和交互机的持久化

  3. 搭建rabbitMQ服务集群

  4. 消费者改为手动确认机制。

3.4 如何限制消费者消费消息的条数

学新通

  1. 设置消费者消费消息的条数

  2. 消费者端必须为手动确认模式。

在消费者的配置文件中修改每次拉取消息的条数

#设置消息限流---消费者一次最多消费的消息个数
spring.rabbitmq.listener.simple.prefetch=3

按照上述创建队列的方式,创建test03、test04队列,并将队列加入到testX交换机中,并设置test03 的路由为d ,test04的路由为e 

生产者:

  1.  
    //测试限制消费者消费的个数
  2.  
    @Test
  3.  
    public void testMessage(){
  4.  
    for (int i = 0; i <10; i ) {
  5.  
    rabbitTemplate.convertAndSend("testX","d","Hello Word00" i);
  6.  
    }
  7.  
    }

消费者:

  1.  
    /**
  2.  
    * 限制消费者消费的个数
  3.  
    *
  4.  
    * 设置限制个数,需要在配置文件中进行配置
  5.  
    * 这里测试时没有进行消息确认,否则会因为消息确认太快而无法看出限流
  6.  
    * 这里测试的是没次通过3个,在3个消息确认之后才会接着发送其他请求
  7.  
    */
  8.  
    @RabbitListener(queues = "test03")
  9.  
    public void test03 (Message message, Channel channel) throws IOException {
  10.  
    byte[] body = message.getBody();
  11.  
    System.out.println("消息的内容:" new String(body));
  12.  
    //消息确认
  13.  
    // long deliveryTag = message.getMessageProperties().getDeliveryTag();
  14.  
    // channel.basicAck(deliveryTag,true);
  15.  
    }
学新通

3.5 设置过期时间

TTL:time to live

可以为整个队列设置也可以单独为某条信息设置

在rabbitmq图形化界面创建设置过期时间的队列test04 

学新通 队列创建完成后将该队列加入到交换机exchange内,并设置路由为e

3.5.1 创建队列时为队列设置过期时间

 生产者:

  1.  
    //测试设置过期时间
  2.  
     
  3.  
    /**
  4.  
    * 为队列设置过期时间
  5.  
    * 设置休眠时间,判断数据过期是所有数据都被删除,还是哪个数据进入队列给这个数据进行计时,到期自动删除这个数据
  6.  
    */
  7.  
    @Test
  8.  
    public void test(){
  9.  
    for (int i = 0; i <10; i ) {
  10.  
    if (i<5){
  11.  
    rabbitTemplate.convertAndSend("testX","e","Hello Word00" i);
  12.  
    }else {
  13.  
    try{
  14.  
    Thread.sleep(6000);
  15.  
    }catch (Exception e){
  16.  
    e.printStackTrace();
  17.  
    }
  18.  
    rabbitTemplate.convertAndSend("testX","e","Hello Word00" i);
  19.  
    }
  20.  
     
  21.  
    }
  22.  
    }
学新通

 消费者:

  1.  
    @RabbitListener(queues = "test04")
  2.  
    public void test04 (Message message, Channel channel) throws IOException {
  3.  
    byte[] body = message.getBody();
  4.  
    System.out.println("消息的内容:" new String(body));
  5.  
    //消息确认
  6.  
    // long deliveryTag = message.getMessageProperties().getDeliveryTag();
  7.  
    // channel.basicAck(deliveryTag,true);
  8.  
    }

3.5.2 单独为消息设置过期时间

  1.  
    /*
  2.  
    单独为一条信息设置过期时间
  3.  
    */
  4.  
    @Test
  5.  
    public void test00(){
  6.  
    MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
  7.  
    @Override
  8.  
    public Message postProcessMessage(Message message) throws AmqpException {
  9.  
    message.getMessageProperties().setExpiration("10000");
  10.  
    return message;
  11.  
    }
  12.  
    };
  13.  
    rabbitTemplate.convertAndSend("testX","d","Hello Word0",messagePostProcessor);
  14.  
     
  15.  
    }
学新通
  1. 如何保证消息的可靠性

  2. TTL过期

  3. 限定消费消息的条数

3.6 创建死信队列 

3.6.1 准备工作

在rabbitmq中创建创建普通队列和死信队列

死信队列连接连接的是普通交换机、普通队列连接的是死信交换机

创建死信队列

学新通创建普通队列

学新通

创建死信队列连接的普通交换机

学新通 创建普通队列连接的死信交换机

学新通

 生产者

 我们的队列设置了消息上限和超时时间,并且没有设置消息确认,这次一共发送十条,有五条会等待普通队列确认,剩下的五条会进入死信交换机,若普通队列的五条超过20秒,这5条消息也会进入死信队列

  1.  
    /**
  2.  
    * 死信队列
  3.  
    */
  4.  
    @Test
  5.  
    public void testSx(){
  6.  
    for (int i = 0; i < 10; i ) {
  7.  
    rabbitTemplate.convertAndSend("pt_exchange","dead","Hello Word~~~~~~~" i);
  8.  
    }
  9.  
    }

学新通

3.7 延迟队列

学新通

学新通

学新通  这里的判断订单状态 是因为 如果支付系统第29分分钟去支付,支付的比较慢,最后在第31分钟支付成功了。消息30分钟加入死信队列执行库存回滚,就会出错。

3.8 如何防止消息被重复消费

学新通

3.9 rabbitMQ的常见面试题 

1. 如何防止消息被重复消费

2.如何保证消息的可靠性

3.rabbitMQ消息积压过多

学新通

学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhggjjgf
系列文章
更多 icon
同类精品
更多 icon
继续加载