线程池的工作原理究竟是怎么样的

线程池的工作原理究竟是怎么样的

前言

我们都知道java线程池有七大参数,分别是核心线程数、最大线程数,工作队列、过期时间、过期时间单位、线程工厂、拒绝策略。这七大参数分别有什么用,哪些场景下会用到这些参数已经是老生常谈的八股文了,相信大家都知道,这里就从源码角度来简单聊一聊,线程池的工作原理

UML类图

我们看一下线程池类——ThreadPoolExecutor的UML类图:

线程池的工作原理究竟是怎么样的


根据UML图我们可以看到:

Executor是顶层的抽象, 里面定义了一个execute(Runnable command)方法。描述提供一个任务进行执行。

ExecutorService对Executor进行了拓展,主要增加了生命周期(shutdown,shutdownNow等)、submit(Runnable r)任务提交等方法。

AbstractExecutorService对ExecutorService做了基本的实现,主要是针对submit()任务提交的统一抽象封装,以及invokeAny()实现。

ThreadPoolExecutor线程池的具体实现

提交任务

首先我们从入口提交任务execute方法的代码开始:

 if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */

这里一上来就是一大段注释(不得不说道格李的这个习惯真的很好,复杂代码的地方有大段注释,大大降低了阅读源码的难度),这段注释的意思就是如果当前线程池线程数少于核心线程数会创建线程,如果能够成功进入工作队列排队那么应该再次检查是否应该添加线程,如果我们排队失败那么我们尝试添加新的线程如果添加失败那么应该关闭线程池或者拒绝这个任务。短短几句话就简单地概括了线程池大概的执行周期和几个参数的作用。execute方法就是对应这三种情况来分别处理的。

第一种情况:

 int c = ctl.get();
 if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
    }

这里有个非常巧妙的地方,首先ctl是一个AtomicInteger,看上面的常量:

 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

这个ctl好像是表示的是线程池目前的状态,但是在源码源码里的workerCountOf这个获取线程数量的方法也是传入的是ctl,再根据后面的方法:

 // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

我们可以得知,ctl可以同时用来标识线程池的状态和当前线程数数,用其高3位来描述线程池的运行状态,低29位来描述线程池中线程的数量。

所以上文中第一种情况的代码的意思就是如果当前线程数小于核心线程数那么就添加线程,对应第一种分支情况。

接下来第二种情况:

 if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(nullfalse);
        }

如果线程池是运行状态并且能够成功插入工作队列,那么再次检查线程池状态,防止如果成功加入工作队列之后线程池状态改变,如果当前线程池不是运行中的话那么就把这个节点从队列中取出来,并且拒绝这个任务就是执行拒绝策略,如果线程池变成0了那么就新建线程。

第三种情况:

 else if (!addWorker(command, false))
            reject(command);

如果队列入队失败说明队列满了,那么就应该启用最大线程数了,这个时候就应该新建线程了,如果新建线程失败说明最大线程数也满了就执行拒绝策略了,注意队列满了之后后面进入的任务是优先执行的而不是去获取队列中排名第一个的获取的所以说线程池不是公平的

总结一下,就是提交任务的时候根据当前线程池的状态和线程数来走不同的分支,新增线程、进入工作队列、拒绝任务,三个分支。另外一个提交方法submit本质上也是调用的execute这里不再赘述,只是他封装了一下RunnableFuture对象。至于这个对象是用来干嘛的可以参看笔者的给jdk提bug的那一篇。

新建线程

上文中提交任务中最核心的部分应该就是新建线程了,因为进入工作队列只要调用offer方法就好了(内部也就是一个保证了线程安全的队列),拒绝就是调用拒绝策略handler.rejectedExecution(command, this);这个就可以了,也没啥好说的,所以我们的重点就是怎么新建线程。

老规矩我们先看注释:

