【Netty专题】Netty实战与核心组件详解

不管现实多么惨不忍睹,都要持之以恒地相信,这只是黎明前短暂的黑暗而已。不要惶恐眼前的难关迈不过去,不要担心此刻的付出没有回报,别再花时间等待天降好运。真诚做人,努力做事!你想要的,岁月都会给你。【Netty专题】Netty实战与核心组件详解,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

前言

Netty啊,真的是大名鼎鼎!Netty之于Java网络编程,相当于Spring之于Java开发。两者都是Java生态的金字塔尖的【框架】!所以,非常推荐Java程序员学习这个框架。

Netty有多牛逼?据说,曾经在性能上把谷歌公司一个用C++写的网络通信框架都给人干碎了。后来,后者参照了Netty的设计方案,才完成了超越。

阅读对象

需要有一定的网络编程基础。如若没有,请务必要学习下【阅读导航】中提到的系列上一篇文章。

另外,如果你们了解【设计模式】中的【责任链模式】就更好了。因为在Netty的开发中,Handler使用了【责任链模式】的方式,将各个Handler链化起来。

而且很负责地告诉大家,好多优秀的Java源码,都有【责任链模式】的影子。所以,去学习吧,能帮助你阅读源码以及提升自己的编程技巧。传送门:《史上最全设计模式导学目录(完整版)

阅读导航

系列上一篇文章:《【Netty专题】【网络编程】从OSI、TCP/IP网络模型开始到BIO、NIO(Netty前置知识)
系列下一篇文章:《【Netty专题】用Netty手写一个远程长连接通信框架

前置知识

课程内容

一、Netty简介

1.1 Netty是什么

Netty是由 JBOSS 提供的一个Java开源网络通信框架。它是一个【异步事件驱动】的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。
也就是说,Netty 是一个基于NIO的客户、服务器端的编程通信框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。

上面有个2细节很重要:

  1. Netty是一个【异步事件驱动】的网络应用程序框架(异步,事件驱动,如何理解?)
  2. Netty 是一个基于NIO的客户、服务器端的编程通信框架。NIO,NIO,NIO,讲三遍
    (PS:有心的同学这个时候应该回忆以下,Java的NIO编程里面,有什么组件,或者细节来着?)

1.2 Netty有什么优势

相比传统Java Socket编程、Java NIO,Netty具有如下明显优势

  1. 提供了更高层次的封装,API使用简单,降低了网络编程开发门槛;

传统的NIO开发,你需要独自考虑、处理网络编程中遇到的一些常见问题。如:【断线重连】、【 网络闪断】、【心跳处理】、【粘包】、【半包读写】、【网络拥塞】和【异常流】

  1. 功能强大,预制了多种编解码功能, 支持多种主流的协议;

编解码:网络编程一定要处理的环节。因为数据在网络中传输是需要转换为二进制的,不可能是明文
支持的协议:传输层有TCP、UDP、本地传输;应用层有HTTP、WebSocket等

  1. 定制能力强,通过自定义的ChannelHandler实现更灵活的拓展
  2. 性能高,对比其他主流的NIO框架,Netty的综合性能更优
  3. 成熟、稳定。Netty目前基本没有大更新了,基本上已经修复了所有JDK NIO的BUG
  4. 社区活跃
  5. 已经经历了大规模商业应用的考验,质量有保证

二、第一个Netty程序

2.1 Netty简单使用示例

话不多说,我们先来简单使用一下,开始我们的第一个Netty程序,然后再一点一点推敲。
先导入pom:

<dependency>
	<groupId>io.netty</groupId>
	<artifactId>netty-all</artifactId>
	<version>4.1.42.Final </version>
	<scope>compile</scope>
</dependency>

然后引入服务端代码:

/**
 * Netty服务端
 *
 * @author zhangshen
 * @date 2023/10/21 14:52
 * @slogan 编码即学习,注释断语义
 **/
public class NettyServer {
    static final int PORT = 9999;
    public static void main(String[] args) throws InterruptedException {

        EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup();
        EventLoopGroup wokerEventLoopGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossEventLoopGroup, wokerEventLoopGroup)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(PORT))
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    });
            System.out.println("Netty服务端正在启动...");

            // 异步绑定到服务器,sync()会阻塞到完成
            ChannelFuture channelFuture = bootstrap.bind().sync();

            // 对通道关闭进行监听,closeFuture是异步操作,监听通道关闭
            // 通过sync()同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossEventLoopGroup.shutdownGracefully().sync();
        }
    }
}


/**
 * Netty服务端,自定义handler
 *
 * @author zhangshen
 * @date 2023/10/21 15:01
 * @slogan 编码即学习,注释断语义
 **/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Netty服务器:客户端连接已建立");
        super.channelActive(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        ByteBuf in = (ByteBuf) msg;
        System.out.println("Netty服务器收到消息:" + in.toString(CharsetUtil.UTF_8));

        String responseMsg = "你好啊,Netty客户端";
        ByteBuf buf = Unpooled.copiedBuffer(responseMsg, CharsetUtil.UTF_8);
        ctx.writeAndFlush(buf);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}

接着是Netty客户端代码示例:


/**
 * Netty客户端代码示例
 *
 * @author zhangshen
 * @date 2023/10/21 15:05
 * @slogan 编码即学习,注释断语义
 **/
