笔者以前写过一篇文章《摊牌了,我要手写一个 RPC》,当时实现的比较粗糙,底层基于 JDK 的阻塞 IO 进行通信。刚好最近在看 Netty,于是想重构一下,用 Netty 来作为底层的 RPC 通信框架。
1. 项目大纲
项目取名「nrpc」,n 取自 Netty 的首字母,项目结构如下:
nrpc
|--- common [公共模块]
|--- example [使用示例]
|--- rpc [RPC调用核心模块]
|--- serialization [序列化模块]
项目特点:
-
基于 Netty 通信的 RPC 框架。 -
自定义的 RPC 通信协议。 -
基于 JDK 动态代理交互。 -
Kryo 序列化。
代码已上传至 Gitee:https://gitee.com/panchanghe/nrpc
1.1 项目依赖
项目依赖比较简单:
-
hutool 工具包 -
netty 肯定要有了。 -
kryo 做序列化用的。
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.4.4</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.50.Final</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>4.0.0</version>
</dependency>
1.2 使用示例
通过调用UserService.getById()
方法来测试 RPC 调用是否正常,消费者随便传一个 userId,生产者会 Mock 一个用户数据并响应。
public interface UserService {
User getById(Long userId);
}
@Service
public class UserServiceImpl implements UserService {
static String[] names = new String[]{"不闻不问", "逆天而行", "寒风凛凛", "穷凶极恶", "苍白的脸"};
static int[] ages = new int[]{18, 23, 56, 58, 66};
@Override
public User getById(Long userId) {
System.err.println("接收请求:userId = " + userId);
User user = new User();
user.setId(userId);
user.setName(names[ThreadLocalRandom.current().nextInt(names.length)]);
user.setAge(ages[ThreadLocalRandom.current().nextInt(ages.length)]);
user.setBirthday(LocalDate.now());
return user;
}
}
RPC 调用示例:RpcCallExample
public class RpcCallExample {
static class Provider {
public static void main(String[] args) {
new RpcServer(9999).start();
System.err.println("生产者启动...");
}
}
static class Consumer {
public static void main(String[] args) {
RpcClientFactory.init(new String[]{"127.0.0.1"}, new int[]{9999});
UserService userService = RpcProxyFactory.getProxy(UserService.class);
for (int i = 1; i <= 10; i++) {
System.err.println(userService.getById(Long.valueOf(i)));
}
}
}
}
先启动生产者,再启动消费者,消费者会发出 10 次调用。
生产者控制台输出:
生产者启动...
接收请求:userId = 1
接收请求:userId = 2
接收请求:userId = 3
接收请求:userId = 4
接收请求:userId = 5
接收请求:userId = 6
接收请求:userId = 7
接收请求:userId = 8
接收请求:userId = 9
接收请求:userId = 10
消费者控制台输出:
User{id=1, name='穷凶极恶', age=23, birthday=2021-06-18}
User{id=2, name='穷凶极恶', age=58, birthday=2021-06-18}
User{id=3, name='苍白的脸', age=56, birthday=2021-06-18}
User{id=4, name='寒风凛凛', age=58, birthday=2021-06-18}
User{id=5, name='不闻不问', age=18, birthday=2021-06-18}
User{id=6, name='不闻不问', age=23, birthday=2021-06-18}
User{id=7, name='寒风凛凛', age=23, birthday=2021-06-18}
User{id=8, name='苍白的脸', age=56, birthday=2021-06-18}
User{id=9, name='不闻不问', age=58, birthday=2021-06-18}
User{id=10, name='苍白的脸', age=23, birthday=2021-06-18}
2. 框架实现
2.1 自定义注解
先创建两个注解,代表暴露服务和引用服务。
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface Service {
}
public @interface Reference {
}
2.2 Kryo 序列化
定义 ObjectInput 和 ObjectOutput 接口,它俩的作用是可以将 Java 对象和字节序列之间进行转换。
public interface ObjectInput {
<T> T readObject(byte[] bytes, Class<T> clazz);
}
public interface ObjectOutput {
byte[] writeObject(Object o);
}
使用 Kryo 序列化的实现:KryoObjectInput 和 KryoObjectOutput:
public class KryoObjectInput implements ObjectInput {
public static final KryoObjectInput INSTANCE = new KryoObjectInput();
private KryoObjectInput(){}
@Override
public <T> T readObject(byte[] bytes, Class<T> clazz) {
Input input = new Input(bytes);
return KryoHolder.get().readObject(input, clazz);
}
}
public class KryoObjectOutput implements ObjectOutput {
public static final KryoObjectOutput INSTANCE = new KryoObjectOutput();
private KryoObjectOutput(){}
@Override
public byte[] writeObject(Object o) {
Output output = new Output(1024, 1024 * 1024 * 10);
KryoHolder.get().writeObject(output, o);
return Arrays.copyOf(output.getBuffer(), output.position());
}
}
要想使用 Kryo 对 Java 对象进行序列化和反序列化,首先需要创建 Kryo 对象,但是由于其创建过程比较重,这里做一个缓存处理,每个线程只使用自己的 Kryo 对象,避免重复创建。
public class KryoHolder {
private static final ThreadLocal<Kryo> LOCAL = new ThreadLocal<Kryo>(){
@Override
protected Kryo initialValue() {
return new Kryo();
}
};
public static Kryo get(){
return LOCAL.get();
}
}
2.3 通信协议
节点之间要通信,那么必然要有通信协议,否则谁都不知道对方在说什么。
例如:节点 A 要调用节点 B 的 UserService 的getById()
方法,报文格式应该是什么样的呢?
这里将 TCP 请求的报文拆分成两段:报文头和报文体。
public class RpcProtocol<T> {
// 报文头
private Header header;
// 报文体
private T body;
// 忽略get、set方法
}
报文头 Header:
public class Header {
// 默认版本号
public static final byte DEFAULT_VERSION = 1;
// 报文长度,解决粘包、拆包
private int length;
// 协议的大版本号
private short largeVersion = DEFAULT_VERSION;
// 协议的小版本号
private short smallVersion = DEFAULT_VERSION;
// 回话ID
private long sessionId;
// 忽略get、set方法
}
RPC 请求的报文体:RpcRequest
public class RpcRequest implements Serializable {
private static final long serialVersionUID = 0L;
// 接口名
private String interfaceName;
// 方法名
private String methodName;
// 参数列表
private Class<?>[] argTypes;
// 参数
private Object[] args;
// 忽略get、set方法
}
RPC 响应的报文体:RpcResponse
public class RpcResponse<T> {
// 本次请求是否成功
private Boolean success;
// 提示消息
private String msg;
// 响应数据
private T data;
// 忽略get、set方法
}
2.4 提取暴露服务
扫描 Class,找到加了@Service
注解的服务,提取出来,等待消费者的调用。
MethodContext:服务暴露的方法上下文
// 暴露方法上下文
public class MethodContext {
// 服务实例
private Object instance;
// 方法
private Method method;
// 忽略get、set方法
}
Interface:记录方法名和 MethodContext 的映射,方便根据方法名通过反射快速调用方法。
public class Interface extends ConcurrentHashMap<String, MethodContext> {
private static final long serialVersionUID = 0L;
// 接口名
private String interfaceName;
// 实现对象
private Object instance;
// 忽略get、set方法
}
ServerHolder:服务的持有者,扫描暴露的服务。
public class ServerHolder {
private static ConcurrentMap<String,Interface> container;
static {
container = ClassUtil.scanPackage("top.javap.nrpc")
.stream()
.filter(c -> Objects.nonNull(c.getAnnotation(Service.class)))
.map(i -> {
try {
Interface ins = new Interface(i.getInterfaces()[0].getName(), i.newInstance());
for (Method method : i.getMethods()) {
ins.put(method.getName(), new MethodContext(ins.getInstance(), method));
}
return ins;
}catch (Exception e){
e.printStackTrace();
return null;
}
}).collect(Collectors.toConcurrentMap(Interface::getInterfaceName, Function.identity()));
}
// 根据接口名+方法名快速找到要调用的目标方法
public static MethodContext get(String interfaceName, String methodName) {
return container.get(interfaceName).get(methodName);
}
}
ServerExecutor:服务的执行器,收到消费者的调用请求后会通过反射调用方法,并返回结果。
public class ServerExecutor {
private static final ExecutorService executor = Executors.newCachedThreadPool();
public static void invokeMethod(ChannelHandlerContext ctx,RpcRequest request) {
executor.submit(() -> {
try {
MethodContext methodContext = ServerHolder.get(request.getInterfaceName(), request.getMethodName());
Object result = methodContext.getMethod().invoke(methodContext.getInstance(), request.getArgs());
RpcResponse response = new RpcResponse(true, "success", result);
RpcProtocol<RpcResponse> responseRpcProtocol = new RpcProtocol(new Header(1L), response);
ctx.writeAndFlush(responseRpcProtocol);
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
2.5 代理对象
因为我们要通过接口去调用远程服务,所以需要通过 JDK 的动态代理对接口生成代理对象,然后在代理对象中使用 Netty 去和远程节点通信。
RpcProxy:生成 RPC 代理对象
public class RpcProxy<T> implements InvocationHandler {
// 接口类
private final Class<T> clazz;
// 代理对象
private final Object instance;
public RpcProxy(Class<T> clazz) {
this.clazz = clazz;
instance = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);
}
// 调用接口方法时,会触发该方法
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// TODO 忽略toString()、hashCode()等方法
String interfaceName = clazz.getName();
String methodName = method.getName();
// 构建RpcRequest
RpcRequest request = new RpcRequest();
request.setInterfaceName(interfaceName);
request.setMethodName(methodName);
request.setArgTypes(buildArgTypes(args));
request.setArgs(args);
// 发起RPC调用,获取结果
RpcResponse<Object> response = RpcClientFactory.nextClient().request(request);
return response.getData();
}
private Class[] buildArgTypes(Object[] args){
if (args == null) {
return null;
}
Class[] classes = new Class[args.length];
for (int i = 0; i < args.length; i++) {
classes[i] = args[i].getClass();
}
return classes;
}
public Object getInstance() {
return instance;
}
}
RpcProxyFactory:代理对象工厂,缓存代理对象,避免重复生成。
public class RpcProxyFactory {
private final static ConcurrentMap<Class, Object> container = new ConcurrentHashMap<>();
public static <T> T getProxy(Class<T> clazz) {
T t = (T) container.get(clazz);
if (Objects.isNull(t)) {
synchronized (container) {
t = (T) container.get(clazz);
if (Objects.isNull(t)) {
t = newProxy(clazz);
container.put(clazz, t);
}
}
}
return t;
}
public static <T> T newProxy(Class<T> clazz) {
return (T) new RpcProxy<T>(clazz).getInstance();
}
}
2.6 消费者实现
消费者和生产者这两个角色,是站在服务调用的角度看待的,调用方为消费者,被调用方为生产者。实际上,一个节点可以既是生产者,又是消费者。所以每台节点都应当有暴露服务的能力,和调用服务的能力。
RpcClient:RPC 服务的调用者。
public class RpcClient {
private final EventLoopGroup worker = new NioEventLoopGroup();
private final String host;
private final int port;
//连接池,避免每次发起调用都建立连接
private ChannelPool channelPool;
public RpcClient(String host, int port) {
this.host = host;
this.port = port;
start();
}
public <T> RpcResponse<T> request(RpcRequest request) {
Channel channel = null;
try {
channel = channelPool.acquire().get();
long sessionId = SessionIdGenerator.get();
CompletableFuture future = new CompletableFuture();
channel.attr(NettyConstant.CALLBACK_KEY).set(future);
RpcProtocol<RpcRequest> protocol = new RpcProtocol<>(new Header(sessionId), request);
channel.writeAndFlush(protocol);
return (RpcResponse<T>) ((RpcProtocol<RpcResponse>) future.get()).getBody();
} catch (Exception e) {
e.printStackTrace();
return null;
} finally {
if (channel != null) {
channelPool.release(channel);
}
}
}
private void start() {
Bootstrap bootstrap = new Bootstrap();
bootstrap
.group(worker)
.channel(NioSocketChannel.class)
.remoteAddress(host, port);
channelPool = new FixedChannelPool(bootstrap, new AbstractChannelPoolHandler() {
@Override
public void channelCreated(Channel ch) throws Exception {
ch.pipeline().addLast(new HalfDecoder());
ch.pipeline().addLast(new RpcProtocolCodec<RpcResponse>(RpcResponse.class));
ch.pipeline().addLast(new SimpleChannelInboundHandler<RpcProtocol<RpcResponse>>() {
@Override
public void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcResponse> protocol) throws Exception {
ctx.channel().attr(NettyConstant.CALLBACK_KEY).get().complete(protocol);
}
});
}
}, 100);
}
}
可能有多个节点可以同时提供服务,提供一个 Factory,随机出一个 RpcClient。
public class RpcClientFactory {
private static RpcClient[] clients;
public static void init(String[] hosts, int[] ports) {
if (hosts.length != ports.length) {
throw new IllegalArgumentException();
}
clients = new RpcClient[hosts.length];
for (int i = 0; i < hosts.length; i++) {
clients[i] = new RpcClient(hosts[i], ports[i]);
}
}
public static RpcClient nextClient() {
return clients[ThreadLocalRandom.current().nextInt(clients.length)];
}
}
和 Netty 相关的内容单独开一节说明,这里先略过。
2.7 生产者实现
生产者需要暴露服务,接收消费者的请求,执行服务并响应结果。
public class RpcServer {
private final int port;
public RpcServer(int port) {
this.port = port;
}
public void start() {
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup worker = new NioEventLoopGroup();
RpcRequestHandler requestHandler = new RpcRequestHandler();
try {
new ServerBootstrap()
.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(ServerChannelInitializer.INSTANCE)
.localAddress(port)
.bind();
}catch (Exception e){
e.printStackTrace();
}
}
}
ServerChannelInitializer:生产者服务 ChannelHandler 初始化类。
@ChannelHandler.Sharable
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
public static ServerChannelInitializer INSTANCE = new ServerChannelInitializer();
private ServerChannelInitializer(){}
@Override
protected void initChannel(SocketChannel sc) throws Exception {
ChannelPipeline pipeline = sc.pipeline();
// 解决粘包、拆包
pipeline.addLast(new HalfDecoder());
// RPC请求、响应的编解码器
pipeline.addLast(new RpcProtocolCodec<RpcRequest>(RpcRequest.class));
pipeline.addLast(new RpcProtocolCodec<RpcResponse>(RpcResponse.class));
// RPC请求的处理器
pipeline.addLast(RpcRequestHandler.INSTANCE);
}
}
2.8 Netty 相关
2.8.1 解决粘包/拆包
前面说过,在自定义的通信协议中,会在报文头的前 4 个字节记录报文长度,所以直接基于 LengthFieldBasedFrameDecoder 来解决读写半包的问题。
public class HalfDecoder extends LengthFieldBasedFrameDecoder {
public HalfDecoder() {
super(1024 * 1024 * 5, 0, 4);
}
}
2.8.2 RPC 协议编解码器
网络传输的总是字节,因此需要将 Java 协议对象序列化为字节序列,以及反序列化。
public class RpcProtocolCodec<T> extends ByteToMessageCodec<RpcProtocol<T>> {
private final Class<T> decodeClass;
public RpcProtocolCodec(Class<T> clazz) {
decodeClass = clazz;
}
@Override
protected void encode(ChannelHandlerContext ctx, RpcProtocol<T> msg, ByteBuf out) throws Exception {
Header header = msg.getHeader();
byte[] bytes = KryoObjectOutput.INSTANCE.writeObject(msg.getBody());
out.writeInt(Constants.DEFAULT_LENGTH);
out.writeShort(header.getLargeVersion());
out.writeShort(header.getSmallVersion());
out.writeLong(header.getSessionId());
out.writeBytes(bytes);
// 设置length
out.setInt(0, out.readableBytes()-4);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Header header = new Header();
header.setLength(in.readInt());
header.setLargeVersion(in.readShort());
header.setSmallVersion(in.readShort());
header.setSessionId(in.readLong());
byte[] bytes = new byte[in.readableBytes()];
in.readBytes(bytes);
T body = KryoObjectInput.INSTANCE.readObject(bytes, decodeClass);
RpcProtocol<T> protocol = new RpcProtocol<>(header, body);
out.add(protocol);
}
}
2.8.3 RPC 请求处理
生产者在收到消费者的请求后,拿到请求参数去执行服务,并响应结果。
@ChannelHandler.Sharable
public class RpcRequestHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcRequest>> {
public static RpcRequestHandler INSTANCE = new RpcRequestHandler();
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcRequest> msg) throws Exception {
RpcRequest request = msg.getBody();
// IO线程读取到的结果交给业务线程池
ServerExecutor.invokeMethod(ctx, request);
}
}
3. 总结
自定义一套 RPC 调用的通信协议,让通信双方知道请求的报文语义。消费者调用 Service 接口时,在生成的代理对象中去发 TCP 请求,生产者在接收到请求后进行协议的解码,获取到请求参数后执行服务方法,然后将返回结果再根据协议规范进行封装并返回。
大部分核心代码都贴出来了,部分不重要的代码没贴,需要的同学可以去 Gitee 拉。
原文始发于微信公众号(程序员小潘):基于Netty实现RPC框架
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/29450.html