CompletableFuture那些不为人知的细节

大家好,我是阿轩。

今天我们来剖析一下CompletableFuture的源码。

前言

每当我们一提到异步编程,脑海中第一印象就是Future,但是随着业务逻辑越来越复杂,Future已经不能满足我们的使用需求了,毕竟Future接口提供的功能及其有限。

CompletableFuture那些不为人知的细节

只有有限的5个方法,取消任务,是否已取消,是否已完成,阻塞获取,超时阻塞获取,而且既然是异步编程,但是获取运行结果的时候又是同步阻塞等待,不免有点违背异步编程的初衷了。

但是仔细想想,Future毕竟是JDk1.5时代的产物

CompletableFuture那些不为人知的细节

在那个时代,并发编程,异步编程都是牛逼的代名词,Future提供的功能在当时看来已经非常牛逼了,但是时代在进步,JDK也得进步嘛不是,所以JDK在1.8的时候引入了CompletableFuture

CompletableFuture那些不为人知的细节

两者都是出自Doug老爷子的手笔,不愧是牛逼的代名词

CompletableFuture那些不为人知的细节

CompletableFuture的功能就非常丰富了,光是定义的子类就有25个,方法更是多达120之多

CompletableFuture那些不为人知的细节

前面我们说过,Future只能阻塞获取执行结果,如果调用get方法的时候,正好此时任务已经执行完成,那么不用等待直接拿到执行结果,皆大欢喜。但是如果任务还没有执行完成,恰巧这个任务又是个耗时任务,天知道他要执行到什么时候,总不能程序一直阻塞在这等待吧?

有没有什么好的方式呢?

我们设想一下,可不可以这样,我把任务交给你执行了,然后我去忙自己的事情,等你执行完了你告诉我一下,我就知道你已经执行完成,我再来获取执行结果进行后面的处理。

CompletableFuture就提供了whenComplete方法,用来进行注册回调通知,当任务执行完成后,主动回调通知调用的线程,它有2个参数,一个是任务执行结果,另一个是执行的异常,后续线程拿到后就可以进行相应的处理了。

除此之外,CompletableFuture还提供了同步运行,异步运行,链式运行,组合运行等等。

这些都属于API的使用,本篇文章主要是剖析底层原理,关于API的使用就不细说了,感兴趣的小伙伴可以去问下度娘哈。

下面我们看下CompletableFuture最基本的任务执行,结果获取部分的源码。

任务执行

CompletableFuture任务的执行主要分为2种,有返回结果和无返回结果的,本篇文章我们看下有返回结果的,无返回结果类似。

我们以supplyAsync方法为例。

他有2个重载方法

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
    return asyncSupplyStage(asyncPool, supplier);
}

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                   Executor executor) {
    return asyncSupplyStage(screenExecutor(executor), supplier);
}

区别在于一个传自定义执行器,一个不传。

这里有一点需要注意下,如果不传的话,执行任务使用的是JDK自带的ForkJoinPool池,有些小伙伴平时写业务代码需要用到异步执行的时候会使用到@Async注解,如果在使用这个注解的时候没有传自定义的线程池,使用的也是ForkJoinPool池。

这样会出现什么问题呢?

不同的业务主体共用同一个线程池,如果其中有一个业务主体的任务执行出现耗时占据着线程池的资源,那么另一个业务主体就会受到影响,从而变成异步转同步,如果恰巧本来异步执行的任务是个比较耗时的操作,比如插库之类的,那么此时主流程就会受到影响,碰巧此时运营又在搞活动,流量比较大,那么恭喜你,很快你就会收到大量的接口超时告警,慢慢的,服务器的资源就被耗尽了,最后应用出现了假死现象,从外部看起来服务器就像崩了一样,什么请求都不处理,然后你的绩效就被打了个D。

CompletableFuture那些不为人知的细节

所以,任何一个看似细微的影响,最终都可能演变成滔天巨浪,正所谓千里之提,溃于蚁穴也就是这个道理。

