java nio

nio

Posted by CHuiL on April 4, 2021

基本io模型

image

select/poll epoll的区别

select和poll都是需要传递所有待监听的文件描述符给内核,然后内核会去查找每个fd是否准备好,当fd很多的时候,效率会直线下降,之后内核返回一个就绪的文件描述符集合,我们需要遍历所有描述符以查看是否在返回的集合中;处理完之后重新调用时还需要再一次传递完整的文件描述符;当检查大量的文件描述符时,需用从用户态到内核态来回拷贝这些数据将会占用大量的CPU时间;

而且select在一个进程中打开的文件描述符是有限制的,默认是2048个,不过epoll没有这个限制,内存越大,上限越高;

epoll能提供高效得性能;他区别于select和poll的最大特点是内核能够“记住”文件描述符,不需要每次调用都epoll_wait()的时候都重新传递一遍完整的文件描述符,需要监听的文件描述符添加一次便可,还可设置监听的事件,而他内部是有一颗红黑树和一个ready链表组成的,当添加文件描述符时,如果存在就直接返回,不存在就添加到树干上;epoll采用基于事件的通知方式,一旦某个fd数据就绪时,内核就会采用类似callback的回调机制,迅速激活这个文件描述符,添加到队列上,并返回就绪的文件描述符;

java NIO

即java no-blocking IO;在jdk1.4中引入,是一种同步非阻塞的IO模型,也提供了JAVA IO多路复用的能力;其实在了解几种基本的io模型之后,就能明白java nio只是将该操作系统本身提供的多路复用io封装起来进行使用而已,然后为何需要多路复用io的思路是一样的,即提供一个线程能同时监听处理多个文件描述(java中是channel)的能力,本质上得益于select/epoll的实现;

java nio包提供了以下相关类:

  • Buffers:存储固定数量基本类型数据的线性缓冲容器;
  • Charsets:他们是16Unicode字符序列和字节序列直接的命名映射;提供字符集的解码器和编码器,能够在bytes和Uncode字符之间进行转;
  • Channles:他表示对一个实体的连接通道(实体有硬件设备、文件、网络套接字等),并通过该连接能够对其进行读写等io操作;
  • 非阻塞多路复用IO:多路复用指能够在一个channel中处理多个io的能力,而这就要使用Selectable channels;一个 selectable channel有blocking 和 non-blocking两个模式;

类关系图

buffers

缓冲区是由特定基元类型的元素组成的线性有限序列。他有三个重要的基本属性

  • capacity:容量;在初始化一个Buffer的时候需要指定容量
  • position:表示下一个应该给被读写的位置;该值不可以为负的,也不能大于limit;
  • limit:表示读写不应该超过的位置;该值不能为负的,也不能大于capacity;

image

每种Buffer都会提供基本的get和put操作,来对Buffer进行读写;(当我们的channel需要进行写操作时,需要从Buffer中get数据写入到内核缓冲区,读操作时就是将内核缓冲数据put到Buffer中,在从Buffer中取出)

mark标记位,并不一定会设置该值,该值不能小于0,也不能大于当前position;每当调用mark时,就会将当前position设置到mark上,每当调用reset时,就会将position设置为mark值;当mark大于postion或limit时,该值废弃;

0 <= mark <= position <= limit <= capacity

Buffer几个关键api

  • clear():设置limit=capacity,position=0;使该buffer能够进行新数据的写入
  • flip(): 设置limit=position,position=0;使该buffer准备好能够被读取;
  • rewind():使该buffer能够被重新读取,设置position=0;
  • slice(),slice(index,length):创建并返回子Buffer;
  • duplicate():浅拷贝创建Buffer副本;

Buffer不是线程安全的,要线程安全就要调用方去进行同步;

Buffer下有多种基本类型的实现;基本关系如下;其中最常用的应该就是ByteBuffer吧; image

MappedByteBuffer

MappedByteBuffer是NIO基于内存映射(mmap)这种零拷贝方式提供的一种实现
它继承自ByteBuffer,通过调用FileChannel.map()方法返回,底层通过调用本地方法map0()为文件分配一块虚拟内存(注意是内核空间的虚拟内存),作为它的内存映射区域,然后返回这块内存射区域的起始地址,这里就将映射这个文件的内核虚拟空间地址的起始地址返回给了用户进程;而MappedByteBuffer就通过这个起始地址来构建MappedByteBuffer;

