• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

SpringBoot 使用 RabbitTemplate

武飞扬头像
lazyhhh
帮助1

SpringBoot 中 使用RabbtiMq 

如图使用redisTemplate 一样的简单方便

模拟发送邮件的情况

pom.xml  

  1.  
    <dependency>
  2.  
    <groupId>org.springframework.boot</groupId>
  3.  
    <artifactId>spring-boot-starter-amqp</artifactId>
  4.  
    </dependency>
  5.  
    <dependency>
  6.  
    <groupId>org.springframework.amqp</groupId>
  7.  
    <artifactId>spring-rabbit-test</artifactId>
  8.  
    <scope>test</scope>
  9.  
    </dependency>

application.properties

  1.  
    spring.rabbitmq.username=guest
  2.  
    spring.rabbitmq.password=guest
  3.  
    spring.rabbitmq.host=192.168.91.128
  4.  
    spring.rabbitmq.port=5672
  5.  
     
  6.  
    ## 根据自己情况而定,可以不用
  7.  
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
  8.  
    spring.rabbitmq.listener.simple.prefetch=100

写在配置文件中,由 RabbitProperties 这个类进行读取,封装到ConnectionFactory 中。

MailConstants (常量)

  1.  
    public class MailConstants {
  2.  
    public static final Integer DELIVERING = 0;//消息投递中
  3.  
    public static final Integer SUCCESS = 1;//消息投递成功
  4.  
    public static final Integer FAILURE = 2;//消息投递失败
  5.  
    public static final Integer MAX_TRY_COUNT = 3;//最大重试次数
  6.  
    public static final Integer MSG_TIMEOUT = 1;//消息超时时间
  7.  
    public static final String MAIL_QUEUE_NAME = "javaboy.mail.queue";
  8.  
    public static final String MAIL_EXCHANGE_NAME = "javaboy.mail.exchange";
  9.  
    public static final String MAIL_ROUTING_KEY_NAME = "javaboy.mail.routing.key";
  10.  
    }


RabbitConfig (rabbitMq的配置类)

  1.  
    import org.javaboy.vhr.model.MailConstants;
  2.  
    import org.javaboy.vhr.service.MailSendLogService;
  3.  
    import org.slf4j.Logger;
  4.  
    import org.slf4j.LoggerFactory;
  5.  
    import org.springframework.amqp.core.Binding;
  6.  
    import org.springframework.amqp.core.BindingBuilder;
  7.  
    import org.springframework.amqp.core.DirectExchange;
  8.  
    import org.springframework.amqp.core.Queue;
  9.  
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  10.  
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
  11.  
    import org.springframework.beans.factory.annotation.Autowired;
  12.  
    import org.springframework.context.annotation.Bean;
  13.  
    import org.springframework.context.annotation.Configuration;
  14.  
     
  15.  
    @Configuration
  16.  
    public class RabbitConfig {
  17.  
    public final static Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
  18.  
    @Autowired
  19.  
    CachingConnectionFactory cachingConnectionFactory;
  20.  
     
  21.  
    //发送邮件的
  22.  
    @Autowired
  23.  
    MailSendLogService mailSendLogService;
  24.  
     
  25.  
    @Bean
  26.  
    RabbitTemplate rabbitTemplate() {
  27.  
    RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
  28.  
     
  29.  
    //手动应答返回的标志
  30.  
    rabbitTemplate.setConfirmCallback((data, ack, cause) -> {
  31.  
    String msgId = data.getId();
  32.  
    if (ack) {
  33.  
    logger.info(msgId ":消息发送成功");
  34.  
    mailSendLogService.updateMailSendLogStatus(msgId, 1);//修改数据库中的记录,消息投递成功
  35.  
    } else {
  36.  
    logger.info(msgId ":消息发送失败");
  37.  
    }
  38.  
    });
  39.  
    rabbitTemplate.setReturnCallback((msg, repCode, repText, exchange, routingkey) -> {
  40.  
    logger.info("消息发送失败");
  41.  
    });
  42.  
    return rabbitTemplate;
  43.  
    }
  44.  
     
  45.  
    @Bean
  46.  
    Queue mailQueue() {
  47.  
    return new Queue(MailConstants.MAIL_QUEUE_NAME, true);
  48.  
    }
  49.  
     
  50.  
    @Bean
  51.  
    DirectExchange mailExchange() {
  52.  
    return new DirectExchange(MailConstants.MAIL_EXCHANGE_NAME, true, false);
  53.  
    }
  54.  
     
  55.  
    @Bean
  56.  
    Binding mailBinding() {
  57.  
    return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MailConstants.MAIL_ROUTING_KEY_NAME);
  58.  
    }
  59.  
     
  60.  
    }
学新通

