NIO(Non-Blocking I/O,在Java中也称为New I/O)是一种同步非阻塞的I/O模型,它是I/O多路复用的基础,已经被越来越多地应用到大型应用服务器中,是解决高并发、I/O处理问题的有效方式。
1. 核心组件
NIO主要由三个核心组件构成,分别是缓冲区(Buffer)、通道(Channel)和选择器(Selector)。
缓冲区(Buffer):
缓冲区是NIO中的一个重要概念,它是一个固定数据量的指定基本类型的数据容器。除了内容之外,缓冲区还具有位置和界限,其中位置是要读写的下一个元素的索引,界限是第一个不应该读写的元素的索引。
NIO提供了多种类型的缓冲区,如ByteBuffer、CharBuffer、ShortBuffer、IntBuffer、LongBuffer、FloatBuffer和DoubleBuffer,分别用于存储不同类型的数据。
缓冲区提供了丰富的操作方法,如get和put方法用于数据的读写,clear、flip和rewind等方法用于重置缓冲区的状态。
通道(Channel):
通道是NIO中用于数据读写的对象,与传统的流(Stream)不同,通道是双向的,可以同时进行读和写操作。
NIO提供了多种类型的通道,如FileChannel、SocketChannel、ServerSocketChannel和DatagramChannel等,每种类型都有着自己的特点和适用场景。
通道与缓冲区是紧密关联的,数据总是通过缓冲区来进行读写操作,而不是直接从通道中读写数据。
选择器(Selector):
选择器是NIO中实现I/O多路复用的关键组件,它允许一个线程同时处理多个通道上的I/O事件。
选择器会不断地轮询注册在其上的所有通道,如果某个通道已经准备好进行读写操作(即处于就绪状态),选择器就会通知相应的线程进行处理。
使用选择器可以显著减少线程的数量,降低系统资源的消耗,并提高系统的可伸缩性和性能。
2. 线程模型
基于以上IO数据流转过程,NIO采用以下线程模型实现:
单reactor单线程模式:这是最简单的Reactor模型,整个过程中的事件处理全部发生在一个线程里。
该模式很简单,易于理解,但是存在一定的问题,比如单线处理程模型下,无法发挥多核CPU的性能,如果Handler上的业务处理很慢,则意味着整个程序无法处理其他连接事件,造成性能问题。
单reactor多线程模式:相较于上面的模型,对业务处理模块进行了多线程异步处理。
该模式充分利用了多核CPU的处理能力,降低了由业务处理引起的性能问题,Reactor线程仅负责接收连接、读写事件操作。但是Reactor除了负责连接处理外仍然负责读写操作,大量的请求下仍然可能仍然存在性能问题。
主从reactor多线程模式:独立出另一个Reactor对象来处理非连接处理的其他处理。
该模式存在两个线程分别处理Reactor事件,主线程只负责处理连接事件,子线程只负责处理读写事件,这样主线程可以处理更多的连接,而不用关心子线程里的读写处理是否会影响到自己,目前这种模型被广泛使用在各种项目中(如Netty、Memcached等)。
3. 应用场景
NIO适用于多种需要高效处理I/O操作的场景,如:
网络编程:NIO提供了非阻塞式的网络通信模型,可以使用单线程处理多个并发连接,从而实现高并发网络编程。
文件操作:NIO程序可以对文件进行高效的读写操作,支持内存映射文件(MappedByteBuffer)方式,这比传统的InputStream和OutputStream更快、更可靠。
数据库操作:NIO可以用于数据库应用程序,特别是涉及到大量数据读取和写入的情况。
安全加密:NIO可以很好地处理安全加密,通过加密算法和非阻塞性能相结合,具有非常高的效率和可靠性。
多线程数据同步:NIO可以让多个线程更新共享数据,同时避免了线程间同步问题,提高了并发性能。
4. 使用例子
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class NIOServer {
static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
Selector subSelector = Selector.open();
// 打开服务器套接字通道
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
// 绑定端口
serverChannel.socket().bind(new InetSocketAddress(9999));
// 将通道注册到选择器上,并设置监听连接事件
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
// 新开启一个线程运行subReactor
new Thread(() -> {
try {
subreactor(subSelector);
} catch (IOException e) {
e.printStackTrace();
}
}).start();
// 开启reactor
reactor(selector, subSelector);
}
// 模仿reactor
public static void reactor(Selector selector, Selector subSelector) throws IOException {
// 接收客户端连接
while (true) {
// 选择一组键,其对应的通道已经准备好进行I/O操作
selector.select();
// 遍历选中的键
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
dispatch(key,selector, subSelector);
}
}
}
// 模仿subreactor
public static void subreactor(Selector subSelector) throws IOException {
// 接收read
while (true) {
// 选择一组键,其对应的通道已经准备好进行I/O操作
subSelector.select();
// 遍历选中的键
Iterator<SelectionKey> selectedKeys = subSelector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
subDispatch(key);
selectedKeys.remove();
}
}
}
// 模仿handler
public static void handle(SelectionKey key) throws IOException {
// 读取数据
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int bytesRead = client.read(buffer);
if (bytesRead > 0) {
buffer.flip();
byte[] bytes = new byte[buffer.limit()];
buffer.get(bytes);
String incoming = new String(bytes, "UTF-8");
executor.execute(() -> worker(incoming, client));
}
}
private static void worker(String incoming, SocketChannel client) {
System.out.println("Received: " + incoming);
try {
// 处理业务逻辑
Thread.sleep(100);
// 发送响应到客户端
client.write(ByteBuffer.wrap("Hello from server".getBytes("UTF-8")));
} catch (Exception e) {
e.printStackTrace();
}
}
// 模仿dispatcher
public static void dispatch(SelectionKey key,Selector selector, Selector subSelector) throws IOException {
if (key.isAcceptable()) {
accept(key,selector, subSelector);
}
if (key.isReadable()) {
SocketChannel client = (SocketChannel) key.channel();
client.register(subSelector, SelectionKey.OP_READ);
subSelector.wakeup();
}
}
// 模仿dispatcher
public static void subDispatch(SelectionKey key) throws IOException {
if (key.isReadable()) {
handle(key);
}
}
// 模仿acceptor
public static void accept(SelectionKey key,Selector selector, Selector subSelector) throws IOException {
// 接受连接
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel client = server.accept();
client.configureBlocking(false);
// 注册客户端的读事件
client.register(selector, SelectionKey.OP_READ);
System.out.println("Accepted connection from " + client);
}
}
评论区