我们在平时码code的时候一定要注意,对于一些原理不太了解的骚操作,最好不要轻易使用,因为里面可能就潜藏着你不知道的巨坑,等到你发现的时候已经为时已晚了。

好了,话说回来,我们接着看。

supplyAsync方法会调用asyncSupplyStage方法

static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
                                                 Supplier<U> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<U> d = new CompletableFuture<U>();
    e.execute(new AsyncSupply<U>(d, f));
    return d;
}

他会构造一个CompletableFuture实例对象传给AsyncSupply构造方法,f是一个函数式接口,表示具体的业务逻辑代码。

我们看下AsyncSupply

static final class AsyncSupply<T> extends ForkJoinTask<Void>
        implements Runnable, AsynchronousCompletionTask {
    CompletableFuture<T> dep; Supplier<T> fn;
    AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
        this.dep = dep; this.fn = fn;
    }

    public final Void getRawResult() { return null; }
    public final void setRawResult(Void v) {}
    public final boolean exec() { run(); return true; }

    public void run() {
        CompletableFuture<T> d; Supplier<T> f;
        if ((d = dep) != null && (f = fn) != null) {
            dep = null; fn = null;
            if (d.result == null) {
                try {
                    d.completeValue(f.get());
                } catch (Throwable ex) {
                    d.completeThrowable(ex);
                }
            }
            d.postComplete();
        }
    }
}

继承了Runnable接口,是一个任务包装类,主要看run方法。

逻辑比较简单,f.get()执行我们的业务逻辑代码,然后调用completeValue方法对结果进行包装

final boolean completeValue(T t) {
    return UNSAFE.compareAndSwapObject(this, RESULT, null,
                                       (t == null) ? NIL : t);
}

通过cas对结果进行赋值,如果任务的执行结果就是null的话,会返回NIL,NIL是什么?

static final AltResult NIL = new AltResult(null);
static final class AltResult { // See above
    final Throwable ex;        // null only for NIL
    AltResult(Throwable x) { this.ex = x; }
}

是一个自定义的子类,将null赋值给了ex变量。为什么要这样做呢,后面讲get方法的时候会讲到。

结果封装完之后会调用postComplete方法

final void postComplete() {
    CompletableFuture<?> f = this; Completion h;
    while ((h = f.stack) != null ||
           (f != this && (h = (f = this).stack) != null)) {
        CompletableFuture<?> d; Completion t;
        if (f.casStack(h, t = h.next)) {
            if (t != null) {
                if (f != this) {
                    pushStack(h);
                    continue;
                }
                h.next = null;    // detach
            }
            f = (d = h.tryFire(NESTED)) == null ? this : d;
        }
    }
}

这里稍微提一下,CompletableFuture里主要有2个变量

volatile Object result;       // Either the result or boxed AltResult
volatile Completion stack;    // Top of Treiber stack of dependent actions

rsult保存的是任务执行的结果,而stack保存的则是此时等待线程包装的对象

abstract static class Completion extends ForkJoinTask<Void>
    implements Runnable, AsynchronousCompletionTask {
    volatile Completion next;      // Treiber stack link

    abstract CompletableFuture<?> tryFire(int mode);

    abstract boolean isLive();

    public final void run()                { tryFire(ASYNC); }
    public final boolean exec()            { tryFire(ASYNC); return true; }
    public final Void getRawResult()       { return null; }
    public final void setRawResult(Void v) {}
}

Completion是一个抽象类,里面有一个next变量,可以想象的出,当有多个线程等待获取结果的时候,他们就会形成一个链表的结构,而为什么变量取名叫stack呢,stack翻译过来就是的意思,栈是一种数据结构,从这里我们就能看出,实际使用的时候他就是以的形式存在的。

我们接着看postComplete方法,进去就是一个while循环,如果此时有多个线程等待获取执行结果的话,h就不为空,进入循环,然后调用casStack方法,将stack变量替换为hnext指针指向的下一个等待线程包装对象,因为此处是存在并发竞争的,所以使用了cas

