• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Netty线程模型和核心功能

武飞扬头像
xujingyiss
帮助3

为什么要使用Netty?

NIO 的类库和 API 繁杂,使用麻烦:需要熟练掌握 Selector、 ServerSocketChannel、 SocketChannel、 ByteBuffer 等,开发工作量和难度都非常大;

Netty 对 JDK 自带的 NIO 的 API 进行了良好的封装,解决了上述问题。且 Netty 拥有高性能、 吞吐量更高,延迟更低,减少资源消耗,最小化不必要的内存复制等优点;

Netty在互联网行业的应用

在分布式系统中,各个节点之间需要远程服务调用,高性能的 RPC 框架必不可少。

Netty 作为异步高性能的通信框架,往往作为基础通信组件被这些 RPC 框架使用。

典型的应用有:阿里分布式服务框架 Dubbo 的 RPC 框架使用 Dubbo 协议进行节点间通信。Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现各进程节点之间的内部通信。Rocketmq 底层也是用的 Netty 作为基础通信组件。

主从Reactor多线程模型

单Reactor多线程模型:只有一个Reactor,Reactor负责处理accpet,read,write事件。

主从Reactor多线程模型:使用两个Reactor,已主一从。mainReactor专门维护accept事件,当接收到accept事件,将该连接交给subReactor,subReactor维护该连接的read和write事件。这里的 Reactor可以理解为 NIO 中的 Selector。

学新通

主从Reactor的方式,将连接和数据处理完全分开维护,将大大提高并发量。

Netty线程模型

Netty使用的线程模型就是“主从Reactor多线程模型”,并做了些扩展。

学新通

模型说明:

  1. Boss Group 对应 mainReactor,用于处理连接事件
  2. Worker Group 对应 subReactor,用于处理读写事件
  3. NioEventGroup 是一个线程池,但是里面只有一个线程,对应一个 Selector,用于监听注册在其上的 Channel 的网络通讯
  4. Worker Group 包含多个 NioEventGroup,说明有多个工作线程池(每个线程池中仍然只有一个线程)
  5. Boss Group 一般来说只建一个 NioEventGroup 就行了,也就是只用一个 Selector 处理连接事件。与多个工作线程合起来组成一主多从架构
  6. 但是 Netty 也支持多主多从架构,也就是 Boss Group 中包含多个 NioEventGroup。但是这意味着要提供多个端口,而且使用时要做好负载均衡(如借助zookeeper),很麻烦。一般使用一主多从就够了。而通常说的 Netty 支持单机百万量访问,是需要使用多主多从架构的
  7. Boss NioEventLoop 线程内部循环处理 accept 事件,与 client 建立连接 , 生成 NioSocketChannel,然后将 NioSocketChannel 注册到某个 Worker NIOEventLoop 上的 Selector,最后处理任务队列的任务(runAllTasks)
  8. 每个 Worker NioEventLoop 线程内部轮询注册到自己 Selector 上的所有 NioSocketChannel 的 read/write 事件并交给 Pipeline 中的 ChannelHandler 处理,最后处理任务队列的任务,耗时的业务处理可以放到队列中,在这个阶段处理。
  9. Worker Group 中 NioEventGroup 数量默认为机器 CPU 核心数的两倍

Netty模块组件

1、Bootstrap、ServerBootstrap

Bootstrap 意思是“引导”,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件

客户端程序的启动引导类是 Bootstrap,服务端启动引导类是 ServerBootstrap。

2、Future、ChannelFuture

在 Netty 中所有的 IO 操作都是“异步”的,不能立刻得知消息是否被正确处理。

但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过 Future 和 ChannelFuture。他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件。

3、Channel

Channel 是 Netty 网络通信的组件,能够用于执行网络 I/O 操作。对应 NIO 模型中的 Channel。

不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应。

TCP Socket 连接中,服务端使用 NioServerSocketChannel,客户端使用 NioSocketChannel。

4、Handler

涉及 ChannelHandler、ChannelHandlerContext、ChannelPipline。Handler 非常重要,在下面单独写。

5、Selector

这个 Selector 就是 NIO 的 Selector。

Netty 基于 Selector 对象实现 I/O 多路复用,通过 Selector 一个线程可以监听多个连接的 Channel 事件。

