• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

NIO:Channel

武飞扬头像
普通人zzz~
帮助1

一、通道(channel)介绍

Channel是一个对象,作用是用于源节点和目标节点的连接,在java NIO中负责缓冲区数据的传递。Channel本身不存储数据,因此需要配合缓冲区进行传输。
  学新通

二、主要实现类

主要的实现类有如下四个: FileChannel, SocketChannel, ServerSocketChannel, DatagramChannel,
学新通

三、获取通道

  1. Java针对支持通道的类提供了getChannel()方法
本地IO 网络IO
FileInputStream/FileOutputStream Socket
RandomAccessFile ServerSocket
DatagramSocket
  1. 在JDK1.7中的NIO.2针对各个通道提供了静态方法open()
  2. 在JDK1.7中的NIO.2的Files工具类的newByteChannel()
public static void main(String[] args) throws Exception {
	// 1本地IO获取通道
	FileInputStream inputStream = new FileInputStream(resourceFilePath);
    FileOutputStream outputStream = new FileOutputStream(targetFilePath);
    FileChannel inChannel = inputStream.getChannel();
    FileChannel outChannel = outputStream.getChannel();
	
	// 2.通过open方法获取
	FileChannel.open(Paths.get(resourceFilePath), StandardOpenOption.READ);
	FileChannel.open(Paths.get(targetFilePath),
                        StandardOpenOption.READ,
                        StandardOpenOption.WRITE,
                        StandardOpenOption.CREATE);
}

四、案例

4.1 FileChannel-文件复制

public class FileNioTest {
    public static void main(String[] args) {
        String resourceFilePath = "D:\\test\\copyFileTest.txt";
        String targetFilePath = "D:\\test\\copyFileTest-1.txt";
        // 复制文件
        // copyFile1(resourceFilePath, targetFilePath);
        // copyFile2(resourceFilePath, targetFilePath);
        copyFile3(resourceFilePath, targetFilePath);
    }

    private static void copyFile1(String resourceFilePath, String targetFilePath) {
        // 使用FileChannel配合缓冲区实现文件复制的功能
        try (
                FileInputStream inputStream = new FileInputStream(resourceFilePath);
                FileOutputStream outputStream = new FileOutputStream(targetFilePath);
                FileChannel inChannel = inputStream.getChannel();
                FileChannel outChannel = outputStream.getChannel();
        ) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);

