• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

读懂RabbitMQ消息队列

武飞扬头像
zhoupenghui168
帮助1

一.什么是消息队列

1.简介

在介绍消息队列之前,应该先了解什么是 AMQP(Advanced Message Queuing Protocol, 高级消息队列协议,点击查看)
消息(Message)是指在应用间 传送的数据,消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象;而 消息队列(Message Queue)是一种 应用间通信方式,消息发送后可以 立即返回,由 消息系统来确保消息的 可靠传递消息发布者只管把消息发布到 MQ 中而不用管谁来取, 消息使用者只管从 MQ 中取消息而不管是谁发布的,这样发布者和使用者都不用知道对方的存在,它是典型的 生产者-消费者模型,生产者不断向消息队列生产消息,消费者不断的从队列获取消息。因为消息的生产和消费都是 异步的,并且只关心消息的发送和接收,没有业务逻辑的浸入,这样就实现了生产者和消费者的 解耦

2.总结

(1).消息队列是队列结构的 中间件
(2).消息发送后,不需要立即处理,而是由消息系统来处理
(3).消息处理是 消息使用者(消费者) 按顺序处理的

3.结构图

学新通

二.为什么要使用消息队列

消息队列是一种应用间的异步协作机制,是分布式系统中的重要的组件,主要目的是为了解决应用 藕合异步消息流城削锋, 冗余,扩展性,排序保证等问题,实现 高性能高并发可伸缩最终一致性架构,下面举例说明
  1. 业务解耦

以常见的订单系统为例,用户点击【下单】按钮之后的业务逻辑可能包括:扣减库存、生成相应单据、发红包、发短信通知,商品配送等业务;在业务发展初期这些逻辑可能放在一起同步执行,随着业务的发展订单量增长,需要提升系统服务的性能,这时可以将一些不需要立即生效的操作拆分出来异步执行,或者单独拆分出来作为一个独立的系统,比如生成相应单据为订单系统,扣减库存为库存系统,发放红包独立为红包系统、发短信通知为短信系统,商品配送为配送系统等。这种场景下就可以用 MQ ,在下单的主流程(比如扣减库存、生成相应单据)完成之后发送一条消息到 MQ 让主流程快速完结,而由另外的单独线程拉取MQ的消息(或者由 MQ 推送消息),当发现 MQ 中有发红包或发短信或商品配送之类的消息时,执行相应的业务系统逻辑,这样各个业务系统相互独立,就很方便进行分离部署,防止某一系统故障引起的连锁故障

学新通
  1. 流量削峰

流量削峰一般在秒杀或者团抢活动中广泛使用

(1).由来

主要是还是来自于互联网的业务场景,例如:春节火车票抢购,大量的用户需要同一时间去抢购;以及双11秒杀, 短时间上亿的用户涌入,瞬间流量巨大(高并发),比如:200万人准备在凌晨12:00准备抢购一件商品,但是商品的数量缺是有限的100-500件左右。这样真实能购买到该件商品的用户也只有几百人左右, 但是从业务上来说,秒杀活动是希望更多的人来参与,也就是抢购之前希望有越来越多的人来看购买商品。但是,在抢购时间达到后,用户开始真正下单时,秒杀的服务器后端却不希望同时有几百万人同时发起抢购请求。因为服务器的处理资源是有限的,所以出现峰值的时候,很容易导致服务器宕机,用户无法访问的情况出现。这就好比出行的时候存在早高峰和晚高峰的问题,为了解决这个问题,出行就有了错峰限行的解决方案。同理,在线上的秒杀等业务场景,也需要类似的解决方案,需要平安度过同时抢购带来的流量峰值的问题,这就是流量削峰的由来。

学新通

(2).怎样来实现流量削峰方案

削峰从本质上来说就是更多地延缓用户请求,以及层层过滤用户的访问需求,遵从“最后落地到数据库的请求数要尽量少”的原则。

1).消息队列解决削峰

要对流量进行削峰,最容易想到的解决方案就是用消息队列来缓冲瞬时流量,把同步的直接调用转换成异步的间接推送,中间通过一个队列在一端承接瞬时的流量洪峰,在另一端平滑地将消息推送出去。

学新通

消息队列中间件主要解决应用耦合,异步消息, 流量削锋等问题;常用消息队列系统:目前在生产环境,使用较多的消息队列有 ActiveMQ、RabbitMQ、 ZeroMQ、Kafka、MetaMQ、RocketMQ 等。

在这里,消息队列就像“水库”一样,拦蓄上游的洪水,削减进入下游河道的洪峰流量,从而达到减免洪水灾害的目的。

2).流量削峰漏斗:层层削峰

针对秒杀场景还有一种方法,就是对请求进行分层过滤,从而过滤掉一些无效的请求。分层过滤其实就是采用“漏斗”式设计来处理请求的,如下图所示:

学新通
这样就像漏斗一样,尽量把数据量和请求量一层一层地过滤和减少了
I.分层过滤的核心思想
  • 通过在不同的层次尽可能地过滤掉无效请求

  • 通过CDN过滤掉大量的图片,静态资源的请求

  • 再通过类似Redis这样的分布式缓存,过滤请求等就是典型的在上游拦截读请求

II.分层过滤的基本原则
  • 对写数据进行基于时间合理分片,过滤掉过期的失效请求

  • 对写请求做限流保护,将超出系统承载能力的请求过滤掉

  • 涉及到的读数据不做强一致性校验,减少因为一致性校验产生瓶颈的问题

  • 对写数据进行强一致性校验,只保留最后有效的数据

最终,让“漏斗”最末端(数据库)的才是有效请求,例如:当用户真实达到订单和支付的流程,这个是需要数据强一致性的。

(3).总结

1).对于秒杀这样的高并发场景业务,最基本的原则就是将请求拦截在系统上游,降低下游压力。如果不在前端拦截很可能造成数据库(mysql、oracle等)读写锁冲突,甚至导致死锁,最终还有可能出现雪崩等场景。

2).划分好动静资源,静态资源使用CDN进行服务分发

3).充分利用缓存(redis等),增加QPS,从而加大整个集群的吞吐量

4).高峰值流量是压垮系统很重要的原因,所以需要RabbitMQ等消息队列在一端承接瞬时的流量洪峰,在另一端平滑地将消息推送出去

  1. 异步处理

用户注册后,需要发送注册邮件和注册短信
学新通

三.RabbitMQ介绍

RabbitMQ是一个由 erlang语言开发的,实现了AMQP协议的标准的开源消息代理和队列服务器( 消息队列中间件)

1.常见的消息队列中间件

学新通

2.RabbitMQ特性

  • 可靠性(Reliability)
    RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认

  • 灵活的路由(Flexible Routing)
    在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange

  • 消息集群(Clustering)
    多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker

  • 高可用(Highly Available Queues)
    队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用

  • 多种协议(Multi-protocol)
    RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等

  • 多语言客户端(Many Clients)
    RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby ,PHP等

  • 管理界面(Management UI)
    RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面

  • 跟踪机制(Tracing)
    如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么

  • 插件机制(Plugin System)
    RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件

3.RabbitMQ工作原理

内部实际上也是 AMQP 中的基本概念
学新通

上图各个模块的说明:

  • Message: 消息,消息是不具名的,它由消息头消息体组成,消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)priority(相对于其他消息的优先权)delivery-mode(指出该消息可能需要持久性存储)

  • Publisher: 消息的生产者,也是一个向交换器发布消息客户端应用程序

  • Consumer:消息的消费者,表示一个从消息队列中取得消息客户端应用程序

  • Broker: 接收和分发消息的应用,表示消息队列服务器实体,RabbitMQ Server就是Message Broker

  • Virtual host: 虚拟主机(共享相同的身份认证和加密环境的独立服务器域),表示一批交换器、消息队列和相关对象,类似于mysql的数据库,当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange、queue等.每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制,vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 /

  • Connection: publisher、consumer和broker之间的网络连接,比如:TCP连接,断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障broker服务出现问题

  • Channel: 管道,多路复用连接中的一条独立的双向数据流通道,信道是建立在真实的TCP连接内虚拟连接(逻辑连接),如果应用程序支持多线程,通常每个多线程创建单独的channel进行通讯, 因为AMQP 方法中包含了channel id帮助客户端和broker识别channel,所以channel之间是完全隔离的,AMQP 命令都是通过管道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过管道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低,所以引入了管道的概念,目的是为了减少操作系统建立TCP 连接的开销,以复用一条 TCP 连接

  • Exchange: 交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列,message到达broker的第一站,根据分发规则,匹配查询表中的路由键(routing key),分发消息到queue中去,常用的类型有:直连交换机-direct (point-to-point), 主题交换机-topic (publish-subscribe),扇型交换机-fanout (multicast), 头交换机-headers(amq.match (and amq.headers in RabbitMQ))

  • Queue: 消息队列,用来保存消息直到发送给消费者,它是消息的容器,也是消息的终点,一个消息可投入一个或多个队列,消息最终被送到这里等待消费者连接到这个队列将其取走

  • Binding: 绑定,消息队列(queue)和交换器(exchange)之间的虚拟连接, binding中可以包含routing key, Binding信息被保存到exchange中的查询表中,用于message的分发依据,一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表

四.RabbitMQ安装与启动

linux下,docker-compose 安装nignx,php,mysql,redis,rabbitmq,mongo

端口说明:安装完rabbitmq后,有几个常见端口:
4369: epmd(Erlang Port Mapper Daemon),erlang服务端口
5672: client端通信端口
15672:http Api客户端,管理UI(仅在启用了管理插件的情况下使用)
25672:用于节点间通信(Erlang分发服务器端口)

五.RabbitMQ几个重要特性概念讲解

  • 队列模式-简单队列模式,工作队列模式

  • ACK&NACK消费确认机制&重回队列机制

  • 消息持久化

  • 公平调度(限流机制)

  • 幂等性

  • return机制

  • 消息的可靠性投递

下面是RabbitMQ和消息所涉及到的一些 术语
  • 生产(Producing)的意思就是发送:发送消息的程序就是一个生产者(producer),一般用"P"来表示:

学新通
  • 队列(queue)就是存在于RabbitMQ中邮箱的名称:虽然消息的传输经过了RabbitMQ和你的应用程序,但是它只能被存储于队列当中。实质上队列就是个巨大的消息缓冲区,它的大小只受主机内存和硬盘限制。多个生产者(producers)可以把消息发送给同一个队列,同样,多个消费者(consumers)也能够从同一个队列(queue)中获取数据。队列可以绘制成这样(图上是队列的名称):

学新通
  • 消费(Consuming)和接收(receiving)是同一个意思:一个消费者(consumer)就是一个等待获取消息的程序。把它绘制为"C":

学新通
以php框架yii2为参照
  1. 简单队列模式(simple queue)

发送 单个消息的生产者,以及 接收消息并将其 打印出来的消费者。将忽略RabbitMQ API中的一些细节。 在下图中,“P”是生产者,“C”是消费者。中间的框是一个队列(保存消息的地方)
学新通

