Java RabbitMQ消息队列使用
一、消息队列
什么是消息队列
消息队列,即MQ,Message Queue。
消息队列是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。
二、RabbitMQ
RabbitMQ是基于AMQP的一款消息管理系统。
支持主流的操作系统,Linux、Windows、MacOX等。
支持多种开发语言,Java、Python、Ruby、.NET、PHP、C/C 、node.js等。
官网: Messaging that just works — RabbitMQ
官方教程:RabbitMQ Tutorials — RabbitMQ
RabbitMQ 基本概念
Message
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。Connection
网络连接,比如一个TCP连接。无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费。Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。Broker
表示消息队列服务器实体。
三、简单消息模型使用
RabbitMQ是一个消息代理:它接受和转发消息。 你可以把它想象成一个邮局:当你把邮件放在邮箱里时,你可以确定邮差先生最终会把邮件发送给你的收件人。 在这个比喻中,RabbitMQ是邮政信箱,邮局和邮递员。
RabbitMQ与邮局的主要区别是它不处理纸张,而是接受,存储和转发数据消息的二进制数据块。
P(producer/ publisher):生产者,一个发送消息的用户应用程序。
C(consumer):消费者,消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序
队列(红色区域):rabbitmq内部类似于邮箱的一个概念。虽然消息流经rabbitmq和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。
总之:生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区。
代码演示
引入依赖
-
<dependency>
-
<groupId>org.springframework.boot</groupId>
-
<artifactId>spring-boot-starter-amqp</artifactId>
-
<version>2.0.6.RELEASE</version>
-
</dependency>
获取连接
-
package com.util;
-
-
import com.rabbitmq.client.Connection;
-
import com.rabbitmq.client.ConnectionFactory;
-
-
public class ConnectionUtil {
-
/**
-
* 建立与RabbitMQ的连接
-
* @return
-
* @throws Exception
-
*/
-
public static Connection getConnection() throws Exception {
-
//定义连接工厂
-
ConnectionFactory factory = new ConnectionFactory();
-
//设置服务地址
-
factory.setHost("192.168.33.88");
-
//TCP端口
-
factory.setPort(5672);
-
//设置账号信息,用户名、密码、vhost
-
factory.setVirtualHost("vhost_NetDataGather");
-
factory.setUsername("admin");
-
factory.setPassword("123456");
-
// 通过工程获取连接
-
Connection connection = factory.newConnection();
-
return connection;
-
}
-
}
生产者
-
import com.util.ConnectionUtil;
-
-
import com.rabbitmq.client.Channel;
-
import com.rabbitmq.client.Connection;
-
/**
-
* 生产者
-
*/
-
public class Send {
-
//声明队列名称
-
private final static String QUEUE_NAME = "simple_queue";
-
-
public static void main(String[] argv) throws Exception {
-
// 生产者和Broker建立TCP连接。
-
Connection connection = ConnectionUtil.getConnection();
-
// 生产者和Broker建立通道。
-
Channel channel = connection.createChannel();
-
// 声明(创建)队列
-
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
-
// 消息内容
-
String message = "Hello World!";
-
for (int i = 0; i < 10; i ) {
-
// 向指定的队列中发送消息
-
message=message i;
-
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
-
}
-
//关闭通道和连接
-
channel.close();
-
connection.close();
-
}
-
}
消费者
-
import java.io.IOException;
-
-
import com.rabbitmq.client.AMQP.BasicProperties;
-
import com.rabbitmq.client.Channel;
-
import com.rabbitmq.client.Connection;
-
import com.rabbitmq.client.DefaultConsumer;
-
import com.rabbitmq.client.Envelope;
-
-
import com.util.ConnectionUtil;
-
-
/**
-
* 消费者
-
*/
-
public class Recv {
-
private final static String QUEUE_NAME = "simple_queue";
-
-
public static void main(String[] argv) throws Exception {
-
// 获取到连接
-
Connection connection = ConnectionUtil.getConnection();
-
// 创建通道
-
Channel channel = connection.createChannel();
-
// 声明队列
-
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
-
// 定义队列的消费者
-
DefaultConsumer consumer = new DefaultConsumer(channel) {
-
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
-
-
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
-
byte[] body) throws IOException {
-
// body 即消息体
-
String msg = new String(body);
-
}
-
};
-
// 监听队列,第二个参数:是否自动进行消息确认。
-
channel.basicConsume(QUEUE_NAME, true, consumer);
-
}
-
}
上述代码中:消息一旦被消费者接收,队列中的消息就会被删除。
如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是RabbitMQ无从得知,这样消息就丢失了!
因此,RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:
自动ACK:消息一旦被接收,消费者自动发送ACK。
手动ACK:消息接收后,不会发送ACK,需要手动调用。
手动ACK
-
import java.io.IOException;
-
-
import com.rabbitmq.client.AMQP.BasicProperties;
-
import com.rabbitmq.client.Channel;
-
import com.rabbitmq.client.Connection;
-
import com.rabbitmq.client.DefaultConsumer;
-
import com.rabbitmq.client.Envelope;
-
-
import com.util.ConnectionUtil;
-
-
/**
-
* 消费者,手动进行ACK
-
*/
-
public class Recv2 {
-
private final static String QUEUE_NAME = "simple_queue";
-
-
public static void main(String[] argv) throws Exception {
-
// 获取到连接
-
Connection connection = ConnectionUtil.getConnection();
-
// 创建通道
-
final Channel channel = connection.createChannel();
-
// 声明队列
-
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
-
// 定义队列的消费者
-
DefaultConsumer consumer = new DefaultConsumer(channel) {
-
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
-
-
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
-
byte[] body) throws IOException {
-
// body 即消息体
-
String msg = new String(body);
-
// 手动进行ACK
-
channel.basicAck(envelope.getDeliveryTag(), false);
-
}
-
};
-
// 监听队列,第二个参数false,手动进行ACK
-
// 如果第二个参数为true,则会自动进行ACK;如果为false,则需要手动ACK。方法的声明:
-
channel.basicConsume(QUEUE_NAME, false, consumer);
-
}
-
}
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhggiagi
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13