侧边栏壁纸
博主头像
ZHD的小窝博主等级

行动起来,活在当下

  • 累计撰写 79 篇文章
  • 累计创建 53 个标签
  • 累计收到 1 条评论

目 录CONTENT

文章目录

了解RPC

江南的风
2019-06-21 / 0 评论 / 0 点赞 / 28 阅读 / 17891 字 / 正在检测是否收录...

一、RPC原理

RPC(Remote Procedure Call,远程过程调用)是一种计算机通信协议,它允许一个计算机程序通过网络调用另一个计算机程序中的子程序(也就是远程过程),并获取返回值。RPC是分布式计算的重要基础,使得分布式计算应用更加方便和高效。以下是RPC原理的详细解析:

1. RPC的基本思想

RPC的基本思想是本地计算机上的客户端程序调用远程服务器上的过程或子程序,就像调用本地过程一样,而不用关心底层网络通信的细节。这种抽象使得开发者可以更加专注于业务逻辑的实现,而无需深入了解网络通信的复杂性。

2. RPC的工作原理

RPC的工作原理主要包括以下几个步骤:

  1. 定义远程方法接口:首先,需要定义一个远程方法接口(服务契约),包括方法名称、参数类型、返回值类型等信息。这是RPC通信的基础,客户端和服务端都需要遵循这个接口定义。

  2. 生成stub和skeleton代码:在RPC框架中,通常会为客户端和服务端生成stub(桩)和skeleton(骨架)代码。Stub是客户端的代理,负责将客户端的请求序列化为网络传输格式,并发送给服务端;Skeleton是服务端的代理,负责接收客户端的请求,进行反序列化,并调用本地服务处理请求。

  3. 客户端调用:客户端通过stub调用远程方法,就像调用本地方法一样。Stub将请求参数序列化为可以在网络上传输的格式(如二进制流或JSON),并通过网络发送给服务端。

  4. 服务端处理:服务端接收到请求后,通过skeleton进行反序列化,得到原始请求参数。然后,根据接口定义调用本地服务处理请求,并将处理结果序列化为网络传输格式。

  5. 结果返回:服务端将序列化后的结果通过网络发送给客户端。客户端的stub接收到结果后,进行反序列化,得到最终的返回值,并返回给客户端程序。

3. RPC的核心组件

RPC系统通常包括以下几个核心组件:

  1. 客户端(Client):服务调用方,发起RPC调用的程序。

  2. 客户端存根(Client Stub):存放服务端地址信息,将客户端的请求参数打包成网络消息,再通过网络发送给服务方。

  3. 服务端存根(Server Stub):接受客户端发送过来的消息并解包,再调用本地服务。

  4. 服务端(Server):真正的服务提供者,执行客户端请求的服务逻辑。

4. RPC的优点

  1. 抽象性:提供了一种高层抽象,使开发者可以像调用本地方法一样调用远程服务,无需关心底层网络通信。

  2. 跨语言支持:许多RPC框架支持多种编程语言,使得不同语言的服务可以无缝协同工作。

  3. 高性能:通过优化网络通信和序列化/反序列化过程,RPC可以实现高效的远程调用。

  4. 可扩展性:RPC框架通常提供丰富的扩展点,支持开发者根据需求进行定制和优化。

  5. 透明性:对于开发者来说,RPC调用就像调用本地方法一样简单,无需关心网络通信的复杂性。

综上所述,RPC通过定义远程方法接口、生成stub和skeleton代码、客户端调用、服务端处理以及结果返回等步骤,实现了远程过程调用的功能。RPC的抽象性、跨语言支持、高性能、可扩展性和透明性等优点使得它成为分布式计算和微服务架构中不可或缺的技术之一。

二、自己实现RPC

在Java中实现一个简单的RPC框架是一个很好的学习分布式系统和网络通信的方式。下面我将概述一个基本的RPC框架的实现步骤,并给出一些关键代码示例。

