• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

ABP: 重新理解RabbitMQ,ABP使用第三方事件总线

武飞扬头像
董厂长
帮助1

2022/10/21 更新

前提概念:消息队列(英语:Message queue)是一种进程间通信或同一进程的不同线程间通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。

想一想,自己写的接口是不是async的?多线程的?简单来说,发个eventbus就要用到消息队列。

首先得了解两个概念:

1.领域事件

为服务内发生的事情,比如打个日志给自己看看

2.集成事件

微服务之间发生的事情,比如A服务打了个日志给B服务看看 📖

(有一说一还是要理解了本质再去看待事物,之前对这些东西理解太浅显了)

当我们面临集成事件的时候,就需要一个第三方的消息服务来给我们当个跑腿的。

so:

称之为事件总线,是不是很熟悉,和ABP 的EventBus一个作用,代理 分发。或者更简单的理解,就是RXJS的subject 😊

1、集成事件是服务器间的通信,所以必须借助于第三方服务器作为事件总线。常用的消息中间件有Redis、RabbitMQ、Kafka、ActiveMQ等。

2、RabbitMQ的基本概念:

1)信道(Channel):信道是消息的生产者、消费者和服务器进行通信的虚拟连接。TCP连接的建立是非常消耗资源的,所以RabbitMQ在TCP连接的基础上构建了虚拟的信道。我们尽量重复使用TCP连接,而信道则是可以用完了就关闭。

2)队列(Queue):用来进行消息收发的地方,生产者把消息放到队列中,消费者从队列中获取数据。

3)交换机(exchange):把消息路由到一个或者多个队列中。

RabbitMQ的routing模式 

学新通

生产者把消息发布到交换机中,消息携带一个routingKey属性,交换机会根据routingKey的值把消息发送到一个或者多个队列;消费者会从队列中获取消息;交换机和队列都位于RabbitMQ服务器内部。优点:即使消费者不在线,消费者相关的消息也会被保存到队列中,当消费者上线之后,消费者就可以获取到离线期间错过的消息。

红色部分很容易理解,实际公司项目中经常遇见其他微服务掉链子的情况,等重新上线了,消息不能丢失,等事件总线继续推送过来就好了。

落地到ABP

1、安装RabbitMQ服务器。

2、分别创建发送消息的项目和接收消息的控制台项目,这两个项目都安装NuGet包RabbitMQ.Client。

首先 发送者

  1.  
    var factory = new ConnectionFactory();
  2.  
    factory.HostName = "127.0.0.1";//RabbitMQ服务器地址
  3.  
    factory.DispatchConsumersAsync = true;// 设置异步发消息
  4.  
    string exchangeName = "exchange1";//交换机的名字
  5.  
    string eventName = "myEvent";// routingKey的值
  6.  
    using var conn = factory.CreateConnection();
  7.  
    while(true)
  8.  
    {
  9.  
    string msg = DateTime.Now.TimeOfDay.ToString();//待发送消息
  10.  
    using (var channel = conn.CreateModel())//创建信道
  11.  
    {
  12.  
    var properties = channel.CreateBasicProperties();
  13.  
    properties.DeliveryMode = 2;
  14.  
    channel.ExchangeDeclare(exchange: exchangeName, type: "direct");//声明交换机
  15.  
    byte[] body = Encoding.UTF8.GetBytes(msg);
  16.  
    channel.BasicPublish(exchange: exchangeName,routingKey: eventName,
  17.  
    mandatory: true,basicProperties: properties,body: body);//发布消息
  18.  
    }
  19.  
    Console.WriteLine("发布了消息:" msg);
  20.  
    Thread.Sleep(1000);
学新通

然后 消费者 

  1.  
    var factory = new ConnectionFactory();
  2.  
    factory.HostName = "127.0.0.1";
  3.  
    factory.DispatchConsumersAsync = true;
  4.  
    string exchangeName = "exchange1";
  5.  
    string eventName = "myEvent";
  6.  
    using var conn = factory.CreateConnection();
  7.  
    using var channel = conn.CreateModel();
  8.  
    string queueName = "queue1";
  9.  
    channel.ExchangeDeclare(exchange: exchangeName,type: "direct");
  10.  
    channel.QueueDeclare(queue: queueName,durable: true,
  11.  
    exclusive: false,autoDelete: false,arguments: null);
  12.  
    channel.QueueBind(queue: queueName,
  13.  
    exchange: exchangeName,routingKey: eventName);
  1.  
    var consumer = new AsyncEventingBasicConsumer(channel);
  2.  
    consumer.Received = Consumer_Received;
  3.  
    channel.BasicConsume(queue: queueName, autoAck: false,consumer: consumer);
  4.  
    Console.ReadLine();
  5.  
    async Task Consumer_Received(object sender, BasicDeliverEventArgs args)
  6.  
    {
  7.  
    try
  8.  
    {
  9.  
    var bytes = args.Body.ToArray();
  10.  
    string msg = Encoding.UTF8.GetString(bytes);
  11.  
    Console.WriteLine(DateTime.Now "收到了消息" msg);
  12.  
    channel.BasicAck(args.DeliveryTag, multiple: false);
  13.  
    await Task.Delay(800);
  14.  
    }
  15.  
    catch (Exception ex)
  16.  
    {
  17.  
    channel.BasicReject(args.DeliveryTag, true);//失败重发
  18.  
    Console.WriteLine("处理收到的消息出错" ex);
  19.  
    }
  20.  
    }
学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhggjjfk
系列文章
更多 icon
同类精品
更多 icon
继续加载