侧边栏壁纸
博主头像
ZHD的小窝博主等级

行动起来,活在当下

  • 累计撰写 79 篇文章
  • 累计创建 53 个标签
  • 累计收到 1 条评论

目 录CONTENT

文章目录

理解NIO

江南的风
2016-05-08 / 0 评论 / 0 点赞 / 30 阅读 / 8688 字 / 正在检测是否收录...

NIO(Non-Blocking I/O,在Java中也称为New I/O)是一种同步非阻塞的I/O模型,它是I/O多路复用的基础,已经被越来越多地应用到大型应用服务器中,是解决高并发、I/O处理问题的有效方式。

1. 核心组件

NIO主要由三个核心组件构成,分别是缓冲区(Buffer)、通道(Channel)和选择器(Selector)。

  1. 缓冲区(Buffer)

    • 缓冲区是NIO中的一个重要概念,它是一个固定数据量的指定基本类型的数据容器。除了内容之外,缓冲区还具有位置和界限,其中位置是要读写的下一个元素的索引,界限是第一个不应该读写的元素的索引。

    • NIO提供了多种类型的缓冲区,如ByteBuffer、CharBuffer、ShortBuffer、IntBuffer、LongBuffer、FloatBuffer和DoubleBuffer,分别用于存储不同类型的数据。

    • 缓冲区提供了丰富的操作方法,如get和put方法用于数据的读写,clear、flip和rewind等方法用于重置缓冲区的状态。

  2. 通道(Channel)

    • 通道是NIO中用于数据读写的对象,与传统的流(Stream)不同,通道是双向的,可以同时进行读和写操作。

    • NIO提供了多种类型的通道,如FileChannel、SocketChannel、ServerSocketChannel和DatagramChannel等,每种类型都有着自己的特点和适用场景。

    • 通道与缓冲区是紧密关联的,数据总是通过缓冲区来进行读写操作,而不是直接从通道中读写数据。

  3. 选择器(Selector)

    • 选择器是NIO中实现I/O多路复用的关键组件,它允许一个线程同时处理多个通道上的I/O事件。

    • 选择器会不断地轮询注册在其上的所有通道,如果某个通道已经准备好进行读写操作(即处于就绪状态),选择器就会通知相应的线程进行处理。

    • 使用选择器可以显著减少线程的数量,降低系统资源的消耗,并提高系统的可伸缩性和性能。

2. 线程模型

基于以上IO数据流转过程,NIO采用以下线程模型实现:

  • 单reactor单线程模式这是最简单的Reactor模型,整个过程中的事件处理全部发生在一个线程里。

该模式很简单,易于理解,但是存在一定的问题,比如单线处理程模型下,无法发挥多核CPU的性能,如果Handler上的业务处理很慢,则意味着整个程序无法处理其他连接事件,造成性能问题。

  • 单reactor多线程模式相较于上面的模型,对业务处理模块进行了多线程异步处理。

该模式充分利用了多核CPU的处理能力,降低了由业务处理引起的性能问题,Reactor线程仅负责接收连接、读写事件操作。但是Reactor除了负责连接处理外仍然负责读写操作,大量的请求下仍然可能仍然存在性能问题。

  • 主从reactor多线程模式独立出另一个Reactor对象来处理非连接处理的其他处理。

该模式存在两个线程分别处理Reactor事件,主线程只负责处理连接事件,子线程只负责处理读写事件,这样主线程可以处理更多的连接,而不用关心子线程里的读写处理是否会影响到自己,目前这种模型被广泛使用在各种项目中(如Netty、Memcached等)。

3. 应用场景

NIO适用于多种需要高效处理I/O操作的场景,如:

  • 网络编程:NIO提供了非阻塞式的网络通信模型,可以使用单线程处理多个并发连接,从而实现高并发网络编程。

  • 文件操作:NIO程序可以对文件进行高效的读写操作,支持内存映射文件(MappedByteBuffer)方式,这比传统的InputStream和OutputStream更快、更可靠。

  • 数据库操作:NIO可以用于数据库应用程序,特别是涉及到大量数据读取和写入的情况。

  • 安全加密:NIO可以很好地处理安全加密,通过加密算法和非阻塞性能相结合,具有非常高的效率和可靠性。

  • 多线程数据同步:NIO可以让多个线程更新共享数据,同时避免了线程间同步问题,提高了并发性能。

4. 使用例子

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class NIOServer {

    static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        Selector subSelector = Selector.open();
        // 打开服务器套接字通道
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        // 绑定端口
        serverChannel.socket().bind(new InetSocketAddress(9999));
        // 将通道注册到选择器上,并设置监听连接事件
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        // 新开启一个线程运行subReactor
        new Thread(() -> {
            try {
                subreactor(subSelector);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
        // 开启reactor
        reactor(selector, subSelector);
    }

    // 模仿reactor
    public static void reactor(Selector selector, Selector subSelector) throws IOException {
        // 接收客户端连接
        while (true) {
            // 选择一组键,其对应的通道已经准备好进行I/O操作
            selector.select();
            // 遍历选中的键
            Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
            while (selectedKeys.hasNext()) {
                SelectionKey key = selectedKeys.next();
                selectedKeys.remove();
                dispatch(key,selector, subSelector);
            }
        }
    }

    // 模仿subreactor
    public static void subreactor(Selector subSelector) throws IOException {
        // 接收read
        while (true) {
            // 选择一组键,其对应的通道已经准备好进行I/O操作
            subSelector.select();
            // 遍历选中的键
            Iterator<SelectionKey> selectedKeys = subSelector.selectedKeys().iterator();
            while (selectedKeys.hasNext()) {
                SelectionKey key = selectedKeys.next();
                subDispatch(key);
                selectedKeys.remove();
            }
        }
    }

    // 模仿handler
    public static void handle(SelectionKey key) throws IOException {
        // 读取数据
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);

        int bytesRead = client.read(buffer);
        if (bytesRead > 0) {
            buffer.flip();
            byte[] bytes = new byte[buffer.limit()];
            buffer.get(bytes);
            String incoming = new String(bytes, "UTF-8");
            executor.execute(() -> worker(incoming, client));

        }
    }

    private static void worker(String incoming, SocketChannel client) {
        System.out.println("Received: " + incoming);
        try {
            // 处理业务逻辑
            Thread.sleep(100);
            // 发送响应到客户端
            client.write(ByteBuffer.wrap("Hello from server".getBytes("UTF-8")));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 模仿dispatcher
    public static void dispatch(SelectionKey key,Selector selector, Selector subSelector) throws IOException {
        if (key.isAcceptable()) {
            accept(key,selector, subSelector);
        }
        if (key.isReadable()) {
            SocketChannel client = (SocketChannel) key.channel();
            client.register(subSelector, SelectionKey.OP_READ);
            subSelector.wakeup();
        }
    }

    // 模仿dispatcher
    public static void subDispatch(SelectionKey key) throws IOException {
        if (key.isReadable()) {
            handle(key);
        }
    }

    // 模仿acceptor
    public static void accept(SelectionKey key,Selector selector, Selector subSelector) throws IOException {
        // 接受连接
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        SocketChannel client = server.accept();
        client.configureBlocking(false);

        // 注册客户端的读事件
        client.register(selector, SelectionKey.OP_READ);

        System.out.println("Accepted connection from " + client);
    }
}

0

评论区