侧边栏壁纸
博主头像
ZHD的小窝博主等级

行动起来,活在当下

  • 累计撰写 79 篇文章
  • 累计创建 53 个标签
  • 累计收到 1 条评论

目 录CONTENT

文章目录

基于WebRTC的视频会议系统(1)

江南的风
2024-07-16 / 0 评论 / 0 点赞 / 39 阅读 / 11842 字 / 正在检测是否收录...

WebRTC的原理图

编写信令服务

1. 技术选型

  • JDK 11

  • springboot 2.6.13

  • Netty 4.1.112.Final

  • 协议:TCP协议

2. Spring Boot集成Netty

  • 添加依赖

<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.112.Final</version>
</dependency>

3. 信令定义

信令

说明

类型

join

用户加入房间

客户端

leave

用户离开房间

客户端

cmd

端到端命令(offer、answer、candidate)

客户端

ujoin

用户已加入

服务端

uleave

用户已离开

服务端

ojoin

其他用户已加入

服务端

oleave

其他用户已离开

服务端

full

房间已满

服务端

4. 信令系统时序图

5. 实现

5.1 使用Netty构建网络服务器

定义BootNettyServer,用于启动Netty服务,监听端口可配置。

@Slf4j
public class BootNettyServer {

    private final int port;

    private Channel channel;

    public BootNettyServer(int port) {
        this.port = port;
    }

    public void start() {
        /**
         * 配置服务端的NIO线程组
         * NioEventLoopGroup 是用来处理I/O操作的Reactor线程组
         * bossGroup:用来接收进来的连接,workerGroup:用来处理已经被接收的连接,进行socketChannel的网络读写,
         * bossGroup接收到连接后就会把连接信息注册到workerGroup
         * workerGroup的EventLoopGroup默认的线程数是CPU核数的二倍
         */
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // ServerBootstrap 是一个启动NIO服务的辅助启动类
            ServerBootstrap bootstrap = new ServerBootstrap();
            // 设置group,将bossGroup, workerGroup线程组传递到ServerBootstrap
            bootstrap = bootstrap.group(bossGroup, workerGroup);
            // ServerSocketChannel是以NIO的selector为基础进行实现的,用来接收新的连接,这里告诉Channel通过NioServerSocketChannel获取新的连接
            bootstrap = bootstrap.channel(NioServerSocketChannel.class);
            /**
             * option是设置 bossGroup,childOption是设置workerGroup
             * netty 默认数据包传输大小为1024字节, 设置它可以自动调整下一次缓冲区建立时分配的空间大小,避免内存的浪费    最小  初始化  最大 (根据生产环境实际情况来定)
             * 使用对象池,重用缓冲区
             */
            bootstrap = bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 10496, 1048576));
            bootstrap = bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 10496, 1048576));
            // 设置线程队列得到链接个数
            bootstrap = bootstrap.option(ChannelOption.SO_BACKLOG, 128);
            // 设置保持活动连接状态
            bootstrap = bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);


            // 设置 I/O处理类,主要用于网络I/O事件,记录日志,编码、解码消息
            bootstrap = bootstrap.childHandler(new BootNettyChannelInitializer<SocketChannel>());

            // 绑定端口,同步等待成功
            ChannelFuture future = bootstrap.bind(port).sync();
            // 为channelfuture注册监听器,监听关心的事件
            future.addListener(channelFuture -> {
                if (channelFuture.isSuccess()) {
                    log.info("Netty server started and listening on port {}!", port);
                }else {
                    log.error("Netty server start failure on port {}!!!", port);
                }
            });
            // 等待服务器监听端口关闭
            channel = future.channel();
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            log.warn("Netty server thread was Interrupted!");
        } finally {
            // 退出,释放线程池资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

    public void shutdown() {
        if (channel != null) {
            channel.close();
        }
        log.info("Netty server shutdown!");
    }
}

5.2 定义ChannelPipeline

包含编解码器和请求处理器

public class BootNettyChannelInitializer<SocketChannel> extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel channel) throws Exception {
        // ChannelOutboundHandler,编码器,依照逆序执行
        channel.pipeline().addLast(new StringEncoder());
        // 属于ChannelInboundHandler,解码器,依照顺序执行
        channel.pipeline().addLast(new StringDecoder());
        // 添加自定义的业务处理器
        channel.pipeline().addLast(new BootNettyChannelInboundHandlerAdapter());
    }
}

5.3 定义请求处理器

处理器BootNettyChannelInboundHandlerAdapter,用来处理请求

public class BootNettyChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter {

    /**
     * 从客户端收到新的数据时,这个方法会在收到消息时被调用
     *
     * @param ctx 上下文
     * @param msg 消息
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("channelRead:read msg:" + msg);
        //回应客户端
        ctx.writeAndFlush("I got it\n");
    }

    /**
     * 从客户端收到新的数据、读取完成时调用
     *
     * @param ctx 上下文
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
        System.out.println("channelReadComplete");
        ctx.flush();
    }

    /**
     * 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
     *
     * @param ctx   上下文
     * @param cause
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws IOException {
        System.out.println("exceptionCaught");
        cause.printStackTrace();
        ctx.close();//抛出异常,断开与客户端的连接
    }

    /**
     * 客户端与服务端第一次建立连接时 执行
     *
     * @param ctx 上下文
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception, IOException {
        super.channelActive(ctx);
        ctx.channel().read();
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = insocket.getAddress().getHostAddress();
        //此处不能使用ctx.close(),否则客户端始终无法与服务端建立连接
        System.out.println("channelActive:" + clientIp + ctx.name());
    }

    /**
     * 客户端与服务端 断连时 执行
     *
     * @param ctx 上下文
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception, IOException {
        super.channelInactive(ctx);
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = insocket.getAddress().getHostAddress();
        ctx.close(); //断开连接时,必须关闭,否则造成资源浪费,并发量很大情况下可能造成宕机
        System.out.println("channelInactive:" + clientIp);
    }

    /**
     * 服务端当read超时, 会调用这个方法
     *
     * @param ctx 上下文
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception, IOException {
        super.userEventTriggered(ctx, evt);
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = insocket.getAddress().getHostAddress();
        ctx.close();//超时时断开连接
        System.out.println("userEventTriggered:" + clientIp);
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelRegistered");
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelUnregistered");
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelWritabilityChanged");
    }

5.4 定义启动类

@EnableAsync
@SpringBootApplication
public class SignalServerApplication implements CommandLineRunner,DisposableBean {

    private BootNettyServer server;

    public static void main(String[] args) {
        SpringApplication.run(SignalServerApplication.class, args);
    }

    @Async
    @Override
    public void run(String... args) throws Exception {
        server = new BootNettyServer(8888);
        server.start();
    }

    @Override
    public void destroy() throws Exception {
        server.shutdown();
    }
}

0

评论区