• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

RabbitMQ 6种队列模式——Publish/Subscribe发布订阅

武飞扬头像
·梅花十三
帮助1

发布订阅 (publish/subscribe)

将消息发送给不同类型的消费者。做到发布一次,消费多个。

举例说明

假设我们有一个订单系统,用户进行下单支付,下单成功后,根据业务处理一般都会消息通知用户相关信息。例如通过邮件 手机 微信等方式进行消息推送支付成功信息。

利用 MQ 实现业务异步处理,支付成功向消息队列投递,消费者取出消息后进行业务处理:

学新通

这种方式不但总耗时长,并且业务混乱,实际上短信、邮件、微信 是不同的业务逻辑,不应该放在一块处理,而应该根据业务进行拆分,如下图:

学新通


 

代码展示

准备条件

 提醒:由于生产者和消费者的代码大同小异,为了方便,编写一个通用的连接工具类。

  1.  
    public class MQConnectionUtils {
  2.  
     
  3.  
    // 获取连接
  4.  
    public static Connection getConnection(String connectionName,String vHost){
  5.  
    Connection connection = null;
  6.  
     
  7.  
    // 1.建立连接工厂
  8.  
    ConnectionFactory factory = new ConnectionFactory();
  9.  
    factory.setHost("127.0.0.1");
  10.  
    factory.setPort(5672);
  11.  
    factory.setUsername("wpf2");
  12.  
    factory.setPassword("123");
  13.  
    factory.setVirtualHost(vHost);
  14.  
     
  15.  
    try {
  16.  
    // 2.通过连接工厂建立连接
  17.  
    connection = factory.newConnection(connectionName);
  18.  
    } catch (IOException e) {
  19.  
    e.printStackTrace();
  20.  
    } catch (TimeoutException e) {
  21.  
    e.printStackTrace();
  22.  
    }
  23.  
    return connection;
  24.  
    }
  25.  
     
  26.  
    // 释放资源
  27.  
    public static void close(Connection connection, Channel channel){
  28.  
    // 1.关闭通道
  29.  
    if(channel!=null && channel.isOpen()){
  30.  
    try {
  31.  
    channel.close();
  32.  
    } catch (Exception e) {
  33.  
    e.printStackTrace();
  34.  
    }
  35.  
    }
  36.  
    // 2.关闭连接
  37.  
    if(connection!=null){
  38.  
    try {
  39.  
    connection.close();
  40.  
    } catch (Exception e) {
  41.  
    e.printStackTrace();
  42.  
    }
  43.  
    }
  44.  
    }
  45.  
    }
学新通

