• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

RabbitMQ:TTL机制

武飞扬头像
悠然予夏
帮助1

学新通

        在京东下单,订单创建成功,等待支付,一般会给30分钟的时间,开始倒计时。如果在这段时间内用户没有支付,则默认订单取消。 

该如何实现?

  • 定期轮询(数据库等)
    • 用户下单成功,将订单信息放入数据库,同时将支付状态放入数据库,用户付款更改数据库状态。定期轮询数据库支付状态,如果超过30分钟就将该订单取消。
    • 优点:设计实现简单
    • 缺点:需要对数据库进行大量的IO操作,效率低下。
  • Timer
    1.  
      SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss");
    2.  
      Timer timer = new Timer();
    3.  
      TimerTask timerTask = new TimerTask() {
    4.  
      @Override
    5.  
      public void run() {
    6.  
      System.out.println("用户没有付款,交易取消:" simpleDateFormat.format(new Date(System.currentTimeMillis())));
    7.  
      timer.cancel();
    8.  
      }
    9.  
      };
    10.  
      System.out.println("等待用户付款:" simpleDateFormat.format(new Date(System.currentTimeMillis())));
    11.  
      // 10秒后执行timerTask
    12.  
      timer.schedule(timerTask, 10 * 1000);
    • 缺点
      • Timers没有持久化机制
      • Timers不灵活 (只可以设置开始时间和重复间隔,对等待支付貌似够用)
      • Timers 不能利用线程池,一个timer一个线程
      • Timers没有真正的管理计划
  • ScheduledExecutorService
    1.  
      SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss");
    2.  
      // 线程工厂
    3.  
      ThreadFactory factory = Executors.defaultThreadFactory();
    4.  
      // 使用线程池
    5.  
      ScheduledExecutorService service = new ScheduledThreadPoolExecutor(10, factory);
    6.  
      System.out.println("开始等待用户付款10秒:" format.format(new Date()));
    7.  
      service.schedule(new Runnable() {
    8.  
      @Override
    9.  
      public void run() {
    10.  
      System.out.println("用户未付款,交易取消:" format.format(new Date()));
    11.  
      }// 等待10s 单位秒
    12.  
      }, 10, TimeUnit.SECONDS);
    • 优点:可以多线程执行,一定程度上避免任务间互相影响,单个任务异常不影响其它任务。
    • 在高并发的情况下,不建议使用定时任务去做,因为太浪费服务器性能,不建议。
  • RabbitMQ:使用TTL
  • Quartz
  • Redis Zset
  • JCronTab
  • SchedulerX
  • 。。。

TTL,Time to Live 的简称,即过期时间。

RabbitMQ 可以对消息和队列两个维度来设置TTL。

   任何消息中间件的容量和堆积能力都是有限的,如果有一些消息总是不被消费掉,那么需要有一种过期的机制来做兜底。

目前有两种方法可以设置消息的TTL。

  1. 通过Queue属性设置,队列中所有消息都有相同的过期时间。
  2. 对消息自身进行单独设置,每条消息的TTL 可以不同。

        如果两种方法一起使用,则消息的TTL 以两者之间较小数值为准。通常来讲,消息在队列中的生存时间一旦超过设置的TTL 值时,就会变成“死信”(Dead Message),消费者默认就无法再收到该消息。当然,“死信”也是可以被取出来消费的

