FutureTask那些不为人知的细节

大家好,我是阿轩。

今天我们来剖析一下FutureTask的源码。

前言

上篇文章中我们对CompletableFuture的源码进行了一个简单的了解。

同为异步编程,除了使用CompletableFuture外,我们平时使用的最多的其实还是将一个任务提交给线程池,以此来达到异步编程的目的。

如果任务有返回值,我们通常会用一个Future来进行接收,比如这样

Future<String> submit = executor.submit(() -> {
    //任务体。。。
});

这样的代码虽然我们每天都在写,但是你有想过程序运行时这个Future的实现类是哪个类吗?

任务执行过程中如果出现异常会怎么样?

执行任务的线程如果中途发生中断或者其他异常会怎么样?

都知道get方法是阻塞获取,那究竟是如何阻塞获取的呢,是一直在无限循环获取吗?

带着这些疑问开始我们今天的源码之旅,Go!

FutureTask

首先回答第一个问题,Future的实现类是哪个呢?

随便写一个线程池提交任务的demo程序debug一下就会发现,submit方法调用的是AbstractExecutorService类里的submit方法

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

里面将任务包装了一层,看下newTaskFor方法

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

可以很清晰的看到构造了一个FutureTask返回了。

那么第一个谜底就揭晓了,我们看下FutureTask类的整体结构

FutureTask那些不为人知的细节

方法并不是很多,最核心的方法就是runget方法。

在开始方法源码之前,先看下他有哪些常量和变量,便于后续源码的理解。

private volatile int state;

/** The underlying callable; nulled out after running */
private Callable<V> callable;

/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes

/** The thread running the callable; CASed during run() */
private volatile Thread runner;

/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

state表示任务执行的状态。

callable是具体要执行的任务。

outcome是任务执行的结果,这里有一个细节不知道你注意到没有,outcome是没有被volatile修饰的,而且源码的注释也写了,non-volatile,outcome作为线程间通信的共享变量,由于JMM模型的存在,如果不被volatile修饰,那么线程之间是如何及时获取最新值的呢,小伙伴们可以先自己思考一下。

waiters是等待结果获取线程的封装对象。

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

里面只有2个变量,一个是等待结果获取的线程thread,另一个是叫next的变量,有经验的小伙伴此时立马就能猜到了,如果有多个线程同时等待结果获取的话,那么他们之间的封装对象会通过next形成一个链表结构,和之前看的CompletableFuture是不是很像。

private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

常量部分就很清晰了,表示state的不同值,也就是任务所有可能的状态。

看完了变量和常量,接下来正式开始方法源码的探索。

run

我们先看下任务执行的方法,run方法

public void run() {
  if (state != NEW ||
      !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                   null, Thread.currentThread()))
      return;
  try {
      Callable<V> c = callable;
      if (c != null && state == NEW) {
          V result;
          boolean ran;
          try {
              result = c.call();
              ran = true;
          } catch (Throwable ex) {
              result = null;
              ran = false;
              setException(ex);
          }
          if (ran)
              set(result);
      }
  } finally {
      // runner must be non-null until state is settled to
      // prevent concurrent calls to run()
      runner = null;
      // state must be re-read after nulling runner to prevent
      // leaked interrupts
      int s = state;
      if (s >= INTERRUPTING)
          handlePossibleCancellationInterrupt(s);
  }
}

首先判断状态不为new,如果任务已经开始了,那么直接返回。

或者在将执行任务的线程设为自己的时候失败了,说明已经有其他线程在执行任务了,也直接返回。

try结构体很清晰,执行任务,将结果赋值给result,如果任务执行成功就调用set方法,异常就调用setException方法。

这里我们思考一个问题。

大家有没有发现,这个ran变量,看起来似乎有点多余呀。

如果我把他去掉写成这样

if (c != null && state == NEW) {
    V result;
    try {
        result = c.call();
        set(result);
    } catch (Throwable ex) {
        result = null;
        setException(ex);
    }
}

