• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Java的NIO

武飞扬头像
log.error
帮助3

一、NIO概述

Java NIO由以下几个核心部分组成:

  • Channels
  • Buffers
  • Selectors

1.1 Channel

Channle的作用和IO中的stream差不多一个等级。stream为单项,而channel是双向的,既可以进行读操作,也可以进行写操作。

1.2 Buffer

NIO中的关键buffer实现有:ByteBuffer,CharBuffer,DoubleBuffer,FloatBuffer,IntBuffer,ShortBuffer,分别对应基本数据类型:byte,char,double,float,int,long,short

1.3 Selector

Selector在单线程中处理多个Channel。例如在一个聊天服务器内,要使用Selector,得像Selector注册channel,然后调用select()方法,这个方法会一直阻塞到某个注册的通道有事件就绪。一旦这个方法返回,线程就可以处理这些事件,事件的例子有如新的连接进来,数据接收等。

1.4 三者的联系

  • 一个channel就像一个流,但channel是双向的,channel读数据到Buffer,Buffer写数据到Channel

    学新通

  • 一个Selector允许一个线程处理多个channel

学新通

二、Channel的概述

2.1 Channel的实现

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel
  1. FileChannel从文件中读写数据。
  2. DatagramChannel能通过UDP读写网络中的数据。
  3. SocketChannel能通过TCP读写网络中的数据。
  4. SeverSocketChannel可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel。

2.2 FileChannel的实例

2.2.1 从FileChannel内读取数据

package com.zjh.channel;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

/**
 * @author zjh
 * @date 2022/12/30 17:40
 * @desc 使用FileChannel读取文件
 */
public class FileChannelDemo1 {
    public static void main(String[] args) throws IOException {
        // 创建fileChannel
        RandomAccessFile file = new RandomAccessFile("src/main/resources/1.txt", "rw");
        FileChannel fileChannel = file.getChannel();
        // 创建buffer
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        // 读取文件并输出文件内容
        int len;
        while ((len = fileChannel.read(buffer)) != -1) {
            System.out.println(new String(buffer.array(),0,len));
        }
    }
}
学新通

2.2.2 向FileChannel写入数据

package com.zjh.channel;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

/**
 * @author zjh
 * @date 2022/12/30 18:03
 * @desc 使用FileChannnel向文件内写入文件
 */
public class FileChannelDemo2 {
    public static void main(String[] args) throws IOException {
        // 创建文件对象获取FileChannel
        RandomAccessFile file = new RandomAccessFile("src/main/resources/2.txt", "rw");
        FileChannel channel = file.getChannel();
        // 需要写入的内容
        String s = "hello world";
        // buffer
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        buffer.clear();
        buffer.put(s.getBytes());
        // 转换读写模式
        buffer.flip();
        // 开始写出内容 hasRemaining(当缓冲区内还有剩余字节时继续写出)
        while (buffer.hasRemaining()){
            channel.write(buffer);
        }
        // 关闭fileChannel
        channel.close();
    }
}
学新通

学新通

2.3 SocketChannel的实例

SeverSocketChannel是一个基于通道的socket监听器。执行着与ServerSocket类似的功能。能够在非阻塞的模式下运行。以下为ServerSocketChannel使用的一个实例:

package com.zjh.socket;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/**
 * @author zjh
 * @date 2022/12/31 11:16
 * @desc ServerSocketChannel的使用
 */
public class ServerSocketChannelDemo {
    // 设置绑定端口
    public static final int port = 8888;

    public static void main(String[] args) throws IOException, InterruptedException {
        // 创建ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 绑定端口
        serverSocketChannel.bind(new InetSocketAddress(port));
        // 设置为非阻塞模式
        serverSocketChannel.configureBlocking(false);
        // 开始监听
        while (true) {
            SocketChannel socketChannel = serverSocketChannel.accept();
            // 进行判断,socket为空sleep 2s,非空则执行对应逻辑
            if (socketChannel == null) {
                System.out.println("没有新的socket进入!");
                Thread.sleep(2000);
            } else {
                SocketAddress address = socketChannel.getRemoteAddress();
                System.out.println("来自"   address   "加入连接");
                // 关闭socketChannel连接
                socketChannel.close();
            }
        }
    }
}
学新通

启动程序,通过浏览器来模拟连接:

学新通
学新通

2.3.2 SocketChannel

(1)SocketChannel介绍

