RabbitMQ报错 Already closed: The AMQP operation was interrupted
C#使用rabbitmq在接收消息事件处理中报错:
Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=505, text='UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead', classId=60, methodId=40
解决办法是将接收事件代码里面末尾加个线程休眠“System.Threading.Thread.Sleep(1);”
-
/// <summary>
-
/// 监听消息队里的消息
-
/// </summary>
-
/// <param name="func">外围业务方法</param>
-
public static void Receive(Func<string, bool> func)
-
{
-
try
-
{
-
//如果未连接则重连连接队列
-
AgainInitMessageQueue();
-
-
//创建消费者对象
-
var consumer = new EventingBasicConsumer(channel);
-
//监听消费事件,如果执行shutdown了,此事件会执行失败
-
consumer.Received = (model, ea) =>
-
{
-
//接收到的消息
-
var message = Encoding.UTF8.GetString(ea.Body.Span);
-
//委托外围方法处理业务
-
var result = func(message);
-
if (result)
-
{
-
//业务处理成功后单条通知生产者
-
channel.BasicAck(ea.DeliveryTag, true);//手动应答MQ已经成功接收,批量应答
-
LogHelper.Info(typeof(MessageQueueFactory), $"MessageQueueFactory.Receive()-消费成功,message={message}");
-
}
-
else
-
{
-
//业务处理失败需要重回队列等待消费
-
//channel.BasicReject(ea.DeliveryTag, true);//拒绝接收单条消息,是否重回队列
-
LogHelper.Error(typeof(MessageQueueFactory), $"MessageQueueFactory.Receive()-业务处理失败,进入缓存等待重新入列,message={message}", true);
-
-
//###为了防止队里一直处于等到出列,导致后面数据无法消费,此处自动消费成功,进入缓存等待重新入列消费####
-
//添加处理失败的数据进缓存中
-
//添加消息重试次数缓存 超过上限不再进行重试
-
var mqBaseModel = SerializerHelper.DeserializerJson<MQBaseModel>(message);
-
if (mqBaseModel != null)
-
{
-
int retryTimes = ConvertBasic.ToInt32(RedisClient.GetValue<int>(CacheKey.SendMsgFailRetryKey mqBaseModel.MQCode));
-
RedisClient.SetValue(CacheKey.SendMsgFailRetryKey mqBaseModel.MQCode, retryTimes 1);
-
}
-
var ret = RedisClient.SAdd(CacheKey.SendMsgFailListKey, message);
-
if (!ret)
-
{
-
LogHelper.Error(typeof(MessageQueueFactory), $"MessageQueueFactory.Receive()-添加处理失败的数据进缓存中失败,message={message}", true);
-
}
-
channel.BasicAck(ea.DeliveryTag, true);//手动应答MQ已经成功接收,批量应答
-
}
-
Thread.Sleep(1);
-
//此行代码必须,不然写入队列消息会报错
-
//错误消息:Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=505, text='UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead', classId=60, methodId=40
-
};
-
//监听shutdown事件
-
consumer.Shutdown = (model, e) =>
-
{
-
//记录日志
-
LogHelper.Error(typeof(MessageQueueFactory), $"MessageQueueFactory.Receive()-shutdown被执行,队列监听失败e={SerializerHelper.SerializerJson(e)}", true);
-
};
-
//消费者开启监听,手动应答
-
channel.BasicQos(0, 1, false);//逐条消费
-
channel.BasicConsume(queue: MQNameConfig.CommentQueue, autoAck: false, consumer: consumer);
-
}
-
catch (Exception ex)
-
{
-
LogHelper.Error(typeof(MessageQueueFactory), $"MessageQueueFactory.Receive()-监听消息队里的消息出错,{ex.Message}", ex, true);
-
}
-
}
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgffgci
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
怎样阻止微信小程序自动打开
PHP中文网 06-13 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01