public class NettyClient {

    static final int NETTY_SERVER_PORT = 9999;

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress("127.0.0.1", NETTY_SERVER_PORT))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyClientHandler());
                        }
                    });

            // 异步连接到服务器,sync()会阻塞到完成,和服务器的不同点
            ChannelFuture channelFuture = bootstrap.connect().sync();

            // 阻塞当前线程,直到客户端的Channel被关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            eventLoopGroup.shutdownGracefully().sync();
        }
    }
}

/**
 * Netty客户端代码,自定义handler
 *
 * @author zhangshen
 * @date 2023/10/21 15:05
 * @slogan 编码即学习,注释断语义
 **/
public class NettyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("客户端收到消息:" + msg.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String msgAfterTCP = "你好啊,Netty服务器";
        ctx.writeAndFlush(Unpooled.copiedBuffer(msgAfterTCP, CharsetUtil.UTF_8));
        ctx.alloc().buffer();
    }
}

2.2 代码解读

我们先来简单总结下服务端NettyServer的流程:

  1. 先声明一个ServerBootstrap(Bootstrap的一种)、EventLoopGroup。服务端声明了2个EventLoopGroup,其实1个也可以。这个跟Reactor模式有关
  2. 初始化bootstrap。通过链式调用设置一些属性,比如:group()、localAddress()、childHandler()(其实这些方法,我们可以看成Java POJO中的setXxx() )。最核心的是新增了一个自定义的NettyClientHandler
  3. 然后bootstrap.bind()
  4. 监听通道关闭
  5. 在finally块内关闭EventLoopGroup

而客户端NettyClient呢,它的流程如下:

  1. 先声明一个Bootstrap、EventLoopGroup
  2. 初始化bootstrap。通过链式调用设置一些属性,比如:group()、channel()、remoteAddress()、handler()(其实这些方法,我们可以看成Java POJO中的setXxx() )。最核心的是新增了一个自定义的NettyServerHandler
  3. 然后bootstrap.connect()
  4. 监听通道关闭
  5. 在finally块内关闭EventLoopGroup

看看,从代码主流程来看,其实客户端跟服务端基本没什么很大的区别。当然这不是主要的东西。最重要的是,大家发现没有,这里出现了好几个陌生的API,这就是Netty提供给我们的核心组件!这些组件会在后面给大家详解介绍,也是本文的核心所在!这些组件,分别是:EventLoopGroupBootstrap(ServerBootstrap)NioServerSocketChannel(NioSocketChannel)ChannelHandlerChannelPipeline(这个需要点进去才能看到)、ByteBuf

这些API组件有什么特别吗?
同学们还记得我们说【Netty是什么吗】?【Netty 是一个基于NIO的客户、服务器端的编程通信框架】啊!那同学们还记得Java NIO 3个核心组件吗?Channel通道Selector多路复用器ByteBuffer缓冲区嘛(其实还要考虑一个多线程)。
好了,就算我不说你们通过英文翻译也稍微能一一对应上了,既然Netty是基于NIO的,那NIO的这些细节,肯定也会被包含在Netty的组件中!比如:

  • EventLoopGroup:直译【事件循环组】。NIO代码里面很多while(true),然后不断循环检测事件发生,像不像?对的,EventLoopGroup可以看成是一个【线程池】
  • Channel:通道嘛,这个大家最容易理解了。可能有些朋友还不明白Channel通道是什么,其实就是BIO演变到NIO之后,Socket被封装到了Channel里面

OK,更多详细介绍我会在【三、Netty核心组件详解】中讲到。

2.3 Netty的特性

我们在最开始介绍Netty的时候,有这么描述过:它是一个【异步事件驱动】的网络应用程序框架。并且向大家抛出了这么个问题:如何理解【异步事件驱动】?
如果你们看了我上一篇文章其实不难理解。【异步事件驱动】=【异步】+【事件驱动】。

  • 异步:跟同步相对。异步在编程中的体现就是,新开一条线程去处理任务
  • 事件驱动:其实就是Reactor模式在Netty中的体现。Netty是基于Reactor模式设计的

只不过稍微有些不同的是:Netty对于事件的定义。

2.3.1 Netty的事件

Netty 使用不同的事件来通知我们状态的改变或者是操作的状态。这使得我们能够基于已经发生的事件来触发适当的动作。Netty 事件是按照它们与入境或出境数据流的相关性进行分类的。

  1. 可能由入境数据或者相关的状态更改而触发的事件包括:
    • 连接已被激活或者连接失活;
    • 数据读取;
    • 用户事件;
    • 错误事件
  2. 出境事件是未来将会触发的某个动作的操作结果,这些动作包括:
    • 打开或者关闭到远程节点的连接;
    • 将数据写到或者冲刷到套接字

每个事件都可以被分发给 ChannelHandler 类中的某个用户实现的方法,既然事件分为入境和出境,用来处理事件的 ChannelHandler 也被分为可以处理入境事件的 Handler 和出境事件的 Handler,当然有些 Handler 既可以处理入境也可以处理出境。
Netty 提供了大量预定义的可以开箱即用的 ChannelHandler 实现,包括用于各种协议(如 HTTP 和 SSL/TLS)的 ChannelHandler。