Java NIO中的SocketChannel是一个连接到TCP网络套接字的通道。SocketChannel是一种面向流连接sockets套接字的可选择通道,具有以下特点:

  • SocketChannel是用来连接Socket套接字
  • SocketChannel主要用途用来处理网络中的I/O通道
  • SocketChannel是基于TCP连接传输
  • SocketChannel实现了可选择通道,可以被多路复用
(2)SocketChannel特征
  1. 对于已经存在的socket不能创建SocketChannel
  2. SocketChannel中提供的open接口创建的Channel并没有进行网络级联,需要用connect接口连接到指定地址
  3. 未进行连接的SocketChannel执行I/O操作时,或抛出NotYetConnectionException
  4. SocketChannel支持两种I/O模式:阻塞式和非阻塞式
  5. SocketChannel支持异步关闭。如果SocketChannel在一个线程上read阻塞,另一个线程对该SocketChannel调用shutdownInput,则读阻塞的线程将返回-1表示没有读取任何数据;如果SocketChannel在一个线程上write阻塞,另一个线程对该SocketChannel调用shutdownWrite,则写阻塞的线程将抛出AsynchronousCloseException
  6. SocketChannel支持设定参数
SO_SNDBUF 套接字发送缓冲区大小
SO_RCVBUF 套接字接收缓冲区大小
SO_KEEPALIVE 保持连接存活
O_REUSEADDR 复用地址
SO_LINGER 有数据传输时延缓关闭Channel(只在非阻塞模式下有用)
TCP_NODELAY 禁用Naggle算法
(3)SocketChannel的使用

以下是使用SocketChannel连接百度,并读取数据的例子

package com.zjh.socket;

import javax.sound.midi.SoundbankResource;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

/**
 * @author zjh
 * @date 2022/12/31 12:53
 * @desc SocketChannel的简单使用
 */
public class SocketChannelDemo {
    public static void main(String[] args) throws IOException {
        // 打开socketChannel
        SocketChannel socketChannel = SocketChannel.open();
        // 连接到百度
        socketChannel.connect(new InetSocketAddress("www.百度.com", 80));
        // 创建buffer
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        // 采用非阻塞读模式
        socketChannel.configureBlocking(false);
        // 读取数据
        int read = socketChannel.read(buffer);
        System.out.println("read over!");
        socketChannel.close();
    }
}
学新通

学新通

三、Buffer缓冲区

3.1 缓冲区Buffer

一个用于特定基本数据类型的容器。由java.nio包所定义,所有缓冲区都是Buffer抽象类的子类。Java NIO中的Buffer主要用于与NIO通道进行交互,数据是从通道读入缓冲区,从缓冲区写入通道中的

channel
channel
channel
channel
套接字
Buffer
NIO程序

3.2 Buffer类及其子类

Buffer的作用类似一个数组,可以保存多个相同类型的数据。根据数据类型不同,有以下Buffer常用子类:

  • ByteBuffer
  • CharBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer

以上Buffer类都采用相似的方法进行数据管理,只是管理的数据类型不同,都可以通过下面的方法获取一个Buffer对象:

static XXXBuffer allocate(int capacity)

调用该方法会生成一个容量为capacity的XXXBuffer对象。

3.3 缓冲区的基本属性

Buffer中具有以下重要属性

  • 容量(capacity):作为一个内存块,Buffer具有一定的固定大小,也成为容量,缓冲区熔炼不能为负,并且创建后不能修改。

  • 限制(limit):表示缓冲区内可以操作数据的大小(limit后的数据不能进行读写)。缓冲区限制不能为负,并且不能大于其容量。写入模式,限制等于buffer的容量。读取模式下,limit等于写入的数据量。

  • 位置(position):下一个要读取或者写入数据的索引。缓冲区的位置不能为负,并且不能大于其限制

  • 标记(mark)与重置(reset):标记是一个索引,通过Buffer中的mark()方法指定Buffer中一个特定的positon,之后可以通过调用reset()方法恢复到这个position。

标记、位置、限制、容量遵守以下的表达式:

0 <= mark <= position <= limit <= capacity

学新通

3.4 Buffer的常见方法

方法 作用
Buffer clear() 清空缓冲区并返回对缓冲区的引用
Buffer flip() 将缓冲区的界限设置为当前位置,并将当前位置重置为0
int capacity() 返回Buffer的capacity大小
boolean hasRemaning() 判断缓冲区中是否还有元素
int limit() 返回Buffer的界限(limit)的位置
Buffer limit(int n) 设置缓冲区的界限为n,并返回一个具有新limit的缓冲区对象
Buffer mark() 对缓冲区设置标记
int positon() 返回缓冲区的当前position
Buffer position(int n) 将设置缓冲区的当前位置为n,并返回修改后的Buffer对象
int remaining() 返回position和limit间的元素个数
Buffer reset() 将位置position转到以前设置的mark所在的位置
Buffer rewind() 将位置设为0,取消设置的mark

