• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

golang对接rabbitMQ

武飞扬头像
leo_jk
帮助1

此文参考如下链接:

Golang 使用 RabbitMQ - 知乎

rabbitmq的个组件、消息传递原理就不做解释了,百度一大堆。

一、rabbitMQ架构

首先要明确的是,tabbitmq是c/s架构的软件,对接rabbitMQ也就是通过client(客户端)来操作服务端,发送消息、接收消息。

二、golang对接rabbitMQ如何来组织

1.首先针对各语言,rabbitMQ官方都有提供对应语言的包,golang的包通过 go get 下载,地址:

github.com/streadway/amqp

2.首先已经明确过了rabbitMQ是c/s架构,所以我们不论是生产者还是消费者,都是通过client端来操作。

3.生产者和消费者都需要完成的步骤:

        3.1 创建与server端的连接 connection

        3.2 创建channel(发送或接收消息通道) 

4.生产者要完成的步骤:

        4.1 通过channnel声明Queue(如果Queue已经存在,服务端则会忽略,不存在,服务端则会新建一个Queue)

        4.2 通过channel声明Exchange,需要指定Exchange的Type类型(同Queue声明一样,有则忽略,没有则创建)

        4.3 创建binding,指定binding key 将Queue绑定到Exchange上,可有多个绑定关系。

        4.4 发送消息,指定消息的 bingding key 、Exchange(消息携带了bingding key 和 Exchange,到达Exchange后就知道根据什么type,以及绑定关系分发给哪个队列)

        

5. 消费者要完成的步骤:

        5.1 通过channel声明Queue(同生产者)

        5.2 从Queue中取消息

        5.3 要注意的是,消费和没必要去关注 Exchange和 绑定关系,因为这是生产者关注的点,尽管消费端也可以 创建交换机以及绑定关系。

三、实际代码实现

注意:生产者和消费者可以根据消息模式进行封装,直接调用。本代码实例即 topic模式,了解此种模式其他模式也就很简单了。

1.rabbitMQ的client端初始化代码(即 生产者和消费者 都需要用到的 client)

  1.  
    package rabbitMq
  2.  
     
  3.  
    import (
  4.  
    "log"
  5.  
     
  6.  
    "github.com/streadway/amqp"
  7.  
    ) //导入mq包
  8.  
     
  9.  
    // MQURL 格式 amqp://账号:密码@rabbitmq服务器地址:端口号/vhost (默认是5672端口)
  10.  
    // 端口可在 /etc/rabbitmq/rabbitmq-env.conf 配置文件设置,也可以启动后通过netstat -tlnp查看
  11.  
    const MQURL = "amqp://admin:mima@192.168.1.11:5672/"
  12.  
     
  13.  
    // RabbitMQ 结构体
  14.  
    type RabbitMQ struct {
  15.  
    Conn *amqp.Connection
  16.  
    Channel *amqp.Channel
  17.  
    // 队列名称
  18.  
    QueueName string
  19.  
    // 交换机
  20.  
    Exchange string
  21.  
    // routing Key
  22.  
    RoutingKey string
  23.  
    //MQ链接字符串
  24.  
    Mqurl string
  25.  
    }
  26.  
     
  27.  
    // 创建结构体实例
  28.  
    func NewRabbitMQ(queueName, exchange, routingKey string) *RabbitMQ {
  29.  
    rabbitMQ := RabbitMQ{
  30.  
    QueueName: queueName,
  31.  
    Exchange: exchange,
  32.  
    RoutingKey: routingKey,
  33.  
    Mqurl: MQURL,
  34.  
    }
  35.  
    var err error
  36.  
    //创建rabbitmq连接
  37.  
    rabbitMQ.Conn, err = amqp.Dial(rabbitMQ.Mqurl)
  38.  
    checkErr(err, "创建连接失败")
  39.  
     
  40.  
    //创建Channel
  41.  
    rabbitMQ.Channel, err = rabbitMQ.Conn.Channel()
  42.  
    checkErr(err, "创建channel失败")
  43.  
     
  44.  
    return &rabbitMQ
  45.  
     
  46.  
    }
  47.  
     
  48.  
    // 释放资源,建议NewRabbitMQ获取实例后 配合defer使用
  49.  
    func (mq *RabbitMQ) ReleaseRes() {
  50.  
    mq.Conn.Close()
  51.  
    mq.Channel.Close()
  52.  
    }
  53.  
     
  54.  
    // 错误处理
  55.  
    func checkErr(err error, meg string) {
  56.  
    if err != nil {
  57.  
    log.Fatalf("%s:%s\n", meg, err)
  58.  
    }
  59.  
    }
