大家好,我是阿轩。
今天我们来剖析一下CompletableFuture
的源码。
前言
每当我们一提到异步编程
,脑海中第一印象就是Future
,但是随着业务逻辑越来越复杂,Future
已经不能满足我们的使用需求了,毕竟Future
接口提供的功能及其有限。

只有有限的5个方法,取消任务,是否已取消,是否已完成,阻塞获取,超时阻塞获取
,而且既然是异步编程,但是获取运行结果的时候又是同步阻塞等待
,不免有点违背异步编程
的初衷了。
但是仔细想想,Future
毕竟是JDk1.5
时代的产物

在那个时代,并发编程,异步编程都是牛逼的代名词,Future
提供的功能在当时看来已经非常牛逼了,但是时代在进步,JDK也得进步嘛不是,所以JDK在1.8
的时候引入了CompletableFuture

两者都是出自Doug老爷子的手笔,不愧是牛逼的代名词

CompletableFuture
的功能就非常丰富了,光是定义的子类就有25
个,方法更是多达120
之多

前面我们说过,Future
只能阻塞获取
执行结果,如果调用get
方法的时候,正好此时任务已经执行完成,那么不用等待直接拿到执行结果,皆大欢喜。但是如果任务还没有执行完成,恰巧这个任务又是个耗时任务
,天知道他要执行到什么时候,总不能程序一直阻塞
在这等待吧?
有没有什么好的方式
呢?
我们设想一下,可不可以这样,我把任务交给你执行了,然后我去忙自己的事情,等你执行完了你告诉
我一下,我就知道你已经执行完成,我再来获取执行结果进行后面的处理。
CompletableFuture
就提供了whenComplete
方法,用来进行注册回调通知
,当任务执行完成后,主动回调通知调用的线程,它有2个参数,一个是任务执行结果
,另一个是执行的异常
,后续线程拿到后就可以进行相应的处理了。
除此之外,CompletableFuture
还提供了同步运行,异步运行,链式运行,组合运行
等等。
这些都属于API的使用,本篇文章主要是剖析底层原理,关于API的使用就不细说了,感兴趣的小伙伴可以去问下度娘哈。
下面我们看下CompletableFuture
最基本的任务执行,结果获取
部分的源码。
任务执行
CompletableFuture
任务的执行主要分为2种,有返回结果和无返回结果
的,本篇文章我们看下有返回结果的,无返回结果类似。
我们以supplyAsync
方法为例。
他有2个重载方法
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
区别在于一个传自定义执行器
,一个不传。
这里有一点需要注意下,如果不传的话,执行任务使用的是JDK
自带的ForkJoinPool
池,有些小伙伴平时写业务代码需要用到异步执行
的时候会使用到@Async
注解,如果在使用这个注解的时候没有传自定义的线程池,使用的也是ForkJoinPool
池。
这样会出现什么问题呢?
不同的业务主体共用同一个线程池
,如果其中有一个业务主体的任务执行出现耗时
,占据
着线程池的资源
,那么另一个业务主体就会受到影响,从而变成异步转同步
,如果恰巧本来异步执行的任务是个比较耗时
的操作,比如插库
之类的,那么此时主流程
就会受到影响,碰巧此时运营又在搞活动,流量比较大,那么恭喜你,很快你就会收到大量的接口超时告警
,慢慢的,服务器的资源就被耗尽
了,最后应用出现了假死现象
,从外部看起来服务器就像崩了
一样,什么请求都不处理,然后你的绩效就被打了个D。

