在上一篇文章《Java 线程池详解(上)》中,详解介绍了线程池的创建以及核心参数,在介绍核心参数的过程中,其实已经把线程池工作的原理简单介绍了一遍,这篇文章从线程池源码的角度来看分析线程池是如何执行任务的
一、execute()方法
首先拿到线程池状态控制变量ctl的值,它是一个AtomicInteger类型,所以先获取它的value值
workerCountOf()方法来计算线程池中线程的数量,计算方式也很简单,因为CAPACITY
低29位都是1,用ctl
的值与CAPACITY
的做与运算,因为CAPACITY
高3位都是0,所以结果的高三位也是0,ctl
在初始化的时候是拿RUNNING
的值与0做或运算,得到的低29位都是0,所以这个时候,ctl
与CAPACITY
与运算就可以保留ctl
低29位的数据,然后高3位是0,就可以表示当前线程池中线程的数量了
如果当前线程数是否小于核心线程数,则调用addWorker()方法来创建一个线程,同时把任务也传了进去
如果当前线程池处于运行状态并且任务添加到阻塞队列后,需要再次判断线程池的状态,因为在添加任务到队列的过程中,线程池状态可能会发生变化,再采取下面的措施:
- 如果添加完任务后线程池状态发生了变化,就从队列中将刚才添加的任务移除,移除成功之后,调用线程池的拒绝策略来处理该任务;
- 如果添加完任务之后,线程池状态没有发生变化,就去判断当前线程池是否有线程,如果没有,就去创建一个空任务的线程,让线程从队列中获取任务来执行。这么做的目的是为了保证当线程池处于RUNNING状态时,至少要有一个线程在执行。
如果不满足上面的条件,就去位任务创建临时线程,如果创建失败,调用拒绝策略来处理任务
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
二、addWorker()方法
在execute()方法中,都是通过addWorker()方法来创建线程的,下面看一下该方法的具体实现,我们把addWorker()方法分为两部分来介绍,第一部分用于判断是否可以创建线程,第二部分创建线程
2.1 判断是否可以创建线程
第一种判断情况对应于上面execute()方法的最后一种情况,如果线程池的状态不是处于RUNNING状态,那么也会采用拒绝策略来处理任务,并不说只有线程池饱和和才会执行拒绝策略
在第二层for
循环里面,如果线程池中线程的数量超过了线程池能表示的最大值,或超过了核心线程数,或超过了设置的最大线程数,都不会再创建新的线程。
接着compareAndIncrementWorkerCount()方法在for
循环自选,通过CAS来对当前线程数进行+1
操作,同时需要判断当前线程池的状态于之前获取的线程池状态是否发生了变化,如果发生了变化则通过类似go-to
的调用再回到开始的地方
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
……
}
2.2 创建线程
调用new Worker()
创建一个线程,创建成功之后,紧接着即使直接加锁,加锁的目的是为了要更新一些线程池的参数信息,比如当前最大的线程池大小。
加锁后,如果当前线程池处于RUNNING状态或者rs == SHUTDOWN && firstTask == null
(表示处理队列里面的任务),就判断当前线程是否存活,因为到现在为止,只是创建了Thread对象,还没有通过start()方法真正去创建线程,所以如果此时线程处于激活状态,说明是有问题的,直接抛出异常
如果没有问题,就把worker对象添加到线程池的workers
属性中,最后再调用线程的start()方法真正去创建一个线程
private boolean addWorker(Runnable firstTask, boolean core) {
……
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
在Worker()构造方法中,通过ThreadFactory的newThread()方法创建一个线程,然后将任务和线程封装成一个Worker对象
Worker(Runnable firstTask) {
// 禁止线程中断
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
三、runWorker()方法
线程启动后,就会去调用Worker的run()方法,而run()方法会去调用runWorker()方法
我开始比较疑惑的是为什么在addWorker()方法启动了线程之后会去调用Worker里面里面的run()方法,后来我在Worker的构造方法中找出来了,newThread()方法中入参是this
。关于线程start()方法调用后为什么会去执行run()方法,可以参考文章《Java 线程详解(上)》中关于线程创建原理的介绍
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
下面重点看一下runWorker()方法的实现
首先调用unlock()方法,将当前Worker的state从-1变成0,意思是运行中断
然后从Worker中取出任务,或者调用getTask()方法从任务队列中获取任务,只要有任务还没执行,while
循环里面的代码都可以执行
获取到任务之后,会去判断当前线程池的状态,如果线程池的状态大于STOP状态并且当前线程不处于中断状态,就去调用线程的中断方法
如果线程池状态正常,就可以去执行任务的run()方法了,但是在执行run()方法前后,会去执行beforeExecute()和afterExecute()方法,但在ThreadPoolExecutor线程池中,这两个方法都没有实现。如果我们自定义线程池的时候,可以通过这两个方法来实现一些其他的逻辑
当所有的任务都执行完了之后,会去调用processWorkerExit()方法处理Worker的退出
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
四、getTask()方法
如果线程池状态大于SHUTDOWN,并且任务队列为空,就把线程池中线程的数量-1,然后退出循环
否则就获取线程数,判断是否需要超时销毁,如果当前线程数大于线程池最大允许的线程数,或需要超时销毁,在进行后面的判断,如果线程数大于1或任务队列为空,就将当前线程池中的线程数量-1
如果上面的条件都不满足,就从任务队列中获取任务,根据是否需要超时销毁调用不同的获取任务的方法,poll()方法可以指定等待多长时间时间,如果超过了等待时间任务任务队列任务没有任务就返回null,而take()方法在任务队列为空时会一直等待下去,直到拿到任务为止
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
五、processWorkerExit()方法
runWorker()方法中,任务都处理完成之后,回来调用该方法,而该方法的的实现也比较简单,首先加锁,将完成任务的Worker从workers中移除**(这个时候是只要完成任务了就会被移除,不会考虑核心线程数这些问题)**
接着会调用tryTerminate()去判断是否需要终止线程池
最后则是去计算线程池需要保留多少线程,如果没有开启核心线程也销毁,就就需要保留核心线程数的线程,前面已经调用workers的remove()方法将之前创建的线程移除了,所以,对于需要保留的线程数,都是再通过addWorker()方法重新创建新的线程
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
六、tryTerminate()方法
如果线程池状态处于运行状态,或者大于TIDYING状态,或者处于SHUTDOWN状态但任务队列不为空,这些情况都不会终止线程池
如果线程池中的线程数大于0,则随意中断一个线程,在前面processWorkerExit()方法中,只是将Worker移除了,并没有处理Worker里面的线程,在tryTerminate()方法的interruptIdleWorkers()方法中会去中断一个线程
如果上面的情况都不满足,就会试图去将线程池的状态改为TIDYING,如果修改成功了,就会去调用terminated()钩子方法(这是一个空方法,没有任何实现),最后将线程池的状态更改为TERMINATED,然后唤醒所有阻塞在任务队列的线程
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
七、ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor是一个比较特殊的线程池,它可以支持延时执行,但它继承自ThreadPoolExecutor类,所以它具有所有普通线程池的功能,除此之外,它还提供了三个scheduleXxx()方法,可以用来指定延时的时间,同时它的任务队列使用DelayedWorkQueue
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(10);
scheduledThreadPool.schedule(new RunnableTest(),2,TimeUnit.SECONDS);
scheduledThreadPool.scheduleWithFixedDelay(new RunnableTest(),0,2,TimeUnit.SECONDS);
scheduledThreadPool.scheduleAtFixedRate(new RunnableTest(),0,1,TimeUnit.SECONDS);
class RunnableTest implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
如上面所示,有三种方法来延时执行任务,但这三种方法各不相同,schedule()方法可以在指定延时时间后执行任务,但不能重复执行任务;而scheduleWithFixedDelay()和scheduleAtFixedRate()方法可以重复执行任务,这两个方法都需要可以指定一个初始延迟时间以及周期延迟时间,初始延迟时间用于任务第一次执行时需要等待多久,后面就按照指定的周期延迟时间来周期性地执行任务,但这两个方法也有区别:
scheduleWithFixedDelay(固定):按照初始延迟时间执行一次之后,后面按照固定地时长周期来执行任务
scheduleAtFixedRate(非固定):按照初始延迟时间执行一次之后,后面每次的执行都需要依赖于上一次任务执行完成后,再等待指定延时的时间后才能执行,比如指定了延迟时间为5s,它就需要上一次任务执行完之后再等待5s才能执行
下面我们以scheduleAtFixedRate()方法为例,看一下源码:
在scheduleAtFixedRate()方法中,需要指定period不能小于0,该参数可以用来表示延迟时间
将任务封装成一个ScheduledFutureTask,该类继承了FutureTask,所以最后真正执行的时候也是去调用它的run()方法
而执行执行任务是去调用delayedExecute()方法
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
7.1 delayedExecute()方法
如果当前线程池状态大于SHUTDOWN,就会采取拒绝策略,否则就把任务添加到DelayedWorkQueue中
如果线程池状状态大于SHUTDOWN,并且当前状态不能运行,并且把任务从延迟队列中移除成功,就取消任务执行
否则就调用ensurePrestart()方法去创建线程来执行任务
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
7.2 offer()方法
调用add()方法将任务添加到延迟队列时,会去调用它的offer()方法,在该方法中会判断是否需要扩容(扩容50%),如果是第一个任务,直接添加到queue的0号索引位置,否则将调用siftUp()方法将任务插入到堆尾,而在siftUp()方法中会对任务的位置进一步进行调整
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow();
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e);
}
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
在siftUp()方法中,会根据任务的延迟时间进行一个排序,如果延迟时间相同就按照任务的序列号进行排序,序列号是在创建ScheduledFutureTask对象时递增生成的
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
// 根据序列号排序
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
排序后的延迟队列是一个最小堆的数据结构
7.3 run()方法
首先判断是否为周期性执行,schedule()方法的period为0,表示为非周期性的
如果不是周期性的就直接调用它的run()方法即可
如果是周期性的,调用runAndReset()方法执行任务,任务执行完成之后,设置下次执行的时间,再调用reExecutePeriodic()方法把任务添加到延迟队列中
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
public boolean isPeriodic() {
return period != 0;
}
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/153632.html