侧边栏壁纸
博主头像
ZHD的小窝博主等级

行动起来,活在当下

  • 累计撰写 79 篇文章
  • 累计创建 53 个标签
  • 累计收到 1 条评论

目 录CONTENT

文章目录

Netty原理

江南的风
2017-06-23 / 0 评论 / 0 点赞 / 26 阅读 / 10507 字 / 正在检测是否收录...

Netty 是一个基于 Java NIO(Non-blocking I/O)的异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端。

1. Netty的由来

Netty,这一强大的网络编程框架,由Trustin Lee在2004年首次推出,其诞生初衷深刻根植于对Java NIO局限性的洞察与弥补之上。在Netty出现之前,Java开发者在进行网络应用开发时,主要依赖于Java NIO技术。尽管Java NIO引入了非阻塞IO和选择器(Selector)机制,为高效网络通信提供了可能,但其自身却伴随着一系列挑战与不足。

Java NIO的局限性:

  • 复杂性高:Java NIO的API设计倾向于底层操作,这意味着开发者需要深入理解并手动处理众多底层细节,增加了编程的复杂性和出错的风险。

  • 性能瓶颈:在高并发场景下,Java NIO的选择器实现暴露出性能上的短板,加之多线程管理的复杂性,进一步限制了其在高负载环境下的表现。

  • 功能局限性:尽管Java NIO提供了非阻塞IO的基础,但在高级功能方面却显得力不从心,如缺乏内置的协议编解码器、连接管理策略等,这些都要求开发者自行实现,增加了开发成本。

鉴于上述Java NIO的局限性,Netty应运而生,它以解决这些问题为目的,通过封装底层的NIO操作,为开发者提供了一套简洁、高效且功能丰富的网络编程框架。Netty不仅简化了网络编程的复杂度,还通过引入异步事件驱动模型、优化的线程模型以及广泛的协议支持等高级特性,极大地提升了网络应用的性能和可维护性。因此,Netty迅速成为Java网络编程领域的佼佼者,被广泛应用于各种高性能、高并发的网络服务器和客户端开发中。

2. Netty的核心组件

  • Bootstrap/ServerBootstrap(引导器)BootstrapServerBootstrap是Netty的引导类,分别用于客户端和服务端的启动、初始化和配置。它们相当于Netty应用程序的入口,负责串联其他核心组件,确保Netty程序能够正常启动和运行。

  • Channel(通道)Channel代表了到实体(如硬件设备、文件、网络套接字或能够执行IO操作的程序组件)的开放连接,是Netty进行网络通信的基础。它提供了基本的API用于网络IO操作,如读、写、连接和绑定等。Netty 提供了多种类型的通道,如 NioSocketChannelNioServerSocketChannel,分别用于客户端和服务器端的通信。

  • EventLoopGroup/EventLoop(事件循环器):事件循环是 Netty 的核心组件之一,负责处理所有的事件和 I/O 操作。每个通道都绑定到一个特定的事件循环上,事件循环负责处理该通道的所有 I/O 事件。事件循环通常是一个单线程的线程池,确保同一时间只有一个线程处理一个通道的 I/O 操作,从而避免了多线程并发问题。EventLoopGroup是一个处理IO操作和任务的线程组,它负责接收客户端连接、处理网络事件(如读写事件)等。EventLoop则是EventLoopGroup中的一个成员,它绑定到一个线程上,负责处理多个Channel的IO事件。

  • ChannelHandler(通道处理器):通道处理器是 Netty 中用于处理网络数据的核心组件。通道处理器可以分为两类:入站处理器(Inbound Handler)和出站处理器(Outbound Handler)。入站处理器用于处理从网络接收到的数据,出站处理器用于处理发送到网络的数据。开发者可以通过自定义ChannelHandler来实现复杂的业务逻辑和数据处理流程。

  • ChannelPipeline(通道管道):通道管道是通道处理器的容器,负责管理通道处理器的有序链。当数据流经通道时,数据会按照通道管道中定义的顺序依次通过每个通道处理器进行处理。ChannelPipeline使得数据在Netty中的处理流程变得清晰和可控。开发者可以灵活地添加、删除或修改ChannelHandler,以适应不同的业务需求和网络环境。

  • ByteBuf(缓冲区):Netty 使用 ByteBuf 作为数据缓冲区,用于在网络中传输数据。ByteBuf 提供了比 Java NIO 的 ByteBuffer 更强大和灵活的功能,如自动扩容、零拷贝等,同时还提供了丰富的API来操作字节数据,如读取、写入、复制、切片等。

  • Codec(编解码器):编解码器是用于处理网络数据格式的通道处理器。编码器(Encoder)用于将应用程序的数据格式转换为网络传输格式,解码器(Decoder)用于将网络传输格式转换为应用程序的数据格式。Netty 提供了多种内置的编解码器,如 StringEncoderStringDecoderProtobufEncoder 等,同时,开发者也可以根据需要自定义编解码器来满足特定的业务需求。

  • ChannelFutureChannelFuture 是 Netty 中用于处理异步操作结果的对象。由于 Netty 的 I/O 操作都是异步的,因此所有的 I/O 操作都会返回一个 ChannelFuture 对象,通过该对象可以获取操作的结果或注册回调函数。