MailSendTask(定时任务,发送)

  1.  
    @Component
  2.  
    public class MailSendTask {
  3.  
     
  4.  
    @Autowired
  5.  
    MailSendLogService mailSendLogService;
  6.  
     
  7.  
    @Autowired
  8.  
    RabbitTemplate rabbitTemplate;
  9.  
     
  10.  
    @Autowired
  11.  
    EmployeeService employeeService;
  12.  
     
  13.  
    @Scheduled(cron = "0/10 * * * * ?")
  14.  
    public void mailResendTask() {
  15.  
    List<MailSendLog> logs = mailSendLogService.getMailSendLogsByStatus();
  16.  
    if (logs == null || logs.size() == 0) {
  17.  
    return;
  18.  
    }
  19.  
    logs.forEach(mailSendLog->{
  20.  
    if (mailSendLog.getCount() >= 3) {
  21.  
    mailSendLogService.updateMailSendLogStatus(mailSendLog.getMsgId(), 2);//直接设置该条消息发送失败
  22.  
    }else{
  23.  
    mailSendLogService.updateCount(mailSendLog.getMsgId(), new Date());
  24.  
    Employee emp = employeeService.getEmployeeById(mailSendLog.getEmpId());
  25.  
    /**
  26.  
    * 参数1:交换机名称
  27.  
    * 参数2 :路由key
  28.  
    * 参数三:数据
  29.  
    * 参数4:作为唯一标识
  30.  
    *
  31.  
    */
  32.  
    rabbitTemplate.convertAndSend(MailConstants.MAIL_EXCHANGE_NAME, MailConstants.MAIL_ROUTING_KEY_NAME, emp, new CorrelationData(mailSendLog.getMsgId()));
  33.  
    }
  34.  
    });
  35.  
    }
  36.  
    }
学新通

MailReceiver(接收端)

  1.  
    @Component
  2.  
    public class MailReceiver {
  3.  
     
  4.  
    public static final Logger logger = LoggerFactory.getLogger(MailReceiver.class);
  5.  
     
  6.  
    @Autowired
  7.  
    JavaMailSender javaMailSender;
  8.  
    @Autowired
  9.  
    MailProperties mailProperties;
  10.  
    @Autowired
  11.  
    TemplateEngine templateEngine;
  12.  
    @Autowired
  13.  
    StringRedisTemplate redisTemplate;
  14.  
     
  15.  
    @RabbitListener(queues = MailConstants.MAIL_QUEUE_NAME)
  16.  
    public void handler(Message message, Channel channel) throws IOException {
  17.  
    Employee employee = (Employee) message.getPayload();
  18.  
    MessageHeaders headers = message.getHeaders();
  19.  
    Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
  20.  
    String msgId = (String) headers.get("spring_returned_message_correlation");
  21.  
    if (redisTemplate.opsForHash().entries("mail_log").containsKey(msgId)) {
  22.  
    //redis 中包含该 key,说明该消息已经被消费过
  23.  
    logger.info(msgId ":消息已经被消费");
  24.  
    channel.basicAck(tag, false);//确认消息已消费
  25.  
    return;
  26.  
    }
  27.  
    //收到消息,发送邮件
  28.  
    MimeMessage msg = javaMailSender.createMimeMessage();
  29.  
    MimeMessageHelper helper = new MimeMessageHelper(msg);
  30.  
    try {
  31.  
    helper.setTo(employee.getEmail());
  32.  
    helper.setFrom(mailProperties.getUsername());
  33.  
    helper.setSubject("入职欢迎");
  34.  
    helper.setSentDate(new Date());
  35.  
    Context context = new Context();
  36.  
    context.setVariable("name", employee.getName());
  37.  
    context.setVariable("posName", employee.getPosition().getName());
  38.  
    context.setVariable("joblevelName", employee.getJobLevel().getName());
  39.  
    context.setVariable("departmentName", employee.getDepartment().getName());
  40.  
    //根据模板发送
  41.  
    String mail = templateEngine.process("mail", context);
  42.  
    helper.setText(mail, true);
  43.  
    javaMailSender.send(msg);
  44.  
    redisTemplate.opsForHash().put("mail_log", msgId, "javaboy");
  45.  
    channel.basicAck(tag, false);
  46.  
    logger.info(msgId ":邮件发送成功");
  47.  
    } catch (MessagingException e) {
  48.  
    //手动应答, tag 消息id ,、
  49.  
    channel.basicNack(tag, false, true);
  50.  
    e.printStackTrace();
  51.  
    logger.error("邮件发送失败:" e.getMessage());
  52.  
    }
  53.  
    }
  54.  
    }
学新通

使用总结

0. rabbtMq的本地服务,得开启。(跟redis差不多)

1. 写 application.properties中的rabbitMq的连接配置等

2. rabbitConfig配置文件。(包括:交换机选择与队列的配置,绑定),选择的模式在这里配置

3. 直接使用,导入rabbitTemplate类,使用rabbitTemplate.convertAndSend()方法

4. 接收类

@RabbitListener(queues = MailConstants.MAIL_QUEUE_NAME)
 public void handler(Message message, Channel channel) throws IOException {

        业务逻辑了

        手动接收等等

}

相关文章:

1. rabbitMq基础结构图

2. channel接口常用方法

3. rabbitTemplate模板

4. rabbitMq的笔记1

5. rabbitMq笔记2

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhfjjebf
系列文章
更多 icon
同类精品
更多 icon
继续加载