• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Netty案例-群聊天室

武飞扬头像
悠然予夏
帮助1

案例要求:

  1. 编写一个 Netty 群聊系统,实现服务器端和客户端之间的数据简单通讯
  2. 实现多人群聊
  3. 服务器端:可以监测用户上线,离线,并实现消息转发功能
  4. 客户端:可以发送消息给其它所有用户,同时可以接受其它用户发送的消息

1、聊天室服务端编写

NettyChatServer

  1.  
    package com.lagou.chat;
  2.  
     
  3.  
    import io.netty.bootstrap.ServerBootstrap;
  4.  
    import io.netty.channel.*;
  5.  
    import io.netty.channel.nio.NioEventLoopGroup;
  6.  
    import io.netty.channel.socket.SocketChannel;
  7.  
    import io.netty.channel.socket.nio.NioServerSocketChannel;
  8.  
    import io.netty.handler.codec.string.StringDecoder;
  9.  
    import io.netty.handler.codec.string.StringEncoder;
  10.  
     
  11.  
    /**
  12.  
    * 聊天室服务端
  13.  
    */
  14.  
    public class NettyChatServer {
  15.  
    // 端口号
  16.  
    private int port;
  17.  
     
  18.  
    public NettyChatServer(int port) {
  19.  
    this.port = port;
  20.  
    }
  21.  
     
  22.  
    public void run() throws InterruptedException {
  23.  
    // 1. 创建bossGroup线程组: 处理网络事件--连接事件
  24.  
    EventLoopGroup bossGroup = null;
  25.  
    // 2. 创建workerGroup线程组: 处理网络事件--读写事件 2*处理器线程数
  26.  
    EventLoopGroup workerGroup = null;
  27.  
    try {
  28.  
    // 1. 创建bossGroup线程组: 处理网络事件--连接事件
  29.  
    bossGroup = new NioEventLoopGroup(1);
  30.  
    // 2. 创建workerGroup线程组: 处理网络事件--读写事件 2*处理器线程数
  31.  
    workerGroup = new NioEventLoopGroup(2);
  32.  
    // 3. 创建服务端启动助手
  33.  
    ServerBootstrap serverBootstrap = new ServerBootstrap();
  34.  
    // 4. 设置bossGroup线程组和workerGroup线程组
  35.  
    serverBootstrap.group(bossGroup, workerGroup)
  36.  
    .channel(NioServerSocketChannel.class) // 5. 设置服务端通道实现为NIO
  37.  
    .option(ChannelOption.SO_BACKLOG, 128) // 6. 参数设置
  38.  
    .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE) // 6. 参数设置
  39.  
    .childHandler(new ChannelInitializer<SocketChannel>() {// 7. 创建一个通道初始化对象
  40.  
    @Override
  41.  
    protected void initChannel(SocketChannel ch) throws Exception {
  42.  
    // 8. 向pipeline中添加自定义业务处理handler
  43.  
    // 添加编解码器
  44.  
    ch.pipeline().addLast(new StringDecoder());
  45.  
    ch.pipeline().addLast(new StringEncoder());
  46.  
     
  47.  
    ch.pipeline().addLast(new NettyServerHandler());
  48.  
    }
  49.  
    });
  50.  
     
  51.  
    // 9. 启动服务端并绑定端口,同时将异步改为同步
  52.  
    // ChannelFuture future = serverBootstrap.bind(9999).sync();
  53.  
    ChannelFuture future = serverBootstrap.bind(port);
  54.  
    future.addListener(new ChannelFutureListener() {
  55.  
    @Override
  56.  
    public void operationComplete(ChannelFuture future) throws Exception {
  57.  
    if (future.isSuccess()) {
  58.  
    System.out.println("端口绑定成功");
  59.  
    } else {
  60.  
    System.out.println("端口绑定失败");
  61.  
    }
  62.  
    }
  63.  
    });
  64.  
    System.out.println("聊天室服务端启动成功");
  65.  
    // 10. 关闭通道(并不是真正意义上的关闭,而是监听通道关闭的状态)和关闭连接池
  66.  
    future.channel().closeFuture().sync();
  67.  
    } finally {
  68.  
    bossGroup.shutdownGracefully();
  69.  
    workerGroup.shutdownGracefully();
  70.  
    }
  71.  
    }
  72.  
     
  73.  
    public static void main (String[]args) throws InterruptedException {
  74.  
    new NettyChatServer(9998).run();
  75.  
    }
  76.  
    }
  77.  
     
