EventLoop和EventLoopGroup

EventLoop 和 EventLoopGroup 描述了 Netty 程序运行所使用的线程模型,通过修改 EventLoopGroup,可以让 Netty 在不同的线程模型间快速切换,甚至可以在 Nio 和 Bio 间一键切换。

1. EventLoopGroup

EventLoopGroup 直译过来叫「事件循环组」,它管理着一组 EventLoop。EventLoop和EventLoopGroupEventLoopGroup 实现了 Iterable 接口,可以通过迭代器遍历它管理的 EventLoop。

EventLoopGroup 还继承了 ScheduledExecutorService 接口,代表它不仅可以执行异步任务,还可以执行定时任务。不过 EventLoopGroup 本身不干活,当你向 EventLoopGroup 提交一个任务时,它会轮询出一个 EventLoop,转交给它执行。如下是 execute 示例:

@Override
public void execute(Runnable command) {
    // 轮询出EventLoop,转交给它执行
    next().execute(command);
}

EventLoopGroup 可以看做是一个多线程的线程池,EventLoop 就是真正干活的线程。EventLoop和EventLoopGroup

EventLoopGroup 接口功能还是比较简单的,它对外提供的能力有:优雅停机、轮询出 EventLoop、提交异步任务、执行定时任务等。

Netty 服务需要依赖 EventLoopGroup 来驱动,它决定了 Netty 服务运行的线程模型。Netty 官方推荐使用 Reactor 主从线程模型,如下示例:

NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
new ServerBootstrap()
    .group(bossGroup, workerGroup);


使用一个单线程的 EventLoopGroup,就可以实现 Reactor 单线程模型,连接的接入、IO 读写均由一个线程执行。如下示例:

NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
new ServerBootstrap()
    .group(bossGroup, bossGroup);

一个 Acceptor 线程,多个 IO 线程,就可实现 Reactor 多线程模型:

NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup(16);
new ServerBootstrap()
    .group(bossGroup, bossGroup);

你甚至可以从 Nio 一键切换到 Bio,如果使用 JDK 原生的网络编程,这个切换过程是十分痛苦的,但是使用 Netty 将很快完成切换。

EventLoopGroup bossGroup = new OioEventLoopGroup(1);
EventLoopGroup workerGroup = new OioEventLoopGroup(16);
new ServerBootstrap()
    .group(bossGroup, bossGroup);

常用的 EventLoopGroup 实现是 NioEventLoopGroup,它被用来处理非阻塞 IO 事件。EventLoop和EventLoopGroupNioEventLoopGroup 的源码之前的文章有分析过,这里再提一下吧,先看它的构造函数:

/**
 * @param nThreads 线程数量,就是NioEventLoop的数量,默认CPU核心数*2
 * @param executor NioEventLoop.run()的执行者,默认为ThreadPerTaskExecutor,NioEventLoop将利用它来启动一个FastThreadLocalThread并执行
 * @param chooserFactory 选择器工厂,默认DefaultEventExecutorChooserFactory,轮询选择NioEventLoop
 * @param selectorProvider 多路复用器提供者,DefaultSelectorProvider.create()
 * @param selectStrategyFactory select策略工厂,指示EventLoop应该要做什么事情
 * @param rejectedExecutionHandler 拒绝策略
 * @param taskQueueFactory 任务队列工厂,默认PlatformDependent.newMpscQueue(),Netty实现的高性能无锁队列
 */

public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                         final SelectorProvider selectorProvider,
                         final SelectStrategyFactory selectStrategyFactory,
                         final RejectedExecutionHandler rejectedExecutionHandler,
                         final EventLoopTaskQueueFactory taskQueueFactory)
 
{
    super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
            rejectedExecutionHandler, taskQueueFactory);
}

它主要的目的其实是为了初始化 NioEventLoop,在父类MultithreadEventExecutorGroup的构造函数中,会调用newChild()方法来完成创建 NioEventLoop。

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    // EventLoop需要一个TaskQueue来存放待执行的任务,这里判断是否有指定QueueFactory,没有则使用默认的
    EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
    // 创建NioEventLoop
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}


总结就是:EventLoopGroup 描述了 Netty 服务运行的线程模型,它管理着一组 EventLoop,它可以被看作是一个多线程的线程池,EventLoop 就是那个干活的线程。它主要的作用是为了创建 EventLoop,以及在需要停机时提供优雅停机的能力。

2. EventLoop

EventLoop 直译过来叫「事件循环」,可以看作是一个单线程的线程池,是那个真正干活的角色。

为啥叫「事件循环」呢?因为它做的事情就是一个死循环。EventLoop和EventLoopGroup它的run()方法是一个死循环,有 IO 事件到达时,它会处理 IO 事件,处理完了看看任务队列是否有待执行的任务需要处理,以及定时任务队列中是否有待执行的任务,执行完这些任务它又会去等待 IO 事件,开启下一个循环。

对于 NioEventLoopGroup 来说,它创建的肯定就是 NioEventLoop 了,直接看构造函数:

