Netty 入门及核心组件详解

导读:本篇文章讲解 Netty 入门及核心组件详解,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

1 为什么要用 Netty

1 虽然 JAVA NIO 框架提供了 多路复用 IO 的支持,但是并没有提供上层“信息格式”的良好封装。例如前两者并没有提供针对 Protocol Buffer、JSON 这些信息格式的封装,但是Netty 框架提供了这些数据格式封装(基于责任链模式的编码和解码功能)。

2 NIO 的类库和 API 相当复杂,使用它来开发,需要非常熟练地掌握 Selector、ByteBuffer、ServerSocketChannel、SocketChannel 等,需要很多额外的编程技能来辅助使用 NIO,例如,因为 NIO 涉及了 Reactor 线程模型,所以必须必须对多线程和网络编程非常熟悉才能写出高质量的 NIO 程序。

3 要编写一个可靠的、易维护的、高性能的 NIO 服务器应用。除了框架本身要兼容实现各类操作系统的实现外。更重要的是它应该还要处理很多上层特有服务,例如:客户端的权限、还有上面提到的信息格式封装、简单的数据读取,断连重连,半包读写,心跳等等,这些 Netty 框架都提供了响应的支持

4 Netty 的性能很高,按照 Facebook 公司开发小组的测试表明,Netty 最高能达到接近百万的吞吐:
在这里插入图片描述

2 Netty核心组件

2.1 Channel

基本的 I/O 操作(bind()、connect()、read()和 write())依赖于底层网络传输所提供的原语。在基于 Java 的网络编程中,其基本的构造是类 Socket。Netty 的 Channel 接口所提供的 API,被用于所有的 I/O 操作。大大地降低了直接使用 Socket 类的复杂性。此外,Channel也是拥有许多预定义的、专门化实现的广泛类层次结构的根。

生命周期:

ChannelUnregistered :Channel 已经被创建,但还未注册到 EventLoop。
ChannelRegistered :Channel 已经被注册到了 EventLoop。
ChannelActive :Channel 处于活动状态(已经连接到它的远程节点),它现在可以接收和发送数据了。
ChannelInactive :Channel 没有连接到远程节点。

在这里插入图片描述

2.2 EventLoop、EventLoopGroup

Netty 基于事件驱动模型,使用不同的事件来通知我们状态的改变或者操作状态的改变。它定义了在整个连接的生命周期里当有事件发生的时候处理的核心抽象。

Channel 为Netty 网络操作抽象类,EventLoop 主要是为Channel 处理 I/O 操作,两者配合参与 I/O 操作。

下图是Channel、EventLoop、Thread、EventLoopGroup之间的关系:

在这里插入图片描述

(1) 一个 EventLoopGroup 包含一个或多个 EventLoop。
(2) 一个 EventLoop 在它的生命周期内只能与一个Thread绑定。
(3) 所有有 EnventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理。
(4) 一个 Channel 在它的生命周期内只能注册与一个 EventLoop。
(5) 一个 EventLoop 可被分配至一个或多个 Channel 。
 

2.3 ChannelHandler

从应用程序开发人员的角度来看,Netty 的主要组件是 ChannelHandler,它充当了所有处理入站和出站数据的应用程序逻辑的容器。ChannelHandler 的方法是由网络事件触发的。事实上,ChannelHandler 可专门用于几乎任何类型的动作,例如将数据从一种格式转换为另外一种格式,例如各种编解码,或者处理转换过程中所抛出的异常。

举例来说,ChannelInboundHandler 是一个你将会经常实现的子接口。这种类型的ChannelHandler 接收入站事件和数据,这些数据随后将会被你的应用程序的业务逻辑所处理。当你要给连接的客户端发送响应时,也可以从 ChannelInboundHandler 直接冲刷数据然后输出到对端。应用程序的业务逻辑通常实现在一个或者多个 ChannelInboundHandler 中。
这种类型的 ChannelHandler 接收入站事件和数据,这些数据随后将会被应用程序的业务逻辑所处理。Netty 定义了下面两个重要的 ChannelHandler 子接口:

ChannelInboundHandler: 处理入站数据以及各种状态变化;
ChannelOutboundHandler: 处理出站数据并且允许拦截所有的操作。

ChannelHandler 的适配器 :

