大家好,我是阿轩。
今天我们来剖析一下FutureTask
的源码。
前言
上篇文章中我们对CompletableFuture
的源码进行了一个简单的了解。
同为异步编程
,除了使用CompletableFuture
外,我们平时使用的最多的其实还是将一个任务提交给线程池
,以此来达到异步编程
的目的。
如果任务有返回值,我们通常会用一个Future
来进行接收,比如这样
Future<String> submit = executor.submit(() -> {
//任务体。。。
});
这样的代码虽然我们每天都在写,但是你有想过程序运行时这个Future
的实现类是哪个类吗?
任务执行过程中如果出现异常
会怎么样?
执行任务的线程如果中途发生中断
或者其他异常会怎么样?
都知道get方法是阻塞获取
,那究竟是如何阻塞获取的呢,是一直在无限循环获取吗?
带着这些疑问开始我们今天的源码之旅,Go!
FutureTask
首先回答第一个问题,Future
的实现类是哪个呢?
随便写一个线程池提交任务的demo程序debug
一下就会发现,submit
方法调用的是AbstractExecutorService
类里的submit
方法
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
里面将任务包装了一层,看下newTaskFor
方法
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
可以很清晰的看到构造了一个FutureTask
返回了。
那么第一个谜底就揭晓了,我们看下FutureTask
类的整体结构

方法并不是很多,最核心的方法就是run
和get
方法。
在开始方法源码之前,先看下他有哪些常量和变量
,便于后续源码的理解。
private volatile int state;
/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
state
表示任务执行的状态。
callable
是具体要执行的任务。
outcome
是任务执行的结果,这里有一个细节不知道你注意到没有,outcome是没有被volatile
修饰的,而且源码的注释也写了,non-volatile
,outcome作为线程间通信的共享变量
,由于JMM模型
的存在,如果不被volatile
修饰,那么线程之间是如何及时获取最新值
的呢,小伙伴们可以先自己思考一下。
waiters
是等待结果获取线程的封装对象。
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
里面只有2个变量,一个是等待结果获取的线程thread
,另一个是叫next
的变量,有经验的小伙伴此时立马就能猜到了,如果有多个线程同时等待结果获取的话,那么他们之间的封装对象会通过next
形成一个链表
结构,和之前看的CompletableFuture
是不是很像。
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
常量部分就很清晰了,表示state
的不同值,也就是任务所有可能的状态。
看完了变量和常量,接下来正式开始方法源码的探索。
run
我们先看下任务执行的方法,run
方法
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
首先判断状态不为new
,如果任务已经开始了,那么直接返回。
或者在将执行任务的线程设为自己的时候失败了,说明已经有其他线程在执行任务了,也直接返回。
try
结构体很清晰,执行任务,将结果赋值给result
,如果任务执行成功就调用set
方法,异常就调用setException
方法。
这里我们思考一个问题。
大家有没有发现,这个ran
变量,看起来似乎有点多余呀。
如果我把他去掉
写成这样
if (c != null && state == NEW) {
V result;
try {
result = c.call();
set(result);
} catch (Throwable ex) {
result = null;
setException(ex);
}
}
一下子就少了4行代码,这样看起来会不会更优雅一点。
对吧?
其实,我想说

这样写会出现什么问题呢?
想象一下,如果任务执行成功了,也就是call
方法正常执行了,但是在调用set
方法的时候出异常了,这个时候,就会走到setException
方法里,setException
方法源码稍后我们就会看到,在方法里会把异常赋值给outcome
变量,而这个变量表示的是任务执行的结果。
看到这里,你应该看出来问题出在哪了吧?
我赵日天,额不,outcome
,本来是用来保存任务执行结果
的,你现在把set
方法的异常扔给我是几个意思,我这又不是垃圾堆,啥都往我这扔,就算是异常也应该是任务执行的异常
赋值给我才对嘛。
所以,这里作者特意把他分开来也是有用意的,你可别忘记了,这个类的作者可是

好吧,那就放在try catch
外面吧。
机灵的小伙伴可能又会想,你刚刚说set
方法可能会抛出异常,那放在外面,又没有catch
,如果真抛出异常了怎么办?
能怎么办,调用线程池方法的主线程直接抛异常
呗,同时线程池会把这个抛异常的线程杀死
重新创建一个。
不过set
方法正常情况下是不会抛出异常的,毕竟作为源码,如果真的有异常,我们是没办法修改的,那问题就大了。
说到这,那机灵的小伙伴就又要问了,既然不会抛出异常
,那写在里面写在外面不是一样吗?
话是如此,但是在程序的逻辑
上要能说得通,不能给人留有口舌,免得平白无故又多了一个pr,是不是,要的就是无懈可击
。