学新通

NettyServerHandler

  1.  
    package com.lagou.chat;
  2.  
     
  3.  
    import io.netty.channel.Channel;
  4.  
    import io.netty.channel.ChannelHandlerContext;
  5.  
    import io.netty.channel.SimpleChannelInboundHandler;
  6.  
     
  7.  
    import java.util.ArrayList;
  8.  
    import java.util.List;
  9.  
     
  10.  
    /**
  11.  
    * 聊天室业务处理类
  12.  
    */
  13.  
    public class NettyServerHandler extends SimpleChannelInboundHandler<String> {
  14.  
    public static List<Channel> channelList = new ArrayList<>();
  15.  
     
  16.  
    /**
  17.  
    * 通道就绪事件
  18.  
    *
  19.  
    * @param ctx
  20.  
    * @throws Exception
  21.  
    */
  22.  
    @Override
  23.  
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
  24.  
    Channel channel = ctx.channel();
  25.  
    // 当有新的客户端连接的时候,将通道放入集合
  26.  
    channelList.add(channel);
  27.  
    System.out.println("[Server]:" channel.remoteAddress().toString().substring(1) "在线");
  28.  
    }
  29.  
     
  30.  
    /**
  31.  
    * 通道未就绪--channel 下线
  32.  
    *
  33.  
    * @param ctx
  34.  
    * @throws Exception
  35.  
    */
  36.  
    @Override
  37.  
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  38.  
    Channel channel = ctx.channel();
  39.  
    // 当已有客户端断开连接的时候,就一处对应的通道
  40.  
    channelList.remove(channel);
  41.  
    System.out.println("[Server]:" channel.remoteAddress().toString().substring(1) "下线");
  42.  
    }
  43.  
     
  44.  
    /**
  45.  
    * 通道读取事件
  46.  
    *
  47.  
    * @param ctx
  48.  
    * @param msg
  49.  
    * @throws Exception
  50.  
    */
  51.  
    @Override
  52.  
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
  53.  
    // 当前发送消息的通道,当前发送客户端的连接
  54.  
    Channel channel = ctx.channel();
  55.  
    for (Channel channel1 : channelList) {
  56.  
    // 排除自身通道
  57.  
    if (channel != channel1) {
  58.  
    channel1.writeAndFlush("[" channel.remoteAddress().toString().substring(1) "]说:" msg);
  59.  
    }
  60.  
    }
  61.  
    }
  62.  
     
  63.  
    /**
  64.  
    * 异常处理事件
  65.  
    * @param ctx
  66.  
    * @param cause
  67.  
    * @throws Exception
  68.  
    */
  69.  
     
  70.  
    @Override
  71.  
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  72.  
    cause.printStackTrace();
  73.  
    Channel channel = ctx.channel();
  74.  
    // 移除集合
  75.  
    channelList.remove(channel);
  76.  
    System.out.println("[Server]:" channel.remoteAddress().toString().substring(1) "异常");
  77.  
    }
  78.  
    }
学新通

2、聊天室客户端编写

