源码基于JDK8
文章1.5w字,非常硬核
一丶从多鱼外卖开始
话说,王多鱼给好友胖子钱让其投资,希望亏得血本无归。胖子开了一个外卖店卖国宴,主打高端,外卖小哥都是自己雇佣,并且开迈巴赫送外卖。最开始胖子觉得这么贵的外卖,就雇佣100个外卖员(核心线程
)够了,并购买了100台迈巴赫。但是随后王多鱼让他搞活动——顾客说说自己的亏钱经历就可以免费吃外卖。随即店铺大火,100个外卖员都送不过来(核心线程打满了
),胖子就把外卖放在桌子上,按照先后顺序摆放好,等外卖员送完一单再送桌子上的外卖(阻塞队列
)。但是慢慢的桌子上都放不下了(阻塞队列满了
),胖子只好临时雇员外卖员(非核心线程
)送外卖。雇佣到20个临时外卖员,还是不够送外卖,最终决定每天限量,系统提示今日活动太火爆,让用户无法下单(拒绝策略
)。最终活动结束了,临时雇佣的外卖员也都解雇了(回收非核心线程
)
1.从故事中理解线程池是什么
我们把外卖员看作线程,多鱼外卖店就是线程池,用户点的外卖就是任务。线程池就是使用池化技术,维护线程,并使用这些线程执行任务的执行器(Executor)
。结合例子我们看下线程池的执行流程图
2.为什么需要线程池 = 多鱼外卖店雇佣外卖员的意义
-
降低资源消耗:
(如果多鱼外卖店,每次都来一个外卖临时雇佣一个,临时发布招聘的开销大,随后解聘的开销也大)
通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。java线程和操作系统是一对一的映射关系,新建或者销毁一个线程都存在资源的消耗
-
提高响应速度:
(当多鱼外卖店没有做活动的时候,来一个任务,100个核心外卖员可以立马送出外卖)
任务到达时,
一定情况下
无需等待线程创建即可立即执行。 -
提高线程的可管理性:
(多鱼外卖店为什么要100个人配100个车,就是要减少外卖员交替使用车辆送外卖带来的上车下车开销,并且店主可以控制外卖员的数量,并且弄出绩效制度doge)
线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。使用过多的线程会导致线程上下文切换更多,从而导致在保存“现场”和恢复“现场”的开销激增。
-
提供更多更强大的功能:
(多鱼外卖店可以要求外卖员在送外卖到顾客家的时候,祝顾客新年快乐)
线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行
3.从多鱼外卖店看核心线程数,最大线程数,阻塞队列,拒绝策略,存活时间
3.1 核心线程数
多鱼外卖店,是一个员工一个车辆,假如外卖店就10台车辆,那么还需要雇佣100人么?
显然是不需要的,但是如果外卖员每天工作12小时,期间休息和吃饭占用4小时,在休息和吃饭的时间内是不会使用到车辆的。那么这时候我们应该雇佣 10(车辆数) + 10*4(空闲时间)/(12-4)(有效工作时间) = 15人
,这样我们让这些快递员轮班,A吃饭的时候,B送货。
但是还需要考虑到,难道外卖车就不送去保养么,外卖车不也得加油。
-
10个车没必要雇佣100人
好比CPU只有10核心,在计算密集型任务中(把外卖员吃饭看作IO操作,计算密集就如同外卖员丝毫不停歇努力配送中),那么雇佣10人左右即可
-
10(车辆数) + 10*4(空闲时间)/(12-4)(有效工作时间) = 15人
这就是
线程数 = CPU 核心数 +CPU 核心数 x(IO耗时/CPU计算耗时)
。看看这个公式,难道IO密集型的任务就设置核心线程数为CPU核心x2
么?不见得,如果IO耗时和CPU耗时不是1比1,IO耗时比例更高,那么应该设置的更高一点 -
难道外卖车就不送去保养么,外卖车不也得加油
这个可以理解为,CPU还得处理系统的其他计算,并非100%专注于当前这个线程池,所以核心线程数的设置需要考虑到
CPU利用率
最终核心线程数的设置,是一个需要压测,需要实际数据去调试的,勉强只能给出
线程数 = (CPU 核心数 +CPU 核心数 x (IO耗时/CPU计算耗时))x cpu利用率
的理论公式
3.2 阻塞队列
阻塞队列就是故事中的”桌子”,它基于AQS Condition
实现等待唤醒模式,在线程池中主要利用阻塞队列队列为空,获取任务的线程将阻塞,成功提交任务到阻塞队列将唤醒被阻塞的线程
的特性,之所以阻塞就是避免线程无意义的自旋浪费CPU。
阻塞队列在juc包下具备很多实现,下面我们介绍几种常用的阻塞队列
ArrayBlockingQueue
基于数组的有界队列LinkedBlockingQeque
基于链表的有界队列(默认容量是int类型最大)PriorityBlockingQueue
优先阻塞队列,基于数组实现的堆,并且具有阻塞队列的特性DelayQueue
基于优先队列实现的无界阻塞队列,元素只有在其延迟到期时才能被取出SynchronousQueue
不存储元素的阻塞队列,每个插入操作都必须等待另一个线程的相应删除操作
如果我们选择无界阻塞队列LinkedBlockingQeque
,意味着最大线程数基本上没用了,因为任务会一直塞到队列直到达到int类型最大,这时候往往意味着OOM
如果选择有界阻塞队列,并且指定的容量太小,那么意味着线程池在任务很多的时候,阻塞队列将立马塞满,开始创建非核心线程,甚至直到触发拒绝策略。
如果指定的容量太大,意味着很多任务堆积,任务得不到及时执行。
另外还有SynchronousQueue
,它可以简单看作容量为0的阻塞队列。
PriorityBlockingQueue
和DelayQueue
都是基于堆实现,可以快速获得堆顶元素,我们使用PriorityBlockingQueue
需要传入比较器,或者任务本身就是Comparable
。
可以看出阻塞队列的选择,需要考虑到任务对及时性的要求,也要考虑到,峰值的时候任务有多。
3.3 最大线程数
最大线程数,是核心线程数打满,阻塞队列塞满,然后会去建立 最大线程数 - 核心线程数
个非核心线程执行任务,但是非核心线程在存活时间内,如果拿不到任务,将被回收(如同多鱼外卖活动结束,非核心外卖员没有外卖送,自然被解雇)
最大线程数如果设置太小,那么可能不能胜任大量任务,最后任务将被拒接策略处理。如果太大,并不意味着效率一定提高,因为线程的调度依赖于cpu调度。此参数的设置需要考虑到系统的性能(cpu不行设置太大也没有意义),任务是否接受被拒绝策略处理,以及任务峰值等。
3.4 拒绝策略
每个系统都有它的性能瓶颈,当任务是在太多,核心线程打满,阻塞队列塞满,最大线程打满,这时候继续提交任务将触发拒绝策略。JUC中提供了以下策略
-
CallerRunsPolicy
由提交任务的线程执行任务,如果线程池关闭了,那么一声不吭的抛弃任务这个拒绝策略很有意思,从某种程度上说,它有点阻塞的意思,当需要提高任务的线程执行任务的时候,意味着提高任务线程的方法将不能立即返回,从而避免提高任务继续提交其他任务。
-
AbortPolicy
直接抛出RejectedExecutionException
,线程池默认的拒绝策略 -
DiscardPolicy
悄无声息的忽略任务 什么都不做忽略任务 -
DiscardOldestPolicy
如果线程池没有被关闭那么丢弃队列头部的任务,然后提交此任务。这个拒绝策略,会丢弃队列头部任务,然后再次调用线程池提交任务的方法,有点递归的意思,需要注意:丢弃队列头部任务,并再次提交任务并不是一个原子操作,这种拒绝策略会递归的调用提交任务的方法直到任务入队
在自己系统中,触发拒绝策略往往需要我们做好记录,甚至提醒开发人员调优线程池。具体使用什么拒绝策略需要看业务需求。
3.5 存活时间
当任务有限或者提交不频繁时,最终线程池中的线程将无任务执行。为了减少系统资源消耗,在存活时间内如果一直接收不到任务的话,线程将被回收。通常存活时间的设置只对非核心线程有效,但是如果调用allowCoreThreadTimeOut(true)
那么核心线程也将被回收
那么核心线程是否应该被回收呢?如果业务上这个线程池被调用的十分不频繁,或许回收核心线程也是不错的选择,但是如果经常间歇性有任务需要执行且要求效率尽可能高,这时候如果核心线程被回收了,线程池又将new新的线程,会降低线程池的执行效率。
那么存活时间如何设置呢?还是得依赖于业务,看业务需要线程池的时间间隔,取一个粗略估计值。
3.6 线程工厂
线程池创建线程最终使用调用ThreadFactory
进行,通常需要我们指定下线程的名称,推荐使用ThreadFactoryBuilder
方便对线程的命名进行定义
4.我们平时在那些地方使用到线程池,以及有哪些坑
4.1 @Async
spring的@Async
注解标注在spring bean的方法上,将被AsyncAnnotationBeanPostProcessor
代理原始对象,活的异步增强的效果,其核心还是向线程池中提交任务。
建议使用此注解的时候,指定自己的线程池(注解中可以指定使用线程池bean的名称)这样可以让不同类型的业务使用不同的线程池,如果IO密集和CPU密集使用一个线程池,且发生等待队列中IO任务排在CPU密集任务前面,就如同墨迹的人在你前面排队,会对效率有所影响
且AsyncAnnotationBeanPostProcessor
是一个BeanPostProcessor
并不是一个SmartInstantiationAwareBeanPostProcessor
,如果发生循环依赖需要注意代理对象的方法可能不具备异步能力(而且调用的时候必须使用代理对象去调用,this.
,或者直接调用无异步能力)
4.2 @EventListener
spring的@EventListener
标注的方法,会被EventListenerMethodProcessor
(BeanFactoryPostProcessor实现类
),在所有单例bean实例化后,将所有bean中标有@EventListener
注解的方法和bean包装成ApplicationListener
,注册到ApplicationContext
中(一般最终注册到SimpleApplicationEventMulticaster(事件多播器)
中),如果我们为SimpleApplicationEventMulticaster
设置了一个线程池,它将异步的回调ApplicationListener
(反射调用bean对应的方法)
注意这里的坑,如果异步意味着事务可能会失效,spring还有一个@TransactionalEventListener
注解,可以指定在事务提交前等等阶段去响应事件,其中@TransactionalEventListener
的fallbackExecution
可以指定,是否事务同步管理器中没有事务(事务同步管理器基于ThreadLocal,异步使用其他线程将导致事务失效,这时候事务管理器就是没有事务的状态)也继续响应事件。
4.3 CompletableFuture
这是并发大师doug lea编写的进行异步or同步任务编排的一个工具类,如果不指定线程池那么将使用公共线程池(线程数默认为CPU 核心数量减1)。如果使用者都不使用自定义的线程池,很容易造成大量任务堆积,反而降低执行效率。通常建议不同业务类型使用不同的线程池,并设置合适的线程池参数
4.4 @Schedule
注解依赖于ScheduledAnnotationBeanPostProcessor
,它是一个BeanPostProcessor,在每一个单例bean 实例化的时候,会扫描是否存在此注解,如果存在那么解析并记录。在所有单例bean 实例化后,会将bean和其方法,以及解析的注解信息包装称一个任务,提交到线程池中。
5 .Executors中的线程池
常用的有以下几种
newFixedThreadPool
固定数目工作线程,无界任务阻塞队列(可以容纳int最大个任务)的线程池——容易oom,如果请求量大容易操作阻塞队列积压过多任务造成oomnewSingleThreadExecutor
单线程,无界任务阻塞队列的线程池newCachedThreadPool
,支持工作线程数达到Integer.MAX_VALUE
,且空闲时间达到60秒那么就会被回收,使用的是SynchronousQueue
不会容纳任何任务,每一个任务提交之后都必须有另外一个线程获取任务——线程多并不意味着效率高,上下文的切换,线程的new 和消耗都是消耗大量资源的,支持Integer.MAX_VALUE
个线程显然也是不符合实际的
基本上编程规范都要求我们自己定义线程池参数,Executors
中的线程池多少都有点问题,建议开发人员使用ThreadPoolExecutor
构造方法结合业务实际设置参数后使用。
二丶带着问题学源码
- 线程池往往提交任务等操作都是被并发调用的,doug lea如何实现线程安全 和 高效率
- 线程池 的 shutdownNow和shutdown的区别
- 线程池停止的时候,如何确保所有工作线程回收后才停止线程池本身
三丶源码分析
1.线程池ThreadPoolExecutor的继承关系
1.1 Executor 接口
public interface Executor {
void execute(Runnable command);
}
Executor
的作用的是把任务和任务将如何运行进行解耦
(直接使用Thread我们需要自己把业务逻辑些在runnable中传入,然后start,任务逻辑和任务的运行耦合紧密),其中只有一个方法execute
,但是其实现类,可能是同步的直接调用Runnable#run
,也可能是异步开启线程执行。
1.2 ExecutorService 接口
ExecutorService
实现了Executor
接口,提供管理自身生命周期的方法,其submit
方法生成 Future
来跟踪一个或多个异步任务的进度的方法,还提供了批量提交任务的方法。
方法 | 描述 |
---|---|
void shutdown() | 关闭执行器,如果还有任务没有执行完,那么任务还会执行,但是不会接受新的任务,如果已经处于关闭状态还去调用此方法,不会有任何效果 |
List<Runnable> shutdownNow() |
尝试停止所有正在执行的任务,返回等待执行但未执行的任务,停止执行任务通常是通过调用对应线程的interrupt 方法,如果线程自己不响应中断,那么无济于事,任务还是会继续执行 |
boolean isShutdown() | 如果已经被关闭那么返回true,通常调用shutdown 和 shutdownNow 后可能存在线程在执行任务,但是还是返回true |
boolean isTerminated() | 如果所有任务在关闭后都已完成,则返回 true。请注意,除非调用了 shutdown 或 shutdownNow,且所以任务都结束了,否则 isTerminated 永远不会为真 |
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException | 调用线程进入阻塞等待直到关闭当前ExcutorServuce,或者发生超时,或者当前线程被中断。 |
<T> Future<T> submit(Callable<T> task) |
提供一个具备返回值的任务,返回一个Future 表示是此任务的异步执行结果。 |
<T> Future<T> submit(Runnable task, T result) |
和submit(Callable) 类似,但是其异步返回结果在执行完后返回结果是入参result |
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException |
批量提交一批任务,阻塞直到所有任务都完成or者任务执行失败或者当前线程被中断 |
批量提交一批任务,阻塞直到所有任务都完成or任务执行失败或者当前线程被中断,or指定的时间超时 | |
提交一批任务,等待其中一个执行完成,或者直到当前线程被中断,返回时会取消没有执行完的任务 | |
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException |
提交一批任务,等待其中一个执行完成,or到当前线程被中断,or等待时间超时,返回时会取消没有执行完的任务 |
1.3 AbstractExecutorService
AbstractExecutorService
提供了Runnable
,Callable
适配成RunnableFuture
(一般适配成实现类FutureTask
),还实现了ExecutorService
的submit
,invokeAny
,以及invokeAll
。是对ExecutorService
的抽象实现,有点模板方法的意思。
2.线程池的状态和属性
2.1 状态和状态的变更
线程池使用一个AtomicInteger
类型的属性,同时记录线程池状态和当前线程池中线程的数量。高3位标识线程池的状态 低29位标识线程池工作线程个数
2.2状态变更
2.3 属性
属性名 | 类型 | 解释 |
---|---|---|
workQueue | BlockingQueue<Runnable> |
保存待处理任务的阻塞队列 |
mainLock | ReentrantLock | 锁,线程池用一个set保存所有线程,一个int保存最大的线程数,修改的时候使用这个锁保证线程安全 |
workers | HashSet<Worker> |
包含池中所有工作线程的集合。仅在持有 mainLock 时访问 |
termination | Condition | 调用awaitTermination的线程在此等待队列上等待。线程终止的时候也会使用此唤醒等待的线程 |
largestPoolSize | int | 程池中存在的最大的工作线程数。仅在持有 mainLock 时访问。 |
completedTaskCount | long | 完成任务的计数器。仅在工作线程终止时更新。 |
threadFactory | ThreadFactory | 所有线程都是使用这个工厂创建的 |
handler | RejectedExecutionHandler | 拒绝策略,队列也无法容纳任务,且达到最大线程数的时候调用此策略方法 |
keepAliveTime | long | 工作线程多久(纳秒)没有执行任务将被回收,(一般针对非核心线程,也可以用于核心线程的回收) |
allowCoreThreadTimeOut | boolean | 如果为 false(默认),核心线程即使在空闲时也保持活动状态。如果为true,核心线程超过keepAliveTime纳秒没有工作将被回收。 |
corePoolSize | int | 核心线程数,如果池中线程数小于核心线程数,那么接受新任务总是new一个线程 |
maximumPoolSize | int | 当核心线程数达到,阻塞队列塞满,将新增maximumPoolSize-corePoolSize 个线程处理任务 |
3.源码分析
线程池中存在一些位运算,本文不会分析这些位运算
3.1执行任务excute
excute方法接受一个Runnale,submit方法也是基于excute实现的,这是线程池源码中的核心。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//ctl `高3位标识线程池的状态 低29位标识线程池工作线程个数`
int c = ctl.get();
//如果当前工作线程总数小于核心线程
if (workerCountOf(c) < corePoolSize) {
//那么会尝试新增一个核心线程执行当前任务
//addWorker第一个参数是任务,第二个参数是是否核心线程,返回是否新增成功
//如果新增任务成功那么直接返回
if (addWorker(command, true))
return;
//新增失败那么重新获取线程总数和线程池状态
c = ctl.get();
}
//如果是运行状态 且加入到了任务队列
if (isRunning(c) && workQueue.offer(command)) {
//如果新增成功 重新获取程总数和线程池状态
int recheck = ctl.get();
//如果发现不是运行状态尝试删除任务
if (!isRunning(recheck) && remove(command))
//如果成功从队列删除了任务,那么调用拒绝策略
reject(command);
//如果线程池中的线程为0那么添加一个非核心线程,保证队列中的任务会被执行
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果队列满了,或者说不是running 那么新增一个非核心线程
//如果新增非核心失败 那么调用拒绝策略
else if (!addWorker(command, false))
reject(command);
}
整个代码看下来并没有很复杂,其中addWorker方法便是新增线程执行任务,成功返回true 失败返回false。excute方法最大的难点就是 线程安全问题
(存在并发调用excute方法的可能)我们来一起品一品doug lea是如何解决的。
-
当线程数小于核心线程数(
if (workerCountOf(c) < corePoolSize)
)会尝试调用
addWorker(command, true)
新增一个核心线程执行任务,乍一看这里存在线程安全问题,因为if (workerCountOf(c) < corePoolSize)
和addWorker(command, true)
不是一个原子操作,可能A线程正在addWorker
,B线程抢先一步addWorker
成功达到了核心线程数,如果A继续成功那么核心线程数将被突破。doug lea的解决办法在addWorker
方法中接着看,如果
addWorker(command, true)
失败,会再次调用c = ctl.get()
,因为此时要么核心线程数被突破,要么线程池状态发生变更,需要刷新下局部变量c -
线程数达到核心线程数
if (isRunning(c) && workQueue.offer(command))
当线程数达到核心线程数,会首先看线程是否是运行状态,然后
workQueue.offer(command))
将任务放入阻塞队列中。这里对应了shutdown stop
等状态下,线程池是不接受新任务的。但是需要注意if (isRunning(c) && workQueue.offer(command))
不是一个原子操作,可能放入到阻塞队列的过程中,线程状态被更改了,doug lea解决办法就是,如果放入到阻塞队列后,可以从队列中删除任务,说明任务没有被拿去执行,那么拒绝任务。如果删除任务失败了,并且线程池中的工作线程为0个,那么会新增一个线程去执行任务,保证这个放入到队列中的任务,一定会被执行到。
-
如果阻塞队列满,或者线程池不是running状态
!addWorker(command, false)
会新增一个非核心线程去执行任务,如果新增非核心任务失败,说明已经达到了最大线程数,那么会调用拒绝策略
为什么不是running还会
addWorker
方法,不怕shutDown状态还接受了一个任务么?——addWorker中会对状态再次进行判断,保证了这种情况不会发生
3.2 新增一个工作线程addWorker
addWorker
接受两个参数——任务,和是否核心线程。这个方法代码很精彩,使用cas乐观锁 + ReentrantLock
提高执行效率。
我们思考一个问题,修改工作线程计数,new 一个工作线程,将线程放入HashSet<Worker>
中,启动工作线程,这四步中有哪些步骤是线程不安全的?
显然是——修改工作线程计数,和将线程放入HashSet<Worker>
中是线程不安全。虽然新建线程调用构造方法涉及到内存空间的分配,但是jvm无论是使用指针指针碰撞,还是空闲链表,还是线程本地分配空间,都会为我们保证这一步是线程安全的
那么我们是否需要锁住整个addWorker
方法昵?显然不需要,至少new 一个工作线程这一步是不需要加锁的。接下来我们看看doug lea是如何巧妙减低锁的粒度,提高执行效率的。
方法很长,我们具体解析的时候分多个部分
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//=========自旋部分开始=================
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//如果大于等于SHUTDOWN 且 线程池不是SHUTDOWN 说明是STOP TIDYING TERMINATED这几种都是不接受新任务的
//大于等于SHUTDOWN 且队列是空,这时候也不接受新任务,线程池即将关闭
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//自选
for (;;) {
int wc = workerCountOf(c);
//如果大于(2^29)-1 直接不可新增线程,ctl 高三位状态低29位线程数 再多表示不了了
// 如果表示新增核心线程 且大于核心线程 或者非核心但是大于最大线程数 返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//cas增加 工作线程数 这里只是更新ctl 不是真的增加一个线程
//这样增加成功了才能退出 保证了线程数不会超过阈值
if (compareAndIncrementWorkerCount(c))
break retry;
//如果增加失败了重新看下状态,状态改变了,那么重新自旋
//cas失败了,状态没变也会自选
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
//=========自旋部分结束=================
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//新建一个线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//上锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//线程池状态
int rs = runStateOf(ctl.get());
//如果小于 SHUTDOWN 说明是RUNNING
//或者是SHUTDOWN 但是没有任务执行,说明是为了执行队列中的任务或者预热线程池
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//加到set集合
workers.add(w);
int s = workers.size();
//更新最大线程数
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
//解锁
mainLock.unlock();
}
//如果加入了set 启动worker
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//如果没有启动 说明线程池已经不接受新任务了,或者其他奇奇怪怪的异常
if (! workerStarted)
//尝试减少工作线程数 并且尝试关闭线程池
addWorkerFailed(w);
}
//返回worker是否启动了
return workerStarted;
}
3.2.1 自旋修改工作线程数
修改工作线程数,这一步doug lea使用自旋+cas的方式
-
外层for中的第一个if
rs 是线程池的运行状态,看下这个if中哪些情况addWorker会直接返回false
首先
rs>=SHUTDOWN
必须成立,这就意味着线程池处于SHUTDOWN,STOP, TIDYING ,TERMINATED这几个状态之一
接下来需要满足以下情况之一线程池就无法新增工作线程
-
rs != SHUTDOWN
这意味着线程池是
STOP or TIDYING or TERMINATED
状态,这几个状态都不可新增工作线程 -
rs == SHUTDOWN
&&firstTask != null
这对应了 线程池处于SHUTDOWN,不会接受新提交的任务(
firstTask != null
是excute 方法入参提交的任务) -
rs == SHUTDOWN
&&firstTask == null
&&workQueue.isEmpty
这是意味着队列中所有任务都运行成功了,当前调用的时候也不是提交新任务,接下来线程池将转变为STOP,不需要新增新线程去处理队列中的任务
-
-
内部for循环
这里分别看三个if
-
内部for循环第一个if
,防止突破核心线程数,或者最大线程数可以看到如果运行线程大于
CAPACITY(2的29方-1,因为前3为表示状态)
那么直接无法新增线程如果新增的是核心线程,那么不能大于核心线程数
如果新增的是非核心线程数,那么不能突破最大线程数
-
内部for循环 cas 新增工作线程数量
这里
compareAndIncrementWorkerCount
方法使用cas
更新工作线程数。我们要考虑下,第一个if 和这里的
compareAndIncrementWorkerCount
会不会出现第一个if确认不会突破线程数,但是准备运行第二个if的时候,其他线程新增了一个线程,然后第二个if还是成功cas增大线程数
的情况其实是不会的,我们要看下
c
这个局部变量是第一层for循环,进来的时候获取的,并没有在第一个if 和第二个if 中去更新c
,所有如果真发生这种情况,cas会失败。cas失败的话,会刷新c,然后会由内部for循环第一个if
确保不会突破线程数,如果cas成功那么会去真正新建工作线程 -
如果线程池状态变化
这时候会跳到外层循环,由
外层for中的第一个if
判断状态
3.2.2 无锁新增工作线程
这里很牛逼,太牛逼了!
我们说过,new 一个工作线程的过程,是不需要加速锁,jvm保证new的过程分配内存线程安全。所有doug lea,让这部分可以并发进行 (值得借鉴)。
这里新增的Worker对象,Worker是一个内部类,后面我们分析Worker是如何运行的时候,再看其内部结构。
3.2.3 同步维护HashSet<Worker>
等变量
接下来需要维护HashSet<Worker> workers
, int largestPoolSize
,并启动工作线程。largestPoolSize
记录了线程池曾经同时具备多少个线程,并使用一个HashSet存储工作线程
首先会上锁,然后重新检查下线程池的状态 (确保处于运行,运行可以接受新任务,新增工作线程,或者处于SHUTDOWN,但是不能是提交新任务)然后将维护HashSet<Worker> workers
, int largestPoolSize
这些属性,然后解锁。整个流程很简单,但是没什么doug lea要再次检查一次线程池运行状态昵?
因为上面的双层for,到这里的上锁,并非一个原子操作,可能在此期间由另外一个线程调用了关闭线程池的方法。
3.2.4 启动工作线程
可以看到只有worker被加到HashSet<Worker> workers
后才会,运行工作线程
3.2.5 如果Worker启动失败
在此方法的finally
中,如果worker启动失败,会调用addWorkerFailed
这里从工作线程集合中删除工作线程,自旋cas减少工作线程数目,尝试关闭线程池(这个方法内部会判断线程池状态,并不是尝试关就一定会关)这一步就是上面操作的回滚。
3.3.工作线程运行机制
上面我们看了,提交任务到线程池的流程,下面我们看线程池中的工作线程是如何处理任务的
3.3.1 Worker内部类结构
Worker这个类继承了AQS实现了Runnable接口,继承Runnalbe 比较好理解,毕竟Worker的职责就是从阻塞队列中不断获取任务执行。但是为什么Worker为什么要继承AQS昵?(这部分需要有AQS的基础,推荐学习JUC源码学习笔记1——AQS独占模式和ReentrantLock)
3.3.2 Worker属性
属性 | 描述 |
---|---|
Thread thread | 运行Worker#run方法的线程,从ThreadFactory中创建而来 |
Runnable firstTask | 当调用addWorker传入一个任务的时候,firstTask记录初始任务 |
long completedTasks | 当前工作线程完成的任务数量 |
3.3.2 Worker 构造方法
首先设置AQS状态为-1,然后调用线程工厂创建一个线程,且Runnable为自己,那么这个线程启动将执行Worker#run
方法
3.3.3 Worker运行任务
Worker的运行调用了线程池的runWorker方法,我们先忽略Worker中对中断的处理,专注于Worker从队列拿任务执行,然后执行的流程
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 忽略,后续解释这里的作用
boolean completedAbruptly = true;
try {
//不断从队列中运行任务,如果firstTask不为null 那么这里直接先执行firstTask
while (task != null || (task = getTask()) != null) {
w.lock();//忽略,后续解释这里的作用
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt(); //忽略,后续解释这里的作用
try {
//钩子方法 可以进行扩展
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//钩子方法 可以进行扩展
afterExecute(task, thrown);
}
} finally {
task = null;
//记录工作线程完成工作数
w.completedTasks++;
w.unlock(); //忽略,后续解释这里的作用
}
}
completedAbruptly = false;
} finally {
//如果工作线程运行的时候 抛出了异常 那么来到这里,做善后工作
//completedAbruptly = true => 我们提交的任务,其业务逻辑抛出了异常
processWorkerExit(w, completedAbruptly);
}
}
可以看到工作线程的职责,就是在While循环中不断的从阻塞队列那任务,然后调用beforeExecute
,然后运行我们向线程池中提交的任务,执行我们的业务逻辑,然后调用afterExecute
。如果运行过程中出现了异常或者当前线程长时间没有拿到任务——getTask返回null,那么会调用processWorkerExit
进行“善后工作”,此线程将被回收。
那么getTask什么时候会返回null
3.3.4 getTask从阻塞队列中拿任务
此方法负责从阻塞队列中获取任务,使用阻塞队列的poll方法,或者使用take方法,前者可以指定超时时长,如果超过时长没有获取到任务,那么返回null,0后者不会超时,如果没有任务一直等待,二者都是对中断敏感的(中断在唤醒之前,那么9重新获取阻塞队列的锁之后抛出中断异常,中断在唤醒之后,重新获取锁后恢复中断标识)(推荐学习:JUC源码学习笔记3——AQS等待队列和CyclicBarrier,BlockingQueue)。
private Runnable getTask() {
//获取任务是否超时
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果线程池为STOP TIDYING TERMINATED 那么cas减小线程数 return null
//如果SHUTDOWN 但是队列存在任务 不会cas减少,那么不会return
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
//如果允许核心线程超时被回收 那么为true 或者工作线程大于核心线程数会没有任务的时候会减少到核心线程数
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//如果工作线程大于最大核心数 或者 允许过期且获取任务超时
if ((wc > maximumPoolSize || (timed && timedOut))
//如果队列不是空至少保证wc大于1 那么减少后工作线程至少为1
&& (wc > 1 || workQueue.isEmpty())) {
//CAS 减少工作线程数
if (compareAndDecrementWorkerCount(c))
return null;
//如果CAS失败那么继续自旋
continue;
}
try {
//在`allowCoreThreadTimeOut = true(允许核心线程过期)`或者`工作线程数>核心线程数`的时候会使用超时poll获取任务
//反之使用无限期阻塞take方法获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
//获取到任务 那么直接返回任务
return r;
//反之说明超时没有获取到任务
timedOut = true;
} catch (InterruptedException retry) {
//如果被中断那么把超时置为false 继续自旋
timedOut = false;
}
}
getTask方法,整体是一个自旋,自旋返回的情况,要么是线程池的状态导致不需要继续处理队列中的任务,要么是队列中线程在存活时间内还没有获取到任务。
-
第一if
这里if成立的的情况有两种,成立后会减少工作线程数,并返回null
-
线程池处于 stop ,tidying,terminated
这种情况下一般是用户调用了shutdownNow,这个方法导致线程池进入stop,并且返回没有执行的任务
所以这时候是不需要去处理线程池中的任务的
-
线程池处于shutdown 且队列没有任务
shutdown状态不处理新任务,但是处理队列中的任务,既然队列都没有任务了,那么可以返回null。
-
-
第二个if
- timed属性记录
是否允许线程过期
,在allowCoreThreadTimeOut = true(允许核心线程过期)
或者工作线程数>核心线程数
的时候会为true - timeOut属性记录
是否从阻塞队列中拿任务超时,拿任务的时间超过了keepAliveTime
接下来我们看下这个if成立的条件,和对应的意义
-
工作线程数超过了最大线程数,且工作线程数大于1
可以看作doug lea写兜底机制,反之工作线程数突破最大线程数,导致资源枯竭
-
工作线程数超过了最大线程数,且队列是空
同上
-
允许超时,且发生超时没有拿到任务,且工作线程数大于1
允许超时要么是核心线程允许过期,要么是工作线程数大于核心线程数,这时候工作线程长时间没有拿到任务,将返回null。之所以要求工作线程数大于1,是要确保队列中的任务有一个线程可以执行
-
允许超时,且发生超时没有拿到任务,且队列是空
基本同上
命中条件,那么会cas减少工作线程数量,成功那么返回null,这里
compareAndDecrementWorkerCount
没有自旋,因为这里失败了,会continue,说明存在多个线程将被回收,如果同时回收了,可能线程池直接没有线程执行队列中的任务了 - timed属性记录
-
从队列中获取任务
-
这里可以看出允许核心线程过期,和存活时间的作用。核心线程和非核心线程并没有特殊标记记录,而是如果不允许核心线程过期,那么在工作线程数小于等于核心线程的时候使用无限期take 保证核心线程没有任务至少阻塞于阻塞队列中,而不是返回null 导致核心线程过期
-
如果工作线程数大于核心线程数,或者允许核心线程过期,那么使用超时等待poll方法,这时候超过存活时间就返回null,线程将被“善后”
-
如果超时没有拿到任务,这时候timedOut 会为true,将继续自旋并可能命中
第一个if
或者第二个if
导致线程被回收 -
如果成功获取到任务,那么返回任务进行执行
-
如果在阻塞队列中获取的时候被中断,那么
timedOut = false
并且不响应中断
-
3.3.5 processWorkerExit 工作线程“善后”操作
在用户线程
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 如果是由于用户业务逻辑错误,那么是没有减少线程数的
decrementWorkerCount();//自旋+cas减少
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//获取锁
try {
//更新完成的任务数
completedTaskCount += w.completedTasks;
//从HashSet中移除
workers.remove(w);
} finally {
mainLock.unlock();
}
//尝试终止线程池
tryTerminate();
int c = ctl.get();
//如果线程是running 或者 shutdown
if (runStateLessThan(c, STOP)) {
//不是由用户异常导致的
if (!completedAbruptly) {
//线程最少数
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//确保最少有一个
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//不足一个
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//只要线程是running 或者shutdown都确保存在一个线程可以执行队列中的任务
//或者使用用户业务逻辑错误,导致的异常,那么补上一个线程
addWorker(null, false);
}
}
代码总共分为四步:
-
如果是用户业务逻辑错误,那么自旋+cas减少工作线程数
因为正常由于getTask返回null的情况,在getTask中就已经完成了减少工作线程数的操作
-
更新completedTaskCount和
HashSet<Worker>
更新completedTaskCount就是把当前工作线程完成的任务数加和
然后更新
HashSet<Worker>
-
尝试终止线程池
-
如果是用户业务逻辑错误导致的异常,那么补上一个线程。如果是由于长时间没有任务,但是回收这个线程后,队列又有任务了,那么确保线程池中有一个线程可以处理任务。
这都是建立在 线程池为running 或者 shutdown的情况下,因为其他状态队列中的任务都不需要去执行。
如果不是用户业务错误,工作线程由于等待超时进入,且线程池是running 或者shutdown的时候,会增加一个线程,这就是
线程池的保活
(这哪里是保活啊,这是替身)
3.4 shutdown关闭线程池,中断工作空闲线程
关闭线程池,如果还有任务没有执行完,那么任务还会执行,但是线程池将不接受新任务。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查权限相关
checkShutdownAccess();
//确保状态至少为SHUTDOWN
advanceRunState(SHUTDOWN);
//中断所有的空闲工作线程
interruptIdleWorkers();
//钩子函数 可以自行扩展
onShutdown();
} finally {
mainLock.unlock();
}
//尝试终止线程池
tryTerminate();
}
-
advanceRunState 使用自旋+cas确保状态至少为shutdown,因为存在其他线程调用shutdownNow,设置状态为stop的可能
-
interruptIdleWorkers
调用了interruptIdleWorkers(false),表示中断所有空闲的工作线程(tryLock成功表示工作线程空闲,这部分在
Worker 与中断
章节中详细解释) -
onShutdown
钩子方法,可以自行实现进行扩展
3.5 shutdownNow 关闭线程池,中断所有启动的工作线程
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//自旋+cas 确保状态为stop
advanceRunState(STOP);
//中断所有已经启动的工作线程,那怕这个工作线程在处理任务
interruptWorkers();
//将剩余的任务从队列中倒出来,吐给用户
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//尝试终止线程池
tryTerminate();
return tasks;
}
-
advanceRunState 使用自旋+cas确保状态至少为stop,因为存在其他线程调用过shutdownNow并且线程池将终结(触发了tryTerminate)设置为Tidying 或者Terminate的可能
-
interruptWorkers
对每一个worker调用
interruptIfStarted
,只要工作线程启动了(满足getState>=0)那么进进行中断 -
drainQueue
调用阻塞队列的drainTo
方法将任务吐出来,如果调用完还有任务,那么使用遍历 + 删除的方式进行清理
3.6 tryTerminate 尝试关闭线程池
/**
* 尝试判断是否满足线程池中止条件,如果满足条件,将其推进到最后的TERMINATED状态
* 注意:必须在任何可能触发线程池中止的场景下调用(例如工作线程退出,或者SHUTDOWN状态下队列工作队列为空等)
* */
final void tryTerminate() {
for (;;) {
int currentCtl = this.ctl.get();
if (isRunning(currentCtl)
|| runStateAtLeast(currentCtl, TIDYING)
|| (runStateOf(currentCtl) == SHUTDOWN && !workQueue.isEmpty())) {
return;
}
// 有两种场景会走到这里
// 1 执行了shutdown方法(runState状态为SHUTDOWN),工作线程都空闲导致,
// 2 执行了shutdownNow方法(runState状态为STOP)
if (workerCountOf(currentCtl) != 0) {
interruptIdleWorkers(ONLY_ONE);
return;
}
// 线程池状态runState为SHUTDOWN或者STOP,且存活的工作线程个数已经为0了
// 虽然前面的interruptIdleWorkers是一个一个中断idle线程的,但实际上有的工作线程是因为别的原因退出的(恰好workerCountOf为0了)
// 所以这里是可能存在并发的,因此通过mainLock加锁防止并发,避免重复的terminated方法调用和termination.signalAll方法调用
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// cas的设置ctl的值为TIDYING+工作线程个数0(防止与别的地方ctl并发更新)
if (ctl.compareAndSet(currentCtl, ctlOf(TIDYING, 0))) {
try {
// cas成功,调用terminated钩子函数
terminated();
} finally {
// 无论terminated钩子函数是否出现异常
// cas的设置ctl的值为TERMINATED最终态+工作线程个数0(防止与别的地方ctl并发更新)
ctl.set(ctlOf(TERMINATED, 0));
// 通知使用awaitTermination方法等待线程池关闭的其它线程(通过termination.await等待)
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// 如果上述对ctl变量的cas操作失败了,则进行重试,再来一次循环
// else retry on failed CAS
}
}
-
第一个if
-
isRunning(currentCtl)
为true,说明线程池还在运行中,不可以关闭线程池 -
runStateAtLeast(currentCtl, TIDYING)
当前线程池状态已经大于等于TIDYING了,说明之前别的线程可能已经执行过tryTerminate,且通过了这个if校验,不用重复执行了 -
(runStateOf(currentCtl) == SHUTDOWN && !workQueue.isEmpty()))
当前线程池是SHUTDOWN状态,但工作队列中还有任务没处理完,也不满足中止条件,这时候不能关闭,还需要处理队列中的任务
工作队列没有任务的时候,这些线程getTask为null,就会调用
processWorkerExist
也会调用到tryTerminate
,这时候满足条件将自动关闭线程池
-
-
第二个if
来到这个if需要满足
执行了shutdown方法(runState状态为SHUTDOWN),且当前工作线程已经空了
or执行了shutdownNow方法(runState状态为STOP)
如果工作线程数不等于0,这里会调用
interruptIdleWorkers
中断一个空闲的线程。这个被中断的线程会
getTask方法返回null->processWorkerExit->tryTerminate
,这时候这个线程也会中断一个空闲的线程,从而达到一个接一个的终止,优雅的关闭资源 -
修改状态,唤醒由于 调用
awaitTermination
而被阻塞的线程-
这里上锁的原因是,也许线程是一个个停止的,然后突然有一个工作线程执行业务逻辑出现异常,调用
processWorkerExit
,也调用到tryTerminate
,恰好线程数为0,出现并发 -
修改状态,调用钩子方法,唤醒阻塞的线程
首先cas状态到Tidying,工作线程数为0,然后调用
terminated
钩子方法,然后设置为terminated,并且唤醒阻塞在termination
上的线程
-
3.7 awaitTermination 阻塞当前线程,直到超时,或者线程池关闭,或者被中断
代码不复杂,但是需要有AQS Condition的知识,才知道为什么这里会阻塞调用线程
JUC源码学习笔记3——AQS等待队列和CyclicBarrier,BlockingQueue
3.8. prestartAllCoreThreads 预热线程池
此方法会提前让线程池工作线程数到达核心线程数,这样的好处相当于10个外卖员等待接单,一旦单子(任务)提交,立马得到执行,减少了新建线程的耗时
3.9 submit 提交一个任务
submit 底层还是依赖excute ,但是它会先将任务包装FutureTask,方便调用者来控制任务的执行,取消,获取异步执行结果。FutureTask本身就是一个任务,也是异步执行的结果 ,FutureTask就如同一个纽带,连接了任务 和 任务的结果
(FutureTask 学习: JUC源码学习笔记7——FutureTask源码解析,人生亦如是,run起来才有结果)
4.Worker与中断
这里我们主要分析,worker为什么需要继承AQS,以及Worker中state代表什么,worker在不同工作状态被中断会如何
4.1 Worker状态
-
构造的时候为-1
-
runWorker对状态的变更
unlock会调用Worker的tryRelease,设置为0
lock会调用Worker的 tryAcquire,cas修改state从0到1,如果失败会阻塞在AQS同步队列中
我们可以看到
state =1 意味着worker在执行业务逻辑
,state=0意味着worker处于空闲
,
4.2 shutdown 与工作线程的交互
shutdown 会中断空闲的线程,并对空闲线程进行回收。怎么识别一个线程是空闲线程昵,怎么让空闲线程被回收昵?
上面讲shutdown
方法时候,我们了解到 shutdown 首先自旋+cas 确保线程池状态到达 SHUTDOWN,然后调用interruptIdleWorkers
中断空闲线程,这个方法会调用到interruptIdleWorkers(false)
其中的false表示中断所有空闲线程,而不是一个
这里需要品一品,为什么tryLock成功,就意味着当前工作线程是空闲的。上面我们说到工作线程执行任务的时候会先执行lock,改变状态为1,然后开始执行业务逻辑,这里的tryLock会cas状态从0到1,如果成功了,意味着cas的这一瞬间工作线程是空闲的。
这是工作线程也许阻塞与getTask方法,也可能刚刚拿到任务,准备lock但是比shutdown慢。
-
工作线程阻塞与getTask
中断工作线程,会导致原本阻塞与阻塞队列的线程抛出中断异常
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); //3 发现 线程池是shutdown ,如果这时候阻塞队列还没任务,那么会自旋减少工作线程数,返回null if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //1.从这里 抛出中断异常 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { //2.在这里被捕获 //将继续自旋,来到3 timedOut = false; } } }
顺着代码中的1,2,3看,最终getTask返回null,线程会从
runWorker
中的while循环退出,执行processWorkerExist
,从而实现空闲线程的回收 -
准备lock 但是 比shutdown慢
这时候,线程从getTask刚刚拿到任务,但是准备lock,被shutdown方法强占先机,导致lock获取锁失败,而阻塞与锁,只有等shutdown释放自己worker这把锁才能返回,但是这时候工作线程被中断了。
注意这时候返回以及被中断,doug lea不能让这个中断带到用户的业务逻辑中,因为这样会影响到业务逻辑(用户代码中根据中断也许有不同的逻辑)所以有下面这段代码
这段代码的作用是,如果线程池停止了(stop tidying terminated)那么一定确保工作线程被中断,但是如果不是那么一定确保线程不被中断
这段代码,我做了一点点排版调整,逻辑不变,如下
//1. (runStateAtLeast(ctl.get(), STOP) || //2. (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && //3. !wt.isInterrupted()
其中1,2是或的关系,3和(1或2)是且的关系。如果整个为true 那么会中断当前下次你,我们详细分析下
-
如果线程池停止,那么1 为true,如果这时候工作线程没有中断,那么工作线程会被中断
-
如果线程没有停止,这是1为false,来到2,首先
Thread.interrupted()
清除中断标志,返回之前有没有被中断。如果线程池没有停止,但是之前被中断了,这里会清除中断标识,这样实现了 ——线程池没有停止,那么确保线程不被中断。如果之前被中断,那么说明是shutdown ,或者 shutdownNow,或者用户业务逻辑进行的中断,这时候且
runStateAtLeast(ctl.get(), STOP)
成立,那么说明线程需要中断,那么这是再次进行中断(整体为true if中的逻辑就是中断)有趣的是,为什么doug lea写两次
runStateAtLeast(ctl.get(), STOP)
?这是
(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))
不是一个原子操作,可能我刚清除中断了,这时候shutdownNow,成功改变状态为STOP,这时候,其实需要对线程进行中断(在源码注释中doug lea称之为清除中断和shutdownNow的race(竞赛)
十分生动形象了)
-
至此我们理解了 中断对于工作线程的意义,其中关键的一点是,中断能让阻塞于阻塞队列中的线程,重新自旋从而来检查线程池状态,达到如果shutdown,shutdownNow执行了,工作线程会从阻塞,到自旋检查线程池状态从而让getTask返回null,达到工作线程回收的目的(doug lea 牛逼!)
4.3 shutdownNow 和 工作线程的交互
shutdownNow,不关工作线程是空闲还是运行都会进行中断,而且这个中断会传播到我们提交的业务逻辑中
shutdownNow 会首先改变状态为stop然后调用interruptWorkers
,这个方法会调用每一个Worker的interruptIfStarted
可以看到只要state >=0 都可能被中断,只有Worker刚new出的来的时候是-1,一旦执行runWorker,首先第一个事情就是修改状态为0,这时候就可能被shutdownNow中断。
这中断会让空闲的线程从getTask返回null,然后线程进入回收。让刚拿到任务准备执行的线程将中断带到业务逻辑中,让正在运行业务逻辑线程被中断(为什么能带到业务逻辑中?见4.2 shutdown 与工作线程的交互 中的 准备lock 但是 比shutdown慢
)。
这个中断可以看作是线程池和我们业务逻辑的通信 —— 爷关闭了,你好自为之
4.4 线程池的优雅停
如果当前线程池有一百个线程,我上来一个shutdownNow, 让线程池关闭,我能立马回收一百个线程么。显然是不行的,也不能说我线程池先修改为停止状态,线程爱咋咋地,这种不负责任的行为也是不行的,线程池需要等待池中所有工作线程为0,才能停止自己。
那我们来看看doug lea如何实现优雅停
tryTerminate
方法 在新增worker 失败
,或者shutdown执行
,或者shutdownNow执行
等情况的时候,会被调用
所以上来就是一个判断,如果线程池为运行,那么不能停止;如果已经是TIDYING说明有线程已经将线程池停止了,不需要再次执行;如果是shutdown但是队列有任务,那么需要执行队列中的任务,也不能停止线程池。
精彩的在于 workerCountOf(c) != 0
这是会中断一个空闲的线程,为什么只中断一个啊,为什么不都中断?
如果全部中断,这些线程都会从getTask中拿到null 然后调用processWorkerExist,然后并发执行terminate,从某种程度上cpu遭了殃,不够优雅。
中断一个可以让其中一个空闲执行processWorkerExist 然后调用tryTerminate
,继续执行一个空闲的线程,然后循环往复,直到所有工作线程调用processWorkerExist 进行回收后,才能到下面修改状态为TIDYING的逻辑。
doug lea 在源码注释中 说 中断一个空闲线程,确保信号的传播
就是这个意思,doug lea 牛逼
四丶问题解答
-
线程池往往提交任务,等操作都是并发调用的,doug lea如何实现线程安全 和 高效率
首先doug lea 使用一个自旋 +cas的操作,确保成功增加了工作线程数后,才能继续创建线程的操作,并且这个自旋判断了线程池状态是否能接受新任务,是否能新建工作线程,相当于一把自旋锁,避免阻塞挂起的性能消耗。如果成功实现了工作线程数的增加,就如同占据的名额,接下来使用线程工厂创建线程的步骤是不加锁的,提高了并发。将线程放入worker集合 使用了ReentrantLock ,启动线程的操作又是不加锁的,通过这种缩小锁的粒度的思想,提高并发执行效率。
-
线程池 的 shutdownNow和shutdown的区别
前者会修改线程池状态为stop并中断所有启动的线程(工作线程刚新建的使用state = -1,调用runWorker首先设置状态为0,视为已经启动,如果开始执行任务那么修改,如果执行任务首先cas修改状态为1)所有state >=0 的线程都会被中断,且中断可以在用户定义的任务中感知到,并且会把任务队列中的任务通过阻塞队列
drainTo
方法倒出来给用户。后者会修改线程池状态为shutdown,然后中断所有空闲的线程,使用tryLock cas修改状态从0到1,如果成功视为工作线程为空闲。
-
线程池停止的时候,如何确保所有工作线程回收后才停止线程池本身
tryTerminate方法负责停止线程池,会检查工作线程数,如果不为0,那么中断一个空闲的线程。中断工作线程的作用会让阻塞于getTask方法的工作线程,重新自旋,从而判断线程池状态,如果停止那么返回null,如果shutdown且阻塞队列为空,也返回null,从而让工作线程从runWorker方法while结束,执行processWorkerExist进行线程回收,processWorkerExist方法又会调用到tryTerminate,继续中断一个空闲线程,直到工作线程数为0,这时候才会修改状态为TIDYING,然后执行
terminated
方法,然后设置状态为terminated状态。
JUC源码学 习笔记8——ThreadPoolExecutor源码解析
源码基于JDK8
文章1.5w字,非常硬核
一丶从多鱼外卖开始
话说,王多鱼给好友胖子钱让其投资,希望亏得血本无归。胖子开了一个外卖店卖国宴,主打高端,外卖小哥都是自己雇佣,并且开迈巴赫送外卖。最开始胖子觉得这么贵的外卖,就雇佣100个外卖员(核心线程
)够了,并购买了100台迈巴赫。但是随后王多鱼让他搞活动——顾客说说自己的亏钱经历就可以免费吃外卖。随即店铺大火,100个外卖员都送不过来(核心线程打满了
),胖子就把外卖放在桌子上,按照先后顺序摆放好,等外卖员送完一单再送桌子上的外卖(阻塞队列
)。但是慢慢的桌子上都放不下了(阻塞队列满了
),胖子只好临时雇员外卖员(非核心线程
)送外卖。雇佣到20个临时外卖员,还是不够送外卖,最终决定每天限量,系统提示今日活动太火爆,让用户无法下单(拒绝策略
)。最终活动结束了,临时雇佣的外卖员也都解雇了(回收非核心线程
)
1.从故事中理解线程池是什么
我们把外卖员看作线程,多鱼外卖店就是线程池,用户点的外卖就是任务。线程池就是使用池化技术,维护线程,并使用这些线程执行任务的执行器(Executor)
。结合例子我们看下线程池的执行流程图
2.为什么需要线程池 = 多鱼外卖店雇佣外卖员的意义
-
降低资源消耗:
(如果多鱼外卖店,每次都来一个外卖临时雇佣一个,临时发布招聘的开销大,随后解聘的开销也大)
通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。java线程和操作系统是一对一的映射关系,新建或者销毁一个线程都存在资源的消耗
-
提高响应速度:
(当多鱼外卖店没有做活动的时候,来一个任务,100个核心外卖员可以立马送出外卖)
任务到达时,
一定情况下
无需等待线程创建即可立即执行。 -
提高线程的可管理性:
(多鱼外卖店为什么要100个人配100个车,就是要减少外卖员交替使用车辆送外卖带来的上车下车开销,并且店主可以控制外卖员的数量,并且弄出绩效制度doge)
线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。使用过多的线程会导致线程上下文切换更多,从而导致在保存“现场”和恢复“现场”的开销激增。
-
提供更多更强大的功能:
(多鱼外卖店可以要求外卖员在送外卖到顾客家的时候,祝顾客新年快乐)
线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行
3.从多鱼外卖店看核心线程数,最大线程数,阻塞队列,拒绝策略,存活时间
3.1 核心线程数
多鱼外卖店,是一个员工一个车辆,假如外卖店就10台车辆,那么还需要雇佣100人么?
显然是不需要的,但是如果外卖员每天工作12小时,期间休息和吃饭占用4小时,在休息和吃饭的时间内是不会使用到车辆的。那么这时候我们应该雇佣 10(车辆数) + 10*4(空闲时间)/(12-4)(有效工作时间) = 15人
,这样我们让这些快递员轮班,A吃饭的时候,B送货。
但是还需要考虑到,难道外卖车就不送去保养么,外卖车不也得加油。
-
10个车没必要雇佣100人
好比CPU只有10核心,在计算密集型任务中(把外卖员吃饭看作IO操作,计算密集就如同外卖员丝毫不停歇努力配送中),那么雇佣10人左右即可
-
10(车辆数) + 10*4(空闲时间)/(12-4)(有效工作时间) = 15人
这就是
线程数 = CPU 核心数 +CPU 核心数 x(IO耗时/CPU计算耗时)
。看看这个公式,难道IO密集型的任务就设置核心线程数为CPU核心x2
么?不见得,如果IO耗时和CPU耗时不是1比1,IO耗时比例更高,那么应该设置的更高一点 -
难道外卖车就不送去保养么,外卖车不也得加油
这个可以理解为,CPU还得处理系统的其他计算,并非100%专注于当前这个线程池,所以核心线程数的设置需要考虑到
CPU利用率
最终核心线程数的设置,是一个需要压测,需要实际数据去调试的,勉强只能给出
线程数 = (CPU 核心数 +CPU 核心数 x (IO耗时/CPU计算耗时))x cpu利用率
的理论公式
3.2 阻塞队列
阻塞队列就是故事中的”桌子”,它基于AQS Condition
实现等待唤醒模式,在线程池中主要利用阻塞队列队列为空,获取任务的线程将阻塞,成功提交任务到阻塞队列将唤醒被阻塞的线程
的特性,之所以阻塞就是避免线程无意义的自旋浪费CPU。
阻塞队列在juc包下具备很多实现,下面我们介绍几种常用的阻塞队列
ArrayBlockingQueue
基于数组的有界队列LinkedBlockingQeque
基于链表的有界队列(默认容量是int类型最大)PriorityBlockingQueue
优先阻塞队列,基于数组实现的堆,并且具有阻塞队列的特性DelayQueue
基于优先队列实现的无界阻塞队列,元素只有在其延迟到期时才能被取出SynchronousQueue
不存储元素的阻塞队列,每个插入操作都必须等待另一个线程的相应删除操作
如果我们选择无界阻塞队列LinkedBlockingQeque
,意味着最大线程数基本上没用了,因为任务会一直塞到队列直到达到int类型最大,这时候往往意味着OOM
如果选择有界阻塞队列,并且指定的容量太小,那么意味着线程池在任务很多的时候,阻塞队列将立马塞满,开始创建非核心线程,甚至直到触发拒绝策略。
如果指定的容量太大,意味着很多任务堆积,任务得不到及时执行。
另外还有SynchronousQueue
,它可以简单看作容量为0的阻塞队列。
PriorityBlockingQueue
和DelayQueue
都是基于堆实现,可以快速获得堆顶元素,我们使用PriorityBlockingQueue
需要传入比较器,或者任务本身就是Comparable
。
可以看出阻塞队列的选择,需要考虑到任务对及时性的要求,也要考虑到,峰值的时候任务有多。
3.3 最大线程数
最大线程数,是核心线程数打满,阻塞队列塞满,然后会去建立 最大线程数 - 核心线程数
个非核心线程执行任务,但是非核心线程在存活时间内,如果拿不到任务,将被回收(如同多鱼外卖活动结束,非核心外卖员没有外卖送,自然被解雇)
最大线程数如果设置太小,那么可能不能胜任大量任务,最后任务将被拒接策略处理。如果太大,并不意味着效率一定提高,因为线程的调度依赖于cpu调度。此参数的设置需要考虑到系统的性能(cpu不行设置太大也没有意义),任务是否接受被拒绝策略处理,以及任务峰值等。
3.4 拒绝策略
每个系统都有它的性能瓶颈,当任务是在太多,核心线程打满,阻塞队列塞满,最大线程打满,这时候继续提交任务将触发拒绝策略。JUC中提供了以下策略
-
CallerRunsPolicy
由提交任务的线程执行任务,如果线程池关闭了,那么一声不吭的抛弃任务这个拒绝策略很有意思,从某种程度上说,它有点阻塞的意思,当需要提高任务的线程执行任务的时候,意味着提高任务线程的方法将不能立即返回,从而避免提高任务继续提交其他任务。
-
AbortPolicy
直接抛出RejectedExecutionException
,线程池默认的拒绝策略 -
DiscardPolicy
悄无声息的忽略任务 什么都不做忽略任务 -
DiscardOldestPolicy
如果线程池没有被关闭那么丢弃队列头部的任务,然后提交此任务。这个拒绝策略,会丢弃队列头部任务,然后再次调用线程池提交任务的方法,有点递归的意思,需要注意:丢弃队列头部任务,并再次提交任务并不是一个原子操作,这种拒绝策略会递归的调用提交任务的方法直到任务入队
在自己系统中,触发拒绝策略往往需要我们做好记录,甚至提醒开发人员调优线程池。具体使用什么拒绝策略需要看业务需求。
3.5 存活时间
当任务有限或者提交不频繁时,最终线程池中的线程将无任务执行。为了减少系统资源消耗,在存活时间内如果一直接收不到任务的话,线程将被回收。通常存活时间的设置只对非核心线程有效,但是如果调用allowCoreThreadTimeOut(true)
那么核心线程也将被回收
那么核心线程是否应该被回收呢?如果业务上这个线程池被调用的十分不频繁,或许回收核心线程也是不错的选择,但是如果经常间歇性有任务需要执行且要求效率尽可能高,这时候如果核心线程被回收了,线程池又将new新的线程,会降低线程池的执行效率。
那么存活时间如何设置呢?还是得依赖于业务,看业务需要线程池的时间间隔,取一个粗略估计值。
3.6 线程工厂
线程池创建线程最终使用调用ThreadFactory
进行,通常需要我们指定下线程的名称,推荐使用ThreadFactoryBuilder
方便对线程的命名进行定义
4.我们平时在那些地方使用到线程池,以及有哪些坑
4.1 @Async
spring的@Async
注解标注在spring bean的方法上,将被AsyncAnnotationBeanPostProcessor
代理原始对象,活的异步增强的效果,其核心还是向线程池中提交任务。
建议使用此注解的时候,指定自己的线程池(注解中可以指定使用线程池bean的名称)这样可以让不同类型的业务使用不同的线程池,如果IO密集和CPU密集使用一个线程池,且发生等待队列中IO任务排在CPU密集任务前面,就如同墨迹的人在你前面排队,会对效率有所影响
且AsyncAnnotationBeanPostProcessor
是一个BeanPostProcessor
并不是一个SmartInstantiationAwareBeanPostProcessor
,如果发生循环依赖需要注意代理对象的方法可能不具备异步能力(而且调用的时候必须使用代理对象去调用,this.
,或者直接调用无异步能力)
4.2 @EventListener
spring的@EventListener
标注的方法,会被EventListenerMethodProcessor
(BeanFactoryPostProcessor实现类
),在所有单例bean实例化后,将所有bean中标有@EventListener
注解的方法和bean包装成ApplicationListener
,注册到ApplicationContext
中(一般最终注册到SimpleApplicationEventMulticaster(事件多播器)
中),如果我们为SimpleApplicationEventMulticaster
设置了一个线程池,它将异步的回调ApplicationListener
(反射调用bean对应的方法)
注意这里的坑,如果异步意味着事务可能会失效,spring还有一个@TransactionalEventListener
注解,可以指定在事务提交前等等阶段去响应事件,其中@TransactionalEventListener
的fallbackExecution
可以指定,是否事务同步管理器中没有事务(事务同步管理器基于ThreadLocal,异步使用其他线程将导致事务失效,这时候事务管理器就是没有事务的状态)也继续响应事件。
4.3 CompletableFuture
这是并发大师doug lea编写的进行异步or同步任务编排的一个工具类,如果不指定线程池那么将使用公共线程池(线程数默认为CPU 核心数量减1)。如果使用者都不使用自定义的线程池,很容易造成大量任务堆积,反而降低执行效率。通常建议不同业务类型使用不同的线程池,并设置合适的线程池参数
4.4 @schedule
注解依赖于ScheduledAnnotationBeanPostProcessor
,它是一个BeanPostProcessor,在每一个单例bean 实例化的时候,会扫描是否存在此注解,如果存在那么解析并记录。在所有单例bean 实例化后,会将bean和其方法,以及解析的注解信息包装称一个任务,提交到线程池中。
5 .Executors中的线程池
常用的有以下几种
newFixedThreadPool
固定数目工作线程,无界任务阻塞队列(可以容纳int最大个任务)的线程池——容易oom,如果请求量大容易操作阻塞队列积压过多任务造成oomnewSingleThreadExecutor
单线程,无界任务阻塞队列的线程池newCachedThreadPool
,支持工作线程数达到Integer.MAX_VALUE
,且空闲时间达到60秒那么就会被回收,使用的是SynchronousQueue
不会容纳任何任务,每一个任务提交之后都必须有另外一个线程获取任务——线程多并不意味着效率高,上下文的切换,线程的new 和消耗都是消耗大量资源的,支持Integer.MAX_VALUE
个线程显然也是不符合实际的
基本上编程规范都要求我们自己定义线程池参数,Executors
中的线程池多少都有点问题,建议开发人员使用ThreadPoolExecutor
构造方法结合业务实际设置参数后使用。
二丶带着问题学源码
- 线程池往往提交任务等操作都是被并发调用的,doug lea如何实现线程安全 和 高效率
- 线程池 的 shutdownNow和shutdown的区别
- 线程池停止的时候,如何确保所有工作线程回收后才停止线程池本身
三丶源码分析
1.线程池ThreadPoolExecutor的继承关系
1.1 Executor 接口
public interface Executor {
void execute(Runnable command);
}
Executor
的作用的是把任务和任务将如何运行进行解耦
(直接使用Thread我们需要自己把业务逻辑些在runnable中传入,然后start,任务逻辑和任务的运行耦合紧密),其中只有一个方法execute
,但是其实现类,可能是同步的直接调用Runnable#run
,也可能是异步开启线程执行。
1.2 ExecutorService 接口
ExecutorService
实现了Executor
接口,提供管理自身生命周期的方法,其submit
方法生成 Future
来跟踪一个或多个异步任务的进度的方法,还提供了批量提交任务的方法。
方法 | 描述 |
---|---|
void shutdown() | 关闭执行器,如果还有任务没有执行完,那么任务还会执行,但是不会接受新的任务,如果已经处于关闭状态还去调用此方法,不会有任何效果 |
List<Runnable> shutdownNow() |
尝试停止所有正在执行的任务,返回等待执行但未执行的任务,停止执行任务通常是通过调用对应线程的interrupt 方法,如果线程自己不响应中断,那么无济于事,任务还是会继续执行 |
boolean isShutdown() | 如果已经被关闭那么返回true,通常调用shutdown 和 shutdownNow 后可能存在线程在执行任务,但是还是返回true |
boolean isTerminated() | 如果所有任务在关闭后都已完成,则返回 true。请注意,除非调用了 shutdown 或 shutdownNow,且所以任务都结束了,否则 isTerminated 永远不会为真 |
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException | 调用线程进入阻塞等待直到关闭当前ExcutorServuce,或者发生超时,或者当前线程被中断。 |
<T> Future<T> submit(Callable<T> task) |
提供一个具备返回值的任务,返回一个Future 表示是此任务的异步执行结果。 |
<T> Future<T> submit(Runnable task, T result) |
和submit(Callable) 类似,但是其异步返回结果在执行完后返回结果是入参result |
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException |
批量提交一批任务,阻塞直到所有任务都完成or者任务执行失败或者当前线程被中断 |
批量提交一批任务,阻塞直到所有任务都完成or任务执行失败或者当前线程被中断,or指定的时间超时 | |
提交一批任务,等待其中一个执行完成,或者直到当前线程被中断,返回时会取消没有执行完的任务 | |
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException |
提交一批任务,等待其中一个执行完成,or到当前线程被中断,or等待时间超时,返回时会取消没有执行完的任务 |
1.3 AbstractExecutorService
AbstractExecutorService
提供了Runnable
,Callable
适配成RunnableFuture
(一般适配成实现类FutureTask
),还实现了ExecutorService
的submit
,invokeAny
,以及invokeAll
。是对ExecutorService
的抽象实现,有点模板方法的意思。
2.线程池的状态和属性
2.1 状态和状态的变更
线程池使用一个AtomicInteger
类型的属性,同时记录线程池状态和当前线程池中线程的数量。高3位标识线程池的状态 低29位标识线程池工作线程个数
2.2状态变更
2.3 属性
属性名 | 类型 | 解释 |
---|---|---|
workQueue | BlockingQueue<Runnable> |
保存待处理任务的阻塞队列 |
mainLock | ReentrantLock | 锁,线程池用一个set保存所有线程,一个int保存最大的线程数,修改的时候使用这个锁保证线程安全 |
workers | HashSet<Worker> |
包含池中所有工作线程的集合。仅在持有 mainLock 时访问 |
termination | Condition | 调用awaitTermination的线程在此等待队列上等待。线程终止的时候也会使用此唤醒等待的线程 |
largestPoolSize | int | 程池中存在的最大的工作线程数。仅在持有 mainLock 时访问。 |
completedTaskCount | long | 完成任务的计数器。仅在工作线程终止时更新。 |
threadFactory | ThreadFactory | 所有线程都是使用这个工厂创建的 |
handler | RejectedExecutionHandler | 拒绝策略,队列也无法容纳任务,且达到最大线程数的时候调用此策略方法 |
keepAliveTime | long | 工作线程多久(纳秒)没有执行任务将被回收,(一般针对非核心线程,也可以用于核心线程的回收) |
allowCoreThreadTimeOut | boolean | 如果为 false(默认),核心线程即使在空闲时也保持活动状态。如果为true,核心线程超过keepAliveTime纳秒没有工作将被回收。 |
corePoolSize | int | 核心线程数,如果池中线程数小于核心线程数,那么接受新任务总是new一个线程 |
maximumPoolSize | int | 当核心线程数达到,阻塞队列塞满,将新增maximumPoolSize-corePoolSize 个线程处理任务 |
3.源码分析
线程池中存在一些位运算,本文不会分析这些位运算
3.1执行任务excute
excute方法接受一个Runnale,submit方法也是基于excute实现的,这是线程池源码中的核心。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//ctl `高3位标识线程池的状态 低29位标识线程池工作线程个数`
int c = ctl.get();
//如果当前工作线程总数小于核心线程
if (workerCountOf(c) < corePoolSize) {
//那么会尝试新增一个核心线程执行当前任务
//addWorker第一个参数是任务,第二个参数是是否核心线程,返回是否新增成功
//如果新增任务成功那么直接返回
if (addWorker(command, true))
return;
//新增失败那么重新获取线程总数和线程池状态
c = ctl.get();
}
//如果是运行状态 且加入到了任务队列
if (isRunning(c) && workQueue.offer(command)) {
//如果新增成功 重新获取程总数和线程池状态
int recheck = ctl.get();
//如果发现不是运行状态尝试删除任务
if (!isRunning(recheck) && remove(command))
//如果成功从队列删除了任务,那么调用拒绝策略
reject(command);
//如果线程池中的线程为0那么添加一个非核心线程,保证队列中的任务会被执行
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果队列满了,或者说不是running 那么新增一个非核心线程
//如果新增非核心失败 那么调用拒绝策略
else if (!addWorker(command, false))
reject(command);
}
整个代码看下来并没有很复杂,其中addWorker方法便是新增线程执行任务,成功返回true 失败返回false。excute方法最大的难点就是 线程安全问题
(存在并发调用excute方法的可能)我们来一起品一品doug lea是如何解决的。
-
当线程数小于核心线程数(
if (workerCountOf(c) < corePoolSize)
)会尝试调用
addWorker(command, true)
新增一个核心线程执行任务,乍一看这里存在线程安全问题,因为if (workerCountOf(c) < corePoolSize)
和addWorker(command, true)
不是一个原子操作,可能A线程正在addWorker
,B线程抢先一步addWorker
成功达到了核心线程数,如果A继续成功那么核心线程数将被突破。doug lea的解决办法在addWorker
方法中接着看,如果
addWorker(command, true)
失败,会再次调用c = ctl.get()
,因为此时要么核心线程数被突破,要么线程池状态发生变更,需要刷新下局部变量c -
线程数达到核心线程数
if (isRunning(c) && workQueue.offer(command))
当线程数达到核心线程数,会首先看线程是否是运行状态,然后
workQueue.offer(command))
将任务放入阻塞队列中。这里对应了shutdown stop
等状态下,线程池是不接受新任务的。但是需要注意if (isRunning(c) && workQueue.offer(command))
不是一个原子操作,可能放入到阻塞队列的过程中,线程状态被更改了,doug lea解决办法就是,如果放入到阻塞队列后,可以从队列中删除任务,说明任务没有被拿去执行,那么拒绝任务。如果删除任务失败了,并且线程池中的工作线程为0个,那么会新增一个线程去执行任务,保证这个放入到队列中的任务,一定会被执行到。
-
如果阻塞队列满,或者线程池不是running状态
!addWorker(command, false)
会新增一个非核心线程去执行任务,如果新增非核心任务失败,说明已经达到了最大线程数,那么会调用拒绝策略
为什么不是running还会
addWorker
方法,不怕shutDown状态还接受了一个任务么?——addWorker中会对状态再次进行判断,保证了这种情况不会发生
3.2 新增一个工作线程addWorker
addWorker
接受两个参数——任务,和是否核心线程。这个方法代码很精彩,使用cas乐观锁 + ReentrantLock
提高执行效率。
我们思考一个问题,修改工作线程计数,new 一个工作线程,将线程放入HashSet<Worker>
中,启动工作线程,这四步中有哪些步骤是线程不安全的?
显然是——修改工作线程计数,和将线程放入HashSet<Worker>
中是线程不安全。虽然新建线程调用构造方法涉及到内存空间的分配,但是jvm无论是使用指针指针碰撞,还是空闲链表,还是线程本地分配空间,都会为我们保证这一步是线程安全的
那么我们是否需要锁住整个addWorker
方法昵?显然不需要,至少new 一个工作线程这一步是不需要加锁的。接下来我们看看doug lea是如何巧妙减低锁的粒度,提高执行效率的。
方法很长,我们具体解析的时候分多个部分
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//=========自旋部分开始=================
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//如果大于等于SHUTDOWN 且 线程池不是SHUTDOWN 说明是STOP TIDYING TERMINATED这几种都是不接受新任务的
//大于等于SHUTDOWN 且队列是空,这时候也不接受新任务,线程池即将关闭
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//自选
for (;;) {
int wc = workerCountOf(c);
//如果大于(2^29)-1 直接不可新增线程,ctl 高三位状态低29位线程数 再多表示不了了
// 如果表示新增核心线程 且大于核心线程 或者非核心但是大于最大线程数 返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//cas增加 工作线程数 这里只是更新ctl 不是真的增加一个线程
//这样增加成功了才能退出 保证了线程数不会超过阈值
if (compareAndIncrementWorkerCount(c))
break retry;
//如果增加失败了重新看下状态,状态改变了,那么重新自旋
//cas失败了,状态没变也会自选
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
//=========自旋部分结束=================
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//新建一个线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//上锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//线程池状态
int rs = runStateOf(ctl.get());
//如果小于 SHUTDOWN 说明是RUNNING
//或者是SHUTDOWN 但是没有任务执行,说明是为了执行队列中的任务或者预热线程池
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//加到set集合
workers.add(w);
int s = workers.size();
//更新最大线程数
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
//解锁
mainLock.unlock();
}
//如果加入了set 启动worker
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//如果没有启动 说明线程池已经不接受新任务了,或者其他奇奇怪怪的异常
if (! workerStarted)
//尝试减少工作线程数 并且尝试关闭线程池
addWorkerFailed(w);
}
//返回worker是否启动了
return workerStarted;
}
3.2.1 自旋修改工作线程数
修改工作线程数,这一步doug lea使用自旋+cas的方式
-
外层for中的第一个if
rs 是线程池的运行状态,看下这个if中哪些情况addWorker会直接返回false
首先
rs>=SHUTDOWN
必须成立,这就意味着线程池处于SHUTDOWN,STOP, TIDYING ,TERMINATED这几个状态之一
接下来需要满足以下情况之一线程池就无法新增工作线程
-
rs != SHUTDOWN
这意味着线程池是
STOP or TIDYING or TERMINATED
状态,这几个状态都不可新增工作线程 -
rs == SHUTDOWN
&&firstTask != null
这对应了 线程池处于SHUTDOWN,不会接受新提交的任务(
firstTask != null
是excute 方法入参提交的任务) -
rs == SHUTDOWN
&&firstTask == null
&&workQueue.isEmpty
这是意味着队列中所有任务都运行成功了,当前调用的时候也不是提交新任务,接下来线程池将转变为STOP,不需要新增新线程去处理队列中的任务
-
-
内部for循环
这里分别看三个if
-
内部for循环第一个if
,防止突破核心线程数,或者最大线程数可以看到如果运行线程大于
CAPACITY(2的29方-1,因为前3为表示状态)
那么直接无法新增线程如果新增的是核心线程,那么不能大于核心线程数
如果新增的是非核心线程数,那么不能突破最大线程数
-
内部for循环 cas 新增工作线程数量
这里
compareAndIncrementWorkerCount
方法使用cas
更新工作线程数。我们要考虑下,第一个if 和这里的
compareAndIncrementWorkerCount
会不会出现第一个if确认不会突破线程数,但是准备运行第二个if的时候,其他线程新增了一个线程,然后第二个if还是成功cas增大线程数
的情况其实是不会的,我们要看下
c
这个局部变量是第一层for循环,进来的时候获取的,并没有在第一个if 和第二个if 中去更新c
,所有如果真发生这种情况,cas会失败。cas失败的话,会刷新c,然后会由内部for循环第一个if
确保不会突破线程数,如果cas成功那么会去真正新建工作线程 -
如果线程池状态变化
这时候会跳到外层循环,由
外层for中的第一个if
判断状态
3.2.2 无锁新增工作线程
这里很牛逼,太牛逼了!
我们说过,new 一个工作线程的过程,是不需要加速锁,jvm保证new的过程分配内存线程安全。所有doug lea,让这部分可以并发进行 (值得借鉴)。
这里新增的Worker对象,Worker是一个内部类,后面我们分析Worker是如何运行的时候,再看其内部结构。
3.2.3 同步维护HashSet<Worker>
等变量
接下来需要维护HashSet<Worker> workers
, int largestPoolSize
,并启动工作线程。largestPoolSize
记录了线程池曾经同时具备多少个线程,并使用一个HashSet存储工作线程
首先会上锁,然后重新检查下线程池的状态 (确保处于运行,运行可以接受新任务,新增工作线程,或者处于SHUTDOWN,但是不能是提交新任务)然后将维护HashSet<Worker> workers
, int largestPoolSize
这些属性,然后解锁。整个流程很简单,但是没什么doug lea要再次检查一次线程池运行状态昵?
因为上面的双层for,到这里的上锁,并非一个原子操作,可能在此期间由另外一个线程调用了关闭线程池的方法。
3.2.4 启动工作线程
可以看到只有worker被加到HashSet<Worker> workers
后才会,运行工作线程
3.2.5 如果Worker启动失败
在此方法的finally
中,如果worker启动失败,会调用addWorkerFailed
这里从工作线程集合中删除工作线程,自旋cas减少工作线程数目,尝试关闭线程池(这个方法内部会判断线程池状态,并不是尝试关就一定会关)这一步就是上面操作的回滚。
3.3.工作线程运行机制
上面我们看了,提交任务到线程池的流程,下面我们看线程池中的工作线程是如何处理任务的
3.3.1 Worker内部类结构
Worker这个类继承了AQS实现了Runnable接口,继承Runnalbe 比较好理解,毕竟Worker的职责就是从阻塞队列中不断获取任务执行。但是为什么Worker为什么要继承AQS昵?(这部分需要有AQS的基础,推荐学习JUC源码学习笔记1——AQS独占模式和ReentrantLock)
3.3.2 Worker属性
属性 | 描述 |
---|---|
Thread thread | 运行Worker#run方法的线程,从ThreadFactory中创建而来 |
Runnable firstTask | 当调用addWorker传入一个任务的时候,firstTask记录初始任务 |
long completedTasks | 当前工作线程完成的任务数量 |
3.3.2 Worker 构造方法
首先设置AQS状态为-1,然后调用线程工厂创建一个线程,且Runnable为自己,那么这个线程启动将执行Worker#run
方法
3.3.3 Worker运行任务
Worker的运行调用了线程池的runWorker方法,我们先忽略Worker中对中断的处理,专注于Worker从队列拿任务执行,然后执行的流程
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 忽略,后续解释这里的作用
boolean completedAbruptly = true;
try {
//不断从队列中运行任务,如果firstTask不为null 那么这里直接先执行firstTask
while (task != null || (task = getTask()) != null) {
w.lock();//忽略,后续解释这里的作用
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt(); //忽略,后续解释这里的作用
try {
//钩子方法 可以进行扩展
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//钩子方法 可以进行扩展
afterExecute(task, thrown);
}
} finally {
task = null;
//记录工作线程完成工作数
w.completedTasks++;
w.unlock(); //忽略,后续解释这里的作用
}
}
completedAbruptly = false;
} finally {
//如果工作线程运行的时候 抛出了异常 那么来到这里,做善后工作
//completedAbruptly = true => 我们提交的任务,其业务逻辑抛出了异常
processWorkerExit(w, completedAbruptly);
}
}
可以看到工作线程的职责,就是在While循环中不断的从阻塞队列那任务,然后调用beforeExecute
,然后运行我们向线程池中提交的任务,执行我们的业务逻辑,然后调用afterExecute
。如果运行过程中出现了异常或者当前线程长时间没有拿到任务——getTask返回null,那么会调用processWorkerExit
进行“善后工作”,此线程将被回收。
那么getTask什么时候会返回null
3.3.4 getTask从阻塞队列中拿任务
此方法负责从阻塞队列中获取任务,使用阻塞队列的poll方法,或者使用take方法,前者可以指定超时时长,如果超过时长没有获取到任务,那么返回null,0后者不会超时,如果没有任务一直等待,二者都是对中断敏感的(中断在唤醒之前,那么9重新获取阻塞队列的锁之后抛出中断异常,中断在唤醒之后,重新获取锁后恢复中断标识)(推荐学习:JUC源码学习笔记3——AQS等待队列和CyclicBarrier,BlockingQueue)。
private Runnable getTask() {
//获取任务是否超时
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果线程池为STOP TIDYING TERMINATED 那么cas减小线程数 return null
//如果SHUTDOWN 但是队列存在任务 不会cas减少,那么不会return
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
//如果允许核心线程超时被回收 那么为true 或者工作线程大于核心线程数会没有任务的时候会减少到核心线程数
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//如果工作线程大于最大核心数 或者 允许过期且获取任务超时
if ((wc > maximumPoolSize || (timed && timedOut))
//如果队列不是空至少保证wc大于1 那么减少后工作线程至少为1
&& (wc > 1 || workQueue.isEmpty())) {
//CAS 减少工作线程数
if (compareAndDecrementWorkerCount(c))
return null;
//如果CAS失败那么继续自旋
continue;
}
try {
//在`allowCoreThreadTimeOut = true(允许核心线程过期)`或者`工作线程数>核心线程数`的时候会使用超时poll获取任务
//反之使用无限期阻塞take方法获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
//获取到任务 那么直接返回任务
return r;
//反之说明超时没有获取到任务
timedOut = true;
} catch (InterruptedException retry) {
//如果被中断那么把超时置为false 继续自旋
timedOut = false;
}
}
getTask方法,整体是一个自旋,自旋返回的情况,要么是线程池的状态导致不需要继续处理队列中的任务,要么是队列中线程在存活时间内还没有获取到任务。
-
第一if
这里if成立的的情况有两种,成立后会减少工作线程数,并返回null
-
线程池处于 stop ,tidying,terminated
这种情况下一般是用户调用了shutdownNow,这个方法导致线程池进入stop,并且返回没有执行的任务
所以这时候是不需要去处理线程池中的任务的
-
线程池处于shutdown 且队列没有任务
shutdown状态不处理新任务,但是处理队列中的任务,既然队列都没有任务了,那么可以返回null。
-
-
第二个if
- timed属性记录
是否允许线程过期
,在allowCoreThreadTimeOut = true(允许核心线程过期)
或者工作线程数>核心线程数
的时候会为true - timeOut属性记录
是否从阻塞队列中拿任务超时,拿任务的时间超过了keepAliveTime
接下来我们看下这个if成立的条件,和对应的意义
-
工作线程数超过了最大线程数,且工作线程数大于1
可以看作doug lea写兜底机制,反之工作线程数突破最大线程数,导致资源枯竭
-
工作线程数超过了最大线程数,且队列是空
同上
-
允许超时,且发生超时没有拿到任务,且工作线程数大于1
允许超时要么是核心线程允许过期,要么是工作线程数大于核心线程数,这时候工作线程长时间没有拿到任务,将返回null。之所以要求工作线程数大于1,是要确保队列中的任务有一个线程可以执行
-
允许超时,且发生超时没有拿到任务,且队列是空
基本同上
命中条件,那么会cas减少工作线程数量,成功那么返回null,这里
compareAndDecrementWorkerCount
没有自旋,因为这里失败了,会continue,说明存在多个线程将被回收,如果同时回收了,可能线程池直接没有线程执行队列中的任务了 - timed属性记录
-
从队列中获取任务
-
这里可以看出允许核心线程过期,和存活时间的作用。核心线程和非核心线程并没有特殊标记记录,而是如果不允许核心线程过期,那么在工作线程数小于等于核心线程的时候使用无限期take 保证核心线程没有任务至少阻塞于阻塞队列中,而不是返回null 导致核心线程过期
-
如果工作线程数大于核心线程数,或者允许核心线程过期,那么使用超时等待poll方法,这时候超过存活时间就返回null,线程将被“善后”
-
如果超时没有拿到任务,这时候timedOut 会为true,将继续自旋并可能命中
第一个if
或者第二个if
导致线程被回收 -
如果成功获取到任务,那么返回任务进行执行
-
如果在阻塞队列中获取的时候被中断,那么
timedOut = false
并且不响应中断
-
3.3.5 processWorkerExit 工作线程“善后”操作
在用户线程
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 如果是由于用户业务逻辑错误,那么是没有减少线程数的
decrementWorkerCount();//自旋+cas减少
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//获取锁
try {
//更新完成的任务数
completedTaskCount += w.completedTasks;
//从HashSet中移除
workers.remove(w);
} finally {
mainLock.unlock();
}
//尝试终止线程池
tryTerminate();
int c = ctl.get();
//如果线程是running 或者 shutdown
if (runStateLessThan(c, STOP)) {
//不是由用户异常导致的
if (!completedAbruptly) {
//线程最少数
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//确保最少有一个
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//不足一个
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//只要线程是running 或者shutdown都确保存在一个线程可以执行队列中的任务
//或者使用用户业务逻辑错误,导致的异常,那么补上一个线程
addWorker(null, false);
}
}
代码总共分为四步:
-
如果是用户业务逻辑错误,那么自旋+cas减少工作线程数
因为正常由于getTask返回null的情况,在getTask中就已经完成了减少工作线程数的操作
-
更新completedTaskCount和
HashSet<Worker>
更新completedTaskCount就是把当前工作线程完成的任务数加和
然后更新
HashSet<Worker>
-
尝试终止线程池
-
如果是用户业务逻辑错误导致的异常,那么补上一个线程。如果是由于长时间没有任务,但是回收这个线程后,队列又有任务了,那么确保线程池中有一个线程可以处理任务。
这都是建立在 线程池为running 或者 shutdown的情况下,因为其他状态队列中的任务都不需要去执行。
如果不是用户业务错误,工作线程由于等待超时进入,且线程池是running 或者shutdown的时候,会增加一个线程,这就是
线程池的保活
(这哪里是保活啊,这是替身)
3.4 shutdown关闭线程池,中断工作空闲线程
关闭线程池,如果还有任务没有执行完,那么任务还会执行,但是线程池将不接受新任务。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查权限相关
checkShutdownAccess();
//确保状态至少为SHUTDOWN
advanceRunState(SHUTDOWN);
//中断所有的空闲工作线程
interruptIdleWorkers();
//钩子函数 可以自行扩展
onShutdown();
} finally {
mainLock.unlock();
}
//尝试终止线程池
tryTerminate();
}
-
advanceRunState 使用自旋+cas确保状态至少为shutdown,因为存在其他线程调用shutdownNow,设置状态为stop的可能
-
interruptIdleWorkers
调用了interruptIdleWorkers(false),表示中断所有空闲的工作线程(tryLock成功表示工作线程空闲,这部分在
Worker 与中断
章节中详细解释) -
onShutdown
钩子方法,可以自行实现进行扩展
3.5 shutdownNow 关闭线程池,中断所有启动的工作线程
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//自旋+cas 确保状态为stop
advanceRunState(STOP);
//中断所有已经启动的工作线程,那怕这个工作线程在处理任务
interruptWorkers();
//将剩余的任务从队列中倒出来,吐给用户
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//尝试终止线程池
tryTerminate();
return tasks;
}
-
advanceRunState 使用自旋+cas确保状态至少为stop,因为存在其他线程调用过shutdownNow并且线程池将终结(触发了tryTerminate)设置为Tidying 或者Terminate的可能
-
interruptWorkers
对每一个worker调用
interruptIfStarted
,只要工作线程启动了(满足getState>=0)那么进进行中断 -
drainQueue
调用阻塞队列的drainTo
方法将任务吐出来,如果调用完还有任务,那么使用遍历 + 删除的方式进行清理
3.6 tryTerminate 尝试关闭线程池
/**
* 尝试判断是否满足线程池中止条件,如果满足条件,将其推进到最后的TERMINATED状态
* 注意:必须在任何可能触发线程池中止的场景下调用(例如工作线程退出,或者SHUTDOWN状态下队列工作队列为空等)
* */
final void tryTerminate() {
for (;;) {
int currentCtl = this.ctl.get();
if (isRunning(currentCtl)
|| runStateAtLeast(currentCtl, TIDYING)
|| (runStateOf(currentCtl) == SHUTDOWN && !workQueue.isEmpty())) {
return;
}
// 有两种场景会走到这里
// 1 执行了shutdown方法(runState状态为SHUTDOWN),工作线程都空闲导致,
// 2 执行了shutdownNow方法(runState状态为STOP)
if (workerCountOf(currentCtl) != 0) {
interruptIdleWorkers(ONLY_ONE);
return;
}
// 线程池状态runState为SHUTDOWN或者STOP,且存活的工作线程个数已经为0了
// 虽然前面的interruptIdleWorkers是一个一个中断idle线程的,但实际上有的工作线程是因为别的原因退出的(恰好workerCountOf为0了)
// 所以这里是可能存在并发的,因此通过mainLock加锁防止并发,避免重复的terminated方法调用和termination.signalAll方法调用
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// cas的设置ctl的值为TIDYING+工作线程个数0(防止与别的地方ctl并发更新)
if (ctl.compareAndSet(currentCtl, ctlOf(TIDYING, 0))) {
try {
// cas成功,调用terminated钩子函数
terminated();
} finally {
// 无论terminated钩子函数是否出现异常
// cas的设置ctl的值为TERMINATED最终态+工作线程个数0(防止与别的地方ctl并发更新)
ctl.set(ctlOf(TERMINATED, 0));
// 通知使用awaitTermination方法等待线程池关闭的其它线程(通过termination.await等待)
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// 如果上述对ctl变量的cas操作失败了,则进行重试,再来一次循环
// else retry on failed CAS
}
}
-
第一个if
-
isRunning(currentCtl)
为true,说明线程池还在运行中,不可以关闭线程池 -
runStateAtLeast(currentCtl, TIDYING)
当前线程池状态已经大于等于TIDYING了,说明之前别的线程可能已经执行过tryTerminate,且通过了这个if校验,不用重复执行了 -
(runStateOf(currentCtl) == SHUTDOWN && !workQueue.isEmpty()))
当前线程池是SHUTDOWN状态,但工作队列中还有任务没处理完,也不满足中止条件,这时候不能关闭,还需要处理队列中的任务
工作队列没有任务的时候,这些线程getTask为null,就会调用
processWorkerExist
也会调用到tryTerminate
,这时候满足条件将自动关闭线程池
-
-
第二个if
来到这个if需要满足
执行了shutdown方法(runState状态为SHUTDOWN),且当前工作线程已经空了
or执行了shutdownNow方法(runState状态为STOP)
如果工作线程数不等于0,这里会调用
interruptIdleWorkers
中断一个空闲的线程。这个被中断的线程会
getTask方法返回null->processWorkerExit->tryTerminate
,这时候这个线程也会中断一个空闲的线程,从而达到一个接一个的终止,优雅的关闭资源 -
修改状态,唤醒由于 调用
awaitTermination
而被阻塞的线程-
这里上锁的原因是,也许线程是一个个停止的,然后突然有一个工作线程执行业务逻辑出现异常,调用
processWorkerExit
,也调用到tryTerminate
,恰好线程数为0,出现并发 -
修改状态,调用钩子方法,唤醒阻塞的线程
首先cas状态到Tidying,工作线程数为0,然后调用
terminated
钩子方法,然后设置为terminated,并且唤醒阻塞在termination
上的线程
-
3.7 awaitTermination 阻塞当前线程,直到超时,或者线程池关闭,或者被中断
代码不复杂,但是需要有AQS Condition的知识,才知道为什么这里会阻塞调用线程
JUC源码学习笔记3——AQS等待队列和CyclicBarrier,BlockingQueue
3.8. prestartAllCoreThreads 预热线程池
此方法会提前让线程池工作线程数到达核心线程数,这样的好处相当于10个外卖员等待接单,一旦单子(任务)提交,立马得到执行,减少了新建线程的耗时
3.9 submit 提交一个任务
submit 底层还是依赖excute ,但是它会先将任务包装FutureTask,方便调用者来控制任务的执行,取消,获取异步执行结果。FutureTask本身就是一个任务,也是异步执行的结果 ,FutureTask就如同一个纽带,连接了任务 和 任务的结果
(FutureTask 学习: JUC源码学习笔记7——FutureTask源码解析,人生亦如是,run起来才有结果)
4.Worker与中断
这里我们主要分析,worker为什么需要继承AQS,以及Worker中state代表什么,worker在不同工作状态被中断会如何
4.1 Worker状态
-
构造的时候为-1
-
runWorker对状态的变更
unlock会调用Worker的tryRelease,设置为0
lock会调用Worker的 tryAcquire,cas修改state从0到1,如果失败会阻塞在AQS同步队列中
我们可以看到
state =1 意味着worker在执行业务逻辑
,state=0意味着worker处于空闲
,
4.2 shutdown 与工作线程的交互
shutdown 会中断空闲的线程,并对空闲线程进行回收。怎么识别一个线程是空闲线程昵,怎么让空闲线程被回收昵?
上面讲shutdown
方法时候,我们了解到 shutdown 首先自旋+cas 确保线程池状态到达 SHUTDOWN,然后调用interruptIdleWorkers
中断空闲线程,这个方法会调用到interruptIdleWorkers(false)
其中的false表示中断所有空闲线程,而不是一个
这里需要品一品,为什么tryLock成功,就意味着当前工作线程是空闲的。上面我们说到工作线程执行任务的时候会先执行lock,改变状态为1,然后开始执行业务逻辑,这里的tryLock会cas状态从0到1,如果成功了,意味着cas的这一瞬间工作线程是空闲的。
这是工作线程也许阻塞与getTask方法,也可能刚刚拿到任务,准备lock但是比shutdown慢。
-
工作线程阻塞与getTask
中断工作线程,会导致原本阻塞与阻塞队列的线程抛出中断异常
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); //3 发现 线程池是shutdown ,如果这时候阻塞队列还没任务,那么会自旋减少工作线程数,返回null if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //1.从这里 抛出中断异常 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { //2.在这里被捕获 //将继续自旋,来到3 timedOut = false; } } }
顺着代码中的1,2,3看,最终getTask返回null,线程会从
runWorker
中的while循环退出,执行processWorkerExist
,从而实现空闲线程的回收 -
准备lock 但是 比shutdown慢
这时候,线程从getTask刚刚拿到任务,但是准备lock,被shutdown方法强占先机,导致lock获取锁失败,而阻塞与锁,只有等shutdown释放自己worker这把锁才能返回,但是这时候工作线程被中断了。
注意这时候返回以及被中断,doug lea不能让这个中断带到用户的业务逻辑中,因为这样会影响到业务逻辑(用户代码中根据中断也许有不同的逻辑)所以有下面这段代码
这段代码的作用是,如果线程池停止了(stop tidying terminated)那么一定确保工作线程被中断,但是如果不是那么一定确保线程不被中断
这段代码,我做了一点点排版调整,逻辑不变,如下
//1. (runStateAtLeast(ctl.get(), STOP) || //2. (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && //3. !wt.isInterrupted()
其中1,2是或的关系,3和(1或2)是且的关系。如果整个为true 那么会中断当前下次你,我们详细分析下
-
如果线程池停止,那么1 为true,如果这时候工作线程没有中断,那么工作线程会被中断
-
如果线程没有停止,这是1为false,来到2,首先
Thread.interrupted()
清除中断标志,返回之前有没有被中断。如果线程池没有停止,但是之前被中断了,这里会清除中断标识,这样实现了 ——线程池没有停止,那么确保线程不被中断。如果之前被中断,那么说明是shutdown ,或者 shutdownNow,或者用户业务逻辑进行的中断,这时候且
runStateAtLeast(ctl.get(), STOP)
成立,那么说明线程需要中断,那么这是再次进行中断(整体为true if中的逻辑就是中断)有趣的是,为什么doug lea写两次
runStateAtLeast(ctl.get(), STOP)
?这是
(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))
不是一个原子操作,可能我刚清除中断了,这时候shutdownNow,成功改变状态为STOP,这时候,其实需要对线程进行中断(在源码注释中doug lea称之为清除中断和shutdownNow的race(竞赛)
十分生动形象了)
-
至此我们理解了 中断对于工作线程的意义,其中关键的一点是,中断能让阻塞于阻塞队列中的线程,重新自旋从而来检查线程池状态,达到如果shutdown,shutdownNow执行了,工作线程会从阻塞,到自旋检查线程池状态从而让getTask返回null,达到工作线程回收的目的(doug lea 牛逼!)
4.3 shutdownNow 和 工作线程的交互
shutdownNow,不关工作线程是空闲还是运行都会进行中断,而且这个中断会传播到我们提交的业务逻辑中
shutdownNow 会首先改变状态为stop然后调用interruptWorkers
,这个方法会调用每一个Worker的interruptIfStarted
可以看到只要state >=0 都可能被中断,只有Worker刚new出的来的时候是-1,一旦执行runWorker,首先第一个事情就是修改状态为0,这时候就可能被shutdownNow中断。
这中断会让空闲的线程从getTask返回null,然后线程进入回收。让刚拿到任务准备执行的线程将中断带到业务逻辑中,让正在运行业务逻辑线程被中断(为什么能带到业务逻辑中?见4.2 shutdown 与工作线程的交互 中的 准备lock 但是 比shutdown慢
)。
这个中断可以看作是线程池和我们业务逻辑的通信 —— 爷关闭了,你好自为之
4.4 线程池的优雅停
如果当前线程池有一百个线程,我上来一个shutdownNow, 让线程池关闭,我能立马回收一百个线程么。显然是不行的,也不能说我线程池先修改为停止状态,线程爱咋咋地,这种不负责任的行为也是不行的,线程池需要等待池中所有工作线程为0,才能停止自己。
那我们来看看doug lea如何实现优雅停
tryTerminate
方法 在新增worker 失败
,或者shutdown执行
,或者shutdownNow执行
等情况的时候,会被调用
所以上来就是一个判断,如果线程池为运行,那么不能停止;如果已经是TIDYING说明有线程已经将线程池停止了,不需要再次执行;如果是shutdown但是队列有任务,那么需要执行队列中的任务,也不能停止线程池。
精彩的在于 workerCountOf(c) != 0
这是会中断一个空闲的线程,为什么只中断一个啊,为什么不都中断?
如果全部中断,这些线程都会从getTask中拿到null 然后调用processWorkerExist,然后并发执行terminate,从某种程度上cpu遭了殃,不够优雅。
中断一个可以让其中一个空闲执行processWorkerExist 然后调用tryTerminate
,继续执行一个空闲的线程,然后循环往复,直到所有工作线程调用processWorkerExist 进行回收后,才能到下面修改状态为TIDYING的逻辑。
doug lea 在源码注释中 说 中断一个空闲线程,确保信号的传播
就是这个意思,doug lea 牛逼
四丶问题解答
-
线程池往往提交任务,等操作都是并发调用的,doug lea如何实现线程安全 和 高效率
首先doug lea 使用一个自旋 +cas的操作,确保成功增加了工作线程数后,才能继续创建线程的操作,并且这个自旋判断了线程池状态是否能接受新任务,是否能新建工作线程,相当于一把自旋锁,避免阻塞挂起的性能消耗。如果成功实现了工作线程数的增加,就如同占据的名额,接下来使用线程工厂创建线程的步骤是不加锁的,提高了并发。将线程放入worker集合 使用了ReentrantLock ,启动线程的操作又是不加锁的,通过这种缩小锁的粒度的思想,提高并发执行效率。
-
线程池 的 shutdownNow和shutdown的区别
前者会修改线程池状态为stop并中断所有启动的线程(工作线程刚新建的使用state = -1,调用runWorker首先设置状态为0,视为已经启动,如果开始执行任务那么修改,如果执行任务首先cas修改状态为1)所有state >=0 的线程都会被中断,且中断可以在用户定义的任务中感知到,并且会把任务队列中的任务通过阻塞队列
drainTo
方法倒出来给用户。后者会修改线程池状态为shutdown,然后中断所有空闲的线程,使用tryLock cas修改状态从0到1,如果成功视为工作线程为空闲。
-
线程池停止的时候,如何确保所有工作线程回收后才停止线程池本身
tryTerminate方法负责停止线程池,会检查工作线程数,如果不为0,那么中断一个空闲的线程。中断工作线程的作用会让阻塞于getTask方法的工作线程,重新自旋,从而判断线程池状态,如果停止那么返回null,如果shutdown且阻塞队列为空,也返回null,从而让工作线程从runWorker方法while结束,执行processWorkerExist进行线程回收,processWorkerExist方法又会调用到tryTerminate,继续中断一个空闲线程,直到工作线程数为0,这时候才会修改状态为TIDYING,然后执行
terminated
方法,然后设置状态为terminated状态。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/202350.html