Netty之自定义RPC

生活中,最使人疲惫的往往不是道路的遥远,而是心中的郁闷;最使人痛苦的往往不是生活的不幸,而是希望的破灭;最使人颓废的往往不是前途的坎坷,而是自信的丧失;最使人绝望的往往不是挫折的打击,而是心灵的死亡。所以我们要有自己的梦想,让梦想的星光指引着我们走出落漠,走出惆怅,带着我们走进自己的理想。

导读:本篇文章讲解 Netty之自定义RPC,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

RPC概述

RPC(Remote Procedure Call),即远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络实现的技术。

RPC调用过程

1. 服务消费方(client)以本地调用方式调用服务

2. client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体

3. client stub 将消息进行编码并发送到服务端

4. server stub 收到消息后进行解码

5. server stub 根据解码结果调用本地的服务

6. 本地服务执行并将结果返回给server stub

7. server stub 将返回导入结果进行编码并发送至消费方

8. client stub 接收到消息并进行解码

9. 服务消费方(client)得到结果

实现Netty自定义RPC

1.服务的提供方

创建一个接口+接口的实现类,供消费方远程调用。

public interface HelloRPC {
    /**
     * 无参数调用
     * @return
     */
    String helloRpcTest();

    /**
     * 有参数调用
     * @param name
     * @return
     */
    String helloRpcTest(String name);
}
public class HelloRPCImpl implements HelloRPC {

    @Override
    public String helloRpcTest() {
        return "无参数调用: helloRpcTest";
    }

    @Override
    public String helloRpcTest(String name) {
        return "有参数调用: " + name;
    }
}

2.服务的消费方

创建一个接口且该服务调用方的接口必须跟服务提供方的接口保持一致。

public interface HelloRPC {
    /**
     * 无参数调用
     * @return
     */
    String helloRpcTest();

    /**
     * 有参数调用
     * @param name
     * @return
     */
    String helloRpcTest(String name);
}

3.Netty服务端

类信息封装类

创建类信息封装类,该实体类用来封装消费方发起远程调用时传给服务方的数据。

public class ClassInfo implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * 类名
     */
    private String className;
    /**
     * 方法名
     */
    private String methodName;
    /**
     * 参数类型
     */
    private Class<?>[] types;
    /**
     * 参数列表
     */
    private Object[] objects;

	//getter() setter()
}

Netty服务器

使用Netty自带的ObjectEncoder和ObjectDecoder作为编解码器

public class NettyRPCServer {
    
    private int port;

    public NettyRPCServer(int port) {
        this.port = port;
    }

    public void start() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 88)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .localAddress(port).childHandler(
                    new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            //编码器
                            pipeline.addLast("encoder", new ObjectEncoder());
                            //解码器
                            pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                            //业务处理类
                            pipeline.addLast(new InvokeHandler());
                        }
                    });
            ChannelFuture future = serverBootstrap.bind(port).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new NettyRPCServer(8888).start();
    }
}

服务器业务处理类

读取消费方发来的数据,并解析得到的数据进行本地调用,然后把结果返回给消费方。

public class ServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 读取客户端发送的数据
     * 通过反射调用实现类的方法
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ClassInfo classInfo = (ClassInfo) msg;
        //得到类的实例
        Object clazz = Class.forName(getImplClassName(classInfo)).newInstance();
        //得到类的方法
        Method method = clazz.getClass().getMethod(classInfo.getMethodName(), classInfo.getTypes());
        //反射调用实现类的方法
        Object result = method.invoke(clazz, classInfo.getObjects());
        ctx.writeAndFlush(result);
    }

    /**
     * 获取某个接口下的某个实现类的名称
     * @param classInfo
     * @return
     * @throws Exception
     */
    private String getImplClassName(ClassInfo classInfo) throws Exception{
        //服务方接口和实现类所在的包路径
        String interfacePath="cn.itcast.rpc.server";
        //得到接口名称
        int lastIndexOf = classInfo.getClassName().lastIndexOf(".");
        String interfaceName=classInfo.getClassName().substring(lastIndexOf);
        //反射获取类
        Class superClass=Class.forName(interfacePath+interfaceName);
        Reflections reflections = new Reflections(interfacePath);
        //得到某接口下的所有实现类
        Set<Class> ImplClassSet=reflections.getSubTypesOf(superClass);
        if(ImplClassSet.size()==0){
            System.out.println("该接口没有找到实现类");
            return null;
        }else if(ImplClassSet.size()>1){
            System.out.println("该接口找到多个实现类");
            return null;
        }else {
            //把集合转换为数组
            Class[] classes=ImplClassSet.toArray(new Class[0]);
            //得到实现类的名字
            return classes[0].getName();
        }
    }
}

4.Netty客户端

Netty客户端代理类

同Netty服务器采用Netty自带的ObjectEncoder 和ObjectDecoder作为编解码器

public class NettyRPCProxy {
    //根据接口创建代理对象
    public static Object create(Class target) {
        return Proxy.newProxyInstance(target.getClassLoader(), new Class[]{target}, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args)
                    throws Throwable {
                //封装ClassInfo
                ClassInfo classInfo = new ClassInfo();
                classInfo.setClassName(target.getName());
                classInfo.setMethodName(method.getName());
                classInfo.setObjects(args);
                classInfo.setTypes(method.getParameterTypes());

                //使用Netty发送数据
                EventLoopGroup group = new NioEventLoopGroup();
                ClientHandler clientHandler = new ClientHandler();
                try {
                    Bootstrap b = new Bootstrap();
                    b.group(group)
                            .channel(NioSocketChannel.class)
                            .handler(new ChannelInitializer<SocketChannel>() {
                                @Override
                                public void initChannel(SocketChannel ch) throws Exception {
                                    ChannelPipeline pipeline = ch.pipeline();
                                    //编码器
                                    pipeline.addLast("encoder", new ObjectEncoder());
                                    //解码器  构造方法第一个参数设置二进制数据的最大字节数  第二个参数设置具体使用哪个类解析器
                                    pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                                    //业务处理类
                                    pipeline.addLast("handler", clientHandler);
                                }
                            });
                    ChannelFuture future = b.connect("127.0.0.1", 8888).sync();
                    future.channel().writeAndFlush(classInfo).sync();
                    future.channel().closeFuture().sync();
                } finally {
                    group.shutdownGracefully();
                }
                return clientHandler.getResponse();
            }
        });
    }
}

客户端业务处理类

该业务处理类读取远程调用返回的数据

public class ClientHandler extends ChannelInboundHandlerAdapter {

    private Object response;

    public Object getResponse() {
        return response;
    }

    /**
     * 读取服务器端返回的数据
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        response = msg;
        ctx.close();
    }
}

5.执行测试

public static void main(String[] args) {
        HelloRPC helloRPC = (HelloRPC) NettyRPCProxy.create(HelloRPC.class);
        System.out.println(helloRPC.helloRpcTest());
        System.out.println(helloRPC.helloRpcTest("helloRpcTest"));
    }
无参数调用: helloRpcTest
有参数调用: helloRpcTest

进程已结束,退出代码0

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/137070.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!