NettyChatClient

  1.  
    package com.lagou.chat;
  2.  
     
  3.  
    import com.lagou.code.MessageCodec;
  4.  
    import com.lagou.code.NettyClientHandler;
  5.  
    import io.netty.bootstrap.Bootstrap;
  6.  
    import io.netty.channel.Channel;
  7.  
    import io.netty.channel.ChannelFuture;
  8.  
    import io.netty.channel.ChannelInitializer;
  9.  
    import io.netty.channel.EventLoopGroup;
  10.  
    import io.netty.channel.nio.NioEventLoopGroup;
  11.  
    import io.netty.channel.socket.SocketChannel;
  12.  
    import io.netty.channel.socket.nio.NioSocketChannel;
  13.  
    import io.netty.handler.codec.string.StringDecoder;
  14.  
    import io.netty.handler.codec.string.StringEncoder;
  15.  
     
  16.  
    import java.util.Scanner;
  17.  
     
  18.  
    /**
  19.  
    * 聊天室客户端
  20.  
    */
  21.  
    public class NettyChatClient {
  22.  
    private String ip;
  23.  
    private int port;
  24.  
     
  25.  
    public NettyChatClient(String ip, int port) {
  26.  
    this.ip = ip;
  27.  
    this.port = port;
  28.  
    }
  29.  
     
  30.  
    public void run() throws InterruptedException {
  31.  
    // 1. 创建线程组
  32.  
    EventLoopGroup group = null;
  33.  
    try {
  34.  
    group = new NioEventLoopGroup();
  35.  
    // 2. 创建客户端启动助手
  36.  
    Bootstrap bootstrap = new Bootstrap();
  37.  
    // 3. 设置线程组
  38.  
    bootstrap.group(group)
  39.  
    .channel(NioSocketChannel.class) // 4. 设置客户端通道实现为NIO
  40.  
    .handler(new ChannelInitializer<SocketChannel>() { // 5. 创建一个通道初始化对象
  41.  
    @Override
  42.  
    protected void initChannel(SocketChannel ch) throws Exception {
  43.  
    // 6. 向pipeline中添加自定义业务处理handler
  44.  
    // 添加编解码器
  45.  
    ch.pipeline().addLast(new StringDecoder());
  46.  
    ch.pipeline().addLast(new StringEncoder());
  47.  
    // 添加客户端的处理类
  48.  
    ch.pipeline().addLast(new NettyChatClientHandler());
  49.  
    }
  50.  
    });
  51.  
    // 7. 启动客户端,等待连接服务端,同时将异步改为同步
  52.  
    ChannelFuture channelFuture = bootstrap.connect(ip, port).sync();
  53.  
    Channel channel = channelFuture.channel();
  54.  
    System.out.println("----------" channel.localAddress().toString().substring(1) "-------------");
  55.  
    Scanner scanner = new Scanner(System.in);
  56.  
    while(scanner.hasNextLine()) {
  57.  
    String msg = scanner.nextLine();
  58.  
    // 向服务端发送消息
  59.  
    channel.writeAndFlush(msg);
  60.  
    }
  61.  
    // 8. 关闭通道和关闭连接池
  62.  
    channelFuture.channel().closeFuture().sync();
  63.  
    }finally {
  64.  
    group.shutdownGracefully();
  65.  
    }
  66.  
    }
  67.  
     
  68.  
    public static void main(String[] args) throws InterruptedException {
  69.  
    new NettyChatClient("127.0.0.1", 9998).run();
  70.  
    }
  71.  
    }
学新通

NettyChatClientHandler

  1.  
    package com.lagou.chat;
  2.  
     
  3.  
    import io.netty.channel.ChannelHandlerContext;
  4.  
    import io.netty.channel.SimpleChannelInboundHandler;
  5.  
     
  6.  
    /**
  7.  
    * 聊天室处理handler
  8.  
    */
  9.  
    public class NettyChatClientHandler extends SimpleChannelInboundHandler<String> {
  10.  
    /**
  11.  
    * 通道就绪事件
  12.  
    * @param ctx
  13.  
    * @param msg
  14.  
    * @throws Exception
  15.  
    */
  16.  
    @Override
  17.  
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
  18.  
    System.out.println(msg);
  19.  
    }
  20.  
    }
学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgakije
系列文章
更多 icon
同类精品
更多 icon
继续加载