然后判断next指针指向的对象是否为空,不为空就把h.next置空。

接着走到f = (d = h.tryFire(NESTED)) == null ? this : d,这是干什么的呢,我们看下tryFire方法

abstract CompletableFuture<?> tryFire(int mode);

tryFire方法是Completion里面的一个抽象方法,最终实际调用的是Signaller类里面的tryFire方法,为什么是这个实现类,后面我们看get方法的时候就会看到了。

final CompletableFuture<?> tryFire(int ignore) {
    Thread w; // no need to atomically claim
    if ((w = thread) != null) {
        thread = null;
        LockSupport.unpark(w);
    }
    return null;
}

这个方法很简单,就是把Signaller类里的thread变量置空,然后唤醒这个线程。

唤醒?唤醒谁?

聪明的小伙伴应该已经猜到了,肯定是那些阻塞等待的线程嘛,怎么阻塞等待呢,总不能一直自旋等待吧,那得多浪费CPU呀,最后肯定是挂起了嘛,所以这里才需要唤醒

tryFire方法恒返回null,所以f = (d = h.tryFire(NESTED)) == null ? this : d表达式的结果也恒等于this,然后接着唤醒下一个线程。

这里有一个小插曲,阿轩当初在debug调试这段代码的时候一度以为这里有个bug,怎么回事呢?

阿轩写了个程序

public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "程序员阿轩";
    });

    new Thread(() -> {
        try {
            completableFuture.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }).start();

    new Thread(() -> {
        try {
            completableFuture.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }).start();
}

逻辑很简单,执行一个任务,任务中线程休眠10秒,然后起2个线程阻塞获取结果,当我debug调试supplyAsync方法到f = (d = h.tryFire(NESTED)) == null ? this : d这里的时候,栈是这样的

CompletableFuture那些不为人知的细节

可以看到,此时h变量是Thread-1包装的对象,this对象的stack变量此时被替换为了Thread-0包装的对象,当这个表达式执行完之后,也就是Thread-1线程被唤醒之后,下一个循环应该就是唤醒Thread-0了。

但是当我debug下一步的时候,栈是这样的

CompletableFuture那些不为人知的细节

说好的唤醒Thread-0的呢,怎么stack变量变成null了??

Thread-0去哪了,难不成感染新冠被隔离了?

CompletableFuture那些不为人知的细节

带着这个疑问,阿轩当初在这段代码反复调试多次,终不得果,后来看了get方法的源码后才明白过来,那我们接下来就看下get源码。

get

get方法也有2个重载方法

public T get() throws InterruptedException, ExecutionException {
    Object r;
    return reportGet((r = result) == null ? waitingGet(true) : r);
}

public T get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    Object r;
    long nanos = unit.toNanos(timeout);
    return reportGet((r = result) == null ? timedGet(nanos) : r);
}

一个带超时时间,一个不带,我们看不带超时时间的,带超时时间类似。

刚进来的时候,result肯定是为null的,调用waitingGet方法,这个方法是重点,我们仔细看下

CompletableFuture那些不为人知的细节

方法一进来首先定义了4个变量Signaller就是之前所说的封装等待线程的对象;queued变量表示等待线程是否已经在排队了,前面说过,completableFuture主要有2个变量,其中一个stack就表示等待线程所形成的的结构,这里的排队就是进入这个栈结构;spins,翻译过来就是自旋的意思,表示自旋的次数;r就是任务执行的结果了。

此时如果任务还未执行完成,进入while循环。

进入循环后首先会自旋256次,①和②就表示自旋的逻辑。

值得一提的是,如果大量的调用get方法的话,这里实际上是存在性能问题的,Runtime.getRuntime().availableProcessors()表示获取操作系统的可用核数,阿轩此时的JDK版本是1.8.0_212,在更高版本里已经被修复了,感兴趣的小伙伴可以去https://bugs.openjdk.java.net/browse/JDK-8227019这里看下。

执行到③的时候,会把等待线程封装成一个Signaller对象

CompletableFuture那些不为人知的细节

