前言
CompletableFuture是Java 8引入的一个强大的工具类,实现了CompletionStage和Future接口,可以用于执行异步任务、处理任务结果、任务编排等。通过提供丰富的方法来处理异步编程,这些方法允许我们以链式的方式处理异步任务的结果,使得异步编程更加简洁和可控。
一个简单任务编排的例子
我们以一个商品购买全流程为例看看传统任务和CompletableFuture的各自实现。
按上图描述所述,模拟了一个商品购买链路的流程(仅做演示使用,真实流程比这严谨复杂),步骤如下:
-
• 执行获取商品详情任务,获取商品id。
-
• 将商品id传递给询价下单任务,执行创建订单操作,返回订单号。
-
• 将订单号传递给支付任务,执行付款交易操作,返回交易流水号。
-
• 支付任务完成后,可并行执行订单系统更改订单状态任务、会员系统增加积分任务、仓库配货任务(这三个任务之间没有关联性,可异步并行执行)。
-
• 仓库配货任务完成后,执行物流配送任务,配送完成,整个购买流程结束。
传统任务编排实现
public static void testFuture() throws Exception {
//创建线程池
ExecutorService executorService = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
String pattern = "yyyy-MM-dd HH:mm:ss";
// 获取商品详情任务
FutureTask<String> goodsDetailTask = new FutureTask<>(() -> {
TimeUnit.SECONDS.sleep(1);
String goodsId = "111111";
System.out.println(DateUtil.format(LocalDateTime.now(), pattern) + ":获取商品详情任务执行中,获取到了商品详情,goodsId:" + goodsId);
return goodsId;
});
// 询价下单任务
FutureTask<String> priceOrderTask = new FutureTask<>(() -> {
// 等获取商品详情任务结束返回orderId
String goodsId = goodsDetailTask.get();
TimeUnit.SECONDS.sleep(1);
String orderId = "222222";
System.out.println(DateUtil.format(LocalDateTime.now(), pattern) + ":询价下单任务执行中,根据goodsId:" + goodsId + ",询价完成,下单成功,orderId:" + orderId);
return orderId;
});
// 支付任务
FutureTask<String> paymentTask = new FutureTask<>(() -> {
//等询价下单任务结束,返回orderId
String orderId = priceOrderTask.get();
TimeUnit.SECONDS.sleep(1);
String transId = "333333";
System.out.println(DateUtil.format(LocalDateTime.now(), pattern) + ":支付任务执行中,orderId:" + orderId + ",支付成功,transId:" + transId);
return transId;
});
//订单系统更改订单状态任务
FutureTask<String> changeOrderStatusTask = new FutureTask<>(() -> {
try {
//等支付任务结束,返回orderId
String orderId = paymentTask.get();
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
throw new RuntimeException(e);
}
System.out.println(DateUtil.format(LocalDateTime.now(), pattern) + ":订单状态修改成功");
}, null);
//会员系统增加积分任务
FutureTask<String> vipPointsTask = new FutureTask<>(() -> {
try {
//等支付任务结束,返回orderId
String transId = paymentTask.get();
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
throw new RuntimeException(e);
}
System.out.println(DateUtil.format(LocalDateTime.now(), pattern) + ":会员积分增加成功");
}, null);
//仓库配货任务
FutureTask<String> wmsTask = new FutureTask<>(() -> {
try {
// 等支付任务结束,返回orderId
String transId = paymentTask.get();
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
throw new RuntimeException(e);
}
System.out.println(DateUtil.format(LocalDateTime.now(), pattern) + ":仓库配货成功");
}, null);
//物流配送任务
FutureTask<String> deliveryTask = new FutureTask<>(() -> {
try {
//等仓库配货任务结束
wmsTask.get();
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
throw new RuntimeException(e);
}
System.out.println(DateUtil.format(LocalDateTime.now(), pattern) + ":物流配送成功,订单结束");
}, null);
//将所有的任务提交到线程池
executorService.submit(goodsDetailTask);
executorService.submit(priceOrderTask);
executorService.submit(paymentTask);
executorService.submit(changeOrderStatusTask);
executorService.submit(vipPointsTask);
executorService.submit(wmsTask);
executorService.submit(deliveryTask);
//任务中调用了get方法获取执行结果,等全部任务执行完毕后关闭线程池
executorService.shutdown();
}
输出结果:
2024-01-15 18:40:17:获取商品详情任务执行中,获取到了商品详情,goodsId:111111
2024-01-15 18:40:18:询价下单任务执行中,根据goodsId:111111,询价完成,下单成功,orderId:222222
2024-01-15 18:40:19:支付任务执行中,orderId:222222,支付成功,transId:333333
2024-01-15 18:40:20:订单状态修改成功
2024-01-15 18:40:20:仓库配货成功
2024-01-15 18:40:20:会员积分增加成功
2024-01-15 18:40:21:物流配送成功,订单结束
采用线程池 + FutureTask实现了任务的执行,将所有的FutureTask提交到线程池执行,任务之间如有串联执行的情况,采用的是get方法获取前一个任务结果这种方式。虽然实现了商品购买的流程,可以看到任务之间的参数传递是通过主动调用get方法阻塞等待,任务之间耦合性较强,而且代码量也比较大,接下来我们看一下CompletableFuture是如何实现的。
CompletableFuture任务编排实现
public static void testCompletableFuture() throws Exception {
String pattern = "yyyy-MM-dd HH:mm:ss";
CompletableFuture<String> payFuture = CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(1000);
// 获取商品详情任务
String goodsId = "111111";
System.out.println(DateUtil.format(LocalDateTime.now(), pattern) + ":获取商品详情任务执行中,获取到了商品详情,goodsId:" + goodsId);
return goodsId;
}).thenApply(goodsId -> {
// 询价下单任务
ThreadUtil.sleep(1000);
String orderId = "222222";
System.out.println(DateUtil.format(LocalDateTime.now(), pattern) + ":询价下单任务执行中,根据goodsId:" + goodsId + ",询价完成,下单成功,orderId:" + orderId);
return orderId;
}).thenApply(orderId -> {
//支付任务
ThreadUtil.sleep(1000);
String transId = "333333";
System.out.println(DateUtil.format(LocalDateTime.now(), pattern) + ":支付任务执行中,orderId:" + orderId + ",支付成功,transId:" + transId);
return transId;
});
payFuture.thenAcceptAsync(transId -> {
ThreadUtil.sleep(1000);
System.out.println(DateUtil.format(LocalDateTime.now(), pattern) + ":订单状态修改成功");
});
payFuture.thenAcceptAsync(transId -> {
ThreadUtil.sleep(1000);
System.out.println(DateUtil.format(LocalDateTime.now(), pattern) + ":会员积分增加成功");
});
CompletableFuture<String> wmsFuture = payFuture.thenApplyAsync(transId -> {
ThreadUtil.sleep(1000);
System.out.println(DateUtil.format(LocalDateTime.now(), pattern) + ":仓库配货成功");
return transId;
});
CompletableFuture<String> deliveryFuture = wmsFuture.thenApplyAsync(transId -> {
ThreadUtil.sleep(1000);
System.out.println(DateUtil.format(LocalDateTime.now(), pattern) + ":物流配送成功,订单结束");
return transId;
});
System.out.println("最终传递下来的transId:" + deliveryFuture.get());
}
ThreadUtil.sleep方法是hutool包里的方法,已经帮我们捕获了InterruptedException异常,返回值是Boolean类型,true表示正常休眠结束,false表示被中断。
输出结果:
2024-01-15 19:38:33:获取商品详情任务执行中,获取到了商品详情,goodsId:111111
2024-01-15 19:38:34:询价下单任务执行中,根据goodsId:111111,询价完成,下单成功,orderId:222222
2024-01-15 19:38:35:支付任务执行中,orderId:222222,支付成功,transId:333333
2024-01-15 19:38:36:仓库配货成功
2024-01-15 19:38:36:订单状态修改成功
2024-01-15 19:38:36:会员积分增加成功
2024-01-15 19:38:37:物流配送成功,订单结束
最终传递下来的transId:333333
采用CompletableFuture方式首先我们看到的是代码比较清晰,代码量也减少了很多,任务执行先后顺序及参数传递可通过调用方法控制,降低了任务之间的耦合性,是不是简洁而优雅?有没有被香到?那接下来我们就来看看CompletableFuture的妙用。
CompletableFuture的使用
下面一张图理清CompletableFuture的常用方法。
注意:图中所说的有参是指当前的方法执行完成返回新的CompletableFuture,在这个新的CompletableFuture调用其他方法时可将上一个任务结果作为参数向下传递。
CompletableFuture任务的创建
其实一个CompletableFuture对象就是一个任务。
通过构造方法实例化创建
CompletableFuture只有一个无参的构造方法,可以通过这个构造方法创建一个CompletableFuture对象。
CompletableFuture<String> completableFuture = new CompletableFuture<>();
基本上没有场景会直接通过构造方法创建对象,因为虽然实例化了一个对象,也就是一个任务,但这个任务没有执行,如果后续继续调其他的方法而不获取结果的话线程就会结束,新的CompletableFuture调用get获取结果的话,就会阻塞等待前一个任务执行完成。 比如像下面这样:
CompletableFuture<String> completableFuture = new CompletableFuture<>();
CompletableFuture<Void> voidCompletableFuture = completableFuture.thenRun(() -> System.out.println("===================="));
通过构造方法实例化一个CompletableFuture对象completableFuture,然后completableFuture调用thenRun方法,那么不会执行thenRun方法里的任务线程会直接结束,无任何输出。 如果像下面这样加一个获取结果的呢
CompletableFuture<String> completableFuture = new CompletableFuture<>();
CompletableFuture<Void> voidCompletableFuture = completableFuture.thenRun(() -> System.out.println("===================="));
System.out.println(voidCompletableFuture.get());
调用voidCompletableFuture对象的get()方法,则线程会阻塞等待,直到completableFuture这个任务完成为止(调用completableFuture对象的get方法也是如此,阻塞等待completableFuture任务执行完成)。 那么怎样让任务都执行呢,那就是让completableFuture任务执行完成,可以调用对象的complete方法让任务执行结束。
CompletableFuture<String> completableFuture = new CompletableFuture<>();
CompletableFuture<Void> voidCompletableFuture = completableFuture.thenRun(() -> System.out.println("===================="));
completableFuture.complete("我执行结束啦");
System.out.println(voidCompletableFuture.get());
这样程序就会执行结束,输出如下:
====================
null
由于thenRun里的任务是不需要参数也没有返回值的,所以调用get方法时会返回null,后面会讲到。
通过completedFuture方法创建任务
completedFuture方法,参数就是任务的返回值,任务立即执行并返回结果,用于创建一个已经完成的CompletableFuture对象,方法定义如下。
public static <U> CompletableFuture<U> completedFuture(U value) {
return new CompletableFuture<U>((value == null) ? NIL : value);
}
例子:
CompletableFuture<String> future = CompletableFuture.completedFuture("任务已执行。。。");
System.out.println(future.get());
输出:
任务已执行。。。
通过supplyAsync方法创建任务
supplyAsync方法,用于异步执行一个有返回值的任务,有两个重载方法,都有一个Supplier参数,另一个方法多一个Executor参数,可以传自定义个线程池执行任务,这个后面再介绍,方法定义如下:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
例子:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "我是Supplier。。。");
System.out.println(future.get());
输出:
我是Supplier。。。
通过runAsync方法创建任务
runAsync方法,用于异步执行一个没有返回值的任务,有两个重载方法,都有一个Runnable参数,另一个方法多一个Executor参数,可以传自定义个线程池执行任务,这个后面再介绍,方法定义如下:
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}
例子:
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> System.out.println("我是Runnable。。。"));
System.out.println(future.get());
输出:
我是Runnable。。。
null
因为runAsync接收的是Runnable对象,无任务返回值,所以调用get()方法时得到的是null。
异步回调
CompletableFuture任务执行完成后可以通到异步回调的方法继续执行新的任务。常用的方法有以下几种。
thenRun/thenRunAsync(无参无返回值)
这两个方法不接收前一个任务的执行结果,同时新的CompletableFuture也没有返回值。 先看方法定义:
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(asyncPool, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action,
Executor executor) {
return uniRunStage(screenExecutor(executor), action);
}
例子:
ExecutorService executorService = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
CompletableFuture.supplyAsync(() -> "我是Supplier。。。").thenRun(() -> {
System.out.println(Thread.currentThread().getName() + ":我是thenRun执行的任务。。。");
}).thenRunAsync(() -> {
System.out.println(Thread.currentThread().getName() + ":我是thenRunAsync执行的任务。。。");
}).thenRunAsync(() -> {
System.out.println(Thread.currentThread().getName() + ":我是thenRunAsync执行的任务。。。");
}, executorService);
executorService.shutdown();
输出结果:
main:我是thenRun执行的任务。。。
ForkJoinPool.commonPool-worker-1:我是thenRunAsync执行的任务。。。
pool-1-thread-1:我是thenRunAsync执行的任务。。。
这里可以看到
-
• thenRun执行的方法是跟main方法同一个线程。
-
• 调用的第一个thenRunAsync方法没有传Executor参数,所以默认使用的是ForkJoinPool框架的线程池,
-
• 第二个thenRunAsync方法传了一个自定义的ExecutorService对象,任务是在自定义的线程池中执行的,通过Thread.currentThread().getName()可以看出是哪个线程。
-
• thenRun、thenRunAsync他们都是不接收参数,也没有返回结果的。
注意:在CompletableFuture中我们看到方法以Async结尾的说明任务都是在线程池里执行的
thenAccept/thenAcceptAsync(有参无返回值)
这两个方法接收前一个任务返回的执行结果,同时新的CompletableFuture没有返回值。 先看方法定义:
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
return uniAcceptStage(asyncPool, action);
}
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor) {
return uniAcceptStage(screenExecutor(executor), action);
}
例子:
ExecutorService executorService = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> "我是Supplier。。。");
supplyFuture.thenAccept(p -> {
System.out.println(Thread.currentThread().getName() + ":我是thenAccept执行的任务。。。我接收的参数是:" + p);
});
supplyFuture.thenAcceptAsync(p -> {
System.out.println(Thread.currentThread().getName() + ":我是thenAcceptAsync执行的任务。。。我接收的参数是:" + p);
});
supplyFuture.thenAcceptAsync(p -> {
System.out.println(Thread.currentThread().getName() + ":我是thenRunAsync执行的任务。。。");
}, executorService);
executorService.shutdown();
输出结果:
main:我是thenAccept执行的任务。。。我接收的参数是:我是Supplier。。。
ForkJoinPool.commonPool-worker-1:我是thenAcceptAsync执行的任务。。。我接收的参数是:我是Supplier。。。
pool-1-thread-1:我是thenRunAsync执行的任务。。。
这里可以看到
-
• thenAccept执行的方法是跟main方法同一个线程,并且参数接收到了supplyFuture这个任务返回的结果。
-
• 调用的第一个thenAcceptAsync方法没有传Executor参数,所以默认使用的是ForkJoinPool框架的线程池,并且参数接收到了supplyFuture这个任务返回的结果。
-
• 第二个thenRunAsync方法传了一个自定义的ExecutorService对象,并且参数接收到了supplyFuture这个任务返回的结果,任务是在自定义的线程池中执行的,通过Thread.currentThread().getName()可以看出是哪个线程。
-
• thenAccept、thenAcceptAsync他们都接收参数,但没有返回值。
thenApply/thenApplyAsync(有参有返回值)
这两个方法接收前一个任务返回的执行结果,同时新的CompletableFuture也有返回值。 先看方法定义:
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn) {
return uniApplyStage(asyncPool, fn);
}
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor) {
return uniApplyStage(screenExecutor(executor), fn);
}
例子:
ExecutorService executorService = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> "我是Supplier。。。");
CompletableFuture<String> result = supplyFuture.thenApply(p -> {
System.out.println(Thread.currentThread().getName() + ":我是thenApply执行的任务。。。我接收的参数是:" + p);
return "我是thenApply。。。";
}).thenApplyAsync(p -> {
System.out.println(Thread.currentThread().getName() + ":我是thenApplyAsync执行的任务。。。我接收的参数是:" + p);
return "我是thenApplyAsync。。。";
}).thenApplyAsync(p -> {
System.out.println(Thread.currentThread().getName() + ":我是thenApplyAsync执行的任务。。。我接收的参数是:" + p);
return "我是thenApplyAsync使用自定义线程池返回的结果。。。";
}, executorService);
System.out.println(result.get());
executorService.shutdown();
输出结果:
main:我是thenApply执行的任务。。。我接收的参数是:我是Supplier。。。
ForkJoinPool.commonPool-worker-1:我是thenApplyAsync执行的任务。。。我接收的参数是:我是thenApply。。。
pool-1-thread-1:我是thenApplyAsync执行的任务。。。我接收的参数是:我是thenApplyAsync。。。
我是thenApplyAsync使用自定义线程池返回的结果。。。
这里可以看到
-
• thenApply接收了supplyFuture任务的返回结果作为参数,同时自己执行的任务也返回了新的结果。
-
• 调用的第一个thenApplyAsync方法没有传Executor参数,所以默认使用的是ForkJoinPool框架的线程池,并且参数接收到了thenApply方法执行任务的结果,同时自己执行的任务也返回了新的结果。
-
• 第二个thenApplyAsync方法传了一个自定义的ExecutorService对象,并且参数接收到了前一个thenApplyAsync这个任务返回的结果,任务是在自定义的线程池中执行的,通过Thread.currentThread().getName()可以看出是哪个线程,同时自己执行的任务也返回了新的结果。。
-
• 最终调用result对象的get方法,获取到了带自定义线程池的thenApplyAsync方法执行的结果。
-
• thenApply、thenApplyAsync他们都接收参数,都有返回值
任务的组合
and组合关系任务
thenCombine/thenCombineAsync(有参有返回值)
这两个方法接收前一个任务返回的执行结果,并与新的CompletableFuture返回结果进行一个合并处理再返回一个新的结果。 先看方法定义:
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(null, other, fn);
}
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(asyncPool, other, fn);
}
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
return biApplyStage(screenExecutor(executor), other, fn);
}
例子:
ExecutorService executorService = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
CompletableFuture<String> result = CompletableFuture.supplyAsync(() -> "我是Supplier。。。")
.thenCombine(CompletableFuture.completedFuture("我是completedFuture的结果"), (t, r) -> t + r)
.thenCombineAsync(CompletableFuture.completedFuture("我是thenCombineAsync。。。"), (t, r) -> t + r)
.thenCombineAsync(CompletableFuture.completedFuture("我是thenCombineAsync with executor。。。"), (t, r) -> t + r, executorService);
System.out.println(result.get());
executorService.shutdown();
输出结果:
我是Supplier。。。我是completedFuture的结果我是thenCombineAsync。。。我是thenCombineAsync with executor。。。
解释:
-
• thenCombine、thenCombineAsync都是将当前任务的执行结果和新任务的执行结果合并处理后再返回一个新的结果。只不过thenCombineAsync是在异步线程里执行的。
-
• 调用thenCombine方法时,参数是一个新的CompletableFuture,任务结果合并处理的方案是将两个字符串相连。
-
• 调用thenCombineAsync方法时,将前一个thenCombine执行结果和参数新的CompletableFuture结果再次相连。
-
• 调用第二个thenCombineAsync方法时,处理跟第一个thenCombineAsync方法相同,也是将结果相连,只不过是在自定义的线程中执行。
-
• 所以最终的结果就是将这几个CompletableFuture的结果合并字符串相连。
-
• thenCombine、thenCombineAsync都是将参数新的CompletableFuture执行结果和当前任务CompletableFuture的结果合并处理并返回一个新的CompletableFuture带返回值。
thenAcceptBoth/thenAcceptBothAsync(有参无返回值)
这两个方法接收前一个任务返回的执行结果,并与新的CompletableFuture返回结果进行一个合并处理后不再返回结果。 先看方法定义:
public <U> CompletableFuture<Void> thenAcceptBoth(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
return biAcceptStage(null, other, action);
}
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
return biAcceptStage(asyncPool, other, action);
}
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action, Executor executor) {
return biAcceptStage(screenExecutor(executor), other, action);
}
例子:
ExecutorService executorService = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
CompletableFuture<Void> result = CompletableFuture.supplyAsync(() -> "我是Supplier。。。")
.thenAcceptBoth(CompletableFuture.completedFuture("我是thenAcceptBoth的结果"), (t, r) -> System.out.println(t + r))
.thenAcceptBothAsync(CompletableFuture.completedFuture("我是thenAcceptBothAsync。。。"), (t, r) -> System.out.println(r))
.thenAcceptBothAsync(CompletableFuture.completedFuture("我是thenAcceptBothAsync with executor。。。"), (t, r) -> {
System.out.println(r);
}, executorService);
System.out.println(result.get());
executorService.shutdown();
执行结果:
我是Supplier。。。我是thenAcceptBoth的结果
我是thenAcceptBothAsync。。。
我是thenAcceptBothAsync with executor。。。
null
解释:
-
• 第一个调用supplyAsync方法返回的CompletableFuture结果是字符串“我是Supplier。。。”,然后调用thenAcceptBoth给传递了一个新的CompletableFuture,返回结果是“我是thenAcceptBoth的结果”BiConsumer处理两个任务的方式时两个字符串相加,输出“我是Supplier。。。我是thenAcceptBoth的结果”,然后thenAcceptBoth不返回结果(也可以说是返回Void)。
-
• 在继续调用第一个thenAcceptBothAsync时,新的CompletableFuture返回字符串“我是thenAcceptBothAsync。。。”,由于thenAcceptBoth返回的是Void,所以这个BiConsumer处理逻辑就是直接输出新的CompletableFuture结果“我是thenAcceptBothAsync。。。”,完成后也没有返回值(也可以说是返回Void)。
-
• 第二个thenAcceptBothAsync调用时,传递了自定义的线程池,所以是在新的线程里执行的任务,由于第一个thenAcceptBothAsync返回值是Void,参数里新的CompletableFuture返回值是“我是thenAcceptBothAsync with executor。。。”,所以这个BiConsumer处理逻辑就是直接输出新的CompletableFuture结果“我是thenAcceptBothAsync with executor。。。”,完成后也没有返回值(也可以说是返回Void)。所以最后调用get方法获取值打印结果就是null。
-
• thenAcceptBoth/thenAcceptBothAsync他们可以接受两个任务的结果并处理返回新的CompletableFuture,只是结果是Void,但这也是一种类型。
runAfterBoth/runAfterBothAsync(无参无返回值)
这两个方法都不接收前一个CompletableFuture返回的结果,第一个参数是新的CompletableFuture,当这两个CompletableFuture执行完成后,再执行第二个参数Runnable。
先看方法定义:
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
Runnable action) {
return biRunStage(null, other, action);
}
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action) {
return biRunStage(asyncPool, other, action);
}
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action,
Executor executor) {
return biRunStage(screenExecutor(executor), other, action);
}
例子:
ExecutorService executorService = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
CompletableFuture<Void> result = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + ":supplyAsync方法里的CompletableFuture执行完成");
return "我是Supplier。。。";
})
.runAfterBoth(CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + ":runAfterBoth方法里的CompletableFuture执行完成");
}), () -> System.out.println(Thread.currentThread().getName() + ":runAfterBoth执行完成"))
.runAfterBothAsync(CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + ":第一个runAfterBothAsync方法里的CompletableFuture执行完成");
}), () -> {
System.out.println(Thread.currentThread().getName() + ":第一个runAfterBothAsync执行完成");
})
.runAfterBothAsync(CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + ":第二个runAfterBothAsync方法里的CompletableFuture执行完成");
}), () -> {
System.out.println(Thread.currentThread().getName() + ":第二个runAfterBothAsync执行完成");
}, executorService);
System.out.println(result.get());
executorService.shutdown();
输出结果:
ForkJoinPool.commonPool-worker-1:supplyAsync方法里的CompletableFuture执行完成
ForkJoinPool.commonPool-worker-1:runAfterBoth方法里的CompletableFuture执行完成
main:runAfterBoth执行完成
ForkJoinPool.commonPool-worker-1:第一个runAfterBothAsync方法里的CompletableFuture执行完成
ForkJoinPool.commonPool-worker-1:第一个runAfterBothAsync执行完成
ForkJoinPool.commonPool-worker-1:第二个runAfterBothAsync方法里的CompletableFuture执行完成
pool-1-thread-1:第二个runAfterBothAsync执行完成
null
解释:runAfterBoth、runAfterBothAsync这两个方法都是当前CompletableFuture和第一个参数里的CompletableFuture这两个任务都执行完成后,再执行第二个参数的Runnable。
or组合关系任务
applyToEither/applyToEitherAsync(有参有返回值)
表示当前的CompletableFuture和调用方法里的参数CompletableFuture,这两个CompletableFuture谁先执行完成就用谁的返回结果,调用applyToEither/applyToEitherAsync方法返回新的CompletableFuture是有返回值的。
例子:
CompletableFuture<String> r1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + ":------- r1 before-------");
ThreadUtil.sleep(3000);
System.out.println(Thread.currentThread().getName() + ":-------r1 after-------");
return "r1";
});
CompletableFuture<String> r2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + ":-------r2 before-------");
ThreadUtil.sleep(1000);
System.out.println(Thread.currentThread().getName() + ":-------r2 after-------");
return "r2";
});
CompletableFuture<String> future = r1.applyToEitherAsync(r2, t -> {
System.out.println(Thread.currentThread().getName() + ":result========" + t);
return t + "--------";
});
System.out.println(Thread.currentThread().getName() + ":" + r1.get());
System.out.println(Thread.currentThread().getName() + ":" + r2.get());
System.out.println(Thread.currentThread().getName() + ":" + future.get());
输出结果:
ForkJoinPool.commonPool-worker-1:------- r1 before-------
ForkJoinPool.commonPool-worker-2:-------r2 before-------
ForkJoinPool.commonPool-worker-2:-------r2 after-------
ForkJoinPool.commonPool-worker-2:result========r2
ForkJoinPool.commonPool-worker-1:-------r1 after-------
main:r1
main:r2
main:r2--------
解释:
-
• r1这个任务休眠3s,r2这个任务休眠1s,所以休眠先前打印了before日志。
-
• r2执行结束后立即打印了r2 after日志,由于r2先于r1任务执行结束,所以applyToEitherAsync方法的第二个参数Function就会使用r2返回值并处理返回新值,打印result========r2,返回新值“r2——–”。
-
• r1,r2任务的get正常返回各自的值并打印。
-
• future这个任务返回的是applyToEitherAsync方法第二个参数Function处理r2结果的值:”r2——–“。
acceptEither/acceptEitherAsync(有参无返回值)
表示当前的CompletableFuture和调用方法里的参数CompletableFuture,这两个CompletableFuture谁先执行完成就用谁的返回结果,但调用acceptEither/acceptEitherAsync方法返回新的CompletableFuture是没有返回值的。
例子:
CompletableFuture<String> r1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + ":------- r1 before-------");
ThreadUtil.sleep(3000);
System.out.println(Thread.currentThread().getName() + ":-------r1 after-------");
return "r1";
});
CompletableFuture<String> r2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + ":-------r2 before-------");
ThreadUtil.sleep(1000);
System.out.println(Thread.currentThread().getName() + ":-------r2 after-------");
return "r2";
});
CompletableFuture<Void> future = r1.acceptEitherAsync(r2, t -> {
System.out.println(Thread.currentThread().getName() + ":result========" + t);
});
System.out.println(Thread.currentThread().getName() + ":" + r1.get());
System.out.println(Thread.currentThread().getName() + ":" + r2.get());
System.out.println(Thread.currentThread().getName() + ":" + future.get());
输出结果:
ForkJoinPool.commonPool-worker-1:------- r1 before-------
ForkJoinPool.commonPool-worker-2:-------r2 before-------
ForkJoinPool.commonPool-worker-2:-------r2 after-------
ForkJoinPool.commonPool-worker-2:result========r2
ForkJoinPool.commonPool-worker-1:-------r1 after-------
main:r1
main:r2
main:null
这里我们看到最终调用r1对象的acceptEitherAsync后生成新的future对象是没有返回值的,这里的泛型给了Void。
runAfterEither/runAfterEitherAsync(无参无返回值)
表示当前的CompletableFuture和调用方法里的参数CompletableFuture,这两个CompletableFuture只要有一个执行完成即可,不管他们没有返回值(返回了也不用),然后调用runAfterEither/runAfterEitherAsync方法返回新的CompletableFuture也是没有返回值的。
例子:
CompletableFuture<String> r1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + ":------- r1 before-------");
ThreadUtil.sleep(3000);
System.out.println(Thread.currentThread().getName() + ":-------r1 after-------");
return "r1";
});
CompletableFuture<String> r2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + ":-------r2 before-------");
ThreadUtil.sleep(1000);
System.out.println(Thread.currentThread().getName() + ":-------r2 after-------");
return "r2";
});
CompletableFuture<Void> future = r1.runAfterEitherAsync(r2, () -> {
System.out.println(Thread.currentThread().getName() + ":runAfterEitherAsync方法调用啦,没有返回结果");
});
System.out.println(Thread.currentThread().getName() + ":" + r1.get());
System.out.println(Thread.currentThread().getName() + ":" + r2.get());
System.out.println(Thread.currentThread().getName() + ":" + future.get());
输出结果:
ForkJoinPool.commonPool-worker-1:------- r1 before-------
ForkJoinPool.commonPool-worker-2:-------r2 before-------
ForkJoinPool.commonPool-worker-2:-------r2 after-------
ForkJoinPool.commonPool-worker-2:runAfterEitherAsync方法调用啦,没有返回结果
ForkJoinPool.commonPool-worker-1:-------r1 after-------
main:r1
main:r2
main:null
虽然r1和r2返回了结果,但是当调用r1.runAfterEitherAsync(r2,Runnable对象)方法时,不管这个结果有没有,第二个参数是Runnable对象都不接收和处理,并且Runnable对象执行后也不返回任何值。
任务执行结果和异常处理
get方法
get方法用来获取CompletableFuture执行结果,get方法抛出了检查时异常,需要应用程序主动捕获或继续抛出。
public T get() throws InterruptedException, ExecutionException {
Object r;
return reportGet((r = result) == null ? waitingGet(true) : r);
}
public T get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
Object r;
long nanos = unit.toNanos(timeout);
return reportGet((r = result) == null ? timedGet(nanos) : r);
}
例子:
CompletableFuture<String> r1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + ":------- 执行完成啦 -------");
return "r1";
});
try {
System.out.println(r1.get());
} catch (InterruptedException e) {
// 处理中断异常逻辑
e.printStackTrace();
} catch (ExecutionException e) {
// 处理执行异常逻辑
e.printStackTrace();
}
输出结果:
ForkJoinPool.commonPool-worker-1:------- 执行完成啦 -------
r1
调用get方式时会阻塞等待CompletableFuture的执行结果,如果在等待过程中CompletableFuture的执行逻辑被中断或执行出问题则抛出对应的异常,程序可根据异常做相应的处理。 另外一个get方法提供了等待超时的参数,也增加了一个需要处理的异常,如果在指定的等待超时时间还没有获取到结果,则抛出TimeoutException异常。
CompletableFuture<String> r1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + ":------- 执行完成啦 -------");
ThreadUtil.sleep(5000);
return "r1";
});
try {
System.out.println(r1.get(1, TimeUnit.SECONDS));
} catch (InterruptedException e) {
// 处理中断异常逻辑
e.printStackTrace();
} catch (ExecutionException e) {
// 处理执行异常逻辑
e.printStackTrace();
} catch (TimeoutException e) {
// 超时异常逻辑
e.printStackTrace();
}
输出:
ForkJoinPool.commonPool-worker-1:------- 执行完成啦 -------
Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.TimeoutException
at com.star95.study.CompletableFutureTest.main(CompletableFutureTest.java:235)
Caused by: java.util.concurrent.TimeoutException
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
at com.star95.study.CompletableFutureTest.main(CompletableFutureTest.java:227)
还有一个getNow(T valueIfAbsent)方法用来立即获取任务执行结果,如果任务没有执行完成则直接返回默认值。
join方法
join方法也是用来获取CompletableFuture执行结果,但join方法不会抛出检查时异常,他会抛出运行时异常。
CompletableFuture<String> r1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + ":------- 执行完成啦 -------");
ThreadUtil.sleep(5000);
return "r1";
});
System.out.println(r1.join());
输出:
ForkJoinPool.commonPool-worker-1:------- 执行完成啦 -------
r1
代码改成如下:
CompletableFuture<String> r1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + ":------- 执行完成啦 -------");
ThreadUtil.sleep(5000);
return "r1";
});
//取消任务
r1.cancel(true);
System.out.println(r1.join());
可以调用cancel方法取消任务的执行,则会抛出CancellationException。
Exception in thread "main" java.util.concurrent.CancellationException
at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2276)
at com.star95.study.CompletableFutureTest.main(CompletableFutureTest.java:226)
exceptionally方法
exceptionally方法用于处理CompletableFuture任务在执行过程中出错抛出的异常,同时他也返回跟任务相同的结果类型。 例子:
CompletableFuture<String> r1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + ":------- 执行完成啦 -------");
System.out.println(1 / 0);
return "r1";
}).exceptionally(e-> "程序执行出错,异常原因:" + e);
System.out.println(r1.join());
输出:
ForkJoinPool.commonPool-worker-1:------- 执行完成啦 -------
程序执行出错,异常原因:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
handle/handleAsync方法
handle、handleAsync都是用来处理结果的,只是handleAsync是异步执行而已,他接收任务返回值和异常两个参数,程序可根据异常这个参数来判断任务执行是否出现异常,如异常则走对应的异常处理逻辑,这时任务的返回值就为null,无异常则说明任务正常返回了结果值,可根据实际情况处理返回结果值或者新值。 例子:
CompletableFuture<String> r1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + ":------- 执行完成啦 -------");
System.out.println(1 / 0);
return "r1";
}).handleAsync((v,e) -> {
if (e != null) {
System.out.println("程序执行出错,异常原因:" + e);
}
return v;
});
System.out.println(r1.join());
输出结果:
ForkJoinPool.commonPool-worker-1:------- 执行完成啦 -------
程序执行出错,异常原因:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
null
-
• 这里输出了异常,handleAsync处理时打印了异常,并把任务的执行结果返回了(返回值为null)。
-
• 如果把“System.out.println(1 / 0);”这个异常注释掉,则handleAsync会正常返回任务的执行结果。
-
• handleAsync方法可自行处理任务返回结果,可返回原任务的执行结果,也可以自定义返回结果,返回类型相同即可。
whenComplete/whenCompleteAsync方法
whenComplete、whenCompleteAsync这两个方法是任务执行完成后执行的回调方法,只是whenCompleteAsync是异步执行而已,虽然他们执行后也返回一个CompletableFuture对象,但是这个CompletableFuture返回的是上一个任务的执行结果。换一个角度来说,whenComplete、whenCompleteAsync他们是在上一个任务执行后被回调执行,返回的是上一个任务的结果,他们不对任务结果产生影响。
例子:
CompletableFuture<String> r1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + ":------- 执行完成啦 -------");
return "r1";
});
CompletableFuture<String> r2 = r1.whenCompleteAsync((v,e) -> {
if (e != null) {
System.out.println("程序执行出错,异常原因:" + e);
}
});
System.out.println("---" + r1.join());
System.out.println("+++" + r2.join());
结果:
ForkJoinPool.commonPool-worker-1:------- 执行完成啦 -------
---r1
+++r1
r2是r1调用whenCompleteAsync方法后生成的新的CompletableFuture对象,但是r2的返回结果其实就是r1的返回结果。
总结
CompletableFuture提供了跟灵活的任务处理方法。总结起来有以下几点:
-
• 任务可灵活选择是在当前线程执行,还是在新的线程里执行,默认是ForkJoinPool线程池,也可以传递自定义线程池。
-
• CompletableFuture基本上大部分方法执行结果都返回了一个新的CompletableFuture对象,基于这一点就可以实现链式调用。
-
• 可灵活的实现任务编排功能,比如任务的串行、并行都可以自由编排,相比于传统线程池的异步任务更灵活可控。
-
• 任务执行结果处理更灵活,可同时处理任务返回的结果和异常执行相应的逻辑。
对ForkJoinPool框架感兴趣的,可点击以下文章跳转阅读:
欢迎关注公众号,欢迎分享、点赞、在看
原文始发于微信公众号(小新成长之路):谁说异步任务难了?送你一份CompletableFuture玩转指南
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/238274.html