所以,任何一个看似细微
的影响,最终都可能演变成滔天巨浪,正所谓千里之提,溃于蚁穴
也就是这个道理。
我们在平时码code的时候一定要注意,对于一些原理
不太了解的骚操作,最好不要轻易使用,因为里面可能就潜藏着你不知道的巨坑
,等到你发现的时候已经为时已晚了。
好了,话说回来,我们接着看。
supplyAsync
方法会调用asyncSupplyStage
方法
static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
Supplier<U> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<U> d = new CompletableFuture<U>();
e.execute(new AsyncSupply<U>(d, f));
return d;
}
他会构造一个CompletableFuture
实例对象传给AsyncSupply
构造方法,f是一个函数式接口,表示具体的业务逻辑代码。
我们看下AsyncSupply
类
static final class AsyncSupply<T> extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {
CompletableFuture<T> dep; Supplier<T> fn;
AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
this.dep = dep; this.fn = fn;
}
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) {}
public final boolean exec() { run(); return true; }
public void run() {
CompletableFuture<T> d; Supplier<T> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
if (d.result == null) {
try {
d.completeValue(f.get());
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
d.postComplete();
}
}
}
继承了Runnable
接口,是一个任务包装类,主要看run
方法。
逻辑比较简单,f.get()
执行我们的业务逻辑代码,然后调用completeValue
方法对结果进行包装
final boolean completeValue(T t) {
return UNSAFE.compareAndSwapObject(this, RESULT, null,
(t == null) ? NIL : t);
}
通过cas
对结果进行赋值,如果任务的执行结果就是null
的话,会返回NIL
,NIL是什么?
static final AltResult NIL = new AltResult(null);
static final class AltResult { // See above
final Throwable ex; // null only for NIL
AltResult(Throwable x) { this.ex = x; }
}
是一个自定义的子类
,将null
赋值给了ex变量
。为什么要这样做呢,后面讲get
方法的时候会讲到。
结果封装完之后会调用postComplete
方法
final void postComplete() {
CompletableFuture<?> f = this; Completion h;
while ((h = f.stack) != null ||
(f != this && (h = (f = this).stack) != null)) {
CompletableFuture<?> d; Completion t;
if (f.casStack(h, t = h.next)) {
if (t != null) {
if (f != this) {
pushStack(h);
continue;
}
h.next = null; // detach
}
f = (d = h.tryFire(NESTED)) == null ? this : d;
}
}
}
这里稍微提一下,CompletableFuture
里主要有2个变量
volatile Object result; // Either the result or boxed AltResult
volatile Completion stack; // Top of Treiber stack of dependent actions
rsult
保存的是任务执行的结果,而stack
保存的则是此时等待线程包装的对象
abstract static class Completion extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {
volatile Completion next; // Treiber stack link
abstract CompletableFuture<?> tryFire(int mode);
abstract boolean isLive();
public final void run() { tryFire(ASYNC); }
public final boolean exec() { tryFire(ASYNC); return true; }
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) {}
}
Completion
是一个抽象类,里面有一个next
变量,可以想象的出,当有多个线程等待获取结果的时候,他们就会形成一个链表
的结构,而为什么变量取名叫stack
呢,stack翻译过来就是栈
的意思,栈是一种数据结构,从这里我们就能看出,实际使用的时候他就是以栈
的形式存在的。
我们接着看postComplete
方法,进去就是一个while
循环,如果此时有多个线程等待获取执行结果的话,h
就不为空,进入循环,然后调用casStack
方法,将stack
变量替换为h
的next指针
指向的下一个等待线程包装对象,因为此处是存在并发竞争
的,所以使用了cas
。
然后判断next指针
指向的对象是否为空,不为空就把h.next
置空。
接着走到f = (d = h.tryFire(NESTED)) == null ? this : d
,这是干什么的呢,我们看下tryFire
方法
abstract CompletableFuture<?> tryFire(int mode);
tryFire
方法是Completion
里面的一个抽象方法
,最终实际调用的是Signaller
类里面的tryFire
方法,为什么是这个实现类,后面我们看get
方法的时候就会看到了。
final CompletableFuture<?> tryFire(int ignore) {
Thread w; // no need to atomically claim
if ((w = thread) != null) {
thread = null;
LockSupport.unpark(w);
}
return null;
}
这个方法很简单,就是把Signaller
类里的thread
变量置空,然后唤醒
这个线程。
唤醒?唤醒谁?
聪明的小伙伴应该已经猜到了,肯定是那些阻塞等待
的线程嘛,怎么阻塞等待呢,总不能一直自旋等待
吧,那得多浪费CPU
呀,最后肯定是挂起
了嘛,所以这里才需要唤醒
。
tryFire
方法恒返回null
,所以f = (d = h.tryFire(NESTED)) == null ? this : d
表达式的结果也恒等于this
,然后接着唤醒
下一个线程。
这里有一个小插曲,阿轩当初在debug调试这段代码的时候一度以为这里有个bug
,怎么回事呢?
阿轩写了个程序
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "程序员阿轩";
});
new Thread(() -> {
try {
completableFuture.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
completableFuture.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}).start();
}
逻辑很简单,执行一个任务,任务中线程休眠10秒,然后起2个线程阻塞获取结果,当我debug调试supplyAsync
方法到f = (d = h.tryFire(NESTED)) == null ? this : d
这里的时候,栈是这样的

可以看到,此时h
变量是Thread-1
包装的对象,this
对象的stack
变量此时被替换为了Thread-0
包装的对象,当这个表达式执行完之后,也就是Thread-1
线程被唤醒之后,下一个循环应该就是唤醒Thread-0
了。
但是当我debug下一步的时候,栈是这样的

说好的唤醒Thread-0
的呢,怎么stack
变量变成null
了??
Thread-0
去哪了,难不成感染新冠被隔离了?