有一些适配器类可以将编写自定义的 ChannelHandler 所需要的工作降到最低限度,因为它们提供了定义在对应接口中的所有方法的默认实现。因为你有时会忽略那些不感兴趣的事件,所以 Netty 提供了抽象基类 ChannelInboundHandlerAdapter 和ChannelOutboundHandlerAdapter。

你可以使用 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter 类作为自己的 ChannelHandler 的起始点。这两个适配器分别提供了 ChannelInboundHandler 和ChannelOutboundHandler 的基本实现。通过扩展抽象类 ChannelHandlerAdapter,它们获得了它们共同的超接口 ChannelHandler 的方法。

在这里插入图片描述
资源管理和 SimpleChannelInboundHandler :

Netty 在处理网络数据时,同样也需要 Buffer,在 Read 网络数据时由 Netty 创建 Buffer,Write 网络数据时 Buffer 往往是由业务方创建的。不管是读和写,Buffer 用完后都必须进行释放,否则可能会造成内存泄露。

在 Write 网络数据时,可以确保数据被写往网络了,Netty 会自动进行 Buffer 的释放,但是如果 Write 网络数据时,我们有 outBoundHandler 处理了 write()操作并丢弃了数据,没有继续往下写,要由我们负责释放这个 Buffer,就必须调用 ReferenceCountUtil.release 方法,否则就可能会造成内存泄露。

在 Read 网络数据时,如果我们可以确保每个 InboundHandler 都把数据往后传递了,也就是调用了相关的 fireChannelRead 方法,Netty 也会帮我们释放,同样的,如果我们有InboundHandler 处理了数据,又不继续往后传递,又不调用负责释放的ReferenceCountUtil.release 方法,就可能会造成内存泄露。

但是由于消费入站数据是一项常规任务,所以 Netty 提供了一个特殊的被称为 SimpleChannelInboundHandler 的 ChannelInboundHandler 实现。这个实现会在数据被 channelRead0()方法消费之后自动释放数据。

在这里插入图片描述
同时系统为我们提供的各种预定义 Handler 实现,都实现了数据的正确处理,所以我们自行在编写业务 Handler 时,也需要注意这一点: 要么继续传递,要么自行释放。

2.4 ChannelPipeline

当 Channel 被创建时,它将会被自动地分配一个新的 ChannelPipeline。这项关联是永久性的;Channel 既不能附加另外一个 ChannelPipeline,也不能分离其当前的。在 Netty 组件的生命周期中,这是一项固定的操作,不需要开发人员的任何干预。

使得事件流经 ChannelPipeline 是 ChannelHandler 的工作,它们是在应用程序的初始化或者引导阶段被安装的。这些对象接收事件、执行它们所实现的处理逻辑,并将数据传递给链中的下一个 ChannelHandler。它们的执行顺序是由它们被添加的顺序所决定的。

ChannelPipeline 中 ChannelHandler :
入站和出站 ChannelHandler 可以被安装到同一个 ChannelPipeline 中。如果一个消息或者任何其他的入站事件被读取,那么它会从 ChannelPipeline 的头部开始流动,最终,数据将会到达 ChannelPipeline 的尾端,届时,所有处理就都结束了。

数据的出站运动(即正在被写的数据)在概念上也是一样的。在这种情况下,数据将从ChannelOutboundHandler 链的尾端开始流动,直到它到达链的头部为止。在这之后,出站数据将会到达网络传输层,这里显示为 Socket。通常情况下,这将触发一个写操作。

如果将两个类别的ChannelHandler都混合添加到同一个ChannelPipeline 中会发生什么。虽然 ChannelInboundHandle 和 ChannelOutboundHandle 都扩展自 ChannelHandler,但是Netty 能区分 ChannelInboundHandler 实现和 ChannelOutboundHandler 实现,并确保数据只会在具有相同定向类型的两个 ChannelHandler 之间传递。

在这里插入图片描述

2.5 ChannelHandlerContext

通过使用作为参数传递到每个方法的 ChannelHandlerContext,事件可以被传递给当前ChannelHandler 链中的下一个 ChannelHandler。虽然这个对象可以被用于获取底层的Channel,但是它主要还是被用于写出站数据。

