ABP: 重新理解RabbitMQ,ABP使用第三方事件总线
2022/10/21 更新
前提概念:消息队列(英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。
想一想,自己写的接口是不是async的?多线程的?简单来说,发个eventbus就要用到消息队列。
首先得了解两个概念:
1.领域事件
为服务内发生的事情,比如打个日志给自己看看
2.集成事件
微服务之间发生的事情,比如A服务打了个日志给B服务看看 📖
(有一说一还是要理解了本质再去看待事物,之前对这些东西理解太浅显了)
当我们面临集成事件的时候,就需要一个第三方的消息服务来给我们当个跑腿的。
so:
称之为事件总线,是不是很熟悉,和ABP 的EventBus一个作用,代理 分发。或者更简单的理解,就是RXJS的subject 😊
1、集成事件是服务器间的通信,所以必须借助于第三方服务器作为事件总线。常用的消息中间件有Redis、RabbitMQ、Kafka、ActiveMQ等。
2、RabbitMQ的基本概念:
1)信道(Channel):信道是消息的生产者、消费者和服务器进行通信的虚拟连接。TCP连接的建立是非常消耗资源的,所以RabbitMQ在TCP连接的基础上构建了虚拟的信道。我们尽量重复使用TCP连接,而信道则是可以用完了就关闭。
2)队列(Queue):用来进行消息收发的地方,生产者把消息放到队列中,消费者从队列中获取数据。
3)交换机(exchange):把消息路由到一个或者多个队列中。
RabbitMQ的routing模式
生产者把消息发布到交换机中,消息携带一个routingKey属性,交换机会根据routingKey的值把消息发送到一个或者多个队列;消费者会从队列中获取消息;交换机和队列都位于RabbitMQ服务器内部。优点:即使消费者不在线,消费者相关的消息也会被保存到队列中,当消费者上线之后,消费者就可以获取到离线期间错过的消息。
红色部分很容易理解,实际公司项目中经常遇见其他微服务掉链子的情况,等重新上线了,消息不能丢失,等事件总线继续推送过来就好了。
落地到ABP
1、安装RabbitMQ服务器。
2、分别创建发送消息的项目和接收消息的控制台项目,这两个项目都安装NuGet包RabbitMQ.Client。
首先 发送者
-
var factory = new ConnectionFactory();
-
factory.HostName = "127.0.0.1";//RabbitMQ服务器地址
-
factory.DispatchConsumersAsync = true;// 设置异步发消息
-
string exchangeName = "exchange1";//交换机的名字
-
string eventName = "myEvent";// routingKey的值
-
using var conn = factory.CreateConnection();
-
while(true)
-
{
-
string msg = DateTime.Now.TimeOfDay.ToString();//待发送消息
-
using (var channel = conn.CreateModel())//创建信道
-
{
-
var properties = channel.CreateBasicProperties();
-
properties.DeliveryMode = 2;
-
channel.ExchangeDeclare(exchange: exchangeName, type: "direct");//声明交换机
-
byte[] body = Encoding.UTF8.GetBytes(msg);
-
channel.BasicPublish(exchange: exchangeName,routingKey: eventName,
-
mandatory: true,basicProperties: properties,body: body);//发布消息
-
}
-
Console.WriteLine("发布了消息:" msg);
-
Thread.Sleep(1000);
然后 消费者
-
var factory = new ConnectionFactory();
-
factory.HostName = "127.0.0.1";
-
factory.DispatchConsumersAsync = true;
-
string exchangeName = "exchange1";
-
string eventName = "myEvent";
-
using var conn = factory.CreateConnection();
-
using var channel = conn.CreateModel();
-
string queueName = "queue1";
-
channel.ExchangeDeclare(exchange: exchangeName,type: "direct");
-
channel.QueueDeclare(queue: queueName,durable: true,
-
exclusive: false,autoDelete: false,arguments: null);
-
channel.QueueBind(queue: queueName,
-
exchange: exchangeName,routingKey: eventName);
-
var consumer = new AsyncEventingBasicConsumer(channel);
-
consumer.Received = Consumer_Received;
-
channel.BasicConsume(queue: queueName, autoAck: false,consumer: consumer);
-
Console.ReadLine();
-
async Task Consumer_Received(object sender, BasicDeliverEventArgs args)
-
{
-
try
-
{
-
var bytes = args.Body.ToArray();
-
string msg = Encoding.UTF8.GetString(bytes);
-
Console.WriteLine(DateTime.Now "收到了消息" msg);
-
channel.BasicAck(args.DeliveryTag, multiple: false);
-
await Task.Delay(800);
-
}
-
catch (Exception ex)
-
{
-
channel.BasicReject(args.DeliveryTag, true);//失败重发
-
Console.WriteLine("处理收到的消息出错" ex);
-
}
-
}
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhggjjfk
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13