一下子就少了4行代码,这样看起来会不会更优雅一点。

对吧?

其实,我想说

FutureTask那些不为人知的细节

这样写会出现什么问题呢?

想象一下,如果任务执行成功了,也就是call方法正常执行了,但是在调用set方法的时候出异常了,这个时候,就会走到setException方法里,setException方法源码稍后我们就会看到,在方法里会把异常赋值给outcome变量,而这个变量表示的是任务执行的结果。

看到这里,你应该看出来问题出在哪了吧?

我赵日天,额不,outcome,本来是用来保存任务执行结果的,你现在把set方法的异常扔给我是几个意思,我这又不是垃圾堆,啥都往我这扔,就算是异常也应该是任务执行的异常赋值给我才对嘛。

所以,这里作者特意把他分开来也是有用意的,你可别忘记了,这个类的作者可是

FutureTask那些不为人知的细节

并发编程界的泰斗,这点小优化怎么可能想不到呢。

好吧,那就放在try catch外面吧。

机灵的小伙伴可能又会想,你刚刚说set方法可能会抛出异常,那放在外面,又没有catch,如果真抛出异常了怎么办?

能怎么办,调用线程池方法的主线程直接抛异常呗,同时线程池会把这个抛异常的线程杀死重新创建一个。

不过set方法正常情况下是不会抛出异常的,毕竟作为源码,如果真的有异常,我们是没办法修改的,那问题就大了。

说到这,那机灵的小伙伴就又要问了,既然不会抛出异常,那写在里面写在外面不是一样吗?

话是如此,但是在程序的逻辑上要能说得通,不能给人留有口舌,免得平白无故又多了一个pr,是不是,要的就是无懈可击

FutureTask那些不为人知的细节

接下来我们看下setsetException方法

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}
protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

结构很相似,只是状态赋值不同,里面干了啥呢?

先执行CASstate状态从NEW变成COMPLETING,成功就继续往下走,失败返回。

机灵的小伙伴又要问了,前面run方法刚进来的时候不是判断了当前执行任务的线程是不是自己吗,也就是同一时间只会有一个线程来执行任务。

那这里搞个CAS是不是有点多余啊?

显然不是的。

确实,同一时间,只会有一个线程执行任务,但是,别忘了,state作为共享变量,其他方法也是会调用他的,比如任务取消方法cancel

public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}

如果任务状态不为NEW,表示任务已经开始执行了无法取消,否则也会CAS去变更任务的状态。

所以这下你应该能理解set方法中if判断条件为什么使用CAS了吧。

这里我们再思考另一个问题。

还记得阿轩在前文提过outcome没有被volatile修饰吗,作为共享变量,没有被volatile修饰,但是线程间却能及时获取到最新值,why?

原因就在这里。

在说明具体原因之前,先介绍一种高级技术,借助同步

是不是听名字就感觉很高级,FutureTask那些不为人知的细节

在《Java并发编程实战》一书的16.1.4小节对这项技术做了一个简要的介绍

FutureTask那些不为人知的细节

书中正是以FutureTask为例进行讲解的。

什么意思呢,阿轩给你画个图你就理解了。

FutureTask那些不为人知的细节

根据Happens-Before的程序顺序规则,对volatile变量的写入操作必须在对该变量的读操作之前执行,也就是②必然在③之前执行,而被voaltile修饰的变量,在操作系统底层会设置一层内存屏障,阻止CPU指令重排序,所以①必然在②之前执行,③必然在④之前执行,而根据传递性原则,最终①必然是在④之前执行。

我们在回到源码,你会发现COMPLETING这个状态几乎是转瞬即逝的,他的引入目的就是为了去掉outcome的volatile修饰。

这个类如果给像阿轩这样的菜鸡来写,直接一个volatile就完事了,还整啥同步不同步的,听都没听过。

不得不说Doug老爷子,YYDS!