ChannelHandlerContext 代表了 ChannelHandler 和 ChannelPipeline 之间的关联,每当有ChannelHandler 添加到 ChannelPipeline 中时,都会创建 ChannelHandlerContext。ChannelHandlerContext 的主要功能是管理它所关联的 ChannelHandler 和在同一个ChannelPipeline 中的其他 ChannelHandler 之间的交互。
在这里插入图片描述
ChannelHandlerContext 有很多的方法,其中一些方法也存在于 Channel 和Channel-Pipeline 本身上, 但是有一点重要的不同。 。如果调用Channel 或者ChannelPipeline 上的这些方法,它们将沿着整个 ChannelPipeline 进行传播。而调用位于 ChannelHandlerContext上的相同方法,则将从当前所关联的 ChannelHandler 开始,并且只会传播给位于该ChannelPipeline 中的下一个(入站下一个,出站上一个)能够处理该事件的 ChannelHandler。

在这里插入图片描述

3 第一个 Netty 程序

在这里插入图片描述

3.1 pom


    <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.28.Final</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
        </dependency>

        <!--工具-->
        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.12.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-collections4 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-collections4</artifactId>
            <version>4.4</version>
        </dependency>
        <!--日志-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.21</version>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.6.2</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <exclusions>
                <exclusion>
                    <artifactId>mail</artifactId>
                    <groupId>javax.mail</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jms</artifactId>
                    <groupId>javax.jms</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jmxtools</artifactId>
                    <groupId>com.sun.jdmk</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jmxri</artifactId>
                    <groupId>com.sun.jmx</groupId>
                </exclusion>
            </exclusions>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>

    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

3.2 EchoServer

package com.rosh.echo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * @Description:
 * @Author: Rosh
 * @Date: 2021/4/1 15:15
 */
public class EchoServer {


    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }


    public void start() throws InterruptedException {
        //服务端启动类
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        //线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try {

            //handler
            final EchoServerHandler echoServerHandler = new EchoServerHandler();

            /**
             * (1) 服务端绑定线程组
             * (2) 设置channel通道类型为NIO
             * (3) 指定监听端口
             * (4) 添加EchoServerHandler
             */
            serverBootstrap.group(group)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(port)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(echoServerHandler);
                        }
                    });

            //异步绑定服务器,sync阻塞到绑定完成
            ChannelFuture channelFuture = serverBootstrap.bind().sync();
            System.out.println("服务器已经启动啦......");
            //阻塞当前线程,直到服务器的channel关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            /**
             *  优雅退出
             */
            group.shutdownGracefully().sync();
        }
    }


    public static void main(String[] args) throws InterruptedException {

         new EchoServer(8888).start();
    }


}

3.3 EchoServerHandler

public class EchoServerHandler extends ChannelInboundHandlerAdapter {


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        System.out.println("Server accept:" + in.toString(CharsetUtil.UTF_8));
        ctx.writeAndFlush(in);
        ctx.close();
    }


}

3.4 EchoClient

package com.rosh.echo;


import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;

/**
 * @Description:
 * @Author: Rosh
 * @Date: 2021/4/1 15:15
 */
public class EchoClient {

    private final String host;

    private final int port;


    public EchoClient(String host,int port) {
        this.port = port;
        this.host = host;
    }

    public void start() throws InterruptedException {
        Bootstrap bootstrap = new Bootstrap();
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try {

            final EchoClientHandler echoClientHandler = new EchoClientHandler();

            /**
             *  设置线程组、设置通道类型、绑定远端地址、设置handler
             */
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress(host, port))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(echoClientHandler);
                     }
            });

            //异步链接服务器
            ChannelFuture channelFuture = bootstrap.connect().sync();
            //阻塞
            channelFuture.channel().closeFuture().sync();

        } finally {
            eventLoopGroup.shutdownGracefully().sync();
        }
    }


    public static void main(String[] args) throws InterruptedException {
        new EchoClient("127.0.0.1",8888).start();
    }


}

3.5 EchoClientHandler

public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {


    /**
     *  读取到网络数据后进行处理
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("client Accept: " + msg.toString(CharsetUtil.UTF_8));

    }

    /**
     *  channel 连接成功后做业务处理
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello Netty", CharsetUtil.UTF_8));
    }
}

3.6 启动服务端、客户端

在这里插入图片描述
在这里插入图片描述

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/15086.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!