Netty框架之协议应用二(RPC开发实战之Dubbo)

导读:本篇文章讲解 Netty框架之协议应用二(RPC开发实战之Dubbo),希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

前言

netty框架马上就进入尾声了,小编没有特别深入的讲解,第一是网络编程确实挺难的,第二用好netty其实是挺不容易的一件事情,尤其都是异步的情况下,今天小编继续为大家带来开发实战,上次分享了redis客户端和websocket弹幕功能的简单实现,这次为大家带来相对比较高档的rpc框架底层网络通信,今天主要以dubbo为例,希望大家有所收获。

RPC

定义

RPC为远程服务调用,即客户端远程调用服务端的方法,然后服务端返回响应或异常。常用的RPC解决方案有JAVA RMI,webService,Http Invoker,Dubbo,SpringCloud等等。

去中心化架构

传统集中式架构:
下图是小编理解的中心化架构

在这里插入图片描述

中心化架构其优点为:架构简单,客户端调用的时候可以跨语言,缺点的话所有的调用都会进过ngnix(这里就可以理解为中心,大家都得进过他,无论是调用还是返回响应),ngnix一旦挂了之后就会是服务挂了,当然如果ngnix部署集群也会让架构变的复杂。
去中心化架构
如下图:

在这里插入图片描述
这里的话客户端调用服务端不需要进过中心直连调用。
去中心化架构简单描述后,继续来看一下rpc框架组成。

框架组成

在这里插入图片描述
这个架构是不是似曾相识,这里如果看过小编的dubbo分享就觉得面熟了。
上面最小化实现rpc框架就是最下面的rpc协议就可以了,这样两个服务就可以通信即可。接着咱们来介绍一下rpc协议。

协议报文

这里小编直接用dubbo协议来说明报文:

在这里插入图片描述
上面请求头主要有16个字节,如果看过小编的前基本应用的使用后,看到这个就可以使用netty来进行消息的拆包以及编解码。那小编接下来继续说明编解码的过程。

概设过程

这边在写代码之前,小编先分享一下设计的思路,以及一些调用的逻辑。
首先是编解码:编解码占网络传输中必须且固定的,先看下图:

在这里插入图片描述
具体已经在上图解释清楚了,接下来看器调用过程即各个组件功能。

在这里插入图片描述
上图是比较简单的,接下来是至关重要的的从客户端到服务整个流程的调用过程

在这里插入图片描述
容小编解释一下:

  • 从客户端到服务端的调用涉及到了四个线程,分别是客户端以及服务端的业务线程和IO线程
  • 发起掉用写入消息体的内容是上面的Transfer,而编码request则为客户端发起的请求。而Transfer中的Request包含了接口,方法以及参数
  • 写入到socket中的为bytebuf,其经过Bytebuf -> head -> unsafe -> nio socket (doWrite) -> java nio channel -> socket
  • 写入到socket则到达服务端,服务端的io线程通过多路复用选择器select轮询,之后调用read
  • 读取到内容后,这边的读取流程和上面相似 unsafe read -> pipeline fireChannelRead,拿到了ByteBuf,然后解码request,根据上面的解码工具类
  • 从ByteBuf拿到Transfer,之后交给业务的handler,之后涉及到服务端业务线程的处理,业务处理后返回了结果或者报错信息(这里同上其实是Transfer)
  • 之后又交还给io线程,再次将Response进行编码(服务端的响应),Bytebuf写到socket
  • 回到客户端io线程后,再次有select进行轮询,读取到内容Bytebuf,解码成Transfer,Transfer中的response进行反序列化拿到结果填充回执,
  • 客户端拿到回执,释放等待。

注意事项
第一:加入客户端A和B的请求,客户端怎样拿到服务端回来的A响应和B相应呢,这里就需要Transfer里面的id,即协议中的id,不过如果是协议中的id,那就需要做请求的时候放入到一个map中来保存。
第二:既然知道使用id来区分请求响应,那什么时候放入到map中, 怎么保证线程安全,那最好是线程安全的map,不过高并发的时候,对系统很不友好,所以放入到map的时候也在io线程中执行。
第三:如何在io线程放入map中,这里是用eventloop的submit,写入消息完成后监听并写入map