/**
     * Checks if a new worker can be added with respect to current
     * pool state and the given bound (either core or maximum). If so,
     * the worker count is adjusted accordingly, and, if possible, a
     * new worker is created and started, running firstTask as its
     * first task. This method returns false if the pool is stopped or
     * eligible to shut down. It also returns false if the thread
     * factory fails to create a thread when asked.  If the thread
     * creation fails, either due to the thread factory returning
     * null, or due to an exception (typically OutOfMemoryError in
     * Thread.start()), we roll back cleanly.
     *
     * @param firstTask the task the new thread should run first (or
     * null if none). Workers are created with an initial first task
     * (in method execute()) to bypass queuing when there are fewer
     * than corePoolSize threads (in which case we always start one),
     * or when the queue is full (in which case we must bypass queue).
     * Initially idle threads are usually created via
     * prestartCoreThread or to replace other dying workers.
     *
     * @param core if true use corePoolSize as bound, else
     * maximumPoolSize. (A boolean indicator is used here rather than a
     * value to ensure reads of fresh values after checking other pool
     * state).
     * @return true if successful
     */

简单来说就是创建线程,并且运行参数中的task作为他的第一个任务,然后在创建的时候检查线程池状态和线程数等等。addWorker有两个参数——firstTaskcorefirsrTask就是提交的任务,core就是是否使用核心线程数绑定如果不是则使用最大线程数。

这里需要提前讲一下线程池的内部类——Worker,可以简单理解为他就是线程池内的线程。

线程池的工作原理究竟是怎么样的


Worker实现Runnable接口、继承AbstractQueuedSynchronizer,持有一个Thread的成员变量。所以可以把Worker对象看成一个线程,同时拥有AbstractQueuedSynchronizer的属性和方法,因此它能够进行加锁和释放锁的操作。

然后我们开始看源码:

  for (;;) {
            int c = ctl.get();
              //获取当前线程池的状态
            int rs = runStateOf(c);

            // 参数校验和线程池状态校验
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   //如果工作队列不为空那么说明核心线程数肯定满了,那么不应该新建线程了
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                //获取当前线程数
                int wc = workerCountOf(c);
                //如果当前线程数大于ctl所能表示的最大线程数或者大于最大线程数则不让添加
                if (wc >= CAPACITY ||
                    //根据入参绑定核心线程数和最大线程数来对对应的参数做对比,如果超了也不创建线程
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //通过CAS对线程数加一,如果失败就重试,成功就退出循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get(); 
                if (runStateOf(c) != rs)
                    continue retry;
             
            }
        }

第一部分主要是做必要的校验看当前线程池是否能创建新的线程,然后这里比较重要的信息就是创建新的线程是先修改线程数再去创建真正的线程的。

