JDK5中加入Future用来获取异步执行结果。
Future
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
局限:
除了采用get()
阻塞获取或采用轮训isDone()
,别无他法。
ExecutorService可用于异步执行。
为了避免这种问题,不同的第三方Java库或框架提供不同的解决方案,Guava库中的SettableFuture/ListenableFuture、Netty中的Future和ChannelFuture等。这些解决方案都是通过注册监听或回调的方式,形成回调链,从而实现真正意义上的异步执行。
JDK8中新特性CompletableFuture与CompletionStage,以回调的方式实现异步执行,并提供很多异步回调链构造的API。
RunnableFuture
源码:
@since 1.6
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation unless it has been cancelled.
*/
void run();
}
CompletionStage
想要理解 CompletionStage 接口,需要先了解任务的时序关系的。可将任务时序关系分为以下几种:
- 串行执行关系
- 并行执行关系
- AND汇聚关系
- OR汇聚关系
CompletableFuture
CompletionStage,泛型接口,源码太长。CompletableFuture实现CompletionStage和Future接口,支持 future 和 callback 两种调用方式。组合式异步函数编程。异步,即不占用主线程的时间片。该类下面的若干个方法,主要有三类核心参数(即函数接口):
- Function:既支持接收参数,也支持返回值
- Consumer:只支持接受参数,不支持返回值
- Runnable:不支持接受参数,也不支持返回值
API介绍
从一个多线程任务提交到获取结果的流程上来理解API:
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
CompletableFuture.supplyAsync
是开启CompletableFuture异步调用链的方法之一。使用这个方法,会将supplier封装为一个任务提交给executor执行,并返回一个记录这个任务执行状态和执行结果的CompletableFuture对象。之后可以在这个CompletableFuture对象上挂接各种回调动作。
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
thenApplyAsync用于在CompletableFuture对象上挂接一个转化函数。当CompletableFuture对象完成时,以其结果作为输入参数调用转化函数。转化函数内部在执行各种逻辑后,返回另一种类型的数据作为输出。该方法的返回是一个新的CompletableFuture对象,用于记录转化函数的执行状态和执行结果等信息。thenApplyAsync的fn参数将一种类型数据转化为另外一种类型数据。
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
thenAcceptAsync用于在CompletableFuture对象上挂接一个接收函数。当CompletableFuture对象完成时,以其结果作为输入参数调用接收函数。与thenApplyAsync类似,接收函数在其内部可以执行各种逻辑,但不同的是,接收函数不会返回任何类型数据,或者说返回类型是void。因此,thenAcceptAsync通常就是接收并消化任务链的最终输出结果。
<U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)
thenComposeAsync在API形式上与thenApplyAsync类似,但是它的转化函数返回的不是一般类型的对象,而是一个CompletionStage对象,或者说得更具体点,实际使用中通常就是一个CompletableFuture对象。这意味着,可以在原来的CompletableFuture调用链上,插入另外一个调用链,从而形成一个新的调用链。即compose(组成、构成)的含义所在。
<U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
使用applyToEither可以实现两个CompletableFuture谁先完成就由谁执行回调函数的功能。一个使用场景:可以用该方法实现定时超期的功能。具体而言就是,用一个CompletableFuture表示目标任务,用另外一个CompletableFuture表示定时任务,这样如果目标任务在定时任务完成前尚未完成,就由定时任务做善后处理。
static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
CompletableFuture.allOf
的功能是将多个CompletableFuture合并成一个CompletableFuture。可以用它实现类似于Map/Reduce或Fork/Join的功能,尤其是在多核和并行计算大行其道的今天。
CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
如果在回调函数里做异常处理的话,会非常受限和不方便。稍有不注意,就会写出不合理甚至错误的代码。比如你认为捕获了的异常,实际上根本就不是在那个地方或那个线程上抛出。出现这种情况的原因在于,在异步的世界里,即使是同一份代码,实际上在运行起来后,其调用链生成、回调的执行时刻、回调所在线程和回调的上下文环境都是灵活多变的。相比以前同步或半异步半同步的编程方式,使用CompletableFuture开发的程序在运行时的状况会更加复杂和多变。而CompletableFuture的exceptionally方法提供相对较好的异常处理方案。使用exceptionally方法,可以对指定CompletableFuture抛出的异常进行处理。比如捕获异常并返回一个特定的值,或者继续将异常抛出。
实战
考虑下面的实验程序片段:
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(Tests::source, executor1);
CompletableFuture<String> cf2 = cf1.thenApplyAsync(Tests::echo, executor2);
CompletableFuture<String> cf3_1 = cf2.thenApplyAsync(Tests::echo1, executor3);
CompletableFuture<String> cf3_2 = cf2.thenApplyAsync(Tests::echo2, executor3);
CompletableFuture<String> cf3_3 = cf2.thenApplyAsync(Tests::echo3, executor3);
CompletableFuture<Void> cf3 = CompletableFuture.allOf(cf3_1, cf3_2, cf3_3);
CompletableFuture<Void> cf4 = cf3.thenAcceptAsync(x -> print("world"), executor4);
其工作原理如图所示:
描述实验程序整体的执行过程:
- 通过
CompletableFuture.supplyAsync
创建一个任务Tests::source
,并交由executor1异步执行。用cf1来记录该任务在执行过程中的状态和结果等信息。 - 通过
cf1.thenApplyAsync
,指定当cf1(Tests::source)
完成时,需要回调的任务Tests::echo
。cf1使用stack来管理这个后续要回调的任务。与cf1类似,用cf2来记录任务Tests::echo
的执行状态和执行结果等信息。 - 通过连续三次调用
cf2.thenApplyAsync
,指定当cf2(Tests::echo)
完成时,需要回调后续三个任务:Tests::echo1
、Tests::echo2
和Tests::echo3
。与cf1一样,cf2也是用stack来管理其后续要执行的这三个任务。 - 通过
CompletableFuture.allOf
,创建一个合并cf3_1
、cf3_2
、cf3_3
的cf3,cf3只有在其合并的所有cf完成时才能完成。在cf3内部,是用一个二叉树(tree)来记录其和cf3_1
、cf3_2
、cf3_3
的依赖关系。 - 通过
cf3.thenAcceptAsync
,指定当cf3完成时,需要回调的任务(print)。用cf4来记录print任务的状态和结果等信息。
总结:
- CompletableFuture用stack来管理其在完成时后续需要回调的任务(Completion)。
- 在AsyncRun、Completion中,通过依赖(dep)指针,指向后续需要处理的CompletableFuture,这样在任务完成后,就可以通过dep指针找到后续处理的CompletableFuture,从而继续执行。
通过1和2形成一个调用链,所有任务按照调用链执行。
描述CompletableFuture链是如何组织和执行的。总的来说,每个CompletableFuture可以存在三种类型的指针:src、snd和dep。其中dep指向这个CompletableFuture在完成(completed)时,后续继续调用的CompletableFuture。src和snd则指向其链接的另外的两个CompletableFuture,用于决定是否在CompletableFuture完成时触发dep执行。CompletableFuture内部就是用这三个指针巧妙地管理CompletableFuture之间各种复杂的依赖和调用关系的。对于每个CompletableFuture节点,当其被触发执行时,如果其src和snd(如果存在snd)都是completed状态(src或snd指向自己时也算completed状态),就触发其dep执行,否则就不触发其dep执行。但不管这个CompletableFuture是否触发其dep执行,在tryFire(ASYNC)
过后,这个CompletableFuture本身就是已经completed的。如果它没有触发dep,就会由该CompletableFuture的src或snd在被触发时按照同样的方式做处理。
分类
从另一个角度来了解API。有很多分类方法,从不同的维度,分类不一样,不必较真,重点是理解:
创建实例
创建CompletableFuture实例(下文简称CF),提交任务执行并获取执行结果
// 创建一个具有默认结果的CF
static CompletableFuture<U> completedFuture(U value)
// 返回一个新的CF,它在运行给定操作后由运行在 ForkJoinPool.commonPool()中的任务 异步完成
static CompletableFuture<Void> runAsync(Runnable runnable)
// 返回一个新的CF,它在运行给定操作之后由在给定执行程序中运行的任务异步完成
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
// 返回一个新的CF,它通过在 ForkJoinPool.commonPool()中运行的任务与通过调用给定的供应商获得的值,异步完成
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// 返回一个新的CF,由给定执行器中运行的任务异步完成,并通过调用给定的供应商获得的值
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
Void,表示无返回值的异步执行。
支持传参executor实例,可执行特定线程池或自定义线程池执行,否则默认使用ForkJoinPool.commonPool()
线程池,其线程数是 CPU 的核数。
通过设置 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism
来设置ForkJoinPool线程池的线程数
使用共享线程池将会有个弊端,一旦有任务被阻塞,将会造成其他任务没机会执行。强烈建议自定义线程池,根据任务类型不同,主动创建线程池,进行资源隔离,避免互相干扰。
同步
任务串行执行,下一个任务必须等待上一个任务完成才可以继续执行。都是重写CompletionStage接口里面的方法
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
CompletableFuture<Void> thenAccept(Consumer<? super T> action)
CompletableFuture<Void> thenRun(Runnable action)
// 类似于Stream#flatmap
<U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
解释:
- thenApply 方法需要传入核心参数为
Function<T,R>
类型。这个类核心方法为:R apply(T t)
,这个接口将会把上一个任务返回结果当做入参,执行结束将会返回结果 - thenAccept 方法需要传入参数对象为
Consumer<T>
类型,这个类核心方法为:void accept(T t)
不支持返回结果,但是需要将上一个任务执行结果当做参数传入 - thenRun 方法需要传入参数对象为 Runnable 类型,核心方法既不支持传入参数,也不会返回执行结果
- thenCompose 方法作用与 thenApply 一样,只不过 thenCompose 需要返回新的CompletionStage
异步
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
CompletableFuture<Void> thenRunAsync(Runnable action)
CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
<U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
<U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)
AND
// 同步
<U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
<U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
// 异步
<U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
<U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
<U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
<U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)
CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action)
CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)
// 多组
static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
- thenCombine方法核心参数BiFunction ,作用与Function一样,BiFunction可接受两个参数,Function只能接受一个参数
- thenAcceptBoth方法核心参数BiConsumer 作用也与Consumer一样,不过其需要接受两个参数
- runAfterBoth 方法核心参数最简单,无返回值
这三组方法只能完成两个任务 AND 汇聚关系。
如果需要完成多个任务汇聚关系,需要使用CompletableFuture#allOf
,这个方法是不支持返回任务结果。
OR
// 同步
<U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)
// 异步
<U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
<U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor)
CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action)
CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor)
// 多组
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
设置任务结果
boolean complete(T value)
// 如果不是已经完成,将返回的值 get()种相关方法为给定值
boolean completeExceptionally(Throwable ex)
// 如果尚未完成,则调用get()和相关方法来抛出给定的异常
第一个方法,主动设置CF任务执行结果,若返回 true,表示设置成功。返回 false,设置失败,这是因为任务已经执行结束有执行结果。
一旦 complete 设置成功,CompletableFuture 返回结果就不会被更改,即使后续 CompletableFuture 任务执行结束
第二个方法,给 CompletableFuture 设置异常对象。若设置成功,如果调用 get 等方法获取结果,将会抛错。
获取执行结果
T get() throws InterruptedException, ExecutionException
T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException
T getNow(T valueIfAbsent)
T join()
简化的源码:
public T join() {
Object r;
return reportJoin((r = result) == null ? waitingGet(false) : r);
}
public T get() throws InterruptedException, ExecutionException {
Object r;
return reportGet((r = result) == null ? waitingGet(true) : r);
}
// Returns raw result after waiting, or null if interruptible and interrupted
private Object waitingGet(boolean interruptible)
waitingGet方法有一个入参,表示是否可以被中断;
join没有异常抛出,而get方法抛出2个异常,二者都是阻塞的等待获取执行结果,但get的阻塞可以被interrupt(),而join不可以;二者对于异常的处理也不相同。
get需要处理检查异常,必须try..catch
处理或抛出到外层;join没有强制处理异常。
异常处理
CompletableFuture 方法执行过程若产生异常,当调用 get,join获取任务结果才会抛出异常。
// 同步
CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
<U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn)
// 异步
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
<U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
<U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)
exceptionally 使用方式类似于try..catch
中catch代码块中异常处理。
whenComplete 与 handle 方法就类似于try..catch..finanlly
中finally
代码块。无论是否发生异常,都将会执行的。这两个方法区别在于handle
支持返回结果。
实例
public void test1() {
// 提交一个一部执行的任务,有结果返回值
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "future 1");
// 若future执行完毕,则future.get();返回执行结果'future 1',若未执行完毕,则返回给定值'111'
future.complete("111");
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "future 1", ForkJoinPool.commonPool());
// 返回一个指定结果的CompletableFuture对象
CompletableFuture<String> future2 = CompletableFuture.completedFuture("future 2");
}
前者为执行完所有提交任务后进行后面的操作,无返回值;
后者用于有返回值的,当任一执行完返回结果时即可进行后续的任务执行
public void test2() {
CompletableFuture<Void> void1 = CompletableFuture.runAsync(() -> logger.info("I have done Nothing"));
CompletableFuture<Void> void2 = CompletableFuture.runAsync(() -> logger.info("Me, too"), ForkJoinPool.commonPool());
CompletableFuture.allOf(void1, void2).thenRun(() -> logger.info("success")).join();
// 提交一个一部执行的任务,有结果返回值
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "future 1");
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "future 3", ForkJoinPool.commonPool());
// 后面需要join(),否则void1和void2将异步执行,这里不会阻塞,也就拿不到执行结果
CompletableFuture.allOf(void1, void2).thenRun(() -> logger.info("success")).join();
// anyOf任一执行完成
CompletableFuture.anyOf(future, future1).thenAccept(System.out::println).join();
}
两者均需要join()或者get()方法配合使用才能达到同步入参的返回值执行后面的操作,否则将异步执行;
// allOf没有join()配合使用,allOf后面的结果集若没有执行完毕则直接执行log.info();结果集继续异步执行,完成后当前异步线程继续执行thenAccept()方法;
CompletableFuture.allOf(future, future1).thenAccept((value) -> {
System.out.println(value);
});
// allOf配合join()使用,线程将会阻塞等待结果集执行完毕获取结果后执行thenAccept();
CompletableFuture.allOf(future, future1).thenAccept((value) -> {
System.out.println(value);
}).join();
CompletableFuture.allOf(future, future1).thenAccept((value) -> {
System.out.println(value);
}).get();
CompletableFuture.allOf(future, future1).thenAccept((value) -> {
System.out.println(value);
}).get(2000, TimeUnit.MILLISECONDS);
log.info("do nothing");
- 异步结果的处理
@Test
public void test3(){
CompletableFuture.supplyAsync(() -> "future 1").thenAccept(s -> log.info(s));
CompletableFuture.supplyAsync(() -> "future 1").thenAcceptAsync(s -> log.info(s),EnvirmentThreadPool.getThreadPool());
String c = CompletableFuture.supplyAsync(() -> "future 1").thenApply(a -> a).join();
try {
String d = CompletableFuture.supplyAsync(() -> "future 1").thenApply(a -> a).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
2.5 任务串行执行,不处理上一步执行结果
@Test
public void test4(){
CompletableFuture<Integer> integerCompletableFuture =
CompletableFuture.supplyAsync(() -> 3)
.thenCombine(CompletableFuture.supplyAsync(() -> 8), (one, two) -> one + two);
System.out.println(integerCompletableFuture.join());
}
说明: 上面是合并2个串行结果并返回值,下面对应的是无返回值的方法
@Test
public void test4(){
CompletableFuture.supplyAsync(() -> 3)
.thenAcceptBoth(CompletableFuture.supplyAsync(() -> 8), (one, two) -> System.out.println(one + two));
}
前3个方法用于2个任务完成之后串行执行第三个任务,后3个方法用于只要前2个任务有一个完成就开始执行第三个任务,不关心前面的执行结果:
CompletableFuture.supplyAsync(() -> 3)
.runAfterBoth(CompletableFuture.supplyAsync(()->9),
()->System.out.println()
);
CompletableFuture.supplyAsync(() -> 3)
.runAfterEither(CompletableFuture.supplyAsync(()->9),
()->System.out.println()
);
whenComplete
可用于解决回调的问题
说明:当前者任务完成时,将执行结果和异常作为参数传入方法,便于后续处理;
CompletableFuture.supplyAsync(() -> 3).whenComplete((r,throwable)->System.out.println(r));
将前一执行结果作为参数传入后面的任务中
CompletableFuture.supplyAsync(() -> 3).thenCompose(i-> CompletableFuture.supplyAsync(()->i));
与2.9不同的是,此处将前者执行结果包括异常作为参数入参执行后续的方法;可与thenApply方法进行对比,多了对异常的处理
CompletableFuture.supplyAsync(() -> 3).thenApply(r->r+1);
CompletableFuture.supplyAsync(() -> 3).handle((r,throwable)-> r+2);
CompletableFuture<String> handle = CompletableFuture.supplyAsync(() -> 3).handle((r, throwable) -> throwable.getMessage());
CompletableFuture<String> result = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
throw new RuntimeException("hahahahh");
});
String join = result.exceptionally(throwable -> "123").join();
System.out.println(join);
System.out.println(result.isCompletedExceptionally());
System.out.println(result.completeExceptionally(new RuntimeException()));
result.get();
总结
- CompletableFuture的任务是串联的,如果它的其中某一步骤发生异常,会影响后续代码的运行
- run的参数是Runnable,而supply的参数是Supplier。前者没有返回值,而后者有
- apply 有入参和返回值,入参为前置任务的输出
- accept 有入参无返回值,会返回CompletableFuture
- run没有入参也没有返回值,同样会返回CompletableFuture
- combine 形成一个复合的结构,连接两个CompletableFuture,并将它们的2个输出结果,作为combine的输入
- compose 将嵌套的CompletableFuture平铺开,用来串联两个CompletableFuture
参考
CompletableFuture
20个CompletableFuture例子
CompletableFuture与CompletionStage使用
CompletableFuture异步编程
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/142190.html