当向一个 Selector 中注册 Channel 后,Selector 内部的机制就可以自动不断地查询(Select)这些注册的 Channel 是否有已就绪的 I/O 事件(例如可读,可写,网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个 Channel 。

6、NioEventLoop、NioEventLoopGroup

NioEventLoop 可以看做是一个线程池,而且里面只有一个线程,对应一个 Selector!线程启动时会调用 NioEventLoop 的 run 方法,执行 I/O 任务和非 I/O 任务:

  1. I/O 任务,即 selectionKey 中 ready 的事件,如 accept、connect、read、write 等,由 processSelectedKeys 方法触发。
  2. 非 IO 任务,添加到 taskQueue 中的任务,如 register0、bind0 等任务,由 runAllTasks 方法触发。

NioEventLoopGroup 主要用来管理 NioEventLoop 的生命周期。可以理解为一个线程池,内部维护了一组线程池,每个 NioEventLoop(对应一个Selector) 负责处理多个 Channel 上的事件。

Handler

什么是 Handler?

Handler 从字面意思上来看,就是处理器。一个 Handler 负责处理特定的任务。

例如 Netty 自带的 StringEncoder,它就是一个 Handler,实现了 ChannelHandler 接口。它负责字符串的编码工作。我们输入的是 String 类型的字符串,但是计算机处理的时候不认字符串,只认 字节,StringEncoder 就用来把字符串转换为字节。

在 Netty 中,Handler 非常重要,平时编码基本上只是跟 Handler 打交道。不管是 server 端,还是 client 端,代码基本上是固定的,不同的只是 Handler 部分。

Handler 的分类

基本接口是 ChannelHandler。

平时开发时可以继承它的子类:ChannelInboundHandler 和 ChannelOutboundHandler(这俩也是接口),分别用于处理“入站/出站” I/O 事件。

或者使用适配器类:ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter。

Handler 的组成部分

Handler 相关的组件有:ChannelHandler、ChannelHandlerContext、ChannelPipline

在 Netty 中每个 Channel 都有且仅有一个 ChannelPipeline 与之对应,它们的组成关系如下:

学新通

ChannelHandler 就是处理器。平时编码主要写的就是这个。
ChannelHandlerContext 保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象。
ChannelPipline 由 ChannelHandlerContext 组成的双向链表。

对应的 java 代码:

学新通

当有消息时产生时,比如客户端向服务端发送消息:

首先在客户端,会按一定的顺序执行 SocketChannel 对应 ChannelPipline 中所有的 ChannelHandler,将消息给到服务端;然后在服务端,会按照一定的顺序,执行服务端 ServerSocketChannel 对应的 ChannelPipline 中所有 ChannelHandler,来处理消息。

Handler 的执行顺序

处理消息时,不是 ChannelPipline 中所有的 ChannelHandler 都会执行的!而且执行顺序也有讲究。这个执行顺序非常重要!

基本规则:

对于入站事件,只执行 InboundHandler,且顺序是 head -> tail 方向

对于出站事件,只执行 OutboundHandler,且顺序是 tail -> head 方向

学新通

举个例子:客户端向服务端发送消息

  1. 对于客户端来说,是出站事件;先执行 OutboundHandler2,再执行 OutboundHandler1
  2. 对于服务端来说,是入站事件;先执行 InboundHandler1,在执行 InboundHandler2

编解码

编解码是通过 Handler 来实现的。

当你通过 Netty 发送或者接受一个消息的时候,就将会发生一次“数据转换

  • 出站:消息会被编码成字节
  • 入站:消息会被解码,从字节转换为另一种格式(比如string、java对象等)

以发送 string 消息为例,流程图如下: 

学新通

  1. 客户端发送消息之前,先将string编码为byte
  2. 客户端将byte发送到服务端
  3. 服务端收到byte后,先解码为string,再进行业务处理

反过来,服务端向客户端发送消息,也是一样的流程。

编解码相关Handler在Pipeline中的顺序

编解码相关Handler,在Pipeline中的顺序是很重要的。

StringEncoder 和 StringDecoder 为例:

  1. 假如 Pipeline 中只有 StringEncoder 这一个编码器,它必须是 Pipeline 中第一个 OutboundHandler。因为肯定是先进行业务操作,最后才进行编码(OutboundHandler倒序执行)
  2. 假如 Pipeline 中只有 StringDecoder 这一个解码器,它必须是 Pipeline 中第一个 InboundHandler。因为必须先解码,才能进行业务操作,否则没法识别消息内容(InboundHandler正序执行)

粘包拆包

TCP粘包拆包是指发送方发送的若干包数据到接收方接收时粘成一包或某个数据包被拆开接收。

如下图所示,client发了两个数据包D1和 D2,但是server端可能会收到如下几种情况的数据。

学新通

为什么出现粘包现象?

TCP 是面向连接的,面向流的,提供高可靠性服务。收发两端要有成对的 socket。因此,发送端为了将多个发给接收端的包,更有效的发给对方,使用了优化方法(Nagle 算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。 

这样做虽然提高了效率,但是接收端就难于分辨出完整的数据包了,因为面向流的通信是无消息保护边界的。

解决方案:

格式化数据:每条数据有固定的格式(开始符、结束符),这种方法简单易行,但选择开始符和结束符的时候一定要注意每条数据的内部一定不能出现开始符或结束符。所以这个方式不怎么好用。

发送长度:发送每条数据的时候,将数据的长度一并发送,比如可以选择每条数据的前4位是数据的长度,应用层处理时可以根据长度来判断每条数据的开始和结束。这种方法比第一种方法稳妥。

代码示例

服务端

  1.  
    public class NettyServer {
  2.  
    public static void main(String[] args) {
  3.  
    // 1、定义server启动类
  4.  
    ServerBootstrap serverBootstrap = new ServerBootstrap();
  5.  
     
  6.  
    // 2、定义工作组: boss负责处理客户端连接请求,worker负责处理读写请求
  7.  
    EventLoopGroup boss = new NioEventLoopGroup();
  8.  
    EventLoopGroup worker = new NioEventLoopGroup();
  9.  
    serverBootstrap.group(boss, worker);
  10.  
     
  11.  
    // 设置通道channel
  12.  
    serverBootstrap.channel(NioServerSocketChannel.class);
  13.  
     
  14.  
    // 添加handler,管道中的处理器,通过ChannelInitializer来构造
  15.  
    serverBootstrap.childHandler(new ChannelInitializer<Channel>() {
  16.  
    @Override
  17.  
    protected void initChannel(Channel channel) {
  18.  
    // 获得通道channel中的管道链(执行链、handler链)
  19.  
    ChannelPipeline pipeline = channel.pipeline();
  20.  
    pipeline.addLast(new StringDecoder());
  21.  
    pipeline.addLast(new StringEncoder());
  22.  
    pipeline.addLast(new ServerHandler());
  23.  
    }
  24.  
    });
  25.  
     
  26.  
    // 设置参数
  27.  
    serverBootstrap.option(ChannelOption.SO_BACKLOG, 2048);
  28.  
    serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
  29.  
    serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);
  30.  
     
  31.  
    try {
  32.  
    // server绑定ip和port
  33.  
    ChannelFuture channelFuture = serverBootstrap
  34.  
    .bind("127.0.0.1", 9000).sync();
  35.  
    // 监听关闭,关闭后应该释放资源
  36.  
    channelFuture.channel().closeFuture().sync();
  37.  
    } catch (InterruptedException e) {
  38.  
    System.out.println("server start got exception!");
  39.  
    e.printStackTrace();
  40.  
    } finally {
  41.  
    // 关闭资源
  42.  
    boss.shutdownGracefully();
  43.  
    worker.shutdownGracefully();
  44.  
    }
  45.  
    }
  46.  
    }
学新通
  1.  
    public class ServerHandler extends SimpleChannelInboundHandler<String> {
  2.  
    @Override
  3.  
    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
  4.  
    System.out.println("服务端收到消息:" msg);
  5.  
    // 返回给客户端信息
  6.  
    ctx.channel().writeAndFlush("服务端已收到消息[" msg "]");
  7.  
    }
  8.  
    }