再来看看第二部分,这里才是真正的在创建线程也就是Worker对象:

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //创建一个Wokrer对象,传入提交的任务
            //在Worker构造方法内部会新建一个线程,代码是 this.thread =       getThreadFactory().newThread(this);线程工厂就是这里用的
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                //用ReentrantLock来保证线程安全
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    //持有锁成功的时候再次检查线程池状态,目的是为了防止线程池已经关闭或者创建线程失败
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        //提前检查新建的线程是不是可以启动,如果不能启动也就是线程不是存货的那么就抛异常
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                        //如果检查没问题就装入到workers中,workers是一个HashSet
                        workers.add(w);
                        //刷新当前线程最大数
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    //开锁,这里上锁主要是为了largestPoolSize
                    mainLock.unlock();
                }
                //如果是新增线程成功
                if (workerAdded) {
                    //这里启动刚刚新建的线程,他会调用我们提交任务的run方法
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;

可以看到新增线程的话就是调用线程工厂来新增一个线程,然后调用这个线程的start方法启动以此Worker的run方法(现在知道Worker为什么要实现runable了吧)

运行线程

我们进入到Worker的源码来看看线程是怎么运行的,上面说到新建线程之后会调用Thread#start来启动线程,然后实际上就是调用我们Worker的run方法,然后run方法内部是调用了runWorker方法,真正的线程池工作方法。这里还是先看注释:

/**
     * Main worker run loop.  Repeatedly gets tasks from queue and
     * executes them, while coping with a number of issues:
     *
     * 1. We may start out with an initial task, in which case we
     * don't need to get the first one. Otherwise, as long as pool is
     * running, we get tasks from getTask. If it returns null then the
     * worker exits due to changed pool state or configuration
     * parameters.  Other exits result from exception throws in
     * external code, in which case completedAbruptly holds, which
     * usually leads processWorkerExit to replace this thread.
     *
     * 2. Before running any task, the lock is acquired to prevent
     * other pool interrupts while the task is executing, and then we
     * ensure that unless pool is stopping, this thread does not have
     * its interrupt set.
     *
     * 3. Each task run is preceded by a call to beforeExecute, which
     * might throw an exception, in which case we cause thread to die
     * (breaking loop with completedAbruptly true) without processing
     * the task.
     *
     * 4. Assuming beforeExecute completes normally, we run the task,
     * gathering any of its thrown exceptions to send to afterExecute.
     * We separately handle RuntimeException, Error (both of which the
     * specs guarantee that we trap) and arbitrary Throwables.
     * Because we cannot rethrow Throwables within Runnable.run, we
     * wrap them within Errors on the way out (to the thread's
     * UncaughtExceptionHandler).  Any thrown exception also
     * conservatively causes thread to die.
     *
     * 5. After task.run completes, we call afterExecute, which may
     * also throw an exception, which will also cause thread to
     * die. According to JLS Sec 14.20, this exception is the one that
     * will be in effect even if task.run throws.
     *
     * The net effect of the exception mechanics is that afterExecute
     * and the thread's UncaughtExceptionHandler have as accurate
     * information as we can provide about any problems encountered by
     * user code.
     *
     * @param w the worker
     */

核心的线程池的线程执行方法,大概意思就是如果是第一个任务的话直接执行,如果没有任务 就反复尝试从队列中获取任务执行。注意上文提到的都是新建线程,如果线程一旦被新建如果是核心线程他就不一直不会消失除非线程池停止了,如果是非核心线程那么就会空闲了一定时间之后就会被回收。

 final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        //worker继承了AQS所以本身就是一把锁,这里不上锁是在这里允许中断
        w.unlock(); 
        boolean completedAbruptly = true;
        try {
            //有任务,如果firstTask就是传进来的没有那么就尝试从工作队列里获取
            while (task != null || (task = getTask()) != null) {
                //上锁,避免两个线程提交任务 的时候用到同一个worker
                w.lock();
                // 如果线程池已经停止,那么需要确保线程被中断
                //如果线程池没有停止那么确保线程不会中断
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //执行前置方法,这里什么都没有做,但是在注释里写明了可以通过继承ThreadPoolExecutor来扩展这个方法, 例如记录threadLocal等等
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //调用任务的run方法
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //后置方法,和前置方法一样什么都不做
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    //当前Worker完成的任务数量,这里类似于i++的问题是线程不安全的所以需要上锁
                    w.completedTasks++;
                    //开锁
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //处理退出逻辑
            processWorkerExit(w, completedAbruptly);
        }
    }

整个方法就是在执行第一个任务之后不停的自旋尝试从工作队列中获取任务,我们看看getTask()代码,看一下他是如何从工作队列中获取任务的:

private Runnable getTask() {
        boolean timedOut = false// Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            //判断线程池状态和任务队列的情况,不满足条件直接返回 null,结束。
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                //递减线程数
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // 超时时间的标识,[是否设置了核心线程数的超时时间 或者 当前线程数量是否大于核心线程数 ],
    //因为我们知道线程池运行的线程数量如果大于核心线程数,多出来的那部分非核心线程是会被淘汰
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            //如果大于最大线程数或者超时并且线程可以淘汰
            if ((wc > maximumPoolSize || (timed && timedOut))
                //或者工作队列为空并且线程数不为空
                && (wc > 1 || workQueue.isEmpty())) {
                //CAS减工作线程数返回空
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

其实上面最核心的代码就是:

Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();

这里单独说,

1、workQueue.take:如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take方法返回任务,并执行;2、workQueue.poll:如果在keepAliveTime时间内,阻塞队列还是没有任务,则返回null;,线程池中实现的线程可以一直执行由用户提交的任务。

workQueue.poll这个是非常巧妙的,这个也是实现我们的非核心线程数被回收的核心实现,意思就是等到指定时间,如果过了指定时间还是没有任务就返回null,注意take方法是类似于无限等待因为线程挂起了。然后返回null之后上层的worker的run方法的while循环也就解开了,因为:

  while (task != null || (task = getTask()) != null)

为false,那么这个worker的run方法就执行结束,自然而然这个worker以及他的线程就会被CPU回收了。这里实在是太巧妙了,就用一个poll方法和一个while循环解决了回收线程的问题。

最后再解开循环之后会调用一次processWorkerExit处理一下线程退出逻辑:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    //如果突然完成那么就减去总数
        if (completedAbruptly) 
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
    //上锁
        mainLock.lock();
        try {
            //整体完成任务数量加上当前worker的数量
            completedTaskCount += w.completedTasks;
            //从workers集合里移除
            workers.remove(w);
        } finally {
            //开锁
            mainLock.unlock();
        }
//尝试将线程池状态设置为 终止,这里是为了如果当前线程是最后一个话,如果还有其他线程则不会终止线程池
        tryTerminate();

        int c = ctl.get();
    //如果线程池不是停止状态
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                //不需要新建线程了
                if (workerCountOf(c) >= min)
                    return// replacement not needed
            }
            //小于核心线程数,继续新建线程
            addWorker(nullfalse);
        }
    }