(1).生产者发布消息步骤

  1.  
    use PhpAmqpLib\Connection\AMQPStreamConnection;
  2.  
    use PhpAmqpLib\Message\AMQPMessage;
  3.  
     
  4.  
    // 创建连接
  5.  
    $connection = new AMQPStreamConnection($host, $port, $user, $pass, $v_host="/");
  6.  
     
  7.  
    // 创建channel
  8.  
    $channel = $connection->channel();
  9.  
     
  10.  
    // 初始化队列,并持久化(声明队列)
  11.  
    $channel->queue_declare($queue_name, false, true, false, false);
  12.  
     
  13.  
    //消息内容
  14.  
    $data = "this is message2";
  15.  
    // 声明消息,并持久化(创建消息)
  16.  
    $mes = new AMQPMessage($data, ["delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
  17.  
     
  18.  
    // 把消息推到队列里(发布消息)
  19.  
    $channel->basic_publish($mes, '', $queue_name);
  20.  
     
  21.  
    //关闭通道和连接
  22.  
    $channel->close();
  23.  
    $connection->close();
学新通
上面声明队列方法queue_declare()参数详解
学新通

(2).消费者消费消息步骤

  1.  
    //核心代码
  2.  
    basic_consume($queue = '', $consumer_tag = '', $no_local = false,$no_ack = false,$exclusive = false,$nowait = false,$callback = null,$ticket = null,$arguments = array())
上面消费消息方法basic_consume()参数详解
学新通

(3).具体代码展示

rabbitmq配置
  1.  
    "rabbitMq" => [
  2.  
    "base" => [
  3.  
    'host' => '192.168.0.5', // host地址
  4.  
    'port' => 5672, // 端口
  5.  
    "user" => "user", // 账户
  6.  
    'pass' => 123456, // 密码
  7.  
    "v_host" => "order", // 对应Virtual Hosts
  8.  
    ],
  9.  
    "queue_name" => [
  10.  
    "name1" => "goods", // 队列名称
  11.  
    ],
  12.  
    ]
生产者代码
  1.  
    <?php
  2.  
    /**
  3.  
    * 生产者生产消息
  4.  
    */
  5.  
     
  6.  
    namespace console\controllers\simple;
  7.  
     
  8.  
    use Yii;
  9.  
    use yii\web\Controller;
  10.  
    use PhpAmqpLib\Connection\AMQPStreamConnection;
  11.  
    use PhpAmqpLib\Message\AMQPMessage;
  12.  
     
  13.  
    class PublisherController extends Controller
  14.  
    {
  15.  
    public $enableCsrfValidation=false;
  16.  
     
  17.  
    public function actionIndex()
  18.  
    {
  19.  
    //rabbitmq相关配置
  20.  
    $rabbitMqConfig = Yii::$app->params["rabbitMq"];
  21.  
    $config = $rabbitMqConfig["base"];
  22.  
    $queue_name = $rabbitMqConfig["queue_name"]["name1"]; // 队列名称
  23.  
    // 创建连接
  24.  
    $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
  25.  
    // 创建channel
  26.  
    $channel = $connection->channel();
  27.  
    // 初始化队列,并持久化
  28.  
    $channel->queue_declare($queue_name, false, true, false, false);
  29.  
    //消息
  30.  
    $data = "this is message2";
  31.  
    // 声明消息,并持久化
  32.  
    $mes = new AMQPMessage($data, ["delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
  33.  
    // 把消息推到队列里
  34.  
    $channel->basic_publish($mes, '', $queue_name);
  35.  
    //关闭通道和连接
  36.  
    $channel->close();
  37.  
    $connection->close();
  38.  
    }
  39.  
    }
学新通
消费者代码
  1.  
    <?php
  2.  
    /**
  3.  
    * 消费者消费消息
  4.  
    */
  5.  
     
  6.  
    namespace console\controllers\simple;
  7.  
     
  8.  
    use Yii;
  9.  
    use yii\web\Controller;
  10.  
    use PhpAmqpLib\Connection\AMQPStreamConnection;
  11.  
     
  12.  
    class ConsumerController extends Controller
  13.  
    {
  14.  
    public $enableCsrfValidation=false;
  15.  
    public function actionIndex()
  16.  
    {
  17.  
    //rabbitmq相关配置
  18.  
    $rabbitMqConfig = Yii::$app->params["rabbitMq"];
  19.  
    $config = $rabbitMqConfig["base"];
  20.  
    $queue_name = $rabbitMqConfig["queue_name"]["name1"]; // 队列名称
  21.  
    // 创建连接
  22.  
    $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
  23.  
    // 创建channel
  24.  
    $channel = $connection->channel();
  25.  
    // 初始化队列,并持久化
  26.  
    $channel->queue_declare($queue_name, false, true, false, false);
  27.  
    // 消费消息
  28.  
    $callback = function ($msg) {
  29.  
    echo "reviced: " . $msg->body . "\n";
  30.  
    };
  31.  
    $channel->basic_consume($queue_name, "", false, true, false, false,$callback);
  32.  
    // 监控
  33.  
    while ($channel->is_open()){
  34.  
    $channel->wait();
  35.  
    }
  36.  
    //关闭通道和连接
  37.  
    $channel->close();
  38.  
    $connection->close();
  39.  
    }
  40.  
    }
学新通
  1. 工作队列模式(worker queue)

创建一个 工作队列(Work Queue),它会发送一些 耗时的任务多个工作者(Worker),工作队列(又称: 任务队列——Task Queues)是为了 避免等待一些占用大量资源、时间的操作,当把任务(Task)当作消息发送到队列中,一个运行在后台的工作者(worker)进程就会取出任务然后处理,当运行多个工作者(workers),任务就会在它们之间 共享。这个概念在网络应用中是非常有用的,它可以在短暂的HTTP请求中处理一些复杂的任务,使用工作队列的一个 好处就是它能够 并行的处理队列。如果堆积了很多任务,只需要添加 更多的工作者(workers)就可以了,这就是所谓的 循环调度,扩展很简单
学新通

(1).具体代码展示

rabbitmq配置
  1.  
    "rabbitMq" => [
  2.  
    "base" => [
  3.  
    'host' => '192.168.0.5', // host地址
  4.  
    'port' => 5672, // 端口
  5.  
    "user" => "user", // 账户
  6.  
    'pass' => 123456, // 密码
  7.  
    "v_host" => "order", // 对应Virtual Hosts
  8.  
    ],
  9.  
    "queue_name" => [
  10.  
    "name1" => "goods", // 队列名称
  11.  
    "name2" => "task_queue", // 队列名称
  12.  
    ],
  13.  
    ]
生产者代码
  1.  
    <?php
  2.  
    /**
  3.  
    * 生产者生产消息
  4.  
    */
  5.  
     
  6.  
    namespace console\controllers\worker;
  7.  
     
  8.  
    use Yii;
  9.  
    use yii\web\Controller;
  10.  
    use PhpAmqpLib\Connection\AMQPStreamConnection;
  11.  
    use PhpAmqpLib\Message\AMQPMessage;
  12.  
     
  13.  
    class PublisherController extends Controller
  14.  
    {
  15.  
    public $enableCsrfValidation=false;
  16.  
     
  17.  
    public function actionIndex()
  18.  
    {
  19.  
    //rabbitmq相关配置
  20.  
    $rabbitMqConfig = Yii::$app->params["rabbitMq"];
  21.  
    $config = $rabbitMqConfig["base"];
  22.  
    $queue_name = $rabbitMqConfig["queue_name"]["name2"]; // 队列名称
  23.  
    // 创建连接
  24.  
    $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
  25.  
    // 创建channel
  26.  
    $channel = $connection->channel();
  27.  
    // 初始化队列,并持久化
  28.  
    $channel->queue_declare($queue_name, false, true, false, false);
  29.  
     
  30.  
    // 生产多条消息
  31.  
    for ($i = 0; $i <= 10; $i) {
  32.  
    //消息
  33.  
    $data = "this is " . $i. " message";
  34.  
    // 声明消息,并持久化
  35.  
    $mes = new AMQPMessage($data, ["delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
  36.  
    // 把消息推到队列里
  37.  
    $channel->basic_publish($mes, '', $queue_name);
  38.  
    }
  39.  
    //关闭通道和连接
  40.  
    $channel->close();
  41.  
    $connection->close();
  42.  
    }
  43.  
    }
学新通
消费者代码
当生产者生产了多条费时的消息时,一个消费者不能满足需要,可以添加多个消费者处理生产者的消息,多个消费者之间采用 轮询的方式获取队列的消息,并把该消息发送给对应的用户
比如:可以对一个队列的消息开多个消费者,这里我们开了两个消费者,里面的代码都是一致的
  1.  
    <?php
  2.  
    /**
  3.  
    * 消费者消费消息
  4.  
    */
  5.  
     
  6.  
    namespace console\controllers\worker;
  7.  
     
  8.  
    use Yii;
  9.  
    use yii\web\Controller;
  10.  
    use PhpAmqpLib\Connection\AMQPStreamConnection;
  11.  
     
  12.  
    class ConsumerController extends Controller
  13.  
    {
  14.  
    public $enableCsrfValidation=false;
  15.  
    public function actionIndex()
  16.  
    {
  17.  
    //rabbitmq相关配置
  18.  
    $rabbitMqConfig = Yii::$app->params["rabbitMq"];
  19.  
    $config = $rabbitMqConfig["base"];
  20.  
    $queue_name = $rabbitMqConfig["queue_name"]["name2"]; // 队列名称
  21.  
    // 创建连接
  22.  
    $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
  23.  
    // 创建channel
  24.  
    $channel = $connection->channel();
  25.  
    // 初始化队列,并持久化
  26.  
    $channel->queue_declare($queue_name, false, true, false, false);
  27.  
    // 消费消息
  28.  
    $callback = function ($msg) {
  29.  
    echo "reviced: " . $msg->body . "\n";
  30.  
    };
  31.  
    $channel->basic_consume($queue_name, "", false, true, false, false,$callback);
  32.  
    // 监控
  33.  
    while ($channel->is_open()){
  34.  
    $channel->wait();
  35.  
    }
  36.  
    //关闭通道和连接
  37.  
    $channel->close();
  38.  
    $connection->close();
  39.  
    }
  40.  
    }
学新通
  1.  
    <?php
  2.  
    /**
  3.  
    * 消费者2消费消息
  4.  
    */
  5.  
     
  6.  
    namespace console\controllers\worker;
  7.  
     
  8.  
    use Yii;
  9.  
    use yii\web\Controller;
  10.  
    use PhpAmqpLib\Connection\AMQPStreamConnection;
  11.  
     
  12.  
    class Consumer2Controller extends Controller
  13.  
    {
  14.  
    public $enableCsrfValidation=false;
  15.  
    public function actionIndex()
  16.  
    {
  17.  
    //rabbitmq相关配置
  18.  
    $rabbitMqConfig = Yii::$app->params["rabbitMq"];
  19.  
    $config = $rabbitMqConfig["base"];
  20.  
    $queue_name = $rabbitMqConfig["queue_name"]["name2"]; // 队列名称
  21.  
    // 创建连接
  22.  
    $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
  23.  
    // 创建channel
  24.  
    $channel = $connection->channel();
  25.  
    // 初始化队列,并持久化
  26.  
    $channel->queue_declare($queue_name, false, true, false, false);
  27.  
    // 消费消息
  28.  
    $callback = function ($msg) {
  29.  
    echo "reviced: " . $msg->body . "\n";
  30.  
    };
  31.  
    $channel->basic_consume($queue_name, "", false, true, false, false,$callback);
  32.  
    // 监控
  33.  
    while ($channel->is_open()){
  34.  
    $channel->wait();
  35.  
    }
  36.  
    //关闭通道和连接
  37.  
    $channel->close();
  38.  
    $connection->close();
  39.  
    }
  40.  
    }
学新通
  1. ACK消费确认机制&NACK&重回队列机制

ACK消费确认机制

当处理一个 比较耗时得任务的时候,想知道消费者(consumers) 运行到一半就挂掉时, 正在处理的消息/发送给当前工作者的消息会怎样,当消息在队列中 没有进行持久化操作时,消息被RabbitMQ发送给消费者(consumers)之后,马上就会在内存中 移除。这种情况,只要把一个 工作者(worker)停止,正在处理的消息就会 丢失。同时,所有发送到这个工作者的还没有处理的消息 都会丢失。所以,如果不想消息丢失,当一个工作者(worker)挂掉了,希望任务会重新发送给其他的工作者(worker),RabbitMQ就提供了 消息响应acknowledgments)。消费者会通过一个 ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ就会 释放并删除这条消息。如果消费者(consumer) 挂掉了, 没有发送响应,RabbitMQ就会认为消息 没有被完全处理,然后 重新发送其他消费者(consumer)。这样,即使工作者(workers)偶尔的挂掉,也不会丢失消息。消息是没有超时这个概念的,当工作者与它断开连的时候,RabbitMQ会重新发送消息,这样在处理一个耗时非常长的消息任务的时候就不会出问题了。在该讲解中,将使用手动消息确认,通过为 no_ack参数传递 false,一旦有任务完成,使用d.ack()(false)向RabbitMQ服务器发送消费完成的确认(这个
确认消息是单次传递的)
学新通
//核心代码
basic_consume($queue = '', $consumer_tag = '', $no_local = false, $no_ack = false,$exclusive = false,$nowait = false,$callback = null,$ticket = null,$arguments = array())
// 消费消息
$callback = function ($msg) {
echo "reviced: " . $msg->body . "\n";
// 消费ack
$msg->ack();
};
// 第四个参数: 需要ack确认,这里我们在callback手动确认
$channel->basic_consume($queue_name, "", false, false, false, false,$callback);
学新通
rabbitmq配置
  1.  
    "rabbitMq" => [
  2.  
    "base" => [
  3.  
    'host' => '192.168.0.5', // host地址
  4.  
    'port' => 5672, // 端口
  5.  
    "user" => "user", // 账户
  6.  
    'pass' => 123456, // 密码
  7.  
    "v_host" => "order", // 对应Virtual Hosts
  8.  
    ],
  9.  
    "queue_name" => [
  10.  
    "name1" => "goods", // 队列名称
  11.  
    "name2" => "task_queue", // 队列名称
  12.  
    "name3" => "task_ack", // 队列名称
  13.  
    ],
  14.  
    ]
生产者代码
  1.  
    <?php
  2.  
    /**
  3.  
    * 生产者生产消息
  4.  
    */
  5.  
     
  6.  
    namespace console\controllers\ack;
  7.  
     
  8.  
    use Yii;
  9.  
    use yii\web\Controller;
  10.  
    use PhpAmqpLib\Connection\AMQPStreamConnection;
  11.  
    use PhpAmqpLib\Message\AMQPMessage;
  12.  
     
  13.  
    class PublisherController extends Controller
  14.  
    {
  15.  
    public $enableCsrfValidation=false;
  16.  
     
  17.  
    public function actionIndex()
  18.  
    {
  19.  
    //rabbitmq相关配置
  20.  
    $rabbitMqConfig = Yii::$app->params["rabbitMq"];
  21.  
    $config = $rabbitMqConfig["base"];
  22.  
    $queue_name = $rabbitMqConfig["queue_name"]["name3"]; // 队列名称
  23.  
    // 创建连接
  24.  
    $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
  25.  
    // 创建channel
  26.  
    $channel = $connection->channel();
  27.  
    // 初始化队列,并持久化
  28.  
    $channel->queue_declare($queue_name, false, true, false, false);
  29.  
     
  30.  
    // 生产多条消息
  31.  
    for ($i = 0; $i <= 10; $i) {
  32.  
    //消息
  33.  
    $data = "this is " . $i. " message";
  34.  
    // 声明消息,并持久化
  35.  
    $mes = new AMQPMessage($data, ["delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
  36.  
    // 把消息推到队列里
  37.  
    $channel->basic_publish($mes, '', $queue_name);
  38.  
    }
  39.  
    //关闭通道和连接
  40.  
    $channel->close();
  41.  
    $connection->close();
  42.  
    }
  43.  
    }
学新通
消费者代码
  1.  
    <?php
  2.  
    /**
  3.  
    * 消费者消费消息
  4.  
    */
  5.  
     
  6.  
    namespace console\controllers\ack;
  7.  
     
  8.  
    use Yii;
  9.  
    use yii\web\Controller;
  10.  
    use PhpAmqpLib\Connection\AMQPStreamConnection;
  11.  
     
  12.  
    class ConsumerController extends Controller
  13.  
    {
  14.  
    public $enableCsrfValidation=false;
  15.  
    public function actionIndex()
  16.  
    {
  17.  
    //rabbitmq相关配置
  18.  
    $rabbitMqConfig = Yii::$app->params["rabbitMq"];
  19.  
    $config = $rabbitMqConfig["base"];
  20.  
    $queue_name = $rabbitMqConfig["queue_name"]["name3"]; // 队列名称
  21.  
    // 创建连接
  22.  
    $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
  23.  
    // 创建channel
  24.  
    $channel = $connection->channel();
  25.  
    // 初始化队列,并持久化
  26.  
    $channel->queue_declare($queue_name, false, true, false, false);
  27.  
    // 消费消息
  28.  
    $callback = function ($msg) {
  29.  
    echo "reviced: " . $msg->body . "\n";
  30.  
    // 消费ack
  31.  
    $msg->ack();
  32.  
    };
  33.  
    // 第二个参数:同一时刻服务器只会发送1条消息给消费者
  34.  
    $channel->basic_qos(null, 1, null);
  35.  
    // 第四个参数: 需要ack确认,这里我们在callback手动确认
  36.  
    $channel->basic_consume($queue_name, "", false, false, false, false,$callback);
  37.  
    // 监控
  38.  
    while ($channel->is_open()){
  39.  
    $channel->wait();
  40.  
    }
  41.  
    //关闭通道和连接
  42.  
    $channel->close();
  43.  
    $connection->close();
  44.  
    }
  45.  
    }
学新通

NACK&重回队列机制

当设置了方法 basic_consume$no_ack = false 时,使用手工 ACK 方式,除了ACK外,其实还有 NACK 方式,当手工 AcK 时,会发送给Broker( 服务器)一个应答,代表消息处理成功,Broker就可回送响应给生产端 .
NACK 则表示消息处理失败,如果设设置了重回队列, Broker 端就会将没有成功处理的消息重新发送
通俗来讲:
手工ACK:消费成功了,向发起者确认
NACK:消费失败,让生产者重新发
一般在实际应用中,都会关闭重回队列,也就是设置为false
使用方式
  • 消费端消费时.如果由于业务异常,可以手工 NACK 记录日志,然后进行补偿

  • API :basic_nack($delivery_tag, $multiple = false, $requeue = false)

  • 如果由于服务器宕机等严里问题,就需要手工 ACK 保障消费端消费成功

  • API :basic_ack($delivery_tag, $multiple = false)

4.消息持久化

如果没有特意告诉RabbitMQ,那么在它退出或者崩溃的时候,将会丢失所有队列和消息,为了确保消息不会丢失,有两个事情是需要注意的:必须把“队列”“消息”设为持久化首先,为了不让队列消失,需要把队列声明为持久化(durable),但这里面会有一定的问题,它会返回一个错误,可以使用一个快捷的解决方法——用不同名字的队列,例如task_queue

代码如上面生产者/消费者所示:

// 初始化队列,并持久化
$channel->queue_declare($queue_name, false, true, false, false);
消息持久化配置: " delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT

5.公平调度(限流机制)

为什么要限流

  • 当工作者处理消息时,会出现这么一个问题:比如有两个工作者(workers),处理奇数消息的比较繁忙,处理偶数消息的比较轻松,然而RabbitMQ并不知道这些,它仍然一如既往的派发消息,这时因为RabbitMQ只管分发进入队列的消息不会关心有多少消费者(consumer)没有作出响应,它盲目的把第n-th条消息发给第n-th个消费者

  • 假设还有这样的场景:RabbitMQ服务器有上万条未处理的消息,随便打开一个消费者Client ,会造成巨量的消息瞬间全部推送过来,然而单个客户端无法同时处理这么多数据,此时很有可能导致服务器崩溃,严重的可能导致线上的故障

  • 还有一些其他的场景:比如说单个 生产端一分钟产生了几百条数据,但是单个消费端 一分钟可能只能处理 60 条,这个时候生产端-消费端肯定是不平衡的,通常生产端是没办法做限制的,所以消费端肯定需要做一些限流措施,否则如果超出最大负载,可能导致 消费端 性能下降,服务器卡顿甚至崩溃等一系列严重后果

RabbitMQ 提供了一种 qos (服务质质量保证)功能,即在 非自动确认消息的前提下,如果一定数目的消息 ( 通过基于 生产者或者 channel设置 Qos 的值) 未被确认前,不消费新的消息
不能设置自动签收功能( auto_ack = false ),如果消息未被确认,就不会到达 消费端 ,目的就是给 生产端 减压
学新通

这是可以设置预取计数值为1,告诉RabbitMQ一次只向一个worker发送一条消息,换句话说,在处理并确认前一个消息之前,不要向工作人员发送新消息

如上面ACK消费确认机制中消费者代码:

  1.  
    // 第二个参数:同一时刻服务器只会发送1条消息给消费者
  2.  
    //basic_qos($prefetch_size, $prefetch_count, $a_global)
  3.  
    $channel->basic_qos(null, 1, null);
参数说明:
$prefetch_size:单条消息的大小限制, 通常设置为 0 ,表示不做限制
$prefetch_count:一次最多能处理多少条消息
$a_global:是否将上面设置: true 应用于 channel 级别, false 代表消费者级别
$prefetch_size,$a_global这两项, RabbitMQ 没有实现,暂且不研究.$prefetch_count 在auto_ack = f alse 的情况下生效,即在自动应答的情况下该值无效
学新通

6.幂等性概念

一句话概括: 用户对于同一操作发起的一次请求或者多次请求的结果是一致的
比如:对一个SQL执行100次1000次,我们可以借鉴数据库的乐观锁机制:比如我们执行一条更新库存的SQL语句:update T_reps set count = count -1, version = version 1 where version = 1,数据库的乐观锁在执行更新操作前一先去数据库查询此version ,然后执行更新语句,以此version作为条件,如果执行更新时有其他人先更新了这张表的数据,那么这个条件就不生效了,也就不会执行操作了,通过这种 乐观锁的机制来保障幕等性

消费端 - 幂等性保障

在海量订单产生的业务高峰期,如何避免 消息的重复消费问题?
在业务高峰期:容易产生 消息重复消费问题,当消费端消费完消息时,在给生产者端返回ack时由于 网络中断,导致生产端 未收到确认信息,该条消息就会 重新发送被消费端消费,但实际上该消费端已成功消费了该条消息,这就造成了重复消费.而 消费端实现 幂等性,就意味着:消息不会被多次消费,即使收到了很多一样的消息

业界主流的幂等性操作解决方案:

(1)唯一Id 指纹码 机制,核心:利用数据库主键去重
  • 唯一Id: 业务表主键

  • 指纹码: 为了区别每次正常操作的码,每次操作时生成指纹码;可以用时间截 业务编号或者标志位(具体视业务场景而定 )

select count(1) from t_order where id = 唯一Id 指纹码

  • 优势:实现简单

  • 弊端:高并发下有数据库写入的性能瓶颈

  • 解决方案:根据ID进行分库分表进行算法路由

(2)利用Redis的原子性去实现
  • 第一:是否要进行数据落库,如果落库的话,关键解决的问题是数据库和缓存如何做到原子性?

  • 第二:如果不进行落库,那么都存储到缓存中,如何设置定时同步的策略?

7.return机制

  • Return Listener用于处理一些不可路由的消息,也是生产阶段添加的一个监听

  • 消息生产者通过指定一个Exchange和Routing Key,把消息送达到某一个队列中去,然后消费者监听队列,进行消费处理操作

  • 但是在某些情况下,如果在发送消息的时候,当前的Exchange不存在或者指定的路由key路由不到,这个时候如果需要监听这种不可达的消息,就要使用Return Listener

  • 在API中有一个关键的配置项 Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么Broker(服务器)会自动删除该消息

学新通

8.消息的可靠性投递

(1).什么是生产端的可靠性投递?

  • 保障消息的成功发出

  • 保障MQ节点的成功接收

  • 发送端收到MQ节点(Broker)确认应答

  • 完善的消息进行补偿机制(在大厂一般都不会加事务,都是进行补偿操作)

在实际生产中,很难保障前三点的完全可靠,比如在 极端的环境中,生产者发送消息失败了,发送端在接受确认应答时突然发生网络闪断等等情况,很难保障可靠性投递,所以就需要有第四点完善的 消息补偿机制

(2).解决方案

方案一 消息信息落库,对消息状态进行打标(常见方案
将消息持久化到 DB中并设置状态值,收到 Consumer 的应答就改变当前记录的状态
再轮询重新发送没接收到应答的消息,注意这里要设置重试次数
学新通
方案实现流程
比如下单成功

步骤1

对订单数据入ORDER_DB 订单库,并对因此生成的业务消息入 MSG_DB 消息库,此处由于采用了两个数据库,需要两次持久化操作,为了保证数据的一致性,有人可能就想采用分布式事务,但在大厂实践中,基本都是采用补偿机制

这里一定要保证步骤1中消息都存储成功了,没有出现任问异常情况,然后生产端再进行消息发送.如果失败了就进行快速失败机制

对业务数据和消息入库完毕就进入步骤2

步骤2

发送消息到MQ服务上,如果一切正常无误消费者监听到该消息,进入步骤3

步骤3

生产端有一个 confi rm Listener ,异步监听 Broker(服务端) 回送的响应,从而判断消息是否投递成功

步骤4

如果成功,去数据库查询该消息.并将消息状态更新为 1

步骤5

如果出现意外情况,消费者未接收到或者Listener 接收确认时发生网络闪断,导致生产端的Listener 就永远收不到这条消息的 confi rm应答了,也就是说这条消息的状态就一直为0 了,这时候就需要用到分布式定时任务来从 MSG_DB 数据库抓取那些超时了还未被消费的消息,重新发送一遍。此时需要设置一个规则,比如说消息在入库时候设置一个临界值 timeout , 5 分钟之后如果还是0的状态那就需要把消息抽取出来。这里使用的是分布式定时任务,去定时抓取 MSG_DB中距离消息创建时间超过 5 分钟的且状态为0 的消息

步骤6

把抓取出来的消息进行重新投递( Retry Send ) ,也就是从第二步开始继续往下走

步骤7

当然有些消息可能就是由于一些实际的问题无法路由到 Broker ,比如Routing Key设置不对,对应的队列被误删除了,那么这种消息即使重试多次也仍然无法投递成功,所以需要对重复次数做限制,比如限制 3 次,如果投递次数大于3次,那么就将消息状态更新为 2 ,表示这个消息最终投递失败,然后通过补偿机制,人工去处理,实际生产中.这种情况还是比较少的,但是不能没有这个补偿机制,要不然就做不到可靠性了

缺点
在第一步需要更新或者插入操作数据库2次
优化
不需要消息进行持久化 只需要业务持久化
方案二 消息的延迟投递,做二次确认,回调检查(不常用,大厂在用的高并发方案)
学新通
方案实现流程

步骤1

(上游服务: Upstream service )业务入库,然后send 消息到broker,这两步是有先后顺序的

步骤2

进行消息延迟发送到新的队列(延迟时间为 5 分钟:业务决定)

步骤3

(下游服务: Downstream service )监听到消息然后处理消息

步骤4

下游服务 send confirm生成新的消息到 broker (这里是一个新的队列 )

步骤5

callback service 去监听这个消息,并且入库,如果监听到,表示这个消息已经消费成功

步骤6

callback service 去检查 步骤2投递的延迟消息是否 在msgDB里面是否消费成功,不存在或者消费失败就会 Resend command

如果在第 1 , 2 , 4 步失败,如果成功broker 会给一个 confirm ,失败当然没有,这是消息可靠性投递的里要保障

9.注意

关于队列大小: 如果所有的工作者都处理繁忙状态,队列就会被填满,需要留意这个问题,要么添加更多的工作者(workers),要么使用其他策略

六.RabbitMQ几种常见的交换器模式

1.消息模型基本介绍

前面的教程中,讲的是 发送消息到队列并从中取出消息,现在介绍RabbitMQ中 完整的消息模型

简单的概括一下之前讲的:

  • 发布者(producer)是发布消息的应用程序

  • 队列(queue)用于消息存储的缓冲

  • 消费者(consumer)是接收消息的应用程序

RabbitMQ消息模型的 核心理念是:发布者(producer) 不会直接发送任何消息给 队列,事实上,发布者(producer)甚至不知道消息是否已经被投递到队列。发布者(producer)只需要 把消息发送给一个交换机(exchange),交换机非常简单,它一边从发布者方 接收消息,一边把消息 推送到队列,交换机 必须知道如何处理它接收到的消息,是应该 路由指定的队列还是是 多个队列,或者是直接 忽略消息,这些规则是通过交换机类型(exchange type)来定义的
学新通

有几个可供选择的交换器类型:direct, topic, headers和fanout

direct(直连/定向交换器)

消息与一个特定的路由键完全匹配

topic(主题交换器)

使用通配符*,#,让路由键和某种模式进行匹配

headers(头交换器)

不处理路由键,而是根据发送的消息内容中的 headers 属性进行匹配

fanout(扇型交换器)

发布/ 订阅模式可以理解为广播模式:即exchange会将消息转发到所有绑定到这个exchange的队列上,这种类型在发送消息,queue bind时,都将忽略route key,也就是说不需要设置 route key

Routing Key(路由键): 生产者将消息发送给交换器,一般都会指定一个Routing Key,用来指定这个消息的路由规则,而这个Routing Key需要与交换器类型和绑定键(Binding Key)联合使用才能生效

Binding(绑定):它是Exchange与Queue之间的虚拟连接,通俗的讲就是交换器和队列之间的联系(这个队列(Queue)对这个交换机(Exchange)的消息感兴趣),实现了根据不同的Routing Key(路由规则),交换机将消息路由(发送)到对应的Queue上

2.交换器核心方法

  1.  
    //试探性申请一个交换器,若该交换器存在,则跳过,不存在则创建
  2.  
    exchange_declare($exchange,$type,$passive = false,$durable = false,$auto_delete = true,$internal = false,$nowait = false,$arguments = array(),$ticket = null)

参数名

默认值

解释

$exchange

 

交换器名称

$type

 

交换器类型:

’’ 默认交换机 匿名交换器 未显示声明类型都是该类型

fanout 扇形交换器 会发送消息到它所知道的所有队列,每个消费者获取的消息都是一致的

headers 头部交换器

direct 直连交换器,该交换机将会对绑定键(binding key)和路由键(routing key)进行精确匹配

topic 主题交换器 该交换机会对路由键正则匹配,必须是*(一个单词)、#(多个单词,以.分割) ,eg:user.key .abc.* 类型的key

$passive

false

只判断不创建(一般用于判断交换器是否存在)

true:

1.如果exchange已存在则直接连接并且不检查配置比如已存在的exchange是fanout,新需要建立的是direct,也不会报错;

2.如果exchange不存在则直接报错

false:

1.如果exchange不存在则创建新的exchange

2.如果exchange已存在则判断配置是否相同,如果配置不相同则直接报错,比如已存在的exchange是fanout,新需要建立的是direct,会报错。

$durable

false

设置是否持久化,设置为true,表示持久化,反之非持久化,持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息

$auto_delete

true

设置是否自动删除,设置为true时,表示自动删除。自动删除的前提:至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑

$internal

false

设置是否为内置的,设置为true表示是内置的交换器,客户端程序无法直接发送消息到这个交换器,只能通过交换器路由到这个交换器

$nowait

false

如果为true,表示不等待服务器回执信息,函数将返回null,可以提高访问速度

$arguments

array()

其他结构化参数

$ticket

null

 

3.fanout模式(广播模式)

广播模式可以理解为:发布/订阅模式,即exchange会将消息转发到所有绑定到这个exchange的队列上。针对这种广播模式,在发送消息,queue bind时,都将忽略route key,也就是说不需要设置 route key

案例一

一个生产者生产消息并发布消息到交换器上,多个消费者订阅该交换器,并与之队列绑定,消费消息
rabbitmq配置
  1.  
    "rabbitMq" => [
  2.  
    "base" => [
  3.  
    'host' => '192.168.0.5', // host地址
  4.  
    'port' => 5672, // 端口
  5.  
    "user" => "user", // 账户
  6.  
    'pass' => 123456, // 密码
  7.  
    "v_host" => "order", // 对应Virtual Hosts
  8.  
    ],
  9.  
    "exchange_name" => [
  10.  
    "name1" => "exch", // 交换器名称
  11.  
    ],
  12.  
    ]
生产者
  1.  
    <?php
  2.  
    /**
  3.  
    * 交换器fanout(广播)模式: 生产者生产消息
  4.  
    */
  5.  
     
  6.  
    namespace console\controllers\exchange\fanout;
  7.  
     
  8.  
    use PhpAmqpLib\Exchange\AMQPExchangeType;
  9.  
    use Yii;
  10.  
    use yii\web\Controller;
  11.  
    use PhpAmqpLib\Connection\AMQPStreamConnection;
  12.  
    use PhpAmqpLib\Message\AMQPMessage;
  13.  
     
  14.  
    class PublisherController extends Controller
  15.  
    {
  16.  
    public $enableCsrfValidation=false;
  17.  
     
  18.  
    public function actionIndex()
  19.  
    {
  20.  
    //rabbitmq相关配置
  21.  
    $rabbitMqConfig = Yii::$app->params["rabbitMq"];
  22.  
    $config = $rabbitMqConfig["base"];
  23.  
    $exchangeName = $rabbitMqConfig["exchange_name"]["name1"];
  24.  
    // 创建连接
  25.  
    $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
  26.  
    // 创建channel
  27.  
    $channel = $connection->channel();
  28.  
    // 声明并初始化交换器
  29.  
    $channel->exchange_declare($exchangeName, AMQPExchangeType::FANOUT, false, false, false);
  30.  
    // 声明一个数据
  31.  
    $data = "this is a exchange message";
  32.  
    // 初始化消息并持久化
  33.  
    $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
  34.  
    // 发布消息到交换器
  35.  
    $channel->basic_publish($msg, $exchangeName);
  36.  
     
  37.  
    //关闭通道和连接
  38.  
    $channel->close();
  39.  
    $connection->close();
  40.  
    }
  41.  
    }
学新通
消费者(多个)

消费者1

  1.  
    <?php
  2.  
    /**
  3.  
    * 交换器fanout(广播)模式: 消费者消费消息
  4.  
    */
  5.  
     
  6.  
    namespace console\controllers\exchange\fanout;
  7.  
     
  8.  
    use PhpAmqpLib\Exchange\AMQPExchangeType;
  9.  
    use Yii;
  10.  
    use yii\web\Controller;
  11.  
    use PhpAmqpLib\Connection\AMQPStreamConnection;
  12.  
     
  13.  
    class ConsumerController extends Controller
  14.  
    {
  15.  
    public $enableCsrfValidation = false;
  16.  
     
  17.  
    public function actionIndex()
  18.  
    {
  19.  
    //rabbitmq相关配置
  20.  
    $rabbitMqConfig = Yii::$app->params["rabbitMq"];
  21.  
    $config = $rabbitMqConfig["base"];
  22.  
    // $queueName = $rabbitMqConfig["queue_name"]["name4"];
  23.  
    $exchangeName = $rabbitMqConfig["exchange_name"]["name1"];
  24.  
    // 创建连接
  25.  
    $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
  26.  
    // 创建channel
  27.  
    $channel = $connection->channel();
  28.  
    // 声明对应的交换器
  29.  
    $channel->exchange_declare($exchangeName, AMQPExchangeType::FANOUT, false, false, false);
  30.  
    // 声明队列
  31.  
    list($queueName) = $channel->queue_declare('', false, false, true, false);
  32.  
    // 交换机与队列绑定
  33.  
    $channel->queue_bind($queueName, $exchangeName);
  34.  
    // 消息回调处理
  35.  
    $callback = function ($meg) {
  36.  
    echo "revince: " . $meg->body. "\n";
  37.  
    $meg->ack();
  38.  
    };
  39.  
    // 设置消费者处理消息限制,第二个参数:同一时刻服务器只会发送1条消息给消费者消费
  40.  
    $channel->basic_qos(null, 1, null);
  41.  
    // 消费者消费消息: 第四个参数: 需要ack确认
  42.  
    $channel->basic_consume($queueName, '', false, false, false, false, $callback);
  43.  
    // 监控
  44.  
    while ($channel->is_open()) {
  45.  
    $channel->wait();
  46.  
    }
  47.  
    //关闭通道和连接
  48.  
    $channel->close();
  49.  
    $connection->close();
  50.  
    }
  51.  
    }
学新通

消费者2

  1.  
    <?php
  2.  
    /**
  3.  
    * 交换器fanout(广播)模式: 消费者消费消息
  4.  
    */
  5.  
     
  6.  
    namespace console\controllers\exchange\fanout;
  7.  
     
  8.  
    use PhpAmqpLib\Exchange\AMQPExchangeType;
  9.  
    use Yii;
  10.  
    use yii\web\Controller;
  11.  
    use PhpAmqpLib\Connection\AMQPStreamConnection;
  12.  
     
  13.  
    class Consumer2Controller extends Controller
  14.  
    {
  15.  
    public $enableCsrfValidation = false;
  16.  
     
  17.  
    public function actionIndex()
  18.  
    {
  19.  
    //rabbitmq相关配置
  20.  
    $rabbitMqConfig = Yii::$app->params["rabbitMq"];
  21.  
    $config = $rabbitMqConfig["base"];
  22.  
    // $queueName = $rabbitMqConfig["queue_name"]["name4"];
  23.  
    $exchangeName = $rabbitMqConfig["exchange_name"]["name1"];
  24.  
    // 创建连接
  25.  
    $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
  26.  
    // 创建channel
  27.  
    $channel = $connection->channel();
  28.  
    // 声明对应的交换器
  29.  
    $channel->exchange_declare($exchangeName, AMQPExchangeType::FANOUT, false, false, false);
  30.  
    // 声明队列
  31.  
    list($queueName) = $channel->queue_declare('', false, false, true, false);
  32.  
    // 交换机与队列绑定
  33.  
    $channel->queue_bind($queueName, $exchangeName);
  34.  
    // 消息回调处理
  35.  
    $callback = function ($meg) {
  36.  
    echo "revince: " . $meg->body. "\n";
  37.  
    $meg->ack();
  38.  
    };
  39.  
    // 设置消费者处理消息限制,第二个参数:同一时刻服务器只会发送1条消息给消费者消费
  40.  
    $channel->basic_qos(null, 1, null);
  41.  
    // 消费者消费消息: 第四个参数: 需要ack确认
  42.  
    $channel->basic_consume($queueName, '', false, false, false, false, $callback);
  43.  
    // 监控
  44.  
    while ($channel->is_open()) {
  45.  
    $channel->wait();
  46.  
    }
  47.  
    //关闭通道和连接
  48.  
    $channel->close();
  49.  
    $connection->close();
  50.  
    }
  51.  
    }
学新通

案例二

举个实际应用的场景:比方说用户注册(注销,更改姓名等)新浪,同时需要开通微博、博客、邮箱等,如果不采用队列,按照常规的线性处理,可能注册用户会特别的慢,因为在注册的时候,需要调各种其他服务器接口,如果服务很多的话,可能客户端就超时了。如果采用普通的队列,可能在处理上也会特别的慢(不是最佳方案),如果采用订阅模式,则是最优的选择
处理过程如下:
  1. 用户提交username、pwd…等之类的基本信息,将数据提交register.php中

  1. register.php对数据进行校验,符合注册要求,生成uid,并将和基本信息json后,发布一条消息到对应的交换器中,同时直接显示用户注册成功

  1. exchange中的多个队列,如(queue.process、queue.boke、queue.weibo、queue.email)都订阅了这个消息,根据各业务自身的逻辑来处理

总结:

1.不申明队列,因为发布/订阅模式下,是可以随时添加新的订阅队列

2.exchange的Type指定为fanout(广播模式)

3.队列不需要指定route key,绑定exchange

代码如下:
生产者
  1.  
    <?php
  2.  
    /**
  3.  
    * 交换器fanout(广播)模式: 生产者生产消息
  4.  
    */
  5.  
     
  6.  
    namespace console\controllers\exchange\fanout;
  7.  
     
  8.  
    use PhpAmqpLib\Exchange\AMQPExchangeType;
  9.  
    use Yii;
  10.  
    use yii\web\Controller;
  11.  
    use PhpAmqpLib\Connection\AMQPStreamConnection;
  12.  
    use PhpAmqpLib\Message\AMQPMessage;
  13.  
     
  14.  
    class PublisherController extends Controller
  15.  
    {
  16.  
    public function actionIndex()
  17.  
    {
  18.  
            /*
  19.  
                用户注册逻辑
  20.  
            */
  21.  
           
  22.  
            //发送消息逻辑
  23.  
    //rabbitmq相关配置
  24.  
    $rabbitMqConfig = Yii::$app->params["rabbitMq"];
  25.  
    $config = $rabbitMqConfig["base"];
  26.  
    $exchangeName = "register";
  27.  
    // 创建连接
  28.  
    $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
  29.  
    // 创建channel
  30.  
    $channel = $connection->channel();
  31.  
    // 声明并初始化交换器
  32.  
    $channel->exchange_declare($exchangeName, AMQPExchangeType::FANOUT, false, false, false);
  33.  
    // 声明一个数据
  34.  
    $data = "{uid:xxx,reg_time:xxx}";
  35.  
    // 初始化消息并持久化
  36.  
    $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
  37.  
    // 发布消息到交换器
  38.  
    $channel->basic_publish($msg, $exchangeName);
  39.  
     
  40.  
    //关闭通道和连接
  41.  
    $channel->close();
  42.  
    $connection->close();
  43.  
    }
  44.  
    }
学新通
消费者(多个)
可以创建多个不同类型的消费者(开通微博、博客、邮箱)等逻辑功能的消费者
  1.  
    <?php
  2.  
    /**
  3.  
    * 交换器fanout(广播)模式: 消费者消费消息
  4.  
    */
  5.  
     
  6.  
    namespace console\controllers\exchange\fanout;
  7.  
     
  8.  
    use PhpAmqpLib\Exchange\AMQPExchangeType;
  9.  
    use Yii;
  10.  
    use yii\web\Controller;
  11.  
    use PhpAmqpLib\Connection\AMQPStreamConnection;
  12.  
     
  13.  
    class ConsumerController extends Controller
  14.  
    {
  15.  
    public $enableCsrfValidation = false;
  16.  
     
  17.  
    public function actionIndex()
  18.  
    {
  19.  
    //rabbitmq相关配置
  20.  
    $rabbitMqConfig = Yii::$app->params["rabbitMq"];
  21.  
    $config = $rabbitMqConfig["base"];
  22.  
    $exchangeName = "register";
  23.  
    // 创建连接
  24.  
    $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
  25.  
    // 创建channel
  26.  
    $channel = $connection->channel();
  27.  
    // 声明对应的交换器
  28.  
    $channel->exchange_declare($exchangeName, AMQPExchangeType::FANOUT, false, false, false);
  29.  
    // 声明一个匿名队列
  30.  
    list($queueName) = $channel->queue_declare('', false, false, true, false);
  31.  
    // 交换机与队列绑定
  32.  
    $channel->queue_bind($queueName, $exchangeName);
  33.  
    // 消息回调处理
  34.  
    $callback = function ($meg) {
  35.  
                //处理逻辑
  36.  
    echo "revince: " . $meg->body. "\n";
  37.  
    $meg->ack();
  38.  
    };
  39.  
    // 设置消费者处理消息限制,第二个参数:同一时刻服务器只会发送1条消息给消费者消费
  40.  
    $channel->basic_qos(null, 1, null);
  41.  
    // 消费者消费消息: 第四个参数: 需要ack确认
  42.  
    $channel->basic_consume($queueName, '', false, false, false, false, $callback);
  43.  
    // 监控
  44.  
    while ($channel->is_open()) {
  45.  
    $channel->wait();
  46.  
    }
  47.  
    //关闭通道和连接
  48.  
    $channel->close();
  49.  
    $connection->close();
  50.  
    }
  51.  
    }
学新通

4.Direct模式

Direct交换器将消息投递到路由参数 完全匹配的队列中
学新通

直接上代码

rabbitmq配置

  1.  
    "rabbitMq" => [
  2.  
    "base" => [
  3.  
    'host' => '192.168.0.5', // host地址
  4.  
    'port' => 5672, // 端口
  5.  
    "user" => "user", // 账户
  6.  
    'pass' => 123456, // 密码
  7.  
    "v_host" => "order", // 对应Virtual Hosts
  8.  
    ],
  9.  
    "queue_name" => [
  10.  
    "name1" => "goods", // 队列名称
  11.  
    "name2" => "task_queue", // 队列名称
  12.  
    "name3" => "task_ack", // 队列名称
  13.  
    "name4" => "exchange_fanout_1", // 队列名称
  14.  
    ],
  15.  
    "exchange_name" => [
  16.  
    "name1" => "exch", // 交换器名称
  17.  
    "name2" => "exch_direct_log", // 交换器名称
  18.  
    ],
  19.  
    "routing_key" => [
  20.  
    "info_key" => "info", // 路由键
  21.  
    "error_key" => "error", // 路由键
  22.  
    "warn_key" => "warn", // 路由键
  23.  
    ],
  24.  
学新通

生产者

  1.  
    <?php
  2.  
    /**
  3.  
    * 交换器direct(routing_key-更详细的bind模式)模式: 生产者生产消息
  4.  
    */
  5.  
     
  6.  
    namespace console\controllers\exchange\direct;
  7.  
     
  8.  
    use PhpAmqpLib\Exchange\AMQPExchangeType;
  9.  
    use Yii;
  10.  
    use yii\web\Controller;
  11.  
    use PhpAmqpLib\Connection\AMQPStreamConnection;
  12.  
    use PhpAmqpLib\Message\AMQPMessage;
  13.  
     
  14.  
    class PublisherController extends Controller
  15.  
    {
  16.  
    public $enableCsrfValidation=false;
  17.  
     
  18.  
    public function actionIndex()
  19.  
    {
  20.  
    //rabbitmq相关配置
  21.  
    $rabbitMqConfig = Yii::$app->params["rabbitMq"];
  22.  
    $config = $rabbitMqConfig["base"];
  23.  
    $exchangeName = $rabbitMqConfig["exchange_name"]["name2"];
  24.  
    $routingKey = $rabbitMqConfig["routing_key"]["error_key"];
  25.  
    // 创建连接
  26.  
    $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
  27.  
    // 创建channel
  28.  
    $channel = $connection->channel();
  29.  
    // 声明并初始化交换器
  30.  
    $channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, false, false);
  31.  
    // 声明一个数据
  32.  
    $data = "this is a ". $routingKey . " message";
  33.  
    // 初始化消息并持久化
  34.  
    $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
  35.  
    // 发布消息到交换器, 并和路由键匹配
  36.  
    $channel->basic_publish($msg, $exchangeName, $routingKey);
  37.  
     
  38.  
    //关闭通道和连接
  39.  
    $channel->close();
  40.  
    $connection->close();
  41.  
    }
  42.  
    }
学新通

消费者

  1.  
    <?php
  2.  
    /**
  3.  
    * 交换器direct(routing_key-更详细的bind模式)模式: 消费者消费消息
  4.  
    */
  5.  
     
  6.  
    namespace console\controllers\exchange\direct;
  7.  
     
  8.  
    use PhpAmqpLib\Exchange\AMQPExchangeType;
  9.  
    use Yii;
  10.  
    use yii\web\Controller;
  11.  
    use PhpAmqpLib\Connection\AMQPStreamConnection;
  12.  
     
  13.  
    class ConsumerWarnController extends Controller
  14.  
    {
  15.  
     
  16.  
     
  17.  
    public function actionIndex()
  18.  
    {
  19.  
    //rabbitmq相关配置
  20.  
    $rabbitMqConfig = Yii::$app->params["rabbitMq"];
  21.  
    $config = $rabbitMqConfig["base"];
  22.  
    $exchangeName = $rabbitMqConfig["exchange_name"]["name2"];
  23.  
    $routingKey = $rabbitMqConfig["routing_key"]["warn_key"]; //路由键可以修改为其他key,与生产者bind的关联
  24.  
    // 创建连接
  25.  
    $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
  26.  
    // 创建channel
  27.  
    $channel = $connection->channel();
  28.  
    // 声明对应的交换器
  29.  
    $channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, false, false);
  30.  
    // 声明一个匿名队列
  31.  
    list($queueName) = $channel->queue_declare('', false, false, true, false);
  32.  
    // 交换机与队列绑定,并指定routing_key
  33.  
    $channel->queue_bind($queueName, $exchangeName, $routingKey);
  34.  
    // 消息回调处理
  35.  
    $callback = function ($meg) {
  36.  
    echo "revince: " . $meg->body. "\n";
  37.  
    $meg->ack();
  38.  
    };
  39.  
    // 设置消费者处理消息限制,第二个参数:同一时刻服务器只会发送1条消息给消费者消费
  40.  
    $channel->basic_qos(null, 1, null);
  41.  
    // 消费者消费消息: 第四个参数: 需要ack确认
  42.  
    $channel->basic_consume($queueName, '', false, false, false, false, $callback);
  43.  
    // 监控
  44.  
    while ($channel->is_open()) {
  45.  
    $channel->wait();
  46.  
    }
  47.  
    //关闭通道和连接
  48.  
    $channel->close();
  49.  
    $connection->close();
  50.  
    }
  51.  
    }
学新通

5.topic模式

发送到topic交换器的消息不可以携带随意routing_key,它的routing_key必须是一个由 .分隔开的 词语列表,这些单词随便是什么都可以,但是最好是跟携带它们的消息有关系的词汇,以下是几个推荐的例子:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit",词语的个数可以随意,但是 不要超过255字节。binding key也必须拥有同样的格式,topic交换器背后的逻辑跟direct交换机很相似 : 一个携带着特定routing_key的消息会被topic交换机投递给绑定键与之想匹配的队列,但是它的binding key和routing_key有两个特殊应用方式:
  • * (星号) 用来表示一个单词

  • # (井号) 用来表示任意数量(零个或多个)单词

下边用图说明:

这个例子里,发送的所有消息都是用来描述小动物的,发送的消息所携带的路由键是由三个单词所组成的,这三个单词被两个.分割开,路由键里的第一个单词

学新通

描述的是动物的手脚的利索程度,第二个单词是动物的颜色,第三个是动物的种类,所以它看起来是这样的: <celerity>.<colour>.<species>。

创建了三个绑定:Q1的绑定键为 *.orange.*,Q2的绑定键为 *.*.rabbit 和 lazy.# 。

  • Q1-->绑定的是

  • 中间带 orange 带 3 个单词的字符串 (*.orange.*)

  • Q2-->绑定的是

  • 最后一个单词是 rabbit 的 3 个单词 (*.*.rabbit)

  • 第一个单词是 lazy 的多个单词 (lazy.#)

这三个绑定键被可以总结为:

  • Q1 对所有的桔黄色动物都感兴趣

  • Q2 则是对所有的兔子所有懒惰的动物感兴趣

一个携带有 quick.orange.rabbit 的消息将会被分别投递给这两个队列,携带着 lazy.orange.elephant 的消息同样也会给两个队列都投递过去。另一方面携带有 quick.orange.fox 的消息会投递给第一个队列,携带有 lazy.brown.fox 的消息会投递给第二个队列。携带有 lazy.pink.rabbit 的消息只会被投递给第二个队列一次,即使它同时匹配第二个队列的两个绑定。携带着 quick.brown.fox 的消息不会投递给任何一个队列。

注意:

如果违反约定,发送了一个携带有一个单词或者四个单词("orange" or "quick.orange.male.rabbit")的消息时,发送的消息不会投递给任何一个队列,而且会丢 失掉

但是另一方面,即使 "lazy.orange.male.rabbit" 有四个单词,他还是会匹配最后一个绑定,并且被投递到第二个队列中。

topic交换机是很强大的,它可以表现出跟其他交换机类似的行为 当一个队列的binding key为 "#"(井号) 的时候,这个队列将会无视消息的routing key,接收所有的消息。 当 * (星号) 和 # (井号) 这两个特殊字符都未在binding key中出现的时候,此时Topic交换机就拥有的direct交换机的行为

代码如下:

rabbitmq配置

  1.  
    "rabbitMq" => [
  2.  
    "base" => [
  3.  
    'host' => '192.168.0.5', // host地址
  4.  
    'port' => 5672, // 端口
  5.  
    "user" => "user", // 账户
  6.  
    'pass' => 123456, // 密码
  7.  
    "v_host" => "order", // 对应Virtual Hosts
  8.  
    ],
  9.  
    "queue_name" => [
  10.  
    "name1" => "goods", // 队列名称
  11.  
    "name2" => "task_queue", // 队列名称
  12.  
    "name3" => "task_ack", // 队列名称
  13.  
    "name4" => "exchange_fanout_1", // 队列名称
  14.  
    ],
  15.  
    "exchange_name" => [
  16.  
    "name1" => "exch", // 交换器名称
  17.  
    "name2" => "exch_direct_log", // 交换器名称
  18.  
    "name3" => "exch_topic_log", // 交换器名称
  19.  
    ],
  20.  
    "routing_key" => [
  21.  
    "info_key" => "info", // 路由键
  22.  
    "error_key" => "error", // 路由键
  23.  
    "warn_key" => "warn", // 路由键
  24.  
    "all_key" => "#", // 所有的路由键
  25.  
    "user_info" => "user.info", // 路由键
  26.  
    "user_warn" => "user.warn", // 路由键
  27.  
    "user_all" => "user.*", // 匹配以user.开头的路由键
  28.  
    ],
  29.  
    ]
  30.  
学新通

生产者

  1.  
    <?php
  2.  
    /**
  3.  
    * 交换器topic(通配符)模式: 生产者生产消息
  4.  
    */
  5.  
     
  6.  
    namespace console\controllers\exchange\topic;
  7.  
     
  8.  
    use PhpAmqpLib\Exchange\AMQPExchangeType;
  9.  
    use Yii;
  10.  
    use yii\web\Controller;
  11.  
    use PhpAmqpLib\Connection\AMQPStreamConnection;
  12.  
    use PhpAmqpLib\Message\AMQPMessage;
  13.  
     
  14.  
    class PublisherController extends Controller
  15.  
    {
  16.  
     
  17.  
    public function actionIndex()
  18.  
    {
  19.  
    //rabbitmq相关配置
  20.  
    $rabbitMqConfig = Yii::$app->params["rabbitMq"];
  21.  
    $config = $rabbitMqConfig["base"];
  22.  
    $exchangeName = $rabbitMqConfig["exchange_name"]["name3"];
  23.  
    $routingKey = $rabbitMqConfig["routing_key"]["user_info"];
  24.  
    // 创建连接
  25.  
    $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
  26.  
    // 创建channel
  27.  
    $channel = $connection->channel();
  28.  
    // 声明并初始化交换器
  29.  
    $channel->exchange_declare($exchangeName, AMQPExchangeType::TOPIC, false, false, false);
  30.  
    // 声明一个数据
  31.  
    $data = "this is a ". $routingKey . " message";
  32.  
    // 初始化消息并持久化
  33.  
    $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
  34.  
    // 发布消息到交换器, 并和路由键匹配
  35.  
    $channel->basic_publish($msg, $exchangeName, $routingKey);
  36.  
     
  37.  
    //关闭通道和连接
  38.  
    $channel->close();
  39.  
    $connection->close();
  40.  
    }
  41.  
    }
学新通

消费者

  1.  
    <?php
  2.  
    /**
  3.  
    * 交换器topic(通配符)模式: 消费者消费消息
  4.  
    *
  5.  
    */
  6.  
     
  7.  
    namespace console\controllers\exchange\topic;
  8.  
     
  9.  
    use PhpAmqpLib\Exchange\AMQPExchangeType;
  10.  
    use Yii;
  11.  
    use yii\web\Controller;
  12.  
    use PhpAmqpLib\Connection\AMQPStreamConnection;
  13.  
     
  14.  
    class ConsumerController extends Controller
  15.  
    {
  16.  
     
  17.  
    public function actionIndex()
  18.  
    {
  19.  
    //rabbitmq相关配置
  20.  
    $rabbitMqConfig = Yii::$app->params["rabbitMq"];
  21.  
    $config = $rabbitMqConfig["base"];
  22.  
    $exchangeName = $rabbitMqConfig["exchange_name"]["name3"];
  23.  
    $routingKey = $rabbitMqConfig["routing_key"]["user_all"];
  24.  
    // 创建连接
  25.  
    $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
  26.  
    // 创建channel
  27.  
    $channel = $connection->channel();
  28.  
    // 声明对应的交换器
  29.  
    $channel->exchange_declare($exchangeName, AMQPExchangeType::TOPIC, false, false, false);
  30.  
    // 声明队列
  31.  
    list($queueName) = $channel->queue_declare('', false, false, true, false);
  32.  
    // 交换机与队列绑定,并指定routing_key
  33.  
    $channel->queue_bind($queueName, $exchangeName, $routingKey);
  34.  
    // 消息回调处理
  35.  
    $callback = function ($meg) {
  36.  
    echo "revince: " . $meg->body. "\n";
  37.  
    $meg->ack();
  38.  
    };
  39.  
    // 设置消费者处理消息限制,第二个参数:同一时刻服务器只会发送1条消息给消费者消费
  40.  
    $channel->basic_qos(null, 1, null);
  41.  
    // 消费者消费消息: 第四个参数: 需要ack确认
  42.  
    $channel->basic_consume($queueName, '', false, false, false, false, $callback);
  43.  
    // 监控
  44.  
    while ($channel->is_open()) {
  45.  
    $channel->wait();
  46.  
    }
  47.  
    //关闭通道和连接
  48.  
    $channel->close();
  49.  
    $connection->close();
  50.  
    }
  51.  
    }
学新通

七.死信队列,延时队列

1.死信队列

死信( Dead Letter )是RabbitMQ 中的一种 消息机制,当在消费消息时,如果队列里的消息出现以下情况:
  • 消息被拒绝

  • 消息在队列的存活时间超过设置的 TTL 时间

  • TTL (Time To Live),即生存时间

  • RabbitMQ支持消息的过期时间,在消息发送时可以进行指定

  • RabbitMQ 支持为每个队列设号消息的超时时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会被自动清除

  • 消息队列的消息数量已经超过最大队列长度

那么该消息将成为“死信”,“死信”消息会被 RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃

RabbitMQ 中有一种交换器叫 DLX,全称为 Dead 一 Letter 一 Exchange ,可以称之为 死信交换器,当消息在一个队列中变成死信( dead message )消息之后,它会被重新发送到另外一个交换器中,这个交换器就是DLX , 绑定在 DLX上的队列就称之为死信队列, 程序就可以监听这个队列中的消息,并做相应的处理,该特性 可以弥补RabbitMQ3.0以前支持的immediate参数的功能

消息变成死信有以下几种情况:

  • 消息被拒绝消息

  • TTL 过期(延迟队列)

  • 队列达到最大长度

学新通
学新通

2.延时队列

延时队列就是用来存放需要在 指定时间被处理的元素的队列.
一般可以利用 死信队列的特性实现延迟队列:只要给消息设置一个过期时间,消息过期就会自动进入死信队列,消费者只要监听死信队列就可以实现延迟队列了

应用场景

  • 订单在十分钟之内未支付则自动取消

  • 账单在一周内未支付,则自动结算

  • 某个时间下发一条通知

  • 用户注册成功后,如果三天内没有登陆则进行短信提醒

  • 用户发起退款,如果三天内没有得到处理则通知相关运营人员

下面通过一个案例来更进一步了解死信队列,延时队列

案例1

订单在一段时间内未支付则自动取消,步骤:
(1).创建订单操作
(2).订单创建成功后,订单相关数据json处理
(3).构建rabbitmq消息队列,并设置消息过期时间为60秒,把订单相关json数据发布到交换器, 并和路由键匹配,生产者生产消息60秒之后,消息会进入到死信队列,消费者监听死信队列,处理订单

rabbitmq配置

  1.  
    "rabbitMq" => [
  2.  
    "base" => [
  3.  
    'host' => '192.168.0.5', // host地址
  4.  
    'port' => 5672, // 端口
  5.  
    "user" => "user", // 账户
  6.  
    'pass' => 123456, // 密码
  7.  
    "v_host" => "order", // 对应Virtual Hosts
  8.  
    ],
  9.  
    "queue_name" => [
  10.  
    "name1" => "goods", // 队列名称
  11.  
    "name2" => "task_queue", // 队列名称
  12.  
    "name3" => "task_ack", // 队列名称
  13.  
    "name4" => "exchange_fanout_1", // 队列名称
  14.  
    "name5" => "queue_pay", // 订单支付队列
  15.  
    ],
  16.  
    "exchange_name" => [
  17.  
    "name1" => "exch", // 交换器名称
  18.  
    "name2" => "exch_direct_log", // 交换器名称
  19.  
    "name3" => "exch_topic_log", // 交换器名称
  20.  
    "name4" => "exch_pay", // 订单支付, 交换器名称
  21.  
    ],
  22.  
    "routing_key" => [
  23.  
    "info_key" => "info", // 路由键
  24.  
    "error_key" => "error", // 路由键
  25.  
    "warn_key" => "warn", // 路由键
  26.  
    "order_key" => "order_pay", // 订单支付
  27.  
    "all_key" => "#", // 所有的路由键
  28.  
    "user_info" => "user.info", // 路由键
  29.  
    "user_warn" => "user.warn", // 路由键
  30.  
    "user_all" => "user.*", // 匹配以user.开头的路由键
  31.  
    ],
  32.  
    "dead_letter" => [ // 死信队列
  33.  
    "exchange_name" => [ // 死信队列交换机名称
  34.  
    "pay" => "dead_exch_pay"
  35.  
    ],
  36.  
    "queue_name" => [ // 死信队列名称
  37.  
    "pay" => "dead_queue_pay"
  38.  
    ],
  39.  
    "routing_key" => [ // 死信队列routing名称
  40.  
    "pay" => "dead_routing_key_pay"
  41.  
    ]
  42.  
    ]
  43.  
    ]
学新通

生产者

  1.  
    <?php
  2.  
    /**
  3.  
    * 死信队列,延时队列: 生产者推送消息到队列,模拟订单支付
  4.  
    */
  5.  
     
  6.  
    namespace console\controllers\exchange\dead;
  7.  
     
  8.  
    use PhpAmqpLib\Exchange\AMQPExchangeType;
  9.  
    use PhpAmqpLib\Wire\AMQPTable;
  10.  
    use Yii;
  11.  
    use yii\web\Controller;
  12.  
    use PhpAmqpLib\Connection\AMQPStreamConnection;
  13.  
    use PhpAmqpLib\Message\AMQPMessage;
  14.  
     
  15.  
    class PublisherController extends Controller
  16.  
    {
  17.  
    public $enableCsrfValidation = false;
  18.  
     
  19.  
    public function actionIndex()
  20.  
    {
  21.  
    //rabbitmq相关配置
  22.  
    $rabbitMqConfig = Yii::$app->params["rabbitMq"];
  23.  
    $deadConfig = Yii::$app->params["rabbitMq"]["dead_letter"];
  24.  
    $config = $rabbitMqConfig["base"];
  25.  
    $exchangeName = $rabbitMqConfig["exchange_name"]["name4"];
  26.  
    $queueName = $rabbitMqConfig["queue_name"]["name5"];
  27.  
    $routingKey = $rabbitMqConfig["routing_key"]["order_key"];
  28.  
    // 创建连接
  29.  
    $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
  30.  
    // 创建channel
  31.  
    $channel = $connection->channel();
  32.  
    // 声明并初始化交换器, 交换机类型: routing_key-更详细的bind模式
  33.  
    $channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, false, false);
  34.  
    // 消息队列的额外参数
  35.  
    $args = new AMQPTable([
  36.  
    'x-message-ttl' => 2000, // 消息的过期时间
  37.  
    "x-dead-letter-exchange" => $deadConfig["exchange_name"]["pay"], // 死信队列交换机名称
  38.  
    "x-dead-letter-routing-key" => $deadConfig["routing_key"]["pay"] // 死信队列routing名称
  39.  
    ]);
  40.  
    // 声明队列
  41.  
    $channel->queue_declare($queueName, false, true, false, false, false, $args);
  42.  
    // 交换机和队列绑定
  43.  
    $channel->queue_bind($queueName, $exchangeName, $routingKey);
  44.  
     
  45.  
    // 声明死信交换机
  46.  
    $channel->exchange_declare($deadConfig["exchange_name"]["pay"], AMQPExchangeType::DIRECT, false, false, false);
  47.  
    // 声明死信队列
  48.  
    $channel->queue_declare($deadConfig["queue_name"]["pay"], false, true, false, false, false);
  49.  
    // 死信交换机和队列绑定
  50.  
    $channel->queue_bind($deadConfig["queue_name"]["pay"], $deadConfig["exchange_name"]["pay"], $deadConfig["routing_key"]["pay"]);
  51.  
     
  52.  
    // 声明一个数据:里面可以是用户订单相关json数据
  53.  
    $data = "this is a dead message";
  54.  
    // 初始化消息并持久化
  55.  
    $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
  56.  
    // 发布消息到交换器, 并和路由键匹配
  57.  
    $channel->basic_publish($msg, $exchangeName, $routingKey);
  58.  
     
  59.  
    //关闭通道和连接
  60.  
    $channel->close();
  61.  
    $connection->close();
  62.  
    }
  63.  
    }
学新通

消费者

  1.  
    <?php
  2.  
    /**
  3.  
    * 死信队列,延时队列: 模拟订单支付,消费者消费消息
  4.  
    *
  5.  
    */
  6.  
     
  7.  
    namespace console\controllers\exchange\dead;
  8.  
     
  9.  
    use PhpAmqpLib\Exchange\AMQPExchangeType;
  10.  
    use Yii;
  11.  
    use yii\web\Controller;
  12.  
    use PhpAmqpLib\Connection\AMQPStreamConnection;
  13.  
     
  14.  
    class ConsumerController extends Controller
  15.  
    {
  16.  
     
  17.  
    public function actionIndex()
  18.  
    {
  19.  
    //rabbitmq相关配置
  20.  
    $rabbitMqConfig = Yii::$app->params["rabbitMq"];
  21.  
    $config = $rabbitMqConfig["base"];
  22.  
    $exchangeName = $rabbitMqConfig["dead_letter"]["exchange_name"]["pay"];
  23.  
    $queueName = $rabbitMqConfig["dead_letter"]["queue_name"]["pay"];
  24.  
    $routingKey = $rabbitMqConfig["dead_letter"]["routing_key"]["pay"];
  25.  
    // 创建连接
  26.  
    $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
  27.  
    // 创建channel
  28.  
    $channel = $connection->channel();
  29.  
    // 声明对应的交换器
  30.  
    $channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, false, false);
  31.  
    // 交换机与队列绑定,并指定routing_key
  32.  
    $channel->queue_bind($queueName, $exchangeName, $routingKey);
  33.  
    // 消息回调处理
  34.  
    $callback = function ($meg) {
  35.  
                //处理订单相关数据
  36.  
    echo "revince: " . $meg->body. "\n";
  37.  
    $meg->ack();
  38.  
    };
  39.  
    // 设置消费者处理消息限制,第二个参数:同一时刻服务器只会发送1条消息给消费者消费
  40.  
    $channel->basic_qos(null, 1, null);
  41.  
    // 消费者消费消息: 第四个参数: 需要ack确认
  42.  
    $channel->basic_consume($queueName, '', false, false, false, false, $callback);
  43.  
    // 监控
  44.  
    while ($channel->is_open()) {
  45.  
    $channel->wait();
  46.  
    }
  47.  
    //关闭通道和连接
  48.  
    $channel->close();
  49.  
    $connection->close();
  50.  
    }
  51.  
    }
学新通

问题

通过上面的案例就可以实现死信队列,延时队列操作,上面看上去似乎没什么问题,实测一下就会发现 消息不会“如期死亡 。当先生产一个TTL为60s的消息,再生产一个TTL为5s的消息,第二个消息并不会再5s后过期进入死信队列,而是需要等到第一个消息TTL到期后,与第一个消息一同进入死信队列, 这是因为RabbitMQ 只会判断队列中的第一个消息是否过期

那么怎么来解决这个问题呢?

通过 RabbitMQ 的 rabbitmq_delayed_message_exchange 插件来解决
此插件的原理是将消息在交换机处暂存储在mnesia(一个分布式数据系统)表中,延迟投递到队列中,等到消息到期再投递到队列当中
rabbitmq_delayed_message_exchange插件安装
(1).下载地址:https://www.rabbitmq.com/community-plugins.html
注意: 要下载与rabbitmq相对应的版本
学新通
(2).把下载的插件放到指定位置
下载的文件为zip格式,将zip格式解压,插件格式为ez,将文件复制到插件目录:
  • Linux

  1.  
    /usr/lib/rabbitmq/lib/rabbitmq_server-xxx/plugins
  2.  
    rabbitmq-plugins list
  • Windows

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.17\plugins
(3).启动插件
  • Linux

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • Windows

学新通
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

启动信息:

学新通
(4).查看
进入:http://localhost:15672/#/exchanges
学新通

重构代码

生产者
生产者实现的关键点:
1.在声明交换机时不在是direct类型,而是x-delayed-message类型,这是由插件提供的类型
2.交换机要增加"x-delayed-type": "direct"参数设置
3.发布消息时,要在 Headers 中设置x-delay参数,来控制消息从交换机过期时间
  1.  
    <?php
  2.  
    /**
  3.  
    * 死信队列,延时队列插件使用: 模拟订单支付
  4.  
    */
  5.  
     
  6.  
    namespace console\controllers\exchange\delay;
  7.  
     
  8.  
    use PhpAmqpLib\Exchange\AMQPExchangeType;
  9.  
    use PhpAmqpLib\Wire\AMQPTable;
  10.  
    use Yii;
  11.  
    use yii\web\Controller;
  12.  
    use PhpAmqpLib\Connection\AMQPStreamConnection;
  13.  
    use PhpAmqpLib\Message\AMQPMessage;
  14.  
     
  15.  
    class PublisherController extends Controller
  16.  
    {
  17.  
    public $enableCsrfValidation = false;
  18.  
     
  19.  
    public function actionIndex()
  20.  
    {
  21.  
    //rabbitmq相关配置
  22.  
    $rabbitMqConfig = Yii::$app->params["rabbitMq"];
  23.  
    $delayConfig = Yii::$app->params["rabbitMq"]["delay"];
  24.  
    $config = $rabbitMqConfig["base"];
  25.  
    // 创建连接
  26.  
    $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
  27.  
    // 创建channel
  28.  
    $channel = $connection->channel();
  29.  
    // 声明并初始化交换器, 交换机类型: 延时插件名称(x-delayed-message)
  30.  
    $channel->exchange_declare($delayConfig["exchange_name"]["pay"], "x-delayed-message", false, true, false);
  31.  
    // 消息队列的额外参数
  32.  
    $args = new AMQPTable(["x-delayed-type" => "direct"]);
  33.  
    // 声明队列
  34.  
    $channel->queue_declare($delayConfig["queue_name"]["pay"], false, true, false, false, false, $args);
  35.  
    // 交换机和队列绑定
  36.  
    $channel->queue_bind($delayConfig["queue_name"]["pay"], $delayConfig["exchange_name"]["pay"], $delayConfig["routing_key"]["pay"]);
  37.  
     
  38.  
    // 声明一个数据
  39.  
    $data = "this is a dead message";
  40.  
    // 初始化消息并持久化
  41.  
    $arr = [
  42.  
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT,
  43.  
    "application_headers" => new AMQPTable([
  44.  
    "x-delayed" => 2000 // 过期时间
  45.  
    ])
  46.  
    ];
  47.  
    $msg = new AMQPMessage($data, $arr);
  48.  
    // 发布消息到交换器, 并和路由键匹配
  49.  
    $channel->basic_publish($msg, $delayConfig["exchange_name"]["pay"], $delayConfig["routing_key"]["pay"]);
  50.  
     
  51.  
    //关闭通道和连接
  52.  
    $channel->close();
  53.  
    $connection->close();
  54.  
    }
  55.  
    }
学新通
消费者
没有啥特别的修改
  1.  
    <?php
  2.  
    /**
  3.  
    * 死信队列,延时队列插件使用: 模拟订单支付
  4.  
    *
  5.  
    */
  6.  
     
  7.  
    namespace console\controllers\exchange\delay;
  8.  
     
  9.  
    use PhpAmqpLib\Exchange\AMQPExchangeType;
  10.  
    use Yii;
  11.  
    use yii\web\Controller;
  12.  
    use PhpAmqpLib\Connection\AMQPStreamConnection;
  13.  
     
  14.  
    class ConsumerController extends Controller
  15.  
    {
  16.  
    public $enableCsrfValidation = false;
  17.  
     
  18.  
    public function actionIndex()
  19.  
    {
  20.  
    //rabbitmq相关配置
  21.  
    $rabbitMqConfig = Yii::$app->params["rabbitMq"];
  22.  
    $delayConfig = Yii::$app->params["rabbitMq"]["delay"];
  23.  
    $config = $rabbitMqConfig["base"];// 创建连接
  24.  
    $connection = new AMQPStreamConnection($config["host"], $config["port"], $config["user"], $config["pass"], $config["v_host"]);
  25.  
    // 创建channel
  26.  
    $channel = $connection->channel();
  27.  
    // 声明并初始化交换器, 交换机类型: 延时插件名称(x-delayed-message)
  28.  
    $channel->exchange_declare($delayConfig["exchange_name"]["pay"], "x-delayed-message", false, true, false);
  29.  
    // 交换机和队列绑定
  30.  
    $channel->queue_bind($delayConfig["queue_name"]["pay"], $delayConfig["exchange_name"]["pay"], $delayConfig["routing_key"]["pay"]);
  31.  
    // 消息回调处理
  32.  
    $callback = function ($meg) {
  33.  
    echo "revince: " . $meg->body. "\n";
  34.  
    $meg->ack();
  35.  
    };
  36.  
    // 设置消费者处理消息限制,第二个参数:同一时刻服务器只会发送1条消息给消费者消费
  37.  
    $channel->basic_qos(null, 1, null);
  38.  
    // 消费者消费消息: 第四个参数: 需要ack确认
  39.  
    $channel->basic_consume($delayConfig["queue_name"]["pay"], '', false, false, false, false, $callback);
  40.  
    // 监控
  41.  
    while ($channel->is_open()) {
  42.  
    $channel->wait();
  43.  
    }
  44.  
    //关闭通道和连接
  45.  
    $channel->close();
  46.  
    $connection->close();
  47.  
    }
  48.  
    }
学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhggkbig
系列文章
更多 icon
同类精品
更多 icon
继续加载