public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException {
    int pagePosition = (int)(position % allocationGranularity);
    long mapPosition = position - pagePosition;
    long mapSize = size + pagePosition;
    try {
        addr = map0(imode, mapPosition, mapSize);
    } catch (OutOfMemoryError x) {
        System.gc();
        try {
            Thread.sleep(100);
        } catch (InterruptedException y) {
            Thread.currentThread().interrupt();
        }
        try {
            addr = map0(imode, mapPosition, mapSize);
        } catch (OutOfMemoryError y) {
            throw new IOException("Map failed", y);
        }
    }
    int isize = (int)size;
    Unmapper um = new Unmapper(addr, mapSize, isize, mfd);
    if ((!writable) || (imode == MAP_RO)) {
        return Util.newMappedByteBufferR(isize, addr + pagePosition, mfd, um);
    } else {
        return Util.newMappedByteBuffer(isize, addr + pagePosition, mfd, um);
    }
}

他们之间的关系可以看下图 image 通过map(),我们在用户空间中通过起始地址加偏移量来读写内核虚拟空间中的文件数据,并可以调用MappedByteBuffer的force()方法来讲缓冲区的更改强制写入到本地文件中;也就是上图中的write()操作;

MappedByteBuffer写示例

public void writeToFileByMappedByteBuffer() {
    Path path = Paths.get(getClass().getResource(FILE_NAME).getPath());
    byte[] bytes = CONTENT.getBytes(Charset.forName(CHARSET));
    try (FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ,
            StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
        MappedByteBuffer mappedByteBuffer = fileChannel.map(READ_WRITE, 0, bytes.length);
        if (mappedByteBuffer != null) {
            mappedByteBuffer.put(bytes);
            mappedByteBuffer.force();
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

MappedByteBuffer的一些重要特征

  • MappedByteBuffer使用是堆外的虚拟内存,因此分配(map)的内存大小不受 JVM 的 -Xmx 参数限制,但是也是有大小限制的。
  • MappedByteBuffer 在处理大文件时性能的确很高,但也存在内存占用、文件关闭不确定等问题,被其打开的文件只有在垃圾回收的才会被关闭,而且这个时间点是不确定的。
  • MappedByteBuffer 提供了文件映射内存的 mmap() 方法,也提供了释放映射内存的 unmap() 方法。然而 unmap() 是 FileChannelImpl 中的私有方法,无法直接显示调用。

DirectBuffer

DirectByteBuffer 的对象引用位于 Java 内存模型的堆里面,JVM 可以对 DirectByteBuffer 的对象进行内存分配和回收管理。DirectByteBuffer 内部的字节缓冲区位在于堆外的(用户态)直接内存,它是通过 Unsafe 的本地方法 allocateMemory() 进行内存分配,底层调用的是操作系统的 malloc() 函数。

初始化 DirectByteBuffer 时还会创建一个 Deallocator 线程,并通过 Cleaner 的 freeMemory() 方法来对直接内存进行回收操作,freeMemory() 底层调用的是操作系统的 free() 函数。

由于使用 DirectByteBuffer 分配的是系统本地的内存,不在 JVM 的管控范围之内,因此直接内存的回收和堆内存的回收不同,直接内存如果使用不当,很容易造成 OutOfMemoryError。

DirectByteBuffer 是 MappedByteBuffer 的具体实现类。Util.newMappedByteBuffer() 方法通过反射机制获取 DirectByteBuffer 的构造器,然后创建一个 DirectByteBuffer 的实例,对应的是一个单独用于内存映射的构造方法:

Charsets

提供将byte与字符集进行转换的编解码工具;

channels

类似在 Linux 之类操作系统上看到的文件描述符,是 NIO 中被用来支持批量式 IO 操作的一种抽象。他是对实体的一种连接,这里的实体包括文件,网络socket等;我们可以通过该channel对特定实体进行读写等io;
image 常见的有

  • FileChannel:读写文件
  • DatagramChannel: UDP协议网络通信
  • SocketChannel:TCP协议网络通信
  • ServerSocketChannel:监听TCP连接

FileChannel

可以通过FileChannel静态的open方法获取,也可以通过 FileInputStream, FileOutputStream或者RandomAccessFile 的getChannel来获取;
使用NIO读取文件的简单例子;

    public static void testFileChannel(){
        try( FileInputStream fileInputStream = new FileInputStream(UnlessCsvPath)) {
            FileChannel fileChannel = fileInputStream.getChannel();
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            while (fileChannel.read(byteBuffer)!=-1){
                byteBuffer.flip();
                System.out.println("读取出来的数据:"+ Charset.defaultCharset().decode(byteBuffer).toString());
                byteBuffer.clear();
            }
        }catch (Exception  e){
            System.out.println(e.getMessage());
        }
    }
FileChannel的transferTo(),transferFrom()方法

FileChannel进行文件拷贝的时候,可以使用transferTo(),transferFrom()方法;底层实现基于Sendfile的零拷贝技术实现数据传输的;基于 Sendfile 系统调用的零拷贝方式,整个拷贝过程会发生 2 次上下文切换,1 次 CPU 拷贝和 2 次 DMA 拷贝。 image

  • java nio底层通过sendfile()函数向内核(kernel)发起系统调用,上下文从用户态(user space)切换为内核态(kernel space)。
  • CPU 利用 DMA 控制器将数据从主存或硬盘拷贝到内核空间(kernel space)的读缓冲区(read buffer)。
  • CPU 将读缓冲区(read buffer)中的数据拷贝到的网络缓冲区(socket buffer)。
  • CPU 利用 DMA 控制器将数据从网络缓冲区(socket buffer)拷贝到网卡进行数据传输。
  • 上下文从内核态(kernel space)切换回用户态(user space),Sendfile 系统调用执行返回。

示例代码


public static void copyFileByChannel(File source, File dest) throws
        IOException {
    try (FileChannel sourceChannel = new FileInputStream(source)
            .getChannel();
         FileChannel targetChannel = new FileOutputStream(dest).getChannel
                 ();){
        for (long count = sourceChannel.size() ;count>0 ;) {
            long transferred = sourceChannel.transferTo(
                    sourceChannel.position(), count, targetChannel);            sourceChannel.position(sourceChannel.position() + transferred);
            count -= transferred;
        }
    }
 }

multiplexed,non-blocking I/O

多路非阻塞IO,主要由selectors, selectable channels 和 selection keys组成; 为了实现非阻塞多路复用IO,我们首先会创建一个或者多个selectable Channel,并设置为non-blocking模式,并通过调用自身的的register方法注册到selector上并返回一个selection key key,注册过程中可以选择特定关注的IO操作,这些操作将由selector检测是否准备就绪,一旦准备就绪就会将前面注册返回的selection key添加到selection-key set;可以通过遍历该set来获取key,然后通过该key可以得到其channel进行对应的io操作;

selector

即SelectableChannel的多路复用器;channel在selector中以selection key表示,selector维护三个主要的set;在所持有的key set中有已经就绪的key,会从阻塞中返回selected—key set;

  • Key set:表示所有已注册的channel对应的key;
  • selected—key set:表示所监听的channel中至少一种关注的io操作已经就绪的 selection key集合;
  • cancelled-key:已经取消但是还未取消注册的channel对应key集合,不可直接访问;当channel被close或者调用key的close方法,该key会被加入到该集合中;

==Linux 上依赖于epoll,Windows 上 NIO2(AIO)模式则是依赖于iocp。==

SelectableChannel

可被注册到selector的channel;关键就是register方法;

阻塞模式 一个SelectableChannel要么处于blocking mode,要么处于non-blocking mode。在阻塞模式下,在通道上调用的每个I/O操作都将阻塞,直到它完成。在非阻塞模式下,I/O操作永远不会阻塞,并且可能传输比请求的字节更少或根本没有字节。可选择通道的阻塞模式可以通过调用它的isBlocking方法来确定;新创建的SelectableChannel默认是blocking mode;在注册近selector之前必须设置为non-blocking mode;

image 可以看到,常见的 DatagramChannel,SocketChannel,ServerSocketChannel都属于SelectableChannel

SelectionKey

它表示一个注册到selector中的channel的令牌;当一个channel注册到一个selector中时会创建一个selectionKey,在channel关闭或者调用key的cancel方法时,会取消该key,但是并不是马上就将该key从seletor中移除,而是会想放入selector中的cancle key集合,并在下次选择操作时删除;
它维护两个整数值表示的操作集;操作集的每个位都表示键的通道所支持的可选择操作的类别。

  • interset set:兴趣集合;在我们注册传入的感兴趣的io操作集合,每次selector就会通过这个集合来检查该key是否有感兴趣的操作已经就绪;可以通过调用interestOps(int)方法修改;
  • ready set:已经就绪的操作集合;

操作类型

  • OP_ACCEPT: socket-accept operations.
  • OP_CONNECT:socket-connect operations.
  • OP_READ:read operations.
  • OP_WRITE:write operations.

multiplexed,non-blocking I/O 例子

public class NioServer {

public static void main(String[] args) throws IOException {
    // 创建一个selector
    Selector selector = Selector.open();

    // 初始化TCP连接监听通道
    ServerSocketChannel listenChannel = ServerSocketChannel.open();
    listenChannel.bind(new InetSocketAddress(9999));
    listenChannel.configureBlocking(false);
    // 注册到selector(监听其ACCEPT事件)
    listenChannel.register(selector, SelectionKey.OP_ACCEPT);

    // 创建一个缓冲区
    ByteBuffer buffer = ByteBuffer.allocate(100);

    while (true) {
        selector.select(); //阻塞,直到有监听的事件发生
        Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator();

        // 通过迭代器依次访问select出来的Channel事件
        while (keyIter.hasNext()) {
            SelectionKey key = keyIter.next();

            if (key.isAcceptable()) { // 有连接可以接受
                SocketChannel channel = ((ServerSocketChannel) key.channel()).accept();
                channel.configureBlocking(false);
                channel.register(selector, SelectionKey.OP_READ);

                System.out.println("与【" + channel.getRemoteAddress() + "】建立了连接!");

            } else if (key.isReadable()) { // 有数据可以读取
                buffer.clear();

                // 读取到流末尾说明TCP连接已断开,
                // 因此需要关闭通道或者取消监听READ事件
                // 否则会无限循环
                if (((SocketChannel) key.channel()).read(buffer) == -1) {
                    key.channel().close();
                    continue;
                } 

                // 按字节遍历数据
                buffer.flip();
                while (buffer.hasRemaining()) {
                    byte b = buffer.get();

                    if (b == 0) { // 客户端消息末尾的\0
                        System.out.println();

                        // 响应客户端
                        buffer.clear();
                        buffer.put("Hello, Client!\0".getBytes());
                        buffer.flip();
                        while (buffer.hasRemaining()) {
                            ((SocketChannel) key.channel()).write(buffer);
                        }
                    } else {
                        System.out.print((char) b);
                    }
                }
            }

            // 已经处理的事件一定要手动移除
            keyIter.remove();
        }
    }
}
}

简单Reactor模型伪代码

   interface ChannelHandler{
      void channelReadable(Channel channel);
      void channelWritable(Channel channel);
   }
   class Channel{
     Socket socket;
     Event event;//读,写或者连接
   }

   //IO线程主循环:
   class IoThread extends Thread{
   public void run(){
   Channel channel;
   while(channel=Selector.select()){//选择就绪的事件和对应的连接
      if(channel.event==accept){
         registerNewChannelHandler(channel);//如果是新连接,则注册一个新的读写处理器
      }
      if(channel.event==write){
         getChannelHandler(channel).channelWritable(channel);//如果可以写,则执行写事件
      }
      if(channel.event==read){
          getChannelHandler(channel).channelReadable(channel);//如果可以读,则执行读事件
      }
    }
   }
   Map<Channel,ChannelHandler> handlerMap;//所有channel的对应事件处理器
  }

参考

如何学习Java的NIO?
Java NIO
Java NIO浅析
一文让你彻底理解 Java NIO 核心组件
支撑百万并发的“零拷贝”技术,你了解吗?- 关于多种零拷贝技术的详细解释,以及java nio中哪些技术运用到了对应的零拷贝技术