springboot整合rabbitmq,动态创建queue和监听queue
一、pom.xml添加如下依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- mq的依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
二、整合rabbitmq
(1)在application.properties中添加mq信息
#mq的连接信息,可直接多host连接和单host连接
mq.rabbit.address=192.168.1.1:5672,192.168.1.2:5672
mq.rabbit.virtualHost=/
mq.rabbit.username=guest
mq.rabbit.password=guest
mq.rabbit.exchange.name=mq.direct
#创建queue的数量
mq.rabbit.size=2
#消费者数量
mq.concurrent.consumers=4
#每个消费者获取的最大的消息投递数量
mq.prefetch.count=100
(2)rabbitmqConfig工具类
@Configuration
public class RabbitConfig {
@Value("${mq.rabbit.address}")
String address;
@Value("${mq.rabbit.username}")
String username;
@Value("${mq.rabbit.password}")
String password;
@Value("${mq.rabbit.virtualHost}")
String mqRabbitVirtualHost;
@Value("${mq.rabbit.exchange.name}")
String exchangeName;
@Value("${mq.rabbit.size}")
int queueSize;
@Value("${mq.concurrent.consumers}")
int concurrentConsumers;
@Value("${mq.prefetch.count}")
int prefetchCount;
//创建mq连接
@Bean(name = "connectionFactory")
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(mqRabbitVirtualHost);
connectionFactory.setPublisherConfirms(true);
//该方法配置多个host,在当前连接host down掉的时候会自动去重连后面的host
connectionFactory.setAddresses(address);
return connectionFactory;
}
//监听处理类
@Bean
@Scope("prototype")
public HandleService handleService() {
return new HandleService();
}
//动态创建queue,命名为:hostName.queue1【192.168.1.1.queue1】,并返回数组queue名称
@Bean
public String[] mqMsgQueues() throws AmqpException, IOException {
String[] queueNames = new String[queueSize];
String hostName = OsUtil.getHostNameForLiunx();//获取hostName
for (int i = 1; i <= queueSize; i ) {
String queueName = String.format("%s.queue%d", hostName, i);
connectionFactory().createConnection().createChannel(false).queueDeclare(queueName, true, false, false, null);
connectionFactory().createConnection().createChannel(false).queueBind(queueName, exchangeName, queueName);
queueNames[i - 1] = queueName;
}
return queueNames;
}
//创建监听器,监听队列
@Bean
public SimpleMessageListenerContainer mqMessageContainer(HandleService handleService) throws AmqpException, IOException {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
container.setQueueNames(mqMsgQueues());
container.setExposeListenerChannel(true);
container.setPrefetchCount(prefetchCount);//设置每个消费者获取的最大的消息数量
container.setConcurrentConsumers(concurrentConsumers);//消费者个数
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置确认模式为手工确认
container.setMessageListener(handleService);//监听处理类
return container;
}
}
(3)消费者
@Service
public class HandleService implements ChannelAwareMessageListener {
private static final Logger logger = LoggerFactory.getLogger(HandleService.class);
/**
* @param
* 1、处理成功,这种时候用basicAck确认消息;
* 2、可重试的处理失败,这时候用basicNack将消息重新入列;
* 3、不可重试的处理失败,这时候使用basicNack将消息丢弃。
*
* basicNack(long deliveryTag, boolean multiple, boolean requeue)
* deliveryTag:该消息的index
* multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
* requeue:被拒绝的是否重新入队列
*/
@Override
public void onMessage(Message message, Channel channel) throws Exception {
byte[] body = message.getBody();
logger.info("接收到消息:" new String(body));
JSONObject jsonObject = null;
try {
jsonObject = JSONObject.parseObject(new String(body));
if (消费成功) {
logger.info("消息消费成功");
channel.basicAck(message.getMessagePropertites().getDeliveryTag(),false);//确认消息消费成功
}else if(可重试的失败处理){
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
} else { //消费失败
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
} catch (JSONException e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);//消息丢弃
logger.error("This message:" jsonObject " conversion JSON error ");
}
}
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgagkef
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13