1、原生API案例

  1.  
    package com.lagou.rabbitmq.demo;
  2.  
     
  3.  
    import com.rabbitmq.client.*;
  4.  
     
  5.  
    import java.nio.charset.StandardCharsets;
  6.  
    import java.util.HashMap;
  7.  
    import java.util.Map;
  8.  
     
  9.  
    public class Producer {
  10.  
    public static void main(String[] args) throws Exception {
  11.  
    ConnectionFactory factory = new ConnectionFactory();
  12.  
    factory.setUri("amqp://root:123456@192.168.80.121:5672//");
  13.  
    Connection connection = factory.newConnection();
  14.  
    Channel channel = connection.createChannel();
  15.  
     
  16.  
    Map<String, Object> arguments = new HashMap<>();
  17.  
    // 消息队列中消息的过期时间
  18.  
    arguments.put("x-message-ttl", 10 * 1000);
  19.  
    // 如果消息队列中没有消费者,则10s后消息过期,队列将会被自动删除
  20.  
    arguments.put("x-expires", 60 * 1000);
  21.  
     
  22.  
    channel.queueDeclare("queue.ttl.waiting",
  23.  
    true,
  24.  
    false,
  25.  
    false,
  26.  
    arguments);
  27.  
     
  28.  
    channel.exchangeDeclare("ex.ttl.waiting",
  29.  
    BuiltinExchangeType.DIRECT,
  30.  
    true,
  31.  
    false,
  32.  
    false,
  33.  
    null);
  34.  
     
  35.  
    channel.queueBind("queue.ttl.waiting", "ex.ttl.waiting", "key.ttl.waiting");
  36.  
     
  37.  
    AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
  38.  
    .contentEncoding("utf-8")
  39.  
    .deliveryMode(2) // 设置消息持久化,2代表设置消息持久化
  40.  
    .build();
  41.  
     
  42.  
    channel.basicPublish("ex.ttl.waiting", "key.ttl.waiting", properties, "等待的订单号".getBytes(StandardCharsets.UTF_8));
  43.  
     
  44.  
    channel.close();
  45.  
    connection.close();
  46.  
    }
  47.  
    }
学新通

此外,还可以通过命令行方式设置全局TTL,执行如下命令:

rabbitmqctl set_policy TTL ".*" '{"message-ttl":30000}' --apply-to queues

默认规则:

  • 如果不设置TTL,则表示此消息不会过期;
  • 如果TTL设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃;

        注意理解message-ttlx-expires 这两个参数的区别,有不同的含义。但是这两个参数属性都遵循上面的默认规则。一般TTL相关的参数单位都是毫秒(ms)

2、springboot案例

(1)pom.xml添加依赖

  1.  
    <dependencies>
  2.  
    <dependency>
  3.  
    <groupId>org.springframework.boot</groupId>
  4.  
    <artifactId>spring-boot-starter-amqp</artifactId>
  5.  
    </dependency>
  6.  
    <dependency>
  7.  
    <groupId>org.springframework.boot</groupId>
  8.  
    <artifactId>spring-boot-starter-web</artifactId>
  9.  
    </dependency>
  10.  
    <dependency>
  11.  
    <groupId>org.springframework.boot</groupId>
  12.  
    <artifactId>spring-boot-starter-test</artifactId>
  13.  
    <scope>test</scope>
  14.  
    <exclusions>
  15.  
    <exclusion>
  16.  
    <groupId>org.junit.vintage</groupId>
  17.  
    <artifactId>junit-vintage-engine</artifactId>
  18.  
    </exclusion>
  19.  
    </exclusions>
  20.  
    </dependency>
  21.  
    <dependency>
  22.  
    <groupId>org.springframework.amqp</groupId>
  23.  
    <artifactId>spring-rabbit-test</artifactId>
  24.  
    <scope>test</scope>
  25.  
    </dependency>
  26.  
    </dependencies>
学新通

(2)application.properties添加rabbitmq连接信息

  1.  
    spring.application.name=ttl
  2.  
    spring.rabbitmq.host=node1
  3.  
    spring.rabbitmq.virtual-host=/
  4.  
    spring.rabbitmq.username=root
  5.  
    spring.rabbitmq.password=123456
  6.  
    spring.rabbitmq.port=5672

(3)主入口类

  1.  
    package com.lagou.rabbitmq.demo;
  2.  
    import org.springframework.boot.SpringApplication;
  3.  
    import org.springframework.boot.autoconfigure.SpringBootApplication;
  4.  
    @SpringBootApplication
  5.  
    public class RabbitmqDemo {
  6.  
    public static void main(String[] args) {
  7.  
    SpringApplication.run(RabbitmqDemo07.class, args);
  8.  
    }
  9.  
    }

