• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

go通过rabbitmq实现延迟队列

武飞扬头像
我是等闲之辈
帮助1

延时队列的应用场景有很多,比如:订单在十分钟之内未支付则自动取消。
使用的go包是:github.com/streadway/amqp

下面展示主要的代码

生成队列的代码如下:
两个重点:
1、设置队列过期时间(包括队列和单个消息的过期时间,两者都设置的话取短的那个)
2、指定队列里面的消息过期后发送到哪个死信交换机,以及绑定规则。

【注意】
修改了队列参数后(比如修改队列的过期时间),需要把旧的队列删掉,才能重新生成新的队列。不然生产消息会提示:Exception (504) Reason: “channel/connection is not open”

//simple 模式下 队列生产(修改队列参数,需要删掉队列,重新生成队列才能生效)
func (r *RabbitMQ) PublishSimple(message string) {
	//1.申请队列,如果队列不存在会自动创建,存在则跳过创建
	//额外属性
	var args = make(map[string]interface{})
	args["x-message-ttl"] = 20000                            //设置队列的过期时间(单位是毫秒)
	args["x-dead-letter-exchange"] = "dead_message_exchange" // 指定死信交换机
	args["x-dead-letter-routing-key"] = "dead_message_key"   // 指定死信routing-key
	_, err := r.channel.QueueDeclare(
		r.QueueName,
		//是否持久化
		true,
		//是否自动删除
		false,
		//是否具有排他性
		false,
		//是否阻塞处理
		false,
		//额外的属性
		args,
	)
	if err != nil {
		fmt.Println(err)
	}
	//调用channel 发送消息到队列中
	r.channel.Publish(
		r.Exchange,
		r.QueueName,
		//如果为true,根据自身exchange类型和routekey规则无法找到符合条件的队列会把消息返还给发送者
		false,
		//如果为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
		false,
		amqp.Publishing{
			//Expiration:  "10000", //针对单个消息,单位是毫秒(如果队列也设置了过期时间,就以两者比较短的时间为准)
			ContentType: "text/plain",
			Body:        []byte(message),
		})
}
学新通

生产者调用上面的方法:

package main

import (
	"go_rabbitmq/rabbitmq"
)
func main() {
	//只需要设置队列名称即可
	rabbitmq := rabbitmq.NewRabbitMQSimple("cowboyQueue_swl")

	//简单模式下,直接用这个测试是否能正常消费
	rabbitmq.PublishSimple("cowbouy very busy!")
	fmt.Println("发送成功")
}

然后登陆rabbit后台查看
学新通
学新通

接下来就是要生成死信交换机和死信队列

通过消费者进程创建交换机和绑定关系(生产者进程最多只创建交换机,因为生者者只负责把消息投递给交换机)
重点:注意durable和exclusive的用法和关系。
1、durable为true的时候连接关闭(消费者进程退出)不删除队列。前提是exclusive为false。
2、exclusive为true 代表只被一个连接(connection)使用,而且当连接关闭后队列即被删除(消费者进程退出即删除队列)。exclusive设置了true的话,即使durable为true队列也不会持久存在。

死信队列消费者进程代码

//死信队列消费者(消费者进程负责创建交换机、队列以及绑定关系。无法在生产者进程去创建队列和绑定关系,生产者只负责把消息投递给交换机。)
func (r *RabbitMQ) ConsumeDeadQueue() {

	//1.不存在该交换机则创建交换机
	err := r.channel.ExchangeDeclare(
		r.Exchange,
		"direct",
		true,
		false,
		false,
		false,
		nil,
	)
	r.failOnErr(err, "Failed to declare an exchange")
	//2.不存在则创建队列
	q, err := r.channel.QueueDeclare(
		r.QueueName,
		true, //durable为true的时候连接关闭(消费者进程退出)不删除队列。前提是exclusive为false
		false,
		false, //⭐重点:exclusive为true 代表只被一个连接(connection)使用,而且当连接关闭后队列即被删除(消费者进程退出即删除队列)。exclusive设置了true的话,即使durable为true队列也不会持久存在。
		false,
		nil,
	)
	fmt.Printf("q: %v\n", q)
	fmt.Printf("err: %v\n", err)
	r.failOnErr(err, "Failed to declare a queue")

	//3.绑定队列和交换机
	err = r.channel.QueueBind(
		q.Name,
		r.Key,
		r.Exchange,
		false,
		nil,
	)
	r.failOnErr(err, "Failed to bind")

	//消费消息
	messges, err := r.channel.Consume(
		q.Name,
		"",
		true,
		false,
		false,
		false,
		nil,
	)
	r.failOnErr(err, "Failed to consume")
	forever := make(chan bool)

	go func() {
		for d := range messges {
			log.Printf("Received a message: %s", d.Body)
		}
	}()

	fmt.Println("退出请按 CTRL C")
	<-forever
}
学新通
package main

import "go_rabbitmq/rabbitmq"

func main() {
	//执行 go build -o xxx.exe 生成消费进程
	rabbitmq := rabbitmq.NewRabbitMQDead("cowboy_dead_queue", "dead_message_exchange", "dead_message_key")
	rabbitmq.ConsumeDeadQueue()
}

验证

接下来就可以重新执行生产者的代码,看消息到期后,会不会发送到死信队列。
最后开启死信队列的消费者进程,不断消费死信队列中的消息,就能达到延迟队列的效果。

学新通
学新通

技巧

查看消费者进程是否存在?以及如何真正杀死消费者进程?
学新通

学新通

总结

总共需要写两个进程:
一个正常队列的生产者进程(设置好超时过期时间,指定消息过期后投递的死信交换机,和绑定路由),
一个死信队列的消费者进程(创建死信交换机、死信队列、绑定关系)。

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhghabeh
系列文章
更多 icon
同类精品
更多 icon
继续加载