3.5 Buffer的数据操作Api

Buffer的所有子类提供了两个用于数据操作的方法:get()/put()

方法 作用
get() 读取单个字节
get(byte[] dst) 批量读取多个字节到dst内
get(int index) 读取指定索引位置的字节(不会移动position)
put(byte b) 将给定的单个字节写入缓冲区的当前位置
put(byte[] src) 将src中的字节写入缓冲区的当前位置
put(int index,byte b) 将指定字节写入缓冲区的索引位置(不会移动position)

使用Buffer读写数据一般遵循以下四个步骤:

  1. 写入数据到Buffer
  2. 调用flip()方法,转换读写模式
  3. 从Buffer中读取数据
  4. 调用buffer.clear()方法或者buffer.compact()方法清除缓冲区

3.6 直接与非直接缓冲区

根据官方文档:

byte buffer可以是两种类型,一种是基于直接内存(也就是非堆内存);另一种是非直接内存(也就是堆内存)。对于直接内存来说,JVM将会在IO操作上具有更高的性能,因为它直接作用于本地系统的IO操作。而非直接内存是下面这样的作用链:

本地IO
直接内存
非直接内存

而直接内存是:

本地IO
直接内存

使用场景:

  • 有很大的数据需要存储,且生命周期很长。
  • 频繁的IO操作,比如网络并发场景。

四、Selector选择器

4.1 选择器的概述

选择器(Selector)是SelectableChannel对象的多路复用器,Selector可以同时监控多个SelectableChannel的IO状况,也就是说,利用Selector可使一个单独的线程管理多个Channel。Selector是非阻塞IO的核心。