接下来我们看下set
和setException
方法
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
结构很相似,只是状态赋值不同,里面干了啥呢?
先执行CAS
将state
状态从NEW
变成COMPLETING
,成功就继续往下走,失败返回。
机灵的小伙伴又要问了,前面run
方法刚进来的时候不是判断了当前执行任务的线程是不是自己吗,也就是同一时间只会有一个线程来执行任务。
那这里搞个CAS
是不是有点多余啊?
显然不是的。
确实,同一时间,只会有一个线程执行任务,但是,别忘了,state
作为共享变量,其他方法也是会调用他的,比如任务取消方法cancel
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
如果任务状态不为NEW
,表示任务已经开始执行了无法取消,否则也会CAS
去变更任务的状态。
所以这下你应该能理解set
方法中if判断条件为什么使用CAS
了吧。
这里我们再思考另一个问题。
还记得阿轩在前文提过outcome
没有被volatile
修饰吗,作为共享变量,没有被volatile
修饰,但是线程间却能及时获取到最新值,why?
原因就在这里。
在说明具体原因之前,先介绍一种高级技术,借助同步
。
在《Java并发编程实战》一书的16.1.4小节对这项技术做了一个简要的介绍

书中正是以FutureTask
为例进行讲解的。
什么意思呢,阿轩给你画个图你就理解了。

根据Happens-Before
的程序顺序规则,对volatile
变量的写入操作必须在对该变量的读操作之前执行,也就是②必然在③之前执行,而被voaltile
修饰的变量,在操作系统底层会设置一层内存屏障
,阻止CPU指令重排序
,所以①必然在②之前执行,③必然在④之前执行,而根据传递性原则
,最终①必然是在④之前执行。
我们在回到源码,你会发现COMPLETING
这个状态几乎是转瞬即逝
的,他的引入目的就是为了去掉outcome的volatile
修饰。
这个类如果给像阿轩这样的菜鸡来写,直接一个volatile
就完事了,还整啥同步不同步的,听都没听过。
不得不说Doug老爷子,YYDS!
这也是为什么人家是JDK的作者而你不是。

再扩展思考一下,如果把COMPLETING
去掉,写成下面这样可以吗?
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, NORMAL)) {
outcome = v;
finishCompletion();
}
这样写的话,那么上面所说的顺序性就无法保证了。
那行,给outcome加上volatile
,可以吗?
答案是,还是不可以,为什么呢?
设想一种极端情况,当CAS
刚执行完,状态改为NORMAL
了,但是此时线程因为某种原因突然终止了,另一个线程过来获取任务执行结果的时候就会发现,任务的状态是NORMAL
了,表示任务正常执行完成了,但是执行的结果
却拿不到。
这在语义上显然是说不通
的,虽然这种情况几乎不会发生,或许,这就是工匠精神
吧。
接着往下看,状态设置完之后,接下来该做什么呢?
如果看过阿轩前面写过的AQS
,CompletableFuture
等文章一定知道,该唤醒线程了。
finishCompletion
方法做的事情就是唤醒线程
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
首先通过CAS将waiters
设置为null
,然后循环遍历链表,将未中断的线程依次唤醒
。
接下来有个done
方法,在FutureTask中是没有实现的。
protected void done() { }
如果你需要在任务执行完成时做一些自己的事情,这里可以自定义实现这个方法。
在回到run方法中,最后还有段finally
结构体
finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
注释中也说明了,防止遗漏中断
,看下handlePossibleCancellationInterrupt
方法
private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let's spin-wait patiently.
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
}
其实什么也没做,就是如果状态处于INTERRUPTING
的话,放弃CPU
时间片,干嘛呢?
等状态从INTERRUPTING
变为INTERRUPTED
,毕竟INTERRUPTING只是一个中间状态
,我任务都执行完了,你状态还处在一个中间状态上,这有点说不过去吧,作者肯定也是不允许这种情况发生的。
到这里,run方法我们就看完了,接下来我们看下get
方法。
get
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
先判断状态,如果状态小于等于COMPLETING
说明任务还在执行中,需要等待他完成,进入awaitDone
方法

