学新通技术网

rabbitmq 简单及工作 模式 simple|work

juejin 18 1
rabbitmq 简单及工作 模式 simple|work

1.模式封装

package simple

import (
   "fmt"
   "github.com/streadway/amqp"
   "log"
)

const MQURL = "amqp://用户名:密码@127.0.0.1:5672/vhost设置名"

type RabbitMq struct{

   conn *amqp.Connection //mq连接
   channel *amqp.Channel //mq通道
   QueueName string     //mq队列名
   ExChange string         //交换机名
   Key string         //绑定key
   MqUrl string        //mq连接地址
}

//初始化mq信息

func NewRabbitMq(queueName,exChange,key string) *RabbitMq {

   rab:=&RabbitMq{QueueName: queueName,ExChange:exChange,Key:key,MqUrl: MQURL}

   var err error
   rab.conn,err=amqp.Dial(rab.MqUrl)
   rab.failOnErr(err,"连接rabbitmq失败")

   rab.channel,err = rab.conn.Channel()
   rab.failOnErr(err,"获取channel失败")

   return rab
}

//错误信息公共返回处理
func(r *RabbitMq)failOnErr(err error, message string){
   if err!=nil{
      log.Fatalf("%s:%s",err,message)
      panic(fmt.Sprintf("%s:%s",err,message))
   }
}

//简单模式创建实例
func (r *RabbitMq)NewRabbitMqSimple(queueName string)*RabbitMq {

   return NewRabbitMq(queueName,"","")
}

//简单模式生产消息
func (r *RabbitMq)PublishSimple(message string){


   //1 申请队列,如果队列不存在则自动创建,如果队列存在则跳过,如此保证了队列存在,消息才能发送到队列中
   _,err:=r.channel.QueueDeclare(
      //队列名字
      r.QueueName,
      false, //是否持久化
      false,//是否为自动删除
      false, //是否具有排他性 如果true 只有自己使用
      false, //是否阻塞,    设置false 等待服务器响应
      nil,      //额外属性
      )
   r.failOnErr(err,"申请队列失败")

   //2发送消息到队列
   err = r.channel.Publish(
      r.ExChange,
      r.QueueName,
      false,//默认false 如果为true 根据exchange 类型和 routkey 规则,如果无法找到符合条件的队列则把消息返回给发送者
      false,//默认false 如果为true,exchange 发送消息到队列后发现队列上没有绑定消费者则返回给发送者
      amqp.Publishing{

         ContentType: "text/plain",
         Body:        []byte(message),
      },
      )
   r.failOnErr(err,"发送消息到队列失败")
}
//消费信息
func (r *RabbitMq)ConsumeSimple(){
   //1 申请队列,如果队列不存在则自动创建,如果队列存在则跳过,如此保证了队列存在,消息才能发送到队列中
   q,err:=r.channel.QueueDeclare(
      //队列名字
      r.QueueName,
      false, //是否持久化
      false,//是否为自动删除
      false, //是否具有排他性 如果true 只有自己使用
      false, //是否阻塞,    设置false 等待服务器响应
      nil,      //额外属性
   )
   r.failOnErr(err,"申请队列失败")

   //接受消息
   msgs,err:= r.channel.Consume(
      q.Name,
      "", //用来区分多个消费之,简单模式默认空
      true,//是否自动应答
      false,//是否具有排他性
      false,//如果设置为true,表示不能将同一个connection中发送的消息给这个connection中的消费
      false,//队列消息是否阻塞
      nil,)
   r.failOnErr(err,"接受消息失败")
   forever := make(chan bool)

   go func() {
      for d:=range msgs {
         fmt.Println(string(d.Body))
      }
   }()
   <-forever
}

2 简单模式,新建pub.go 服务端 新建 consume.go消费端

package main

import (
   "fmt"
   "nn/rabbitmq/simple"
)

func main() {
   rab:= &simple.RabbitMq{}
   r:=rab.NewRabbitMqSimple( "ic" )
   r.PublishSimple("你好,世界")
   fmt.Println("发送成功")
}
package main

import (
   "nn/rabbitmq/simple"
)

func main() {
   rab:= &simple.RabbitMq{}
   r:=rab.NewRabbitMqSimple( "ic" )
   r.ConsumeSimple()
}

3 工作模式 新建pub.go 服务端 新建 consume1.go消费端 consume2.go消费端

package main

import (
   "fmt"
   "nn/rabbitmq/simple"
   "strconv"
   "time"
)

func main() {
   rab:= simple.RabbitMq{}
   r:=rab.NewRabbitMqSimple("icc")

   for i:=0;i<100;i++{

      r.PublishSimple("你好,世界"+strconv.Itoa(i))
      time.Sleep(time.Second*1)
   }
   fmt.Println("发布消息成功")
   
}
package main

import "nn/rabbitmq/simple"

func main() {
   rab:=simple.RabbitMq{}
   r:=rab.NewRabbitMqSimple("icc")
   r.ConsumeSimple()
}
package main

import "nn/rabbitmq/simple"

func main() {
   rab:=simple.RabbitMq{}
   r:=rab.NewRabbitMqSimple("icc")
   r.ConsumeSimple()
}

4.工作模式 下 go run consume1.go go run consume2.go ,再启用服务端发送队列消息 go run pub.go

服务端: image.png

消费端consume1.go: image.png

本文出至:学新通技术网

标签: