想要手写出一个东西我们肯定要先知道一个东西的原理,我们先看看一个RPC框架需要哪些东西。
如何调用他人的远程服务?
由于各服务部署在不同机器,服务间的调用免不了网络通信过程,服务消费方每调用一个服务都要写一坨网络通信相关的代码,不仅复杂而且极易出错。要让网络通信细节对使用者透明,我们需要对通信细节进行封装,我们先看下一个RPC调用的流程涉及到哪些通信细节:
-
服务消费方(client)调用以本地调用方式调用服务; -
client stub接收到调用后负责将方法、参数等组装成能够进行网络传输的消息体; -
client stub找到服务地址,并将消息发送到服务端; -
server stub收到消息后进行解码; -
server stub根据解码结果调用本地的服务; -
本地服务执行并将结果返回给server stub; -
server stub将返回结果打包成消息并发送至消费方; -
client stub接收到消息,并进行解码; -
服务消费方得到最终结果。
RPC的目标就是要2~8这些步骤都封装起来,让用户对这些细节透明。
1. 怎么做到透明化远程服务调用?
怎么封装通信细节才能让用户像以本地调用方式调用远程服务呢?对java来说就是使用代理!java代理有两种方式:1) jdk 动态代理;2)字节码生成。尽管字节码生成方式实现的代理更为强大和高效,但代码维护不易,大部分公司实现RPC框架时还是选择动态代理方式。我们这个最简易版的自然也是采用动态代理的方式。
2. 怎么对消息进行编码和解码?
2.1 确定消息数据结构
-
接口名称:在我们的例子里接口名是“HelloWorldService”,如果不传,服务端就不知道调用哪个接口了; -
方法名:一个接口内可能有很多方法,如果不传方法名服务端也就不知道调用哪个方法; -
参数类型&参数值参数类型有很多,比如有bool、int、long、double、string、map、list,甚至如struct(class)以及相应的参数值;超时时间
2.2 序列化
从RPC的角度上看,主要看三点:1)通用性,比如是否能支持Map等复杂的数据结构;2)性能,包括时间复杂度和空间复杂度,由于RPC框架将会被公司几乎所有服务使用,如果序列化上能节约一点时间,对整个公司的收益都将非常可观,同理如果序列化上能节约一点内存,网络带宽也能省下不少;3)可扩展性,对互联网公司而言,业务变化飞快,如果序列化协议具有良好的可扩展性,支持自动增加新的业务字段,而不影响老的服务,这将大大提供系统的灵活度。我们的是最简易版所以就采用了jdk序列化的方式来处理。
开始撸代码
-
初始化工程
首先创建2个项目分别是server和client;server项目下两个模块分别是rpc-server-api和rpc-server-provider。
为什么server项目要创建两个模块?
client在调用服务端的服务时需要知道服务端的一些信息,client可以依赖于这个模块。我们的项目中SDK和契约包就是提供了这个功能。而真正的实现是放在rpc-server-provider中。
-
rpc-server-api
public interface IHelloService {
String sayHello(String content);
String saveUser(User user);
}
请求参数类
private String className;
private String methodName;
private Object[] parameters;
-
rpc-server-provider
首先rpc-server-provider是依赖rpc-server-api的。我们写一个实现类,来实现api中定义的接口。
public class HelloServiceImpl implements IHelloService{
@Override
public String sayHello(String content) {
System.out.println("request in sayHello:"+content);
return "Say Hello:"+content;
}
}我这么写好了实现远程要怎么才能调用的到呢?我们还需把服务暴露出去,那就需要一个服务暴露的方法。这里就是不断去接受请求,每一个socket交给一个processorHandler来处理。
public class RpcProxyServer {
ExecutorService executorService = Executors.newCachedThreadPool();
public void publisher(Object service, int port) {
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(port);
while (true) {//不断接受请求
Socket socket = serverSocket.accept();//BIO
//每一个socket 交给一个processorHandler来处理
executorService.execute(new ProcessorHandler(socket, service));
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (serverSocket != null) {
try {
serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}具体processorHandler的代码,从socket中获取请求对象,并是使用请求对象调用本服务方法,同时返回方法执行结果,将返回结果写入socket中。
public class ProcessorHandler implements Runnable {
private Socket socket;
private Object service;
public ProcessorHandler(Socket socket, Object service) {
this.socket = socket;
this.service = service;
}
@Override
public void run() {
try (InputStream inputStream = socket.getInputStream();
ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
//输入流中应该有什么东西?
//请求哪个类,方法名称、参数
RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
Object result = invoke(rpcRequest); //反射调用本地服务
objectOutputStream.writeObject(result);
objectOutputStream.flush();
} catch (IOException | ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
}
}
private Object invoke(RpcRequest request) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
//反射调用
Object[] args = request.getParameters(); //拿到客户端请求的参数
Class<?>[] types = new Class[args.length]; //获得每个参数的类型
for (int i = 0; i < args.length; i++) {
types[i] = args[i].getClass();
}
Class clazz = Class.forName(request.getClassName()); //跟去请求的类进行加载
Method method = clazz.getMethod(request.getMethodName(), types); //sayHello, saveUser找到这个类中的方法
return method.invoke(service, args);
}
}大功告成,把服务发布出去。
/**
* Hello world!
*
*/
public class App {
public static void main( String[] args ){
IHelloService helloService=new HelloServiceImpl();
RpcProxyServer proxyServer=new RpcProxyServer();
// 发布到8080端口
proxyServer.publisher(helloService,8080);
}
} -
客户端代码开撸。我们现在在客户端依赖了服务端的api(SDK、契约包)如何才能实现调用远程方法呢?类似于服务端代理类。
public class RpcProxyClient {
public <T> T clientProxy(final Class<T> interfaceCls,final String host,final int port){
return (T)Proxy.newProxyInstance(interfaceCls.getClassLoader(),
new Class<?>[]{interfaceCls},new RemoteInvocationHandler(host,port));
}
}public class RemoteInvocationHandler implements InvocationHandler {
private String host;
private int port;
public RemoteInvocationHandler(String host, int port) {
this.host = host;
this.port = port;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//请求数据的包装
RpcRequest rpcRequest=new RpcRequest();
rpcRequest.setClassName(method.getDeclaringClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setParameters(args);
//远程通信
RpcNetTransport netTransport=new RpcNetTransport(host,port);
Object result=netTransport.send(rpcRequest);
return result;
}
}处理网络传输的类
public class RpcNetTransport {
private String host;
private int port;
public RpcNetTransport(String host, int port) {
this.host = host;
this.port = port;
}
public Object send(RpcRequest request) {
Object result = null;
try (//建立连接
Socket socket = new Socket(host, port);
//网络socket
ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream())) {
outputStream.writeObject(request); //序列化()
outputStream.flush();
result = inputStream.readObject();
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
}
return result;
}最后使用这个远程调用:
public class App {
public static void main(String[] args) {
RpcProxyClient rpcProxyClient = new RpcProxyClient();
IHelloService iHelloService = rpcProxyClient.clientProxy(IHelloService.class,"localhost",8080);
}
}
最后画一张来总结一下整个流程
原文始发于微信公众号(三不猴子):手写一个RPC框架
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/71581.html