golang对接rabbitMQ
此文参考如下链接:
rabbitmq的个组件、消息传递原理就不做解释了,百度一大堆。
一、rabbitMQ架构
首先要明确的是,tabbitmq是c/s架构的软件,对接rabbitMQ也就是通过client(客户端)来操作服务端,发送消息、接收消息。
二、golang对接rabbitMQ如何来组织
1.首先针对各语言,rabbitMQ官方都有提供对应语言的包,golang的包通过 go get 下载,地址:
github.com/streadway/amqp
2.首先已经明确过了rabbitMQ是c/s架构,所以我们不论是生产者还是消费者,都是通过client端来操作。
3.生产者和消费者都需要完成的步骤:
3.1 创建与server端的连接 connection
3.2 创建channel(发送或接收消息通道)
4.生产者要完成的步骤:
4.1 通过channnel声明Queue(如果Queue已经存在,服务端则会忽略,不存在,服务端则会新建一个Queue)
4.2 通过channel声明Exchange,需要指定Exchange的Type类型(同Queue声明一样,有则忽略,没有则创建)
4.3 创建binding,指定binding key 将Queue绑定到Exchange上,可有多个绑定关系。
4.4 发送消息,指定消息的 bingding key 、Exchange(消息携带了bingding key 和 Exchange,到达Exchange后就知道根据什么type,以及绑定关系分发给哪个队列)
5. 消费者要完成的步骤:
5.1 通过channel声明Queue(同生产者)
5.2 从Queue中取消息
5.3 要注意的是,消费和没必要去关注 Exchange和 绑定关系,因为这是生产者关注的点,尽管消费端也可以 创建交换机以及绑定关系。
三、实际代码实现
注意:生产者和消费者可以根据消息模式进行封装,直接调用。本代码实例即 topic模式,了解此种模式其他模式也就很简单了。
1.rabbitMQ的client端初始化代码(即 生产者和消费者 都需要用到的 client)
-
package rabbitMq
-
-
import (
-
"log"
-
-
"github.com/streadway/amqp"
-
) //导入mq包
-
-
// MQURL 格式 amqp://账号:密码@rabbitmq服务器地址:端口号/vhost (默认是5672端口)
-
// 端口可在 /etc/rabbitmq/rabbitmq-env.conf 配置文件设置,也可以启动后通过netstat -tlnp查看
-
const MQURL = "amqp://admin:mima@192.168.1.11:5672/"
-
-
// RabbitMQ 结构体
-
type RabbitMQ struct {
-
Conn *amqp.Connection
-
Channel *amqp.Channel
-
// 队列名称
-
QueueName string
-
// 交换机
-
Exchange string
-
// routing Key
-
RoutingKey string
-
//MQ链接字符串
-
Mqurl string
-
}
-
-
// 创建结构体实例
-
func NewRabbitMQ(queueName, exchange, routingKey string) *RabbitMQ {
-
rabbitMQ := RabbitMQ{
-
QueueName: queueName,
-
Exchange: exchange,
-
RoutingKey: routingKey,
-
Mqurl: MQURL,
-
}
-
var err error
-
//创建rabbitmq连接
-
rabbitMQ.Conn, err = amqp.Dial(rabbitMQ.Mqurl)
-
checkErr(err, "创建连接失败")
-
-
//创建Channel
-
rabbitMQ.Channel, err = rabbitMQ.Conn.Channel()
-
checkErr(err, "创建channel失败")
-
-
return &rabbitMQ
-
-
}
-
-
// 释放资源,建议NewRabbitMQ获取实例后 配合defer使用
-
func (mq *RabbitMQ) ReleaseRes() {
-
mq.Conn.Close()
-
mq.Channel.Close()
-
}
-
-
// 错误处理
-
func checkErr(err error, meg string) {
-
if err != nil {
-
log.Fatalf("%s:%s\n", meg, err)
-
}
-
}
2.生产者代码实现
-
package main
-
-
import (
-
"fmt"
-
"mq/rabbitMq"
-
-
"github.com/streadway/amqp"
-
)
-
-
//生产者发布流程
-
func main() {
-
// 初始化mq
-
mq := rabbitMq.NewRabbitMQ("queue_publisher", "exchange_publisher", "key1")
-
defer mq.ReleaseRes() // 完成任务释放资源
-
-
// 1.声明队列
-
/*
-
如果只有一方声明队列,可能会导致下面的情况:
-
a)消费者是无法订阅或者获取不存在的MessageQueue中信息
-
b)消息被Exchange接受以后,如果没有匹配的Queue,则会被丢弃
-
-
为了避免上面的问题,所以最好选择两方一起声明
-
ps:如果客户端尝试建立一个已经存在的消息队列,Rabbit MQ不会做任何事情,并返回客户端建立成功的
-
*/
-
_, err := mq.Channel.QueueDeclare( // 返回的队列对象内部记录了队列的一些信息,这里没什么用
-
mq.QueueName, // 队列名
-
true, // 是否持久化
-
false, // 是否自动删除(前提是至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。注意:生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列)
-
false, // 是否为排他队列(排他的队列仅对“首次”声明的conn可见[一个conn中的其他channel也能访问该队列],conn结束后队列删除)
-
false, // 是否阻塞
-
nil, //额外属性(我还不会用)
-
)
-
if err != nil {
-
fmt.Println("声明队列失败", err)
-
return
-
}
-
-
// 2.声明交换器
-
err = mq.Channel.ExchangeDeclare(
-
mq.Exchange, //交换器名
-
"topic", //exchange type:一般用fanout、direct、topic
-
true, // 是否持久化
-
false, //是否自动删除(自动删除的前提是至少有一个队列或者交换器与这和交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑)
-
false, //设置是否内置的。true表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式
-
false, // 是否阻塞
-
nil, // 额外属性
-
)
-
if err != nil {
-
fmt.Println("声明交换器失败", err)
-
return
-
}
-
-
// 3.建立Binding(可随心所欲建立多个绑定关系)
-
err = mq.Channel.QueueBind(
-
mq.QueueName, // 绑定的队列名称
-
mq.RoutingKey, // bindkey 用于消息路由分发的key
-
mq.Exchange, // 绑定的exchange名
-
false, // 是否阻塞
-
nil, // 额外属性
-
)
-
// err = mq.Channel.QueueBind(
-
// mq.QueueName, // 绑定的队列名称
-
// "routingkey2", // bindkey 用于消息路由分发的key
-
// mq.Exchange, // 绑定的exchange名
-
// false, // 是否阻塞
-
// nil, // 额外属性
-
// )
-
if err != nil {
-
fmt.Println("绑定队列和交换器失败", err)
-
return
-
}
-
-
// 4.发送消息
-
mq.Channel.Publish(
-
mq.Exchange, // 交换器名
-
mq.RoutingKey, // routing key
-
false, // 是否返回消息(匹配队列),如果为true, 会根据binding规则匹配queue,如未匹配queue,则把发送的消息返回给发送者
-
false, // 是否返回消息(匹配消费者),如果为true, 消息发送到queue后发现没有绑定消费者,则把发送的消息返回给发送者
-
amqp.Publishing{ // 发送的消息,固定有消息体和一些额外的消息头,包中提供了封装对象
-
ContentType: "text/plain", // 消息内容的类型
-
Body: []byte("hello jochen"), // 消息内容
-
},
-
)
-
}
3.消费者代码
-
package main
-
-
import (
-
"fmt"
-
"mq/rabbitMq"
-
)
-
-
// 消费者订阅
-
func main() {
-
// 初始化mq
-
mq := rabbitMq.NewRabbitMQ("queue_publisher", "exchange_publisher", "key1")
-
defer mq.ReleaseRes() // 完成任务释放资源
-
-
// 1.声明队列(两端都要声明,原因在生产者处已经说明)
-
_, err := mq.Channel.QueueDeclare( // 返回的队列对象内部记录了队列的一些信息,这里没什么用
-
mq.QueueName, // 队列名
-
true, // 是否持久化
-
false, // 是否自动删除(前提是至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。注意:生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列)
-
false, // 是否为排他队列(排他的队列仅对“首次”声明的conn可见[一个conn中的其他channel也能访问该队列],conn结束后队列删除)
-
false, // 是否阻塞
-
nil, // 额外属性(我还不会用)
-
)
-
if err != nil {
-
fmt.Println("声明队列失败", err)
-
return
-
}
-
-
// 2.从队列获取消息(消费者只关注队列)consume方式会不断的从队列中获取消息
-
msgChanl, err := mq.Channel.Consume(
-
mq.QueueName, // 队列名
-
"", // 消费者名,用来区分多个消费者,以实现公平分发或均等分发策略
-
true, // 是否自动应答
-
false, // 是否排他
-
false, // 是否接收只同一个连接中的消息,若为true,则只能接收别的conn中发送的消息
-
true, // 队列消费是否阻塞
-
nil, // 额外属性
-
)
-
if err != nil {
-
fmt.Println("获取消息失败", err)
-
return
-
}
-
-
for msg := range msgChanl {
-
// 这里写你的处理逻辑
-
// 获取到的消息是amqp.Delivery对象,从中可以获取消息信息
-
fmt.Println(string(msg.Body))
-
// msg.Ack(true) // 主动应答
-
-
}
-
-
}
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhggiecg
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13