讲完理论小编不是纯粹的理论派,还是代码实战派

代码实战

编解码工具以及传输类

Transfer类

public class Transfer {
   public static final byte STATUS_ERROR = 0;
   public static final byte STATUS_OK = 1;
   public static final byte STATUS_ILLEGAL = 2;
   public static final byte SERIALIZABLE_JAVA=1;
   public static final byte SERIALIZABLE_HESSIAN2=2;
   public static final byte SERIALIZABLE_JSON=3;

    boolean request;
    byte serializableId; // 1:java 2:hessian2 3:json
    boolean twoWay;
    boolean heartbeat;
    long id;
    byte status;    // 1正常 0失败 2请求非法
    Object target;

    public Transfer(long id) {
        this.id = id;
    }
    
}

编解码工具类

public class RpcCodec extends ByteToMessageCodec {
    private static final int HEADER_LENGTH = 16;
    private static final short MAGIC = 0xdad;
    private static final ByteBuf MAGIC_BUF = Unpooled.copyShort(MAGIC);
    private static final byte FLAG_REQUEST = (byte) 0x80;//1000 0000
    private static final byte FLAG_TWO_WAY = (byte) 0x40; //0100 0000
    private static final byte FLAG_EVENT = (byte) 0x20;  //0010 0000
    private static final int SERIALIZATION_MASK = 0x1f;  //0001 1111

    // 编码
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {
        if (msg instanceof Transfer) {
            doEncode((Transfer) msg, out);
        } else {
            throw new IllegalArgumentException();
        }
    }

    //解码
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) {
        Transfer transfer = doDecode(in);
        if (transfer != null) {
            out.add(transfer);
        }
    }

    // 编码
    protected void doEncode(Transfer data, ByteBuf buf) {
        byte[] header = new byte[HEADER_LENGTH];
        Bytes.short2bytes(MAGIC, header);

        header[2] = data.serializableId;
        if (data.request) header[2] |= FLAG_REQUEST;
        if (data.twoWay) header[2] |= FLAG_TWO_WAY;
        if (data.heartbeat) header[2] |= FLAG_EVENT;
        if (!data.request) header[3] = data.status;

        Bytes.long2bytes(data.id, header, 4);// id 占8个字节
        int len = 0;
        byte[] body = new byte[0];
        if (!data.heartbeat) {
            body = serialize(data.serializableId, data.target);
            len = body.length;
        }
        Bytes.int2bytes(len, header, 12);
        buf.writeBytes(header);
        buf.writeBytes(body);
    }

    // 解码
    protected Transfer doDecode(ByteBuf in) {
        int index = ByteBufUtil.indexOf(MAGIC_BUF, in);
        //是否有魔数
        if (index < 0) {
            return null;
        }
        //消息头是否完整
        if (!in.isReadable(index + HEADER_LENGTH)) {
            return null;
        }
        byte[] header = new byte[HEADER_LENGTH];
//      in.getBytes(index, header);
        ByteBuf slice = in.slice();
        slice.readBytes(header);
        int length = Bytes.bytes2int(header, 12);
        //消息体是否完整
        if (!in.isReadable(index + HEADER_LENGTH + length)) {
            return null;//需要更多的字节
        }
        Transfer transfer = new Transfer(Bytes.bytes2long(header, 4));
        transfer.heartbeat = (header[2] & FLAG_EVENT) != 0;
        transfer.request = (header[2] & FLAG_REQUEST) != 0;
        transfer.twoWay = (header[2] & FLAG_TWO_WAY) != 0;
        transfer.serializableId = (byte) (header[2] & SERIALIZATION_MASK);
        transfer.status = header[3];
        if (!transfer.heartbeat) {
            byte[] content = new byte[length];
//          in.getBytes(index + HEADER_LENGTH, bytes);
            slice.readBytes(content);
            transfer.target = deserialize(transfer.serializableId, content);
        }
        //跳过已经读取的
        in.skipBytes(index + HEADER_LENGTH + length);

        return transfer;
    }

    // 序列化
    private byte[] serialize(byte serializableId, Object target) {

        if (serializableId == Transfer.SERIALIZABLE_JAVA) { //JAVA
            ByteArrayOutputStream out;
            try {
                out = new ByteArrayOutputStream();
                ObjectOutputStream stream = new ObjectOutputStream(out);
                stream.writeObject(target);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            return out.toByteArray();
        } else {
            throw new UnsupportedOperationException();
        }
    }

    // 反序列化
    private Object deserialize(byte serializableId, byte[] bytes) {
        if (serializableId == Transfer.SERIALIZABLE_JAVA) { //JAVA
            try {
                ObjectInputStream stream =
                        new ObjectInputStream(new ByteArrayInputStream(bytes));
                return stream.readObject();
            } catch (IOException | ClassNotFoundException e) {
                throw new RuntimeException(e);
            }
        } else {
            throw new UnsupportedOperationException();
        }
    }
}

