基于Netty实现RPC框架

笔者以前写过一篇文章《摊牌了,我要手写一个 RPC》,当时实现的比较粗糙,底层基于 JDK 的阻塞 IO 进行通信。刚好最近在看 Netty,于是想重构一下,用 Netty 来作为底层的 RPC 通信框架。

1. 项目大纲

项目取名「nrpc」,n 取自 Netty 的首字母,项目结构如下:

nrpc
|--- common [公共模块]
|--- example [使用示例]
|--- rpc [RPC调用核心模块]
|--- serialization [序列化模块]

项目特点:

  1. 基于 Netty 通信的 RPC 框架。
  2. 自定义的 RPC 通信协议。
  3. 基于 JDK 动态代理交互。
  4. Kryo 序列化。


代码已上传至 Gitee:https://gitee.com/panchanghe/nrpc


1.1 项目依赖

项目依赖比较简单:

  1. hutool 工具包
  2. netty 肯定要有了。
  3. kryo 做序列化用的。
<dependency>
  <groupId>cn.hutool</groupId>
  <artifactId>hutool-all</artifactId>
  <version>5.4.4</version>
</dependency>

<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-all</artifactId>
  <version>4.1.50.Final</version>
</dependency>

<dependency>
  <groupId>com.esotericsoftware</groupId>
  <artifactId>kryo</artifactId>
  <version>4.0.0</version>
</dependency>


1.2 使用示例

通过调用UserService.getById()方法来测试 RPC 调用是否正常,消费者随便传一个 userId,生产者会 Mock 一个用户数据并响应。

public interface UserService {

 User getById(Long userId);
}

@Service
public class UserServiceImpl implements UserService {
 static String[] names = new String[]{"不闻不问""逆天而行""寒风凛凛""穷凶极恶""苍白的脸"};
 static int[] ages = new int[]{1823565866};

 @Override
 public User getById(Long userId) {
  System.err.println("接收请求:userId = " + userId);
  User user = new User();
  user.setId(userId);
  user.setName(names[ThreadLocalRandom.current().nextInt(names.length)]);
  user.setAge(ages[ThreadLocalRandom.current().nextInt(ages.length)]);
  user.setBirthday(LocalDate.now());
  return user;
 }
}

RPC 调用示例:RpcCallExample

public class RpcCallExample {

 static class Provider {
  public static void main(String[] args) {
   new RpcServer(9999).start();
   System.err.println("生产者启动...");
  }
 }

 static class Consumer {
  public static void main(String[] args) {
   RpcClientFactory.init(new String[]{"127.0.0.1"}, new int[]{9999});

   UserService userService = RpcProxyFactory.getProxy(UserService.class);
   for (int i = 1; i <= 10; i++) {
    System.err.println(userService.getById(Long.valueOf(i)));
   }
  }
 }
}

先启动生产者,再启动消费者,消费者会发出 10 次调用。

生产者控制台输出:

生产者启动...
接收请求:userId = 1
接收请求:userId = 2
接收请求:userId = 3
接收请求:userId = 4
接收请求:userId = 5
接收请求:userId = 6
接收请求:userId = 7
接收请求:userId = 8
接收请求:userId = 9
接收请求:userId = 10

消费者控制台输出:

User{id=1, name='穷凶极恶', age=23, birthday=2021-06-18}
User{id=2, name='穷凶极恶', age=58, birthday=2021-06-18}
User{id=3, name='苍白的脸', age=56, birthday=2021-06-18}
User{id=4, name='寒风凛凛', age=58, birthday=2021-06-18}
User{id=5, name='不闻不问', age=18, birthday=2021-06-18}
User{id=6, name='不闻不问', age=23, birthday=2021-06-18}
User{id=7, name='寒风凛凛', age=23, birthday=2021-06-18}
User{id=8, name='苍白的脸', age=56, birthday=2021-06-18}
User{id=9, name='不闻不问', age=58, birthday=2021-06-18}
User{id=10, name='苍白的脸', age=23, birthday=2021-06-18}

2. 框架实现

2.1 自定义注解

先创建两个注解,代表暴露服务和引用服务。

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface Service {

}

public @interface Reference {

}

2.2 Kryo 序列化

定义 ObjectInput 和 ObjectOutput 接口,它俩的作用是可以将 Java 对象和字节序列之间进行转换。

public interface ObjectInput {

 <T> readObject(byte[] bytes, Class<T> clazz);
}

public interface ObjectOutput {

 byte[] writeObject(Object o);
}

使用 Kryo 序列化的实现:KryoObjectInput 和 KryoObjectOutput:

public class KryoObjectInput implements ObjectInput {
 public static final KryoObjectInput INSTANCE = new KryoObjectInput();

 private KryoObjectInput(){}

 @Override
 public <T> readObject(byte[] bytes, Class<T> clazz) {
  Input input = new Input(bytes);
  return KryoHolder.get().readObject(input, clazz);
 }
}

public class KryoObjectOutput implements ObjectOutput {
 public static final KryoObjectOutput INSTANCE = new KryoObjectOutput();

 private KryoObjectOutput(){}

 @Override
 public byte[] writeObject(Object o) {
  Output output = new Output(10241024 * 1024 * 10);
  KryoHolder.get().writeObject(output, o);
  return Arrays.copyOf(output.getBuffer(), output.position());
 }
}

要想使用 Kryo 对 Java 对象进行序列化和反序列化,首先需要创建 Kryo 对象,但是由于其创建过程比较重,这里做一个缓存处理,每个线程只使用自己的 Kryo 对象,避免重复创建。

public class KryoHolder {

 private static final ThreadLocal<Kryo> LOCAL = new ThreadLocal<Kryo>(){
  @Override
  protected Kryo initialValue() {
   return new Kryo();
  }
 };

 public static Kryo get(){
  return LOCAL.get();
 }
}

2.3 通信协议

节点之间要通信,那么必然要有通信协议,否则谁都不知道对方在说什么。

例如:节点 A 要调用节点 B 的 UserService 的getById()方法,报文格式应该是什么样的呢?

这里将 TCP 请求的报文拆分成两段:报文头和报文体。

public class RpcProtocol<T{
    // 报文头
 private Header header;
    // 报文体
 private T body;

    // 忽略get、set方法
}

报文头 Header:

public class Header {
    // 默认版本号
 public static final byte DEFAULT_VERSION = 1;

 // 报文长度,解决粘包、拆包
 private int length;
 // 协议的大版本号
 private short largeVersion = DEFAULT_VERSION;
 // 协议的小版本号
 private short smallVersion = DEFAULT_VERSION;
 // 回话ID
 private long sessionId;

    // 忽略get、set方法
}

RPC 请求的报文体:RpcRequest

public class RpcRequest implements Serializable {
 private static final long serialVersionUID = 0L;

 // 接口名
 private String interfaceName;
 // 方法名
 private String methodName;
 // 参数列表
 private Class<?>[] argTypes;
 // 参数
 private Object[] args;

    // 忽略get、set方法
}

RPC 响应的报文体:RpcResponse

public class RpcResponse<T{
 // 本次请求是否成功
 private Boolean success;
 // 提示消息
 private String msg;
 // 响应数据
 private T data;

    // 忽略get、set方法
}

2.4 提取暴露服务

扫描 Class,找到加了@Service注解的服务,提取出来,等待消费者的调用。

MethodContext:服务暴露的方法上下文

// 暴露方法上下文
public class MethodContext {
 // 服务实例
 private Object instance;
 // 方法
 private Method method;

    // 忽略get、set方法
}

Interface:记录方法名和 MethodContext 的映射,方便根据方法名通过反射快速调用方法。

public class Interface extends ConcurrentHashMap<StringMethodContext{
 private static final long serialVersionUID = 0L;

 // 接口名
 private String interfaceName;
 // 实现对象
 private Object instance;

    // 忽略get、set方法
}

ServerHolder:服务的持有者,扫描暴露的服务。

public class ServerHolder {
 private static ConcurrentMap<String,Interface> container;

 static {
  container = ClassUtil.scanPackage("top.javap.nrpc")
    .stream()
    .filter(c -> Objects.nonNull(c.getAnnotation(Service.class)))
    .map(i -> 
{
     try {
      Interface ins = new Interface(i.getInterfaces()[0].getName(), i.newInstance());
      for (Method method : i.getMethods()) {
       ins.put(method.getName(), new MethodContext(ins.getInstance(), method));
      }
      return ins;
     }catch (Exception e){
      e.printStackTrace();
      return null;
     }
    }).collect(Collectors.toConcurrentMap(Interface::getInterfaceName, Function.identity()));
 }

    // 根据接口名+方法名快速找到要调用的目标方法
 public static MethodContext get(String interfaceName, String methodName) {
  return container.get(interfaceName).get(methodName);
 }
}

ServerExecutor:服务的执行器,收到消费者的调用请求后会通过反射调用方法,并返回结果。

public class ServerExecutor {
 private static final ExecutorService executor = Executors.newCachedThreadPool();

 public static void invokeMethod(ChannelHandlerContext ctx,RpcRequest request) {
  executor.submit(() -> {
   try {
    MethodContext methodContext = ServerHolder.get(request.getInterfaceName(), request.getMethodName());
    Object result = methodContext.getMethod().invoke(methodContext.getInstance(), request.getArgs());
    RpcResponse response = new RpcResponse(true"success", result);
    RpcProtocol<RpcResponse> responseRpcProtocol = new RpcProtocol(new Header(1L), response);
    ctx.writeAndFlush(responseRpcProtocol);
   } catch (Exception e) {
    e.printStackTrace();
   }
  });
 }
}

2.5 代理对象

因为我们要通过接口去调用远程服务,所以需要通过 JDK 的动态代理对接口生成代理对象,然后在代理对象中使用 Netty 去和远程节点通信。

RpcProxy:生成 RPC 代理对象

public class RpcProxy<Timplements InvocationHandler {
 // 接口类
 private final Class<T> clazz;
 // 代理对象
 private final Object instance;

 public RpcProxy(Class<T> clazz) {
  this.clazz = clazz;
  instance = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);
 }

 // 调用接口方法时,会触发该方法
 @Override
 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  // TODO 忽略toString()、hashCode()等方法
  String interfaceName = clazz.getName();
  String methodName = method.getName();
  // 构建RpcRequest
  RpcRequest request = new RpcRequest();
  request.setInterfaceName(interfaceName);
  request.setMethodName(methodName);
  request.setArgTypes(buildArgTypes(args));
  request.setArgs(args);
  // 发起RPC调用,获取结果
  RpcResponse<Object> response = RpcClientFactory.nextClient().request(request);
  return response.getData();
 }

 private Class[] buildArgTypes(Object[] args){
  if (args == null) {
   return null;
  }
  Class[] classes = new Class[args.length];
  for (int i = 0; i < args.length; i++) {
   classes[i] = args[i].getClass();
  }
  return classes;
 }

 public Object getInstance() {
  return instance;
 }
}

RpcProxyFactory:代理对象工厂,缓存代理对象,避免重复生成。

public class RpcProxyFactory {

 private final static ConcurrentMap<Class, Object> container = new ConcurrentHashMap<>();

 public static <T> getProxy(Class<T> clazz) {
  T t = (T) container.get(clazz);
  if (Objects.isNull(t)) {
   synchronized (container) {
    t = (T) container.get(clazz);
    if (Objects.isNull(t)) {
     t = newProxy(clazz);
     container.put(clazz, t);
    }
   }
  }
  return t;
 }

 public static <T> newProxy(Class<T> clazz) {
  return (T) new RpcProxy<T>(clazz).getInstance();
 }
}

2.6 消费者实现

消费者和生产者这两个角色,是站在服务调用的角度看待的,调用方为消费者,被调用方为生产者。实际上,一个节点可以既是生产者,又是消费者。所以每台节点都应当有暴露服务的能力,和调用服务的能力。

RpcClient:RPC 服务的调用者。

public class RpcClient {
 private final EventLoopGroup worker = new NioEventLoopGroup();
 private final String host;
 private final int port;
 //连接池,避免每次发起调用都建立连接
 private ChannelPool channelPool;


 public RpcClient(String host, int port) {
  this.host = host;
  this.port = port;
  start();
 }

 public <T> RpcResponse<T> request(RpcRequest request) {
  Channel channel = null;
  try {
   channel = channelPool.acquire().get();
   long sessionId = SessionIdGenerator.get();
   CompletableFuture future = new CompletableFuture();
   channel.attr(NettyConstant.CALLBACK_KEY).set(future);
   RpcProtocol<RpcRequest> protocol = new RpcProtocol<>(new Header(sessionId), request);
   channel.writeAndFlush(protocol);
   return (RpcResponse<T>) ((RpcProtocol<RpcResponse>) future.get()).getBody();
  } catch (Exception e) {
   e.printStackTrace();
   return null;
  } finally {
   if (channel != null) {
    channelPool.release(channel);
   }
  }
 }

 private void start() {
  Bootstrap bootstrap = new Bootstrap();
  bootstrap
    .group(worker)
    .channel(NioSocketChannel.class)
    .remoteAddress(hostport)
;
  channelPool = new FixedChannelPool(bootstrap, new AbstractChannelPoolHandler() {
   @Override
   public void channelCreated(Channel ch) throws Exception {
    ch.pipeline().addLast(new HalfDecoder());
    ch.pipeline().addLast(new RpcProtocolCodec<RpcResponse>(RpcResponse.class));
    ch.pipeline().addLast(new SimpleChannelInboundHandler<RpcProtocol<RpcResponse>>() {
     @Override
     public void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcResponse> protocol) throws Exception {
      ctx.channel().attr(NettyConstant.CALLBACK_KEY).get().complete(protocol);
     }
    });

   }
  }, 100);
 }
}

可能有多个节点可以同时提供服务,提供一个 Factory,随机出一个 RpcClient。

public class RpcClientFactory {
 private static RpcClient[] clients;

 public static void init(String[] hosts, int[] ports) {
  if (hosts.length != ports.length) {
   throw new IllegalArgumentException();
  }
  clients = new RpcClient[hosts.length];
  for (int i = 0; i < hosts.length; i++) {
   clients[i] = new RpcClient(hosts[i], ports[i]);
  }
 }