基于 Netty 的网络应用程序中根据业务需求会使用 Netty 已经提供的 ChannelHandler 或者自行开发 ChannelHandler,这些 ChannelHandler 都放在 ChannelPipeline 中统一管理,事件就会在 ChannelPipeline 中流动,并被其中一个或者多个 ChannelHandler 处理。

它的原理图如下:
在这里插入图片描述

2.4 Netty线程模型

在这里插入图片描述

模型解读:
1) Netty 抽象出两组线程池BossGroup和WorkerGroup,BossGroup专门负责接收客户端的连接, WorkerGroup专门负责网络的读写

不就是Reactor模型的主从架构吗

2)BossGroup和WorkerGroup类型都是NioEventLoopGroup

NIO是一种IO方式,epoll,BIO都是。所以,其实还有EpollEventLoopGroup,以此类推

3)NioEventLoopGroup 相当于一个事件循环线程组, 这个组中含有多个事件循环线程 ,每一个事件循环线程是NioEventLoop
4)每个NioEventLoop都有一个selector , 用于监听注册在其上的socketChannel的网络通讯
5)每个Boss NioEventLoop线程内部循环执行的步骤有 3 步

  1. 处理accept事件 , 与client 建立连接 , 生成 NioSocketChannel
  2. 将NioSocketChannel注册到某个worker NIOEventLoop上的selector
  3. 继续处理任务队列的任务 , 即runAllTasks

6)每个worker NIOEventLoop线程循环执行的步骤

  1. 轮询注册到自己selector上的所有NioSocketChannel 的read, write事件
  2. 处理 I/O 事件, 即read , write 事件, 在对应NioSocketChannel 处理业务
  3. runAllTasks处理任务队列TaskQueue的任务 ,一些耗时的业务处理一般可以放入线程池中慢慢处理,这样不影响数据在 pipeline 中的流动处理

7)每个worker NIOEventLoop处理NioSocketChannel业务时,会使用 pipeline (管道),管道中维护了很多 handler 处理器用来处理 channel 中的数据

三、Netty核心组件详解

我们在【2.2 代码解读】中除了简单分析了Netty服务端客户端使用流程外,还给大家挖掘出了一些Netty的核心组件。分别是:EventLoopGroupBootstrap(ServerBootstrap)NioServerSocketChannel(NioSocketChannel)ChannelHandlerChannelPipeline(这个需要点进去才能看到)、ByteBuf。接下来我们就开始着手研究研究。
这些组件出现的位置大概就是代码流程【自上而下】、【自内而外】吧。

3.1 EventLoopGroup和EventLoop

EventLoop直译:事件循环;EventLoopGroup直译:事件循环组。

虽然我不知道这是啥,但是基本上可以猜测:后者是对前者做管理的类。所以我们还是要了解一下EventLoop先。
回想一下我们在 NIO 中是如何处理我们关心的事件的?很简单,就是在一个 while 循环中 select 出事件,然后依次处理每种事件,这不就是【事件循环】嘛。

3.1.1 EventLoop:事件循环

EventLoop是Netty 的核心接口,用于处理网络连接的生命周期中所发生的事件。它的类结构如下:
在这里插入图片描述
再来看看接口定义:
在这里插入图片描述
再看看对应的实现类:
在这里插入图片描述
看,我们用到的NioEventLoop就在里面了。如果大家翻开里面的源码,会发现,NioEventLoop里面有几个重要的属性,我这边用伪代码写一下:(有一些属性是在父类中的)

class NioEventLoop {
	Selector selector;
	Thread thread;
	Queue<Runnable> taskQueue;
	SelectedSelectionKeySet selectedKeys;
}

由上面的伪代码可以看到,NioEventLoop 中:

  1. 维护了一条线程任务队列是否似曾相识?线程池呀!),支持异步提交执行任务,线程启动时会调用 NioEventLoop 的 run 方法,执行 I/O 任务和非 I/O 任务:
    • I/O 任务,即 selectionKey 中 ready 的事件,如 accept、connect、read、write 等,由 processSelectedKeys 方法触发
    • 非 IO 任务,添加到 taskQueue 中的任务,如 register0、bind0 等任务,由 runAllTasks 方法触发
  2. 维护了一个Selector选择器(多路复用器)。这个在NIO里面,就是循环遍历注册在Selector上的所有Socket,然后响应对应事件嘛。所以从这个东西一定能看出一个东西,那就是:【一个EventLoop里面肯定管理了多个Channel
  3. 维护了一个SelectionKeySet。这个不知道大家有没有印象,在NIO模型中我说:向Selector注册了Channel和感兴趣事件后就会被包装成一个SelectionKey
3.1.2 NioEventLoopGroup:事件循环组

NioEventLoopGroup:事件循环组。它是做什么的呢?其实,它主要干下面几件事情:

  1. 管理 EventLoop 的生命周期,可以理解为一个线程池,内部维护了一组线程
  2. 负责为每个新建的Channel分配一个 EventLoop

EventLoop的分配
异步传输实现只使用了少量的 EventLoop(以及和它们相关联的 Thread),而且在当前的线程模型中,它们可能会被多个 Channel 所共享。这使得可以通过尽可能少量的 Thread 来支撑大量的 Channel,而不是每个 Channel 分配一个Thread(EventLoop)。
EventLoopGroup 负责为每个新创建的 Channel 分配一个 EventLoop。怎么实现的呢?顺序分配。

