EventLoop 和 EventLoopGroup 描述了 Netty 程序运行所使用的线程模型,通过修改 EventLoopGroup,可以让 Netty 在不同的线程模型间快速切换,甚至可以在 Nio 和 Bio 间一键切换。
1. EventLoopGroup
EventLoopGroup 直译过来叫「事件循环组」,它管理着一组 EventLoop。EventLoopGroup 实现了 Iterable 接口,可以通过迭代器遍历它管理的 EventLoop。
EventLoopGroup 还继承了 ScheduledExecutorService 接口,代表它不仅可以执行异步任务,还可以执行定时任务。不过 EventLoopGroup 本身不干活,当你向 EventLoopGroup 提交一个任务时,它会轮询出一个 EventLoop,转交给它执行。如下是 execute 示例:
@Override
public void execute(Runnable command) {
// 轮询出EventLoop,转交给它执行
next().execute(command);
}
EventLoopGroup 可以看做是一个多线程的线程池,EventLoop 就是真正干活的线程。
EventLoopGroup 接口功能还是比较简单的,它对外提供的能力有:优雅停机、轮询出 EventLoop、提交异步任务、执行定时任务等。
Netty 服务需要依赖 EventLoopGroup 来驱动,它决定了 Netty 服务运行的线程模型。Netty 官方推荐使用 Reactor 主从线程模型,如下示例:
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
new ServerBootstrap()
.group(bossGroup, workerGroup);
使用一个单线程的 EventLoopGroup,就可以实现 Reactor 单线程模型,连接的接入、IO 读写均由一个线程执行。如下示例:
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
new ServerBootstrap()
.group(bossGroup, bossGroup);
一个 Acceptor 线程,多个 IO 线程,就可实现 Reactor 多线程模型:
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup(16);
new ServerBootstrap()
.group(bossGroup, bossGroup);
你甚至可以从 Nio 一键切换到 Bio,如果使用 JDK 原生的网络编程,这个切换过程是十分痛苦的,但是使用 Netty 将很快完成切换。
EventLoopGroup bossGroup = new OioEventLoopGroup(1);
EventLoopGroup workerGroup = new OioEventLoopGroup(16);
new ServerBootstrap()
.group(bossGroup, bossGroup);
常用的 EventLoopGroup 实现是 NioEventLoopGroup,它被用来处理非阻塞 IO 事件。NioEventLoopGroup 的源码之前的文章有分析过,这里再提一下吧,先看它的构造函数:
/**
* @param nThreads 线程数量,就是NioEventLoop的数量,默认CPU核心数*2
* @param executor NioEventLoop.run()的执行者,默认为ThreadPerTaskExecutor,NioEventLoop将利用它来启动一个FastThreadLocalThread并执行
* @param chooserFactory 选择器工厂,默认DefaultEventExecutorChooserFactory,轮询选择NioEventLoop
* @param selectorProvider 多路复用器提供者,DefaultSelectorProvider.create()
* @param selectStrategyFactory select策略工厂,指示EventLoop应该要做什么事情
* @param rejectedExecutionHandler 拒绝策略
* @param taskQueueFactory 任务队列工厂,默认PlatformDependent.newMpscQueue(),Netty实现的高性能无锁队列
*/
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory,
final RejectedExecutionHandler rejectedExecutionHandler,
final EventLoopTaskQueueFactory taskQueueFactory) {
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
rejectedExecutionHandler, taskQueueFactory);
}
它主要的目的其实是为了初始化 NioEventLoop,在父类MultithreadEventExecutorGroup
的构造函数中,会调用newChild()
方法来完成创建 NioEventLoop。
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
// EventLoop需要一个TaskQueue来存放待执行的任务,这里判断是否有指定QueueFactory,没有则使用默认的
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
// 创建NioEventLoop
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
总结就是:EventLoopGroup 描述了 Netty 服务运行的线程模型,它管理着一组 EventLoop,它可以被看作是一个多线程的线程池,EventLoop 就是那个干活的线程。它主要的作用是为了创建 EventLoop,以及在需要停机时提供优雅停机的能力。
2. EventLoop
EventLoop 直译过来叫「事件循环」,可以看作是一个单线程的线程池,是那个真正干活的角色。
为啥叫「事件循环」呢?因为它做的事情就是一个死循环。它的
run()
方法是一个死循环,有 IO 事件到达时,它会处理 IO 事件,处理完了看看任务队列是否有待执行的任务需要处理,以及定时任务队列中是否有待执行的任务,执行完这些任务它又会去等待 IO 事件,开启下一个循环。
对于 NioEventLoopGroup 来说,它创建的肯定就是 NioEventLoop 了,直接看构造函数:
/**
* 创建一个NioEventLoop实例,用来执行注册在其上的Channel事件
* @param parent 所属Group
* @param executor
* @param selectorProvider 多路复用器提供者,不同平台会使用不同实现
* @param strategy Selector.select()的策略
* @param rejectedExecutionHandler 拒绝策略
* @param queueFactory 任务队列工厂
*/
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
/*
每个EventLoop都会有一个Selector,用来监听注册在其上的Channel事件。
对于BossGroup,处理的是Accept。
对于WorkerGroup,处理的是read、write...
SelectorTuple:Selector元组,Netty提供了一个Selector包装,用来优化select()性能
*/
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
NioEventLoop 内部依赖于 JDK 的多路复用器 Selector,它会将 Channel 注册到 Selector 上,调用select()
方法等待 IO 事件到达,一旦有准备就绪的 Channel 它就能感知到,然后通过 EventLoop 线程去处理 IO 事件。
NioEventLoop 的核心是它的run()
方法,它说明了 NioEventLoop 具体干了哪些事情:
@Override
protected void run() {
/*
无效 空轮询的次数
JDK的Selector存在Bug,会导致空轮询,CPU飙升。
Netty会检测Selector.select()空轮询次数,超过SELECTOR_AUTO_REBUILD_THRESHOLD 512 则重建Selector。
有效轮询:要么有IO事件到达、要么执行了Task。
*/
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
/*
NioEventLoop的执行策略:
有任务待执行吗?
没有:Selector.select()阻塞,等待IO事件到达(定时任务判断)
有:非阻塞调用Selector.selectNow(),
*/
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:// 重试IO循环
continue;
case SelectStrategy.BUSY_WAIT:// NIO不支持忙等,走SELECT
case SelectStrategy.SELECT: // 队列中没有任务要执行
// 下一个要执行的定时任务截止时间
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE;//没有定时任务
}
nextWakeupNanos.set(curDeadlineNanos);
try {
/*
如果没有任务要执行,则在下一个任务要执行前,阻塞等待IO事件。
没有定时任务,则等待超时为Long.MAX_VALUE,无限等待
*/
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
selectCnt = 0;
handleLoopException(e);
continue;
}
selectCnt++;//无效轮询次数+1,后面会判断是否重置
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) {
// 优先处理所有的IO事件后再去处理Task
try {
if (strategy > 0) {// 代表有准备就绪的Channel待处理
processSelectedKeys();
}
} finally {
// 处理完IO事件后,执行所有Task
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
// 先处理IO事件,并记录所花的时间
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// 根据ioTime和ioRatio,计算处理Task能分配的时间
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
/*
有待执行的任务,且Selector.selectNow()返回0,没有IO事件需要处理,那就先执行少量的Task。
每64个任务检查一次超时,如果有足够的任务,那么最少执行64个。
所以,不应该提交耗时任务,阻塞IO线程!!!
*/
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
if (ranTasks || strategy > 0) {
// 如果执行了任务或者有IO事件,说明这次轮询是有效的,重置selectCnt
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) { // 意外唤醒时,是否需要重置selectCnt,解决Selector空轮询Bug
selectCnt = 0;
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
} catch (Error e) {
throw (Error) e;
} catch (Throwable t) {
handleLoopException(t);
} finally {
// 不管正常/异常停止,都要关闭,释放资源。
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Error e) {
throw (Error) e;
} catch (Throwable t) {
handleLoopException(t);
}
}
}
}
通过源码可以发现,它其实就是在不断的处理 IO 事件和异步任务。processSelectedKey()
方法是它处理 IO 事件的核心,这里就不贴代码了,之前的文章已经分析过了。
3. 总结
EventLoopGroup 描述了 Netty 程序运行的线程模型,它的功能并不多,主要用来创建 EventLoop 和优雅停机。EventLoop 才是真正处理 IO 事件和系统任务的角色,它干的活是一个死循环:等待 IO 事件、处理 IO 事件、处理系统/定时任务、再等待 IO 事件。
原文始发于微信公众号(程序员小潘):EventLoop和EventLoopGroup
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/29427.html