SelectableChannel
AbstractSelectableChannel
SocketChannel
ServerSocketChannel
DatagramChannel
PipeSinkChannel
  • Java的NIO,用非阻塞的IO方式。可以用一个线程,处理多个的客户端连接,就会使用到Selector(选择器)
  • Selector能够检测多个注册的通道上是否有事件发生(多个Channel以事件的方式可以注册代同一个Selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。
  • 只有在 连接/通道 真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程。

4.2 选择器的使用

下面是使用Selector并注册Channel实现事件监听的代码:

package com.zjh.selector;

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

/**
 * @author zjh
 * @date 2023/1/1 18:00
 * @desc 使用Selector监听ServerSocketChannel上的accept事件
 */
public class SelectorDemo01 {
    public static void main(String[] args) throws IOException {
        // 开启一个serverSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 切换为非阻塞模式
        serverSocketChannel.configureBlocking(false);
        // 获取Selector
        Selector selector = Selector.open();
        // 向selector注册channel的accept事件
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    }
}
学新通

当调用register(Selector sel,int ops)将通道注册选择器时,选择器对通道的监听事件,需要通过第二个参数ops指定。可以监听的事件类型(用SelectionKey的四个常量表示):

  • 读:SelectionKey.OP_READ(1)
  • 写:SelectionKey.OP_WRITE(4)
  • 连接:SelectionKey.OP_CONNECT(8)
  • 接收:SelectionKey.OP_ACCEPT(16)

若注册时不止一个监听事件,则可以使用位或操作符连接,如下:

int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE

五、综合案例

目的:使用NIO实现一个在线的聊天程序

实现功能:此案例包括两个程序,一个为客户端,一个为服务端

客户端:

  1. 能够连接到服务端
  2. 接受其他用户发送的消息
  3. 发送消息到其他用户

服务端:

  1. 能够处理客户端的连接请求
  2. 转发客户端的消息
  3. 当用户下线时打印提示消息

客户端代码如下:

package com.zjh;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Scanner;

/**
 * @author zjh
 * @date 2023/1/1 19:23
 * @desc nio在线聊条室客户端的实现
 */
public class Client {
    public Selector selector;
    public SocketChannel socketChannel;

    public Client() {
        try {
            // 开启Selector
            selector = Selector.open();
            // 开启SocketChannel
            socketChannel = SocketChannel.open();
            socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888));
            // 开启非阻塞模式
            socketChannel.configureBlocking(false);
            // 注册读事件到selector上
            socketChannel.register(selector, SelectionKey.OP_READ);
            System.out.println("客户端:"   socketChannel.getLocalAddress());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Client client = new Client();
        // 创建一个线程专门读取消息
        new Thread(new Runnable() {
            @Override
            public void run() {
                client.readInfo();
            }
        }).start();
        // 读取用户输入并发送到服务端
        Scanner sc = new Scanner(System.in);
        while (sc.hasNextLine()) {
            String s = sc.nextLine();
            client.sendInfo(s);
        }
    }


    /**
     * 发送消息到服务端
     *
     * @param msg
     */
    public void sendInfo(String msg) {
        try {
            socketChannel.write(ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8)));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 从服务端读取消息
     */
    public void readInfo() {
        SocketChannel channel;
        SelectionKey selectionkey;
        try {
            while (selector.select() >= 0) {
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    selectionkey = iterator.next();
                    if (selectionkey.isReadable()) {
                        channel = (SocketChannel) selectionkey.channel();
                        String msg = "";
                        // 创建缓冲区读取
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        int len;
                        while (true) {
                            len = socketChannel.read(buffer);
                            if (len <= 0) {
                                break;
                            }
                            msg  = new String(buffer.array(), 0, len);
                        }
                        System.out.println(msg);
                    }
                }
                iterator.remove();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
学新通

服务端代码如下:

package com.zjh;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

/**
 * @author zjh
 * @date 2023/1/1 19:23
 * @desc nio在线聊天室服务端的实现
 */
public class Server {
    public Selector selector;
    public ServerSocketChannel serverSocketChannel;
    private static final int PORT = 8888;

    public static void main(String[] args) {
        Server server = new Server();
        server.listen();
    }

    public Server() {
        try {
            // 开启selector
            selector = Selector.open();
            // 开启serverSocketChannel
            serverSocketChannel = ServerSocketChannel.open();
            // 绑定监听端口
            serverSocketChannel.bind(new InetSocketAddress(PORT));
            // 开启非阻塞模式
            serverSocketChannel.configureBlocking(false);
            // 注册监听事件
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 服务器的监听方法,处理客户端的请求
     */
    public void listen() {
        // 打印信息
        System.out.println(Thread.currentThread().getName()   "正在监听。。。。");
        try {
            while (selector.select() > 0) {
                System.out.println("开始新一轮事件处理!");
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                // 进行处理
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    // accept事件,注册到selector内,注册读事件
                    if (key.isAcceptable()) {
                        SocketChannel socketChannel = serverSocketChannel.accept();
                        // 切换为非阻塞模式
                        socketChannel.configureBlocking(false);
                        System.out.println(socketChannel.getRemoteAddress()   "上线!");
                        socketChannel.register(selector, SelectionKey.OP_READ);
                    } else if (key.isReadable()) {
                        // 读取数据进行处理
                        readData(key);
                    }
                }
                iterator.remove();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 读取客户端发送的数据,进行处理
     *
     * @param key
     */
    public void readData(SelectionKey key) {
        SocketChannel socketChannel = null;
        try {
            // 获取SocketChannel
            socketChannel = (SocketChannel) key.channel();
            // 创建缓冲区
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            // 读取内容并拼接消息
            StringBuilder sb = new StringBuilder();
            int len;
            while (true) {
                len = socketChannel.read(buffer);
                if (len <= 0) {
                    break;
                }
                sb.append(new String(buffer.array(), 0, len));
            }
            // 获取客户端的地址
            SocketAddress clientAddress = socketChannel.getRemoteAddress();
            String msg = clientAddress   ",说:"   sb.toString();
            // 开始消息转发
            System.out.println(msg);
            sendInfoToOther(msg, key);

        } catch (Exception e) {
            try {
                System.out.println(socketChannel.getRemoteAddress()   "下线了");
                // 取消注册的事件
                key.channel();
                // 关闭通道
                socketChannel.close();
            } catch (Exception e1) {
                e1.printStackTrace();
            }
        }

    }

    /**
     * 转发消息到其他客户端并排除自己
     *
     * @param msg
     * @param key
     */
    private void sendInfoToOther(String msg, SelectionKey key) {
        Channel channel;
        try {
            for (SelectionKey selectedKey : selector.keys()) {
                channel = selectedKey.channel();
                // 判断key的类型
                if (channel instanceof SocketChannel && selectedKey != key) {
                    ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
                    SocketChannel sc = (SocketChannel) channel;
                    sc.write(buffer);
                    System.out.println("转发到客户端:"   sc.getRemoteAddress());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhfhghjj
系列文章
更多 icon
同类精品
更多 icon
继续加载