• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

springboot整合rabbitMQ使用

武飞扬头像
Magic林
帮助1

一,整合rabbitMQ

  1. 导入maven依赖
   	<dependency>
   		<groupId>org.springframework.amqp</groupId>
   		<artifactId>spring-rabbit</artifactId>
   	</dependency>
  1. 在application.yml中添加上rabbitmq的配置
spring:
  rabbitmq:
    virtual-host: /
    host: localhost
    username: guest
    password: guest
    port: 5672
      #    消息确认配置项
      #    确认消息发送到队列
    publisher-returns: true
      #    确认消息发送到交换机
    publisher-confirm-type: correlated
    #调整监听使用手动
    listener:
      direct:
        acknowledge-mode: manual
学新通

二,配置消息队列和交换机

根据场景选择下面配置模式
1.配置直连(一对一消费)

@Configuration
public class DirectRabbitConfig {
    //返回一个队列
    // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
    // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
    // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
    @Bean
    public Queue TestDirectRabbit(){
        return new Queue("operationLog",true,false,false,null);
    }
    //交换机起名
    @Bean
    DirectExchange TestDirectExchange(){
        return new DirectExchange("TestDirectExchange",true,false);
    }
    //交换机和队列绑定,提供路由
    @Bean
    Binding BindingDirect(){
        return BindingBuilder.bind(TestDirectRabbit()).to(TestDirectExchange()).with("TestDirectRouting");
    }
   }
学新通

这种方式是,发送一个消息到消息队列,只会一个交换机消费一次

  1. TOPIC型
    Topic型,当多个队列绑定到同一个交换机上时,根据routingkey来确定,与Direct不同的是,rountingkey有统配规则,例如topic#,表示所有前面是topic后面无论是什么,都会被捕获到,然后消费,达到了一个可以选择性直连或者广播的效果

@Configuration
public class TopicRabbitConfig {
   final static String man="topic.man";
   final static String woman="topic.woman";

   //队列1
   @Bean
   public Queue getQueue1(){
       return new Queue("TopicQueue1",true,false,false,null);
   }
   //队列2
   @Bean
   public Queue getQueue2(){
       return new Queue("TopicQueue2",true,false,false,null);
   }
   //交换机
   @Bean
   public TopicExchange getTopicExchange(){
       return new TopicExchange("TopicExchange");
   }
   //绑定
   @Bean
   public Binding getBinding1(){
       return BindingBuilder.bind(getQueue1()).to(getTopicExchange()).with(man);
   }
   @Bean
   public Binding getBinding2(){
       return BindingBuilder.bind(getQueue2()).to(getTopicExchange()).with("topic.#");
   }
}
学新通
  1. 广播模式
    一个消息,可以被大家都消费一遍
@Configuration
public class FanoutRabbitConfig {
    //队列一
    @Bean
    public Queue getFanoutQueue1(){
        return new Queue("FanoutQueue1",true,false,false);
    }
    //队列二
    @Bean
    public Queue getFanoutQueue2(){
        return new Queue("FanoutQueue2",true,false,false);
    }
    //队列三
    @Bean
    public Queue getFanoutQueue3(){
        return new Queue("FanoutQueue3",true,false,false);
    }
    //交换机
    @Bean
    public FanoutExchange getFanoutExchange(){
        return new FanoutExchange("FanoutExchange");
    }
    //绑定交换机与队列
    @Bean
    public Binding getFanoutBinding1(){
        return BindingBuilder.bind(getFanoutQueue1()).to(getFanoutExchange());
    }
    //绑定交换机与队列
    @Bean
    public Binding getFanoutBinding2(){
        return BindingBuilder.bind(getFanoutQueue2()).to(getFanoutExchange());
    }
    //绑定交换机与队列
    @Bean
    public Binding getFanoutBinding3(){
        return BindingBuilder.bind(getFanoutQueue3()).to(getFanoutExchange());
    }
}
学新通

三,监听消费

  1. 编写一个手动监听的功能类
@Component
public class MyListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("deliveryTag="   deliveryTag);
        //一次接收一条
        channel.basicAck(1, false);
        byte[] body = message.getBody();

        String s = new String(body);
      System.out.println("消费信息:"   s);

    }
}
  1. 创建一个监听容器,把上面对象存入

@Configuration
public class SimpleMessageListenerConfig {
   @Autowired
   CachingConnectionFactory connectionFactory;
   @Autowired
   MyListener myListener;
   @Bean
   public SimpleMessageListenerContainer createSimpleMessageListenerContainer(){
       SimpleMessageListenerContainer container=new SimpleMessageListenerContainer(connectionFactory);
       //监听为手动处理消费
       container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
       //设置需要监听的队列名
       container.setQueueNames("operationLog");
       //将监听处理类加入容器
       container.setMessageListener(myListener);
       return container;
   }


}
学新通

四,编写controller,发送消息给rabbitmq

@RestController
@RequestMapping("/rabbit")
public class RabbitController {
    //注入rabbitTemplate来发送消息
    @Autowired
    RabbitTemplate rabbitTemplate;

    @RequestMapping("/log2")
    public void TopicRabbit1(){
        Map<String,String> map = new HashMap<>();
        map.put("lmh", "测试666");
        //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
        rabbitTemplate.convertAndSend("TopicExchange", "topic.man",map);
        System.out.println("上传日志===");
    }

测试一下就好。

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhggkjih
系列文章
更多 icon
同类精品
更多 icon
继续加载