1. 什么是异步编程
在很多时候,我们在进程中使用单一线程从头到尾地执行程序,这种简单模式会导致性能和用户体验另人难以接受。
比如程序向另外一台服务器发出请求,由于网络等外部原因,此种通信任务往往会耗费大量时间,进程如果在此期间仅仅只能等待网络或网络上其他机器的响应,将严重地降低了性能。程序不应该浪费等待的时间,而应该更加高效地利用,在等待的时间执行其他任务,回复到达后在继续执行第一个任务。
如果程序调用某个方法,等待其执行全部处理后才能继续执行,我们称其为同步的。相反,在处理完成之前就返回调用方法则是异步的。我们在编程语言的流程中添加了异步控制的部分,这部分的编程可以称之为异步编程。 异步编程传统的解决方案:回调函数和事件监听。
对于从来没有接触过编程的人来说,异步其实比线性更贴近人类的直觉。现实世界的组织结构本身就是由大量的并行线/进程靠异步所结合起来的。
2. CompletableFuture 大杀器
在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。
异步处理的本质其实就是回调(系统层借助于指针来实现,准确来说是函数指针),用户提供一个回调方法,回调函数不是由该函数的实现方直接调用,而是在特定的事件或条件发生时由另外的一方调用的,用于对该事件或条件进行响应。从“宏观”来看,CompletableFuture的实现其实很简单,就是回调,即在任务执行完成之后进行回调,回调中可能涉及到其他操作,比如下一个回调或者执行下一个任务。
3. CompletableFuture 实现机制
先抛开 CompletableFuture 不谈,如果程序中使用了线程池,如何才能在某个任务执行完成之后执行某些动作呢?其实Java线程池本身已经提供了任务执行前后的hook方法(beforeExecute和afterExecute),如下:
public class ThreadPoolExecutor extends AbstractExecutorService {
// ...
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
// ...
}
我们只需要自定义线程池继承ThreadPoolExecutor ,然后重写beforeExecute和afterExecute方法即可,在afterExecute里可以执行一些动作。
public class ListenableThreadPoolExecutor extends ThreadPoolExecutor {
public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Setter
private IListenable listenable;
private ConcurrentHashMap<Object, Object> argHashMap = new ConcurrentHashMap<>();
@SuppressWarnings("all")
public void execute(Runnable command, Object arg) {
Runnable task = () -> command.run();
argHashMap.put(task, arg);
execute(task);
}
@SuppressWarnings("all")
public <T> Future<T> submit(Callable<T> task, Object arg) {
if (task == null) {
throw new NullPointerException();
}
RunnableFuture<T> ftask = newTaskFor(task);
argHashMap.put(ftask, arg);
execute(ftask);
return ftask;
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
if (this.listenable == null) {
return;
}
this.listenable.beforeExecute(t, r, this.argHashMap.get(r));
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
Object arg = this.argHashMap.remove(r);
if (this.listenable == null) {
return;
}
if (r instanceof Future) {
this.listenable.callableCallback((Future) r, r, t);
} else {
this.listenable.runnableCallback(r, t);
}
this.listenable.afterCallback(this, arg, r, t);
}
}
那么CompletableFuture 的实现机制是怎样的呢?其实,和上面的所说的“afterExecute机制”是类似的(本质是一样的,回调机制),也是在任务执行完成后执行某些动作,如下代码:
CompletableFuture.supplyAsync(() -> {
// callable任务
System.out.println("hello world");
return "result";
}).thenApply(r -> {
// 任务完成之后的动作(回调方法),类似于ThreadPoolExecutor.afterExecute方法
System.out.println(r);
return r;
});
上面的示例代码其实主要完成了3个步骤,这3个步骤其实也是CompletableFuture的实现流程:
-
执行任务 -
添加任务完成之后的动作(回调方法) -
执行回调
3.1. 执行任务
执行任务的主要逻辑就是 AsyncSupply.run 方法:
public void run() {
CompletableFuture<T> d; Supplier<T> f;
// dep是当前CompletableFuture,fn是任务执行逻辑
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
if (d.result == null) {
try {
// 1 任务执行 & result cas设置
d.completeValue(f.get());
} catch (Throwable ex) {
// 1.1 result cas异常设置
d.completeThrowable(ex);
}
}
// 2 任务完成,可能涉及到回调的执行
d.postComplete();
}
}
3.2. 回调方法
添加回调方法的流程是从 thenApply 开始的:
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
private <V> CompletableFuture<V> uniApplyStage(
Executor e, Function<? super T,? extends V> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<V> d = new CompletableFuture<V>();
if (e != null || !d.uniApply(this, f, null)) {
// 当上一个CompletableFuture未完成时,将该CompletableFuture添加
// 到上一个CompletableFuture的statck中
UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
push(c);
c.tryFire(SYNC);
}
return d;
}
3.3. 执行回调
执行回调是从CompletableFuture.postComplete 开始的:
final void postComplete() {
/*
* On each step, variable f holds current dependents to pop
* and run. It is extended along only one path at a time,
* pushing others to avoid unbounded recursion.
*/
CompletableFuture<?> f = this; Completion h;
while ((h = f.stack) != null ||
(f != this && (h = (f = this).stack) != null)) {
CompletableFuture<?> d; Completion t;
// cas设置h.next到当前CompletableFuture.statck
if (f.casStack(h, t = h.next)) {
if (t != null) {
if (f != this) {
pushStack(h);
continue;
}
h.next = null; // detach
}
f = (d = h.tryFire(NESTED)) == null ? this : d;
}
}
}
// UniAccept
final CompletableFuture<Void> tryFire(int mode) {
CompletableFuture<Void> d; CompletableFuture<T> a;
if ((d = dep) == null ||
!d.uniAccept(a = src, fn, mode > 0 ? null : this)) // 执行回调
return null;
dep = null; src = null; fn = null;
// 返回当前CompletableFuture 或者 递归调用postComplete
return d.postFire(a, mode);
}
看完上面3个步骤,是不是还不太清楚多个CompletableFuture之间的执行流程呢,下面我们换个例子并给出图示来看:

下面代码是CompletableFuture及其Completion关系对应的代码:
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
System.out.println("hello world f1");
sleep(1); // TimeUnit.SECONDS.sleep(1)
return "result f1";
});
CompletableFuture<String> f2 = f1.thenApply(r -> {
System.out.println(r);
sleep(1);
return "f2";
});
CompletableFuture<String> f3 = f2.thenApply(r -> {
System.out.println(r);
sleep(1);
return "f2";
});
CompletableFuture<String> f4 = f1.thenApply(r -> {
System.out.println(r);
sleep(1);
return "f2";
});
CompletableFuture<String> f5 = f4.thenApply(r -> {
System.out.println(r);
sleep(1);
return "f2";
});
CompletableFuture<String> f6 = f5.thenApply(r -> {
System.out.println(r);
sleep(1);
return "f2";
});
结合上图和postComplete流程,可以看出执行回调的顺序是:f1 -> f4 -> f5 -> f6 -> f2 -> f3。
4. CompletableFuture 基本使用
Java8新增的CompletableFuture 提供对异步计算的支持,可以通过回调的方式处理计算结果。
4.1. CompletableFuture 的创建
runAsync 和 supplyAsync
一般情况下我们都是让其他线程或者线程池来执行future这些异步任务。除了直接创建CompletableFuture 对象外(不推荐这样使用),可以使用如下4个方法创建CompletableFuture 对象:
// runAsync是Runnable任务,不带返回值的,如果入参有executor,则使用executor来执行异步任务
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
// supplyAsync是待返回结果的异步任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
使用示例:
// 使用示例
CompletableFuture.runAsync(() -> {
System.out.println("hello world");
}, executor);
CompletableFuture.supplyAsync(() -> {
System.out.println("hello world");
return "result";
});
如果入参不带executor,则默认使用ForkJoinPool.commonPool()作为执行异步任务的线程池;否则使用executor执行任务。
4.2. CompletableFuture 的完成动作
whenComplete

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
使用示例:
// 使用示例
CompletableFuture.supplyAsync(() -> {
System.out.println("hello world");
return "result";
}).whenCompleteAsync((result, e) -> {
System.out.println(result + " " + e);
}).exceptionally((e) -> {
System.out.println("exception " + e);
return "exception";
});
action是Action类型,从上面可以看出它既可以处理正常返回值也可以处理异常,whenComplete会在任务执行完成后直接在当前线程内执行action动作,后缀带Async的方法是交给其他线程执行action(如果是线程池,执行action的可能和之前执行异步任务的是同一个线程),入参带executor的交给executor线程池来执行action动作,当发生异常时,会在当前线程内执行exceptionally方法。

