责任链事件流程
责任链创建流程
-
入口程序
// 责任链的创建是在Channel的初始化的时候进行的
// AbstractChannel.java
protected AbstractChannel(Channel parent) {
// 如果当前为服务端的channel,则parent=null
this.parent = parent;
// 创建channelId
id = newId();
// 使用NioServerSocketChannel父类的AbstractNioMessageChannel下的NioMessageUnsafe
// 使用NioSocketChannel父类的AbstractNioByteChannel下的AbstractNioUnsafe
unsafe = newUnsafe();
// 创建channel的责任链,DefaultChannelPipeline
pipeline = newChannelPipeline();
}
// 创建默认的责任链实例对象
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
-
Channel的类关系图
DefaultChannelPipeline
,接下来我们来看下责任链创建的流程-
创建源码分析
// DefaultChannelPipeline.java
protected DefaultChannelPipeline(Channel channel) {
// 当前的责任链保存对应的channel信息
this.channel = ObjectUtil.checkNotNull(channel, "channel");
// channel在整个责任链处理正常返回的成功结果对象Future
succeededFuture = new SucceededChannelFuture(channel, null);
// 对channel在整个责任链处理添加监听,负责异常的捕获
voidPromise = new VoidChannelPromise(channel, true);
// 创建上下文对象,每个上下文对象都包含当前pipeline实例对象
tail = new TailContext(this);
head = new HeadContext(this);
// 在逻辑结构上通过双端链表的方式存储上文对象
head.next = tail;
tail.prev = head;
}
// 对于HeadContext与TailContext特殊上下文的创建
// 上下文创建
// AbstractChannelHandlerContext.java
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, HeadContext.class);
// 根据channel的类型来区分,服务端为NioMessageUnsafe
// 客户端为NioSocketChannelUnsafe
unsafe = pipeline.channel().unsafe();
// 保证handler调用方法的顺序,可以理解为handler执行的生命周期,通过状态机来控制生命周期
setAddComplete();
}
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, TailContext.class);
setAddComplete();
}
// 对于Netty责任链使用的EventLoop是属于有序的执行器,为了保证handlerAdd与handlerRemove的执行存在先后关系,通过以下的状态机来控制,即handler方法执行的生命周期保证,如果EventLoop不保证有序的话,只需要通过ADD_COMPLETE或者REMOVE_COMPLETE来告知方法是否被调用即可
// 初始化状态,创建责任链的时候上下文的handlerState默认为初始化,表示handlerAdd/handlerRemove均没有被调用
private static final int INIT = 0;
// handlerAdded即将被调用(实际还没有调用,准备就绪,可以被调用),一般是在不需要保证有序的情况下
private static final int ADD_PENDING = 1;
// handlerAdd已经被调用
private static final int ADD_COMPLETE = 2;
// handlerRemoved已经被调用
private static final int REMOVE_COMPLETE = 3;
setAddComplete()
,主要目的是由于双端链表的head与tail都是在初始化channel的时候构建而不是通过addLast或者是addFirst的方式构建,为了保证handler方法执行的有序性,于是在构建上下文的时候多添加一个步骤,接下来我们可以看到普通的handler添加方式,会在addLast中也调用上述setAddComplete()
相应的方法执行.-
创建流程
添加handler流程
-
程序入口代码
// 获取当前的责任链pipeline
Pipeline pipeline = channel.pipeline();
// 添加handler,这里以特殊的initHandler添加为准来说明,摘录启动类的init方法
pipeline.addLast(new ChannelInitializer<Channel>() {
// ChannelInitializer是一个特殊的入站事件,添加到channel中的pipeline中
// 一旦channel已经注册到EventLoop中就会触发执行
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
// 获取服务端channel的handler处理类
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
-
addLast()
源代码
// DefaultChannelPipeline.java
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
// 创建一个上下文对象,默认为DefaultChannelHandlerContext
newCtx = newContext(group, filterName(name, handler), handler);
// 将上下文对象添加到责任链尾部
addLast0(newCtx);
if (!registered) {
// channel还没有注册,设置当前handler处于等待状态
newCtx.setAddPending();
// 将其添加到等待链表的尾部中
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
// 当前线程持有的eventloop非独占,需要将其添加到任务队列中
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
// 核心方法
callHandlerAdded0(newCtx);
return this;
}
// callHandlerAdded0下会执行方法
void callHandlerAdded0(){
ctx.callHandlerAdded();
}
// AbstractChannelContext.java
final void callHandlerAdded() throws Exception {
// 可以看到在执行handlerAdd方法之前会调用setAddComplete方法
if (setAddComplete()) {
handler().handlerAdded(this);
}
}
// 最终会当channel完成注册的时候会调用handlerAdd方法,而ChannelInitial的handlerAdd方法如下:
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
// 调用initChannel方法
if (initChannel(ctx)) {
// 完成channel的初始化链会将当前实例移除
removeState(ctx);
}
}
}
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
// 模板(钩子hook)方法,也就是我们上述的入口程序添加initHandler重载的initChannel方法
initChannel((C) ctx.channel());
} catch (Throwable cause) {
exceptionCaught(ctx, cause);
} finally {
// 最后会在pipeline链中删除当前实例
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
}
return true;
}
return false;
}
-
添加handler的流程
通过上述流程图可知,初始化initHandler在channel注册之后责任链pipeline会将initHandler从中移除并将用户添加到启动类的handler添加到当前注册channel的责任链下
责任链销毁流程
close()
方法的时候,这个时候connection要么等待被JVM回收要么就是存放到回收资源池中,对此关于责任链的销毁分析如下:-
入口程序
// 服务端channel销毁,也就是服务端channel调用close()关闭服务
// 对于客户端channel,自然是断开与服务端的连接
// channel的关闭是属于事件触发,于是我们直接定位到事件轮询器下的方法processSelectedKey,该方法负责处理就绪事件
// 对于NIO的api,每个socket的就绪事件都存储在SelectionKey中,如果channel销毁,当前的SelectionKey也将会在销毁之前取消事件监听
// NioEventLoop.javas
void processSelectedKey(){
if (!k.isValid()) {
// ...
unsafe.close(unsafe.voidPromise());
}
try{
}catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
-
源码分析
// 通过代码定位最终会执行以下代码块
// AbstractChannel.java
try {
// 调用java饿的socket进行关闭
doClose0(promise);
} finally {
// Call invokeLater so closeAndDeregister is executed in the EventLoop again!
invokeLater(new Runnable() {
@Override
public void run() {
if (outboundBuffer != null) {
// Fail all the queued messages
outboundBuffer.failFlushed(cause, notify);
outboundBuffer.close(closeCause);
}
// 将channel事件传播到责任链中
fireChannelInactiveAndDeregister(wasActive);
}
});
}
-
销毁流程
IO事件流程
监听连接事件
-
Netty框架下的核心事件轮询run方法源代码
// 仅贴出部分核心代码
// NioServerSocketChannel.java
run(){
// 服务端不断轮询监听事件
for (;;) {
// 执行select操作
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
// 处理就绪事件
processSelectedKeys();
// 处理任务队列中的任务
ranTasks = runAllTasks();
}
}
// 既然关注ACCEPT事件,这个时候我们需要知道服务端Channel在创建注册并绑定的时候初始化handler并将Acceptor添加到handler中,对此我们追踪下bind方法,最后查阅代码到init方法,其代码如下:
void init(Channel channel){
// 在先前分析可知,这里已经完成了channel的创建,且此时channel为NioServerSocketChannel事件
ChannelPipeline p = channel.pipeline();
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
// 获取服务端channel的handler处理类
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
-
ServerBootstrapAcceptor监听连接的源码
在上一篇的事件分析中,我们对服务端的一个绑定事件进行了分析(包括服务端channel的创建/初始化与注册,客户端的channel基本与服务端一致,这里也不再详细说明),最终的监听连接的事件将会调用unsafe.read()
方法并且会将事件通过责任链pipeline传播到channelRead方法下,对此,我们关注Acceptor处理连接可以通过查看handler实现的channelRead()
方法即可.
// ServerBootstrapAcceptor.java
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
// 将处理客户端channel的handler添加到责任链pipeline中
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
// 客户端channel注册到EventLoop,注册流程与之前服务端注册流程基本一致,这里不再详述
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
-
监听连接的事件流程
head -> handlers -> ServerBootstrapAcceptor -> tail
,因而我们根据现有的线索以及上述源码,对监听连接事件流程绘制如下:请求读取事件
-
入口程序
// NioEventLoop.run()方法
// 在这里关注的读写事件是NioSocketChannel
void processSelectedKey(){
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
// 对于处理请求,我们需要关注NioSocketChannel处理读取事件流程
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
}
-
NioSocketChannel的类图组件
-
读取事件实现
// 根据上述代码以及类图可知,NioSocketChannel使用初始化safe实现类为NioSocketChannelUnsafe
// 于是查看其的源码实现,但是方法NioSocketChannelUnsafe并没有read方法,而是在NioByteUnsafe类中,因而找到对应的read方法,摘录部分核心代码如下:
void read() {
try{
do{
// 从socket中读取数据
doReadBytes(byteBuff);
// 传播读取事件到责任链中
pipeline.fireChannelRead(byteBuf);
}while(continueReading())
// 传播读取完成事件到责任链中
pipeline.fireChannelReadComplete();
if(close){
closeOnRead(pipeline);
}
}catch (Throwable t) {
//传播事件异常以及userEventTriggered
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
if (!readPending && !config.isAutoRead()) {
// 取消读取操作
removeReadOp();
}
}
}
-
请求读取流程示意图
数据写出事件
-
入口程序代码
// 写出操作的触发点是在某个handler下的channelRead方法下手动执行write或者writeAndFlush方法
void handlerRead(AbstractHandlerContext ctx, Object msg){
// 执行写操作的流程说明责任链执行当前入站事件handler已经是最后一个,从当前handler的上下文对象开始执行出站事件
ctx.writeAndFlush(msg);
}
-
上下文对象类设计图
AbstractChannelHandlerContext
来实现通用的方法,同时上下文对象具备出入站事件,因此我们可以在handler中对接收到的上下文对象ctx手动处理出站或入站事件的传播,对此当我们调用ctx.writeAndFlush()
方法的时候也将会触发对应的一个handler触发事件(通过源码分析是属于出站事件)-
源码分析
// AbstractChannelHandlerContext.java
void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
// 负责将写出的数据存储到OutboundBuffer缓冲区
invokeWrite0(msg, promise);
// 执行刷新操作
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
// 通过责任链传播写出事件,
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
// 通过责任链传播刷新事件
private void invokeFlush0() {
try {
((ChannelOutboundHandler) handler()).flush(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
// 根据责任链执行流程可知,最终会执行headContext下write以及flush的方法
// HeadContext.java
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
unsafe.write(msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) {
unsafe.flush();
}
// 最后会调用AbstractUnsafe的write以及flush方法(这里就不贴出代码.直接查看流程图)
-
写出事件执行流程图
Channel与Handler的事件执行流程小结
Channel的生命周期
-
Channel生命周期
Handler的生命周期
-
handler类设计图
-
Handler生命周期
Handler方法回调与生命周期联系

老铁们关注走一走,不迷路

往期精彩回顾
原文始发于微信公众号(疾风先生):深入Netty事件流程分析(下)
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/25713.html