因为我们看的是不带时间参数的get
方法,所以deadline
等于0,然后就进入了一个死循环。
①判断线程是否发生了中断,如果中断了就把自己封装的等待节点移除掉,稍后看下removeWaiter
方法。
②判断任务是否已经执行完毕,如果执行完了,好说,把线程置空,返回任务状态。
③如果状态恰巧卡在了中间状态,那就等一会,等他执行完,毕竟我们不可能返回一个中间状态是吧。
④到了这里,说明任务还在执行中,老规矩,把自己封装成一个WaitNode
对象。
⑤不用说,该进入队列
排队等待了。
⑥⑦就是看get方法是带时间参数还是不带,以此来决定线程挂起时间
。
这套组合拳如果看过阿轩前面的并发源码分析文章的话,就会发现都是这个套路。
这里要提一下的是,①这个地方在JDK8
是有bug
的,在JDK9
中得到了修复,感兴趣的小伙伴可以去看下,bug地址https://bugs.openjdk.java.net/browse/JDK-8073704,什么bug呢?
说FutureTask
里提供了一个isDone
方法
public boolean isDone() {
return state != NEW;
}
某些情况下,调用isDone
方法返回了true
告诉我们任务执行完成了,但是通过get获取的时候,却抛出了一个中断异常
,这是不合逻辑的。
于是JDK9
做了修复,前后代码如下

可以看到,把①调换了下位置,这样的话,即使线程中断了,也不会抛出中断异常。
此外,在线程挂起那块也做了修改,计时开始的时间
从JDK8中的刚进入方法时计时,变为了进入这个if判断才开始计时。
另一个有趣的地方是,在第一次进入⑥判断的时候,线程会被挂起指定的时间,当第二次再进入⑥的时候,会走到else
分支,通常来说,变量elapsed
的值一定是大于等于nacos
值的,因为刚刚被挂起了nacos
这么长时间嘛,但是这里却用if再次判断了下两者的值。
机灵的小伙伴一定会说,出于严谨
考虑嘛,是的,一方面是严谨,但更重要的是出于另一方面的考虑,那就是虚假唤醒
。阿轩之前在写AQS
的时候也提到过,AQS那些不为人知的细节,感兴趣的小伙伴可以再去看一下,而且注释中实际上也提到了,注意这么一个词,spurious wakeup
,翻译过来就是虚假唤醒
。
还有一个小细节,不知道大家注意到没有,在给startTime
赋值System.nanoTime()
后,紧接着又判断了startTime == 0L
,这是啥意思?
阿轩一开始也是非常的困惑,后来询问了某位技术大佬加上阿轩查阅了一些资料之后发现,System.nanoTime()
这个方法有可能返回0
,甚至返回负数
,大抵意思就是会发生时间回绕
,感兴趣的小伙伴可以去研究研究。
awaitDone
方法主流程我们就看完了,再看下刚刚提到的删除节点方法removeWaiter
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}
流程还是很清晰的,把取消的,中断的节点删除掉,然后重新构建一个新的链表。
阿轩当初在看这块源码的时候,突然有一个想法,for循环里第三个if条件里的continue retry
可以删掉吗?
后来阿轩仔细研究了下,单就removeWaiter
方法来说的话,阿轩觉得是可以删掉的。
waiters
这个变量只在里层for循环的初始化赋值的时候用到过,后面循环的时候就再也没用到,所以即使CAS失败了,接着往下走也是没问题的。
但是放在FutureTask
层面来看就不行了,为啥呢?
还记得之前看过的finishCompletion
方法吗
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
首先他会CAS把waiters
置空,然后把线程置空,再把当前节点的下一个节点置空,试想一下,如果他刚走完第一次循环,此时removeWaiter
方法开始执行第一次循环,显然,3个if条件会全部失败,并且因为q.next
是为null
的,所以当执行第二次循环的时候,会报空指针异常
。
所以这里的continue retry
还是不能去掉的,果然还是阿轩多虑了,JDK并发源码里没有一行多余代码呀,阿轩被啪啪打脸。

这样awaitDone方法就结束了,最后进入report
方法
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
很简单,根据任务执行状态来做相应返回。
至此,FutureTask
的核心流程我们就看完了,怎么样,没看的时候感觉很神秘,看完了是不是感觉也就这样。
总结
稍微提一嘴,我们在看并发源码的时候,会发现里面有很多关于链表
相关的操作,如果对链表相关操作不熟悉的话,看起来会比较吃力,这里给大家说个小技巧,可以去刷一些链表相关的算法题
,自己亲自动手写一写,刚开始的时候可能会比较吃力,等练习多了慢慢就会熟练了,再回头来看源码的时候就不会感到那么吃力了。
我是阿轩,欢迎关注我的公众号,一起交流,一起进步
原文始发于微信公众号(程序员阿轩):FutureTask那些不为人知的细节
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/36943.html