学新通

2.生产者代码实现

  1.  
    package main
  2.  
     
  3.  
    import (
  4.  
    "fmt"
  5.  
    "mq/rabbitMq"
  6.  
     
  7.  
    "github.com/streadway/amqp"
  8.  
    )
  9.  
     
  10.  
    //生产者发布流程
  11.  
    func main() {
  12.  
    // 初始化mq
  13.  
    mq := rabbitMq.NewRabbitMQ("queue_publisher", "exchange_publisher", "key1")
  14.  
    defer mq.ReleaseRes() // 完成任务释放资源
  15.  
     
  16.  
    // 1.声明队列
  17.  
    /*
  18.  
    如果只有一方声明队列,可能会导致下面的情况:
  19.  
    a)消费者是无法订阅或者获取不存在的MessageQueue中信息
  20.  
    b)消息被Exchange接受以后,如果没有匹配的Queue,则会被丢弃
  21.  
     
  22.  
    为了避免上面的问题,所以最好选择两方一起声明
  23.  
    ps:如果客户端尝试建立一个已经存在的消息队列,Rabbit MQ不会做任何事情,并返回客户端建立成功的
  24.  
    */
  25.  
    _, err := mq.Channel.QueueDeclare( // 返回的队列对象内部记录了队列的一些信息,这里没什么用
  26.  
    mq.QueueName, // 队列名
  27.  
    true, // 是否持久化
  28.  
    false, // 是否自动删除(前提是至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。注意:生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列)
  29.  
    false, // 是否为排他队列(排他的队列仅对“首次”声明的conn可见[一个conn中的其他channel也能访问该队列],conn结束后队列删除)
  30.  
    false, // 是否阻塞
  31.  
    nil, //额外属性(我还不会用)
  32.  
    )
  33.  
    if err != nil {
  34.  
    fmt.Println("声明队列失败", err)
  35.  
    return
  36.  
    }
  37.  
     
  38.  
    // 2.声明交换器
  39.  
    err = mq.Channel.ExchangeDeclare(
  40.  
    mq.Exchange, //交换器名
  41.  
    "topic", //exchange type:一般用fanout、direct、topic
  42.  
    true, // 是否持久化
  43.  
    false, //是否自动删除(自动删除的前提是至少有一个队列或者交换器与这和交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑)
  44.  
    false, //设置是否内置的。true表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式
  45.  
    false, // 是否阻塞
  46.  
    nil, // 额外属性
  47.  
    )
  48.  
    if err != nil {
  49.  
    fmt.Println("声明交换器失败", err)
  50.  
    return
  51.  
    }
  52.  
     
  53.  
    // 3.建立Binding(可随心所欲建立多个绑定关系)
  54.  
    err = mq.Channel.QueueBind(
  55.  
    mq.QueueName, // 绑定的队列名称
  56.  
    mq.RoutingKey, // bindkey 用于消息路由分发的key
  57.  
    mq.Exchange, // 绑定的exchange名
  58.  
    false, // 是否阻塞
  59.  
    nil, // 额外属性
  60.  
    )
  61.  
    // err = mq.Channel.QueueBind(
  62.  
    // mq.QueueName, // 绑定的队列名称
  63.  
    // "routingkey2", // bindkey 用于消息路由分发的key
  64.  
    // mq.Exchange, // 绑定的exchange名
  65.  
    // false, // 是否阻塞
  66.  
    // nil, // 额外属性
  67.  
    // )
  68.  
    if err != nil {
  69.  
    fmt.Println("绑定队列和交换器失败", err)
  70.  
    return
  71.  
    }
  72.  
     
  73.  
    // 4.发送消息
  74.  
    mq.Channel.Publish(
  75.  
    mq.Exchange, // 交换器名
  76.  
    mq.RoutingKey, // routing key
  77.  
    false, // 是否返回消息(匹配队列),如果为true, 会根据binding规则匹配queue,如未匹配queue,则把发送的消息返回给发送者
  78.  
    false, // 是否返回消息(匹配消费者),如果为true, 消息发送到queue后发现没有绑定消费者,则把发送的消息返回给发送者
  79.  
    amqp.Publishing{ // 发送的消息,固定有消息体和一些额外的消息头,包中提供了封装对象
  80.  
    ContentType: "text/plain", // 消息内容的类型
  81.  
    Body: []byte("hello jochen"), // 消息内容
  82.  
    },
  83.  
    )
  84.  
    }
