RabbitMQ - 案例
目录
0.引用
https://note.oddfar.com/rabbitmq/
1.Hello world
1.1 依赖引用
-
<dependencies>
-
<!--rabbitmq 依赖客户端-->
-
<dependency>
-
<groupId>com.rabbitmq</groupId>
-
<artifactId>amqp-client</artifactId>
-
<version>5.8.0</version>
-
</dependency>
-
<!--操作文件流的一个依赖-->
-
<dependency>
-
<groupId>commons-io</groupId>
-
<artifactId>commons-io</artifactId>
-
<version>2.6</version>
-
</dependency>
-
</dependencies>
1.2 消息生产者
-
package com.example.one;
-
-
import com.rabbitmq.client.Channel;
-
import com.rabbitmq.client.Connection;
-
import com.rabbitmq.client.ConnectionFactory;
-
-
-
public class Producer {
-
private final static String QUEUE_NAME = "quque";
-
-
public static void main(String[] args) throws Exception {
-
-
//创建一个连接工厂
-
ConnectionFactory factory = new ConnectionFactory();
-
factory.setHost("192.168.2.17");
-
factory.setUsername("admin");
-
factory.setPassword("admin");
-
-
//channel 实现了自动 close 接口 自动关闭 不需要显示关闭
-
//创建连接
-
Connection connection = factory.newConnection();
-
-
//获取信道
-
Channel channel = connection.createChannel();
-
-
/**
-
* 生成一个队列
-
* 1.QUEUE_NAME 队列名称
-
* 2.durable 队列里面的消息是否持久化 也就是是否用完就删除
-
* 3.exclusive 该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
-
* 4.autoDelete是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
-
* 5.其他参数
-
*/
-
Boolean durable = true;
-
Boolean exclusive = false;
-
Boolean autoDelete = false;
-
Map<String, Object> arguments = null;
-
channel.queueDeclare(QUEUE_NAME,durable,exclusive,autoDelete, null);
-
String message = "hello world";
-
-
/**
-
* 发送一个消息
-
* 1.发送到那个交换机
-
* 2.路由的 key 是哪个
-
* 3.其他的参数信息
-
* 4.发送消息的消息体
-
*/
-
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
-
System.out.println("消息发送完毕");
-
-
}
-
-
}
1.3 消息消费者
-
package com.example.one;
-
import com.rabbitmq.client.*;
-
-
public class Consumer {
-
private final static String QUEUE_NAME = "quque";
-
-
public static void main(String[] args) throws Exception {
-
-
ConnectionFactory factory = new ConnectionFactory();
-
factory.setHost("192.168.2.17");
-
factory.setUsername("admin");
-
factory.setPassword("admin");
-
Connection connection = factory.newConnection();
-
Channel channel = connection.createChannel();
-
-
System.out.println("等待接收消息.........");
-
-
//推送的消息如何进行消费的接口回调
-
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
-
String message = new String(delivery.getBody());
-
System.out.println(message);
-
};
-
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
-
CancelCallback cancelCallback = (consumerTag) -> {
-
System.out.println("消息消费被中断");
-
};
-
/**
-
* 消费者消费消息 - 接受消息
-
* 1.消费哪个队列
-
* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
-
* 3.消费者未成功消费的回调
-
* 4.消息被取消时的回调
-
*/
-
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
-
}
-
-
}
2.轮训分发消息
2.1 抽取工具类
-
package com.example.utils;
-
-
import com.rabbitmq.client.Channel;
-
import com.rabbitmq.client.Connection;
-
import com.rabbitmq.client.ConnectionFactory;
-
-
public class RabbitMqUtils {
-
//得到一个连接的 channel
-
-
public static Channel getChannel() throws Exception {
-
//创建一个连接工厂
-
ConnectionFactory factory = new ConnectionFactory();
-
factory.setHost("192.168.2.17");
-
factory.setUsername("admin");
-
factory.setPassword("admin");
-
Connection connection = factory.newConnection();
-
Channel channel = connection.createChannel();
-
return channel;
-
}
-
}
2.2 启动两个工作线程接受消息
-
package com.example.two;
-
-
import com.oddfar.utils.RabbitMqUtils;
-
import com.rabbitmq.client.CancelCallback;
-
import com.rabbitmq.client.Channel;
-
import com.rabbitmq.client.DeliverCallback;
-
-
public class Worker01 {
-
-
private static final String QUEUE_NAME = "quque";
-
-
public static void main(String[] args) throws Exception {
-
-
Channel channel = RabbitMqUtils.getChannel();
-
-
//消息接受
-
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
-
String receivedMessage = new String(delivery.getBody());
-
System.out.println("接收到消息:" receivedMessage);
-
};
-
//消息被取消
-
CancelCallback cancelCallback = (consumerTag) -> {
-
System.out.println(consumerTag "消费者取消消费接口回调逻辑");
-
-
};
-
-
System.out.println("C1 消费者启动等待消费.................. ");
-
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
-
-
}
-
}
选中 Allow multiple instances
启动后
2.3 启动一个发送消息线程
-
public class Task01 {
-
public static final String QUEUE_NAME = "quque";
-
-
public static void main(String[] args) throws Exception {
-
-
Channel channel = RabbitMqUtils.getChannel();
-
-
Scanner scanner = new Scanner(System.in);
-
while (scanner.hasNext()) {
-
String message = scanner.next();
-
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
-
System.out.println("消息发送完成:" message);
-
}
-
-
}
-
}
2.4 结果展示
通过程序执行发现生产者总共发送 4 个消息,消费者 1 和消费者 2 分别分得两个消息,并且是按照有序的一个接收一次消息
3.消息应答
3.1 自动应答
消息发送后立即被认为已经传送成功
3.2 手动消息应答的方法
- Channel.basicAck(用于肯定确认)
- Channel.basicNack(用于否定确认)
- Channel.basicReject(用于否定确认)
Multiple 的解释:
手动应答的好处是可以批量应答并且减少网络拥堵
- true 代表批量应答 channel 上未应答的消息
- false 同上面相比只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答
3.3 消息自动重新入队
3.4 消息手动应答代码
消费者在上面代码的基础上增加了以下内容
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
4.RabbitMQ 持久化
4.1 队列如何实现持久化
-
//让队列持久化
-
boolean durable = true;
-
//声明队列
-
channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);
4.2 消息实现持久化
需要在消息生产者修改代码,MessageProperties.PERSISTENT_TEXT_PLAIN
添加这个属性
5.不公平分发
为了避免这种情况,在消费者中消费之前,我们可以设置参数 channel.basicQos(1);
-
//不公平分发
-
int prefetchCount = 1;
-
channel.basicQos(prefetchCount);
-
-
//采用手动应答
-
boolean autoAck = false;
-
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
6.预取值分发
本身消息的发送就是异步发送的,所以在任何时候,channel 上肯定不止只有一个消息另外来自消费 者的手动确认本质上也是异步的。因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。这个时候就可以通过使用 basic.qos 方法设 置“预取计数”值来完成的
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgbgkbf
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13