简介
pipeline 和 ChannelHandlerContext, ChannelHandler三者之间的关系
pipeline
通过维持一个链表结构,链表节点是 ChannelHandlerContext
,该节点持有 ChannelHandler
。部分对 ChannelHandler
的操作直接暴露给 ChannelHandlerContext
,因此我们可以直接操作 ChannelHandlerContext
来间接操作 ChannelHandler
。
以 channelRead
事件为例,描述 inBound
和 outBound
的传播过程。总体如下图:
inBound事件
当发生某个IO事件的时候,例如
链路建立
,链路关闭
,读取操作完成
(register,read,active) 等,都会产生一个事件,事件在pipeline中传播和处理。
pipeline中以fireXXX
命名的方法都是从IO线程向用户业务Handler的inBound事件,它们的实现因功能而异,但是处理步骤类似。如下:
- 调用HeadHandler对应的fireXXX方法;
- 执行事件相关的逻辑操作。
当调用ChannelHandlerContext.fireXXX
方法 (例如: fireChannelRead()) 的时候,事件会从当前节点向下进行传播,就是找到下一个ChannelInBoundHandler
,然后调用ChannelInBoundHandler的channelXXX
方法。
如果不覆盖channelXXX
方法的话,默认会将这个消息在pipeline上从头到尾进行传播,最后会调用到Tail
节点的channelXXX方法,如果说有消息一直调用到Tail节点的channelXXX方法,说明前面的channelHandler没有处理消息,使得消息一直到了尾节点,尾节点需要进行一定的释放,防止内存泄漏。
SimpleChannelInBoundHandler#channelRead0
方法会帮助自动释放ByteBuf,它的channelRead方法调用channelRead0()并且释放ByteBuf。
outBound事件
由用户线程或者代码发起的IO操作被称做outBound事件。
ChannelOutBoundHandler
的添加顺序和事件传播的顺序是相反的。而outBound事件是寻找下一个ChannelOutBoundHandler,调用该handler的方法,最终会从Tail节点传播到Head节点。
总结:每次使用pipeline传播调用方法时,是从头结点或者尾节点开始传播,而使用ChannelHandlerContext则是从当前节点开始传播。
异常传播
异常的传播并不区分是inBoundHandler
还是outBoundHandler
,异常都会在上面进行传播。
异常的传播顺序: 是和handler添加的顺序相关(一样)。默认情况下异常的传播是直接拿到当前节点的下一个节点,最终会传播到Tail节点
的exceptionCaught()
,Tail节点最终会打印这个exception,通知这个异常并没有被处理。如果说我们需要对异常做一个处理的话,最好在pipeline最后添加一个异常处理器,最终可以对异常进行一个统一的处理。
inBound事件的传播
handler之间的传播信息通过 fireXXX方法:其区别是从哪个节点开始传播。
ctx.pipeline().fireChannelRead(msg); 从头节点HeadContext开始传播
ctx.fireChannelRead(msg); 从当前节点往下传播事件
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {
//调用通道的fireChannelRead方法是从头节点HeadContext开始传播
ctx.pipeline(). fireChannelRead(msg);
//调用数据节点的传播方法是从当前节点往下传播事件
ctx.fireChannelRead (msg);
}
分析 ctx.fireChannelRead(msg);
pipeline 默认是DefaultChannelpipeline
这种方式是直接从当前节点开始传播的
其实就是找到相关 handler 执行其 channelRead 方法, 由于我们在这里的handler就是head节点, 所以我们跟到
HeadContext#channelRead
方法
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
//向下传递channelRead事件
ctx.fireChannelRead(msg);
}
fireChannelRead()
调用了findContextInbound()
通过inbound属性轮询出下一个ChannelInboundHandler。
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
// 先找到下一个节点,再执行channelRead方法
// findContextInbound : 找到下一个节点
invokeChannelRead(findContextInbound(), msg);
return this;
}
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
EventExecutor currentExecutor = executor();
//通过inbound属性轮询出下一个inboundHandlerContext
do {
ctx = ctx.next;
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
return ctx;
}
从头节点开始,逐个往下传递并触发用户回调函数,在这过程当中,最后传到尾节点TailContext
以channelRead为例,当走到这个方法则表明,通道内未对传播的内容进行处理,并且占用的内存未释放,在尾节点打印了日志并最终释放了内存。
最终 inbound事件的传播过程,是从头节点开始,逐个往下传递并触发用户回调函数,在这过程当中,
- 可以手动调用 pipeline 的传播事件的方法,从任何一个节点开始从头开始触发传播事件,
- 也可以直接通过ChannelHandlerContext的传播事件方法,一次从本节点开始往下传播事件。最后传到尾节点TailContext。
outBound 事件的传播
和inbound事件有相似之处。ChannelOutboundHandler
的执行顺序正好和ChannelInboundHandler相反,是倒序的。 以write事件为例。
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//写法1
ctx.channel().write("test data");
//写法2
ctx.write("test data");
}
注意:
直接调用write
方法是不能往对方channel中写入数据的, 因为这种方式只能写入到缓冲区, 还要调用flush
方法才能将缓冲区数据刷到channel中, 或者直接调用writeAndFlush
方法
ctx.write(“hello world”)
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
}
最终调用到AbstractChannelHandlerContext#write()
方法,主要是做了两件事:
findContextOutbound
方法找到下一个ChannelOutboundHandlerContext
- 判断是否需要flush,选择执行write回调方法之后是否执行flush回调方法
private void write(Object msg, boolean flush, ChannelPromise promise) {
ObjectUtil.checkNotNull(msg, "msg");
try {
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return;
}
} catch (RuntimeException e) {
ReferenceCountUtil.release(msg);
throw e;
}
//查找下一个 ChannelOutboundHandlerContext
final AbstractChannelHandlerContext next = findContextOutbound(flush ?
(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
//判断是否刷新
if (flush) {
//执行写并刷新方法
next.invokeWriteAndFlush(m, promise);
} else {
//执行写方法
next.invokeWrite(m, promise);
}
} else {
final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
if (!safeExecute(executor, task, promise, m, !flush)) {
// We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes
// and put it back in the Recycler for re-use later.
//
// See https://github.com/netty/netty/issues/8343.
task.cancel();
}
}
}
findContextOutbound
方法找到下一个ChannelOutboundHandlerContext
private AbstractChannelHandlerContext findContextOutbound(int mask) {
//循环往前查找,通过outbound属性判断
AbstractChannelHandlerContext ctx = this;
EventExecutor currentExecutor = executor();
do {
ctx = ctx.prev;
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
return ctx;
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/69736.html