• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Spring RabbitMQ 实现消息队列延迟

武飞扬头像
BUG弄潮儿
帮助1

1.概述

要实现RabbitMQ的消息队列延迟功能,一般采用官方提供的 rabbitmq_delayed_message_exchange插件。但RabbitMQ版本必须是3.5.8以上才支持该插件,否则得用其死信队列功能。

2.安装RabbitMQ延迟插件

  • 检查插件 使用rabbitmq-plugins list命令用于查看RabbitMQ安装的插件。

rabbitmq-plugins list

检查RabbitMQ插件安装情况

学新通
  • 下载插件

如果没有安装插件,则直接访问官网进行下载

https://www.rabbitmq.com/community-plugins.html
学新通 学新通
  • 安装插件

下载后,将其拷贝到RabbitMQ安装目录的plugins目录;并进行解压,如:

E:\software\RabbitMQ Server\rabbitmq_server-3.11.13\plugins
学新通

打开cmd命令行窗口,如果系统已经配置RabbitMQ环境变量,则直接执行以下的命令进行安装;否则需要进入到RabbitMQ安装目录的sbin目录。

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
学新通

3.实现RabbitMQ消息队列延迟功能

  • pom.xml配置信息文件中,添加相关依赖文件

  1.  
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  2.  
     <modelVersion>4.0.0</modelVersion>
  3.  
     <groupId>com.olive</groupId>
  4.  
     <artifactId>rabbitmq-spring-demo</artifactId>
  5.  
     <version>0.0.1-SNAPSHOT</version>
  6.  
     <parent>
  7.  
      <groupId>org.springframework.boot</groupId>
  8.  
      <artifactId>spring-boot-starter-parent</artifactId>
  9.  
      <version>2.7.7</version>
  10.  
      <relativePath />
  11.  
     </parent>
  12.  
     <dependencies>
  13.  
      <!--rabbitmq-->
  14.  
      <dependency>
  15.  
       <groupId>org.springframework.boot</groupId>
  16.  
       <artifactId>spring-boot-starter-amqp</artifactId>
  17.  
      </dependency>
  18.  
      <dependency>
  19.  
       <groupId>org.springframework.boot</groupId>
  20.  
       <artifactId>spring-boot-starter-web</artifactId>
  21.  
      </dependency>
  22.  
      <dependency>
  23.  
       <groupId>org.springframework.boot</groupId>
  24.  
       <artifactId>spring-boot-starter-test</artifactId>
  25.  
       <scope>test</scope>
  26.  
      </dependency>
  27.  
      
  28.  
     <dependency>
  29.  
          <groupId>org.eclipse.paho</groupId>
  30.  
          <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  31.  
          <version>1.2.5</version>
  32.  
      </dependency>
  33.  
     
  34.  
     </dependencies>
  35.  
     <build>
  36.  
      <plugins>
  37.  
       <plugin>
  38.  
        <groupId>org.apache.maven.plugins</groupId>
  39.  
        <artifactId>maven-compiler-plugin</artifactId>
  40.  
        <configuration>
  41.  
         <source>1.8</source>
  42.  
         <target>1.8</target>
  43.  
        </configuration>
  44.  
       </plugin>
  45.  
      </plugins>
  46.  
     </build>
  47.  
    </project>
学新通
  • application.yml配置文件中配置RabbitMQ信息

  1.  
    server:
  2.  
      port: 8080
  3.  
    spring:
  4.  
      #给项目来个名字
  5.  
      application:
  6.  
        name: rabbitmq-spring-demo
  7.  
      #配置rabbitMq 服务器
  8.  
      rabbitmq:
  9.  
        host: 127.0.0.1
  10.  
        port: 5672
  11.  
        username: admin
  12.  
        password: admin123
  13.  
        #虚拟host。可以不设置,使用server默认host;不同虚拟路径下的队列是隔离的
  14.  
        virtual-host: /
  • RabbitMQ配置类

  1.  
    package com.olive.config;
  2.  
     
  3.  
    import org.springframework.amqp.core.Binding;
  4.  
    import org.springframework.amqp.core.BindingBuilder;
  5.  
    import org.springframework.amqp.core.CustomExchange;
  6.  
    import org.springframework.amqp.core.Queue;
  7.  
    import org.springframework.context.annotation.Bean;
  8.  
    import org.springframework.context.annotation.Configuration;
  9.  
     
  10.  
    import java.util.HashMap;
  11.  
    import java.util.Map;
  12.  
     
  13.  
    /**
  14.  
     * RabbitMQ配置类
  15.  
     **/
  16.  
    @Configuration
  17.  
    public class RabbitMqConfig {
  18.  
     
  19.  
     public static final String DELAY_EXCHANGE_NAME = "delayed_exchange";
  20.  
     
  21.  
     public static final String DELAY_QUEUE_NAME = "delay_queue_name";
  22.  
     
  23.  
     public static final String DELAY_ROUTING_KEY = "delay_routing_key";
  24.  
     
  25.  
     @Bean
  26.  
     public CustomExchange delayExchange() {
  27.  
      Map<String, Object> args = new HashMap<>();
  28.  
      args.put("x-delayed-type""direct");
  29.  
      return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message"truefalse, args);
  30.  
     }
  31.  
     
  32.  
     @Bean
  33.  
     public Queue queue() {
  34.  
      Queue queue = new Queue(DELAY_QUEUE_NAME, true);
  35.  
      return queue;
  36.  
     }
  37.  
     
  38.  
     @Bean
  39.  
     public Binding binding(Queue queue, CustomExchange delayExchange) {
  40.  
      return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();
  41.  
     }
  42.  
    }