学新通

3.消费者代码

  1.  
    package main
  2.  
     
  3.  
    import (
  4.  
    "fmt"
  5.  
    "mq/rabbitMq"
  6.  
    )
  7.  
     
  8.  
    // 消费者订阅
  9.  
    func main() {
  10.  
    // 初始化mq
  11.  
    mq := rabbitMq.NewRabbitMQ("queue_publisher", "exchange_publisher", "key1")
  12.  
    defer mq.ReleaseRes() // 完成任务释放资源
  13.  
     
  14.  
    // 1.声明队列(两端都要声明,原因在生产者处已经说明)
  15.  
    _, err := mq.Channel.QueueDeclare( // 返回的队列对象内部记录了队列的一些信息,这里没什么用
  16.  
    mq.QueueName, // 队列名
  17.  
    true, // 是否持久化
  18.  
    false, // 是否自动删除(前提是至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。注意:生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列)
  19.  
    false, // 是否为排他队列(排他的队列仅对“首次”声明的conn可见[一个conn中的其他channel也能访问该队列],conn结束后队列删除)
  20.  
    false, // 是否阻塞
  21.  
    nil, // 额外属性(我还不会用)
  22.  
    )
  23.  
    if err != nil {
  24.  
    fmt.Println("声明队列失败", err)
  25.  
    return
  26.  
    }
  27.  
     
  28.  
    // 2.从队列获取消息(消费者只关注队列)consume方式会不断的从队列中获取消息
  29.  
    msgChanl, err := mq.Channel.Consume(
  30.  
    mq.QueueName, // 队列名
  31.  
    "", // 消费者名,用来区分多个消费者,以实现公平分发或均等分发策略
  32.  
    true, // 是否自动应答
  33.  
    false, // 是否排他
  34.  
    false, // 是否接收只同一个连接中的消息,若为true,则只能接收别的conn中发送的消息
  35.  
    true, // 队列消费是否阻塞
  36.  
    nil, // 额外属性
  37.  
    )
  38.  
    if err != nil {
  39.  
    fmt.Println("获取消息失败", err)
  40.  
    return
  41.  
    }
  42.  
     
  43.  
    for msg := range msgChanl {
  44.  
    // 这里写你的处理逻辑
  45.  
    // 获取到的消息是amqp.Delivery对象,从中可以获取消息信息
  46.  
    fmt.Println(string(msg.Body))
  47.  
    // msg.Ack(true) // 主动应答
  48.  
     
  49.  
    }
  50.  
     
  51.  
    }
学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhggiecg
系列文章
更多 icon
同类精品
更多 icon
继续加载