0 概述
-
代码以同步的方式执行:可以看到下图,task1、task2 执行加起来一共需要 25s,有没有一种更快的方式呢?
-
代码以异步的方式执行:可以看到,我只需要以耗时最长的 task 为最长时间即可,速度提升了 10s
-
在真实的开发场景中,许多业务很繁杂,需要查询很多张表或者处理逻辑很麻烦,因此使用同步的方式很有可能会造成阻塞等待,性能严重下降。因此,可以使用 Java8 引入的 CompletableFuture 进行异步任务编排,提高程序的性能。下面我会结合公司里常用的方法进行案例说明。
-
官方文档地址(JDK17版本):https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/CompletableFuture.html#%3Cinit%3E()
-
继承/实现关系图:
1 使用场景及好处
-
并行流操作:与并行流结合,允许在多个线程中异步执行流操作,提高处理大数据集的效率
-
异步任务处理:可以轻松地处理异步任务,避免阻塞主线程
-
组合多个任务:它可以方便地组合多个任务的执行结果,实现任务的串行或并行组合
-
错误处理:提供了丰富的异常处理机制,使得异步代码更加健壮
-
超时处理:允许设置超时时间,当任务无法在规定时间内完成,可以执行一个默认的操作,避免长时间等待
-
链式调用:支持链式调用,简化了多个任务之间的依赖关系
2 Java5 及以前创建任务的方式
-
使用 Thread 类来创建线程任务:
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
// 这里写入需要执行的任务逻辑
}
});
thread.start();
-
通过实现 Runnable 接口来创建线程任务:
Runnable task = new Runnable() {
@Override
public void run() {
// 这里写入需要执行的任务逻辑
}
};
Thread thread = new Thread(task);
thread.start();
-
使用 Lambda 表达式(仅限于 JDK8+):
Thread thread = new Thread(() -> {
// 这里写入需要执行的任务逻辑
});
thread.start();
-
使用 ExecutorService 和 Callable / Future 对象来管理线程池并获取返回结果:
public class FutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newSingleThreadExecutor();
// 线程池 submit 提交,执行任务,能获取返回值
Future<Double> future = executorService.submit(() -> {
System.out.println(Thread.currentThread().getName() + " start, time = " + TimeUnit.NANOSECONDS.toSeconds(System.currentTimeMillis()));
try {
Thread.sleep(1000);
} catch (Exception e) {
log.error("异常:{}", e.getMessage(), e);
}
System.out.println(Thread.currentThread().getName() + " exit, time = " + TimeUnit.NANOSECONDS.toSeconds(System.currentTimeMillis()));
return 1.10;
});
Double get = future.get();
System.out.println(get);
}
/**
* 输出:
* pool-2-thread-1 start, time = 1706
* pool-2-thread-1 exit, time = 1706
* 1.1
*/
}
-
以上方式的弊端:
-
代码冗余和复杂性: 每次需要创建新的线程任务时,都需要编写大量的代码来实现 Runnable 接口或继承 Thread 类,这增加了代码的冗余和复杂性。
-
线程管理困难: 需要手动管理线程的生命周期,包括线程的创建、启动、停止等操作,这增加了代码的复杂性和出错的可能性。
-
异常处理困难: 在实现 Runnable 接口或继承 Thread 类时,需要处理线程运行过程中可能出现的异常,这增加了代码的复杂性和出错的可能性。
-
不支持并发编程: Java 5 及之前的线程模型不支持并发编程,这限制了程序在处理大量并发任务时的性能。
3 创建任务的核心方法
-
无返回值的 runAsync,或者说返回值为:
CompletableFuture<Void>
-
runAsync(Runnable runnable, Executor executor)
-
runAsync(Runnable runnable)
-
有返回值的 supplyAsync,返回值为:
CompletableFuture<U>
-
supplyAsync(Supplier<U> supplier)
-
supplyAsync(Supplier<U> supplier, Executor executor)
-
说明:如果没有指定线程池,默认使用
ForkJoinPool.commonPool()
,源码如下:
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}
static Executor screenExecutor(Executor e) {
if (!USE_COMMON_POOL && e == ForkJoinPool.commonPool())
return ASYNC_POOL;
if (e == null) throw new NullPointerException();
return e;
}
-
说明:关于线程池的相关知识,下一篇文章再详细讲解
4 任务异步回调的核心方法
-
前置说明:
以下的方法都有重载的方法,参数会多一个 Executor,当调用 thenXxx() 方法时,第一个任务和第二个任务使用的是同一个线程池;当调用 thenXxxAsync() 方法时,第一个任务使用的是自己指定的线程池(如果传了的话),第二个任务使用的是 ForkJoinPool.commonPool() 的线程池
-
无传参 + 无返回值: 任务前后依次执行,且都没有传参和返回值
-
thenRun(Runnable action)
-
thenRunAsync(Runnable action)
import java.util.concurrent.*;
@SuppressWarnings("ALL")
public class MyCompletableFuture {
private final static ExecutorService myExecutorService = new ThreadPoolExecutor(
3, // 核心线程大小
5, // 最大线程大小
5, // 存活时间
TimeUnit.SECONDS, // 存活时间单位
new ArrayBlockingQueue<>(1000), // 阻塞队列
Executors.defaultThreadFactory(), // 线程工厂
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture.supplyAsync(() -> {
System.out.println("第一个任务先执行");
return "需要一个返回值";
}, myExecutorService).thenRun(() -> {
System.out.println("然后第二个任务执行");
});
}
/**
* 输出:
* 第一个任务先执行
* 然后第二个任务执行
*/
}
-
有传参 + 有返回值: 将上一个任务的结果作为参数传递给下一个任务,且任务执行完有返回值
-
thenApply(Function<? super T,? extends U> fn)
-
thenApplyAsync(Function<? super T,? extends U> fn)
import java.util.concurrent.*;
@SuppressWarnings("ALL")
public class MyCompletableFuture {
private final static ExecutorService myExecutorService = new ThreadPoolExecutor(
3, // 核心线程大小
5, // 最大线程大小
5, // 存活时间
TimeUnit.SECONDS, // 存活时间单位
new ArrayBlockingQueue<>(1000), // 阻塞队列
Executors.defaultThreadFactory(), // 线程工厂
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);
// supplyAsync + thenApply 异步回调,supplyAsync 的返回值作为 thenApply 的入参
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " start, time = " + TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()));
try {
Thread.sleep(1000);
} catch (Exception e) {
log.error("异常:{}", e.getMessage(), e);
}
System.out.println(Thread.currentThread().getName() + " exit, time = " + TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()));
return "supplyAsync";
}, myExecutorService).thenApply((result) -> {
return result + " + thenApply";
});
System.out.println(future.get());
}
/**
* 输出:
* pool-1-thread-1 start, time = 1706537482
* pool-1-thread-1 exit, time = 1706537483
* supplyAsync + thenApply
*/
}
-
有传参 + 无返回值: 将上一个任务的结果作为参数传给下一个任务,但下一个任务执行后无返回值
-
thenAccept(Consumer<? super T> action)
-
thenAcceptAsync(Consumer<? super T> action)
import java.util.concurrent.*;
@SuppressWarnings("ALL")
public class MyCompletableFuture {
private final static ExecutorService myExecutorService = new ThreadPoolExecutor(
3, // 核心线程大小
5, // 最大线程大小
5, // 存活时间
TimeUnit.SECONDS, // 存活时间单位
new ArrayBlockingQueue<>(1000), // 阻塞队列
Executors.defaultThreadFactory(), // 线程工厂
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " start, time = " + TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()));
try {
Thread.sleep(1000);
} catch (Exception e) {
log.error("异常:{}", e.getMessage(), e);
}
System.out.println(Thread.currentThread().getName() + " exit, time = " + TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()));
return "supplyAsync";
}, myExecutorService).thenApply((result) -> {
return result + " + thenApply";
}).thenAccept((result) -> {
// thenApply的返回值作为thenAccept的入参
System.out.println(result + " + " + "thenAccept");
});
}
/**
* 输出:
* pool-1-thread-1 start, time = 1706537548
* pool-1-thread-1 exit, time = 1706537549
* supplyAsync + thenApply + thenAccept
*/
}
-
exceptionlly
:任务执行异常后,抛出异常作为参数传递给回调方法
import java.util.concurrent.*;
@SuppressWarnings("ALL")
public class MyCompletableFuture {
private final static ExecutorService myExecutorService = new ThreadPoolExecutor(
3, // 核心线程大小
5, // 最大线程大小
5, // 存活时间
TimeUnit.SECONDS, // 存活时间单位
new ArrayBlockingQueue<>(1000), // 阻塞队列
Executors.defaultThreadFactory(), // 线程工厂
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " start, time = " + TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()));
try {
Thread.sleep(1000);
} catch (Exception e) {
log.error("异常:{}", e.getMessage(), e);
}
int i = 1 / 0;
System.out.println(Thread.currentThread().getName() + " exit, time = " + TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()));
return "supplyAsync";
}, myExecutorService).exceptionally((e) -> {
// 类似于 try...catch
return "异常:" + e;
});
System.out.println(future.get());
System.out.println("end");
myExecutorService.shutdown();
}
/**
* 输出:
* pool-1-thread-1 start, time = 1706538066
* 异常:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
* end
*/
}
5 多任务组合
-
图示:
-
allOf()
:等所有 Future 任务完成,再执行剩余逻辑 -
anyOf()
:只要有一个 Future 任务完成,就继续执行剩余逻辑 -
说明:
1、需要使用:
get()
或join()
方法等所有任务完成
2、他们的异同点在于:
a. get() 会阻塞当前线程直到异步计算完成,而 join() 不阻塞当前线程
b. 调用 get() 时,会抛出异常,并会传播到调用 get() 的线程
c. 调用 join() 会返回一个包含异常的 CompletionException -
注意: 在使用 get() 方法时,由于其是阻塞的,强烈建议添加超时时间,如果在设置的超时时间内,异步计算还没有完成,那么就会抛出一个
TimeoutException
异常
import java.util.concurrent.*;
@SuppressWarnings("ALL")
public class MyCompletableFuture {
private final static ExecutorService myExecutorService = new ThreadPoolExecutor(
3, // 核心线程大小
5, // 最大线程大小
5, // 存活时间
TimeUnit.SECONDS, // 存活时间单位
new ArrayBlockingQueue<>(1000), // 阻塞队列
Executors.defaultThreadFactory(), // 线程工厂
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);
public static void main(String[] args) {
System.out.println("开始时间:" + LocalDateTimeUtil.now());
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "1";
}, myExecutorService);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "2";
}, myExecutorService);
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
return "3";
}, myExecutorService);
CompletableFuture<Void> allFuture = CompletableFuture.allOf(future1, future2, future3);
try {
List<String> collect = Stream.of(future1, future2, future3).map(CompletableFuture::join).collect(Collectors.toList());
System.out.println(collect);
} catch (Exception e) {
log.error("异常:{}", e.getMessage(), e);
} finally {
myExecutorService.shutdown();
}
System.out.println("结束时间:" + LocalDateTimeUtil.now());
}
}
/**
* 输出:
* 开始时间:2024-01-29T22:30:07.535476700
* [1, 2, 3]
* 结束时间:2024-01-29T22:30:07.540478200
*/
-
在项目的业务中有一个接口需要查询 20+ 张表的数据,并汇总后返回给前端,使用同步的方式性能太差了,因此我使用了上述 CompletableFuture 中的多任务组合的方式进行异步调用,当所有 Future 任务执行完成后,再进行剩余的数据处理逻辑,性能提高了不止一点,接口速度响应比之前快了好几倍。
-
以上就是 CompletableFuture 的常用方法了,小伙伴们在真实场景中还使用过别的方法吗?可以给我留言讨论哦~
-
创作不易,感谢阅读,若遇到问题,可以关注此微信公众号留言反馈,希望能够帮助到您。
原文始发于微信公众号(EzCoding):这才是 CompletableFuture 的正确打开方式!!!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/203626.html