Netty 是一个异步的、高性能的、基于事件驱动的网络 IO 框架,它经过精心设计,不仅功能强大,还保持了良好的可扩展性,使用非常的灵活。
Netty 的主要功能就是处理网络 IO 事件,它使用了「拦截过滤器」设计模式,ChannelHandler 被设计为用来处理事件,而 ChannelPipeline 则用来传播事件。通过类图可以发现,ChannelPipeline 实现了 ChannelInboundInvoker 和 ChannelOutboundInvoker,这代表它可以传播入站/出站事件。实现了 Iterable 接口,是因为它管理着一组 ChannelHandler,可以通过迭代器进行遍历。
ChannelPipeline 被设计成一个双向链表,它由一系列 ChannelHandlerContext 组成,当 ChannelHandler 被添加到 ChannelPipeline 中时,会被包装成一个 ChannelHandlerContext 加入到链表中。这样,当有入站事件时,事件会通过 ChannelPipeline 传播,从链头的 ChannelInBoundHandler 一直被传递到链尾,每个 ChannelHandler 只处理自己感兴趣的事件,不感兴趣的事件则通过fileXXX()
继续传播。
这样做的好处是,事件的处理会非常的灵活。每个 ChannelHandler 的职责可以非常的简单和清晰,例如只处理:握手认证、加解密、编解码等等,通过 ChannelPipeline 将这些 ChannelHandler 自定义组装,就可以构建出一个功能强大的 Netty 程序。ChannelPipeline 除了可以在硬编码时进行编排 ChannelHandler,还支持运行时动态修改,这大大增加了它的灵活性。例如:针对 IP 白名单的请求,可以跳过握手认证。
1. DefaultChannelPipeline 分析
ChannelPipeline 有两大实现类:DefaultChannelPipeline 和 EmbeddedChannelPipeline,后者是测试 ChannelHandler 时才会用到,这里不会分析。
DefaultChannelPipeline 是 ChannelPipeline 的默认实现类,很重要,这里会着重分析。
先看它的属性:
// ChannelHandler会绑定一个name,不指定的话会自动生成,这里会默认生成头尾节点的name
private static final String HEAD_NAME = generateName0(HeadContext.class);
private static final String TAIL_NAME = generateName0(TailContext.class);
// 缓存ChannelHandler对应的name,避免重复生成
private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
new FastThreadLocal<Map<Class<?>, String>>() {
@Override
protected Map<Class<?>, String> initialValue() {
return new WeakHashMap<Class<?>, String>();
}
};
// 便于CAS修改estimatorHandle
private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =
AtomicReferenceFieldUpdater.newUpdater(
DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");
// 默认的头、尾节点,分别是HeadContext,TailContext
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
// Pipeline绑定的Channel
private final Channel channel;
private final ChannelFuture succeededFuture;
private final VoidChannelPromise voidPromise;
// 是否开启内存泄漏追踪?如果开启了,调用touch()会记录堆栈信息
private final boolean touch = ResourceLeakDetector.isEnabled();
// 缓存Group对应的Group.next()事件执行器
private Map<EventExecutorGroup, EventExecutor> childExecutors;
// 计算ByteBuf占用的内存
private volatile MessageSizeEstimator.Handle estimatorHandle;
// 是否首次注册,如果是,会触发HandlerAdded回调
private boolean firstRegistration = true;
// ChannelHandler从Pipeline添加/移除后会触发回调,这里会将回调列表通过单向链表连接起来
private PendingHandlerCallback pendingHandlerCallbackHead;
// Pipeline是否注册到Channel
private boolean registered;
比较重要的有 head 和 tail,分别记录这 ChannelPipeline 的头尾节点,DefaultChannelPipeline 头尾节点是固定的,分别是 HeadContext 和 TailContext,它俩有特殊的用途,不可修改。
ChannelHandler | 用途 |
---|---|
HeadContext | 1.传播入站事件 |
2.需要和 JDK 底层 API 交互的,转交给 Unsafe 执行 | |
TailContext | 1.释放入站数据 |
2.报告异常 |
再看它的构造函数,会给头尾节点赋值,并形成双向链表。
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
/*
处理入站事件:
1.如果前面的所有Handler都没释放入站数据,则TailContext会负责释放,防止内存泄漏。
2.前面的Handler都没处理异常,则TailContext会打印警告日志,释放资源。
*/
tail = new TailContext(this);
/*
处理出/入站事件:
1.入站事件,无脑向后传播。
2.出站事件,转交给Channel.Unsafe执行
*/
head = new HeadContext(this);
// 形成双向链表,中间可能还会插入ChannelHandlerContext
head.next = tail;
tail.prev = head;
}
1.1 编排 ChannelHandler
ChannelPipeline 的主要作用就是 ChannelHandler 的编排容器,通过将不同功能的 ChannelHandler 组合起来,就可以快速构建功能各异的 Netty 程序。
添加 ChannelHandler 的方法如下:
方法 | 说明 |
---|---|
addFirst() | 将 ChannelHandler 添加到 HeadContext 的下一个 |
addLast() | 将 ChannelHandler 添加到 TailContext 的上一个 |
addBefore() | 添加到指定 Handler 之前 |
addAfter() | 添加到指定 Handler 之后 |
移除 ChannelHandler 的方法如下:
方法 | 说明 |
---|---|
remove() | 将指定 ChannelHandler 从链表中移除 |
removeFirst() | 移除 HeadContext 的下一个节点 |
removeLast() | 移除 TailContext 的上一个节点 |
由于使用的是链表的数据结构,因此 ChannelHandler 的增删效率极高,只是简单的改变指针的指向即可。
通过调用以上方法,就可以编排 ChannelHandler 的顺序,以处理 IO 事件。
代码太多,不全部贴了,这里只分析addFirst()
的源码,其他大家自行研究。
@Override
public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
// 支持运行时并发修改,因此必须做同步控制
synchronized (this) {
// 检查ChannelHandler是否允许被添加,非共享的ChannelHandler只能被添加一次
checkMultiplicity(handler);
// 校验ChannelHandler的name是否重复,没有给定name会自动生成,按Handler类型+序号
name = filterName(name, handler);
// 将Handler封装为ChannelHandlerContext
newCtx = newContext(group, name, handler);
// 添加到HeadContext的next
addFirst0(newCtx);
// 如果非首次注册,则添加Handler回调任务
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
// 首次注册,如果是EventLoop线程,则直接触发Handler的handlerAdded()回调
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
// 触发Handler的handlerAdded()回调
callHandlerAdded0(newCtx);
return this;
}
addFirst0()
会将 ChannelHandlerContext 添加到 HeadContext 的 next 上:
// 将ChannelHandlerContext添加到HeadContext的next
private void addFirst0(AbstractChannelHandlerContext newCtx) {
// 修改三者的指针指向
AbstractChannelHandlerContext nextCtx = head.next;
newCtx.prev = head;
newCtx.next = nextCtx;
head.next = newCtx;
nextCtx.prev = newCtx;
}
addLast()
也是同理,并非添加到链尾,而是添加到 TailContext 的 prev 上。
1.2 fileXXX()传播事件
ChannelPipeline 的fileXXX()
会将事件传播给后续的 ChannelHandler 执行,事件传播方法如下:
方法 | 说明 |
---|---|
fireChannelRegistered() | 传播 Channel 注册事件 |
fireChannelUnregistered() | 传播 Channel 取消注册事件 |
fireChannelActive() | 传播 Channel 活跃事件 |
fireChannelInactive() | 传播 Channel 失活事件 |
fireExceptionCaught() | 传播异常事件 |
fireUserEventTriggered() | 传播用户自定义事件 |
fireChannelRead() | 传播 Channel 可读事件 |
fireChannelReadComplete() | 传播 Channel 读取完毕事件 |
fireChannelWritabilityChanged() | 传播 Channel 可写状态变更事件 |
由于大量的代码是重复的,笔者就不贴所有代码了,以fireChannelRead()
为例进行分析。
当 NioEventLoop 检测到 Selector 上的 Channel 有数据可读时,会执行Channel.Unsafe.read()
方法读取数据并封装成 ByteBuf,通过 Pipeline 将事件传播出去,这样后续的 ChannelHandler 就可以拿到 ByteBuf 做自己的业务了。
ChannelPipeline.fireChannelRead()
会将读取到的数据从 HeadContext 开始传播。
// 传播可读事件,将读取到的msg 进行传递
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
// 从链头开始传播,即从HeadContext开始
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
invokeChannelRead()
会执行给定 ChannelHandler 的 ChannelRead 回调。
/**
* 执行指定ChannelHandler的ChannelRead回调
* @param next 给定的ChannelHandlerContext节点
* @param msg 读取到的数据
*/
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
// 调用touch()的目的只是为了追踪对象的访问堆栈记录,用于内存泄漏的排查
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 获取Context绑定的EventLoop
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// 如果是EventLoop线程,则立即执行
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
invokeChannelRead()
会检查 ChannelHandler 的 handlerAdded 回调是否触发,如果没触发,会跳过该 Handler,将事件继续向后传播,否则就直接执行 channelRead 回调。
private void invokeChannelRead(Object msg) {
// 检查handlerAdded回调是否触发,如果没触发则暂时跳过该Handler。
if (invokeHandler()) {
try {
// handlerAdded回调已触发,执行channelRead回调
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
} else {
// handlerAdded回调还没触发,继续向后传播
fireChannelRead(msg);
}
}
1.3 HeadContext
DefaultChannelPipeline 除了传播事件,它内置的头尾节点也是非常重要,有它们本身的职责所在。HeadContext 本身是一个 ChannelHandlerContext,它会被添加到 ChannelPipeline 的链头,同时它实现了 ChannelInboundHandler 和 ChannelOutboundHandler 接口,这代表它需要处理入站和出站事件。
对于入站事件,HeadContext 必须向后传播,否则后续的 ChannelHandler 就没法处理了,下面是channelRead
的一个例子:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 入站事件直接向后传播
ctx.fireChannelRead(msg);
}
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
// 向后找到能处理CHANNEL_READ事件的InBoundHandler,再执行它的ChannelRead事件
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
对于出站事件,如 bind、write 等需要调用 JDK 底层 API 的操作,HeadContext 会转交给 Unsafe 去执行。这里拿 write 为例:
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
// 需要和JDK底层API交互,转交给Unsafe执行。
unsafe.write(msg, promise);
}
Unsafe 会将 ByteBuf 暂存到 ChannelOutboundBuffer,flush 会将 flushde 节点转换成 Java 的 ByteBuffer,再调用底层 APISocketChannel.write(ByteBuffer)
将数据发送出去。
HeadContext 这里做了一个代理的角色,实际出站事件的执行会转交给 Unsafe。
1.4 TailContext
HeadContext 本身是一个 ChannelHandlerContext,它会被添加到 ChannelPipeline 的链尾,同时它实现了 ChannelInboundHandler 接口,这代表它需要处理入站事件。
由于是被编排在 ChannelPipeline 的链尾,这意味着入站事件它总是最后处理的,这使得它可以做一些保护性的动作,避免程序出现问题。
例如:如果前面所有的 ChannelHandler 都没有释放入站数据,TailContext 会负责释放,避免内存泄漏。源码如下:
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
// 释放入站数据
ReferenceCountUtil.release(msg);
}
}
如果前面所有的 ChannelHandler 都没有处理异常事件,TailContext 会输出日志,记录下未被处理的异常。源码如下:
protected void onUnhandledInboundException(Throwable cause) {
try {
logger.warn(
"An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
"It usually means the last handler in the pipeline did not handle the exception.",
cause);
} finally {
ReferenceCountUtil.release(cause);
}
}
TailContext 主要的主责就是释放数据和记录异常了,其他事件它并不感兴趣,大量方法是空的,这里就不贴代码了。
2. 总结
ChannelPipeline 是 ChannelHandler 的编排容器,DefaultChannelPipeline 是 Netty 提供的默认实现类,它本身是一个双向链表,由一系列 ChannelHandlerContext 组成,ChannelHandler 被添加到 ChannelPipeline 时,会被封装成 ChannelHandlerContext 并加入到链表。
ChannelPipeline 的事件传播机制,使得 Netty 在处理 IO 事件时非常的灵活,每个 ChannelHandler 的职责都非常清晰,实现解耦,通过组装各个 ChannelHandler 就可以快速构建一个功能各异的 Netty 程序。
HeadContext 和 TailContext 是 DefaultChannelPipeline 默认的头尾节点,它们的职责也非常重要。HeadContext 需要将入站事件向后传播,否则后续的 ChannelHandler 就没法处理事件了,对于出站事件需要和 JDK 底层 API 交互的,需要转交给 Unsafe 类执行。TailContext 会负责入站数据的释放和记录异常,做一些保护性的工作。
原文始发于微信公众号(程序员小潘):ChannelPipeline:Netty的事件传播管道
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/29434.html