(4)RabbitConfig类

  1.  
    package com.lagou.rabbitmq.demo.config;
  2.  
     
  3.  
    import org.springframework.amqp.core.*;
  4.  
    import org.springframework.context.annotation.Bean;
  5.  
    import org.springframework.context.annotation.Configuration;
  6.  
     
  7.  
    import java.util.HashMap;
  8.  
    import java.util.Map;
  9.  
     
  10.  
    @Configuration
  11.  
    public class RabbitConfig {
  12.  
    @Bean
  13.  
    public Queue queueTTLWaiting() {
  14.  
    Map<String, Object> props = new HashMap<>();
  15.  
    // 对于该队列中的消息,设置都等待10s
  16.  
    props.put("x-message-ttl", 10000);
  17.  
    Queue queue = new Queue("q.pay.ttl-waiting", false, false, false, props);
  18.  
    return queue;
  19.  
    }
  20.  
     
  21.  
    @Bean
  22.  
    public Queue queueWaiting() {
  23.  
    Queue queue = new Queue("q.pay.waiting", false, false, false);
  24.  
    return queue;
  25.  
    }
  26.  
     
  27.  
    @Bean
  28.  
    public Exchange exchangeTTLWaiting() {
  29.  
    DirectExchange exchange = new DirectExchange("ex.pay.ttlwaiting", false, false);
  30.  
    return exchange;
  31.  
    }
  32.  
     
  33.  
    /**
  34.  
    * 该交换器使用的时候,需要给每个消息设置有效期
  35.  
    *
  36.  
    * @return
  37.  
    */
  38.  
    @Bean
  39.  
    public Exchange exchangeWaiting() {
  40.  
    DirectExchange exchange = new DirectExchange("ex.pay.waiting", false, false);
  41.  
    return exchange;
  42.  
    }
  43.  
     
  44.  
    @Bean
  45.  
    public Binding bindingTTLWaiting() {
  46.  
    return BindingBuilder.bind(queueTTLWaiting()).to(exchangeTTLWaiting()).with("pay.ttl - waiting").noargs();
  47.  
    }
  48.  
     
  49.  
    @Bean
  50.  
    public Binding bindingWaiting() {
  51.  
    return BindingBuilder.bind(queueWaiting()).to(exchangeWaiting()).with("pay.waiting").noargs();
  52.  
    }
  53.  
    }
学新通

(5)PayController类

  1.  
    package com.lagou.rabbitmq.demo.controller;
  2.  
     
  3.  
    import org.springframework.amqp.core.AmqpTemplate;
  4.  
    import org.springframework.amqp.core.Message;
  5.  
    import org.springframework.amqp.core.MessageProperties;
  6.  
    import org.springframework.beans.factory.annotation.Autowired;
  7.  
    import org.springframework.web.bind.annotation.RequestMapping;
  8.  
    import org.springframework.web.bind.annotation.RestController;
  9.  
     
  10.  
    import java.io.UnsupportedEncodingException;
  11.  
    import java.util.HashMap;
  12.  
    import java.util.Map;
  13.  
     
  14.  
    @RestController
  15.  
    public class PayController {
  16.  
    @Autowired
  17.  
    private AmqpTemplate rabbitTemplate;
  18.  
     
  19.  
    @RequestMapping("/pay/queuettl")
  20.  
    public String sendMessage() {
  21.  
    rabbitTemplate.convertAndSend("ex.pay.ttl-waiting", "pay.ttlwaiting", "发送了TTL-WAITING-MESSAGE");
  22.  
    return "queue-ttl-ok";
  23.  
    }
  24.  
     
  25.  
    @RequestMapping("/pay/msgttl")
  26.  
    public String sendTTLMessage() throws UnsupportedEncodingException {
  27.  
    MessageProperties properties = new MessageProperties();
  28.  
    properties.setExpiration("5000");
  29.  
    Message message = new Message("发送了WAITINGMESSAGE".getBytes("utf-8"), properties);
  30.  
    rabbitTemplate.convertAndSend("ex.pay.waiting", "pay.waiting", message);
  31.  
    return "msg-ttl-ok";
  32.  
    }
  33.  
    }
学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgagaka
系列文章
更多 icon
同类精品
更多 icon
继续加载