参考文档:Java NIO全面详解(看这篇就够了)
NIO BufferAPI
1. NIO 简介
很多技术框架都使用NIO技术,学习和掌握Java NIO技术对于高性能、高并发网络的应用是非常关键的。
NIO 中的 N 可以理解为 Non-blocking,不单纯是 New,是解决高并发、I/O 高性能的有效方式。
Java NIO 是 Java1.4 之后推出来的一套 IO 接口,NIO 提供了一种完全不同的操作方式, NIO 支持面向缓冲区的、基于通道的 IO 操作。
新增了许多用于处理输入输出的类,这些类都被放在 java.nio 包及子包下,并且对原 java.io 包中的很多类进行改写,新增了满足 NIO 的功能。
2. BIO
BIO 全称是 Blocking IO,同步阻塞式 IO,是 JDK1.4 之前的传统 IO 模型。
Java BIO:服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如下图所示:
虽然此时服务器具备了高并发能力,即能够同时处理多个客户端请求了,但是却带来了一个问题,随着开启的线程数目增多,将会消耗过多的内存资源,导致服务器变慢甚至崩溃,NIO 可以一定程度解决这个问题。
3. NIO
Java NIO: 同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求就进行处理。
一个线程中就可以调用多路复用接口(java中是select)阻塞同时监听来自多个客户端的IO请求,一旦有收到IO请求就调用对应函数处理,NIO擅长1个线程管理多条连接,节约系统资源。
3.1 NIO 核心实现
NIO 包含3个核心的组件:
- Channel(通道)
- Buffer(缓冲区)
- Selector(选择器)
关系图的说明:
- 每个 Channel 对应一个 Buffer。
- Selector 对应一个线程,一个线程对应多个 Channel。
- 该图反应了有三个 Channel 注册到该 Selector。
- 程序切换到那个 Channel 是由事件决定的(Event)。
- Selector 会根据不同的事件,在各个通道上切换。
- Buffer 就是一个内存块,底层是有一个数组。
- 数据的读取和写入是通过 Buffer,但是需要flip()切换读写模式,而 BIO 是单向的,要么输入流要么输出流。
3.2 Channel
Channel 是 NIO 的核心概念,它表示一个打开的连接,这个连接可以连接到 I/O 设备(例如:磁盘文件,Socket)或者一个支持 I/O 访问的应用程序,Java NIO 使用缓冲区和通道来进行数据传输。
3.2.1 FileChannel
本地文件IO通道,用于读取、写入、映射和操作文件的通道,使用文件通道操作文件的一般流程为:
1)获取通道
文件通道通过 FileChannel 的静态方法 open() 来获取,获取时需要指定文件路径和文件打开方式。
// 获取文件通道
FileChannel.open(Paths.get(fileName), StandardOpenOption.READ);
2)创建字节缓冲区
文件相关的字节缓冲区有两种,一种是基于堆的 HeapByteBuffer,另一种是基于文件映射,放在堆外内存中的 MappedByteBuffer。
// 分配字节缓存
ByteBuffer buf = ByteBuffer.allocate(10);
3)读写操作
读取数据
一般需要一个循环结构来读取数据,读取数据时需要注意切换 ByteBuffer 的读写模式。
while (channel.read(buf) != -1){ // 读取通道中的数据,并写入到 buf 中
buf.flip(); // 缓存区切换到读模式
while (buf.position() < buf.limit()){ // 读取 buf 中的数据
text.append((char)buf.get());
}
buf.clear(); // 清空 buffer,缓存区切换到写模式
}
写入数据
for (int i = 0; i < text.length(); i++) {
buf.put((byte)text.charAt(i)); // 填充缓冲区,需要将 2 字节的 char 强转为 1 自己的 byte
if (buf.position() == buf.limit() || i == text.length() - 1) { // 缓存区已满或者已经遍历到最后一个字符
buf.flip(); // 将缓冲区由写模式置为读模式
channel.write(buf); // 将缓冲区的数据写到通道
buf.clear(); // 清空缓存区,将缓冲区置为写模式,下次才能使用
}
}
4)将数据刷出到物理磁盘,FileChannel 的 force(boolean metaData) 方法可以确保对文件的操作能够更新到磁盘。
channel.force(false);
5)关闭通道
channel.close();
3.2.2 SocketChannel
网络套接字 IO 通道,TCP 协议,针对面向流的连接套接字的可选择通道(一般用在客户端)。
TCP 客户端使用 SocketChannel 与服务端进行交互的流程为:
1)打开通道,连接到服务端
SocketChannel channel = SocketChannel.open(); // 打开通道,此时还没有打开 TCP 连接
channel.connect(new InetSocketAddress("localhost", 9090)); // 连接到服务端
2)分配缓冲区
ByteBuffer buf = ByteBuffer.allocate(10); // 分配一个 10 字节的缓冲区,不实用,容量太小
3)配置是否为阻塞方式(默认为阻塞方式)
channel.configureBlocking(false); // 配置通道为非阻塞模式
4)与服务端进行数据交互
5)关闭连接
channel.close(); // 关闭通道
3.2.3 ServerSocketChannel
网络通信 IO 操作,TCP 协议,针对面向流的监听套接字的可选择通道(一般用于服务端),流程如下:
1)打开一个 ServerSocketChannel 通道, 绑定端口
ServerSocketChannel server = ServerSocketChannel.open(); // 打开通道
2)绑定端口
server.bind(new InetSocketAddress(9090)); // 绑定端口
3)阻塞等待连接到来,有新连接时会创建一个 SocketChannel 通道,服务端可以通过这个通道与连接过来的客户端进行通信。等待连接到来的代码一般放在一个循环结构中
SocketChannel client = server.accept(); // 阻塞,直到有连接过来
4)通过 SocketChannel 与客户端进行数据交互
5)关闭 SocketChannel
client.close();
3.3 Buffer
缓冲区 Buffer 是 Java NIO 中一个核心概念,在NIO库中,所有数据都是用缓冲区处理的。
在读取数据时,它是直接读到缓冲区中的,在写入数据时,它也是写入到缓冲区中的,任何时候访问 NIO 中的数据,都是将它放到缓冲区中。
而在面向流I/O系统中,所有数据都是直接写入或者直接将数据读取到Stream对象中。
3.3.1 Buffer 数据类型
从类图中可以看到,7 种数据类型对应着 7 种子类,这些名字是 Heap 开头子类,数据是存放在 JVM 堆中的。
3.3.2 MappedByteBuffer
而 MappedByteBuffer 则是存放在堆外的直接内存中,可以映射到文件。
通过 java.nio 包和 MappedByteBuffer 允许 Java 程序直接从内存中读取文件内容,通过将整个或部分文件映射到内存,由操作系统来处理加载请求和写入文件,应用只需要和内存打交道,这使得 IO 操作非常快。
Mmap 内存映射和普通标准 IO 操作的本质区别在于它并不需要将文件中的数据先拷贝至 OS 的内核 IO 缓冲区,而是可以直接将用户进程私有地址空间中的一块区域与文件对象建立映射关系,这样程序就好像可以直接从内存中完成对文件读/写操作一样。
只有当缺页中断发生时,直接将文件从磁盘拷贝至用户态的进程空间内,只进行了一次数据拷贝,对于容量较大的文件来说(文件大小一般需要限制在1.5~2G以下),采用Mmap的方式其读/写的效率和性能都非常高,大家熟知的RocketMQ就使用了该技术。
3.3.3 Buffer数据流程
应用程序可以通过与 I/O 设备建立通道来实现对 I/O 设备的读写操作,操作的数据通过缓冲区 Buffer 来进行交互。
从 I/O 设备读取数据时:
-
应用程序调用通道 Channel 的 read() 方法;
-
通道往缓冲区 Buffer 中填入 I/O 设备中的数据,填充完成之后返回;
-
应用程序从缓冲区 Buffer 中获取数据。
往 I/O 设备写数据时:
- 应用程序往缓冲区 Buffer 中填入要写到 I/O 设备中的数据;
- 调用通道 Channel 的 write() 方法,通道将数据传输至 I/O 设备。
3.3.4 缓冲区核心方法
缓冲区存取数据的两个核心方法:
1)put():存入数据到缓冲区
- put(byte b):将给定单个字节写入缓冲区的当前位置
- put(byte[] src):将 src 中的字节写入缓冲区的当前位置
- put(int index, byte b):将指定字节写入缓冲区的索引位置(不会移动 position)
2)get():获取缓冲区的数据
- get() :读取单个字节
- get(byte[] dst):批量读取多个字节到 dst 中
- get(int index):读取指定索引位置的字节(不会移动 position)
3.3.5 Buffer 属性介绍
capacity (容量): 不能为负,不可更改;就是buffer的长度(buffer.length)
limit(限制): 指第一个不可被读入缓冲区元素的位置;不可为负,若 position 大于 limit,那么 limit 就是 position
position(位置): 指下一个被读入缓冲区元素的位置;不可为负,小于limit,默认索引由0开始;
mark (标记): 指在缓冲区设置标记;若调用 reset() 方法会回到 position 的位置;如果未设置 mark 调用 reset() 方法会报异常;如果positon 或者 limt 小于 mark 时,mark 被丢弃,其指为 -1;
0 <= mark <= position <= limit <= capacity
3.3.6 Buffer 方法介绍
position()
int position() 方法是返回缓冲区的位置;
@Test
public void testPosition(){
byte[] bytes = {15,17,25,13,46,18};
ByteBuffer wrap = ByteBuffer.wrap(bytes);
// 获得位置
int position = wrap.position();
System.out.println(position);//0
// 设置位置
Buffer position1 = wrap.position(5);
//[pos=5 lim=6 cap=6]
System.out.println(position1);
}
limit()
int limit() 返回缓冲区的限制;如果 position > limit ,position就是limit,自行验证;
public void testLimit(){
byte[] bytes = {15,17,25,13,46,18};
ByteBuffer wrap = ByteBuffer.wrap(bytes);
// 获得限制
int limit = wrap.limit();
System.out.println(limit);//6
// 设置限制
Buffer limit1 = wrap.limit(3);
//[pos=0 lim=3 cap=6]
System.out.println(limit1);
// 输出缓冲区元素
for (int i=0; i<bytes.length; i++){
// 只输出 15 17 25 继续get会报 BufferUnderflowException
System.out.println(wrap.get());
}
}
mark()
Buffer mark() 设置缓冲区标记;如果未设置mark调用reset()会报 InvalidMarkException 异常,自行验证;
public void testMark() {
byte[] bytes = {15, 17, 25, 13, 46, 18};
ByteBuffer wrap = ByteBuffer.wrap(bytes);
// 设置位置
wrap.position(1);
// 设置标记
wrap.mark();
// [pos=1 lim=6 cap=6]
System.out.println(wrap);
// 改变位置
wrap.position(4);
// 调用reset
wrap.reset();
// [pos=1 lim=6 cap=6]
System.out.println(wrap);
}
capacity()
int capacity() 返回缓冲区的容量,不可改变;
public void test(){
byte[] bytes = new byte[25];
ByteBuffer wrap = ByteBuffer.wrap(bytes);
// 获得缓冲区容量
int capacity = wrap.capacity();
// 25
System.out.println(capacity);
}
remaining()
int remaining() 返回当前位置和限制之间的大小;即 remaining = limit - position;
public void testRemaining() {
byte[] bytes = {15, 17, 25, 13, 46, 18};
ByteBuffer wrap = ByteBuffer.wrap(bytes);
// 获得缓冲区的元素个数
int remaining = wrap.remaining();
// 6
System.out.println(remaining);
// 设置位置
wrap.position(1);
// 设置limit
wrap.limit(5);
// 重新获得缓冲区的元素个数
int remaining1 = wrap.remaining();
//4
System.out.println(remaining1);
}
isDirect()
abstract boolean isDirect() 判断该缓冲区是否是直接缓冲区;平常的缓冲区都是非直接缓冲区,即在jvm内部创建的缓冲区,我们调用Buffer相关的方法都会走jvm内部缓冲区,其性能不如直接缓存区快;直接缓冲区是指无需创建jvm内部缓冲区,直接跟计算级的内存空间交互,其速度较快;
public void testIsDirect() {
byte[] bytes = {15, 17, 25, 13, 46, 18};
ByteBuffer wrap = ByteBuffer.wrap(bytes);
// 判断是否是直接缓冲区
boolean direct = wrap.isDirect();
// false
System.out.println(direct);
// 分配直接缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(8);
boolean direct1 = byteBuffer.isDirect();
// true
System.out.println(direct1);
}
isReadOnly()
abstract boolean isReadOnly() 判断是否是只读缓冲区;
public void testIsDeadOnly() {
byte[] bytes = {15, 17, 25, 13, 46, 18};
ByteBuffer wrap = ByteBuffer.wrap(bytes);
// 判断是否是只读缓冲区
boolean readOnly = wrap.isReadOnly();
// false
System.out.println(readOnly);
}
clear()
Buffer clear() 是还原缓冲区的初始状态,记住不是字面的意思清除缓冲区数据;
public void testClean() {
byte[] bytes = {15, 17, 25, 13, 46, 18};
ByteBuffer wrap = ByteBuffer.wrap(bytes);
wrap.position(2);
wrap.limit(5);
// [pos=2 lim=5 cap=6]
System.out.println(wrap);
// 还原缓冲区状态
wrap.clear();
// [pos=0 lim=6 cap=6]
System.out.println(wrap);
}
其主要使用于重新写入数据至缓冲区;通常在通道read,put 操作之前调用为了填充缓冲区;
示例:
public void testClean2() {
CharBuffer wrap = CharBuffer.allocate(24);
wrap.put("youku1327");
wrap.clear();
wrap.put("知识追寻者");
wrap.rewind();
for (int i=0; i<wrap.limit();i++){
//知识追寻者1327
System.out.print(wrap.get());
}
}
源码:
public final Buffer clear() {
// 位置清0
position = 0;
// 调整限制等于容量
limit = capacity;
// 标记调整为默认值
mark = -1;
return this;
}
flip()
Buffer flip() 翻转缓冲区;不是字面意思上的将缓冲区的数据倒转,是指截取的意思;将限制设置为位置所在的当前值,将位置清0,如果有定义标记,则抛弃标记;
源码:
public final Buffer flip() {
// 将限制设置当前位置的值
limit = position;
// 位置清 0
position = 0;
// 抛弃标记
mark = -1;
return this;
}
示例:
public void testFilp() {
byte[] bytes = {15, 17, 25, 13, 46, 18};
ByteBuffer wrap = ByteBuffer.wrap(bytes);
wrap.position(2);
wrap.mark();
// 翻转前 [pos=2 lim=6 cap=6]
System.out.println(wrap);
// 翻转
wrap.flip();
// 反正后 [pos=0 lim=2 cap=6]
System.out.println(wrap);
// 输出缓冲区元素
for (int i=0; i<wrap.limit(); i++){
// 15 17
System.out.println(wrap.get());
}
}
其通常在一系列 通道 put 或者 read 操作之后调用此方法为通道的write或者 get操作做准备;
示例:
public void testFilp2() {
CharBuffer wrap = CharBuffer.allocate(15);
wrap.put("公众号:知识追寻者");
// 翻转
wrap.flip();
for (int i=0; i<wrap.limit(); i++){
// 公众号:知识追寻着
System.out.print(wrap.get());
}
}
hasArray()
abstract boolean hasArray() 判断底层是否支持数组的实现;
public void testHasArray(){
// 间接缓存
ByteBuffer allocate = ByteBuffer.allocate(10);
boolean hasArray = allocate.hasArray();
// true
System.out.println(hasArray);
// 直接缓存
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(10);
boolean hasArray1 = byteBuffer.hasArray();
// false
System.out.println(hasArray1);
}
hasRemaining()
boolean hasRemaining() 判断 limit 和 position直接是否有元素;经常使用于缓冲区读取数据;
public void testHasRemaining(){
byte[] bytes = {15, 17, 25, 13, 46, 18};
ByteBuffer wrap = ByteBuffer.wrap(bytes);
while (wrap.hasRemaining()){
// 15 17 25 13 46 18
System.out.println(wrap.get());
}
}
rewind()
Buffer rewind() 重绕缓冲区;其通常在通道write 或者 get 操作之前调用,为了重新读取数据;注意其限制不变;
源码:
public final Buffer rewind() {
// 位置设置为0
position = 0;
// 抛弃标记
mark = -1;
return this;
}
示例:
public void testRewind(){
byte[] bytes = {15, 17, 25, 13, 46, 18};
ByteBuffer wrap = ByteBuffer.wrap(bytes);
while (wrap.hasRemaining()){
// 15 17 25 13 46 18
System.out.println(wrap.get());
}
// 重绕缓冲区
wrap.rewind();
while (wrap.hasRemaining()){
// 15 17 25 13 46 18
System.out.println(wrap.get());
}
}
arrayOffset()
abstract int arrayOffset() 返回写入缓冲区第一个元素的偏移,可选操作;
public void testOffset(){
byte[] bytes = {15, 17, 25, 13, 46, 18};
ByteBuffer wrap = ByteBuffer.wrap(bytes);
// 获得偏移
int arrayOffset = wrap.arrayOffset();
// 0
System.out.println(arrayOffset);
}
源码:
public final int arrayOffset() {
if (hb == null)
throw new UnsupportedOperationException();
if (isReadOnly)
throw new ReadOnlyBufferException();
return offset;
}
final int offset;
3.4 Selector(选择器)
Selector 类是 NIO 的核心类,Selector(选择器)选择器提供了选择已经就绪的任务的能力。
Selector 会不断的轮询注册在上面的所有 channel,如果某个 channel 为读写等事件做好准备,那么就处于就绪状态,通过 Selector 可以不断轮询发现出就绪的 channel,进行后续的 IO 操作。
一个 Selector 能够同时轮询多个 channel,这样,一个单独的线程就可以管理多个channel,从而管理多个网络连接,这样就不用为每一个连接都创建一个线程,同时也避免了多线程之间上下文切换导致的开销。
3.4.1 使用步骤
1 获取选择器
与通道和缓冲区的获取类似,选择器的获取也是通过静态工厂方法 open() 来得到的。
Selector selector = Selector.open(); // 获取一个选择器实例
2 获取可选择通道
能够被选择器监控的通道必须实现了 SelectableChannel 接口,并且需要将通道配置成非阻塞模式,否则后续的注册步骤会抛出 IllegalBlockingModeException。
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9090)); // 打开 SocketChannel 并连接到本机 9090 端口
socketChannel.configureBlocking(false); // 配置通道为非阻塞模式
3 将通道注册到选择器
通道在被指定的选择器监控之前,应该先告诉选择器,并且告知监控的事件,即:将通道注册到选择器。
通道的注册通过 SelectableChannel.register(Selector selector, int ops) 来完成,ops 表示关注的事件,如果需要关注该通道的多个 I/O 事件,可以传入这些事件类型或运算之后的结果。这些事件必须是通道所支持的,否则抛出 IllegalArgumentException。
socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); // 将套接字通过到注册到选择器,关注 read 和 write 事件
4 轮询 selector 就绪事件
通过调用选择器的 Selector.select() 方法可以获取就绪事件,该方法会将就绪事件放到一个 SelectionKey 集合中,然后返回就绪的事件的个数。这个方法映射多路复用 I/O 模型中的 select 系统调用,它是一个阻塞方法。正常情况下,直到至少有一个就绪事件,或者其它线程调用了当前 Selector 对象的 wakeup() 方法,或者当前线程被中断时返回。
while (selector.select() > 0){ // 轮询,且返回时有就绪事件
Set<SelectionKey> keys = selector.selectedKeys(); // 获取就绪事件集合
.......
}
有 3 种方式可以获取就绪事件:
- select() 阻塞方法,有一个就绪事件,或者其它线程调用了 wakeup() 或者当前线程被中断时返回。
- select(long timeout) 阻塞方法,有一个就绪事件,或者其它线程调用了 wakeup(),或者当前线程被中断,或者阻塞时长达到了 timeout 时返回。不抛出超时异常。
- selectNode() 不阻塞,如果无就绪事件,则返回 0;如果有就绪事件,则将就绪事件放到一个集合,返回就绪事件的数量。
5 处理就绪事件
每次可以 select 出一批就绪的事件,所以需要对这些事件进行迭代。
for(SelectionKey key : keys){
if(key.isWritable()){ // 可写事件
if("Bye".equals((line = scanner.nextLine()))){
socketChannel.shutdownOutput();
socketChannel.close();
break;
}
buf.put(line.getBytes());
buf.flip();
socketChannel.write(buf);
buf.compact();
}
}
从一个 SelectionKey 对象可以得到:
- 就绪事件的对应的通道;
- 就绪的事件。
通过这些信息,就可以很方便地进行 I/O 操作。
3.5 NIO源码案例
3.5.1 NIOServer
public static void main(String[] args) throws Exception{
//创建ServerSocketChannel,-->> ServerSocket
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
InetSocketAddress inetSocketAddress = new InetSocketAddress(5555);
serverSocketChannel.socket().bind(inetSocketAddress);
serverSocketChannel.configureBlocking(false); //设置成非阻塞
//开启selector,并注册accept事件
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while(true) {
selector.select(2000); //监听所有通道
//遍历selectionKeys
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if(key.isAcceptable()) { //处理连接事件
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false); //设置为非阻塞
System.out.println("client:" + socketChannel.getLocalAddress() + " is connect");
socketChannel.register(selector, SelectionKey.OP_READ); //注册客户端读取事件到selector
} else if (key.isReadable()) { //处理读取事件
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
SocketChannel channel = (SocketChannel) key.channel();
channel.read(byteBuffer);
System.out.println("client:" + channel.getLocalAddress() + " send " + new String(byteBuffer.array()));
}
iterator.remove(); //事件处理完毕,要记得清除
}
}
}
3.5.2 NIOClient
public class NIOClient {
public static void main(String[] args) throws Exception {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
InetSocketAddress inetSocketAddress = new InetSocketAddress(
"127.0.0.1",
5555
);
if (!socketChannel.connect(inetSocketAddress)) {
while (!socketChannel.finishConnect()) {
System.out.println("客户端正在连接中,请耐心等待");
}
}
ByteBuffer byteBuffer = ByteBuffer.wrap("mikechen的互联网架构".getBytes());
socketChannel.write(byteBuffer);
socketChannel.close();
}
}