在当前实现中,使用顺序循环(round-robin)的方式进行分配以获取一个均衡的分布,并且相同的 EventLoop 可能会被分配给多个 Channel。一旦一个 Channel 被分配给一个 EventLoop,它将在它的整个生命周期中都使用这个EventLoop(以及相关联的 Thread)。
在这里插入图片描述

3.1.3 所谓异步的体现

我们在一开始说Netty的时候提到过:Netty是一个【异步事件驱动】的网络应用程序框架。
事件驱动跟Reactor模型有关嘛,看了上面的EventLoop估计有一点感觉了。那【异步】呢?其实这里已经有体现了。怎么说?

首先,我们说了,NioEventLoop它是有自己线程的,所以,我们服务端绑定ServerSocketChannel的时候肯定会给他分配一个EventLoop
那么,我们点开来看,服务端绑定的时候是怎样的,可以稍微窥见一点Netty的异步所在,以及EventLoop是如何工作的。
在这里插入图片描述
但是追踪链比较深,我只能告诉大家,在服务端调用这个bind的时候,会新建一条Channel,并且把真正的bind操作投递给Channel里面的QueueTask任务队列里面。

3.2 Channel:通道

我们在之前的NIO里面,包括上文中也提到过,NIO以后使用了Channel来封装Socket。所以可以近似的认为Channel就是 Socket。很显然啊,这是一个很重要的东西。

Channel即Java NIO编程里面的Socket。Netty 的 Channel 接口所提供的 API,被用于所有的 I/O 操作(bindconnectreadwrite),它大大地降低了直接使用 Socket 类的复杂性。由于 Channel 是独一无二的,所以为了保证顺序需要将 Channel 声明为 java.lang.Comparable的一个子接口。因此,如果两个不同的 Channel 实例都返回了相同的散列码,那么AbstractChannel 中的 compareTo()方法的实现将会抛出一个 Error。

说到Channel,如果大伙有手敲过上面的示例代码的话,你会有点印象:诶,我们自己手写ChannelInboudHandler的时候,貌似看到过不少如下的方法哦:
在这里插入图片描述
我想有过自己思考的朋友,估计通过英文多少能猜到了,这些肯定涉及【生命周期】、【事件】之类的说法。下面就是想给大家介绍一下,【Channel的生命周期】

Channel 的生命周期状态

  • ChannelUnregistered :Channel 已经被创建,但还未注册到 EventLoop
  • ChannelRegistered :Channel 已经被注册到了 EventLoop
  • ChannelActive :Channel 处于活动状态(已经连接到它的远程节点)。它现在可以接收和发送数据了
  • ChannelInactive :Channel 没有连接到远程节点

当这些状态发生改变时,将会生成对应的事件。这些事件将会被转发给 ChannelPipeline中的 ChannelHandler,其可以随后对它们做出响应。在我们的编程中,关注 ChannelActive 和ChannelInactive 会更多一些。

重要 Channel 的方法
在这里插入图片描述

  • eventLoop:返回分配给 Channel 的 EventLoop
  • pipeline:返回 Channel 的 ChannelPipeline,也就是说每个 Channel 都有自己的ChannelPipeline
  • isActive:如果 Channel 是活动的,则返回 true。活动的定义依赖于底层的传输协议。例如,一个 Socket 传输一旦连接到了远程节点便是活动的,而一个 Datagram 传输一旦被打开便是活动的
  • localAddress:返回本地的 SokcetAddress
  • remoteAddress:返回远程的 SocketAddress
  • write:将数据写到远程节点,注意,这个写只是写往 Netty 内部的缓存,还没有真正写往 socket
  • flush:将之前已写的数据冲刷到底层 socket 进行传输
  • writeAndFlush:一个简便的方法,等同于调用 write()并接着调用 flush()

3.3 ChannelPipeline、ChannelHandler 和 ChannelHandlerContext

我们在前面的原型图上有小小的体现过:每一个Channel都有一个自己的Pipeline,那这个玩意有什么用吗?
在这里插入图片描述
其实就算没看源码,仅凭名字,跟一些简单的代码示例基本能猜得到。Pipeline,管道的意思。然后我们在代码示例中是这么使用的:

ch.pipeline().addLast(new NettyServerHandler());

往【管道】里面添加Handler。不知道大家看到这里会想象到什么画面,我自己的话,每次看到这种东西想到的是【自来水过滤器】。【自来水过滤器】不就是【管道】内组织了多层【过滤颗粒】嘛。
在这里插入图片描述

3.3.1 ChannelPipeline:管道

ChannelPipeline管道,这个需要我们好好理解一下。

Channel被创建时,它将会被自动地分配一个新的ChannelPipeline,每个Channel都有自己的 ChannelPipelineChannel的所有操作,都将会以事件的形式流经ChannelPipeline

ChannelPipeline 提供了 ChannelHandler 链的容器,提供了在链上管理ChannelHandler的方法,并定义了用于在该链上传播入境(也就是从网络到业务处理)和 出境(也就是从业务处理到网络),各种事件流的 API,我们代码中的 ChannelHandler 都是放在 ChannelPipeline 中的。

注意:ChannelHandler并不是直接以原型插入到ChannelPipeline中,而是包装成ChannelHandlerContext。这个很重要,是一种仿照LinkedList的设计。怎么说?3.3.4给大家说道说道