学新通
  • 发送消息

实现消息发送,设置消息延迟5s。

  1.  
    package com.olive.service;
  2.  
     
  3.  
    import java.text.SimpleDateFormat;
  4.  
    import java.util.Date;
  5.  
     
  6.  
    import org.springframework.amqp.AmqpException;
  7.  
    import org.springframework.amqp.core.Message;
  8.  
    import org.springframework.amqp.core.MessagePostProcessor;
  9.  
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
  10.  
    import org.springframework.beans.factory.annotation.Autowired;
  11.  
    import org.springframework.stereotype.Service;
  12.  
     
  13.  
    import com.olive.config.RabbitMqConfig;
  14.  
     
  15.  
    /**
  16.  
     * 消息发送者
  17.  
     **/
  18.  
    @Service
  19.  
    public class CustomMessageSender {
  20.  
     
  21.  
     @Autowired
  22.  
     private RabbitTemplate rabbitTemplate;
  23.  
     
  24.  
     public void sendMsg(String msg) {
  25.  
      SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  26.  
      System.out.println("消息发送时间:"   sdf.format(new Date()));
  27.  
      rabbitTemplate.convertAndSend(RabbitMqConfig.DELAY_EXCHANGE_NAME, 
  28.  
        RabbitMqConfig.DELAY_ROUTING_KEY, 
  29.  
        msg, 
  30.  
        new MessagePostProcessor() {
  31.  
         @Override
  32.  
         public Message postProcessMessage(Message message) throws AmqpException {
  33.  
          // 消息延迟5秒
  34.  
          message.getMessageProperties().setHeader("x-delay"5000);
  35.  
          return message;
  36.  
         }
  37.  
        });
  38.  
     }
  39.  
    }
学新通
  • 接收消息

  1.  
    package com.olive.service;
  2.  
     
  3.  
    import java.text.SimpleDateFormat;
  4.  
    import java.util.Date;
  5.  
     
  6.  
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
  7.  
    import org.springframework.stereotype.Component;
  8.  
     
  9.  
    import com.olive.config.RabbitMqConfig;
  10.  
     
  11.  
    /**
  12.  
     * 消息接收者
  13.  
     **/
  14.  
    @Component
  15.  
    public class CustomMessageReceiver {
  16.  
     
  17.  
     @RabbitListener(queues = RabbitMqConfig.DELAY_QUEUE_NAME)
  18.  
     public void receive(String msg) {
  19.  
      SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  20.  
      System.out.println(sdf.format(new Date())   msg);
  21.  
      System.out.println("Receiver:执行取消订单");
  22.  
     }
  23.  
    }
学新通
  • 测试验证

  1.  
    package com.olive.controller;
  2.  
     
  3.  
    import org.springframework.beans.factory.annotation.Autowired;
  4.  
    import org.springframework.web.bind.annotation.GetMapping;
  5.  
    import org.springframework.web.bind.annotation.RestController;
  6.  
     
  7.  
    import com.olive.service.CustomMessageSender;
  8.  
     
  9.  
    @RestController
  10.  
    public class DelayMessageController {
  11.  
     
  12.  
     @Autowired
  13.  
     private CustomMessageSender customMessageSender;
  14.  
     
  15.  
     @GetMapping("/sendMessage")
  16.  
     public String sendMessage() {
  17.  
      // 发送消息
  18.  
      customMessageSender.sendMsg("你已经支付超时,取消订单通知!");
  19.  
      return "success";
  20.  
     }
  21.  
     
  22.  
    }
学新通

发送消息,访问

http://127.0.0.1:8080/sendMessage

查看控制台打印的信息

学新通

学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhficaig
系列文章
更多 icon
同类精品
更多 icon
继续加载