步骤 1: 定义RPC接口

首先,你需要定义一个RPC接口,这个接口将作为服务端和客户端之间的契约。

public interface HelloService {  
    String sayHello(String name);  
}

步骤 2: 实现服务端

服务端需要实现这个接口,我们通过注解方式找到服务接口实现类

@RpcAnnotation
public class HelloServiceImpl implements HelloService {  
    @Override  
    public String sayHello(String name) {  
        return "Hello, " + name + "!";  
    }  
}

// 保留到运行时,以便反射读取
@Retention(RetentionPolicy.RUNTIME)
// 应用于类
@Target(ElementType.TYPE)
public @interface RpcAnnotation {
}

// 注解和接口扫描器,用于自动查找接口实现类
public class RpcAnnotationScanner {
    private static final String BASE_PACKAGE = "com.example.springlearning.rpc.server"; // 修改为你的包名

    public static List<Class<?>> findAnnotatedClassesImplementingInterface(Class<?> interfaceToCheck) {
        List<Class<?>> classes = new ArrayList<>();
        String path = BASE_PACKAGE.replace('.', '/');
        try (URLClassLoader classLoader = URLClassLoader.newInstance(new URL[]{new File(".").toURI().toURL()})) {
            Enumeration<URL> resources = classLoader.getResources(path);
            while (resources.hasMoreElements()) {
                URL resource = resources.nextElement();
                String filePath = resource.getFile();
                findAndAddClassesInPackage(filePath, interfaceToCheck, classes);
            }
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
        }
        return classes;
    }

    private static void findAndAddClassesInPackage(String packagePath, Class<?> interfaceToCheck, List<Class<?>> classes) throws ClassNotFoundException {
        File directory = new File(packagePath);

        File[] files = directory.listFiles();
        if (files != null) {
            for (File file : files) {
                if (file.isDirectory()) {
                    findAndAddClassesInPackage(packagePath + File.separator + file.getName(), interfaceToCheck, classes);
                } else if (file.getName().endsWith(".class")) {
                    try {
                        String className = BASE_PACKAGE + "." + file.getName().substring(0, file.getName().lastIndexOf(".class"));
                        Class<?> cls = Class.forName(className);

                        if (cls.isAnnotationPresent(RpcAnnotation.class) && interfaceToCheck.isAssignableFrom(cls)) {
                            classes.add(cls);
                        }
                    } catch (ClassNotFoundException e) {
                        // 忽略不符合条件的类
                    }
                }
            }
        }
    }

}

提供一个网络服务器来监听客户端的请求

public class Server {

    // 假设我们使用一个简单的Socket服务器
    public void startServer(int port) throws IOException {
        ServerSocket serverSocket = new ServerSocket(port);
        System.out.println("Server is running on port " + port);

        while (true) {
            Socket socket = serverSocket.accept();
            new Thread(() -> handleClient(socket)).start();
        }
    }