handle
除了用上面的whenComplete来执行完成动作之外,还可以使用handle方法,该方法可以返回一个新的CompletableFuture的返回类型。
public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)
使用示例:
// handle方法示例:
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
System.out.println("hello world");
return "result";
});
CompletableFuture<Integer> f2 = f1.handle((r, e) -> {
System.out.println("handle");
return 1;
});
除了使用handle方法来执行CompletableFuture返回类型转换之外,还可以使用thenApply方法,二者不同的是前者会处理正常返回值和异常,因此可以屏蔽异常,避免继续抛出;而后者只能处理正常返回值,一旦有异常就会抛出。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
使用示例:
// thenApply方法示例:
CompletableFuture.supplyAsync(() -> {
System.out.println("hello world");
return "result";
}).thenApply((r) -> {
System.out.println(r);
return "aaa";
}).thenApply((r) -> {
System.out.println(r);
return 1;
});
thenAccept
注意,上面的handle、thenApply都是返回新的CompletableFuture类型,如果只是为了在CompletableFuture完成之后执行某些消费动作,而不返回新的CompletableFuture类型,则可以使用thenAccept方法。
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
使用示例:
// thenAccept方法示例:
CompletableFuture.supplyAsync(() -> {
System.out.println("hello world");
return "result";
}).thenAccept(r -> {
System.out.println(r);
}).thenAccept(r -> {
// 这里的r为Void(null)了
System.out.println(r);
});
thenAcceptBoth

上面的handle、thenApply和thenAppept都是对上一个CompletableFuture执行完的结果进行某些操作。那么可不可以同时对2个CompletableFuture执行结果执行某些操作呢?其实也是可以的,使用thenAppeptBoth方法即可。注意,thenAppeptBoth和handle/thenApply/thenAppep的流程是一样的,只不过thenAppeptBoth中包含了另一个CompletableFuture对象(注意,这里另一个CompletableFuture对象的执行可并不是上一个CompletableFuture执行结束才开始执行的)。
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
使用示例:
// thenAcceptBoth方法示例:
CompletableFuture.supplyAsync(() -> {
System.out.println("hello world");
return "result";
}).thenAcceptBoth(CompletableFuture.completedFuture("result2"), (r1, r2) -> {
System.out.println(r1 + "-" + r2);
});
thenCombine
注意,thenAcceptBoth方法是没有返回值的(CompletableFuture
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
使用示例:
// thenCombine方法示例
CompletableFuture.supplyAsync(() -> {
System.out.println("hello world");
return "result";
}).thenCombine(CompletableFuture.completedFuture("result2"), (r1, r2) -> {
System.out.println(r1 + "-" + r2);
return r1 + "-" + r2;
});
acceptEither 和 applyToEither

thenAcceptBoth和runAfterBoth是当两个CompletableFuture都计算完成,而下面的方法是当任意一个CompletableFuture计算完成的时候就会执行。
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)
allOf 和 anyOf


如果当想在多个CompletableFuture都计算完成或者多个CompletableFuture中的一个计算完成后执行某个动作,可使用方法 allOf 和 anyOf。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
thenRun
如果当任务完成时并不想用CompletableFuture的结果,可以使用thenRun方法来执行一个Runnable。
public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
thenCompose
以上方法都是在方法中返回一个值(或者不返回值),其实还可以返回一个CompletableFuture,是不是很像类的组合一样。
public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)
使用示例:
// thenCompose方法示例:
CompletableFuture.supplyAsync(() -> {
System.out.println("hello world");
return "result";
}).thenCompose(r -> {
System.out.println(r);
return CompletableFuture.supplyAsync(() -> {
System.out.println(r + " result2");
return r + " result2";
});
});
// 上面的代码和下面的代码效果是一样的
CompletableFuture.supplyAsync(() -> {
System.out.println("hello world");
return "result";
}).thenApply(r -> {
System.out.println(r);
return r;
}).thenApplyAsync(r -> {
System.out.println(r + " result2");
return r + " result2";
});
原文始发于微信公众号(白菜说技术):CompletableFuture 异步编程设计的大杀器
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/172953.html