带着这个疑问,阿轩当初在这段代码反复调试多次,终不得果,后来看了get
方法的源码后才明白过来,那我们接下来就看下get源码。
get
get方法也有2个重载方法
public T get() throws InterruptedException, ExecutionException {
Object r;
return reportGet((r = result) == null ? waitingGet(true) : r);
}
public T get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
Object r;
long nanos = unit.toNanos(timeout);
return reportGet((r = result) == null ? timedGet(nanos) : r);
}
一个带超时时间,一个不带,我们看不带超时时间的,带超时时间类似。
刚进来的时候,result
肯定是为null
的,调用waitingGet
方法,这个方法是重点,我们仔细看下

方法一进来首先定义了4个变量
,Signaller
就是之前所说的封装等待线程的对象;queued
变量表示等待线程是否已经在排队了,前面说过,completableFuture
主要有2个变量,其中一个stack
就表示等待线程所形成的的栈
结构,这里的排队就是进入这个栈结构;spins
,翻译过来就是自旋的意思,表示自旋的次数;r
就是任务执行的结果了。
此时如果任务还未执行完成,进入while
循环。
进入循环后首先会自旋256次
,①和②就表示自旋
的逻辑。
值得一提的是,如果大量的调用get
方法的话,这里实际上是存在性能问题
的,Runtime.getRuntime().availableProcessors()
表示获取操作系统的可用核数
,阿轩此时的JDK
版本是1.8.0_212
,在更高版本里已经被修复了,感兴趣的小伙伴可以去https://bugs.openjdk.java.net/browse/JDK-8227019这里看下。
执行到③的时候,会把等待线程封装成一个Signaller
对象

