• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Java RabbitMQ消息队列使用

武飞扬头像
程序猿_liter
帮助1

一、消息队列

什么是消息队列

消息队列,即MQ,Message Queue。

学新通

消息队列是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。

二、RabbitMQ

RabbitMQ是基于AMQP的一款消息管理系统。

支持主流的操作系统,Linux、Windows、MacOX等。

支持多种开发语言,Java、Python、Ruby、.NET、PHP、C/C 、node.js等。

官网: Messaging that just works — RabbitMQ

官方教程:RabbitMQ Tutorials — RabbitMQ

RabbitMQ 基本概念

学新通

Message
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。

Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

Connection
网络连接,比如一个TCP连接。无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费。

Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。

Broker
表示消息队列服务器实体。

 三、简单消息模型使用

RabbitMQ是一个消息代理:它接受和转发消息。 你可以把它想象成一个邮局:当你把邮件放在邮箱里时,你可以确定邮差先生最终会把邮件发送给你的收件人。 在这个比喻中,RabbitMQ是邮政信箱,邮局和邮递员。

RabbitMQ与邮局的主要区别是它不处理纸张,而是接受,存储和转发数据消息的二进制数据块。

学新通

P(producer/ publisher):生产者,一个发送消息的用户应用程序。

C(consumer):消费者,消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序

队列(红色区域):rabbitmq内部类似于邮箱的一个概念。虽然消息流经rabbitmq和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。

总之:生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区。

代码演示

引入依赖

  1.  
    <dependency>
  2.  
    <groupId>org.springframework.boot</groupId>
  3.  
    <artifactId>spring-boot-starter-amqp</artifactId>
  4.  
    <version>2.0.6.RELEASE</version>
  5.  
    </dependency>

 获取连接

  1.  
    package com.util;
  2.  
     
  3.  
    import com.rabbitmq.client.Connection;
  4.  
    import com.rabbitmq.client.ConnectionFactory;
  5.  
     
  6.  
    public class ConnectionUtil {
  7.  
    /**
  8.  
    * 建立与RabbitMQ的连接
  9.  
    * @return
  10.  
    * @throws Exception
  11.  
    */
  12.  
    public static Connection getConnection() throws Exception {
  13.  
    //定义连接工厂
  14.  
    ConnectionFactory factory = new ConnectionFactory();
  15.  
    //设置服务地址
  16.  
    factory.setHost("192.168.33.88");
  17.  
    //TCP端口
  18.  
    factory.setPort(5672);
  19.  
    //设置账号信息,用户名、密码、vhost
  20.  
    factory.setVirtualHost("vhost_NetDataGather");
  21.  
    factory.setUsername("admin");
  22.  
    factory.setPassword("123456");
  23.  
    // 通过工程获取连接
  24.  
    Connection connection = factory.newConnection();
  25.  
    return connection;
  26.  
    }
  27.  
    }
学新通

生产者

  1.  
    import com.util.ConnectionUtil;
  2.  
     
  3.  
    import com.rabbitmq.client.Channel;
  4.  
    import com.rabbitmq.client.Connection;
  5.  
    /**
  6.  
    * 生产者
  7.  
    */
  8.  
    public class Send {
  9.  
    //声明队列名称
  10.  
    private final static String QUEUE_NAME = "simple_queue";
  11.  
     
  12.  
    public static void main(String[] argv) throws Exception {
  13.  
    // 生产者和Broker建立TCP连接。
  14.  
    Connection connection = ConnectionUtil.getConnection();
  15.  
    // 生产者和Broker建立通道。
  16.  
    Channel channel = connection.createChannel();
  17.  
    // 声明(创建)队列
  18.  
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  19.  
    // 消息内容
  20.  
    String message = "Hello World!";
  21.  
    for (int i = 0; i < 10; i ) {
  22.  
    // 向指定的队列中发送消息
  23.  
    message=message i;
  24.  
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  25.  
    }
  26.  
    //关闭通道和连接
  27.  
    channel.close();
  28.  
    connection.close();
  29.  
    }
  30.  
    }