等待时间和截止时间传的都是0,线程为当前线程。

④会将刚刚封装好的对象入栈排队,调用tryPushStack方法

final boolean tryPushStack(Completion c) {
    Completion h = stack;
    lazySetNext(c, h);
    return UNSAFE.compareAndSwapObject(this, STACK, h, c);
}

h是当前栈的栈顶元素,lazySetNext方法会把h挂在新封装对象的尾部

static void lazySetNext(Completion c, Completion next) {
    UNSAFE.putOrderedObject(c, NEXT, next);
}

因为这里是把h挂在当前线程封装对象的尾部,不存在并发竞争,所以这里没有使用CAS

但是挂完之后,需要用新封装对象替换掉当前stack,成为新的栈顶元素,此时就会存在并发竞争,因为stack是线程公用的变量,并不是私有的,所以这里就需要使用CAS了。

如果替换失败进入下一次循环,重新尝试替换,替换成功之后来到⑤,判断线程是否中断了。

最后会来到⑥,进入ForkJoinPool.managedBlock(q)方法

public static void managedBlock(ManagedBlocker blocker)
    throws InterruptedException {
    ForkJoinPool p;
    ForkJoinWorkerThread wt;
    Thread t = Thread.currentThread();
    if ((t instanceof ForkJoinWorkerThread) &&
        (p = (wt = (ForkJoinWorkerThread)t).pool) != null) {
        WorkQueue w = wt.workQueue;
        while (!blocker.isReleasable()) {
            if (p.tryCompensate(w)) {
                try {
                    do {} while (!blocker.isReleasable() &&
                                 !blocker.block());
                } finally {
                    U.getAndAddLong(p, CTL, AC_UNIT);
                }
                break;
            }
        }
    }
    else {
        do {} while (!blocker.isReleasable() &&
                     !blocker.block());
    }
}

这里面会判断线程是ForkJoinPool池的线程还是自定义线程池的线程,但是不管是哪个,核心逻辑都是!blocker.isReleasable() && !blocker.block()这个条件判断,先看isReleasable方法