客户端代码

public class RpcClient {
    static AtomicLong atomicLong = new AtomicLong(100);
    private Channel channel;
    private Map<Long, Promise<Response>> results = new HashMap<>();

    public static long getNextId() {
        return atomicLong.getAndIncrement();
    }

    public void init(String address, int port) throws InterruptedException {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(new NioEventLoopGroup(1))
                .channel(NioSocketChannel.class);
        bootstrap.handler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) {
                ch.pipeline().addLast("codec", new RpcCodec());
                ch.pipeline().addLast("resultSet", new ResultFill());// 结果集填充
            }
        });
        ChannelFuture connect = bootstrap.connect(address, port);
        channel = connect.sync().channel();
        System.out.println("连接成功");
        //
        // 每隔 两秒发送心跳
        channel.eventLoop().scheduleWithFixedDelay(() -> {
            Transfer transfer=new Transfer(getNextId());
            transfer.heartbeat=true;
            channel.writeAndFlush(transfer);
        },2000,2000,TimeUnit.MILLISECONDS);
    }

    public Response invokerRemote(Class serverInterface,
                                  String methodDesc,
                                  Object[] args) throws InterruptedException, ExecutionException, TimeoutException {
        Request request = new Request(serverInterface.getName(), methodDesc);
        request.setArgs(args);
        Transfer transfer = new Transfer(getNextId());
        transfer.request=true;
        transfer.serializableId=Transfer.SERIALIZABLE_JAVA;
        transfer.target = request;
        DefaultPromise<Response> resultPromise = new DefaultPromise(channel.eventLoop());
        // 写入成功后添加 结果
        channel.writeAndFlush(transfer).addListener(future ->
                {// IO线程
                    if (future.cause() != null) {// 写入失败
                        resultPromise.setFailure(future.cause()); //写入失败必须处理
                    } else {    // 写入成功
                        results.put(transfer.id, resultPromise);
                    }
                }
        );

        return resultPromise.get(10000, TimeUnit.MILLISECONDS);
    }

    private class ResultFill extends SimpleChannelInboundHandler<Transfer> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Transfer msg) {
            if (msg.heartbeat) {
                System.out.println(String.format("服务端心跳返回:%s",
                        ctx.channel().remoteAddress()));
            } else {
                Promise<Response> promise = results.remove(msg.id);
                promise.setSuccess((Response) msg.target); // 填充结果
            }
        }
    }

    public <T> T getRemoteService(Class<T> serviceInterface) {
        assert serviceInterface.isInterface();
        Object o = Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{serviceInterface}, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Exception {
                if (Object.class.equals(method.getDeclaringClass())) {
                    return method.invoke(this, args);
                }

                String methodDescriptor = method.getName()+Type.getMethodDescriptor(method);
                Response response = invokerRemote(serviceInterface, methodDescriptor, args);
                if (response.getError() != null) {
                    throw new RuntimeException("远程服务调用异常:", response.getError());
                }
                return response.getResult();
            }
        });
        return (T) o;
    }


}

服务端代码:

