.NetCore 使用 RabbitMQ (交换机/队列/消息持久化+mq高级特性+死信队列+延迟队列)
目录
一、安装mq
1、我的环境是使用VMware安装的Centos7系统。MQ部署在docker上面
2、创建公共项目Commons用于提供者和消费者引用,nuget安装 RabbitMQ.Client,添加一个帮助类:
public class RabbitMQHelper
{//连接mq
public static IConnection GetMQConnection()
{
var factory = new ConnectionFactory
{
HostName = "127.0.0.1", //mq的ip(我自己虚拟机上的)
Port = 5672, //端口
UserName = "guoyingjian", //账户
Password = "guoyingjian", //密码
VirtualHost = "/" //虚拟机
};
return factory.CreateConnection(); //返回连接
}
}
二、实操
rabbitmq消息队列有几种模式:
1、简单模式
一个提供者,一个消费者,是有序的,消费者只有一个,吞吐量低,工作基本不用,用来学习了解一下还是可以的
2、工作模式
根据队列名发消息,但有多个消费者,无序的,吞吐量高,1和2工作中基本不用,因为他们没有使用自定义交换机,练练手明白就行了。
生产者代码:
using RabbitMQ.Client;
/// <summary>
/// MQ 工作队列模式发消息
/// </summary>
/// <returns></returns>
public void SendWorkerMQ()
{
//最基础的是点对点的队列模式,他的优势是有序的,//下面这个工作队列是无序的
#region 工作队列模式
string queueName = "WorkQueue";
using (var connection = RabbitMQHelper.GetMQConnection())
{
//创建通信管道
using (var channel = connection.CreateModel())
{
//创建队列
channel.QueueDeclare(queueName, false, false, false, null);
for (int i = 1; i <= 30; i )
{
string message = "hello mq" i;
var body = Encoding.UTF8.GetBytes(message);
//发送消息到mq,如没绑定交换机,将使用默认交换机路由
channel.BasicPublish(exchange: "", routingKey: queueName, null, body);
Console.WriteLine("send normal message" i);
}
}
}
#endregion
}
消费者代码:
//工作队列接收消息(多个消费者,默认轮循)
public static void ReceiveMessage()
{
string queueName = "WorkQueue";//队列名称与提供者一致
var connection = RabbitMQHelper.GetMQConnection();//创建管道
var channel = connection.CreateModel();
channel.QueueDeclare(queueName, false, false, false, null);
var consumer = new EventingBasicConsumer(channel);//prefetchCount:1意思是当前worker在当前消息未消费确认时,不会再往这个worker中再次发送(可以根据不通服务器负载能力来分配)
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);//消息处理的事件
consumer.Received = (model, ea) =>
{
//业务逻辑处理
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"normal received => {message}");
};//消费消息
channel.BasicConsume(queueName, true, consumer);}
下面是工作中使用交换机的4种模式:
3、fanout扇形模式(发布订阅)
该类型通常叫作广播类型。fanout类型的Exchange不处理Routing key,而是会将发送给Exchange的消息,路由到所有与它绑定的Queue上。比如现在有一个fanout类型的Exchange,它下面绑定了三个Queue,Routing key分别是ORDER/GOODS/STOCK:
然后向该Exchange中发送一条消息,消息的Routing key随便填一个值abc(不填也行),如果这个Exchange的路由与这三个Queue绑定,则三个Queue都应该会收到消息
生产者代码:
/// <summary>
/// MQ 扇形交换机模式发消息
/// </summary>
/// <returns></returns>
[HttpGet("SendFanoutWorkerMQ")]
public void SendFanoutWorkerMQ()
{
#region 使用扇形交换机模式
using (var connection = RabbitMQHelper.GetMQConnection())
{
//创建通信管道
using (var channel = connection.CreateModel())
{
string exchangeName = "fanout_exchange";//fanout只提供交换机名称即可var properties = channel.CreateBasicProperties();
properties.Persistent = true; //设置数据的持久化,保证mq服务挂掉之后消息的安全性for (int i = 1; i <= 10; i )
{
var body = Encoding.UTF8.GetBytes("hello mq" i);
//这里绑定了交换机,那么就会发送到这个交换机所有绑定过的队列中
channel.BasicPublish(exchange: exchangeName, routingKey: "", properties, body);
}
}
}
#endregion
}
消费者代码:
/// <summary>
/// 扇形模式队列消费消息
/// 注意先启动一次消费端会创建交换机、队列、绑定。如果不启动则消息丢失。也可以在生产端做这些创建和绑定
/// </summary>
public static void FanoutReceiveMessage()
{var connection = RabbitMQHelper.GetMQConnection();
//创建管道
var channel = connection.CreateModel();//创建交换机
channel.ExchangeDeclare(exchange: "fanout_exchange", type: "fanout");
//创建队列
string queueName1 = "fanoutWorkQueue1";//队列名称
string queueName2 = "fanoutWorkQueue2";
string queueName3 = "fanoutWorkQueue3";
channel.QueueDeclare(queue: queueName1,//队列名
durable: false,//是否持久化
exclusive: false,//排它性
autoDelete: false,//一旦客户端连接断开则自动删除queue
arguments: null);//如果安装了队列优先级插件则可以设置优先级
channel.QueueDeclare(queueName2, false, false, false, null);
channel.QueueDeclare(queueName3, false, false, false, null);//多个队列绑定到fanout_exchange交换机(似发布订阅)
channel.QueueBind(queue: queueName1, exchange: "fanout_exchange", routingKey: "");
channel.QueueBind(queue: queueName2, exchange: "fanout_exchange", routingKey: "");
channel.QueueBind(queue: queueName3, exchange: "fanout_exchange", routingKey: "");//声明消费者
var consumer = new EventingBasicConsumer(channel);//对消费端进行限流:
//首先第一步,我们既然要使用消费端限流,我们需要关闭自动 ack,将 autoAck 设置为 channel.basicConsume(queueName, false, consumer);
//第二步我们来设置具体的限流大小以及数量。channel.basicQos(0, 15, false);
//第三步在消费者的 handleDelivery 消费方法中手动 ack,并且设置批量处理 ack 回应为 channel.basicAck(envelope.getDeliveryTag(), true);
//prefetchCount:1意思是当前worker在当前消息未消费确认时,不会再往这个worker中再次发送(可以根据不通服务器负载能力来分配)
//channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);//消费者处理的事件
consumer.Received = (model, ea) =>
{
//业务逻辑处理
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"normal received => {message}");
};//消费消息
channel.BasicConsume(queueName2, //队列名
autoAck: true, //确认消费(删除)
consumer: consumer);}
4、direct路由模式也叫定向模式
direct类型的Exchange会将消息转发到指定Routing key的Queue上,Routing key的解析规则为精确匹配。也就是只有当producer发送的消息的Routing key与消费端的某个Routing key相等时,消息才会被分发到对应的Queue上。比如现在有一个direct类型的Exchange,它下面绑定了三个Queue,Routing key分别是ORDER/GOODS/STOCK:
然后向该Exchange中发送一条消息,消息的Routing key是ORDER,那只有Routing key是ORDER的队列有一条消息。(与fanout区别:fanout根据已绑定的交换机的队列发送消息。direct当然也得绑定交换机,只不过再精确匹配到routingkey相等的队列发送消息)
生产者代码:
/// <summary>
/// MQ 直接交换机模式发消息(指定routingKey发送)
/// </summary>
/// <returns></returns>
[HttpGet("SendDirectWorkerMQ")]public void SendDirectWorkerMQ()
{
#region 使用直接交换机模式
using (var connection = RabbitMQHelper.GetMQConnection())
{
//创建通信管道
using (var channel = connection.CreateModel())
{
//direct只提供交换机名称和routingkey即可,消费端只消费routingkey相匹配的
string exchangeName = "direct_exchange";var properties = channel.CreateBasicProperties();
properties.Persistent = true; //设置数据的持久化,保证mq服务挂掉之后消息的安全性for (int i = 1; i <= 10; i )
{
var body = Encoding.UTF8.GetBytes("hello mq" i "yellow");
//这里绑定了交换机,同时绑定了routekey,就会发送到routekey是yellow的队列中
channel.BasicPublish(exchange: exchangeName, routingKey: "yellow", properties, body);}
}
}
#endregion
}
消费者代码:
/// <summary>
/// 直接模式队列消费消息
/// 注意先启动一次消费端会创建交换机、队列、绑定。如果不启动则消息丢失。也可以在生产端做这些创建和绑定
/// </summary>
public static void DirectReceiveMessage()
{
var connection = RabbitMQHelper.GetMQConnection();
//创建管道
var channel = connection.CreateModel();//创建交换机
channel.ExchangeDeclare(exchange: "direct_exchange", type: "direct");
//创建队列
string queueName1 = "directWorkQueue1";//队列名称
string queueName2 = "directWorkQueue2";
string queueName3 = "directWorkQueue3";
channel.QueueDeclare(queue: queueName1,//队列名
durable: false,//是否持久化
exclusive: false,//排它性
autoDelete: false,//一旦客户端连接断开则自动删除queue
arguments: null);//如果安装了队列优先级插件则可以设置优先级
channel.QueueDeclare(queueName2, false, false, false, null);
channel.QueueDeclare(queueName3, false, false, false, null);//多个队列绑定到fanout_exchange交换机(似发布订阅)
channel.QueueBind(queue: queueName1, exchange: "direct_exchange", routingKey: "red");
channel.QueueBind(queue: queueName2, exchange: "direct_exchange", routingKey: "yellow");
channel.QueueBind(queue: queueName3, exchange: "direct_exchange", routingKey: "green");//声明消费者
var consumer = new EventingBasicConsumer(channel);//对消费端进行限流:
//首先第一步,我们既然要使用消费端限流,我们需要关闭自动 ack,将 autoAck 设置为 falsechannel.basicConsume(queueName, false, consumer);
//第二步我们来设置具体的限流大小以及数量。channel.basicQos(0, 15, false);
//第三步在消费者的 handleDelivery 消费方法中手动 ack,并且设置批量处理 ack 回应为 truechannel.basicAck(envelope.getDeliveryTag(), true);
//prefetchCount:1意思是当前worker在当前消息未消费确认时,不会再往这个worker中再次发送(可以根据不通服务器负载能力来分配)
//channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);//消费者处理的事件
consumer.Received = (model, ea) =>
{
//业务逻辑处理
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"normal received => {message},routingkey:{ea.RoutingKey}");//消费完成后需要手动手动签收消息,如果不写该代码就容易导致重复消费问题
//可以降低每次签收性能损耗。参数multiple:false就是单个手动签收,true就是批量签收,比如消费30条消息后再确认签收
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};//消息的签收模式
//手动签收:保证正确消费,不会丢消息(基于客户端而已)
//自动签收:容易丟失消息
channel.BasicConsume(queueName1, //消费队列2的消息
autoAck: false, //代表要手动签收,因可能会出现确认签收了然后宕机了导致没有执行事件,造成消息丢失。解决方案:手动签收操作写在了队列事件完成后。
consumer: consumer);}
5、topic主题模式也叫通配符模式(路由模式的一种)
根据通配符模糊匹配,将消息交给符合routing pattern(路由模式)的队列。
它与direct相比,都是可以根据routingkey把消息路由到不同的队列。只不过topic类型exchange可以让队列在绑定routingkey的时候使用通配符。
routingkey一般都是有一个或多个单词组成,多个单词以“.”分割,例如:“item.insert”。通配符匹配规则“#”可以匹配一个或多个单词,“*”只能匹配1个单词,例如:“item.#”可以匹配“item.insert.asd”或者“item.insert”,“item.*”只能匹配到“item.insert”。
生产者代码:
public void SendTopicWorkerMQ()
{
#region 使用topic交换机模式
using (var connection = RabbitMQHelper.GetMQConnection())
{
//创建通信管道
using (var channel = connection.CreateModel())
{
//topic只提供交换机名称和routingkey即可,消费端只消费与routingkey通配符匹配的
string exchangeName = "topic_exchange";
string routingKey1 = "user.america";
string routingKey2 = "user.china";
string routingKey3 = "user.china.beijing";
string routingKey4 = "user.china.beijing.changping";var properties = channel.CreateBasicProperties();
properties.Persistent = true; //设置数据的持久化,保证mq服务挂掉之后消息的安全性
for (int i = 1; i <= 10; i )
{
var body = Encoding.UTF8.GetBytes("hello mq" i "topic");
//传4个不同的routingkey过去,消费者会根据通配符匹配并消费(好像不能在生产者写通配符)
channel.BasicPublish(exchange: exchangeName, routingKey: routingKey1, properties , body);
channel.BasicPublish(exchange: exchangeName, routingKey: routingKey2, properties , body);
channel.BasicPublish(exchange: exchangeName, routingKey: routingKey3, properties , body);
channel.BasicPublish(exchange: exchangeName, routingKey: routingKey4, properties , body);
}
}
}
#endregion
}
消费者代码:
/// <summary>
/// 主题模式队列消费消息
/// 注意先启动一次消费端会创建交换机、队列、绑定。如果不启动则消息丢失。也可以在生产端做这些创建和绑定
/// </summary>
public static void TopicReceiveMessage()
{
var connection = RabbitMQHelper.GetMQConnection();
//创建管道
var channel = connection.CreateModel();//创建交换机
channel.ExchangeDeclare(exchange: "topic_exchange", type: "topic");
string exchangeName = "topic_exchange";
//创建队列
string queueName1 = "topicWorkQueue1";//队列名称
string queueName2 = "topicWorkQueue2";
string queueName3 = "topicWorkQueue3";
channel.QueueDeclare(queue: queueName1,//队列名
durable: false,//是否持久化
exclusive: false,//排它性
autoDelete: false,//一旦客户端连接断开则自动删除queue
arguments: null);//如果安装了队列优先级插件则可以设置优先级
channel.QueueDeclare(queueName2, false, false, false, null);
channel.QueueDeclare(queueName3, false, false, false, null);//多个队列绑定到fanout_exchange交换机
channel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: "user.*.*");//匹配例如:user.a.b
channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "user.*"); //匹配例如:user.a
channel.QueueBind(queue: queueName3, exchange: exchangeName, routingKey: "user.#"); //匹配例如:user...... (user. 后面是啥都行)//声明消费者
var consumer = new EventingBasicConsumer(channel);//对消费端进行限流:
//首先第一步,我们既然要使用消费端限流,我们需要关闭自动 ack,将 autoAck 设置为 channel.basicConsume(queueName, false, consumer);
//第二步我们来设置具体的限流大小以及数量。channel.basicQos(0, 15, false);
//第三步在消费者的 handleDelivery 消费方法中手动 ack,并且设置批量处理 ack 回应为 channel.basicAck(envelope.getDeliveryTag(), true);
//prefetchCount:1意思是当前worker在当前消息未消费确认时,不会再往这个worker中再次发送(可以根据不通服务器负载能力来分配)
//channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);//消费者处理的事件
consumer.Received = (model, ea) =>
{
//业务逻辑处理
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"normal received => {message},routingkey:{ea.RoutingKey}");//消费完成后需要手动手动签收消息,如果不写该代码就容易导致重复消费问题
//可以降低每次签收性能损耗。参数multiple:false就是单个手动签收,true就是批量签收,比如消费30条消息后再确认签收
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};//消息的签收模式
//手动签收:保证正确消费,不会丢消息(基于客户端而已)
//自动签收:容易丟失消息
channel.BasicConsume(queueName2, //消费队列2的消息(可以手动替换其他队列消费)
autoAck: false, //代表要手动签收,因可能会出现确认签收了然后宕机了导致没有执行事件,造成消息丢失。解决方案:手动签收操作写在了队列事件完成后。
consumer: consumer);}
6、header 参数匹配模式
header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配队列,Headers 类型的交换器性能会很差,所以这种类型不常用。
以上注意:Exchange(交换机):只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息将会丢失!!
7、延时队列(插件方式实现)
实现延迟队列有两种方法
1、TTL DLX需要创建死信交换机绑定队列,需创建多个交换机多个队列,复杂麻烦所以不推荐。
2、推荐使用rabbitmq_delayed_message_exchange 插件实现,下面来实现一下:
①插件下载网址:Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub
注意:一定要下载与自己mq版本同一个版本号,不然下面开启插件会报错
比如我的mq版本是 3.9.11的,那我就下载rabbitmq_delayed_message_exchange-3.9.0.ez 即可
下载之后想办法上传到linux上。
或者直接在linux上面下载:
#linux下载插件的命令:(注:选择属于自己的版本号,f12指针查看按钮链接获取下载地址)
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez
②下载完毕后,使用命令复制到docker内rabbitmq容器的plugins文件夹下
查看mq的位置: whereis rabbitmq
查看运行中的容器命令:
docker ps
docker的复制命令:
docker cp rabbitmq_delayed_message_exchange-3.9.0.ez 0e6e229cc6f2:/plugins(0e6e229cc6f2是容器id,plugins是mq容器内的文件夹)
③然后进入容器
docker exec -it 0e6e229cc6f2 bash
退出容器:
exit
或者按Ctrl P Q进行退出容器
④进入之后直接进入plugins文件夹下看看复制进去没有?
cd /plugins
ls
记住只能有一个 rabbitmq_delayed_message_exchange-3.9.0.ez文件,不能放多个版本否则报错
⑤启用rabbitmq_delayed_message_exchange插件
开启插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
启用之后可以看到mq可视化页面,交换机类型多了一个 x-delayed-message
⑥然后就是代码实现:
生产者代码:
public void SendDelayedWorkerMQ()
{
#region 使用延时交换机模式
using (var connection = RabbitMQHelper.GetMQConnection())
{
//创建通信管道
using (var channel = connection.CreateModel())
{
string exchangeName = "delayed_exchange";//delayed需提供交换机名称
string queueName = "delay_WorkQueue";#region 消费端做交换机和队列的创建和绑定
#endregion
var properties = channel.CreateBasicProperties();
properties.Persistent = true; //设置数据的持久化,保证mq服务挂掉之后消息的安全性
//properties.Priority = 9;//消息的优先级 值越大 优先级越高 0~9
//延时时间从header赋值
Dictionary<string, object> headers = new Dictionary<string, object>();
headers.Add("x-delay", 10000);
properties.Headers = headers;var body = Encoding.UTF8.GetBytes("生产者发送时间:" DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"));
//发送延时消息
channel.BasicPublish(exchange: exchangeName, routingKey: queueName, properties, body);
}
}
#endregion
}
消费者代码:
/// <summary>
/// 延迟交换机模式队列消费消息
/// 注意先启动一次消费端会创建交换机、队列、绑定。如果不启动则消息丢失。也可以在生产端做这些创建和绑定
/// </summary>
public static void DelayedReceiveMessage()
{var connection = RabbitMQHelper.GetMQConnection();
//创建管道
var channel = connection.CreateModel();
string queueName = "delay_WorkQueue";//队列名称
string exchangeName = "delayed_exchange";//队列名称Dictionary<string, object> args = new Dictionary<string, object>();
args.Add("x-delayed-type", "direct"); //x-delayed-type必须加(这个创建的是交换机类型)//创建交换机
channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", durable: true, autoDelete: false, arguments: args);
//创建队列channel.QueueDeclare(queue: queueName,//队列名
durable: true,//队列是否持久化
exclusive: false,//是否为单消费者队列,为True时,只能由单一消费者消费
autoDelete: false,//是否自动删除队列,当消费者全部断开时,队列自动删除
arguments: null);//高级特性//多个队列绑定到delayed_exchange交换机(似发布订阅)
channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: queueName);//声明消费者
var consumer = new EventingBasicConsumer(channel);//对消费端进行限流:
//首先第一步,我们既然要使用消费端限流,我们需要关闭自动 ack,将 autoAck 设置为 channel.basicConsume(queueName, false, consumer);
//第二步我们来设置具体的限流大小以及数量。channel.basicQos(0, 15, false);
//第三步在消费者的 handleDelivery 消费方法中手动 ack,并且设置批量处理 ack 回应为 channel.basicAck(envelope.getDeliveryTag(), true);
//prefetchCount:1意思是当前worker在当前消息未消费确认时,不会再往这个worker中再次发送(可以根据不通服务器负载能力来分配)
//channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);//消费者处理的事件
consumer.Received = (model, ea) =>
{
//业务逻辑处理
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine(message);
Console.WriteLine($"消费者消费时间:" DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"));//消费完成后需要手动手动签收消息,如果不写该代码就容易导致重复消费问题
//可以降低每次签收性能损耗。参数multiple:false就是单个手动签收,true就是批量签收,比如消费30条消息后再确认签收
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};//消费消息
channel.BasicConsume(queueName, //队列名
autoAck: false, //确认消费
consumer: consumer);
}
最终启动程序可以看到消费者端输出的结果计算精准的相差10秒!
参考资料:
持久化、优先级、高级特性:
NetCore RabbitMQ高级特性 持久化 及 消息优先级 - 天才卧龙 - 博客园
延迟和死信队列:https://www.jb51.net/article/221796.htm
NetCore RabbitMQ 高级特性 消息存活周期TTL、死信交换机/死信对列DLX,延迟队列,及幂等性的保障 - 天才卧龙 - 博客园
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhfeiiba
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
怎样阻止微信小程序自动打开
PHP中文网 06-13 -
photoshop蒙版画笔没反应怎么办
PHP中文网 06-24