    private void handleClient(Socket socket) {
        // 注意:这里假设客户端发送的是文本行
        try (BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
             PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) {

            // 在实际中,你可能需要更复杂的协议来区分不同的class
            String className = in.readLine();

            // 读取方法名(在这个例子中,我们假设它总是 "sayHello")
            // 在实际中,你可能需要更复杂的协议来区分不同的方法
            String methodName = in.readLine();

            // 读取参数(在这个例子中,我们读取一个字符串)
            String param = in.readLine();

            // 查找实现类
            String result = "";
            List<Class<?>> foundClasses = findAnnotatedClassesImplementingInterface(Class.forName(className));
            if (foundClasses.size() > 0) {
                // 反射调用
                Class<?> aClass = foundClasses.get(0);
                Method method = aClass.getMethod(methodName, String.class);//
                result = (String) method.invoke(aClass.newInstance(), param);
            } else {
                throw new ClassNotFoundException();
            }

            // 发送响应
            out.println(result + "\n"); // (序列化一下)发送结果

        } catch (IOException | NoSuchMethodException | IllegalAccessException | InvocationTargetException |
                 ClassNotFoundException | InstantiationException e) {
            e.printStackTrace();
            // 在实际应用中,你可能需要更优雅地处理这些异常,比如向客户端发送错误消息
        } finally {
            try {
                socket.close(); // 关闭socket
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        Server server = new Server();
        try {
            server.startServer(9999);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

注意:上面的handleClient方法中的请求解析和响应发送是伪代码,实际中你需要定义一种协议来序列化方法名、参数和返回值,并在客户端和服务端之间传输。

步骤 3: 实现客户端

客户端需要有一个stub来代理服务端的调用。

public class HelloServiceStub implements HelloService {

    private Socket socket;

    public HelloServiceStub(String host, int port) throws IOException {
        socket = new Socket(host, port);
    }

    @Override
    public String sayHello(String name) {
        try (BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
             PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) {

            out.println(this.getClass().getInterfaces()[0].getName());// 简单实现接口类名的传递
            out.println("sayHello");
            out.println(name);

            // 读取响应
            String result = in.readLine();
            return result;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // 关闭连接的方法(可选)
    public void close() throws IOException {
        socket.close();
    }
}

步骤 4: 测试RPC

现在你可以启动服务端,并使用客户端来调用服务了。

public class Client {
    public static void main(String[] args) throws IOException {
        HelloService client = new HelloServiceStub("localhost", 9999);
        System.out.println(client.sayHello("World"));
    }

}

注意:上面的代码示例非常基础,并且省略了很多重要的细节,比如异常处理、多线程安全、序列化/反序列化机制的选择(Java自带的序列化通常不推荐用于生产环境)、连接管理(如连接池)、协议定义等。在实际应用中,你可能需要使用更成熟的库(如gRPC、Apache Thrift、Apache Dubbo等)来构建RPC系统。

三、gRPC

gRPC 是一个高性能、开源和通用的 RPC(远程过程调用)框架,由 Google 主导开发。它支持多种编程语言,并且基于 HTTP/2 协议设计,以提供低延迟和高吞吐量的通信。gRPC 使用 Protocol Buffers 作为接口定义语言(IDL),这使得它非常适合于需要高效数据序列化和反序列化的场景。

gRPC 的主要特点

  1. 跨语言支持:gRPC 支持多种编程语言,包括 C++, Java, Go, Python, Ruby, C#, Node.js 等,使得不同语言编写的服务可以轻松地相互调用。

  2. 基于 Protocol Buffers:gRPC 使用 Protocol Buffers 作为其接口定义语言和消息格式。Protocol Buffers 是一种轻便高效的结构化数据存储格式,可以用于结构化数据序列化,非常适合于 RPC 和数据存储。

  3. HTTP/2 支持:gRPC 基于 HTTP/2 协议设计,支持多种高级特性,如服务器推送、头部压缩、多路复用等,这些特性有助于减少网络延迟和提高吞吐量。

  4. 流式传输:gRPC 支持四种服务类型,包括一元 RPC(请求和响应都是单个消息)、服务器流式 RPC(客户端发送一个请求,服务器返回一系列消息)、客户端流式 RPC(客户端发送一系列消息,服务器返回一个响应)和双向流式 RPC(客户端和服务器都可以发送一系列消息)。

  5. 认证和安全性:gRPC 提供了与 SSL/TLS 的集成,支持在传输层上加密数据。此外,它还可以与各种认证机制(如 OAuth 2.0、JWT 等)集成,以确保服务的安全性。

gRPC 的使用场景

  • 微服务架构:在微服务架构中,服务之间的通信是不可避免的。gRPC 提供了一种高效、跨语言的通信方式,有助于构建松耦合、高可用的微服务系统。

  • 游戏开发:对于需要低延迟和高吞吐量的游戏来说,gRPC 是一个很好的选择。它可以帮助游戏服务器和客户端之间实现快速、可靠的数据传输。

  • 实时数据处理:在需要实时处理大量数据的场景中(如金融交易系统、实时分析平台等),gRPC 的高性能和流式传输特性非常有用。

gRPC 的使用

1.添加相关包:在pom文件中加入依赖

<!--grpc依赖-->
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-all</artifactId>
    <version>1.62.2</version>
</dependency>

<!--grpc插件-->
<plugin>
    <groupId>org.xolstice.maven.plugins</groupId>
    <artifactId>protobuf-maven-plugin</artifactId>
    <version>0.6.1</version>
    <configuration>
        <protocArtifact>com.google.protobuf:protoc:3.25.0:exe:${os.detected.classifier}</protocArtifact>
        <pluginId>grpc-java</pluginId>
        <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.64.0:exe:${os.detected.classifier}</pluginArtifact>
        <!--默认值-->
        <protoSourceRoot>${project.basedir}/src/main/proto</protoSourceRoot>
        <!--默认值-->
        <!--<outputDirectory>${project.build.directory}/generated-sources/protobuf/java</outputDirectory>-->
        <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
        <!--设置是否在生成java文件之前清空outputDirectory的文件,默认值为true,设置为false时也会覆盖同名文件-->
        <clearOutputDirectory>false</clearOutputDirectory>
    </configuration>
    <executions>
        <execution>
            <!--在执行mvn compile的时候会执行以下操作-->
            <phase>compile</phase>
            <goals>
                <!--生成OuterClass类-->
                <goal>compile</goal>
                <!--生成Grpc类-->
                <goal>compile-custom</goal>
            </goals>
        </execution>
    </executions>
</plugin>

此时maven插件处就会看到protobuf

2.定义服务:在 .proto 文件中定义服务和消息。

syntax = "proto3";

option java_package = "com.example.springlearning.service";
option java_outer_classname = "HelloWorldProto";
option java_multiple_files = false;

package helloworld;

service HelloWorldService {
  // SayHello 客户端调用服务端使用的方法
  // HelloRequest 请求的消息体
  // HelloReply 返回的消息体
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

message HelloRequest {
  string name = 1; // 1:代表第一个参数
}

message HelloReply {
  string message = 1;
}

3.生成代码:使用 Protocol Buffers 编译器(protoc)和 gRPC 插件生成 Java 代码。

protoc --java_out=./src/main/java -I./src/main/proto ./src/main/proto/helloworld.proto

或者

此时就生成了java代码,如下:

3.实现服务端

  • 添加依赖

<!-- 添加grpc服务端依赖 -->
<dependency>
    <groupId>net.devh</groupId>
    <artifactId>grpc-server-spring-boot-starter</artifactId>
    <version>3.1.0.RELEASE</version>
</dependency>
  • 配置服务端rpc端口

grpc.server.port=8081
  • 编写服务端代码

public class HelloWorldServiceImpl extends HelloWorldServiceGrpc.HelloWorldServiceImplBase{

    @Override
    public void sayHello(HelloWorldProto.HelloRequest req, StreamObserver<HelloWorldProto.HelloReply> responseObserver) {
        HelloWorldProto.HelloReply reply = HelloWorldProto.HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }
}
  • 启动服务端

4.实现客户端

public class HelloWorldClientTest {

    @Test
    public void sayHello(){
        ManagedChannel channel = ManagedChannelBuilder.forTarget("localhost:8081").usePlaintext().build();
        HelloWorldServiceGrpc.HelloWorldServiceBlockingStub stub = HelloWorldServiceGrpc.newBlockingStub(channel);
        HelloWorldProto.HelloReply reply = stub.sayHello(HelloWorldProto.HelloRequest.newBuilder().setName("中国").build());
        System.out.println("reply:"+reply);
        channel.shutdown();
    }

0

评论区