ChannelPipeline:Netty的事件传播管道

Netty 是一个异步的、高性能的、基于事件驱动的网络 IO 框架,它经过精心设计,不仅功能强大,还保持了良好的可扩展性,使用非常的灵活。

Netty 的主要功能就是处理网络 IO 事件,它使用了「拦截过滤器」设计模式,ChannelHandler 被设计为用来处理事件,而 ChannelPipeline 则用来传播事件。ChannelPipeline:Netty的事件传播管道通过类图可以发现,ChannelPipeline 实现了 ChannelInboundInvoker 和 ChannelOutboundInvoker,这代表它可以传播入站/出站事件。实现了 Iterable 接口,是因为它管理着一组 ChannelHandler,可以通过迭代器进行遍历。

ChannelPipeline 被设计成一个双向链表,它由一系列 ChannelHandlerContext 组成,当 ChannelHandler 被添加到 ChannelPipeline 中时,会被包装成一个 ChannelHandlerContext 加入到链表中。这样,当有入站事件时,事件会通过 ChannelPipeline 传播,从链头的 ChannelInBoundHandler 一直被传递到链尾,每个 ChannelHandler 只处理自己感兴趣的事件,不感兴趣的事件则通过fileXXX()继续传播。

这样做的好处是,事件的处理会非常的灵活。每个 ChannelHandler 的职责可以非常的简单和清晰,例如只处理:握手认证、加解密、编解码等等,通过 ChannelPipeline 将这些 ChannelHandler 自定义组装,就可以构建出一个功能强大的 Netty 程序。ChannelPipeline:Netty的事件传播管道ChannelPipeline 除了可以在硬编码时进行编排 ChannelHandler,还支持运行时动态修改,这大大增加了它的灵活性。例如:针对 IP 白名单的请求,可以跳过握手认证。

1. DefaultChannelPipeline 分析

ChannelPipeline 有两大实现类:DefaultChannelPipeline 和 EmbeddedChannelPipeline,后者是测试 ChannelHandler 时才会用到,这里不会分析。

DefaultChannelPipeline 是 ChannelPipeline 的默认实现类,很重要,这里会着重分析。

先看它的属性:

// ChannelHandler会绑定一个name,不指定的话会自动生成,这里会默认生成头尾节点的name
private static final String HEAD_NAME = generateName0(HeadContext.class);
private static final String TAIL_NAME = generateName0(TailContext.class);

// 缓存ChannelHandler对应的name,避免重复生成
private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
    new FastThreadLocal<Map<Class<?>, String>>() {
    @Override
    protected Map<Class<?>, String> initialValue() {
        return new WeakHashMap<Class<?>, String>();
    }
};

// 便于CAS修改estimatorHandle
private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =
    AtomicReferenceFieldUpdater.newUpdater(
    DefaultChannelPipeline.classMessageSizeEstimator.Handle.class, "estimatorHandle");

// 默认的头、尾节点,分别是HeadContext,TailContext
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;

// Pipeline绑定的Channel
private final Channel channel;
private final ChannelFuture succeededFuture;
private final VoidChannelPromise voidPromise;
// 是否开启内存泄漏追踪?如果开启了,调用touch()会记录堆栈信息
private final boolean touch = ResourceLeakDetector.isEnabled();
// 缓存Group对应的Group.next()事件执行器
private Map<EventExecutorGroup, EventExecutor> childExecutors;
// 计算ByteBuf占用的内存
private volatile MessageSizeEstimator.Handle estimatorHandle;
// 是否首次注册,如果是,会触发HandlerAdded回调
private boolean firstRegistration = true;

// ChannelHandler从Pipeline添加/移除后会触发回调,这里会将回调列表通过单向链表连接起来
private PendingHandlerCallback pendingHandlerCallbackHead;

// Pipeline是否注册到Channel
private boolean registered;

比较重要的有 head 和 tail,分别记录这 ChannelPipeline 的头尾节点,DefaultChannelPipeline 头尾节点是固定的,分别是 HeadContext 和 TailContext,它俩有特殊的用途,不可修改。

ChannelHandler 用途
HeadContext 1.传播入站事件
2.需要和 JDK 底层 API 交互的,转交给 Unsafe 执行
TailContext 1.释放入站数据
2.报告异常