 public static RpcClient nextClient() {
  return clients[ThreadLocalRandom.current().nextInt(clients.length)];
 }
}

和 Netty 相关的内容单独开一节说明,这里先略过。

2.7 生产者实现

生产者需要暴露服务,接收消费者的请求,执行服务并响应结果。

public class RpcServer {
 private final int port;

 public RpcServer(int port) {
  this.port = port;
 }

 public void start() {
  EventLoopGroup boss = new NioEventLoopGroup(1);
  EventLoopGroup worker = new NioEventLoopGroup();
  RpcRequestHandler requestHandler = new RpcRequestHandler();
  try {
   new ServerBootstrap()
     .group(boss, worker)
     .channel(NioServerSocketChannel.class)
     .childHandler(ServerChannelInitializer.INSTANCE)
     .localAddress(port)
     .bind()
;
  }catch (Exception e){
   e.printStackTrace();
  }
 }
}

ServerChannelInitializer:生产者服务 ChannelHandler 初始化类。

@ChannelHandler.Sharable
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel{
 public static ServerChannelInitializer INSTANCE = new ServerChannelInitializer();

 private ServerChannelInitializer(){}

 @Override
 protected void initChannel(SocketChannel sc) throws Exception {
  ChannelPipeline pipeline = sc.pipeline();
  // 解决粘包、拆包
  pipeline.addLast(new HalfDecoder());
  // RPC请求、响应的编解码器
  pipeline.addLast(new RpcProtocolCodec<RpcRequest>(RpcRequest.class));
  pipeline.addLast(new RpcProtocolCodec<RpcResponse>(RpcResponse.class));
  // RPC请求的处理器
  pipeline.addLast(RpcRequestHandler.INSTANCE);
 }
}

2.8 Netty 相关

2.8.1 解决粘包/拆包

前面说过,在自定义的通信协议中,会在报文头的前 4 个字节记录报文长度,所以直接基于 LengthFieldBasedFrameDecoder 来解决读写半包的问题。

public class HalfDecoder extends LengthFieldBasedFrameDecoder {

    public HalfDecoder() {
        super(1024 * 1024 * 504);
    }
}

2.8.2 RPC 协议编解码器

网络传输的总是字节,因此需要将 Java 协议对象序列化为字节序列,以及反序列化。

public class RpcProtocolCodec<Textends ByteToMessageCodec<RpcProtocol<T>> {
 private final Class<T> decodeClass;

 public RpcProtocolCodec(Class<T> clazz) {
  decodeClass = clazz;
 }

 @Override
 protected void encode(ChannelHandlerContext ctx, RpcProtocol<T> msg, ByteBuf out) throws Exception {
  Header header = msg.getHeader();
  byte[] bytes = KryoObjectOutput.INSTANCE.writeObject(msg.getBody());
  out.writeInt(Constants.DEFAULT_LENGTH);
  out.writeShort(header.getLargeVersion());
  out.writeShort(header.getSmallVersion());
  out.writeLong(header.getSessionId());
  out.writeBytes(bytes);
  // 设置length
  out.setInt(0, out.readableBytes()-4);
 }

 @Override
 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
  Header header = new Header();
  header.setLength(in.readInt());
  header.setLargeVersion(in.readShort());
  header.setSmallVersion(in.readShort());
  header.setSessionId(in.readLong());
  byte[] bytes = new byte[in.readableBytes()];
  in.readBytes(bytes);
  T body = KryoObjectInput.INSTANCE.readObject(bytes, decodeClass);
  RpcProtocol<T> protocol = new RpcProtocol<>(header, body);
  out.add(protocol);
 }
}

2.8.3 RPC 请求处理

生产者在收到消费者的请求后,拿到请求参数去执行服务,并响应结果。

@ChannelHandler.Sharable
public class RpcRequestHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcRequest>> {
 public static RpcRequestHandler INSTANCE = new RpcRequestHandler();

 @Override
 protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcRequest> msg) throws Exception {
  RpcRequest request = msg.getBody();
  // IO线程读取到的结果交给业务线程池
  ServerExecutor.invokeMethod(ctx, request);
 }
}

3. 总结

自定义一套 RPC 调用的通信协议,让通信双方知道请求的报文语义。消费者调用 Service 接口时,在生成的代理对象中去发 TCP 请求,生产者在接收到请求后进行协议的解码,获取到请求参数后执行服务方法,然后将返回结果再根据协议规范进行封装并返回。

大部分核心代码都贴出来了,部分不重要的代码没贴,需要的同学可以去 Gitee 拉。


原文始发于微信公众号(程序员小潘):基于Netty实现RPC框架

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/29450.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!