Netty框架之编解码机制二(自定义协议)

导读:本篇文章讲解 Netty框架之编解码机制二(自定义协议),希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

前言

自上篇文章Netty框架之编解码机制一(ByteBuf以及Tcp粘包拆包),小编将继续讲解netty中的编解码,以及tcp拆包粘包的解决方案代码实践,希望对大家理解有所帮助。好了话不多说进入正题。

拆包粘包的解决方案代码实践

上篇文章分享了一系列解决粘包拆包的方案,下面用代码来编写一些。

固定长度
换行
自定义分割符号

public class PacketSplicingTest {
    private ServerBootstrap serverBootstrap;

    @Before
    public void initSocketServer() {
        serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(new NioEventLoopGroup(1), new NioEventLoopGroup(5));
        serverBootstrap.channel(NioServerSocketChannel.class);
    }

    @Test
    public void splicingTest() throws InterruptedException {

        serverBootstrap.childHandler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
            	//使用固定长度 如果没有到4的长度则下面不会打印
                //ch.pipeline().addLast(new FixedLengthFrameDecoder(4));
				//换行分割,并且一行最大的长度为10
				//ch.pipeline().addLast(new LineBasedFrameDecoder(10));
				//特殊符号分割,下面示例为以#分割,最大读长度为10的字符串,中间true为自动会将#丢弃(即丢弃分隔符)
				ByteBuf byteBuf = Unpooled.wrappedBuffer(new byte[]{'#'});
                ch.pipeline().addLast(new DelimiterBasedFrameDecoder(10,true,byteBuf));
                ch.pipeline().addLast(new TrackHandler());
            }
        });
        ChannelFuture sync = serverBootstrap.bind(8080).sync();
        sync.channel().closeFuture().sync();

    }

    private class TrackHandler extends SimpleChannelInboundHandler {
        int count = 0;

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf byteBuf = (ByteBuf) msg;
            String message = byteBuf.toString(Charset.defaultCharset());
            System.out.println(String.format("message%s:%s", ++count, message));
        }
    }
}

看完上述比较简单的应用后,小编带大家编写一个自定义协议简单示例。

自定义协议

首先咱们来自定义一个协议的报文,以及编解码实现流程

在这里插入图片描述
是不是非常简单,那么现在小编来开始愉快的编写源码。
首先是编解码器

public class ProtocolCodes extends ByteToMessageCodec<String> {
    //标识符
    private static int MAGIC = 0xDDDD;
    //标识符 的bytebuf
    private static ByteBuf MAGIC_BUF = Unpooled.copyInt(MAGIC);

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, String msg, ByteBuf byteBuf) throws Exception {
        byte[] bytes = msg.getBytes();
        byteBuf.writeInt(MAGIC);
        byteBuf.writeInt(bytes.length);
        byteBuf.writeBytes(bytes);
    }

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        //是否有标识符
        int indexOfMagic = indexOf(byteBuf, MAGIC_BUF);
        if (indexOfMagic < 0) {
            return; //需要更多的字节
        }
        //消息头的长度
        if (!byteBuf.isReadable(indexOfMagic + 8)) {
            return;//需要更多的字节
        }
        //读取消息头中消息长度
        int length = byteBuf.slice(indexOfMagic + 4, 4).readInt();
        //是否可读完整的消息体
        if (!byteBuf.isReadable(indexOfMagic + 8 + length)) {
            return;//需要更多的字节
        }
        //跳过消息头
        byteBuf.skipBytes(indexOfMagic + 8);
        //读取消息体
        ByteBuf buf = byteBuf.readRetainedSlice(length);
        String message = buf.toString(Charset.defaultCharset());
        list.add(message);
    }
    //netty 内部工具类
    private static int indexOf(ByteBuf haystack, ByteBuf needle) {
        for (int i = haystack.readerIndex(); i < haystack.writerIndex(); i++) {
            int haystackIndex = i;
            int needleIndex;
            for (needleIndex = 0; needleIndex < needle.capacity(); needleIndex++) {
                if (haystack.getByte(haystackIndex) != needle.getByte(needleIndex)) {
                    break;
                } else {
                    haystackIndex++;
                    if (haystackIndex == haystack.writerIndex() &&
                            needleIndex != needle.capacity() - 1) {
                        return -1;
                    }
                }
            }

            if (needleIndex == needle.capacity()) {
                // Found the needle from the haystack!
                return i - haystack.readerIndex();
            }
        }
        return -1;
    }
}

客户端

public class ProtocolClient {

    private Channel channel;


    private void start() throws InterruptedException {
        // ServerBootstrap
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(new NioEventLoopGroup(1));
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) {
                ch.pipeline().addLast(new ProtocolCodes());//出站处理
            }
        });
        ChannelFuture future = bootstrap.connect("127.0.0.1", 8080);
        channel = future.sync().channel();

    }


    public static void main(String[] args) throws Exception {
        ProtocolClient client = new ProtocolClient();
        client.start();
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        while (true) {
            String line = reader.readLine();
            client.channel.writeAndFlush(line);
        }
    }
}

服务端

public class ProtocolServer {

    private ServerBootstrap bootstrap;

    @Before
    public void init() {
        bootstrap = new ServerBootstrap();
        bootstrap.group(new NioEventLoopGroup(1), new NioEventLoopGroup(8));
        bootstrap.channel(NioServerSocketChannel.class);

    }

    @After
    public void start() throws InterruptedException {
        ChannelFuture future = bootstrap.bind(8080);
        System.out.println("启动成功");
        future.sync().channel().closeFuture().sync();
    }

    @Test
    public void test() {
        bootstrap.childHandler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) {
                ch.pipeline().addLast(new ProtocolCodes());
                ch.pipeline().addLast(new TrackHandler());
            }
        }) ;
    }
    private static class TrackHandler extends SimpleChannelInboundHandler<String> {
        int i = 0;
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) {
            System.out.println(String.format("消息%s:%s", i++, msg));
            ctx.writeAndFlush("返回消息");
        }
    }
}

测试结果:
在这里插入图片描述
在这里插入图片描述
成功编解码,这下发长度不一致的都可以了。

总结

今天主要是使用了netty编写了自定义协议是如何编解码的,当然小编自定义了一个简单的自定义协议,没有像http或dubbo等协议那么复杂,不过即使这样,大家其实也应该明白咱们的协议底层是如何编解码了吧。
看到这儿大家会不会觉得,网络编程的小伙伴确实会比写业务编程的小伙伴拿的工资高,这是有原因的。愿小编再接再厉,将底层的网络传输原理以及一些常用的协议研究一番,知道其架构的思想并掌握。期待小编后续的netty应用吧。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/13548.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!