RPC概览
所谓RPC,指的是远程服务调用。抽离所有技术细节,其大致流程不外乎如下几个部分:
- 接口定义
- 服务方实现接口
- 指定服务方端口和host,服务方服务器运行
- 服务方的服务器通过反射的方式调用指定方法并回参
- 消费方调用接口定义
- 消费方通过动态代理在运行时把接口方法的调用转变成远程调用服务方,这个过程有自定义传输协议也有HTTP协议(很少)
- 消费方拿到回执,结束
背景
今天我们会开一个新坑,手写一个RPC框架。这个坑会分成好几季,这是第一季。本文会协助你开始一个最简单版本的RPC框架,特点有以下几个:
- 可以实现远程调用的功能
- 简单易实现,几乎不需要任何外部依赖(除了common-lang3的一个反射工具和guava的线程池工具)
学习本文你应当有的前置知识:
- maven项目管理技术
- RPC的概念(远程服务调用)
- Java反射技术
- Java Socket技术
本系列绝对不是入门RPC的文章,还是有那么点,不太好懂的,如果是刚开始学习Java或者springboot的同学请先去看我写的Springboot入门系列和Thrift、Dubbo等现成Rpc框架的使用,有了初步的概念再来看这一篇会好很多。
设计
一个最简单RPC框架应当包含哪些部分呢?
我们先来看一下经典的RPC框架都有什么:服务接口定义,服务提供者,序列化,stub代理,传输协议,注册中心,服务消费者,反序列化。
如果说其中哪些部分可以暂时放一放,或者用简化方案代替,那必然是序列化、反序列化、传输协议,不是说这些不需要,而是可以用现成的方案,不需要自己造轮子(当然如果你要实现商用闭源的RPC框架那肯定要自己设计协议)。其实服务接口定义也可以不要,如果你只是做一个测试方法比如sayHello,那要不要接口真的不重要。
所以,最终我们知道,服务提供方和服务消费方以及实现socket传输的部分是绝对必要的。那么我们可以这样简化:
Service Api -> Consumer Proxy <———socket传输———> Provider Reflect -> Service Impl
实现
POM
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.0-jre</version>
</dependency>
</dependencies>
服务定义
package service;
public interface HelloService {
/**
* 服务接口
*
* @param content
* @return
*/
public String sayHello(String content);
}
服务提供方
package service;
public class HelloServiceImpl implements HelloService {
@Override
public String sayHello(String content) {
return String.format("hello, %s", content);
}
}
服务提供代理类
package framework;
/**
* @Desc
**/
public class ProviderReflect {
private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
5,
200,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024),
new ThreadFactoryBuilder().setNameFormat("test-pool-%d").build(),
new ThreadPoolExecutor.AbortPolicy());
public static void provider(final Object service, int port) throws Exception {
ServerSocket serverSocket = new ServerSocket(port);
while (true) {
final Socket socket = serverSocket.accept();
EXECUTOR_SERVICE.execute(() -> {
try {
ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
try {
try {
String methodName = input.readUTF();
Object[] arguments = (Object[]) input.readObject();
ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
try {
Object result = MethodUtils.invokeExactMethod(service, methodName, arguments);
output.writeObject(result);
} catch (Throwable t) {
output.writeObject(t);
} finally {
output.close();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
input.close();
}
} finally {
socket.close();
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
}
服务启动器
public class RpcProviderMain {
public static void main(String[] args) throws Exception {
HelloService service = new HelloServiceImpl();
ProviderReflect.provider(service, 10020);
}
}
消费方代理
package framework;
/**
* @Desc
**/
public class ConsumerProxy {
@SuppressWarnings("unchecked")
public static <T> T consume(final Class<T> interfaceClass, final String host, final int port) {
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass},
(proxy, method, args) -> {
try (Socket socket = new Socket(host, port)) {
try (ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream())) {
output.writeUTF(method.getName());
output.writeObject(args);
try (ObjectInputStream input = new ObjectInputStream(socket.getInputStream())) {
Object result = input.readObject();
if (result instanceof Throwable) {
throw (Throwable) result;
}
return result;
}
}
}
});
}
}
服务消费方启动器
package invoke;
/**
* @Desc
**/
public class RpcConsumerMain {
public static void main(String[] args) throws Exception {
HelloService service = ConsumerProxy.consume(HelloService.class, "127.0.0.1", 10020);
for(int i = 0; i < 1000; ++i) {
String hello = service.sayHello(String.format("test_%d", i));
System.out.println(hello);
Thread.sleep(1000);
}
}
}
项目结构
代码重点说明
- 使用时先启动服务方,后启动消费方即可
- 服务方本质是通过while(true)让服务方一直停在socket.accept的阶段,只要收到新的socket连接就进行相应处理。处理方式是通过MethodUtils工具,以反射的方式调用服务中指定名称的方法
- 消费方本质上是通过动态代理HelloService接口,在运行时实际去调用invoke来实现该接口。在本例中就是用socket进行连接,在运行时把方法名、参数值发送到服务方,然后用输入流拿到回参。这样,一个自定义RPC的闭环就完成了
结论
虽然本例中借口定义、数据传输都非常简陋,但实际上已经具备了RPC的基本要素,服务提供,服务消费,和远程调用,这个例子将是我们这个系列的起点
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/153507.html