这也是为什么人家是JDK的作者而你不是。

FutureTask那些不为人知的细节

再扩展思考一下,如果把COMPLETING去掉,写成下面这样可以吗?

if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, NORMAL)) {
    outcome = v;
    finishCompletion();
}

这样写的话,那么上面所说的顺序性就无法保证了。

那行,给outcome加上volatile,可以吗?

答案是,还是不可以,为什么呢?

设想一种极端情况,当CAS刚执行完,状态改为NORMAL了,但是此时线程因为某种原因突然终止了,另一个线程过来获取任务执行结果的时候就会发现,任务的状态是NORMAL了,表示任务正常执行完成了,但是执行的结果却拿不到。

这在语义上显然是说不通的,虽然这种情况几乎不会发生,或许,这就是工匠精神吧。

接着往下看,状态设置完之后,接下来该做什么呢?

如果看过阿轩前面写过的AQSCompletableFuture等文章一定知道,该唤醒线程了。

finishCompletion方法做的事情就是唤醒线程

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}

首先通过CAS将waiters设置为null,然后循环遍历链表,将未中断的线程依次唤醒

接下来有个done方法,在FutureTask中是没有实现的。

protected void done() { }

如果你需要在任务执行完成时做一些自己的事情,这里可以自定义实现这个方法。

在回到run方法中,最后还有段finally结构体

finally {
    // runner must be non-null until state is settled to
    // prevent concurrent calls to run()
    runner = null;
    // state must be re-read after nulling runner to prevent
    // leaked interrupts
    int s = state;
    if (s >= INTERRUPTING)
        handlePossibleCancellationInterrupt(s);
}

注释中也说明了,防止遗漏中断,看下handlePossibleCancellationInterrupt方法

private void handlePossibleCancellationInterrupt(int s) {
    // It is possible for our interrupter to stall before getting a
    // chance to interrupt us.  Let's spin-wait patiently.
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            Thread.yield(); // wait out pending interrupt
}

其实什么也没做,就是如果状态处于INTERRUPTING的话,放弃CPU时间片,干嘛呢?

等状态从INTERRUPTING变为INTERRUPTED,毕竟INTERRUPTING只是一个中间状态,我任务都执行完了,你状态还处在一个中间状态上,这有点说不过去吧,作者肯定也是不允许这种情况发生的。

到这里,run方法我们就看完了,接下来我们看下get方法。

get

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

先判断状态,如果状态小于等于COMPLETING说明任务还在执行中,需要等待他完成,进入awaitDone方法

FutureTask那些不为人知的细节

因为我们看的是不带时间参数的get方法,所以deadline等于0,然后就进入了一个死循环。

①判断线程是否发生了中断,如果中断了就把自己封装的等待节点移除掉,稍后看下removeWaiter方法。

②判断任务是否已经执行完毕,如果执行完了,好说,把线程置空,返回任务状态。

③如果状态恰巧卡在了中间状态,那就等一会,等他执行完,毕竟我们不可能返回一个中间状态是吧。

④到了这里,说明任务还在执行中,老规矩,把自己封装成一个WaitNode对象。

⑤不用说,该进入队列排队等待了。

⑥⑦就是看get方法是带时间参数还是不带,以此来决定线程挂起时间

这套组合拳如果看过阿轩前面的并发源码分析文章的话,就会发现都是这个套路。

这里要提一下的是,①这个地方在JDK8是有bug的,在JDK9中得到了修复,感兴趣的小伙伴可以去看下,bug地址https://bugs.openjdk.java.net/browse/JDK-8073704,什么bug呢?

FutureTask里提供了一个isDone方法

public boolean isDone() {
    return state != NEW;
}

某些情况下,调用isDone方法返回了true告诉我们任务执行完成了,但是通过get获取的时候,却抛出了一个中断异常,这是不合逻辑的。

于是JDK9做了修复,前后代码如下

FutureTask那些不为人知的细节