public class RpcServer {
    ExecutorService threadPool = Executors.newFixedThreadPool(500);
    private Map<String, ServiceBean> register = new HashMap<>();

    public void start(int port) throws InterruptedException {
        ServerBootstrap bootstrap = new ServerBootstrap();
        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup work = new NioEventLoopGroup(8);
        bootstrap.group(boss, work).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                ch.pipeline().addLast("codec", new RpcCodec());
                ch.pipeline().addLast("dispatch", new Dispatch());
            }
        }).bind(port).sync();
        System.out.println("服务启动成功");
    }

    private class Dispatch extends SimpleChannelInboundHandler<Transfer> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Transfer transfer) {
            if (transfer.heartbeat) { // 心跳处理
                Transfer t = new Transfer(transfer.id);
                t.heartbeat = true;
                t.request = false;
                ctx.writeAndFlush(t);// 返回心跳
            } else {
                threadPool.submit(() -> {
                    Transfer to = doDispatchRequest(transfer);
                    ctx.writeAndFlush(to);// 非IO线程 异步提交到IO
                });
            }
        }
        // 业务请求处理
        Transfer doDispatchRequest(Transfer from) {
            Request request = (Request) from.target;
            Transfer to = new Transfer(from.id);
            to.request = false;
            to.serializableId = from.serializableId;
            Response response = new Response();
            try {
                String serverId = request.getClassName() + request.getMethodDesc();
                ServiceBean serverBean = register.get(serverId);
                if (serverBean == null) {
                    throw new IllegalArgumentException("找不到服务" + serverId);
                }
                Object result = serverBean.invoke(request.getArgs());
                response.setResult(result);
                to.status = Transfer.STATUS_OK;
            } catch (Throwable e) {
                e.printStackTrace();
                response.setError(e);
                to.status = Transfer.STATUS_ERROR;
            }
            to.target = response;
            return to;
        }
    }

    private static class ServiceBean {
        Method method;
        Object target;

        public ServiceBean(Method method, Object target) {
            this.method = method;
            this.target = target;
        }

        public Object invoke(Object[] args) throws Exception {
            return method.invoke(target, args);
        }
    }

    public void registerServer(Class serviceInterface, Object serverBean) {
        assert serviceInterface.isInterface();
        for (Method method : serviceInterface.getMethods()) {
            int modifiers = method.getModifiers();
            if (Modifier.isStatic(modifiers) || Modifier.isNative(modifiers)) {
                continue;
            }
            String methodDescriptor = Type.getMethodDescriptor(method);
            String key = serviceInterface.getName() +method.getName()+ methodDescriptor;
            register.put(key, new ServiceBean(method, serverBean));
        }
    }
}

测试类

public class RpcTest {
    @Test
    public void startServerTest() throws InterruptedException, IOException {
        RpcServer server = new RpcServer();
        server.registerServer(UserService.class, new UserServiceImpl());
        server.start(8080);
        System.in.read();
    }

    public static void main(String[] args) throws InterruptedException, IOException {
        RpcClient client = new RpcClient();
        client.init("127.0.0.1", 8080);
        UserService service = client.getRemoteService(UserService.class);


        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        while (true) {
            String s = in.readLine();
            System.out.println(service.getUser(1));

        }
    }

    // 多线程并发调用
    @Test
    public void syncTest() throws InterruptedException, IOException {
        RpcClient client = new RpcClient();
        client.init("127.0.0.1", 8080);
        UserService service = client.getRemoteService(UserService.class);
        ExecutorService executor = Executors.newFixedThreadPool(100);
        for (int i = 0; i < 100; i++) {
            int id = i;
            executor.execute(() -> {
                User user = service.getUser(id);
                System.out.println(user);
                assert user.getId().equals(id);
            });
        }
        System.in.read();
    }
}

对于UserSevice大家可以自己建一个接口和随便实现一下即可。

总结

利用netty实现rpc还是挺有难度的,小编是进过将近一周才陆续写完这篇博客,希望小编已经充分讲清楚了,假如大家将netty的理论与实战结合完毕,那相信和小编一样有长足进步。加油!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/13545.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!