springboot整合rabbitMQ使用
一,整合rabbitMQ
- 导入maven依赖
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
- 在application.yml中添加上rabbitmq的配置
spring:
rabbitmq:
virtual-host: /
host: localhost
username: guest
password: guest
port: 5672
# 消息确认配置项
# 确认消息发送到队列
publisher-returns: true
# 确认消息发送到交换机
publisher-confirm-type: correlated
#调整监听使用手动
listener:
direct:
acknowledge-mode: manual
二,配置消息队列和交换机
根据场景选择下面配置模式
1.配置直连(一对一消费)
@Configuration
public class DirectRabbitConfig {
//返回一个队列
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
@Bean
public Queue TestDirectRabbit(){
return new Queue("operationLog",true,false,false,null);
}
//交换机起名
@Bean
DirectExchange TestDirectExchange(){
return new DirectExchange("TestDirectExchange",true,false);
}
//交换机和队列绑定,提供路由
@Bean
Binding BindingDirect(){
return BindingBuilder.bind(TestDirectRabbit()).to(TestDirectExchange()).with("TestDirectRouting");
}
}
这种方式是,发送一个消息到消息队列,只会一个交换机消费一次
- TOPIC型
Topic型,当多个队列绑定到同一个交换机上时,根据routingkey来确定,与Direct不同的是,rountingkey有统配规则,例如topic#,表示所有前面是topic后面无论是什么,都会被捕获到,然后消费,达到了一个可以选择性直连或者广播的效果
@Configuration
public class TopicRabbitConfig {
final static String man="topic.man";
final static String woman="topic.woman";
//队列1
@Bean
public Queue getQueue1(){
return new Queue("TopicQueue1",true,false,false,null);
}
//队列2
@Bean
public Queue getQueue2(){
return new Queue("TopicQueue2",true,false,false,null);
}
//交换机
@Bean
public TopicExchange getTopicExchange(){
return new TopicExchange("TopicExchange");
}
//绑定
@Bean
public Binding getBinding1(){
return BindingBuilder.bind(getQueue1()).to(getTopicExchange()).with(man);
}
@Bean
public Binding getBinding2(){
return BindingBuilder.bind(getQueue2()).to(getTopicExchange()).with("topic.#");
}
}
- 广播模式
一个消息,可以被大家都消费一遍
@Configuration
public class FanoutRabbitConfig {
//队列一
@Bean
public Queue getFanoutQueue1(){
return new Queue("FanoutQueue1",true,false,false);
}
//队列二
@Bean
public Queue getFanoutQueue2(){
return new Queue("FanoutQueue2",true,false,false);
}
//队列三
@Bean
public Queue getFanoutQueue3(){
return new Queue("FanoutQueue3",true,false,false);
}
//交换机
@Bean
public FanoutExchange getFanoutExchange(){
return new FanoutExchange("FanoutExchange");
}
//绑定交换机与队列
@Bean
public Binding getFanoutBinding1(){
return BindingBuilder.bind(getFanoutQueue1()).to(getFanoutExchange());
}
//绑定交换机与队列
@Bean
public Binding getFanoutBinding2(){
return BindingBuilder.bind(getFanoutQueue2()).to(getFanoutExchange());
}
//绑定交换机与队列
@Bean
public Binding getFanoutBinding3(){
return BindingBuilder.bind(getFanoutQueue3()).to(getFanoutExchange());
}
}
三,监听消费
- 编写一个手动监听的功能类
@Component
public class MyListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.println("deliveryTag=" deliveryTag);
//一次接收一条
channel.basicAck(1, false);
byte[] body = message.getBody();
String s = new String(body);
System.out.println("消费信息:" s);
}
}
- 创建一个监听容器,把上面对象存入
@Configuration
public class SimpleMessageListenerConfig {
@Autowired
CachingConnectionFactory connectionFactory;
@Autowired
MyListener myListener;
@Bean
public SimpleMessageListenerContainer createSimpleMessageListenerContainer(){
SimpleMessageListenerContainer container=new SimpleMessageListenerContainer(connectionFactory);
//监听为手动处理消费
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//设置需要监听的队列名
container.setQueueNames("operationLog");
//将监听处理类加入容器
container.setMessageListener(myListener);
return container;
}
}
四,编写controller,发送消息给rabbitmq
@RestController
@RequestMapping("/rabbit")
public class RabbitController {
//注入rabbitTemplate来发送消息
@Autowired
RabbitTemplate rabbitTemplate;
@RequestMapping("/log2")
public void TopicRabbit1(){
Map<String,String> map = new HashMap<>();
map.put("lmh", "测试666");
//将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
rabbitTemplate.convertAndSend("TopicExchange", "topic.man",map);
System.out.println("上传日志===");
}
测试一下就好。
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhggkjih
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13