RabbitMQ消息间件在项目的使用
1. RabbitMQ消息中间件
1.1 什么MQ?
思考: 原来服务与服务之间如何通信?
Openfeign 服务与服务之间直接调用。
我们也可以使用MQ完成系统与系统之间得调用。
1.2 MQ优点
1. 应用解耦
2. 异步提速
3. 削锋填谷
1.3 MQ缺点
1.4 如何选择MQ
1.5 MQ得种类
rabbitMQ
kafka
RocketMQ
ActiveMQ
1.6 RabbitMQ
安装RabbitMQ <<详细内容看另一篇博客---RabbitMQ安装说明文档>>
1.7 概述端口号
1.8 rabbit的工作原理
2. java程序连接RabbitMQ服务---maven项目
提供了5种模式。
1.简单模式--Hello
2. 工作者模式--work queues
3.发布订阅模式---
4.路由模式--router
5.主题模式--topic
2.1 准备工作
2.1.1 创建maven项目
2.1.2 添加依赖--在父工程下添加依赖
-
<dependencies>
-
<dependency>
-
<groupId>com.rabbitmq</groupId>
-
<artifactId>amqp-client</artifactId>
-
<version>5.14.2</version>
-
</dependency>
-
</dependencies>
2.1.3 启动rabbitmq
我这里rabbitmq安装在本地虚拟机上,直接开启虚拟机输入以下命令就可以进行测试了
centos6用这个命令:
/sbin/service rabbitmq-server restartcentos7用这个命令:
systemctl start rabbitmq-server
2.2 simple 简单模式
P: 一个生产者
C: 一个消费者
Q: 队列
生产者负责把消息发送到队列,消费者负责把队列的消息消费掉并确认消费
代码--生产者:
-
package com.wt.service;
-
-
import com.rabbitmq.client.Channel;
-
import com.rabbitmq.client.Connection;
-
import com.rabbitmq.client.ConnectionFactory;
-
-
import java.io.IOException;
-
import java.util.concurrent.TimeoutException;
-
-
/**
-
* @Author wt
-
* @Date 2022/9/19 20:31
-
* @PackageName:com.wt.service
-
* @ClassName: Test01
-
* @Description: 简单模式
-
* @Version 1.0
-
*/
-
public class Test01 {
-
public static void main(String[] args) throws IOException, TimeoutException {
-
/**
-
* 连接rabbitmq
-
*/
-
ConnectionFactory factory = new ConnectionFactory();
-
//设置rabbilemMq服务器的地址 默认为localhost
-
factory.setHost("192.168.135.156");
-
//设置rabbitMQ的端口号 默认5672
-
factory.setPort(5672);
-
//设置账号和密码 默认guest
-
factory.setUsername("guest");
-
factory.setPassword("guest");
-
//设置虚拟主机名 默认 /
-
factory.setVirtualHost("/");
-
-
//获取连接通道
-
Connection connection = factory.newConnection();
-
//获取channel信道
-
Channel channel = connection.createChannel();
-
-
//创建队列
-
/**
-
* 如果该队列名不存在则自动创建,存在则不创建
-
* String queue ,队列名
-
* boolean durable ,是否持久化
-
* boolean exclusive (独占)声明队列同一时间只能包含一个连接,且该队列只有这一个被连接使用。
-
* boolean autoDelte ,是否自动删除
-
* Map<String,Object>arguments
-
*/
-
channel.queueDeclare("simple_queue",true,false,false,null);
-
-
/**
-
* String exchange ,把消息发给哪个交换机--简单模式没有交换机“”
-
* String routingKey ,消息绑定的路由key 如果为简单模式 默认写为队列名称
-
* BasicProperties props, 消息的属性
-
* byte[] body: 消息的内容
-
*/
-
String msg = "{code:2000,name:张三,age:18}";
-
channel.basicPublish("","simple_queue",null,msg.getBytes());
-
-
connection.close();
-
}
-
}
代码-消费者:
-
package com.wt.simple;
-
-
import com.rabbitmq.client.*;
-
-
import java.io.IOException;
-
-
/**
-
* @Author wt
-
* @Date 2022/9/19 21:22
-
* @PackageName:com.wt.simple
-
* @ClassName: Customer
-
* @Description: 简单模式
-
* @Version 1.0
-
*/
-
public class Customer {
-
public static void main(String[] args) throws Exception{
-
/**
-
* 连接rabbitmq
-
*/
-
ConnectionFactory factory = new ConnectionFactory();
-
//设置rabbilemMq服务器的地址 默认为localhost
-
factory.setHost("192.168.135.156");
-
//设置rabbitMQ的端口号 默认5672
-
factory.setPort(5672);
-
//设置账号和密码 默认guest
-
factory.setUsername("guest");
-
factory.setPassword("guest");
-
//设置虚拟主机名 默认 /
-
factory.setVirtualHost("/");
-
-
//获取连接通道
-
Connection connection = factory.newConnection();
-
//获取channel信道
-
Channel channel = connection.createChannel();
-
-
//监听队列
-
/**
-
* String queue 监听的队列名称
-
* autoAck :是否自动确认消息
-
* Consumer callback: 监听到消息后触发的回调函数
-
*/
-
-
DefaultConsumer callback = new DefaultConsumer(channel){
-
//一旦有消息就会触发该方法
-
//body:表示消息的内容
-
-
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
-
System.out.println("接收的消息内容:" new String(body));
-
}
-
};
-
-
channel.basicConsume("simple_queue",true,callback);
-
-
}
-
}
不要关闭连接对象
2.3 woker模式
work模式与简单模式的区别是worker模式的一个队列对应多个消费者
P:生产者
C1:消费者1
C2:消费者2
Q: 队列
消费者1和消费者2属于竞争关系,一个消息只会被一个消费者消费
代码---生产者:
-
package com.wt.work;
-
-
import com.rabbitmq.client.Channel;
-
import com.rabbitmq.client.Connection;
-
import com.rabbitmq.client.ConnectionFactory;
-
-
import java.io.IOException;
-
import java.util.concurrent.TimeoutException;
-
-
/**
-
* @Author wt
-
* @Date 2022/9/19 21:38
-
* @PackageName:com.wt.service.test
-
* @ClassName: Work
-
* @Description: work模式
-
* @Version 1.0
-
*/
-
public class WorkTest {
-
public static void main(String[] args) throws IOException, TimeoutException {
-
ConnectionFactory factory = new ConnectionFactory();
-
//设置rabbilemMq服务器的地址 默认为localhost
-
factory.setHost("192.168.135.156");
-
//设置rabbitMQ的端口号 默认5672
-
factory.setPort(5672);
-
//设置账号和密码 默认guest
-
factory.setUsername("guest");
-
factory.setPassword("guest");
-
//设置虚拟主机名 默认 /
-
factory.setVirtualHost("/");
-
-
//获取连接通道
-
Connection connection = factory.newConnection();
-
//获取channel信道
-
Channel channel = connection.createChannel();
-
//创建队列
-
/**
-
* 如果该队列名不存在则自动创建,存在则不创建
-
* String queue ,队列名
-
* boolean durable ,是否持久化
-
* boolean exclusive (独占)声明队列同一时间只能包含一个连接,且该队列只有这一个被连接使用。
-
* boolean autoDelte ,是否自动删除
-
* Map<String,Object>arguments
-
*/
-
channel.queueDeclare("work_queue",true,false,false,null);
-
-
/**
-
* String exchange ,把消息发给哪个交换机--简单模式没有交换机“”
-
* String routingKey ,消息绑定的路由key 如果为简单模式 默认写为队列名称
-
* BasicProperties props, 消息的属性
-
* byte[] body: 消息的内容
-
*/
-
for (int i=0;i<=10;i ){
-
String msg = "{code:2000,name:张三,age:18}" i;
-
channel.basicPublish("","work_queue",null,msg.getBytes());
-
}
-
-
-
connection.close();
-
}
-
}
代码--消费01:
-
package com.wt.work;
-
-
import com.rabbitmq.client.*;
-
-
import java.io.IOException;
-
-
/**
-
* @Author wt
-
* @Date 2022/9/19 21:22
-
* @PackageName:com.wt.simple
-
* @ClassName: Customer
-
* @Description: worker-----消费者01
-
* @Version 1.0
-
*/
-
public class CustomerWork01 {
-
public static void main(String[] args) throws Exception{
-
ConnectionFactory factory = new ConnectionFactory();
-
//设置rabbilemMq服务器的地址 默认为localhost
-
factory.setHost("192.168.135.156");
-
//设置rabbitMQ的端口号 默认5672
-
factory.setPort(5672);
-
//设置账号和密码 默认guest
-
factory.setUsername("guest");
-
factory.setPassword("guest");
-
//设置虚拟主机名 默认 /
-
factory.setVirtualHost("/");
-
-
//获取连接通道
-
Connection connection = factory.newConnection();
-
//获取channel信道
-
Channel channel = connection.createChannel();
-
-
//监听队列
-
/**
-
* String queue 监听的队列名称
-
* autoAck :是否自动确认消息
-
* Consumer callback: 监听到消息后触发的回调函数
-
*/
-
-
DefaultConsumer callback = new DefaultConsumer(channel){
-
//一旦有消息就会触发该方法
-
//body:表示消息的内容
-
-
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
-
System.out.println("接收的消息内容:" new String(body));
-
}
-
};
-
-
channel.basicConsume("work_queue",true,callback);
-
-
}
-
}
代码--消费者02--和上面的消费者相同。
可以先提前开启两个消费者,然后在开启创造者,观察消息都被哪个消费者消费了,是否有重复消费
2.4 public/Subscribe发布订阅模式
public模式和worker模式的区别是在worker的基础上新增加了一个交换机x,生产者传输消息给交换机,交换机再将相应的信息发送给两个队列(两个队列接收的信息相同),两个队列分别对应的消费者
p: producter 生产者
x:exchange交换机
Q: 队列
C1和C2:消费者
生产者--代码:
-
package com.wt.publish;
-
-
import com.rabbitmq.client.BuiltinExchangeType;
-
import com.rabbitmq.client.Channel;
-
import com.rabbitmq.client.Connection;
-
import com.rabbitmq.client.ConnectionFactory;
-
-
import java.io.IOException;
-
import java.util.concurrent.TimeoutException;
-
-
/**
-
* @Author wt
-
* @Date 2022/9/19 20:31
-
* @PackageName:com.wt.service
-
* @ClassName: Test01
-
* @Description: public
-
* @Version 1.0
-
*/
-
public class PublishTest {
-
public static void main(String[] args) throws IOException, TimeoutException {
-
ConnectionFactory factory = new ConnectionFactory();
-
//设置rabbilemMq服务器的地址 默认为localhost
-
factory.setHost("192.168.135.156");
-
//设置rabbitMQ的端口号 默认5672
-
factory.setPort(5672);
-
//设置账号和密码 默认guest
-
factory.setUsername("guest");
-
factory.setPassword("guest");
-
//设置虚拟主机名 默认 /
-
factory.setVirtualHost("/");
-
-
//获取连接通道
-
Connection connection = factory.newConnection();
-
//获取channel信道
-
Channel channel = connection.createChannel();
-
//创建队列
-
/**
-
* 如果该队列名不存在则自动创建,存在则不创建
-
* String queue ,队列名
-
* boolean durable ,是否持久化
-
* boolean exclusive (独占)声明队列同一时间只能包含一个连接,且该队列只有这一个被连接使用。
-
* boolean autoDelte ,是否自动删除
-
* Map<String,Object>arguments
-
*/
-
channel.queueDeclare("publish_queue01",true,false,false,null);
-
channel.queueDeclare("publish_queue02",true,false,false,null);
-
-
//创建交换机
-
/**
-
* String exchange,交换机的名称
-
* BuiltinExchangeType type,交换机的种类
-
* boolean durable:是否持久化
-
*/
-
channel.exchangeDeclare("publish_queue", BuiltinExchangeType.FANOUT,true);
-
-
//交换机和队列绑定
-
channel.queueBind("publish_queue01","publish_queue","");
-
channel.queueBind("publish_queue02","publish_queue","");
-
-
-
-
/**
-
* String exchange ,把消息发给哪个交换机--简单模式没有交换机“”
-
* String routingKey ,消息绑定的路由key 如果为简单模式 默认写为队列名称
-
* BasicProperties props, 消息的属性
-
* byte[] body: 消息的内容
-
*/
-
for (int i=0;i<=10;i ){
-
String msg = "{code:2000,name:张三,age:18}" i;
-
channel.basicPublish("publish_queue","",null,msg.getBytes());
-
}
-
-
connection.close();
-
}
-
}
消费者--01
-
package com.wt.publish;
-
-
import com.rabbitmq.client.*;
-
-
import java.io.IOException;
-
-
/**
-
* @Author wt
-
* @Date 2022/9/19 21:22
-
* @PackageName:com.wt.simple
-
* @ClassName: Customer
-
* @Description: 消费者01---队列public01
-
* @Version 1.0
-
*/
-
public class CustomerPublish01 {
-
public static void main(String[] args) throws Exception{
-
ConnectionFactory factory = new ConnectionFactory();
-
//设置rabbilemMq服务器的地址 默认为localhost
-
factory.setHost("192.168.135.156");
-
//设置rabbitMQ的端口号 默认5672
-
factory.setPort(5672);
-
//设置账号和密码 默认guest
-
factory.setUsername("guest");
-
factory.setPassword("guest");
-
//设置虚拟主机名 默认 /
-
factory.setVirtualHost("/");
-
-
//获取连接通道
-
Connection connection = factory.newConnection();
-
//获取channel信道
-
Channel channel = connection.createChannel();
-
-
//监听队列
-
/**
-
* String queue 监听的队列名称
-
* autoAck :是否自动确认消息
-
* Consumer callback: 监听到消息后触发的回调函数
-
*/
-
-
DefaultConsumer callback = new DefaultConsumer(channel){
-
//一旦有消息就会触发该方法
-
//body:表示消息的内容
-
-
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
-
System.out.println("接收的消息内容:" new String(body));
-
}
-
};
-
-
channel.basicConsume("publish_queue01",true,callback);
-
-
}
-
}
消费者02:
-
package com.wt.publish;
-
-
import com.rabbitmq.client.*;
-
-
import java.io.IOException;
-
-
/**
-
* @Author wt
-
* @Date 2022/9/19 21:22
-
* @PackageName:com.wt.simple
-
* @ClassName: Customer
-
* @Description: 消费者02---队列public02
-
* @Version 1.0
-
*/
-
public class CustomerPublish02 {
-
public static void main(String[] args) throws Exception{
-
ConnectionFactory factory = new ConnectionFactory();
-
//设置rabbilemMq服务器的地址 默认为localhost
-
factory.setHost("192.168.135.156");
-
//设置rabbitMQ的端口号 默认5672
-
factory.setPort(5672);
-
//设置账号和密码 默认guest
-
factory.setUsername("guest");
-
factory.setPassword("guest");
-
//设置虚拟主机名 默认 /
-
factory.setVirtualHost("/");
-
-
//获取连接通道
-
Connection connection = factory.newConnection();
-
//获取channel信道
-
Channel channel = connection.createChannel();
-
-
//监听队列
-
/**
-
* String queue 监听的队列名称
-
* autoAck :是否自动确认消息
-
* Consumer callback: 监听到消息后触发的回调函数
-
*/
-
-
DefaultConsumer callback = new DefaultConsumer(channel){
-
//一旦有消息就会触发该方法
-
//body:表示消息的内容
-
-
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
-
System.out.println("接收的消息内容:" new String(body));
-
}
-
};
-
-
channel.basicConsume("publish_queue02",true,callback);
-
-
}
-
}
2.5 router路由模式
router和public的区别是在原先的基础上增加了路由,交换机通过路由来判断将消息发送给哪个队列,而不是像public一样两个队列都会接收到一模一样的消息,只有满足路由的队列才会收到消息
p:生产者
x: 交换机---Direct (路由模式)
c1和c2表示消费者:
Q:队列
生产者--代码:
-
package com.wt.router;
-
-
import com.rabbitmq.client.BuiltinExchangeType;
-
import com.rabbitmq.client.Channel;
-
import com.rabbitmq.client.Connection;
-
import com.rabbitmq.client.ConnectionFactory;
-
-
import java.io.IOException;
-
import java.util.concurrent.TimeoutException;
-
-
/**
-
* @Author wt
-
* @Date 2022/9/20 14:33
-
* @PackageName:com.wt.router
-
* @ClassName: RouterTest
-
* @Description: router路由模式
-
* @Version 1.0
-
*/
-
public class RouterTest {
-
public static void main(String[] args) throws IOException, TimeoutException {
-
ConnectionFactory factory = new ConnectionFactory();
-
//设置rabbilemMq服务器的地址 默认为localhost
-
factory.setHost("192.168.135.156");
-
//设置rabbitMQ的端口号 默认5672
-
factory.setPort(5672);
-
//设置账号和密码 默认guest
-
factory.setUsername("guest");
-
factory.setPassword("guest");
-
//设置虚拟主机名 默认 /
-
factory.setVirtualHost("/");
-
-
//获取连接通道
-
Connection connection = factory.newConnection();
-
//获取channel信道
-
Channel channel = connection.createChannel();
-
//创建队列
-
/**
-
* 如果该队列名不存在则自动创建,存在则不创建
-
* String queue ,队列名
-
* boolean durable ,是否持久化
-
* boolean exclusive (独占)声明队列同一时间只能包含一个连接,且该队列只有这一个被连接使用。
-
* boolean autoDelte ,是否自动删除
-
* Map<String,Object>arguments
-
*/
-
channel.queueDeclare("router_queue01",true,false,false,null);
-
channel.queueDeclare("router_queue02",true,false,false,null);
-
-
//创建交换机
-
/**
-
* String exchange,交换机的名称
-
* BuiltinExchangeType type,交换机的种类
-
* boolean durable:是否持久化
-
*/
-
channel.exchangeDeclare("router_queue", BuiltinExchangeType.DIRECT,true);
-
-
//交换机和队列绑定
-
/**
-
* s:String queue,队列名 s1:String exchange,交换机名 s2:String routerkey 路由key
-
* 如果为发布订阅模式(public)则无需有路由key
-
*/
-
channel.queueBind("router_queue01","router_queue","error");
-
-
channel.queueBind("router_queue02","router_queue","error");
-
channel.queueBind("router_queue02","router_queue","info");
-
channel.queueBind("router_queue02","router_queue","warning");
-
-
-
-
/**
-
* String exchange ,把消息发给哪个交换机--简单模式没有交换机“”
-
* String routingKey ,消息绑定的路由key 如果为简单模式 默认写为队列名称
-
* BasicProperties props, 消息的属性
-
* byte[] body: 消息的内容
-
*/
-
String msg = "{code:2000,name:张三,age:18}";
-
channel.basicPublish("router_queue","info",null,msg.getBytes());
-
//channel.basicPublish("router_queue","lazy.orange.ss",null,msg.getBytes());
-
-
-
connection.close();
-
}
-
}
消费者01-代码:
-
package com.wt.router;
-
-
import com.rabbitmq.client.*;
-
-
import java.io.IOException;
-
-
/**
-
* @Author wt
-
* @Date 2022/9/19 21:22
-
* @PackageName:com.wt.simple
-
* @ClassName: Customer
-
* @Description: router路由模式 ---消费者01
-
* @Version 1.0
-
*/
-
public class CustomerRouter01 {
-
public static void main(String[] args) throws Exception{
-
ConnectionFactory factory = new ConnectionFactory();
-
//设置rabbilemMq服务器的地址 默认为localhost
-
factory.setHost("192.168.135.156");
-
//设置rabbitMQ的端口号 默认5672
-
factory.setPort(5672);
-
//设置账号和密码 默认guest
-
factory.setUsername("guest");
-
factory.setPassword("guest");
-
//设置虚拟主机名 默认 /
-
factory.setVirtualHost("/");
-
-
//获取连接通道
-
Connection connection = factory.newConnection();
-
//获取channel信道
-
Channel channel = connection.createChannel();
-
-
//监听队列
-
/**
-
* String queue 监听的队列名称
-
* autoAck :是否自动确认消息
-
* Consumer callback: 监听到消息后触发的回调函数
-
*/
-
-
DefaultConsumer callback = new DefaultConsumer(channel){
-
//一旦有消息就会触发该方法
-
//body:表示消息的内容
-
-
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
-
System.out.println("接收的消息内容:" new String(body));
-
}
-
};
-
-
channel.basicConsume("router_queue01",true,callback);
-
-
}
-
}
消费者02:
-
package com.wt.router;
-
-
import com.rabbitmq.client.*;
-
-
import java.io.IOException;
-
-
/**
-
* @Author wt
-
* @Date 2022/9/19 21:22
-
* @PackageName:com.wt.simple
-
* @ClassName: Customer
-
* @Description: router路由模式 ---消费者01
-
* @Version 1.0
-
*/
-
public class CustomerRouter02 {
-
public static void main(String[] args) throws Exception{
-
ConnectionFactory factory = new ConnectionFactory();
-
//设置rabbilemMq服务器的地址 默认为localhost
-
factory.setHost("192.168.135.156");
-
//设置rabbitMQ的端口号 默认5672
-
factory.setPort(5672);
-
//设置账号和密码 默认guest
-
factory.setUsername("guest");
-
factory.setPassword("guest");
-
//设置虚拟主机名 默认 /
-
factory.setVirtualHost("/");
-
-
//获取连接通道
-
Connection connection = factory.newConnection();
-
//获取channel信道
-
Channel channel = connection.createChannel();
-
-
//监听队列
-
/**
-
* String queue 监听的队列名称
-
* autoAck :是否自动确认消息
-
* Consumer callback: 监听到消息后触发的回调函数
-
*/
-
-
DefaultConsumer callback = new DefaultConsumer(channel){
-
//一旦有消息就会触发该方法
-
//body:表示消息的内容
-
-
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
-
System.out.println("接收的消息内容:" new String(body));
-
}
-
};
-
-
channel.basicConsume("router_queue02",true,callback);
-
-
}
-
}
2.6 主题模式--topic
topic模式和router模式的区别是将路由有指定内容变成了通配符
*: 统配一个单词
: 统配零个或者n个单词
生产者代码
-
package com.wt.topic;
-
-
import com.rabbitmq.client.BuiltinExchangeType;
-
import com.rabbitmq.client.Channel;
-
import com.rabbitmq.client.Connection;
-
import com.rabbitmq.client.ConnectionFactory;
-
-
import java.io.IOException;
-
import java.util.concurrent.TimeoutException;
-
-
/**
-
* @Author wt
-
* @Date 2022/9/20 14:33
-
* @PackageName:com.wt.router
-
* @ClassName: RouterTest
-
* @Description: topic模式
-
* @Version 1.0
-
*/
-
public class TopicTest {
-
public static void main(String[] args) throws IOException, TimeoutException {
-
ConnectionFactory factory = new ConnectionFactory();
-
//设置rabbilemMq服务器的地址 默认为localhost
-
factory.setHost("192.168.135.156");
-
//设置rabbitMQ的端口号 默认5672
-
factory.setPort(5672);
-
//设置账号和密码 默认guest
-
factory.setUsername("guest");
-
factory.setPassword("guest");
-
//设置虚拟主机名 默认 /
-
factory.setVirtualHost("/");
-
-
//获取连接通道
-
Connection connection = factory.newConnection();
-
//获取channel信道
-
Channel channel = connection.createChannel();
-
//创建队列
-
/**
-
* 如果该队列名不存在则自动创建,存在则不创建
-
* String queue ,队列名
-
* boolean durable ,是否持久化
-
* boolean exclusive (独占)声明队列同一时间只能包含一个连接,且该队列只有这一个被连接使用。
-
* boolean autoDelte ,是否自动删除
-
* Map<String,Object>arguments: 其它参数
-
*/
-
channel.queueDeclare("topic_queue01",true,false,false,null);
-
channel.queueDeclare("topic_queue02",true,false,false,null);
-
-
-
//创建交换机
-
/**
-
* String exchange,交换机的名称
-
* BuiltinExchangeType type,交换机的种类
-
* boolean durable:是否持久化
-
*/
-
channel.exchangeDeclare("topic_queue", BuiltinExchangeType.TOPIC,true);
-
-
//交换机和队列绑定
-
/**
-
* String queue,队列名 String exchange,交换机名 String routerkey 路由key 如果为发布订阅模式则
-
* 无需有路由key
-
*/
-
channel.queueBind("topic_queue01","topic_queue","*.orange.*");
-
-
channel.queueBind("topic_queue02","topic_queue","*.*.rabbit");
-
channel.queueBind("topic_queue02","topic_queue","lazy.#");
-
-
-
-
-
//发送消息到队列
-
/**
-
* String exchange ,把消息发给哪个交换机--简单模式没有交换机“”
-
* String routingKey ,消息绑定的路由key 如果为简单模式 默认写为队列名称
-
* BasicProperties props, 消息的属性
-
* byte[] body: 消息的内容
-
*/
-
String msg = "{code:2000,name:张三,age:18}";
-
channel.basicPublish("topic_queue","lazy.orange.rabbit",null,msg.getBytes());
-
//channel.basicPublish("router_queue","lazy.orange.ss",null,msg.getBytes());
-
-
-
connection.close();
-
}
-
}
消费者01:
-
package com.wt.topic;
-
-
import com.rabbitmq.client.*;
-
-
import java.io.IOException;
-
-
/**
-
* @Author wt
-
* @Date 2022/9/19 21:22
-
* @PackageName:com.wt.simple
-
* @ClassName: Customer
-
* @Description: topic模式-------消费者01
-
* @Version 1.0
-
*/
-
public class CustomerTopic01 {
-
public static void main(String[] args) throws Exception{
-
ConnectionFactory factory = new ConnectionFactory();
-
//设置rabbilemMq服务器的地址 默认为localhost
-
factory.setHost("192.168.135.156");
-
//设置rabbitMQ的端口号 默认5672
-
factory.setPort(5672);
-
//设置账号和密码 默认guest
-
factory.setUsername("guest");
-
factory.setPassword("guest");
-
//设置虚拟主机名 默认 /
-
factory.setVirtualHost("/");
-
-
//获取连接通道
-
Connection connection = factory.newConnection();
-
//获取channel信道
-
Channel channel = connection.createChannel();
-
-
//监听队列
-
/**
-
* String queue 监听的队列名称
-
* autoAck :是否自动确认消息
-
* Consumer callback: 监听到消息后触发的回调函数
-
*/
-
-
DefaultConsumer callback = new DefaultConsumer(channel){
-
//一旦有消息就会触发该方法
-
//body:表示消息的内容
-
-
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
-
System.out.println("接收的消息内容:" new String(body));
-
}
-
};
-
-
channel.basicConsume("topic_queue01",true,callback);
-
-
}
-
}
消费者02:
-
package com.wt.topic;
-
-
import com.rabbitmq.client.*;
-
-
import java.io.IOException;
-
-
/**
-
* @Author wt
-
* @Date 2022/9/19 21:22
-
* @PackageName:com.wt.simple
-
* @ClassName: Customer
-
* @Description: TODO
-
* @Version 1.0
-
*/
-
public class CustomerTopic02 {
-
public static void main(String[] args) throws Exception{
-
ConnectionFactory factory = new ConnectionFactory();
-
//设置rabbilemMq服务器的地址 默认为localhost
-
factory.setHost("192.168.135.156");
-
//设置rabbitMQ的端口号 默认5672
-
factory.setPort(5672);
-
//设置账号和密码 默认guest
-
factory.setUsername("guest");
-
factory.setPassword("guest");
-
//设置虚拟主机名 默认 /
-
factory.setVirtualHost("/");
-
-
//获取连接通道
-
Connection connection = factory.newConnection();
-
//获取channel信道
-
Channel channel = connection.createChannel();
-
-
//监听队列
-
/**
-
* String queue 监听的队列名称
-
* autoAck :是否自动确认消息
-
* Consumer callback: 监听到消息后触发的回调函数
-
*/
-
-
DefaultConsumer callback = new DefaultConsumer(channel){
-
//一旦有消息就会触发该方法
-
//body:表示消息的内容
-
-
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
-
System.out.println("接收的消息内容:" new String(body));
-
}
-
};
-
-
channel.basicConsume("topic_queue02",true,callback);
-
-
}
-
}
3. springboot整合rabbitMQ
3.1 准备工作
3.1.1 创建如下的springboot项目
3.1.2 添加依赖
-
<dependency>
-
<groupId>org.springframework.boot</groupId>
-
<artifactId>spring-boot-starter-amqp</artifactId>
-
</dependency>
3.1.3 添加配置文件application.properties
生产者微服务和消费者微服务都要添加
-
server.port=9999
-
-
#rabbitMQ的配置
-
spring.rabbitmq.host=192.168.135.156
-
spring.rabbitmq.port=5672
-
#账号密码和host默认为以下配置,如果没有修改可以不写
-
#spring.rabbitmq.username=guest
-
#spring.rabbitmq.password=guest
-
#spring.rabbitmq.virtual-host=/
3.2 springboot的测试
队列我们这次直接在rabbitmq 的图形化界面创建 ,并且给队列创建exchange交换机
创建普通队列test01和test02,创建方式入下图
创建交换机testX
点击testX进入创建好的交换机testX并配置相关内容
3.2.1 测试开始
使用product工具类发送消息到队列
-
-
public class ProductTest {
-
//springboot集成了rabbitMQ 提供了一个工具类 ,该类封装了消息的发送
-
-
private RabbitTemplate rabbitTemplate;
-
-
/**
-
* 给交换机为testX,路由为a的队列发送消息hello springboot
-
*/
-
-
public void test01(){
-
rabbitTemplate.convertAndSend("testX","a","hello springboot");
-
}
-
}
消费者
-
package com.wt.rabbitmq;
-
-
import com.rabbitmq.client.Channel;
-
import org.springframework.amqp.core.Message;
-
import org.springframework.amqp.rabbit.annotation.RabbitListener;
-
import org.springframework.stereotype.Component;
-
-
-
public class MyListener {
-
-
-
/**
-
* @RabbitListener:队列监听,queues:队列名
-
*/
-
-
public void test(Map<String,Object> msg){
-
System.out.println(msg);
-
//运行相关的业务处理
-
}
-
}
3.3 如何确保消息的可靠性
首先确定消息可能在哪些位置丢失---不同的位置可以有不同的解决方案。
3.3.1 保证消息从生产者到交换机
1. comfirm确认机制
该模式必须在生产者的application.properties配置文件中开启手动确认机制
#开启确认机制 spring.rabbitmq.publisher-confirm-type=correlated
-
//保证消息从生产者到交换机
-
//测试确认机制
-
-
/**
-
* 1,手动开启确认机制spring.rabbitmq.publisher-confirm-type=correlated
-
* 2.为rabbitTemplate设置确认回调函数
-
*/
-
-
public void testConfirm(){
-
//为rabbitTemplate设置确认回调函数
-
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){
-
//不管是否到大交换机,都会触发该方法
-
-
public void confirm(CorrelationData correlationData, boolean b, String s) {
-
System.out.println("b~~~~~~~~~~~" b);
-
if (b==false){
-
System.out.println("消息发送失败,开启回滚");
-
}
-
}
-
});
-
//故意设置一个不存在的交换机
-
rabbitTemplate.convertAndSend("testX2","a","Heollo Simple2");
-
}
3.3.2 保证消息可以从交换机到队列
returning机制: 如果消息无法到达队列,则会触发returning机制。如果能从交换机到队列则不会触发returning机制。
默认rabbitMQ不开启该机制。
该模式必须在生产者的application.properties配置文件中开启手动returning机制
#开启returning机制 spring.rabbitmq.publisher-returns=true
-
**
-
* 1.开启returning机制
-
* 2.为rabbitTemplate设置returning回调函数
-
*/
-
-
public void testReturning(){
-
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
-
//该方法只有从交换机到队列失败时才会触发
-
-
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
-
System.out.println("~~~~~~~~~~~~~~~~~~~~");
-
}
-
});
-
//故意写一个不存在的队列测试是否能执行
-
rabbitTemplate.convertAndSend("testX","a","Hello Springboot3");
-
}
3.3.3 如何保证消息在队列
队列持久化--->
搭建rabbitmq集群--保证高可用
3.3.4 消费者可靠的消费消息
在消费者的配置文件中修改为手动确认模式
#开启消息确认auto :自动确认 manual:手动确认 none:不确认 spring.rabbitmq.listener.simple.acknowledge-mode=manual
当业务处理完毕后在确认消息给队列让其删除该消息
-
-
public void test02(Message message, Channel channel){
-
byte[] body = message.getBody();
-
long deliveryTag = message.getMessageProperties().getDeliveryTag();
-
System.out.println("接收到的消息" new String(body));
-
-
try{
-
System.out.println("处理业务代码");
-
//可以故意抛出一个异常来测试是否会继续发送
-
// int i = 10/0;
-
System.out.println("业务处理完毕");
-
//创建手动确认---队列会把消息溢出
-
/**
-
* long deliveryTag 消息的标记
-
* boolean multiple: 是否把之前没有确认的消息也确认掉
-
*/
-
//手动确认,需要在配置文件中进行开启
-
channel.basicAck(deliveryTag,true);
-
}catch (Exception e){
-
//出现异常让队列再发一次
-
//boolean requeue: true继续发给我 false还是直接丢掉
-
try {
-
channel.basicNack(deliveryTag,true,true);
-
-
}catch (Exception exception){
-
e.printStackTrace();
-
}
-
}
-
-
}
如何保证消息的可靠性。
设置confirm和returning机制
设置队列和交互机的持久化
搭建rabbitMQ服务集群
消费者改为手动确认机制。
3.4 如何限制消费者消费消息的条数
设置消费者消费消息的条数
消费者端必须为手动确认模式。
在消费者的配置文件中修改每次拉取消息的条数
#设置消息限流---消费者一次最多消费的消息个数 spring.rabbitmq.listener.simple.prefetch=3
按照上述创建队列的方式,创建test03、test04队列,并将队列加入到testX交换机中,并设置test03 的路由为d ,test04的路由为e
生产者:
-
//测试限制消费者消费的个数
-
-
public void testMessage(){
-
for (int i = 0; i <10; i ) {
-
rabbitTemplate.convertAndSend("testX","d","Hello Word00" i);
-
}
-
}
消费者:
-
/**
-
* 限制消费者消费的个数
-
*
-
* 设置限制个数,需要在配置文件中进行配置
-
* 这里测试时没有进行消息确认,否则会因为消息确认太快而无法看出限流
-
* 这里测试的是没次通过3个,在3个消息确认之后才会接着发送其他请求
-
*/
-
-
public void test03 (Message message, Channel channel) throws IOException {
-
byte[] body = message.getBody();
-
System.out.println("消息的内容:" new String(body));
-
//消息确认
-
// long deliveryTag = message.getMessageProperties().getDeliveryTag();
-
// channel.basicAck(deliveryTag,true);
-
}
3.5 设置过期时间
TTL:time to live
可以为整个队列设置也可以单独为某条信息设置
在rabbitmq图形化界面创建设置过期时间的队列test04
队列创建完成后将该队列加入到交换机exchange内,并设置路由为e
3.5.1 创建队列时为队列设置过期时间
生产者:
-
//测试设置过期时间
-
-
/**
-
* 为队列设置过期时间
-
* 设置休眠时间,判断数据过期是所有数据都被删除,还是哪个数据进入队列给这个数据进行计时,到期自动删除这个数据
-
*/
-
-
public void test(){
-
for (int i = 0; i <10; i ) {
-
if (i<5){
-
rabbitTemplate.convertAndSend("testX","e","Hello Word00" i);
-
}else {
-
try{
-
Thread.sleep(6000);
-
}catch (Exception e){
-
e.printStackTrace();
-
}
-
rabbitTemplate.convertAndSend("testX","e","Hello Word00" i);
-
}
-
-
}
-
}
消费者:
-
-
public void test04 (Message message, Channel channel) throws IOException {
-
byte[] body = message.getBody();
-
System.out.println("消息的内容:" new String(body));
-
//消息确认
-
// long deliveryTag = message.getMessageProperties().getDeliveryTag();
-
// channel.basicAck(deliveryTag,true);
-
}
3.5.2 单独为消息设置过期时间
-
/*
-
单独为一条信息设置过期时间
-
*/
-
-
public void test00(){
-
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
-
-
public Message postProcessMessage(Message message) throws AmqpException {
-
message.getMessageProperties().setExpiration("10000");
-
return message;
-
}
-
};
-
rabbitTemplate.convertAndSend("testX","d","Hello Word0",messagePostProcessor);
-
-
}
如何保证消息的可靠性
TTL过期
限定消费消息的条数
3.6 创建死信队列
3.6.1 准备工作
在rabbitmq中创建创建普通队列和死信队列
死信队列连接连接的是普通交换机、普通队列连接的是死信交换机
创建死信队列
创建普通队列
创建死信队列连接的普通交换机
创建普通队列连接的死信交换机
生产者
我们的队列设置了消息上限和超时时间,并且没有设置消息确认,这次一共发送十条,有五条会等待普通队列确认,剩下的五条会进入死信交换机,若普通队列的五条超过20秒,这5条消息也会进入死信队列
-
/**
-
* 死信队列
-
*/
-
-
public void testSx(){
-
for (int i = 0; i < 10; i ) {
-
rabbitTemplate.convertAndSend("pt_exchange","dead","Hello Word~~~~~~~" i);
-
}
-
}
3.7 延迟队列
这里的判断订单状态 是因为 如果支付系统第29分分钟去支付,支付的比较慢,最后在第31分钟支付成功了。消息30分钟加入死信队列执行库存回滚,就会出错。
3.8 如何防止消息被重复消费
3.9 rabbitMQ的常见面试题
1. 如何防止消息被重复消费
2.如何保证消息的可靠性
3.rabbitMQ消息积压过多
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhggjjgf
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13