客户端

  1.  
    public class NettyClient {
  2.  
    public static void main(String[] args) {
  3.  
    // 定义服务类
  4.  
    Bootstrap bootstrap = new Bootstrap();
  5.  
     
  6.  
    // 定义工作线程池
  7.  
    EventLoopGroup worker = new NioEventLoopGroup();
  8.  
    bootstrap.group(worker);
  9.  
     
  10.  
    // 设置通道
  11.  
    bootstrap.channel(NioSocketChannel.class);
  12.  
     
  13.  
    // 添加Handler
  14.  
    bootstrap.handler(new ChannelInitializer<Channel>() {
  15.  
    @Override
  16.  
    protected void initChannel(Channel channel) {
  17.  
    ChannelPipeline pipeline = channel.pipeline();
  18.  
    pipeline.addLast("StringDecoder", new StringDecoder());
  19.  
    pipeline.addLast("StringEncoder", new StringEncoder());
  20.  
    pipeline.addLast("ClientHandler", new ClientHandler());
  21.  
    }
  22.  
    });
  23.  
     
  24.  
    // 建立连接
  25.  
    ChannelFuture channelFuture = bootstrap
  26.  
    .connect("127.0.0.1",9000);
  27.  
    try {
  28.  
    // 监听客户端输入
  29.  
    BufferedReader bufferedReader =
  30.  
    new BufferedReader(new InputStreamReader(System.in));
  31.  
    while(true) {
  32.  
    System.out.println("请输入:");
  33.  
    String msg = bufferedReader.readLine();
  34.  
    channelFuture.channel().writeAndFlush(msg);
  35.  
    }
  36.  
    } catch (Exception e) {
  37.  
    e.printStackTrace();
  38.  
    }finally {
  39.  
    // 关闭连接
  40.  
    worker.shutdownGracefully();
  41.  
    }
  42.  
    }
  43.  
    }
学新通
  1.  
    public class ClientHandler extends SimpleChannelInboundHandler<String> {
  2.  
    @Override
  3.  
    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
  4.  
    System.out.println("客户端收到消息:" msg);
  5.  
    }
  6.  
    }

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhfhgcjb
系列文章
更多 icon
同类精品
更多 icon
继续加载