停止线程池

停止线程池是用的shutdown方法:

 public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        //获取mainLock锁,防止其他线程执行
        mainLock.lock();
        try {
        //检查权限,确保用户线程有关闭线程池的权限
            checkShutdownAccess();
        //通过CAS将线程池状态设置成 SHUTDOWN
            advanceRunState(SHUTDOWN);
          //中断所有空闲的Workers 
            interruptIdleWorkers();
          //后置方法,子类可以扩展之
            onShutdown(); 
        } finally {
        // 释放mainLock锁
            mainLock.unlock();
        }
        //execute方法,我们分析过了,主要就是尝试将线程池的状态设置为终止
        tryTerminate();
    }

下面比较关心的是如何来中断所有空闲的Workers ,并且是如何保证Worker执行完毕的:

private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //刚刚多次看到的worker集合,现在有用了
            for (Worker w : workers) {
                 //判断 如果当前线程未中断且能够获取w锁,则执行中断
                // 如果当前线程未中断但不能获取w锁,那么就会阻塞,直到获取锁为止。
                //这里的w锁,就是前面在分析execute时,有个死循环不断取任务,取到任务就会获取w锁。
                //所以这边如果获取不到w锁,就证明还有任务没有执行完。
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

这里又是非常巧妙的用了一个内置的锁来判断任务是否执行完成的。也就是说如果锁被占用说明就一定是worker不是空闲的,那么等锁被释放了就说明worker的任务以及执行完了可以中断了。

总结

线程池提交任务的时候:如果当前线程数小于核心线程数就新建线程如果不小于就进入等待队列,如果等待队列满了之后就再添加线程到最大线程数(所以新来的线程在队列满了之后会直接执行的不会再入队列的)

添加线程就是封装了一个worker对象,然后new了一个线程,执行的是你提交任务的run方法

worker对象会一直自旋等待从任务队列里获取任务,所以新任务到来之后只要核心线程数满了都会进入到等待队列中才能被现成的线程获取到来执行。如果一直没有获取到任务线程就会被挂起直到有任务了被唤醒,这里用的是take方法,如果是此时线程数大于了核心线程数或者核心线程数设置了空闲时间,那么就是使用的是队列的poll方法,等待指定时间如果过了时间还是没有的话就直接返回空了,worker自旋结束的条件就是获取出来的任务是null,所以此时worker的run方法就执行完完毕,自然而然就被回收了。

线程池关闭就是基于worker实现了aqs而来等待所有执行中的任务执行完毕才会把线程挂起从而结束线程池

原文始发于微信公众号(六道轮回菠萝):线程池的工作原理究竟是怎么样的

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/25328.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!