在这里插入图片描述
当 Channel 中发生指定事件时,该事件就会在 ChannelPipeline 中沿着双向链表进行传播,调用各个 ChannelHandler 中的指定方法,完成相应的业务处理。
Netty 正是通过 ChannelPipeline 这一结构为用户提供了自定义业务逻辑的扩展点,用户只需要向 ChannelPipeline 中添加处理对应业务逻辑的 ChannelHandler,之后当指定事件发生时,该 ChannelHandler 中的对应方法就会进行回调,实现业务的处理
在这里插入图片描述

ChannelPipeline中ChannelHandler 的生命周期
在 ChannelHandler 被添加到 ChannelPipeline 中或者被从 ChannelPipeline 中移除时会调用下面这些方法。这些方法中的每一个都会在入参处接受一个ChannelHandlerContext参数。

  • handlerAdded:当把 ChannelHandler 添加到 ChannelPipeline 中时被调用
  • handlerRemoved:当从 ChannelPipeline 中移除 ChannelHandler 时被调用
  • exceptionCaught:当处理过程中在 ChannelPipeline 中有错误产生时被调用
3.3.2 ChannelHandler:通道处理器

ChannelHandler,通道处理器。它应该是我们业务开发最最应该关注的核心API了。我们写的大部分网络编程业务都应该要实现该接口。
正如上面介绍ChannelPipeline时说的:当Channel中发生指定事件时,该事件就会在ChannelPipeline 中沿着双向链表进行传播,调用各个ChannelHandler中的指定方法,完成相应的业务处理。(PS:这里,就是所谓【责任链模式】的体现
在这里插入图片描述

// Netty服务端,自定义handler
public class NettyServerHandler extends ChannelInboundHandlerAdapter {}

// Netty客户端代码,自定义handler
public class NettyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {}

从上面的类图结构以及我们一开始的示例代码可以看出,ChannelHandler大体可以分为ChannelInboundHandler ChannelOutboundHandler ,分别处理入境(也就是从网络到业务处理)和 出境(也就是从业务处理到网络)事件。

这里的【网络到业务处理】、【业务处理到网络】大家千万别狭隘地认为是【客户端到服务端】、【服务端到客户端】啊。对于客户端发送消息到服务端来说,大致流程为:客户端 –> 网络 –> 服务端;反之则是:服务端 –> 网络 –> 客户端。所以上面说的【业务】是指【客户端】和【服务端】。
所以,【网络到业务处理】、【业务处理到网络】映射的网络关系是:

  1. 网络(网卡)–>客户端。【网络到业务处理】
  2. 网络(网卡)–>服务端。【网络到业务处理】
  3. 客户端–>网络(网卡)。【业务处理到网络】
  4. 服务端–>网络(网卡)。【业务处理到网络】

总的来说,这个意思是:无论客户端还是服务端,都有出境入境事件。
值得注意的是,出境这个定义跟大家想象的还有一点点不一样:它不单止包括从业务写数据到网络,也包括发出操作信号这一行为

这些Handler的API也很简单,就是提供一些列回调接口让用户实现自定义的业务,但是最让我们疑惑的恰好就是:什么时候,哪个回调接口会被调用呢?很简单,如下:

  • ChannelInboundHandler :处理入境数据以及各种状态变化。通过API方法名字我们就可以看出来,它跟Channel的生命周期有关。大伙可以回头看看【3.2 Channel:通道】
    在这里插入图片描述
    官方定义如下:
/**
 * 一种ChannelHandler,为状态更改添加回调。使得用户很容易检测到状态变化
 */
public interface ChannelInboundHandler extends ChannelHandler {}
  • ChannelOutboundHandler :处理出境数据并且允许拦截所有的操作。这里的操作其实就是跟NIO说的acceptconnectreadwrite有关了。当然,在Netty里面又丰富了一些事件定义。这是因为Netty把一切网络相关内容皆【事件】了
    在这里插入图片描述
    官方定义如下:
/**
 * 一种ChannelHandler,会收到io出境操作的通知
 */
public interface ChannelOutboundHandler extends ChannelHandler {}

但其实在很多时候,我们并不是直接实现这两个接口,而是通过继承它们的子接口,XxxChannelHandlerAdapter及其子类,ChannelHandler适配器(显然,这里用了适配器模式)。

3.3.3 ChannelHandler的适配器

有一些适配器类可以将编写自定义的 ChannelHandler 所需要的工作降到最低限度,因为它们提供了定义在对应接口中的所有方法的默认实现。很正常嘛,我们并不是关心所有定义在ChannelHandler的方法的具体实现,所以 Netty 提供了抽象基类 ChannelInboundHandlerAdapter(处理入境) 和ChannelOutboundHandlerAdapter(处理出境)。
我们可以使用 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter 类作为自己的 ChannelHandler 的起始点。这两个适配器分别提供了 ChannelInboundHandler 和ChannelOutboundHandler 的基本实现。通过扩展抽象类 ChannelHandlerAdapter,它们获得了它们共同的超接口 ChannelHandler 的方法。
在这里插入图片描述

举例一个继承自ChannelHandlerAdapter的类:SimpleChannelInboundHandler
回想一下我们在 NIO 中是如何接收和发送网络数据的?都是首先创建了一个 Buffer,应用程序中的业务部分和 Channel 之间通过 Buffer 进行数据的交换:在这里插入图片描述
Netty 在处理网络数据时,同样也需要 Buffer,在 Read 网络数据时由 Netty 创建 Buffer,Write 网络数据时 Buffer 往往是由业务方创建的。不管是读和写,Buffer 用完后都必须进行释放,否则可能会造成内存泄露。

在 Write 网络数据时,可以确保数据被写往网络了,Netty 会自动进行 Buffer 的释放,但是如果 Write 网络数据时,我们有 outBoundHandler 处理了 write()操作并丢弃了数据,没有继续往下写,要由我们负责释放这个 Buffer,就必须调用 ReferenceCountUtil.release 方法,否则就可能会造成内存泄露。

但是由于消费入站数据是一项常规任务,所以 Netty 提供了一个特殊的被称为 SimpleChannelInboundHandler 的 ChannelInboundHandler 实现。这个实现会在数据被 channelRead0()方法消费之后自动释放数据。
在这里插入图片描述
同时系统为我们提供的各种预定义 Handler 实现,都实现了数据的正确处理,所以我们自行在编写业务 Handler 时,也需要注意这一点: 要么继续传递,要么自行释放

如果我们的 Handler 既要处理入站又要处理出站怎么办呢?这个时候就可以使用类ChannelDuplexHandler,当然也可以同时实现 ChannelOutboundHandler、ChannelInboundHandler 这两个接口,自然就要麻烦很多了。

3.3.4 ChannelPipeline和ChannelHandler举例

举个例子,我们在网络上传递的数据,要求加密,但是加密后密文比较大,需要压缩后再传输,而且按照业务要求,需要检查报文中携带的用户信息是否合法,于是我们实现了 5 个 Handler:解压(入)Handler、压缩(出)handler、解密(入)Handler、加密(出) Handler、授权(入) Handler。如下图所示:
在这里插入图片描述
如果一个消息或者任何其他的入境事件被读取,那么它会从 ChannelPipeline 的头部开始流动,但是只被处理入境事件的Handler处理,也就是解压(入)Handler、解密(入)Handler、授权(入) Handler,最终,数据将会到达 ChannelPipeline 的尾端,届时,所有处理就都结束了。
在这里插入图片描述
数据的出境运动(即正在被写的数据)在概念上也是一样的。在这种情况下,数据将从链的尾端开始流动,但是只被处理出境事件的 Handler 处理,也就是加密(出) Handler、压缩(出)handler,直到它到达链的头部为止。在这之后,出境数据将会到达网络传输层,也就是我们的 Socket。
在这里插入图片描述

也许有朋友会有疑问,出境入境的Handler都在同一条管道上链化,难道不会有问题吗?它是怎么分辨出境入境Handler的?
哎呀,这个很简单嘛,就算我看源码,我自己都能想到可以使用instanceof来判断。当然,在Netty源码人家可能为了高校,使用了自定义事件掩码判断
首先自定义事件掩码:
在这里插入图片描述
然后用这样的逻辑去判断:
在这里插入图片描述

所以在我们编写 Netty 应用程序时要注意,分属出境和入境不同的 Handler ,特殊要求的情况下是无所谓顺序的,正如我们下面的图所示,比如【压缩(出)handler】可以放在【解压(入)handler】和【解密(入) Handler】中间,也可以放在【解密(入) Handler】和【授权(入) Handler】之间。
而同属一个方向的 Handler 则是有顺序的,因为上一个 Handler 处理的结果往往是下一个 Handler 的要求的输入。比如入境处理,对于收到的数据,只有先解压才能得到密文,才能解密,只有解密后才能拿到明文中的用户信息进行授权检查,所以解压->解密->授权这个三个入境 Handler 的顺序就不能乱。
在这里插入图片描述

3.3.5 ChannelHandlerContext:通道处理器上下文

ChannelHandler并不是直接以原型插入到ChannelPipeline中,而是包装成ChannelHandlerContext
在这里插入图片描述
为什么要这样做呢?

首先,ChannelHandler是被链起来的,而且还是个双向链表。那你肯定能想象到,他肯定有个prevnext指针分别指向前一个和后一个节点嘛。那咱看看LinkedList怎么设计的:

在这里插入图片描述
内部新增了一个Node类,然后在Node类上包裹数据item,并且在上面定义prevnext指针。那你对比一下这两种代码写法,哪一种好点:

// 第一种写法:
public class ChannelHandler {
	ChannelHandler prev;
	ChannelHandler next;

	void read();
	void write();
	... ...
}

// 第二种写法:(模仿LinkedList)
public class ChannelContext {
	ChannelHandler item;
	ChannelContext<ChannelHandler> prev;
	ChannelContext<ChannelHandler> prev;
}

我只能说肯定是后者好点,它遵守【单一职责】的设计原则。所以,ChannelHandlerContext 的主要作用就和 LinkedList 内部的类 Node 类似。
不过 ChannelHandlerContext 不仅仅只是个包装类,它还提供了很多的方法,比如:让事件从【当前】ChannelHandler 传递给链中的下一个 ChannelHandler,还可以被用于获取底层的Channel,还可以用于写出境数据。

上面这个高亮处说的很重要啊。如果你有去真正写过Netty代码,你会发现写数据有多个API,如下:
在这里插入图片描述
它们有什么区别你?最大的区别是:在channelHandlerContext上的write是从【当前节点】开始write,其余两个的write,都会流经整个pipeline。

3.3.6 Channel、ChannelPipeline和ChannelHandlerContext上的事件传播

ChannelHandlerContext有很多的方法,其中一些方法也存在于ChannelChannelPipeline中,但是有一点重要的不同:

  • 如果调用Channel或者ChannelPipeline上的这些方法,它们将沿着整个 ChannelPipeline进行传播
  • 而调用位于ChannelHandlerContext上的相同方法,则将从当前所关联的 ChannelHandler开始,并且只会传播给位于该ChannelPipeline中的下一个(入境下一个,出境上一个)能够处理该事件的 ChannelHandler
    在这里插入图片描述
    我们用一个实际例子来说明,比如服务器收到对端发过来的报文,解压后需要进行解密,结果解密失败,要给对端一个应答;如果发现解密失败原因是服务器和对端的加密算法不一致,应答报文只能以明文的压缩格式发送,就可以在解密 handler 中直接使用 ctx.write 给对端应答,这样应答报文就只经过压缩 Handler 就发往了对端。
    正常情况下,应答报文要以加密和压缩格式发送,就可以在解密Handler中使用channel.write()或者 channelpipeline.write()给对端应答,这样应答报文就会流经整个出境处理过程。
    在这里插入图片描述

3.4 Bootstrap:引导程序

有过SpringCloud开发经验的同学对这个应该不陌生。这个接口其实也很好理解,直译过来就行了:引导程序。引导程序的作用通常是:帮助简化一些基础配置,就好像我们第一次使用某个程序时,通常都有对应的使用引导步骤。
然后呢,通过之前的BIO、NIO我估计大家也看到了,服务端跟客户端实际上的网络行为是有差异的,比如说,服务端是bind,客户端是connect,所以,Netty就设计了BootstrapServerBootstrap。无论你的应用程序使用哪种协议或者处理哪种类型的数据,唯一决定它使用哪种引导类的是它是作为一个客户端还是作为一个服务器。

3.5 ChannelInitializer:通道初始化器

Netty提供了一个特殊的ChannelInboundHandlerAdapter子类:
在这里插入图片描述
initChannel这个方法提供了一种将多个 ChannelHandler 添加到一个 ChannelPipeline 中的简便方法。你只需要简单地向 Bootstrap 或 ServerBootstrap 的实例提供你的 ChannelInitializer 实现即可,并且一旦 Channel 被注册到了它的 EventLoop 之后,就会调用你的 initChannel()版本。在该方法返回之后,ChannelInitializer 的实例将会从 ChannelPipeline 中移除它自己

ChannelInitializer在initChannel返回之后,会将自己从Pipeline中移除,这是一个很重要的特性。如若我们在自己的应用程序中需要一个只是用一次的Handler,可以仿造或者继承ChannelInitializer。比如【授权】这个过程:某客户端第一次连接登录以后,进行授权检查,检查通过后就可以把这个授权 Handler移除了。如果客户端关闭连接下线,下次再连接的时候,就是一个新的连接,授权 handler 依然会被安装到 ChannelPipeline ,依然会进行授权检查。

3.5 ByteBuf

从结构上来说,ByteBuf 由一串字节数组构成。数组中每个字节用来存放信息。
ByteBuf 提供了两个索引,一个用于读取数据,一个用于写入数据。这两个索引通过在字节数组中移动,来定位需要读或者写信息的位置(这种设计思想在很多高性能数据结构里面存在)。
当从 ByteBuf 读取时,它的 readerIndex(读索引)将会根据读取的字节数递增;同样,当写 ByteBuf 时,它的 writerIndex 也会根据写入的字节数进行递增。
在这里插入图片描述
需要注意的是极限的情况是 readerIndex 刚好读到了 writerIndex 写入的地方。如果 readerIndex 超过了 writerIndex 的时候,Netty 会抛出 IndexOutOf-BoundsException 异常。

public class NettyByteBuf {
    public static void main(String[] args) {
        // 创建byteBuf对象,该对象内部包含一个字节数组byte[10]
        // 通过readerindex和writerIndex和capacity,将buffer分成三个区域
        // 已经读取的区域:[0,readerindex)
        // 可读取的区域:[readerindex,writerIndex)
        // 可写的区域: [writerIndex,capacity)
        ByteBuf byteBuf = Unpooled.buffer(10);
        System.out.println("byteBuf=" + byteBuf);
        // 输出:byteBuf=UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 10)

        for (int i = 0; i < 8; i++) {
            byteBuf.writeByte(i);
        }
        System.out.println("byteBuf=" + byteBuf);
        // 输出:byteBuf=UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 8, cap: 10)

        for (int i = 0; i < 5; i++) {
            System.out.println(byteBuf.getByte(i));
            // 输出:0/1/2/3/4
        }
        System.out.println("byteBuf=" + byteBuf);
        // 输出:byteBuf=UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 8, cap: 10)

        for (int i = 0; i < 5; i++) {
            System.out.println(byteBuf.readByte());
            // 输出:0/1/2/3/4
        }
        System.out.println("byteBuf=" + byteBuf);
        // 输出:byteBuf=UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 5, widx: 8, cap: 10)


        //用Unpooled工具类创建ByteBuf
        ByteBuf byteBuf2 = Unpooled.copiedBuffer("hello,zhuge!", CharsetUtil.UTF_8);
        //使用相关的方法
        if (byteBuf2.hasArray()) {
            byte[] content = byteBuf2.array();
            //将 content 转成字符串
            System.out.println(new String(content, CharsetUtil.UTF_8));
			// 输出:hello,zhuge!
            System.out.println("byteBuf2=" + byteBuf2);
            // 输出:byteBuf2=UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 12, cap: 36)

			// 获取数组0这个位置的字符h的ascii码,h=104
            System.out.println(byteBuf2.getByte(0)); 
            // 输出:104

            int len = byteBuf2.readableBytes(); //可读的字节数  12
            System.out.println("len=" + len);
            // 输出:len=12

            //使用for取出各个字节
            for (int i = 0; i < len; i++) {
                System.out.println((char) byteBuf2.getByte(i));
            }

            //范围读取
            System.out.println(byteBuf2.getCharSequence(0, 6, CharsetUtil.UTF_8));
            System.out.println(byteBuf2.getCharSequence(6, 6, CharsetUtil.UTF_8));
        }
    }
}

*四、解决粘包/半包

PS:重点面试题
PS:重点面试题
PS:重点面试题

4.1 什么是TCP粘包、半包

在这里插入图片描述
假设客户端分别发送了两个数据包 D1 和 D2 给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下 4 种情况。

  1. 服务端分两次读取到了两个独立的数据包,分别是 D1 和 D2,没有粘包和拆包;
  2. 服务端一次接收到了两个数据包,D1 和 D2 粘合在一起,被称为 TCP 粘包;
  3. 服务端分两次读取到了两个数据包,第一次读取到了完整的 D1 包和 D2 包的部分内容,第二次读取到了 D2 包的剩余内容,这被称为 TCP 拆包;
  4. 服务端分两次读取到了两个数据包,第一次读取到了 D1 包的部分内容 D1_1,第二次读取到了 D1 包的剩余内容 D1_2 和 D2 包的整包

如果此时服务端 TCP 接收滑窗非常小,而数据包 D1 和 D2 比较大,很有可能会发生第五种可能,即服务端分多次才能将 D1 和 D2 包接收完全,期间发生多次拆包。

我们已经知道了什么是粘包/半包了,那知其然得知其所以然啊!TCP 粘包/半包发生的原因是什么呢?

4.2 粘包/半包产生的原因

粘包产生的原因:
粘包的原因要分为两种情况:

  • TCP发送时粘包。由于 TCP 协议本身的机制,客户端与服务器会维持一个连接(Channel)。数据在连接不断开的情况下,可以持续不断地将多个数据包发往服务器,但是如果发送的网络数据包太小,那么他本身会启用 Nagle算法(可配置是否启用)对较小的数据包进行合并(基于此,TCP 的网络延迟要 UDP 的高些)然后再发送(超时或者包大小足够)。那么这样的话,服务器在接收到消息(数据流)的时候就无法区分哪些数据包是客户端自己分开发送的,这样产生了粘包;
  • 服务器缓冲区粘包。服务器在接收到数据库后,放到缓冲区中,如果消息没有被及时从缓存区取走,下次在取数据的时候可能就会出现一次取出多个数据包的情况,造成粘包现象

UDP没有粘包。本身作为无连接的不可靠的传输协议(适合频繁发送较小的数据包),他不会对数据包进行合并发送(也就没有 Nagle 算法之说了),他直接是一端发送什么数据,直接就发出去了,既然他不会对数据合并,每一个数据包都是完整的(数据+UDP 头+IP 头等等发一次数据封装一次)也就没有粘包一说了。

分包产生的原因:
分包产生的原因比较简单,就单纯是因为一个数据包被分成了多次接收。更具体的原因至少包括:

  • 应用程序写入数据的字节大小大于套接字发送缓冲区的大小(sendBuff
  • 进行 MSS 大小的 TCP 分段。MSS 是最大报文段长度的缩写。MSS 是 TCP 报文段中的数据字段的最大长度。数据字段加上 TCP 首部才等于整个的 TCP 报文段。所以 MSS 并不是TCP 报文段的最大长度,而是:MSS=TCP 报文段长度-TCP 首部长度。

4.3 解决粘包半包

由于底层的 TCP 无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下。

  1. 在包尾增加分割符,比如回车换行符进行分割,例如 FTP 协议;

作为web开发者的我们应该很熟悉了,在上传文件的时候(各种流中),就经常出现需要【分隔符】的场景

  1. 消息定长,例如每个报文的大小为固定长度 200 字节,如果不够,空位补空格;
  2. 将消息分为消息头和消息体,消息头中包含表示消息总长度(或者消息体长度)的字段,通常设计思路为消息头的第一个字段使用 int32 来表示消息的总长度(在Netty中其实已经为我们提供了一个叫做LengthFieldBasedFrameDecoderHandler用来解决【粘包/半包】问题了)

五、编解码器框架(在下一篇手写长连接通信框架再说)

六、序列化问题(在下一篇手写长连接通信框架再说)

学习总结

  1. 学习了Netty的基本使用
  2. 学习了Netty的一些核心组件

感谢

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/180483.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!