一张图进阶 RocketMQ - 通信机制
前 言
「三此君看了好几本书,看了很多遍源码整理的 一张图进阶 RocketMQ 图片,关于 RocketMQ 你只需要记住这张图!觉得不错的话,记得点赞关注哦。」
【重要】视频在 B 站同步更新,欢迎围观,轻轻松松涨姿势。一张图进阶 RocketMQ-通信机制(视频版) 点击查看【bilibili】
本文是“一张图进阶 RocketMQ”第 4 篇,对 RocketMQ 不了解的同学可以先看看前面三期:
[一张图进阶 RocketMQ-整体架构](https://www.bilibili.com/video/BV1534y157RF)
[一张图进阶 RocketMQ - NameServer](https://www.bilibili.com/video/BV1tY4y1g795)
[一张图进阶 RocketMQ - 消息发送](https://www.bilibili.com/video/BV1bf4y1Z7ui)
上一期分享了 RocketMQ 生产者启动流程及同步消息发送流程,我们知道了在通信层是基于 Netty 将消息传递给 Broker 进行存储的。如果对 Netty 完全不了解我们就很难真正理解 RocketMQ,所以今天我们简单的聊一聊 Netty 基本流程,然后分析 RocketMQ 的通信机制,最后通过异步消息发送来串联 RocketMQ 通信机制。
Netty 介绍
Netty 有很多概念,等介绍完概念大家都困了,我们就不过多介绍了,直接结合示例来看看 Netty 的基础流程,能够帮助我们更好的理解 RocketMQ 即可。
-
Netty 服务端启动初始化两个线程组 「BossGroup & WorkerGroup」,分别用于处理「客户端连接及网络读写」。
-
Netty 客户端启动初始化一个线程组, 用于处理请求及返回结果。
-
客户端 「connect」 到 Netty 服务端,创建用于 「传输数据的 Channel」。
-
Netty 服务端的 BossGroup 处理客户端的连接请求,然后把剩下的工作交给 WorkerGroup。
-
连接建立好了,客户端就可以利用这个连接发送数据给 Netty 服务端。
-
Netty WorkerGroup 中的线程使用 「Pipeline(包含多个处理器 Handler)」 对数据进行处理。
-
Netty 服务端的处理完请求后,返回结果也经过 Pipeline 处理。
-
Netty 服务端通过 Channel 将数据返回给客户端。
-
客户端通过 Channel 接收到数据,也经过 Pipeline 进行处理。
Netty 示例
我们先用 Netty 实现一个简单的 服务端/客户端 通信示例,我们是这样使用的,那 RocketMQ 基于 Netty 的通信也应该是这样使用的,不过是在这个基础上封装了一层。主要关注以下几个点:服务端什么时候初始化的,服务端实现的 Handler 做了什么事?客户端什么时候初始化的,客户端实现的 Handler 做了什么事? 「Netty 服务端初始化」:初始化的代码很关键,我们要从源码上理解 RocketMQ 的通信机制,那肯定会看到类似的代码。根据上面的流程来看,首先是实例化 bossGroup 和 workerGroup,然后初始化 Channel,从代码可以看出我们是在 Pipeline 中添加了自己实现的 Handler,这个 Handler 就是业务自己的逻辑了,那 RocketMQ 要处理数据应该也需要实现相应的 Handler。
-
public class MyServer {
-
public static void main(String[] args) throws Exception {
-
//创建两个线程组 boosGroup、workerGroup
-
EventLoopGroup bossGroup = new NioEventLoopGroup();
-
EventLoopGroup workerGroup = new NioEventLoopGroup();
-
try {
-
//创建服务端的启动对象,设置参数
-
ServerBootstrap bootstrap = new ServerBootstrap();
-
//设置两个线程组boosGroup和workerGroup
-
bootstrap.group(bossGroup, workerGroup)
-
//设置服务端通道实现类型
-
.channel(NioServerSocketChannel.class)
-
//使用匿名内部类的形式初始化Channel对象
-
.childHandler(new ChannelInitializer<SocketChannel>() {
-
@Override
-
protected void initChannel(SocketChannel socketChannel) throws Exception {
-
//给pipeline管道添加处理器
-
socketChannel.pipeline().addLast(new MyServerHandler());
-
}
-
});//给workerGroup的EventLoop对应的管道设置处理器
-
//绑定端口号,启动服务端
-
ChannelFuture channelFuture = bootstrap.bind(6666).sync();
-
//对关闭通道进行监听
-
channelFuture.channel().closeFuture().sync();
-
} finally {
-
bossGroup.shutdownGracefully();
-
workerGroup.shutdownGracefully();
-
}
-
}
-
}
「实现自定义的服务端处理器 Handler」:自定义的 Handler 需要实现 Netty 定义的 HandlerAdapter,当有可读事件时就会调用这里的 channelRead() 方法。等下我们看 RocketMQ 通信机制的时候留意RocketMQ 自定义了哪些 Handler,这些 Handler 有做了什么事。
-
/**
-
* 自定义的Handler需要继承Netty规定好的 HandlerAdapter 才能被Netty框架所关联,有点类似SpringMVC的适配器模式
-
**/
-
public class MyServerHandler extends ChannelInboundHandlerAdapter {
-
@Override
-
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-
//获取客户端发送过来的消息
-
ByteBuf byteBuf = (ByteBuf) msg;
-
System.out.println("收到" ctx.channel().remoteAddress() "发送的消息:" byteBuf.toString(CharsetUtil.UTF_8));
-
//发送消息给客户端
-
ctx.writeAndFlush(Unpooled.copiedBuffer("服务端已收到消息,记得关注三此君,记得三连", CharsetUtil.UTF_8));
-
}
-
@Override
-
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-
//发生异常,关闭通道
-
ctx.close();
-
}
-
}
「Netty 客户端初始化」:Netty 客户端,在 RocketMQ 中对应了 Producer/Consumer。在 Producer 启动中有一步是启动通信模块服务,其实就是初始化 Netty 客户端。客户端也需要先实例化一个 NioEventLoopGroup,然后将自定义的 handler 添加到 Pipeline,还有很重要的一步是我们需要 connect 连接到 Netty 服务端。
-
public class MyClient {
-
public static void main(String[] args) throws Exception {
-
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
-
try {
-
//创建bootstrap启动引导对象,配置参数
-
Bootstrap bootstrap = new Bootstrap();
-
//设置线程组
-
bootstrap.group(eventExecutors)
-
//设置客户端的Channel实现类型
-
.channel(NioSocketChannel.class)
-
//使用匿名内部类初始化 Pipeline
-
.handler(new ChannelInitializer<SocketChannel>() {
-
@Override
-
protected void initChannel(SocketChannel ch) throws Exception {
-
//添加客户端Channel的处理器
-
ch.pipeline().addLast(new MyClientHandler());
-
}
-
})
-
//connect连接服务端
-
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
-
//对Channel关闭进行监听
-
channelFuture.channel().closeFuture().sync();
-
} finally {
-
//关闭线程组
-
eventExecutors.shutdownGracefully();
-
}
-
}
-
}
「实现自定义的客户端处理器 Handler」:客户端处理器也继承自 Netty 定义的 HandlerAdapter,当 Channel 变得可读的时候(服务端数据返回)会调用我们自己实现的 channelRead()。
-
public class MyClientHandler extends ChannelInboundHandlerAdapter {
-
-
public void channelActive(ChannelHandlerContext ctx) throws Exception {
-
//发送消息到服务端
-
ctx.writeAndFlush(Unpooled.copiedBuffer("三此君,我正在看 RocketMQ 生产者发送消息~", CharsetUtil.UTF_8));
-
}
-
-
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-
//接收服务端发送过来的消息
-
ByteBuf byteBuf = (ByteBuf) msg;
-
System.out.println("收到三此君的消息,我一定会三连的" ctx.channel().remoteAddress() byteBuf.toString(CharsetUtil.UTF_8));
-
}
-
}
RocketMQ 通信流程
RocketMQ 通信模块基于 Netty 实现,总体代码量不多。主要是 NettyRemotingServer和NettyRemotingClient,分别对应通信的服务端和客户端。根据前面的 Netty 示例,我们要理解 RocketMQ 如何基于 Netty 通信,只需要知道 4 个地方:NettyRemotingServer 如何初始化,NettyRemotingClient 初始化,如何基于 NettyRemotingClient 发送消息,无论是客户端还是服务端收到数据后都需要 Handler 来处理。
-
Broker/NameServer 需要启动 Netty 服务端。Broker 我们后面会进一步分析,只需要知道 Broker 启动的时候会调用 NettyRemotingServer.start() 方法初始化 Netty 服务器。主要做了 4 件事:配置 BossGroup/WorkerGroup NioEventLoopGroup 线程组,配置 Channel,添加 NettyServerHandler,调用 serverBootstrap.bind() 监听端口等待客户端连接。
-
Producer/Consumer 需要启动 Netty 客户端,在生产者启动流程中 MQClientInstantce 启动通信服务模块,其实就是调用NettyRemotingClient.start() 初始化 Netty 客户端。主要做了 3 件事:配置客户端 NioEventLoopGroup 线程组,配置 Channel,添加 NettyClientHandler。
-
客户端配置了 Channel,但是 Channel 还没有创建,因为 Channel 肯定要和具体的 Server IP Addr 关联。在同步消息发送流程中,调用 NettyRemoteClient.invokeSync(),从 channelTables 缓存中获取或者创建一个新的 Channel,其实就是调用 bootstrap.connect() 连接到 NettyServer,创建用于通信的 Channel。
-
有了 Channel 后,Producer 调用 Channel.writeAndFlush() 将数据发送给服务器。NettyRemotingServer WorkerGroup 处理可读事件,调用 NettyServerHandler 处理数据。
-
NettyServerHandler 调用 processMessageReceived方法。processMessageReceived 方法做了什么呢?通过传入的请求码 RequestCode 区别不同的请求,不同的请求定义了不同的 Processor。例如,是生产者存入消息使用 SendMessageProcessor,查询消息使用 QueryMessageProcessor,拉取消息使用 PullMessageProcessor。这些 Processor 在服务端初始化的时候,以 RequestCode 为 Key 添加到 Processor 缓存中。processMessageReceived 就是根据 RequeseCode 获取不同的 Processor,处理完后把结果返回给 NettyRemotingClient。
-
NettyRemotingClient 收到可读事件,调用 NettyClientHandler 处理返回结果。NettyClientHandler也调用processMessageReceived 处理返回结果。processMessageReceived 从以 opaque 为 key ResponseTables 缓存冲取出 ResponseFuture,将返回结果设置到 ResponseFuture。同步消息则执行 responseFuture.putResponse(),异步调用执行回调。
异步发送
除了同步消息发送,RocketMQ 还支持异步发送。我们只需要在前面是示例中稍作修改就会得到一个异步发送示例,最大的不同在于发送的时候传入 SendCallback 接收异步返回结果回调。
-
public class AsyncProducer {
-
public static void main(String[] args) throws Exception {
-
// 实例化消息生产者Producer
-
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
-
// 设置NameServer的地址
-
producer.setNamesrvAddr("localhost:9876");
-
// 启动Producer实例
-
producer.start();
-
// 创建消息,并指定Topic,Tag和消息体
-
Message msg = new Message("Topic1","Tag", "Key", "Hello world".getBytes("UTF-8"));
-
// SendCallback 接收异步返回结果的回调
-
producer.send(msg, new SendCallback() {
-
-
public void onSuccess(SendResult sendResult) {
-
System.out.printf("关注呀!!!%-10d OK %s %n", index,sendResult.getMsgId());
-
}
-
-
public void onException(Throwable e) {
-
System.out.printf("三连呀!!!%-10d Exception %s %n", index, e);
-
e.printStackTrace();
-
}
-
});
-
// 如果不再发送消息,关闭Producer实例。
-
producer.shutdown();
-
}
-
}
同步发送个异步发送主要的过程都是一样的,不同点在于同步消息调用 Netty Channel.writeAndFlush 之后是 waitResponse 等待 Broker 返回,而异步消息是调用预先定义好的回调函数。
异步消息和同步消息整体差不多,可以说在基于 Netty 实现异步消息比同步消息还要简单一下,我们这里主要来看一些不同点:
-
调用 DefaultMQProducer 异步发送接口需要我们定义 SendCallback 回调函数,在执行成功或者执行失败后回调。
-
DefaultMQProducerImp 中的 send 方法会将异步发送请求封装成 Runable 提交到线程池,然后业务线程就直接返回了。
-
sendDefaultImpl 计算重试同步和异步消息有区别,异步消息在这里不会重试,而是在后面结果返回的时候通过递归重试。
-
跟着调用链到 sendMessageAsync 方法,需要注意的是这里构建了 InvokeCallback 实例,ResponseFuture 会持有该实例,Netty 结果返回后调用该实例的方法。
-
下面就是正常的 Netty 数据发送流程,直到 Broker 处理完请求,返回结果。NettyRemotingClient 处理可读事件,NettyClientHandler 处理返回结果,调用 ResponseFuture.executeInokeCallback,进而调用 InvokeCallback.operationComplete.
-
如果 Broker 返回结果是成功的,则封装返回结果 SendResult,并回调业务实现的 SendCallback.onSucess 方法,更新容错项。
-
如果 Broker 返回失败,或出现任何异常则执行重试,重试超过 retryTimesWhenSendFailed 次则回调业务定义的 SendCallback.onException方法。
总结
-
Netty:BossGroup 处理客户端连接请求,生成 ServerSocketChannel 注册到 WorkerGroup,WorkerGroup 处理网络读写请求,调用 Channel 对应的 Pipeline 处理请求,Pipeline 中有很多 ChannelHandler 对请求进行处理。
-
通信机制:基于 Netty 实现,只需要留意 NettyRemotingServer/NettyRemotingClient 的初始化,并且在通道变得可读/可写时,会调用 NettyServerHandler/NettyClienthandler 进行处理。
-
同步异步:同步和异步消息大同小异,只是同步消息通过 Netty 发送请求后会执行 ResponseFuture.waitResponse() 阻塞等待,而异步消息发送请求后不会等待,请求返回回调用 SendCallback 相应的方法。
消息已经发送给了 Broker,下一期我们将来看看Broker 是如何存储消息的,RocketMQ 如何支持百万级的吞吐量?感谢观看,我们下期再见
参考文献
-
丁威, 周继锋. RocketMQ技术内幕:RocketMQ架构设计与实现原理. 机械工业出版社, 2019-01.
-
李伟. RocketMQ分布式消息中间件:核心原理与最佳实践. 电子工业出版社, 2020-08.
-
杨开元. RocketMQ实战与原理解析. 机械工业出版社, 2018-06.
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhfhhifb
-
第二十二篇Java NIO重点
-
Elasticsearch8 解决启动报错fatal exception while booting Elasticsearchjava.nio.file.InvalidPathException
-
Android 编译jar异常AGPBI: {“kind“:“error“,“text“:“java.nio.file.NoSuchFileException: C:\\Program Files
-
网络通信编程基础,BIO,NIO
-
Java基础:《netty3—NIO:Buffer》
-
java网络和IOBIO/NIO面试题,从Linux底层告诉你为什么Java这么设计----持续补充
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13