线程池
1. 线程池的创建
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize: 最小核心线程数;
maximumPoolSize: 最大核心线程数;
keepAliveTime: 空闲线程最大存活时间;
unit: 存活时间的单位:秒/分/时…;
workQueue: 任务队列,用来存放提交的任务;
threadFactory: 线程工厂用来创建线程池中工作的线程;
handler: 拒绝策略,当线程数达到最大线程数,在提交任务线程池该如何操作。那么就由拒绝策略来决定,一共有4种,默认采用的是抛出异常。其他3种下面会进行介绍。
2. 线程池的工作原理
这里大致介绍线程池的工作原理,具体的细节需要查看源码,在以下的源码介绍中会进行详细介绍。假设我们提交的任务为while(1)无限循环,会一直执行下去。
线程池的工作原理:
a. 在初始化线程池的时候,线程池的核心线程数和最大核心线程数都为0,任务队列为空;
b. 当当前线程第一次通过execute()方法提交任务的时候,线程池会创建一个核心线程,来执行任务。
c. 当提交的任务超过核心线程数的时候,线程池会把多余的任务加入到任务队列中;
d. 当任务队列被填充满了的时候,在提交其他任务,线程池会创建最大核心线程来处理任务。
e. 当创建的线程超过最大核心线程数的时候,继续提交任务会走相应的拒绝策略。默认拒绝策略就是报错。
3. 线程池的状态转换
线程池共有五种状态:RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED。
private static final int RUNNING = -1 << COUNT_BITS; // 111-00000 00000000 00000000 00000000
private static final int SHUTDOWN = 0 << COUNT_BITS; // 000-00000 00000000 00000000 00000000
private static final int STOP = 1 << COUNT_BITS; // 001-00000 00000000 00000000 00000000
private static final int TIDYING = 2 << COUNT_BITS; // 010-00000 00000000 00000000 00000000
private static final int TERMINATED = 3 << COUNT_BITS; // 011-00000 00000000 00000000 00000000
其中在线程池创建的时候初始状态为RUNNING。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 111-00000 00000000 00000000 00000000
2、SHUTDOWN状态:不接收新任务,但能够处理已添加的任务。调用线程池的shutdown()方法,线程池由RUNNING -> SHUTDOWN;
3、STOP状态:不接受新任务,也不在处理已添加的任务,并且会中断正在处理的任务。调用线程池的shutdownNow()方法,线程池由(RUNNING or SHUTDOWN) -> STOP;
4、IDYING状态:当所有的任务已终止,ctl记录的 “任务数量” 为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。
terminated()方法在线程池中的实现是空的,若用户想在线程池变为TIDYING时,进行相应的处理,可以通过重载terminated()函数进行实现。
当在SHUTDOWN状态时,阻塞队列为空并且线程池中工作的线程数也为 0 时,就会由SHUTDOWN -> TIDYING。
当在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING;
5、TERMINATED状态:线程池彻底终止,就会变为TERMINATED状态。线程池处于TIDYING状态时,执行完terminated()后, 就会由TIDYING -> TERMINATED。
4. 线程池源码解析
可能会奇怪,一个整数类型,怎么表达两个意思呢?这就是它设计的巧妙之处,采用的是位操作,前 3 位用来表示线程池的状态。后 29 位表示创建的线程数。
// ctlOf 进行与操作,wc 都为 0, 所以结果由 rc 来决定
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 111-00000 00000000 00000000 00000000
private static int ctlOf(int rs, int wc) { return rs | wc; }
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 获取线程池的状态,这个状态包括 线程池的状态(高3位)| 工作的线程数 (低29位)
int c = ctl.get();
// 计算工作线程数是否小于最小核心线程数
if (workerCountOf(c) < corePoolSize) {
// true表示创建最小核心线程数的逻辑,一个工作线程 可以看成对应一个 Worker对象
if (addWorker(command, true))
// 表示创建成功
return;
c = ctl.get();
}
// 判断线程池的状态是否为Running状态,如果为true才加入到队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get(); // 再次进行判断,确保线程安全
if (! isRunning(recheck) && remove(command)) // 如果当前线程池不是运行状态,则移除队列中的任务
reject(command); // 使用拒绝策略
else if (workerCountOf(recheck) == 0) // 如果工作线程数为0, 则需要在创建新的线程进行处理任务
addWorker(null, false);
}
else if (!addWorker(command, false)) // 绑定一个非核心线程数 (创建最大核心线程数逻辑)
// 如果创建最大核心线程数失败,使用拒绝策略
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/**
* 如果 rs >= SHUTDOWN 表示当前线程池不可能在接收任务了
*
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// core 为 true,表示是最小核心线程数的步骤, 如果 wc >= corePoolSize, 表示无法在创建最小核心线程数了
// maximumPoolSize 也是一样,表示无法创建最大核心线程数了
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 先把工作线程数CAS + 1 操作
if (compareAndIncrementWorkerCount(c))
break retry; // 跳出 retry 标记层的循环
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
// 一个Worker 包含一个线程和一个任务
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 获取不到锁,在此阻塞
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 把工作的线程添加到Set集合中
workers.add(w);
// 记录线程池中工作线程的值
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 任务开始执行,实现在 Worker中的run方法
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker 工作线程类的具体实现。类中有两个非常重要的属性一个是创建的线程和提交的任务。也就是线程和任务绑定到一块。其中构造方法getThreadFactory()获取 线程工程默认DefaultThreadFactory类的实现,然后调用newThread方法创建一个线程,参数是Worker 类自身。
也就是说,当线程启动的时候 (在addWorker 方法的 t.start() 完成) ,会调用 Worker的run方法实现。
而 Worker 的run方法调用 runWorker 方法。其中 runWoker 方法是线程池工作的主要方法。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
...
Worker(Runnable firstTask) {
...
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(),0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
线程池工作的主要实现 就是在 runWorker 方法中。去掉一些不重要的代码,主要看它的 while 的循环条件和 task.run(); 方法。线程池能够不停的进行工作原理,就是依赖阻塞队列完成的。当前线程如果完成了提交任务,会从阻塞队列中进行拉取任务,如果阻塞队列任务为空,那么当前线程就会阻塞。直到往阻塞队列里面添加任务,即可唤醒线程继续工作。
注意,当创建的核心线程数已满的时候,执行的任务已完成并且任务队列为空,那么创建的核心线程数拉取任务的时候就会阻塞,不会被释放掉。当再次提交任务的时候,不会在创建核心线程数了,而是唤醒阻塞的核心线程数继续从阻塞队列拉取任务,进行执行,如果执行完后,任务队列又为空,那么核心线程数再次阻塞。依次循环往复,到达线程复用的效果。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// task != null 表示是创建新的 Worker线程
// 如果 task == null, 那么会走 task = getTask() 表示该线程之前的任务已经执行完成了,从任务队列里获取任务
// 通过循环不断的从当前提交的任务和从任务队列获取任务进行工作
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 扩展程序,未实现
Throwable thrown = null;
try {
task.run(); // 直接调用run, 而不是通过线程自身回调的
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // 扩展程序,未实现
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
线程复用的实现就是 getTask 方法。最大核心线程数过期的具体实现也在 getTask 方法中。如果 timed 为 true,说明当前线程数超过最小核心线程数(启用了最大核心线程数),会从阻塞队列中拉取并设置过期时间。
如果在指定时间内,拉取不到就会解开阻塞,表示任务队列中没有任务。那么最大核心线程数就可以关闭了。把timedOut 值为 true,在下一次的 for(;;) 的循环中调用compareAndDecrementWorkerCount 把 ctl 的线程数减1,并返回null。当返回为 null 的时候 runWorker的 while 条件就不满足了,会直接跳出去,这个函数也就执行完成了,那么对应的线程也就释放了。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// 是否为非核心线程数, (allowCoreThreadTimeOut 如果为ture 是允许关闭核心线程数的)
// 保证线程数 符合核心线程数即可
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// wc > maximumPoolSize 大于最大核心线程数
// timed && timedOut (timed 为 true 表示当前线程数大于最小核心线程数) (timedOut 表示任务队列为空,并超过当前线程的最大存活时间)
// wc > 1 表示有可以剔除的线程数量
// 表示队列为空,目前没有任务
if (compareAndDecrementWorkerCount(c)) // 剔除一个线程数
return null;
continue;
}
try {
// 当阻塞队列为空的时候,如果不设置过期时间,在此会进行阻塞
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
本篇文章来源于微信公众号: 程序员秋田君
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/10848.html