▸ 发布订阅(Publish/Subscribe

发布一次消息,消费多个。将消息路由给多个队列,多个消费者在不同队列中进行消费。这种模式叫做“发布/订阅”。类似于特别关注,我发布了一篇文章,关注我的粉丝就能看到推文

一个队列对应一个消费者,Publish模式还多了一个exchange(交换机 转发器) ,这时候我们要获取消息,就需要队列绑定到交换机上,交换机把消息发送到队列 , 消费者才能获取队列的消息。

学新通

1. 生产者:定义一个生产者,将消息投递到交换机,代码如下

  1.  
    public class Producer {
  2.  
     
  3.  
    public static void main(String[] args) {
  4.  
    // 1.获取连接
  5.  
    Connection connection = MQConnectionUtils.getConnection("生产者","test_host");
  6.  
    Channel channel = null;
  7.  
    try {
  8.  
    // 2.通过连接建立通道
  9.  
    channel = connection.createChannel();
  10.  
     
  11.  
    String exchangeName = "my-exchange";
  12.  
    // 3.通过通道创建交换机 (第一个参数为交换机名称,第二个参数为交换机的类型)
  13.  
    channel.exchangeDeclare(exchangeName, "fanout");
  14.  
     
  15.  
    // 4.发送消息到交换机
  16.  
    String message = "你好 梅花十三!";
  17.  
    channel.basicPublish(exchangeName, "", null, message.getBytes());
  18.  
    System.out.println("消息生产成功!");
  19.  
    } catch (Exception e) {
  20.  
    e.printStackTrace();
  21.  
    }finally {
  22.  
    MQConnectionUtils.close(connection,channel);
  23.  
    }
  24.  
    }
  25.  
    }
学新通

2. 多个消费者:我们定义2个队列绑定到该交换机,同时也是2个消费者进行对消息的消费,为了投机取巧,直接用消费者类实现Runnable接口,主函数创建2个线程模拟2个消费者,如下

  1.  
    public class Consumer implements Runnable{
  2.  
     
  3.  
    public static void main(String[] args) {
  4.  
    // 定义2个线程,线程名称就用队列名称(投机取巧,避免写2个消费者实例,代码一样只是绑定的队列要不同)
  5.  
    new Thread(new Consumer(),"queue1").start();
  6.  
    new Thread(new Consumer(),"queue2").start();
  7.  
    }
  8.  
     
  9.  
    public void run() {
  10.  
    final String name = Thread.currentThread().getName();
  11.  
     
  12.  
    // 1.获取连接
  13.  
    Connection connection = MQConnectionUtils.getConnection("生产者","test_host");
  14.  
     
  15.  
    Channel channel = null;
  16.  
    try {
  17.  
    // 2.通过连接建立通道
  18.  
    channel = connection.createChannel();
  19.  
     
  20.  
    // 3.通过通道创建队列
  21.  
    /**
  22.  
    * @Params1 队列名称
  23.  
    * @Params2 是否持久化 true:持久化,该队列将在服务器重启后依然继续存在
  24.  
    * @Params3 是否独占队列 true:独占,仅限于此连接
  25.  
    * @Params4 自动删除(最后一条消息被消费完毕后,是否把队列自动删除)
  26.  
    * @Params5 队列的其他属性(构造参数)
  27.  
    *
  28.  
    * 面试题:所谓持久化即消息存盘,非持久化会存盘吗? 回答:会存盘,但会随着服务器宕机而丢失
  29.  
    */
  30.  
    channel.queueDeclare(name, true, false, false, null);
  31.  
     
  32.  
    // 4.绑定交换机和队列的关系
  33.  
    /**
  34.  
    * @Params1 队列名称
  35.  
    * @Params2 需绑定的交换机名称
  36.  
    * @Params3 路由key,用于direct或者topic模式,通过某个routingKey绑定交换机
  37.  
    */
  38.  
    channel.queueBind(name,"my-exchange","");
  39.  
     
  40.  
     
  41.  
    // 5.消费消息
  42.  
    /** @param1:队列名称
  43.  
    * @param2:是否自动应答 true:是,消息一旦被消费成功,消息则从队列中删除
  44.  
    * @param3:消息送达时的回调
  45.  
    * @param4:消费者被取消时的回调
  46.  
    */
  47.  
    channel.basicConsume(name,true, new DeliverCallback() {
  48.  
    public void handle(String consumerTag, Delivery message) throws IOException {
  49.  
    System.out.println("从" name "队列中接收消息成功!内容:" new String(message.getBody(), "UTF-8"));
  50.  
    }
  51.  
    }, new CancelCallback() {
  52.  
    public void handle(String consumerTag) throws IOException {
  53.  
    System.out.println("接收消息失败。。。。。");
  54.  
    }
  55.  
    });
  56.  
    } catch (Exception e) {
  57.  
    e.printStackTrace();
  58.  
    }
  59.  
    }
  60.  
    }
学新通

!! 注意:虽然我们上述也说了,如果消息发送到了一个没有绑定队列的交换机时,消息就会丢失!但由于交换机在生产者类创建的(也可以在消费者类创建),因此我们先必须启动Producer类,使其创建交换机(第一次启动的消息会丢失,因为交换机没有绑定队列)

3. 启动:启动顺序为 Producer类——>消费者类(Consumer类的main函数)——>Producer类

4. 运行结果

 a)producer第二次启动,消息生产成功

学新通

b)切换至Consumer运行面板,可以看到2个消费者,从2个队列中进行了消息的消费

学新通

结论:创建一个交换机my-exchange,将类型设置为fanout广播模式,创建2个队列,分别是 queue1、queue2并进行绑定该交换机,交换机在收到生产者的消息后,会将消息路由到其下绑定的2个队列中,2个队列中存储的消息的内容都是一样的,多个消费者到不同的队列中进行消费。

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhggkjja
系列文章
更多 icon
同类精品
更多 icon
继续加载