等待时间和截止时间传的都是0,线程为当前线程。
④会将刚刚封装好的对象入栈排队
,调用tryPushStack
方法
final boolean tryPushStack(Completion c) {
Completion h = stack;
lazySetNext(c, h);
return UNSAFE.compareAndSwapObject(this, STACK, h, c);
}
h
是当前栈的栈顶
元素,lazySetNext
方法会把h
挂在新封装对象的尾部
static void lazySetNext(Completion c, Completion next) {
UNSAFE.putOrderedObject(c, NEXT, next);
}
因为这里是把h
挂在当前线程封装对象的尾部,不存在并发竞争,所以这里没有使用CAS
。
但是挂完之后,需要用新封装对象替换掉当前stack
,成为新的栈顶
元素,此时就会存在并发竞争
,因为stack
是线程公用
的变量,并不是私有
的,所以这里就需要使用CAS
了。
如果替换失败进入下一次循环,重新尝试替换,替换成功之后来到⑤,判断线程是否中断
了。
最后会来到⑥,进入ForkJoinPool.managedBlock(q)
方法
public static void managedBlock(ManagedBlocker blocker)
throws InterruptedException {
ForkJoinPool p;
ForkJoinWorkerThread wt;
Thread t = Thread.currentThread();
if ((t instanceof ForkJoinWorkerThread) &&
(p = (wt = (ForkJoinWorkerThread)t).pool) != null) {
WorkQueue w = wt.workQueue;
while (!blocker.isReleasable()) {
if (p.tryCompensate(w)) {
try {
do {} while (!blocker.isReleasable() &&
!blocker.block());
} finally {
U.getAndAddLong(p, CTL, AC_UNIT);
}
break;
}
}
}
else {
do {} while (!blocker.isReleasable() &&
!blocker.block());
}
}
这里面会判断线程是ForkJoinPool
池的线程还是自定义线程池
的线程,但是不管是哪个,核心逻辑都是!blocker.isReleasable() && !blocker.block()
这个条件判断,先看isReleasable
方法
public boolean isReleasable() {
if (thread == null)
return true;
if (Thread.interrupted()) {
int i = interruptControl;
interruptControl = -1;
if (i > 0)
return true;
}
if (deadline != 0L &&
(nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
thread = null;
return true;
}
return false;
}
Signaller
类实现了这个方法,从方法名称来看就是线程是否已经被释放了。
此时线程是不为null
的,第一个条件不满足,线程也没有中断,第二个条件pass,之前传进来的deadline
是为0的,第三个条件也不符合,所以这个方法恒返回false
。
那就会继续进行第二个方法的判断,block
方法
public boolean block() {
if (isReleasable())
return true;
else if (deadline == 0L)
LockSupport.park(this);
else if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
return isReleasable();
}
Signaller
类也实现了这个方法,第一个判断刚刚我们已经看到了,恒返回false,不满足,第二个判断,因为刚刚传入的deadline
参数是0,所以线程在这里会被挂起
。
看到了吧,线程原来是在这里被挂起
的。
接着往下看,前面我们说过,任务执行完成后,会唤醒
挂起的线程,这里被唤醒后会返回isReleasable()
方法的结果,也就是false
。
do {} while (!blocker.isReleasable() &&
!blocker.block());
所以while
里的条件执行结果就是true
,再一次进入isReleasable
方法,这里注意了,前面任务在执行完成后,唤醒线程之前是把thread
变量置空的
if ((w = thread) != null) {
thread = null;
LockSupport.unpark(w);
}
所以此时第一个判断是成立
的
if (thread == null)
return true;
返回true,while
里的条件执行结果就是false
了,退出循环。
再一次进入waitingGet
的while
循环判断,因为此时任务已经执行完成了,result
是有值的,所以while
里的条件就不成立了
(r = result) == null
跳出while
循环,接着往下走
if (q != null) {
q.thread = null;
if (q.interruptControl < 0) {
if (interruptible)
r = null; // report interruption
else
Thread.currentThread().interrupt();
}
}
postComplete();
这里的if是判断线程是否中断的,没有中断不用管。
下一行代码,看到没有,这里又一次调用了postComplete
方法。
还记得之前阿轩提出的那个疑问吗,对,答案就在这里。
这个方法是干什么的,我们已经知道了,就是把栈结构里所有的等待线程全部唤醒
,所以之前调试时为什么会出现那样的情形呢?
就是因为被唤醒
的线程把下一个等待的线程也唤醒
了,所以debug
到下一步的时候会发现stack
变量已经变成null
了。
不知道小伙伴想过这样的一个问题没有,我自己已经被唤醒了,跑我自己的逻辑不就好了吗,干嘛还要去帮忙唤醒其他的线程呢?
这个问题,其实你仔细想一下就能知道答案了。
如果不调用postComplete
方法,那么就会执行return r
返回结果了,大家都是在一个主线程
里跑,你是被唤醒
了,其他线程仍然处在阻塞
的状态,最后主线程不还是阻塞
嘛,对不对,木桶原理
,桶里能装多少水不取决于最高的那块板,而是最低的那块板。同理,程序运行的有多快,不取决于运行最快
的那段代码,而是运行最慢
的那段代码。
如果被唤醒的线程不执行postComplete
方法,那么所有的唤醒任务就全交给执行任务的那个线程执行了,表面上看起来好像就是重复调用一下LockSupport.unpark(w)
方法就行了,但是别忘了,LockSupport.unpark(w)
方法最终执行的还是内核
,涉及到用户态
到内核态
,再从内核态
到用户态
的切换
,其实还是一个挺重的操作
。
如果在执行任务的线程唤醒
阻塞等待线程的过程中,其他被唤醒的线程帮忙唤醒
剩下的线程,那么整个栈结构里的线程唤醒时间
会大大的缩短
,进而提高整体程序的运行性能,经常有小伙伴会问,怎样提高程序的运行性能呢,不就是这样一点一滴累积
起来的吗!
其实这里也提现了并发编程的一种思想,协同思想
,在很多JDK源码
里都有这种思想的影子,比如ConcurrentHashMap
里的协同扩容
,忘记的小伙伴可以再去回顾一下ConcurrentHashMap那些不为人知的细节
到这里waitingGet
方法就执行完了,进入reportGet
方法
private static <T> T reportGet(Object r)
throws InterruptedException, ExecutionException {
if (r == null) // by convention below, null means interrupted
throw new InterruptedException();
if (r instanceof AltResult) {
Throwable x, cause;
if ((x = ((AltResult)r).ex) == null)
return null;
if (x instanceof CancellationException)
throw (CancellationException)x;
if ((x instanceof CompletionException) &&
(cause = x.getCause()) != null)
x = cause;
throw new ExecutionException(x);
}
@SuppressWarnings("unchecked") T t = (T) r;
return t;
}
前面我们提到过,如果任务执行的结果就是null
的话,会用AltResult
对象包装起来,所以这里就可以看到(x = ((AltResult)r).ex) == null
这个条件判断,其他的没啥好说的了。
到这里整个get
方法也就结束了,小小的总结一下
waitingGet -> 自旋256次 -> 构建线程封装对象Signaller -> 入栈排队 -> 线程挂起 -> 线程被唤醒,协助唤醒其他线程 -> 返回结果
总结
本篇文章只谈到了任务执行和结果获取部分的源码,关于具体的API使用,小伙伴自行去度娘哈。
其实看源码
也是提高自己
的一种方式,刚开始看的时候,有些操作你会发现好牛逼呀,但是当你看多了,看习惯了,你就会发现,都是这样写的,也就习以为常了。
这个时候在别人看来很骚的操作,在你看来,害,也就那样

我是阿轩,欢迎关注我的公众号,一起交流,一起进步
原文始发于微信公众号(程序员阿轩):CompletableFuture那些不为人知的细节
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/36959.html