可以看到,把①调换了下位置,这样的话,即使线程中断了,也不会抛出中断异常。

此外,在线程挂起那块也做了修改,计时开始的时间从JDK8中的刚进入方法时计时,变为了进入这个if判断才开始计时。

另一个有趣的地方是,在第一次进入⑥判断的时候,线程会被挂起指定的时间,当第二次再进入⑥的时候,会走到else分支,通常来说,变量elapsed的值一定是大于等于nacos值的,因为刚刚被挂起了nacos这么长时间嘛,但是这里却用if再次判断了下两者的值。

机灵的小伙伴一定会说,出于严谨考虑嘛,是的,一方面是严谨,但更重要的是出于另一方面的考虑,那就是虚假唤醒。阿轩之前在写AQS的时候也提到过,AQS那些不为人知的细节,感兴趣的小伙伴可以再去看一下,而且注释中实际上也提到了,注意这么一个词,spurious wakeup,翻译过来就是虚假唤醒

还有一个小细节,不知道大家注意到没有,在给startTime赋值System.nanoTime()后,紧接着又判断了startTime == 0L,这是啥意思?

阿轩一开始也是非常的困惑,后来询问了某位技术大佬加上阿轩查阅了一些资料之后发现,System.nanoTime()这个方法有可能返回0,甚至返回负数,大抵意思就是会发生时间回绕,感兴趣的小伙伴可以去研究研究。

awaitDone方法主流程我们就看完了,再看下刚刚提到的删除节点方法removeWaiter

private void removeWaiter(WaitNode node) {
    if (node != null) {
        node.thread = null;
        retry:
        for (;;) {          // restart on removeWaiter race
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                if (q.thread != null)
                    pred = q;
                else if (pred != null) {
                    pred.next = s;
                    if (pred.thread == null) // check for race
                        continue retry;
                }
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                      q, s))
                    continue retry;
            }
            break;
        }
    }
}

流程还是很清晰的,把取消的,中断的节点删除掉,然后重新构建一个新的链表。

阿轩当初在看这块源码的时候,突然有一个想法,for循环里第三个if条件里的continue retry可以删掉吗?

后来阿轩仔细研究了下,单就removeWaiter方法来说的话,阿轩觉得是可以删掉的。

waiters这个变量只在里层for循环的初始化赋值的时候用到过,后面循环的时候就再也没用到,所以即使CAS失败了,接着往下走也是没问题的。

但是放在FutureTask层面来看就不行了,为啥呢?

还记得之前看过的finishCompletion方法吗

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}

首先他会CAS把waiters置空,然后把线程置空,再把当前节点的下一个节点置空,试想一下,如果他刚走完第一次循环,此时removeWaiter方法开始执行第一次循环,显然,3个if条件会全部失败,并且因为q.next是为null的,所以当执行第二次循环的时候,会报空指针异常

所以这里的continue retry还是不能去掉的,果然还是阿轩多虑了,JDK并发源码里没有一行多余代码呀,阿轩被啪啪打脸。

FutureTask那些不为人知的细节

这样awaitDone方法就结束了,最后进入report方法

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

很简单,根据任务执行状态来做相应返回。

至此,FutureTask的核心流程我们就看完了,怎么样,没看的时候感觉很神秘,看完了是不是感觉也就这样。FutureTask那些不为人知的细节FutureTask那些不为人知的细节

总结

稍微提一嘴,我们在看并发源码的时候,会发现里面有很多关于链表相关的操作,如果对链表相关操作不熟悉的话,看起来会比较吃力,这里给大家说个小技巧,可以去刷一些链表相关的算法题,自己亲自动手写一写,刚开始的时候可能会比较吃力,等练习多了慢慢就会熟练了,再回头来看源码的时候就不会感到那么吃力了。


我是阿轩,欢迎关注我的公众号,一起交流,一起进步


原文始发于微信公众号(程序员阿轩):FutureTask那些不为人知的细节

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/36943.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!