再看它的构造函数,会给头尾节点赋值,并形成双向链表。

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);

    /*
    处理入站事件:
        1.如果前面的所有Handler都没释放入站数据,则TailContext会负责释放,防止内存泄漏。
        2.前面的Handler都没处理异常,则TailContext会打印警告日志,释放资源。
     */

    tail = new TailContext(this);

    /*
    处理出/入站事件:
        1.入站事件,无脑向后传播。
        2.出站事件,转交给Channel.Unsafe执行
     */

    head = new HeadContext(this);

    // 形成双向链表,中间可能还会插入ChannelHandlerContext
    head.next = tail;
    tail.prev = head;
}

1.1 编排 ChannelHandler

ChannelPipeline 的主要作用就是 ChannelHandler 的编排容器,通过将不同功能的 ChannelHandler 组合起来,就可以快速构建功能各异的 Netty 程序。

添加 ChannelHandler 的方法如下:

方法 说明
addFirst() 将 ChannelHandler 添加到 HeadContext 的下一个
addLast() 将 ChannelHandler 添加到 TailContext 的上一个
addBefore() 添加到指定 Handler 之前
addAfter() 添加到指定 Handler 之后

移除 ChannelHandler 的方法如下:

方法 说明
remove() 将指定 ChannelHandler 从链表中移除
removeFirst() 移除 HeadContext 的下一个节点
removeLast() 移除 TailContext 的上一个节点

由于使用的是链表的数据结构,因此 ChannelHandler 的增删效率极高,只是简单的改变指针的指向即可。

通过调用以上方法,就可以编排 ChannelHandler 的顺序,以处理 IO 事件。

代码太多,不全部贴了,这里只分析addFirst()的源码,其他大家自行研究。

@Override
public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    // 支持运行时并发修改,因此必须做同步控制
    synchronized (this) {
        // 检查ChannelHandler是否允许被添加,非共享的ChannelHandler只能被添加一次
        checkMultiplicity(handler);
        // 校验ChannelHandler的name是否重复,没有给定name会自动生成,按Handler类型+序号
        name = filterName(name, handler);
        // 将Handler封装为ChannelHandlerContext
        newCtx = newContext(group, name, handler);

        // 添加到HeadContext的next
        addFirst0(newCtx);

        // 如果非首次注册,则添加Handler回调任务
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        // 首次注册,如果是EventLoop线程,则直接触发Handler的handlerAdded()回调
        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    // 触发Handler的handlerAdded()回调
    callHandlerAdded0(newCtx);
    return this;
}

addFirst0()会将 ChannelHandlerContext 添加到 HeadContext 的 next 上:

// 将ChannelHandlerContext添加到HeadContext的next
private void addFirst0(AbstractChannelHandlerContext newCtx) {
    // 修改三者的指针指向
    AbstractChannelHandlerContext nextCtx = head.next;
    newCtx.prev = head;
    newCtx.next = nextCtx;
    head.next = newCtx;
    nextCtx.prev = newCtx;
}

addLast()也是同理,并非添加到链尾,而是添加到 TailContext 的 prev 上。

1.2 fileXXX()传播事件

ChannelPipeline 的fileXXX()会将事件传播给后续的 ChannelHandler 执行,事件传播方法如下:

方法 说明
fireChannelRegistered() 传播 Channel 注册事件
fireChannelUnregistered() 传播 Channel 取消注册事件
fireChannelActive() 传播 Channel 活跃事件
fireChannelInactive() 传播 Channel 失活事件
fireExceptionCaught() 传播异常事件
fireUserEventTriggered() 传播用户自定义事件
fireChannelRead() 传播 Channel 可读事件
fireChannelReadComplete() 传播 Channel 读取完毕事件
fireChannelWritabilityChanged() 传播 Channel 可写状态变更事件

由于大量的代码是重复的,笔者就不贴所有代码了,以fireChannelRead()为例进行分析。

当 NioEventLoop 检测到 Selector 上的 Channel 有数据可读时,会执行Channel.Unsafe.read()方法读取数据并封装成 ByteBuf,通过 Pipeline 将事件传播出去,这样后续的 ChannelHandler 就可以拿到 ByteBuf 做自己的业务了。

ChannelPipeline.fireChannelRead()会将读取到的数据从 HeadContext 开始传播。

// 传播可读事件,将读取到的msg 进行传递
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
    // 从链头开始传播,即从HeadContext开始
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

invokeChannelRead()会执行给定 ChannelHandler 的 ChannelRead 回调。

/**
 * 执行指定ChannelHandler的ChannelRead回调
 * @param next 给定的ChannelHandlerContext节点
 * @param msg 读取到的数据
 */

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    // 调用touch()的目的只是为了追踪对象的访问堆栈记录,用于内存泄漏的排查
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    // 获取Context绑定的EventLoop
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        // 如果是EventLoop线程,则立即执行
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

