欢迎加入RPC学习分享(Java)社区~
https://bbs.csdn.net/forums/RPC_Study?typeId=112251
代码免费下载地址 手写RPC框架代码(带注释)-Java文档类资源-CSDN下载
文末有使用 Spring + Netty + Protostuff + ZooKeeper 做的轻量级分布式框架,是“手写RPC框架”的进阶版。
如果对RPC的实现和性能优化感兴趣,可以看看专栏 RPC里面的内容,欢迎技术交流~
目录
代码免费下载地址 手写RPC框架代码(带注释)-Java文档类资源-CSDN下载https://download.csdn.net/download/weixin_40757930/81142672
RPC是什么?
RPC是远程过程调用(Remote Procedure Call)的缩写形式。通俗的理解就是我调用了一个函数func(args), 只不过这个func不在本地,需要远程访问这个函数,把我的args(输入参数)通过网络发送给这个函数所在的机子,然后由这个机子调用func(args),再将返回值通过网络发送回来。
应用场景
其实如果我们开发简单的应用,是用不着 RPC的。当我们的应用访问量增加和业务增加时,发现单机已无法承受,此时可以根据不同的业务(划分清楚业务逻辑)拆分成几个互不关联的应用,分别部署在不同的机器上,此时可能也不需要用到 RPC 。
随着我们的业务越来越多,应用也越来越多,应用与应用相互关联调用,发现有些功能已经不能简单划分开,此时可能就需要用到 RPC。
比如,我们开发电商系统,需要拆分出用户服务、商品服务、优惠券服务、支付服务、订单服务、物流服务、售后服务等等,这些服务之间都相互调用,这时内部调用最好使用 RPC ,同时每个服务都可以独立部署,独立上线。
也就说当我们的项目太大,需要解耦服务,扩展性强、部署灵活,这时就要用到 RPC ,主要解决了分布式系统中,服务与服务之间的调用问题。
RPC 优点
- 跨语言(C++、PHP、Java、Python …)
- 协议私密,安全性较高
- 数据传输效率高
- 支持动态扩展
写RPC框架需要具备哪些知识?
RPC原理(摘自:什么情况下使用 RPC ? – 知乎)
RPC 架构主要包括三部分:
- 服务注册中心(Registry),负责将本地服务发布成远程服务,管理远程服务,提供给服务消费者使用。
- 服务提供者(Server),提供服务接口定义与服务实现类。
- 服务消费者(Client),通过远程代理对象调用远程服务。
服务提供者启动后主动向服务注册中心(Registry)注册机器IP、端口以及提供的服务列表;
服务消费者启动时向服务注册中心(Registry)获取服务提供方地址列表。
服务注册中心(Registry)可实现负载均衡和故障切换。
Netty框架
原理图中的注册中心+生产者可以理解为一个 服务器,而消费者就是一个客户端。
在代码中需要先启动注册中心并注册服务,然后等待客户端请求对应的服务,所以需要用到通信框架,这里采用Netty。
具体代码
代码主要是基于RPC调用了两个类的函数,一个是RpcHelloServiceImpl的hello函数,另一个是RpcServiceImpl的加减乘除函数。
public class RpcConsumer {
public static void main(String [] args){
// 如果想要使用RpcHelloServiceImpl中的hello方法
// 和 RpcServiceImpl中的4个算术方法
// 可以直接新建对应的对象并使用 也可以是远程调用
// 本地调用示例
System.out.println("本地调用示例");
IRpcHelloService LpcHello = new RpcHelloServiceImpl();
System.out.println(LpcHello.hello("Tom老师"));
IRpcService LpcService = new RpcServiceImpl();
System.out.println("8 + 2 = " + LpcService.add(8, 2));
System.out.println("8 - 2 = " + LpcService.sub(8, 2));
System.out.println("8 * 2 = " + LpcService.mult(8, 2));
System.out.println("8 / 2 = " + LpcService.div(8, 2));
System.out.println("=======================");
// 远程调用示例
System.out.println("远程调用示例");
IRpcHelloService RpcHello = RpcProxy.create(IRpcHelloService.class);
System.out.println(RpcHello.hello("Tom老师"));
IRpcService RpcService = RpcProxy.create(IRpcService.class);
System.out.println("8 + 2 = " + RpcService.add(8, 2));
System.out.println("8 - 2 = " + RpcService.sub(8, 2));
System.out.println("8 * 2 = " + RpcService.mult(8, 2));
System.out.println("8 / 2 = " + RpcService.div(8, 2));
}
}
代码目录
参考书籍:《Netty4核心原理与手写rpc实战》
api
public interface IRpcHelloService {
String hello(String name);
}
public interface IRpcService {
/** 加 */
public int add(int a,int b);
/** 减 */
public int sub(int a,int b);
/** 乘 */
public int mult(int a,int b);
/** 除 */
public int div(int a,int b);
}
provider
// 类名 RpcHelloServiceImpl 函数名 hello
// 形参列表 name 实参列表 张三
public class RpcHelloServiceImpl implements IRpcHelloService {
public String hello(String name) {
return "Hello " + name + "!";
}
}
public class RpcServiceImpl implements IRpcService {
public int add(int a, int b) {
return a + b;
}
public int sub(int a, int b) {
return a - b;
}
public int mult(int a, int b) {
return a * b;
}
public int div(int a, int b) {
return a / b;
}
}
consumer
public class RpcConsumer {
public static void main(String [] args){
// 如果想要使用RpcHelloServiceImpl中的hello方法
// 和 RpcServiceImpl中的4个算术方法
// 可以直接新建对应的对象并使用 也可以是远程调用
// 本地调用示例
System.out.println("本地调用示例");
IRpcHelloService LpcHello = new RpcHelloServiceImpl();
System.out.println(LpcHello.hello("Tom老师"));
IRpcService LpcService = new RpcServiceImpl();
System.out.println("8 + 2 = " + LpcService.add(8, 2));
System.out.println("8 - 2 = " + LpcService.sub(8, 2));
System.out.println("8 * 2 = " + LpcService.mult(8, 2));
System.out.println("8 / 2 = " + LpcService.div(8, 2));
System.out.println("=======================");
// 远程调用示例
System.out.println("远程调用示例");
IRpcHelloService RpcHello = RpcProxy.create(IRpcHelloService.class);
System.out.println(RpcHello.hello("Tom老师"));
IRpcService RpcService = RpcProxy.create(IRpcService.class);
System.out.println("8 + 2 = " + RpcService.add(8, 2));
System.out.println("8 - 2 = " + RpcService.sub(8, 2));
System.out.println("8 * 2 = " + RpcService.mult(8, 2));
System.out.println("8 / 2 = " + RpcService.div(8, 2));
}
}
proxy
public class RpcProxyHandler extends ChannelInboundHandlerAdapter {
// 入站处理器
private Object response;
public Object getResponse() {
return response;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
response = msg;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("client exception is general");
}
}
public class RpcProxy {
// PRC代理类
public static <T> T create(Class<?> clazz){
// 返回这个实例 方便客户端使用
MethodProxy proxy = new MethodProxy(clazz);
Class<?> [] interfaces = clazz.isInterface() ? new Class[]{clazz} : clazz.getInterfaces();// 如果clazz.isInterface() 就找到所有的实现类 否则就直接使用该接口
T result = (T) Proxy.newProxyInstance(clazz.getClassLoader(),interfaces,proxy);
return result;
}
// new MethodProxy(IRpcHelloService.class);
private static class MethodProxy implements InvocationHandler {
private Class<?> clazz; //clazz = IRpcHelloService.class
public MethodProxy(Class<?> clazz){
this.clazz = clazz;//IRpcHelloService.class
}
// method:hello
// args: Tom老师
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//如果传进来是一个已实现的具体类(本次演示略过此逻辑)
if (Object.class.equals(method.getDeclaringClass())) {
try {
System.out.println("LPC");
return method.invoke(this, args);
} catch (Throwable t) {
t.printStackTrace();
}
//如果传进来的是一个接口(核心)
} else {
System.out.println("RPC");
return rpcInvoke(proxy, method, args);
}
return null;
}
/**
* 实现接口的核心方法
* 把调用的方法封装为消息 与服务器建立连接 发送消息得 在服务器上调用该方法 返回对应的值
* @param method
* @param args
* @return
*/
public Object rpcInvoke(Object proxy,Method method,Object[] args){
//传输协议封装 调用的方法 参数 成一个InvokerProtocol 对象 方便后面编解码
InvokerProtocol msg = new InvokerProtocol();
msg.setClassName(this.clazz.getName()); //IRpcHelloService
msg.setMethodName(method.getName()); //hello
msg.setParames(method.getParameterTypes()); // string
msg.setValues(args); // zhangsan
final RpcProxyHandler consumerHandler = new RpcProxyHandler();
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//对象参数类型编码器
pipeline.addLast("encoder", new ObjectEncoder());
//对象参数类型解码器
pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
pipeline.addLast("handler",consumerHandler);
}
});
ChannelFuture future = b.connect("localhost", 8080).sync();
future.channel().writeAndFlush(msg).sync();// 建立连接之后进行编解码 然后传输 等待结果
future.channel().closeFuture().sync();// 在这里等待closeFuture
} catch(Exception e){
e.printStackTrace();
}finally {
group.shutdownGracefully();// 最后关闭这个请求连接
}
return consumerHandler.getResponse();// 在建立连接的时候已经将响应存储在了consumerHandler中
}
}
}
registry
public class RegistryHandler extends ChannelInboundHandlerAdapter {
//用来保存所有可用的服务
public static ConcurrentHashMap<String, Object> registryMap = new ConcurrentHashMap<String,Object>();
//保存所有相关的服务类
private List<String> classNames = new ArrayList<>();
public RegistryHandler(){
//递归扫描provider文件夹,其中所有服务的全类名都被保存在了classNames这个链表中
// 比如com.gupaoedu.vip.netty.rpc.provider.RpcHelloServiceImpl
scannerClass("com.gupaoedu.vip.netty.rpc.provider");
//将classNames这个链表中的服务 注册到registryMap中
doRegister();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Object result = new Object();
InvokerProtocol request = (InvokerProtocol)msg;
//当客户端建立连接时,需要从自定义协议中获取信息,拿到具体的服务和实参
//使用反射调用
if(registryMap.containsKey(request.getClassName())){
Object clazz = registryMap.get(request.getClassName());
Method method = clazz.getClass().getMethod(request.getMethodName(), request.getParames());
result = method.invoke(clazz, request.getValues());
// clazz.method( request.getParames()(type) request.getValues())
}
ctx.write(result);
ctx.flush();
ctx.close();// 关闭该通道
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
/*
* 递归扫描
*/
private void scannerClass(String packageName){
URL url = this.getClass().getClassLoader().getResource(packageName.replaceAll("\\.", "/"));
File dir = new File(url.getFile());
for (File file : dir.listFiles()) {
//如果是一个文件夹,继续递归
if(file.isDirectory()){
scannerClass(packageName + "." + file.getName());
}else{
classNames.add(packageName + "." + file.getName().replace(".class", "").trim());
}
}
}
/**
* 完成注册
*/
private void doRegister(){
if(classNames.size() == 0){ return; }
for (String className : classNames) {
try {
Class<?> clazz = Class.forName(className); // 实现类
Class<?> i = clazz.getInterfaces()[0]; // 上层接口
registryMap.put(i.getName(), clazz.newInstance());// 类名 服务名 和 对应的服务实例
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
public class RpcRegistry {
/*
新建一个注册中心并启动
*/
private int port; // 服务发布的端口 例如8080
public RpcRegistry(int port){
this.port = port;
}
/*
利用netty新建一个服务器(注册中心) 启动该服务器并监听port端口
*/
public void start(){
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 非阻塞的事件循环组 里面有多个事件循环 bossGroup负责
EventLoopGroup workerGroup = new NioEventLoopGroup();
ChannelInitializer<SocketChannel> channelInitializer = new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {// 信道初始化 主要是在该信道中添加一些处理器
ChannelPipeline pipeline = ch.pipeline();
//对象参数类型编码器
pipeline.addLast("encoder", new ObjectEncoder()); // netty中的编码器 把对象转化为Byte 然后进行网络传输
//对象参数类型解码器 ObjectDecoder extends LengthFieldBasedFrameDecoder
pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));// netty中的解码器 把Byte转化为对象
// 在信道初始化时就 进行服务的注册
RegistryHandler registryHandler = new RegistryHandler();
pipeline.addLast(registryHandler);
}
};
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(channelInitializer)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = b.bind(port).sync(); // 绑定端口
System.out.println("RPC Registry has started and is listening at " + port );
future.channel().closeFuture().sync();
} catch (Exception e) {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new RpcRegistry(8080).start();// 新建一个端口号为8080的注册中心 并启动
}
}
protocol
/**
* 自定义传输协议
*/
@Data
public class InvokerProtocol implements Serializable {
private String className;//类名
private String methodName;//函数名称
private Class<?>[] parames;//形参类型?列表
private Object[] values;//实参列表
// 类名 RpcHelloServiceImpl 函数名 hello
// 形参列表 name 实参列表 张三
}
本地调用和远程调用 产生的对象有什么区别呢?
远程调用产生的是代理对象,本地调用产生
RPC的调用速度如何?
由于RPC涉及到数据的序列化和网络传输,性能不如本地调用LPC,不过瑕不掩瑜。
速度对比:
public class TestLpcRpc {
public static void main(String[] args) {
int num = 100;
Lpc(num);
Rpc(num);
}
public static void Lpc(int num){
long startTime = System.currentTimeMillis(); //获取开始时间
for (int i = 0; i < num; i++) {
IRpcHelloService LpcHello = new RpcHelloServiceImpl();
LpcHello.hello("Tom老师");
}
long endTime = System.currentTimeMillis(); //获取结束时间
System.out.println("Lpc程序运行时间:" + (endTime - startTime) + "ms");
}
public static void Rpc(int num){
long startTime = System.currentTimeMillis(); //获取开始时间
for (int i = 0; i < num; i++) {
IRpcHelloService RpcHello = RpcProxy.create(IRpcHelloService.class);
RpcHello.hello("Tom老师");
}
long endTime = System.currentTimeMillis(); //获取结束时间
System.out.println("Rpc程序运行时间:" + (endTime - startTime) + "ms");
}
}
代码的github地址为 GitHub – VeniVeci/RpcLearnhttps://github.com/VeniVeci/RpcLearn
代码免费下载地址 手写RPC框架代码(带注释)-Java文档类资源-CSDN下载
严格来说,这只能叫做RPC调用,RPC框架除了RPC调用之外还应该有更丰富的功能,比如 像dubbo的高性能NIO通讯及多协议集成,服务动态寻址与路由,软负载均衡与容错,依赖分析与降级等等。
在学习RPC框架的时候发现了一个很不错的项目,以下是我在学习这个项目的过程中遇到的疑问和问题的答案。
轻量级分布式RPC框架
《轻量级分布式 RPC 框架》笔记 (源码+注释)_trigger的博客-CSDN博客
Spring + Netty + Protostuff + ZooKeeper
使用Netty构建RPC服务器和客户端;
使用Protostuff将请求和响应消息序列化;
使用ZooKeeper做服务注册和服务发现。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/92882.html