人生苦短,不如养狗
一、前言
在上一篇Java多线程探索(一):为什么要使用ThreadPoolExecutor?中我们简单介绍了为什么推荐使用ThreadPoolExecutor的原因。今天我们就来具体分析一下ThreadPoolExecutor的工作原理。
二、ThreadPoolExecutor总览
在探索具体工作流程之前,我们先来看一看ThreadPoolExecutor
比较重要的成员变量、构造函数和几个重要的内部类。
(一)成员变量
// 主要的线程池控制状态变量,用32位二进制表示
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 阻塞队列,用于保存等待执行资源的任务,可以自行选择相应的阻塞队列
private final BlockingQueue<Runnable> workQueue;
private final ReentrantLock mainLock = new ReentrantLock();
// 用于保存线程池中所有的worker线程,只有持有mainLock才可以获取
private final HashSet<Worker> workers = new HashSet<Worker>();
// 用于支持awaitTermination()方法的等待条件
private final Condition termination = mainLock.newCondition();
// 用于追踪最大可获得的线程池的大小
private int largestPoolSize;
// 用于记录已经完成的任务数量
private long completedTaskCount;
// 线程池工厂,所有的线程都是通过addWorker()来创建
private volatile ThreadFactory threadFactory;
// 当线程池和阻塞队列满或者线程池被关闭时会启用handler
private volatile RejectedExecutionHandler handler;
// 空闲线程保持时间
private volatile long keepAliveTime;
// 核心线程池大小,即保持存活的最小worker的数量(不会超时,除非设置了allowCoreThreadTimeOut,此时最小值为0)
private volatile int corePoolSize;
// 最大线程池大小
private volatile int maximumPoolSize;
以下只介绍其中需要理解的成员变量ctl
,其余的大家可以参照注释看一下。ctl
:该变量是一个原子类型的整型变量,实际存放了32位的二进制数,主要用于表示两个概念性的属性——workerCount和runState,其中高3位用于表示runState,低29位用于表示workerCount。
线程池生命周期(状态流转)
和线程一样,线程池也是有生命周期的。如上文所说,ctl
中存放了两个属性,其中runState用于表示线程池的状态,一共有五种状态:RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED。以下是线程池五种状态的流转关系图,也就是线程池的生命周期:
RUNNING
:此时线程池能够接受新的任务,并且能够处理阻塞队列中的任务。SHUTDOWN
:此时线程池不接受新的任务,只处理阻塞队列中的任务。STOP
:此时线程池既不接受新的任务,也不接受阻塞队列中的任务。TIDYING
:此时所有的任务都被终止,workerCount等于0,正在转变成TIDYING状态的线程将会执行terminated()
钩子方法。TERMINATED
:此时terminated()
执行完毕。
(二)构造函数
ThreadPoolExecutor
中有许多构造函数,这里我们只看参数列表最全的那个:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
首先看下参数列表,前五个参数中规中矩,最后两个参数ThreadFactory
和RejectedExecutionHandler
都是接口类型的参数,这就意味着,我们可以根据项目中的实际情况来实现对应的接口,使用自己的实现类来进行初始化。
具体看下方法体。首先,进行了参数校验,corePoolSize
、maximumPoolSize
和keepAliveTime
不能小于0,maximumPoolSize
必须大于corePoolSize
,否则抛出IllegalArgumentException
异常。workQueue
、threadFactory
和handler
不能为null,否则抛出NullPointerException
异常。完成参数校验之后就是初始化参数。
(三)几个重要的内部类
在ThreadPoolExecutor
中定义了五个内部类,分别是AbortPolicy
、CallerRunsPolicy
、DiscardOldestPolicy
、DsicardPolicy
和Worker
。
前四个内部类是ThreadPoolExecutor
提供的拒绝策略,也就是当线程池和阻塞队列都已经满了之后,新到的线程应该如何处理的策略。当然,大家也可以根据自己的实际业务需求实现RejectedExecutionHandler
定制化自己的拒绝策略。
Worker
内部类可以说是ThreadPoolExecutor
相当核心的一个内部类。它的主要作用是用于维护正在运行的任务的中断控制状态,并维护一些次要的信息。这句话是翻译自源码的注释,感觉其实还是没有解释清楚Worker
到底是干什么的。其实Worker
维护中断控制状态的目的是为了保证运行中的任务不被中断。是不是还有点迷糊,不要慌,让我们来看看Worker
到底长什么样子:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
// 这里将state设置为-1是为了阻止中断,直到执行了runWorker方法
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
// 可以看到这里和ReentrantLock的区别,只能获取锁一次,即不可重入
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
可以看到Worker
既继承了AQS
,又实现了Runnable
接口。通过继承AQS
,Worker
定制化的实现了一个简化版的ReentrantLock
,其中最大区别就是Worker
是不可重入的独占锁。而通过实现Runnable
接口,Worker
又能像线程一样进行工作,作为线程池中任务运行的基本单位。这也就是为什么在它的构造方法中,能够通过指定的线程工厂和this对象创建对应工作线程。
现在我们在回头来看下上面所说的维护了运行任务的线程的中断控制状态,Worker
是如何去进行中断控制的呢?这里Worker
继承AQS
实现了一个不可重入的独占锁,通过这个锁,Worker
可以判断当前线程是否可以进行中断,如果当前线程获取到了该锁,说明当前线程正在运行,不能进行中断。而如果当前线程是空闲状态,也就是无锁状态,那么就可以对其进行中断。
再来看下Worker
的不可重入,其实还是为了保证运行中的线程不会被中断。举个例子,当任务在调用像setCoreSize()
这样的线程池控制方法时,会执行这样一个方法 interruptIdleWorkers()
:
public void setCorePoolSize(int corePoolSize) {
、、、
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();
、、、
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 如果可以重入,则会导致进入到这个代码块
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
可以看到,如果可以重入的话,那么if (!t.isInterrupted() && w.tryLock())
判断语句就会为真,此时就会进入到代码块中执行中断方法,最终会导致自己将自己中断。
到这里,应该对Worker
有了一个大致的了解,下面我们结合一个小例子来具体看一下Worker
作为ThreadPoolExecutor
的基本执行单位到底是如何工作的。
三、还是那个简单的小例子
再来回顾一下上一篇文章中我们使用的小例子:
通过这个例子我们来分析一下ThreadPoolExecutor
是如何工作的。
(一)ThreadPoolExecutor的创建过程
在上面这个例子中,创建线程池实际上调用的就是上文中提到的构造方法,这里闲鱼自己实现了一个简答的线程工厂TestThreadFactory
:
public class TestThreadFactory implements ThreadFactory {
/**
* 姓名前缀
*/
private final String namePrefix;
private final AtomicInteger nextId = new AtomicInteger(0);
public TestThreadFactory(String whatFeatureOfGroup){
this.namePrefix = "From TestThreadFactory's " + whatFeatureOfGroup + "-Worker-";
}
@Override
public Thread newThread(Runnable task) {
String name = namePrefix
+ nextId.getAndIncrement();
Thread thread = new Thread(null, task, name, 0);
System.out.println(thread.getName());
return thread;
}
}
在实际工作中,建议大家也可以类似去实现一个线程工厂,对于创建的每一个线程都能有一个明确的名字,这样在进行问题排查时会较为方便。其他参数都是比较基础的设置,这里就不再赘述。
(二)ThreadPoolExecutor的执行过程
下面开始最重要的部分,线程池到底是如何执行的?
可以看到,代码中调用了这样一个方法execute(Runnable task)
,这个方法就是线程池任务执行方法,下面我们来具体看一下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
execute()
方法的执行流程,一共分为三步:
-
首先判断当前正在运行的线程数是否少于 corePoolSize
,如果小于,则根据给出的command
创建一个新的线程,并将其作为它的第一个任务。(此时可以认为创建的是核心线程) -
如果当前运行的线程数不少于 corePoolSize
,那么就尝试将任务添加到阻塞队列中。如果添加成功,还会再进行一次检查是否需要创建一个新的线程。如果检查通过,同时根据workerCountOf(recheck)
获得值为0,则开启一个新的线程。 -
当入队失败后,此时会再次尝试添加一个新的线程,如果添加失败,则根据任务给出的拒绝策略来执行 reject(command)
方法。
上面主要是用于进行任务创建的控制,具体任务的创建则是通过addWorker()
方法,具体方法如下:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 在这里进行线程池状态的检查和队列是否为空的检查
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
addWorker
方法可以分成两部分,一部分是进行worker数量检查和增加操作,一部分是实际进行worker创建。在这两个过程中都进行了多次的线程池状态检查和线程池已运行数量检查,具体代码这里就不进行过多的解释,这里我们主要来看一下方法参数列表中的core
变量。
通过代码wc >= (core ? corePoolSize : maximumPoolSize)
可以发现,core
变量是用于判断当前准备创建的线程是属于核心线程还是非核心线程,也就是超过corePoolSize
部分的线程。
看完了线程创建部分,下面就到了线程运行。在上面的代码中,调用了t.start()
方法。再往上看,我们可以发现实际上这个线程是Worker
中的线程,所以最终线程运行会调用Worker
中的run()
方法。
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 启动任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
// 完成任务数加一
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
通过上面的代码可以看到,最终任务的执行是在runWorker()
方法中完成的。整体来看,runWorker()
方法首先释放了锁来保证允许中断,然后在循环中进行实际的任务启动(自旋以保证任务执行)。在循环中进行任务执行时还提供了beforeExecute(wt, task)
和afterExecute(task, thrown)
两个可自行扩展的方法,也就意味着,我们可以自行设计线程运行前和运行后需要执行的操作。在方法的最后会进行worker的退出操作。
这里需要注意的是一开始进行的解锁和后续的加锁操作。当使用Worker
的构造方法进行对象创建时,此时Worker
对象中同步状态state
为-1。而当使用了unlock()
方法之后,则会将同步状态state
更改为0,此时后续的加锁操作才能继续进行。除此以外,这里进行解锁的另一个目的是为了保证其他线程在调用ThreadPoolExecutor
的interruptIdleWorkers()
和interruptWorkers()
方法能够进行中断Worker操作。而加锁操作则是为了保障任务执行的完整性。
最后会调用processWorkerExit(w, completedAbruptly)
方法进行Worker退出操作。
总结
通过上面的分析,相信大家对于线程池ThreadPoolExecutor
的使用和线程池中线程的运行应该有了基本的了解。其中内部类Worker
大家一定要自行对照源码和编写案例进行深入理解。
以上只是讲解了线程池运行大致的原理和闲鱼自己的理解,要想更好的使用还需要大家在项目中具体去实践。
原文始发于微信公众号(Brucebat的伪技术鱼塘):Java多线程探索(二):优秀的ThreadPoolExecutor到底是如何工作的?
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/90212.html