/**
 * 创建一个NioEventLoop实例,用来执行注册在其上的Channel事件
 * @param parent 所属Group
 * @param executor
 * @param selectorProvider 多路复用器提供者,不同平台会使用不同实现
 * @param strategy Selector.select()的策略
 * @param rejectedExecutionHandler 拒绝策略
 * @param queueFactory 任务队列工厂
 */

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
             EventLoopTaskQueueFactory queueFactory) {
    super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
            rejectedExecutionHandler);
    this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
    /*
    每个EventLoop都会有一个Selector,用来监听注册在其上的Channel事件。
    对于BossGroup,处理的是Accept。
    对于WorkerGroup,处理的是read、write...
    SelectorTuple:Selector元组,Netty提供了一个Selector包装,用来优化select()性能
     */

    final SelectorTuple selectorTuple = openSelector();
    this.selector = selectorTuple.selector;
    this.unwrappedSelector = selectorTuple.unwrappedSelector;
}

NioEventLoop 内部依赖于 JDK 的多路复用器 Selector,它会将 Channel 注册到 Selector 上,调用select()方法等待 IO 事件到达,一旦有准备就绪的 Channel 它就能感知到,然后通过 EventLoop 线程去处理 IO 事件。

NioEventLoop 的核心是它的run()方法,它说明了 NioEventLoop 具体干了哪些事情:

@Override
protected void run() {
    /*
    无效 空轮询的次数
    JDK的Selector存在Bug,会导致空轮询,CPU飙升。
    Netty会检测Selector.select()空轮询次数,超过SELECTOR_AUTO_REBUILD_THRESHOLD 512 则重建Selector。
    有效轮询:要么有IO事件到达、要么执行了Task。
     */

    int selectCnt = 0;
    for (;;) {
        try {
            int strategy;
            try {
                /*
                NioEventLoop的执行策略:
                有任务待执行吗?
                    没有:Selector.select()阻塞,等待IO事件到达(定时任务判断)
                    有:非阻塞调用Selector.selectNow(),
                 */

                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                case SelectStrategy.CONTINUE:// 重试IO循环
                    continue;

                case SelectStrategy.BUSY_WAIT:// NIO不支持忙等,走SELECT

                case SelectStrategy.SELECT: // 队列中没有任务要执行
                    // 下一个要执行的定时任务截止时间
                    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                    if (curDeadlineNanos == -1L) {
                        curDeadlineNanos = NONE;//没有定时任务
                    }
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        /*
                        如果没有任务要执行,则在下一个任务要执行前,阻塞等待IO事件。
                        没有定时任务,则等待超时为Long.MAX_VALUE,无限等待
                         */

                        if (!hasTasks()) {
                            strategy = select(curDeadlineNanos);
                        }
                    } finally {
                        // This update is just to help block unnecessary selector wakeups
                        // so use of lazySet is ok (no race condition)
                        nextWakeupNanos.lazySet(AWAKE);
                    }
                    // fall through
                default:
                }
            } catch (IOException e) {
                // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                // the selector and retry. https://github.com/netty/netty/issues/8566
                rebuildSelector0();
                selectCnt = 0;
                handleLoopException(e);
                continue;
            }

            selectCnt++;//无效轮询次数+1,后面会判断是否重置
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            boolean ranTasks;
            if (ioRatio == 100) {
                // 优先处理所有的IO事件后再去处理Task
                try {
                    if (strategy > 0) {// 代表有准备就绪的Channel待处理
                        processSelectedKeys();
                    }
                } finally {
                    // 处理完IO事件后,执行所有Task
                    ranTasks = runAllTasks();
                }
            } else if (strategy > 0) {
                // 先处理IO事件,并记录所花的时间
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // 根据ioTime和ioRatio,计算处理Task能分配的时间
                    final long ioTime = System.nanoTime() - ioStartTime;
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } else {
                /*
                有待执行的任务,且Selector.selectNow()返回0,没有IO事件需要处理,那就先执行少量的Task。
                每64个任务检查一次超时,如果有足够的任务,那么最少执行64个。
                所以,不应该提交耗时任务,阻塞IO线程!!!
                 */

                ranTasks = runAllTasks(0); // This will run the minimum number of tasks
            }

            if (ranTasks || strategy > 0) {
                // 如果执行了任务或者有IO事件,说明这次轮询是有效的,重置selectCnt
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
                selectCnt = 0;
            } else if (unexpectedSelectorWakeup(selectCnt)) { // 意外唤醒时,是否需要重置selectCnt,解决Selector空轮询Bug
                selectCnt = 0;
            }
        } catch (CancelledKeyException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
        } catch (Error e) {
            throw (Error) e;
        } catch (Throwable t) {
            handleLoopException(t);
        } finally {
            // 不管正常/异常停止,都要关闭,释放资源。
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Error e) {
                throw (Error) e;
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
}

通过源码可以发现,它其实就是在不断的处理 IO 事件和异步任务。processSelectedKey()方法是它处理 IO 事件的核心,这里就不贴代码了,之前的文章已经分析过了。

3. 总结

EventLoopGroup 描述了 Netty 程序运行的线程模型,它的功能并不多,主要用来创建 EventLoop 和优雅停机。EventLoop 才是真正处理 IO 事件和系统任务的角色,它干的活是一个死循环:等待 IO 事件、处理 IO 事件、处理系统/定时任务、再等待 IO 事件。

原文始发于微信公众号(程序员小潘):EventLoop和EventLoopGroup

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/29427.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!