学新通

消费者

  1.  
    import java.io.IOException;
  2.  
     
  3.  
    import com.rabbitmq.client.AMQP.BasicProperties;
  4.  
    import com.rabbitmq.client.Channel;
  5.  
    import com.rabbitmq.client.Connection;
  6.  
    import com.rabbitmq.client.DefaultConsumer;
  7.  
    import com.rabbitmq.client.Envelope;
  8.  
     
  9.  
    import com.util.ConnectionUtil;
  10.  
     
  11.  
    /**
  12.  
    * 消费者
  13.  
    */
  14.  
    public class Recv {
  15.  
    private final static String QUEUE_NAME = "simple_queue";
  16.  
     
  17.  
    public static void main(String[] argv) throws Exception {
  18.  
    // 获取到连接
  19.  
    Connection connection = ConnectionUtil.getConnection();
  20.  
    // 创建通道
  21.  
    Channel channel = connection.createChannel();
  22.  
    // 声明队列
  23.  
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  24.  
    // 定义队列的消费者
  25.  
    DefaultConsumer consumer = new DefaultConsumer(channel) {
  26.  
    // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
  27.  
    @Override
  28.  
    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
  29.  
    byte[] body) throws IOException {
  30.  
    // body 即消息体
  31.  
    String msg = new String(body);
  32.  
    }
  33.  
    };
  34.  
    // 监听队列,第二个参数:是否自动进行消息确认。
  35.  
    channel.basicConsume(QUEUE_NAME, true, consumer);
  36.  
    }
  37.  
    }
学新通

上述代码中:消息一旦被消费者接收,队列中的消息就会被删除。

如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是RabbitMQ无从得知,这样消息就丢失了!

因此,RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:

自动ACK:消息一旦被接收,消费者自动发送ACK。

手动ACK:消息接收后,不会发送ACK,需要手动调用。

手动ACK

  1.  
    import java.io.IOException;
  2.  
     
  3.  
    import com.rabbitmq.client.AMQP.BasicProperties;
  4.  
    import com.rabbitmq.client.Channel;
  5.  
    import com.rabbitmq.client.Connection;
  6.  
    import com.rabbitmq.client.DefaultConsumer;
  7.  
    import com.rabbitmq.client.Envelope;
  8.  
     
  9.  
    import com.util.ConnectionUtil;
  10.  
     
  11.  
    /**
  12.  
    * 消费者,手动进行ACK
  13.  
    */
  14.  
    public class Recv2 {
  15.  
    private final static String QUEUE_NAME = "simple_queue";
  16.  
     
  17.  
    public static void main(String[] argv) throws Exception {
  18.  
    // 获取到连接
  19.  
    Connection connection = ConnectionUtil.getConnection();
  20.  
    // 创建通道
  21.  
    final Channel channel = connection.createChannel();
  22.  
    // 声明队列
  23.  
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  24.  
    // 定义队列的消费者
  25.  
    DefaultConsumer consumer = new DefaultConsumer(channel) {
  26.  
    // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
  27.  
    @Override
  28.  
    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
  29.  
    byte[] body) throws IOException {
  30.  
    // body 即消息体
  31.  
    String msg = new String(body);
  32.  
    // 手动进行ACK
  33.  
    channel.basicAck(envelope.getDeliveryTag(), false);
  34.  
    }
  35.  
    };
  36.  
    // 监听队列,第二个参数false,手动进行ACK
  37.  
    // 如果第二个参数为true,则会自动进行ACK;如果为false,则需要手动ACK。方法的声明:
  38.  
    channel.basicConsume(QUEUE_NAME, false, consumer);
  39.  
    }
  40.  
    }
学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhggiagi
系列文章
更多 icon
同类精品
更多 icon
继续加载