RabbitMQ高级 -- 分布式事务
一、引言
前端时间因为忙着学吉他~ 就耽误正事了~ 现在就赶紧补回来~~
二、目录
目录
1.1 什么是分布式事务 ?事务的作用 ? 什么情况下会用到分布式事务 ?
1.1 什么是分布式事务 ?事务的作用 ? 什么情况下会用到分布式事务 ?
概述:
分布式事务是指事务的操作位于不同的节点上,需要保证事务的ACID特性 ~
(所谓分布式事务作用:在不同的系统之间,如何保证数据的一致性。)
什么情况下回用到分布式事务呢 ?
1.主要是应用在我们 springCloud(微服务)当中 ~ 具体是怎么应用的呢 ,看下图 !
解释:
如上图所示,现在有订单服务和配送中心两个服务,每个服务都有自己独立的数据库,假如有一个用户在订单服务下了下单,需要往订单数据库插入数据, 然后给配送中心配送订单,插入配送数据库。
问题
但是假设,给订单数据库保存数据成功了,在调用配送中心的时候发生了异常,这个时候,是不会回滚订单服务的。 就会造成数据的不一致性 !!这个时候就会用的我们的分布式事务.
注意:Spring 提供的也有事务的方法,比如 @Transactional 注解,但是在微服务当中,每个服务都是一个独立的jvm,两个jvm 之间没办法相互去控制。
1.2 可靠生产和确认推送
概念:
可靠生产就是生产者发送一个消息,确认消息成功投递到了队列当中,那怎么能保证消息成功投递到队列当中呢 ? , RabbitMQ 提供的有确认机制,就是当生产者消息成功投递到队列当中,会给你回值,告诉你这个消息已成功投递,简称为 "可靠生产" 。
图 19-7
上图大致流程:
第一步:生产者发送消息到交换机,交换机分发给队列。
第二步:消息的确认机制,判断消息是否成功投递到队列。
第三步:
如果消息投递成功, MQ队列会给你个状态,告诉你消息投递成功。
如果消息投递失败,有可能是服务器原因造成,就将投递失败的消息进行两次重发,如果两次投递消息都失败,证明这条消息有问题。
可靠生产的问题:
图 19-8
上图说明:
当有一个用户下了订单,发送消息可能会出现不可靠,可能就是MQ宕机了,为了防止这种情况,在我们的业务当中会加张冗余表,利用交换机的确认机制判断消息是否发送成功。如果发送成功,就把冗余表的状态改成成功(0和1代表成功都可)。如果失败,就通过定时器重发消息。
代码实现:
注意:MQ配置的时候把确认机制类型:correlated 才能生效。
-
spring:
-
# RabbitMQ 配置
-
rabbitmq:
-
username: admin
-
password: 123456
-
virtual-host: /
-
addresses: 81.70.97.167:5672
-
# 确认机制生效
-
publisher-confirm-type: correlated
a.先保存消息到订单表和消息冗余表
-
/**
-
* @author WangYan
-
* @date 2022/3/2 21:04
-
*/
-
-
public class DownOrderServiceImpl implements DownOrderService {
-
-
-
DownOrderMapper downOrderMapper;
-
-
DownOrderMessageMapper orderMessageMapper;
-
-
RabbitTemplate rabbitTemplate;
-
-
-
private RestTemplate restTemplate;
-
-
-
public String downOrder(DownOrderPO po) {
-
// 1.保存到订单表
-
downOrderMapper.insert(po);
-
// 2.保存到订单冗余表
-
this.saveRedundancy(po);
-
// 3.发送消息到MQ
-
this.sendMessage(po.getUuid());
-
return "成功!";
-
}
-
-
/**
-
* 保存到消息冗余表
-
* @param po
-
*/
-
public void saveRedundancy(DownOrderPO po){
-
DownOrderMessagePO orderMessage = new DownOrderMessagePO();
-
orderMessage.setUuid(po.getUuid());
-
orderMessage.setCommodity(po.getCommodity());
-
orderMessage.setAmount(po.getAmount());
-
orderMessage.setOrderPerson(po.getOrderPerson());
-
// 0 未发送
-
orderMessage.setStatus(0);
-
-
orderMessageMapper.insert(orderMessage);
-
}
-
-
/**
-
* 发送消息到MQ
-
*/
-
public void sendMessage(String orderId){
-
rabbitTemplate.convertAndSend("amq.fanout","",orderId,new CorrelationData(orderId));
-
}
-
}
b.MQ的确认机制
注意:Springboot是个Web项目,一直在运行,下完订单之后,当前线程最好睡眠两毫秒,这样确认机制有一个好的回值。
-
/**
-
* @author WangYan
-
* @date 2022/4/10 18:12
-
*/
-
-
public class MQService {
-
-
-
RabbitTemplate rabbitTemplate;
-
-
DownOrderMessageMapper orderMessageMapper;
-
-
/**
-
* @PostConstruct
-
* 不是Spring提供的注解,而是Java自己的注解。
-
* Java中该注解说明: @PostConstruc 该注解被用来修饰一个非静态的 void() 方法。
-
* 被 @PostConstruc 修饰的方法会在服务器加载Servlet的时候运行。并且只会被服务器执行一次, PostConstruct 在构造函数之后执行, init() 方法之前。
-
*/
-
-
public void regCallback(){
-
// 消息发送成功以后,给与生产者的消息回复,以此来确保生产者的可靠性
-
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
-
-
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
-
// 如果ack为true 代表消息已经收到
-
String orderId = correlationData.getId();
-
-
if (!ack){
-
// 消息投递失败
-
System.out.println("MQ队列应答失败,orderId是: " orderId);
-
}
-
-
QueryWrapper<DownOrderMessagePO> queryWrapper = new QueryWrapper<>();
-
queryWrapper.lambda().eq(DownOrderMessagePO::getUuid,orderId);
-
DownOrderMessagePO orderMessagePO = orderMessageMapper.selectOne(queryWrapper);
-
orderMessagePO.setStatus(1);
-
int insert = orderMessageMapper.updateById(orderMessagePO);
-
System.out.println("消息投递成功!!");
-
}
-
});
-
}
-
}
1.2 可靠消费
a.怎么才能保证消息一定被可靠消费呢 ?
解决方案:
第一种:
1.队列绑定一个死信队列。
2.当消息消费的时候遇到异常,利用MQ的nack机制把消息丢弃到死信队列。
3.监听死信队列中的消息进行业务处理或者人工干预。
第二种:
1.利用MQ的消息重发。
注意:暂时推荐第一种,比较好的解决方案。
注意:消费服务如果出现报错,会出现死循环,可能会把服务器磁盘消耗殆尽,导致服务宕机。
图列20-1
上图流程说明:
1.利用MQ的ack机制,由消费者自身控制消息的重发、清除和丢弃。
2.问题:
a.幂等性问题:定时重发会造成消息的重复发送。可以使用唯一主键,或者redis 的
分布式锁。
b.处理流程:
图例 20-2
上图简单说明:
1.监听队列,获取消息,开始处理消息
2.当消息消费的时候遇到异常,利用MQ的nack机制把消息丢弃到死信队列。
3.监听死信队列中的消息进行业务处理或者人工干预。
c.代码实现:
yml配置:
-
spring:
-
# RabbitMQ 配置
-
rabbitmq:
-
username: admin
-
password: 123456
-
virtual-host: /
-
addresses: 81.70.97.167:5672
-
listener:
-
simple:
-
acknowledge-mode: manual # 开启手动ack,让程序去控制MQ的消息重发和删除和转移
-
retry:
-
enabled: true #开启重试
-
max-attempts: 2 #重试最大次数 (默认是3次)
-
initial-interval: 2000ms #重试间隔时间
代码:
1.绑定死信交换机
-
/**
-
* @author WangYan
-
* @date 2022/2/13 10:36
-
* 绑定死信交换机
-
*/
-
-
public class TTLExChangebinding {
-
-
/**
-
* 声明注册交换机
-
* @return
-
*/
-
-
public FanoutExchange deadExChange(){
-
return new FanoutExchange("dead_fanout_exChange",true,false);
-
}
-
-
/**
-
* 声明队列
-
* @return
-
*/
-
-
public Queue deadQueue(){
-
return new Queue("deadQueue",true);
-
}
-
-
/**
-
* 绑定关系
-
* @return
-
*/
-
-
public Binding ttlBinding(){
-
return BindingBuilder.bind(deadQueue()).to(deadExChange());
-
}
-
-
-
public FanoutExchange orderExChange(){
-
return new FanoutExchange("amq.fanout",true,false);
-
}
-
-
-
public Queue orderQueue(){
-
Map<String, Object> args = new HashMap<>();
-
args.put("x-dead-letter-exchange","dead_fanout_exChange");
-
return new Queue("吕洞宾",true,false,false,args);
-
}
-
-
-
public Binding orderBinding(){
-
return BindingBuilder.bind(orderQueue()).to(orderExChange());
-
}
-
}
2.监听队列,消息处理异常丢到死信队列中
重点:
a.如果配置文件中设置了重发次数,但是代码中加了try catch、channel.basicNack(tag,false,true),这个时候配置文件当中的重发次数是不会生效的,会造成死循环。
b.如果需要配置文件当中的重发次数生效,去掉try catch,加上channel.basicNack(tag,false,false);
-
/**
-
* @author WangYan
-
* @date 2022/3/5 16:13
-
*/
-
-
public class OrderMqConsumer {
-
-
-
TransportOrderMapper orderMapper;
-
-
-
public void getMessage(String message, Channel channel,
-
CorrelationData correlationData,
-
long tag) throws IOException {
-
-
try {
-
-
// 1.获取队列消息
-
System.out.println("收到的MQ消息:" message);
-
// 2.保存运单消息
-
TransportOrderPO orderPO = new TransportOrderPO();
-
orderPO.setOrderId(Integer.parseInt(message));
-
orderPO.setTransportDatetime(LocalDateTime.now());
-
orderMapper.insert(orderPO);
-
-
// 2.手动ack告诉mq消息已经正常消费
-
System.out.println(1 / 0);
-
-
channel.basicAck(tag,false);
-
}catch (Exception ex){
-
-
/**
-
* @param : requeue
-
* false 不会重发,会把消息打入到死信队列
-
* true 会死循环重发.建议如果使用true的话,不加try/catch 否则的话就会死循环。
-
*
-
* tag 是消息的标签,类似唯一主键
-
*/
-
channel.basicNack(tag,false,false);
-
}
-
}
-
}
3.监听死信队列
-
/**
-
* @author WangYan
-
* @date 2022/3/5 16:13
-
* 死信队列
-
*/
-
-
public class deadMqConsumer {
-
-
-
TransportOrderMapper orderMapper;
-
-
-
public void getMessage(String message, Channel channel,
-
CorrelationData correlationData,
-
long tag) throws IOException {
-
-
try {
-
-
// 1.获取队列消息
-
System.out.println("收到死信队列的MQ消息:" message);
-
// 2.业务处理
-
TransportOrderPO orderPO = new TransportOrderPO();
-
orderPO.setOrderId(Integer.parseInt(message));
-
orderPO.setTransportDatetime(LocalDateTime.now());
-
orderMapper.insert(orderPO);
-
-
-
// 3.手动ack告诉mq消息已经正常消费
-
channel.basicAck(tag,false);
-
}catch (Exception ex){
-
-
// 4.处理异常死信队列中的消息
-
System.out.println("人工干预,或者保存到数据库,根据你的业务进行处理死信队列的消息!!");
-
// 5.将消息从死信队列中移除
-
channel.basicNack(tag,false,false);
-
}
-
}
-
}
拜拜~
有任何问题欢迎大家指出~
Thank You !
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhghabcj
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13