RabbitMQ:TTL机制
在京东下单,订单创建成功,等待支付,一般会给30分钟的时间,开始倒计时。如果在这段时间内用户没有支付,则默认订单取消。
该如何实现?
- 定期轮询(数据库等)
- 用户下单成功,将订单信息放入数据库,同时将支付状态放入数据库,用户付款更改数据库状态。定期轮询数据库支付状态,如果超过30分钟就将该订单取消。
- 优点:设计实现简单
- 缺点:需要对数据库进行大量的IO操作,效率低下。
- Timer
-
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss");
-
Timer timer = new Timer();
-
TimerTask timerTask = new TimerTask() {
-
-
public void run() {
-
System.out.println("用户没有付款,交易取消:" simpleDateFormat.format(new Date(System.currentTimeMillis())));
-
timer.cancel();
-
}
-
};
-
System.out.println("等待用户付款:" simpleDateFormat.format(new Date(System.currentTimeMillis())));
-
// 10秒后执行timerTask
-
timer.schedule(timerTask, 10 * 1000);
- 缺点
- Timers没有持久化机制
- Timers不灵活 (只可以设置开始时间和重复间隔,对等待支付貌似够用)
- Timers 不能利用线程池,一个timer一个线程
- Timers没有真正的管理计划
-
- ScheduledExecutorService
-
SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss");
-
// 线程工厂
-
ThreadFactory factory = Executors.defaultThreadFactory();
-
// 使用线程池
-
ScheduledExecutorService service = new ScheduledThreadPoolExecutor(10, factory);
-
System.out.println("开始等待用户付款10秒:" format.format(new Date()));
-
service.schedule(new Runnable() {
-
-
public void run() {
-
System.out.println("用户未付款,交易取消:" format.format(new Date()));
-
}// 等待10s 单位秒
-
}, 10, TimeUnit.SECONDS);
- 优点:可以多线程执行,一定程度上避免任务间互相影响,单个任务异常不影响其它任务。
- 在高并发的情况下,不建议使用定时任务去做,因为太浪费服务器性能,不建议。
-
- RabbitMQ:使用TTL
- Quartz
- Redis Zset
- JCronTab
- SchedulerX
- 。。。
TTL,Time to Live 的简称,即过期时间。
RabbitMQ 可以对消息和队列两个维度来设置TTL。
任何消息中间件的容量和堆积能力都是有限的,如果有一些消息总是不被消费掉,那么需要有一种过期的机制来做兜底。
目前有两种方法可以设置消息的TTL。
- 通过Queue属性设置,队列中所有消息都有相同的过期时间。
- 对消息自身进行单独设置,每条消息的TTL 可以不同。
如果两种方法一起使用,则消息的TTL 以两者之间较小数值为准。通常来讲,消息在队列中的生存时间一旦超过设置的TTL 值时,就会变成“死信”(Dead Message),消费者默认就无法再收到该消息。当然,“死信”也是可以被取出来消费的
1、原生API案例
-
package com.lagou.rabbitmq.demo;
-
-
import com.rabbitmq.client.*;
-
-
import java.nio.charset.StandardCharsets;
-
import java.util.HashMap;
-
import java.util.Map;
-
-
public class Producer {
-
public static void main(String[] args) throws Exception {
-
ConnectionFactory factory = new ConnectionFactory();
-
factory.setUri("amqp://root:123456@192.168.80.121:5672//");
-
Connection connection = factory.newConnection();
-
Channel channel = connection.createChannel();
-
-
Map<String, Object> arguments = new HashMap<>();
-
// 消息队列中消息的过期时间
-
arguments.put("x-message-ttl", 10 * 1000);
-
// 如果消息队列中没有消费者,则10s后消息过期,队列将会被自动删除
-
arguments.put("x-expires", 60 * 1000);
-
-
channel.queueDeclare("queue.ttl.waiting",
-
true,
-
false,
-
false,
-
arguments);
-
-
channel.exchangeDeclare("ex.ttl.waiting",
-
BuiltinExchangeType.DIRECT,
-
true,
-
false,
-
false,
-
null);
-
-
channel.queueBind("queue.ttl.waiting", "ex.ttl.waiting", "key.ttl.waiting");
-
-
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
-
.contentEncoding("utf-8")
-
.deliveryMode(2) // 设置消息持久化,2代表设置消息持久化
-
.build();
-
-
channel.basicPublish("ex.ttl.waiting", "key.ttl.waiting", properties, "等待的订单号".getBytes(StandardCharsets.UTF_8));
-
-
channel.close();
-
connection.close();
-
}
-
}
此外,还可以通过命令行方式设置全局TTL,执行如下命令:
rabbitmqctl set_policy TTL ".*" '{"message-ttl":30000}' --apply-to queues
默认规则:
- 如果不设置TTL,则表示此消息不会过期;
- 如果TTL设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃;
注意理解message-ttl 、x-expires 这两个参数的区别,有不同的含义。但是这两个参数属性都遵循上面的默认规则。一般TTL相关的参数单位都是毫秒(ms)
2、springboot案例
(1)pom.xml添加依赖
-
<dependencies>
-
<dependency>
-
<groupId>org.springframework.boot</groupId>
-
<artifactId>spring-boot-starter-amqp</artifactId>
-
</dependency>
-
<dependency>
-
<groupId>org.springframework.boot</groupId>
-
<artifactId>spring-boot-starter-web</artifactId>
-
</dependency>
-
<dependency>
-
<groupId>org.springframework.boot</groupId>
-
<artifactId>spring-boot-starter-test</artifactId>
-
<scope>test</scope>
-
<exclusions>
-
<exclusion>
-
<groupId>org.junit.vintage</groupId>
-
<artifactId>junit-vintage-engine</artifactId>
-
</exclusion>
-
</exclusions>
-
</dependency>
-
<dependency>
-
<groupId>org.springframework.amqp</groupId>
-
<artifactId>spring-rabbit-test</artifactId>
-
<scope>test</scope>
-
</dependency>
-
</dependencies>
(2)application.properties添加rabbitmq连接信息
-
spring.application.name=ttl
-
spring.rabbitmq.host=node1
-
spring.rabbitmq.virtual-host=/
-
spring.rabbitmq.username=root
-
spring.rabbitmq.password=123456
-
spring.rabbitmq.port=5672
(3)主入口类
-
package com.lagou.rabbitmq.demo;
-
import org.springframework.boot.SpringApplication;
-
import org.springframework.boot.autoconfigure.SpringBootApplication;
-
-
public class RabbitmqDemo {
-
public static void main(String[] args) {
-
SpringApplication.run(RabbitmqDemo07.class, args);
-
}
-
}
(4)RabbitConfig类
-
package com.lagou.rabbitmq.demo.config;
-
-
import org.springframework.amqp.core.*;
-
import org.springframework.context.annotation.Bean;
-
import org.springframework.context.annotation.Configuration;
-
-
import java.util.HashMap;
-
import java.util.Map;
-
-
-
public class RabbitConfig {
-
-
public Queue queueTTLWaiting() {
-
Map<String, Object> props = new HashMap<>();
-
// 对于该队列中的消息,设置都等待10s
-
props.put("x-message-ttl", 10000);
-
Queue queue = new Queue("q.pay.ttl-waiting", false, false, false, props);
-
return queue;
-
}
-
-
-
public Queue queueWaiting() {
-
Queue queue = new Queue("q.pay.waiting", false, false, false);
-
return queue;
-
}
-
-
-
public Exchange exchangeTTLWaiting() {
-
DirectExchange exchange = new DirectExchange("ex.pay.ttlwaiting", false, false);
-
return exchange;
-
}
-
-
/**
-
* 该交换器使用的时候,需要给每个消息设置有效期
-
*
-
* @return
-
*/
-
-
public Exchange exchangeWaiting() {
-
DirectExchange exchange = new DirectExchange("ex.pay.waiting", false, false);
-
return exchange;
-
}
-
-
-
public Binding bindingTTLWaiting() {
-
return BindingBuilder.bind(queueTTLWaiting()).to(exchangeTTLWaiting()).with("pay.ttl - waiting").noargs();
-
}
-
-
-
public Binding bindingWaiting() {
-
return BindingBuilder.bind(queueWaiting()).to(exchangeWaiting()).with("pay.waiting").noargs();
-
}
-
}
(5)PayController类
-
package com.lagou.rabbitmq.demo.controller;
-
-
import org.springframework.amqp.core.AmqpTemplate;
-
import org.springframework.amqp.core.Message;
-
import org.springframework.amqp.core.MessageProperties;
-
import org.springframework.beans.factory.annotation.Autowired;
-
import org.springframework.web.bind.annotation.RequestMapping;
-
import org.springframework.web.bind.annotation.RestController;
-
-
import java.io.UnsupportedEncodingException;
-
import java.util.HashMap;
-
import java.util.Map;
-
-
-
public class PayController {
-
-
private AmqpTemplate rabbitTemplate;
-
-
-
public String sendMessage() {
-
rabbitTemplate.convertAndSend("ex.pay.ttl-waiting", "pay.ttlwaiting", "发送了TTL-WAITING-MESSAGE");
-
return "queue-ttl-ok";
-
}
-
-
-
public String sendTTLMessage() throws UnsupportedEncodingException {
-
MessageProperties properties = new MessageProperties();
-
properties.setExpiration("5000");
-
Message message = new Message("发送了WAITINGMESSAGE".getBytes("utf-8"), properties);
-
rabbitTemplate.convertAndSend("ex.pay.waiting", "pay.waiting", message);
-
return "msg-ttl-ok";
-
}
-
}
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgagaka
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
怎样阻止微信小程序自动打开
PHP中文网 06-13 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
photoshop蒙版画笔没反应怎么办
PHP中文网 06-24