            while (inChannel.read(buffer) != -1) {
                // 切换读模式
                buffer.flip();
                outChannel.write(buffer);
                // 清空
                buffer.clear();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void copyFile2(String resourceFilePath, String targetFilePath) {
        // 内存映射文件的方式实现文件复制
        try (
                FileChannel inChannel = FileChannel.open(Paths.get(resourceFilePath), StandardOpenOption.READ);
                FileChannel outChannel = FileChannel.open(Paths.get(targetFilePath),
                        StandardOpenOption.READ,
                        StandardOpenOption.WRITE,
                        StandardOpenOption.CREATE);
        ) {
            MappedByteBuffer inMap = inChannel.map(FileChannel.MapMode.READ_ONLY, 0, inChannel.size());
            MappedByteBuffer outMap = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size());
            byte[] b = new byte[inMap.limit()];
            // 从磁盘文件中获取数据写入到b字节数组中
            inMap.get(b);
            // 将b字节数组中的数据写入到磁盘文件中
            outMap.put(b);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void copyFile3(String resourceFilePath, String targetFilePath) {
        // Channel-to-channel方式实现复制:零拷贝方式
        try (
                FileChannel inChannel = FileChannel.open(Paths.get(resourceFilePath), StandardOpenOption.READ);
                FileChannel outChannel = FileChannel.open(Paths.get(targetFilePath),
                        StandardOpenOption.READ,
                        StandardOpenOption.WRITE,
                        StandardOpenOption.CREATE);
        ) {
            // inChannel.transferTo(0, inChannel.size(), outChannel);
            outChannel.transferFrom(inChannel, 0, inChannel.size());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
学新通

4.2 SocketChannel、ServerSocketChannel

案例:通过NIO实现多人聊天室

4.1 服务端Server

public class Server {
    public void start() throws IOException {
        // 创建 selector
        Selector selector = Selector.open();
        // 创建ServerSocketChannel,并绑定监听端口
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(9999));
        // 将Channel设置为非阻塞模式
        serverSocketChannel.configureBlocking(false);
        // 将Channel注册到Selector上,监听连接事件
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("-------------服务端启动成功---------------");
        while (true) {
            // 循环调用Selector的select方法,检测就绪情况
            int i = selector.select();
            if (i == 0) {
                continue;
            }
            // 调用selectedKeys方法获取就绪channel集合
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                iterator.remove();
                // 判断就绪事件种类,调用业务处理方法
                if (selectionKey.isAcceptable()) {
                    acceptable(serverSocketChannel, selector);
                } else if (selectionKey.isWritable()) {
                    System.out.println("----------Writable");
                } else if (selectionKey.isReadable()) {
                    readable(selectionKey, selector);
                } else if (selectionKey.isConnectable()) {
                    System.out.println("----------Connectable");
                }
            }
        }
    }

    /**
     * 可读事件处理方法
     *
     * @param selectionKey
     * @param selector
     * @throws IOException
     */
    private void readable(SelectionKey selectionKey, Selector selector) throws IOException {
        // 从 selectionKey中获取已经就绪的channel
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        // 创建 byteBuffer
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        StringBuffer sb = new StringBuffer();
        // 获取客户端的消息
        while (socketChannel.read(byteBuffer) > 0) {
            byteBuffer.flip();
            sb.append(Charset.forName("utf-8").decode(byteBuffer));
            byteBuffer.clear();
        }
        System.out.println(sb.toString());
        // 将channel再次注册到selector上,监听它的可读事件
        socketChannel.register(selector, SelectionKey.OP_READ);

        if (sb.toString().length() != 0) {
            // 广播给其他客户端
            broadCast(selector, socketChannel, sb.toString());
        }
    }

    /**
     * 处理连接事件
     *
     * @param serverSocketChannel
     * @param selector
     * @throws IOException
     */
    private void acceptable(ServerSocketChannel serverSocketChannel, Selector selector) throws IOException {
        // 获取要连接
        SocketChannel socketChannel = serverSocketChannel.accept();
        // 将Channel设置为非阻塞模式
        socketChannel.configureBlocking(false);
        // 将Channel注册到Selector上,监听连接事件
        socketChannel.register(selector, SelectionKey.OP_READ);
        // 返回消息给客户端
        socketChannel.write(Charset.forName("utf-8").encode("建立连接成功!"));
    }

    /**
     * 广播给其他客户端
     *
     * @param selector
     * @param socketChannel 当前客户端对象
     * @param sb            消息
     */
    private void broadCast(Selector selector, SocketChannel socketChannel, String sb) {
        // 获取所有连接的 SelectionKey
        Set<SelectionKey> keys = selector.keys();
        keys.forEach(key -> {
            Channel channel = key.channel();
            // 剔除当前客户端
            if (channel instanceof SocketChannel && channel != socketChannel) {
                try {
                    // 发送消息
                    ((SocketChannel) channel).write(Charset.forName("utf-8").encode(sb));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    public static void main(String[] args) throws IOException {
        new Server().start();
    }
}
学新通

4.2 客户端Client

public class ClientA {
    private String name;

    public ClientA(String name) {
        this.name = name;
    }

    public void start() throws IOException {
        // 连接服务器
        SocketChannel socketChannel = SocketChannel.open(
                new InetSocketAddress("127.0.0.1", 9999)
        );
        // 接收服务端响应消息
        Selector selector = Selector.open();
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
        ClientThread thread = new ClientThread();
        thread.setSelector(selector);
        new Thread(thread).start();

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()) {
            String sb = scanner.nextLine();
            socketChannel.write(Charset.forName("utf-8").encode(this.name   ":"   sb.trim()));
        }
    }

    public static void main(String[] args) throws IOException {
        new ClientA("普通人").start();
    }
}
学新通

ClientThread

public class ClientThread implements Runnable {
    private Selector selector;

    public void setSelector(Selector selector) {
        this.selector = selector;
    }

    @Override
    public void run() {
        try {
            while (true){
                int i = selector.select();
                if (i == 0){
                    continue;
                }
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()){
                    SelectionKey selectionKey = iterator.next();
                    iterator.remove();
                    if (selectionKey.isReadable()){
                        readable(selectionKey);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 可读事件处理方法
     * @param selectionKey
     * @throws IOException
     */
    private void readable(SelectionKey selectionKey) throws IOException {
        // 从 selectionKey中获取已经就绪的channel
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        // 创建 byteBuffer
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        StringBuffer sb = new StringBuffer();
        // 获取客户端的消息
        while (socketChannel.read(byteBuffer) > 0){
            byteBuffer.flip();
            sb.append(Charset.forName("utf-8").decode(byteBuffer));
        }
        // 将channel再次注册到selector上,监听它的可读事件
        socketChannel.register(selector, SelectionKey.OP_READ);

        System.out.println(sb.toString());
    }
}
学新通

4.3 DatagramChannel

SocketChannel 创建的是 TCP 连接,DatagramChannel 则创建的是 UDP 连接。

创建 DatagramChannel 的模式和创建其他 socket 通道是一样的:调用静态的 open( ) 方法来创建一个新实例。新 DatagramChannel 会有一个可以通过调用 socket( ) 方法获取的对等 DatagramSocket 对象。DatagramChannel对象既可以充当服务器(监听者)也可以充当客户端(发送者)。

DatagramChannel channel = DatagramChannel.open( );
DatagramSocket socket = channel.socket( );
socket.bind (new InetSocketAddress (9999));

DatagramChannel 是无连接的。每个数据报(datagram)都是一个自包含的实体,拥有它自己的目的地址及不依赖其他数据报的数据净荷。与面向流的的 socket 不同,DatagramChannel 可以发送单独的数据报给不同的目的地址。同样,DatagramChannel 对象也可以接收来自任意地址的数据包。每个到达的数据报都含有关于它来自何处的信息(源地址SocketAddress)。

// 打开 DatagramChannel
DatagramChannel channel = DatagramChannel.open();
InetSocketAddress sendAddress = new InetSocketAddress(9999);
// 绑定
channel.bind(sendAddress);
ByteBuffer buffer = ByteBuffer.allocate(1024);

// 接收
while (true) {
    buffer.clear();
    SocketAddress socketAddress = channel.receive(buffer);
    buffer.flip();
    System.out.println(Charset.forName("UTF-8").decode(buffer));
}

1. 打开DatagramChannel

通过 9999 端口接收 UDP 报文数据

DatagramChannel server = DatagramChannel.open();
server.socket().bind(new InetSocketAddress(9999));

2. 接收数据

通过 DatagramChannel.receive() 接收 UDP 报文数据

public static void testReceive() throws IOException {
    // 打开 DatagramChannel
    DatagramChannel receiveChannel = DatagramChannel.open();
    InetSocketAddress sendAddress = new InetSocketAddress(9999);
    // 绑定
    receiveChannel.bind(sendAddress);

    ByteBuffer buffer = ByteBuffer.allocate(1024);

    // 接收
    while (true) {
        buffer.clear();
        SocketAddress socketAddress = receiveChannel.receive(buffer);
        buffer.flip();
        System.out.println(Charset.forName("UTF-8").decode(buffer));
    }
}
学新通

3. 发送数据

通过 DatagramChannel.send() 发送 UDP 报文数据

public static void testSend() throws IOException {
    // 打开 DatagramChannel
    DatagramChannel sendChannel = DatagramChannel.open();
    InetSocketAddress sendAddress = new InetSocketAddress("127.0.0.1", 9999);

    // 发送
    ByteBuffer buffer = ByteBuffer.wrap(String.format("client send:%s", System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8));
    sendChannel.send(buffer, sendAddress);
    System.out.println("send success");
}

4. 连接
UDP 不存在真正意义上的连接,这里的连接是向特定服务地址用 read() / write() 接收/发送 数据包

public static void testConnect() throws IOException {
    // 打开DatagramChannel
    DatagramChannel connChannel = DatagramChannel.open();
    // 绑定
    connChannel.bind(new InetSocketAddress(9999));
    // 连接
    connChannel.connect(new InetSocketAddress("127.0.0.1", 9999));
    // write方法
    connChannel.write(ByteBuffer.wrap(String.format("test connect:%s", System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8)));
    // buffer
    ByteBuffer buffer = ByteBuffer.allocate(1024);

    while (true) {
        buffer.clear();
        connChannel.read(buffer);
        buffer.flip();
        System.out.println(Charset.forName("UTF-8").decode(buffer));
    }
}
学新通

注意:read()write() 只有在 connect() 后才能使用,不然会抛 NotYetConnectedException 异常,用 read() 接收时,如果没有接收包,会抛 PortUnreachableException 异常。

参考:https://blog.csdn.net/qq_38526573/article/details/89207100

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhfhgagi
系列文章
更多 icon
同类精品
更多 icon
继续加载