• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Python处理MQ消息

武飞扬头像
breeze的一切
帮助1

普通MQ处理

发送端

import pika

# 远程rabbitmq服务的配置信息
username = 'guest'  # 指定远程rabbitmq的用户名密码
pwd = 'guest'
ip_addr = '127.0.0.1'
port_num = 5672

# 消息队列服务的连接和队列的创建
credentials = pika.PlainCredentials(username, pwd)
connection = pika.BlockingConnection(pika.ConnectionParameters(ip_addr, port_num, '/', credentials))
channel = connection.channel()
# 创建一个名为balance的队列,对queue进行durable持久化设为True(持久化第一步)
channel.queue_declare(queue='single_user_overflow_queue', durable=True)

message_str = 'Hello World!'
for i in range(1000):
    # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
    channel.basic_publish(
        exchange='single_user_overflow_exchange',
        routing_key='single_user_overflow_queue',  # 写明将消息发送给队列balance
        body=message_str,  # 要发送的消息
        properties=pika.BasicProperties(delivery_mode=2, )  # 设置消息持久化(持久化第二步),将要发送的消息的属性标记为2,表示该消息要持久化
    )  # 向消息队列发送一条消息
    print(" [%s] Sent 'Hello World!'" % i)
    # time.sleep(0.2)
connection.close()  # 关闭消息队列服务的连接

学新通

接收端

import pika
import sys
import time

# 远程rabbitmq服务的配置信息
username = 'guest'  # 指定远程rabbitmq的用户名密码
pwd = 'guest'
ip_addr = '127.0.0.1'
port_num = 5672

credentials = pika.PlainCredentials(username, pwd)
connection = pika.BlockingConnection(pika.ConnectionParameters(ip_addr, port_num, '/', credentials))
channel = connection.channel()


# 消费成功的回调函数
def callback(ch, method, properties, body):
    print(" [%s] Received %r" % (time.time(), body))
    # time.sleep(0.2)


# 开始依次消费balance队列中的消息
channel.basic_consume(queue='balance', on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL C')
channel.start_consuming()  # 启动消费

学新通

这里可以手动绑定

import pika

# 远程rabbitmq服务的配置信息
username = 'guest'  # 指定远程rabbitmq的用户名密码
pwd = 'guest'
ip_addr = '127.0.0.1'
port_num = 5672

# 消息队列服务的连接和队列的创建
credentials = pika.PlainCredentials(username, pwd)
connection = pika.BlockingConnection(pika.ConnectionParameters(ip_addr, port_num, '/', credentials))
channel = connection.channel()
# 创建一个名为balance的队列,对queue进行durable持久化设为True(持久化第一步)
channel.queue_declare(queue='balance', durable=True)
channel.queue_bind(exchange="mqtest_exchange", queue='balance', routing_key='mqtest_queue')

学新通

mq支持ssl

mqCfg = cfg.getConfig()["mq"]
#ssl安全链接的带入
# context = ssl.create_default_context(cafile=mqCfg["ca_certificate"])
# 忽略证书验证
context = ssl._create_unverified_context()

# context.load_cert_chain(mqCfg["client_certificate"], mqCfg["client_key"])
ssl_options = pika.SSLOptions(context, mqCfg["ip_addr"])
parameters = pika.ConnectionParameters(host=mqCfg["ip_addr"],
                                       port=mqCfg["port_num"],
                                       credentials=pika.PlainCredentials(mqCfg["username"], mqCfg["pwd"]),
                                       ssl_options=ssl_options)
# 消息队列服务的连接和队列的创建
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
return channel

学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhfjikif
系列文章
更多 icon
同类精品
更多 icon
继续加载