自己设计一个的轻量级的RPC框架–客户端netty
前言
之前讲过netty的服务端,其实客户端和服务差不多。主要是建立连接,发送请求和接收请求做业务处理。
netty 客户端
private RPCRequestNet(String host,int port) {
//netty线程组
EventLoopGroup group=new NioEventLoopGroup();
//启动辅助类 用于配置各种参数
Bootstrap b=new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)//代表异步的客户端 TCP Socket 连接
.option(ChannelOption.TCP_NODELAY,true)//禁止使用Nagle算法 作用小数据即时传输
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LineBasedFrameDecoder(8192));//以换行符分包
socketChannel.pipeline().addLast(new StringDecoder());//将接收到的对象转为字符串
socketChannel.pipeline().addLast(new RPCRequestHandler());//添加响应的处理类
}
});
try {
ChannelFuture f=b.connect(host,port).sync();
f.addListener(new ChannelFutureListener() {
@Override
//监听事件
public void operationComplete(ChannelFuture channelFuture) throws Exception {
}
});
} catch (InterruptedException e) {
e.printStackTrace();
}
}
netty 发送请求
调用服务接口 其实代理对象就会触发invoke()方法,我们在这里进行我们的业务操作即可。
从本地缓存的server列表中获取相对应的列表,进行轮询之后,指定一个具体的地址即可。
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RPCRequest request=new RPCRequest();
request.setRequestID(buildRequestID(method.getName()));
//获取调用方法的ClassName和MethodName
RPCURL url = method.getAnnotation(RPCURL.class);
Map serverMap = ZkServer.serviceMap.get(url.className());
Iterator<Map.Entry<String, List<String>>> it = serverMap.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, List<String>> entry = it.next();
String className = entry.getKey();
List<String> serverList = entry.getValue();
String ipAndHost = RoundRobin.getServer(serverList);//后期需要添加负载均衡策略(已有轮询)
String str[] = ipAndHost.split(":");
request.setClassName(className);
request.setMethodName(url.methodName());
request.setParameters(args);
requestLockMap.put(request.getRequestID(),request);
RPCRequestNet.connect(str[0], Integer.parseInt(str[1])).send(request);
requestLockMap.remove(request.getRequestID());
return request.getResult();
}
return "找不到服务";
}
public void send(RPCRequest request){
String requestJson= null;
try {
requestJson = RPC.requestEncode(request);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
ByteBuf requestBuf= Unpooled.copiedBuffer(requestJson.getBytes());
//发送请求给服务段
RPCRequestHandler.channelCtx.writeAndFlush(requestBuf);
System.out.println("调用"+request.getRequestID()+"已发送");
synchronized (request) {
//因为异步 所以不阻塞的话 该线程获取不到返回值
//放弃对象锁 并阻塞等待notify
try {
request.wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
netty 接收请求
//异步调用读取管道数据
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String responseJson= (String) msg;
RPCResponse response= (RPCResponse) RPC.responseDecode(responseJson);
System.out.println("获取到服务器返回值"+responseJson);
synchronized (RPCProxyHandler.requestLockMap.get(response.getRequestID())) {
//当客户段获取到返回值的时候唤醒在该对象锁上wait的线程
RPCRequest request= (RPCRequest) RPCProxyHandler.requestLockMap.get(response.getRequestID());
request.setResult(response.getResult());
request.notifyAll();
}
}
说明
- 通道 channel 这个是一个双向通道和流不同,它既可以读数据也可以写数据
- 缓冲区 ByteBuf 本质一块内存区域,netty 是将ByteBuf写入channel ,再从channel写出到ByteBuf
- 多路复用器 Selector 将channel注册到Selector中 之后轮询监听哪些I/O事件已经就绪了, 然后处理它们
- RPCRequestHandler.channelCtx.writeAndFlush(requestBuf); //将数据写入channel
- channelRead() //读取到数据的处理
这里将主线程阻塞的原因是 由于netty的读写都是异步的,如果不将主线程阻塞那么在读到数据之后把返回值设置入request对象中,我们主线程还是获取不到返回值,所以要在发送请求之后阻塞当前主线程,由netty的线程组处理读写操作之后在唤醒当前线程。(ps 想进一步了解netty的可以去看看netty权威指南,这里只是简单的介绍如何使用)
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/15339.html