3. Netty I/O数据流转

4. Netty请求响应时序

4. Netty线程模型

5. 使用例子

  • 高并发网络服务器

public class ChatServer {
    private final int port;

    public ChatServer(int port) {
        this.port = port;
    }

    public void start() throws Exception {
        // 创建一个用于接收连接的主线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // 创建一个用于处理每个连接的工作线程组
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 启动类
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     // 为每个连接的管道添加处理器
                     ch.pipeline().addLast(new StringDecoder(), new StringEncoder(), new ChatServerHandler());
                 }
             });

            // 绑定端口并启动
            ChannelFuture f = b.bind(port).sync();
            // 等待服务器套接字关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅地关闭线程组
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new ChatServer(8080).start();
    }
}

class ChatServerHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        // 打印接收到的消息
        System.out.println("Received: " + msg);
        // 回显接收到的消息
        ctx.writeAndFlush("Echo: " + msg);
    }
}
  • HTTP服务器

public class HttpServer {
    private final int port;

    public HttpServer(int port) {
        this.port = port;
    }

    public void start() throws Exception {
        // 创建主线程组和工作线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 启动类
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) {
                     // 为每个连接的管道添加HTTP处理器
                     ch.pipeline().addLast(new HttpServerCodec(), new HttpObjectAggregator(512 * 1024), new HttpServerHandler());
                 }
             });

            // 绑定端口并启动
            ChannelFuture f = b.bind(port).sync();
            // 等待服务器套接字关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅地关闭线程组
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new HttpServer(8080).start();
    }
}

class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) {
        // 创建HTTP响应
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        response.content().writeBytes("Hello, World!".getBytes());
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
        // 发送响应并关闭连接
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }
}
  • WebSocket服务器

public class WebSocketServer {
    private final int port;

    public WebSocketServer(int port) {
        this.port = port;
    }

    public void start() throws Exception {
        // 创建主线程组和工作线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 启动类
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) {
                     // 为每个连接的管道添加HTTP和WebSocket处理器
                     ch.pipeline().addLast(new HttpServerCodec(), new HttpObjectAggregator(65536), new WebSocketServerProtocolHandler("/ws"), new WebSocketServerHandler());
                 }
             });

            // 绑定端口并启动
            ChannelFuture f = b.bind(port).sync();
            // 等待服务器套接字关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅地关闭线程组
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new WebSocketServer(8080).start();
    }
}

class WebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
        // 打印接收到的WebSocket消息
        System.out.println("Received: " + msg.text());
        // 回显接收到的消息
        ctx.channel().writeAndFlush(new TextWebSocketFrame("Hello, " + msg.text()));
    }
}

6. 部分源码

0

评论区