pipeline 事件传播的过程

导读:本篇文章讲解 pipeline 事件传播的过程,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

简介

pipeline 和 ChannelHandlerContext, ChannelHandler三者之间的关系

pipeline 通过维持一个链表结构,链表节点是 ChannelHandlerContext,该节点持有 ChannelHandler。部分对 ChannelHandler 的操作直接暴露给 ChannelHandlerContext,因此我们可以直接操作 ChannelHandlerContext来间接操作 ChannelHandler

channelRead 事件为例,描述 inBoundoutBound 的传播过程。总体如下图:
yeeB6g.png

inBound事件

当发生某个IO事件的时候,例如链路建立链路关闭读取操作完成(register,read,active) 等,都会产生一个事件,事件在pipeline中传播和处理

pipeline中以fireXXX命名的方法都是从IO线程向用户业务Handler的inBound事件,它们的实现因功能而异,但是处理步骤类似。如下:

  • 调用HeadHandler对应的fireXXX方法;
  • 执行事件相关的逻辑操作。

当调用ChannelHandlerContext.fireXXX方法 (例如: fireChannelRead()) 的时候,事件会从当前节点向下进行传播,就是找到下一个ChannelInBoundHandler,然后调用ChannelInBoundHandler的channelXXX方法。

如果不覆盖channelXXX方法的话,默认会将这个消息在pipeline上从头到尾进行传播,最后会调用到Tail节点的channelXXX方法,如果说有消息一直调用到Tail节点的channelXXX方法,说明前面的channelHandler没有处理消息,使得消息一直到了尾节点,尾节点需要进行一定的释放,防止内存泄漏。

SimpleChannelInBoundHandler#channelRead0方法会帮助自动释放ByteBuf,它的channelRead方法调用channelRead0()并且释放ByteBuf。

outBound事件

由用户线程或者代码发起的IO操作被称做outBound事件。

ChannelOutBoundHandler的添加顺序和事件传播的顺序是相反的。而outBound事件是寻找下一个ChannelOutBoundHandler,调用该handler的方法,最终会从Tail节点传播到Head节点。

总结:每次使用pipeline传播调用方法时,是从头结点或者尾节点开始传播,而使用ChannelHandlerContext则是从当前节点开始传播。

异常传播

异常的传播并不区分是inBoundHandler还是outBoundHandler,异常都会在上面进行传播。

异常的传播顺序: 是和handler添加的顺序相关(一样)。默认情况下异常的传播是直接拿到当前节点的下一个节点,最终会传播到Tail节点exceptionCaught(),Tail节点最终会打印这个exception,通知这个异常并没有被处理。如果说我们需要对异常做一个处理的话,最好在pipeline最后添加一个异常处理器,最终可以对异常进行一个统一的处理。

inBound事件的传播

handler之间的传播信息通过 fireXXX方法:其区别是从哪个节点开始传播。

ctx.pipeline().fireChannelRead(msg); 从头节点HeadContext开始传播
ctx.fireChannelRead(msg); 从当前节点往下传播事件

@Override
public void channelRead(ChannelHandlerContext ctx,  Object msg)throws Exception {

    //调用通道的fireChannelRead方法是从头节点HeadContext开始传播
    ctx.pipeline(). fireChannelRead(msg);
    
    //调用数据节点的传播方法是从当前节点往下传播事件
    ctx.fireChannelRead (msg);
}

分析 ctx.fireChannelRead(msg);

pipeline 默认是DefaultChannelpipeline

这种方式是直接从当前节点开始传播的

其实就是找到相关 handler 执行其 channelRead 方法, 由于我们在这里的handler就是head节点, 所以我们跟到HeadContext#channelRead方法

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{    
    //向下传递channelRead事件    
    ctx.fireChannelRead(msg);
}

fireChannelRead()调用了findContextInbound()通过inbound属性轮询出下一个ChannelInboundHandler。

@Override    
public ChannelHandlerContext fireChannelRead(final Object msg) {
    // 先找到下一个节点,再执行channelRead方法
    // findContextInbound : 找到下一个节点
    invokeChannelRead(findContextInbound(), msg);
    return this;
}
private AbstractChannelHandlerContext findContextInbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    EventExecutor currentExecutor = executor();
    //通过inbound属性轮询出下一个inboundHandlerContext
    do {
        ctx = ctx.next;
    } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
    return ctx;
}

从头节点开始,逐个往下传递并触发用户回调函数,在这过程当中,最后传到尾节点TailContext
以channelRead为例,当走到这个方法则表明,通道内未对传播的内容进行处理,并且占用的内存未释放,在尾节点打印了日志并最终释放了内存。

最终 inbound事件的传播过程,是从头节点开始,逐个往下传递并触发用户回调函数,在这过程当中,

  • 可以手动调用 pipeline 的传播事件的方法,从任何一个节点开始从头开始触发传播事件,
  • 也可以直接通过ChannelHandlerContext的传播事件方法,一次从本节点开始往下传播事件。最后传到尾节点TailContext。

outBound 事件的传播

和inbound事件有相似之处。ChannelOutboundHandler的执行顺序正好和ChannelInboundHandler相反,是倒序的。 以write事件为例。

public void channelActive(ChannelHandlerContext ctx) throws Exception {
    //写法1    
    ctx.channel().write("test data");    
    //写法2    
    ctx.write("test data");
}

注意:

直接调用write方法是不能往对方channel中写入数据的, 因为这种方式只能写入到缓冲区, 还要调用flush方法才能将缓冲区数据刷到channel中, 或者直接调用writeAndFlush方法

ctx.write(“hello world”)

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ctx.write(msg, promise);
}

最终调用到AbstractChannelHandlerContext#write()方法,主要是做了两件事:

  1. findContextOutbound 方法找到下一个 ChannelOutboundHandlerContext
  2. 判断是否需要flush,选择执行write回调方法之后是否执行flush回调方法
private void write(Object msg, boolean flush, ChannelPromise promise) {
    ObjectUtil.checkNotNull(msg, "msg");
    try {
        if (isNotValidPromise(promise, true)) {
            ReferenceCountUtil.release(msg);
            // cancelled
            return;
        }
    } catch (RuntimeException e) {
        ReferenceCountUtil.release(msg);
        throw e;
    }
    //查找下一个 ChannelOutboundHandlerContext
    final AbstractChannelHandlerContext next = findContextOutbound(flush ?
            (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        //判断是否刷新
        if (flush) {
            //执行写并刷新方法
            next.invokeWriteAndFlush(m, promise);
        } else {
            //执行写方法
            next.invokeWrite(m, promise);
        }
    } else {
        final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
        if (!safeExecute(executor, task, promise, m, !flush)) {
            // We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes
            // and put it back in the Recycler for re-use later.
            //
            // See https://github.com/netty/netty/issues/8343.
            task.cancel();
        }
    }
}

findContextOutbound方法找到下一个ChannelOutboundHandlerContext

private AbstractChannelHandlerContext findContextOutbound(int mask) {
    //循环往前查找,通过outbound属性判断
    AbstractChannelHandlerContext ctx = this;
    EventExecutor currentExecutor = executor();
    do {
        ctx = ctx.prev;
    } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
    return ctx;
}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/69736.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!