invokeChannelRead()会检查 ChannelHandler 的 handlerAdded 回调是否触发,如果没触发,会跳过该 Handler,将事件继续向后传播,否则就直接执行 channelRead 回调。

private void invokeChannelRead(Object msg) {
    // 检查handlerAdded回调是否触发,如果没触发则暂时跳过该Handler。
    if (invokeHandler()) {
        try {
            // handlerAdded回调已触发,执行channelRead回调
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            invokeExceptionCaught(t);
        }
    } else {
        // handlerAdded回调还没触发,继续向后传播
        fireChannelRead(msg);
    }
}

1.3 HeadContext

DefaultChannelPipeline 除了传播事件,它内置的头尾节点也是非常重要,有它们本身的职责所在。ChannelPipeline:Netty的事件传播管道HeadContext 本身是一个 ChannelHandlerContext,它会被添加到 ChannelPipeline 的链头,同时它实现了 ChannelInboundHandler 和 ChannelOutboundHandler 接口,这代表它需要处理入站和出站事件。

对于入站事件,HeadContext 必须向后传播,否则后续的 ChannelHandler 就没法处理了,下面是channelRead的一个例子:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // 入站事件直接向后传播
    ctx.fireChannelRead(msg);
}

@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    // 向后找到能处理CHANNEL_READ事件的InBoundHandler,再执行它的ChannelRead事件
    invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
    return this;
}


对于出站事件,如 bind、write 等需要调用 JDK 底层 API 的操作,HeadContext 会转交给 Unsafe 去执行。这里拿 write 为例:

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    // 需要和JDK底层API交互,转交给Unsafe执行。
    unsafe.write(msg, promise);
}

Unsafe 会将 ByteBuf 暂存到 ChannelOutboundBuffer,flush 会将 flushde 节点转换成 Java 的 ByteBuffer,再调用底层 APISocketChannel.write(ByteBuffer)将数据发送出去。

HeadContext 这里做了一个代理的角色,实际出站事件的执行会转交给 Unsafe。

1.4 TailContext

ChannelPipeline:Netty的事件传播管道HeadContext 本身是一个 ChannelHandlerContext,它会被添加到 ChannelPipeline 的链尾,同时它实现了 ChannelInboundHandler 接口,这代表它需要处理入站事件。

由于是被编排在 ChannelPipeline 的链尾,这意味着入站事件它总是最后处理的,这使得它可以做一些保护性的动作,避免程序出现问题。

例如:如果前面所有的 ChannelHandler 都没有释放入站数据,TailContext 会负责释放,避免内存泄漏。源码如下:

protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug(
            "Discarded inbound message {} that reached at the tail of the pipeline. " +
            "Please check your pipeline configuration.", msg);
    } finally {
        // 释放入站数据
        ReferenceCountUtil.release(msg);
    }
}


如果前面所有的 ChannelHandler 都没有处理异常事件,TailContext 会输出日志,记录下未被处理的异常。源码如下:

protected void onUnhandledInboundException(Throwable cause) {
    try {
        logger.warn(
            "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
            "It usually means the last handler in the pipeline did not handle the exception.",
            cause);
    } finally {
        ReferenceCountUtil.release(cause);
    }
}


TailContext 主要的主责就是释放数据和记录异常了,其他事件它并不感兴趣,大量方法是空的,这里就不贴代码了。

2. 总结

ChannelPipeline 是 ChannelHandler 的编排容器,DefaultChannelPipeline 是 Netty 提供的默认实现类,它本身是一个双向链表,由一系列 ChannelHandlerContext 组成,ChannelHandler 被添加到 ChannelPipeline 时,会被封装成 ChannelHandlerContext 并加入到链表。

ChannelPipeline 的事件传播机制,使得 Netty 在处理 IO 事件时非常的灵活,每个 ChannelHandler 的职责都非常清晰,实现解耦,通过组装各个 ChannelHandler 就可以快速构建一个功能各异的 Netty 程序。

HeadContext 和 TailContext 是 DefaultChannelPipeline 默认的头尾节点,它们的职责也非常重要。HeadContext 需要将入站事件向后传播,否则后续的 ChannelHandler 就没法处理事件了,对于出站事件需要和 JDK 底层 API 交互的,需要转交给 Unsafe 类执行。TailContext 会负责入站数据的释放和记录异常,做一些保护性的工作。

原文始发于微信公众号(程序员小潘):ChannelPipeline:Netty的事件传播管道

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/29434.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!