一、思路原理
- 当启动服务提供者(Netty服务器),将服务注册到zookeeper上,也就是在zk上创建/xiaojie-dubbo/com.xiaojie.api.UserService/providers/xiaojie-dubbo://127.0.0.1:8080方法名称等信息。
- 服务发现,从zookeeper节点上获取到服务列表[xiaojie://127.0.0.1:8080…,xiaojie://127.0.0.1:8081….],然后通过轮询、权重、一致性hash,随机等负载均衡算法获取服务。
- 当有服务下线或者宕机之后,通知服务调用者(zk事件通知实现,我没有实现)服务下线或者宕机。
- 通过反射,获取实现类的方法,调用之后,消费者通过代理模式,代理出来客户端的类,进行方法的调用,然后通过netty中ChannelInboundHandlerAdapter的channelRead()将获取的数据写出去。
- 利用户Jboss Marshalling解码器MarshallingDecoder、MarshallingEnecoder进行解码。
二、代码
代码结构如下
自定义注解代码
package com.xiaojie.dubbo.annotation;
import java.lang.annotation.*;
/**
* 自定义rpc注解
*/
@Documented
@Inherited
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcAnnotation {
Class value();
}
服务的注册
package com.xiaojie.dubbo.server.register;
import org.I0Itec.zkclient.ZkClient;
import java.net.URLEncoder;
/**
* 将服务注册到zk上
*/
public class ServiceRegisterImpl implements ServiceRegister{
//定义zk地址
private String zkServer="127.0.0.1:2181";
//超时时间
private Integer timeOut=5000;
//zkclient
private ZkClient zkClient;
public ServiceRegisterImpl() {
zkClient = new ZkClient(zkServer,timeOut);
}
//定义跟节点
private String rootNode="/xiaojie-dubbo";
@Override
public void register(String serviceName, String servicePath) {
System.out.println("..............");
if (!zkClient.exists(rootNode)){
zkClient.createPersistent(rootNode); //xiaojie-dubbo
}
//rootNode+"/"+serviceName
String serviceNameNode=rootNode+"/"+serviceName;
if (!zkClient.exists(serviceNameNode)){
zkClient.createPersistent(serviceNameNode); //xiaojie-dubbo/com.xiaojie.api.UserService
}
String providerNode=serviceNameNode+"/"+"providers";
if (!zkClient.exists(providerNode)){
zkClient.createPersistent(providerNode);//xiaojie-dubbo/com.xiaojie.api.UserService/providers
}
//创建我们服务地址
String serviceAddresNodePath = providerNode + "/" + URLEncoder.encode(servicePath);
System.out.println("serviceAddresNodePath:" + serviceAddresNodePath);
if (zkClient.exists(serviceAddresNodePath)) {
zkClient.delete(serviceAddresNodePath);
}
zkClient.createEphemeral(serviceAddresNodePath);//xiaojie-dubbo/com.xiaojie.api.UserService/providers/getName....
}
}
服务绑定
package com.xiaojie.dubbo.server.rpc;
import com.xiaojie.dubbo.annotation.RpcAnnotation;
import com.xiaojie.dubbo.server.handler.ServerHandler;
import com.xiaojie.dubbo.server.marshalling.MarshallingCodeCFactory;
import com.xiaojie.dubbo.server.register.ServiceRegister;
import com.xiaojie.dubbo.server.register.ServiceRegisterImpl;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.util.HashMap;
import java.util.Map;
public class NettyServer {
/**
* 核心过程是将服务绑定到zk
* 启动netty服务实现监听
*/
private ServiceRegister serviceRegister;
//host
private String host;
//端口号
private Integer port;
private Map<String,Object> handlerMap=new HashMap<>();
public NettyServer( String host, Integer port) {
this.host = host;
this.port = port;
serviceRegister = new ServiceRegisterImpl();
}
public void bind(Object obj){
//获取类上注解
RpcAnnotation declaredAnnotation = obj.getClass().getDeclaredAnnotation(RpcAnnotation.class);
if (declaredAnnotation == null) {
throw new RuntimeException("declaredAnnotation is null");
}
//获取名称,也就是serviceName
String serviceName = declaredAnnotation.value().getName();
String serviceAddr="xiaojie://"+host+":"+port;
serviceRegister.register(serviceName,serviceAddr);
handlerMap.put(serviceName,obj);
}
/**
* 创建netty服务端,开启监听
*/
public void initNetty(){
//创建线程接收请求
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
//创建线程处理请求
NioEventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap=new ServerBootstrap();
serverBootstrap.group(bossGroup,workGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
socketChannel.pipeline().addLast(new ServerHandler(handlerMap));
}
});
//绑定端口号
try {
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
//等待服务器监听端口
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//关闭
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
//启动
public void start(Object obj){
bind(obj);
initNetty();
}
}
服务发现
package com.xiaojie.dubbo.server.discover;
import org.I0Itec.zkclient.ZkClient;
import java.util.List;
public class ServiceDiscoverImpl implements ServiceDiscover{
//定义zk地址
private String zkServer="127.0.0.1:2181";
//超时时间
private Integer timeOut=5000;
//zkclient
private ZkClient zkClient;
public ServiceDiscoverImpl() {
zkClient = new ZkClient(zkServer,timeOut);
}
//定义跟节点
private String rootNode="/xiaojie-dubbo";
@Override
public List<String> discoverList(String serviceName) {
List<String> children = zkClient.getChildren(rootNode + "/" + serviceName + "/" + "providers");
return children;
}
}
传送Request的实体类
package com.xiaojie.dubbo.server.req;
import java.io.Serializable;
public class RpcRequest implements Serializable {
private static final long SerialVersionUID = 1L;
/**
* 类的className
*/
private String className;
/**
* 方法名称
*/
private String methodName;
/**
* 参数类型
*/
Class<?> parameterTypes[];
/**
* 参数value
*/
Object paramsValue[];
public RpcRequest(String className, String methodName, Class<?>[] parameterTypes, Object[] paramsValue) {
this.className = className;
this.methodName = methodName;
this.parameterTypes = parameterTypes;
this.paramsValue = paramsValue;
}
public String getClassName() {
return className;
}
public String getMethodName() {
return methodName;
}
public Class<?>[] getParameterTypes() {
return parameterTypes;
}
public Object[] getParamsValue() {
return paramsValue;
}
public void setClassName(String className) {
this.className = className;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public void setParameterTypes(Class<?>[] parameterTypes) {
this.parameterTypes = parameterTypes;
}
public void setParamsValue(Object[] paramsValue) {
this.paramsValue = paramsValue;
}
@Override
public String toString() {
return className + "," + methodName + "," + parameterTypes + paramsValue;
}
}
通过反射执行方法
package com.xiaojie.dubbo.server.handler;
import com.xiaojie.dubbo.server.req.RpcRequest;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.lang.reflect.Method;
import java.util.Map;
public class ServerHandler extends ChannelInboundHandlerAdapter {
private Map<String,Object> handlerMap;
public ServerHandler(Map<String, Object> handlerMap) {
this.handlerMap = handlerMap;
}
@Override
/**
* 服务器端监听客户端的消息
*/
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
RpcRequest req=(RpcRequest) msg;
if (req==null){
throw new RuntimeException("req is null");
}
//获取serviceName 接口路径
String serviceName = req.getClassName();
//实现类
Object objectImpl = handlerMap.get(serviceName);
if (objectImpl == null) {
throw new RuntimeException("service is not exist");
}
//反射获取方法
Method method = objectImpl.getClass().getMethod(req.getMethodName(), req.getParameterTypes());
//执行实现类方法
Object result = method.invoke(objectImpl,req.getParamsValue());
//返回结果给客户端
ctx.writeAndFlush(result);
}
}
客户端通过代理模式执行方法
package com.xiaojie.dubbo.server.proxy;
import com.xiaojie.dubbo.server.discover.ServiceDiscover;
import com.xiaojie.dubbo.server.discover.ServiceDiscoverImpl;
import com.xiaojie.dubbo.server.handler.ClientHandler;
import com.xiaojie.dubbo.server.loadBalance.LoadBalance;
import com.xiaojie.dubbo.server.loadBalance.LoopBalance;
import com.xiaojie.dubbo.server.loadBalance.RandomBalance;
import com.xiaojie.dubbo.server.loadBalance.WeightBalance;
import com.xiaojie.dubbo.server.marshalling.MarshallingCodeCFactory;
import com.xiaojie.dubbo.server.req.RpcRequest;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.util.List;
public class RpcClientProxy {
public static <T> T create(Class<T> interfaceClass){
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//使用代理拼接地址
ServiceDiscover serviceDiscover=new ServiceDiscoverImpl();
List<String> strings = serviceDiscover.discoverList(interfaceClass.getName());
LoadBalance loadBalance=new RandomBalance();
String servicePath = URLDecoder.decode((String) loadBalance.select(strings));//[xiaojie-dubbo://192.168.1.110:8080,xiaojie-dubbo://192.168.1.110:8081]
String[] split = servicePath.split(":");
String host=split[1].replace("//","");
Integer port= Integer.valueOf(split[2]);
//封装具体参数
RpcRequest rpcRequest = new RpcRequest(interfaceClass.getName(), method.getName(), method.getParameterTypes(), args);
//启动客户端,发送结果
return sendMsg(host,port,rpcRequest);
}
});
}
/**
* 客户端发送消息
* @return
*/
public static Object sendMsg(String host, Integer port, RpcRequest rpcRequest){
final ClientHandler clientHandler = new ClientHandler();
NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap=new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.remoteAddress( new InetSocketAddress( host,port))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
socketChannel.pipeline().addLast(clientHandler);
}
});
//发起同步连接
try {
ChannelFuture channelFuture = bootstrap.connect().sync();
//客户端发送
channelFuture.channel().writeAndFlush(rpcRequest);
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
group.shutdownGracefully();
}
return clientHandler.getResponse();
}
}
完整代码https://gitee.com/whisperofjune/netty-dubbo-rpc.git
参考:蚂蚁课堂的netty部分
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/18529.html