public boolean isReleasable() {
    if (thread == null)
        return true;
    if (Thread.interrupted()) {
        int i = interruptControl;
        interruptControl = -1;
        if (i > 0)
            return true;
    }
    if (deadline != 0L &&
        (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
        thread = null;
        return true;
    }
    return false;
}

Signaller类实现了这个方法,从方法名称来看就是线程是否已经被释放了。

此时线程是不为null的,第一个条件不满足,线程也没有中断,第二个条件pass,之前传进来的deadline是为0的,第三个条件也不符合,所以这个方法恒返回false

那就会继续进行第二个方法的判断,block方法

public boolean block() {
    if (isReleasable())
        return true;
    else if (deadline == 0L)
        LockSupport.park(this);
    else if (nanos > 0L)
        LockSupport.parkNanos(this, nanos);
    return isReleasable();
}

Signaller类也实现了这个方法,第一个判断刚刚我们已经看到了,恒返回false,不满足,第二个判断,因为刚刚传入的deadline参数是0,所以线程在这里会被挂起

看到了吧,线程原来是在这里被挂起的。

接着往下看,前面我们说过,任务执行完成后,会唤醒挂起的线程,这里被唤醒后会返回isReleasable()方法的结果,也就是false

do {} while (!blocker.isReleasable() &&
                         !blocker.block());

所以while里的条件执行结果就是true,再一次进入isReleasable方法,这里注意了,前面任务在执行完成后,唤醒线程之前是把thread变量置空的

if ((w = thread) != null) {
    thread = null;
    LockSupport.unpark(w);
}

所以此时第一个判断是成立

if (thread == null)
  return true;

返回true,while里的条件执行结果就是false了,退出循环。

再一次进入waitingGetwhile循环判断,因为此时任务已经执行完成了,result是有值的,所以while里的条件就不成立了

(r = result) == null

跳出while循环,接着往下走

if (q != null) {
    q.thread = null;
    if (q.interruptControl < 0) {
        if (interruptible)
            r = null; // report interruption
        else
            Thread.currentThread().interrupt();
    }
}
postComplete();

这里的if是判断线程是否中断的,没有中断不用管。

下一行代码,看到没有,这里又一次调用了postComplete方法。

还记得之前阿轩提出的那个疑问吗,对,答案就在这里。

这个方法是干什么的,我们已经知道了,就是把栈结构里所有的等待线程全部唤醒,所以之前调试时为什么会出现那样的情形呢?

就是因为被唤醒的线程把下一个等待的线程也唤醒了,所以debug到下一步的时候会发现stack变量已经变成null了。

不知道小伙伴想过这样的一个问题没有,我自己已经被唤醒了,跑我自己的逻辑不就好了吗,干嘛还要去帮忙唤醒其他的线程呢?

这个问题,其实你仔细想一下就能知道答案了。

如果不调用postComplete方法,那么就会执行return r返回结果了,大家都是在一个主线程里跑,你是被唤醒了,其他线程仍然处在阻塞的状态,最后主线程不还是阻塞嘛,对不对,木桶原理,桶里能装多少水不取决于最高的那块板,而是最低的那块板。同理,程序运行的有多快,不取决于运行最快的那段代码,而是运行最慢的那段代码。

如果被唤醒的线程不执行postComplete方法,那么所有的唤醒任务就全交给执行任务的那个线程执行了,表面上看起来好像就是重复调用一下LockSupport.unpark(w)方法就行了,但是别忘了,LockSupport.unpark(w)方法最终执行的还是内核,涉及到用户态内核态,再从内核态用户态切换,其实还是一个挺重的操作

如果在执行任务的线程唤醒阻塞等待线程的过程中,其他被唤醒的线程帮忙唤醒剩下的线程,那么整个栈结构里的线程唤醒时间会大大的缩短,进而提高整体程序的运行性能,经常有小伙伴会问,怎样提高程序的运行性能呢,不就是这样一点一滴累积起来的吗!CompletableFuture那些不为人知的细节

其实这里也提现了并发编程的一种思想,协同思想,在很多JDK源码里都有这种思想的影子,比如ConcurrentHashMap里的协同扩容,忘记的小伙伴可以再去回顾一下ConcurrentHashMap那些不为人知的细节

到这里waitingGet方法就执行完了,进入reportGet方法

private static <T> T reportGet(Object r)
    throws InterruptedException, ExecutionException {
    if (r == null) // by convention below, null means interrupted
        throw new InterruptedException();
    if (r instanceof AltResult) {
        Throwable x, cause;
        if ((x = ((AltResult)r).ex) == null)
            return null;
        if (x instanceof CancellationException)
            throw (CancellationException)x;
        if ((x instanceof CompletionException) &&
            (cause = x.getCause()) != null)
            x = cause;
        throw new ExecutionException(x);
    }
    @SuppressWarnings("unchecked") T t = (T) r;
    return t;
}

前面我们提到过,如果任务执行的结果就是null的话,会用AltResult对象包装起来,所以这里就可以看到(x = ((AltResult)r).ex) == null这个条件判断,其他的没啥好说的了。

到这里整个get方法也就结束了,小小的总结一下

waitingGet -> 自旋256次 -> 构建线程封装对象Signaller -> 入栈排队 -> 线程挂起 -> 线程被唤醒,协助唤醒其他线程 -> 返回结果

总结

本篇文章只谈到了任务执行和结果获取部分的源码,关于具体的API使用,小伙伴自行去度娘哈。

其实看源码也是提高自己的一种方式,刚开始看的时候,有些操作你会发现好牛逼呀,但是当你看多了,看习惯了,你就会发现,都是这样写的,也就习以为常了。

这个时候在别人看来很骚的操作,在你看来,害,也就那样

CompletableFuture那些不为人知的细节

我是阿轩,欢迎关注我的公众号,一起交流,一起进步


原文始发于微信公众号(程序员阿轩):CompletableFuture那些不为人知的细节

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/36959.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!