这才是 CompletableFuture 的正确打开方式!!!

0 概述

  • 代码以同步的方式执行:可以看到下图,task1、task2 执行加起来一共需要 25s,有没有一种更快的方式呢?

  • 这才是 CompletableFuture 的正确打开方式!!!

  • 代码以异步的方式执行:可以看到,我只需要以耗时最长的 task 为最长时间即可,速度提升了 10s

  • 这才是 CompletableFuture 的正确打开方式!!!

  • 在真实的开发场景中,许多业务很繁杂,需要查询很多张表或者处理逻辑很麻烦,因此使用同步的方式很有可能会造成阻塞等待,性能严重下降。因此,可以使用 Java8 引入的 CompletableFuture 进行异步任务编排,提高程序的性能。下面我会结合公司里常用的方法进行案例说明。

  • 官方文档地址(JDK17版本):https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/CompletableFuture.html#%3Cinit%3E()

  • 继承/实现关系图:

  • 这才是 CompletableFuture 的正确打开方式!!!


    这才是 CompletableFuture 的正确打开方式!!!

1 使用场景及好处

  • 并行流操作:与并行流结合,允许在多个线程中异步执行流操作,提高处理大数据集的效率

  • 异步任务处理:可以轻松地处理异步任务,避免阻塞主线程

  • 组合多个任务:它可以方便地组合多个任务的执行结果,实现任务的串行或并行组合

  • 错误处理:提供了丰富的异常处理机制,使得异步代码更加健壮

  • 超时处理:允许设置超时时间,当任务无法在规定时间内完成,可以执行一个默认的操作,避免长时间等待

  • 链式调用:支持链式调用,简化了多个任务之间的依赖关系


2 Java5 及以前创建任务的方式

  • 使用 Thread 类来创建线程任务:

Thread thread = new Thread(new Runnable() {
@Override
public void run() {
// 这里写入需要执行的任务逻辑
}
});
thread.start();


  • 通过实现 Runnable 接口来创建线程任务:

Runnable task = new Runnable() {
@Override
public void run() {
// 这里写入需要执行的任务逻辑
}
};
Thread thread = new Thread(task);
thread.start();


  • 使用 Lambda 表达式(仅限于 JDK8+):

Thread thread = new Thread(() -> {
// 这里写入需要执行的任务逻辑
});
thread.start();


  • 使用 ExecutorService 和 Callable / Future 对象来管理线程池并获取返回结果:

public class FutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newSingleThreadExecutor();
// 线程池 submit 提交,执行任务,能获取返回值
Future<Double> future = executorService.submit(() -> {
System.out.println(Thread.currentThread().getName() + " start, time = " + TimeUnit.NANOSECONDS.toSeconds(System.currentTimeMillis()));
try {
Thread.sleep(1000);
} catch (Exception e) {
log.error("异常:{}", e.getMessage(), e);
}
System.out.println(Thread.currentThread().getName() + " exit, time = " + TimeUnit.NANOSECONDS.toSeconds(System.currentTimeMillis()));
return 1.10;
});
Double get = future.get();
System.out.println(get);
}
/**
* 输出:
* pool-2-thread-1 start, time = 1706
* pool-2-thread-1 exit, time = 1706
* 1.1
*/
}


  • 以上方式的弊端:

    • 代码冗余和复杂性: 每次需要创建新的线程任务时,都需要编写大量的代码来实现 Runnable 接口或继承 Thread 类,这增加了代码的冗余和复杂性。

    • 线程管理困难: 需要手动管理线程的生命周期,包括线程的创建、启动、停止等操作,这增加了代码的复杂性和出错的可能性。

    • 异常处理困难: 在实现 Runnable 接口或继承 Thread 类时,需要处理线程运行过程中可能出现的异常,这增加了代码的复杂性和出错的可能性。

    • 不支持并发编程: Java 5 及之前的线程模型不支持并发编程,这限制了程序在处理大量并发任务时的性能。

3 创建任务的核心方法

  • 无返回值的 runAsync,或者说返回值为:CompletableFuture<Void>

    • runAsync(Runnable runnable, Executor executor)

    • runAsync(Runnable runnable)

  • 有返回值的 supplyAsync,返回值为:CompletableFuture<U>

    • supplyAsync(Supplier<U> supplier)

    • supplyAsync(Supplier<U> supplier, Executor executor)

  • 说明:如果没有指定线程池,默认使用 ForkJoinPool.commonPool(),源码如下:

public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}

static Executor screenExecutor(Executor e) {
if (!USE_COMMON_POOL && e == ForkJoinPool.commonPool())
return ASYNC_POOL;
if (e == null) throw new NullPointerException();
return e;
}


  • 说明:关于线程池的相关知识,下一篇文章再详细讲解


4 任务异步回调的核心方法

  • 前置说明:

    以下的方法都有重载的方法,参数会多一个 Executor,当调用 thenXxx() 方法时,第一个任务和第二个任务使用的是同一个线程池;当调用 thenXxxAsync() 方法时,第一个任务使用的是自己指定的线程池(如果传了的话),第二个任务使用的是 ForkJoinPool.commonPool() 的线程池

  • 无传参 + 无返回值: 任务前后依次执行,且都没有传参和返回值


    这才是 CompletableFuture 的正确打开方式!!!

    • thenRun(Runnable action)

    • thenRunAsync(Runnable action)

import java.util.concurrent.*;

@SuppressWarnings("ALL")
public class MyCompletableFuture {

private final static ExecutorService myExecutorService = new ThreadPoolExecutor(
3, // 核心线程大小
5, // 最大线程大小
5, // 存活时间
TimeUnit.SECONDS, // 存活时间单位
new ArrayBlockingQueue<>(1000), // 阻塞队列
Executors.defaultThreadFactory(), // 线程工厂
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);

public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture.supplyAsync(() -> {
System.out.println("第一个任务先执行");
return "需要一个返回值";
}, myExecutorService).thenRun(() -> {
System.out.println("然后第二个任务执行");
});
}

/**
* 输出:
* 第一个任务先执行
* 然后第二个任务执行
*/
}


  • 有传参 + 有返回值: 将上一个任务的结果作为参数传递给下一个任务,且任务执行完有返回值

    • thenApply(Function<? super T,? extends U> fn)

    • thenApplyAsync(Function<? super T,? extends U> fn)

import java.util.concurrent.*;

@SuppressWarnings("ALL")
public class MyCompletableFuture {

private final static ExecutorService myExecutorService = new ThreadPoolExecutor(
3, // 核心线程大小
5, // 最大线程大小
5, // 存活时间
TimeUnit.SECONDS, // 存活时间单位
new ArrayBlockingQueue<>(1000), // 阻塞队列
Executors.defaultThreadFactory(), // 线程工厂
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);

// supplyAsync + thenApply 异步回调,supplyAsync 的返回值作为 thenApply 的入参
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " start, time = " + TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()));
try {
Thread.sleep(1000);
} catch (Exception e) {
log.error("异常:{}", e.getMessage(), e);
}
System.out.println(Thread.currentThread().getName() + " exit, time = " + TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()));
return "supplyAsync";
}, myExecutorService).thenApply((result) -> {
return result + " + thenApply";
});
System.out.println(future.get());
}

/**
* 输出:
* pool-1-thread-1 start, time = 1706537482
* pool-1-thread-1 exit, time = 1706537483
* supplyAsync + thenApply
*/
}


  • 有传参 + 无返回值: 将上一个任务的结果作为参数传给下一个任务,但下一个任务执行后无返回值

    • thenAccept(Consumer<? super T> action)

    • thenAcceptAsync(Consumer<? super T> action)

import java.util.concurrent.*;

@SuppressWarnings("ALL")
public class MyCompletableFuture {

private final static ExecutorService myExecutorService = new ThreadPoolExecutor(
3, // 核心线程大小
5, // 最大线程大小
5, // 存活时间
TimeUnit.SECONDS, // 存活时间单位
new ArrayBlockingQueue<>(1000), // 阻塞队列
Executors.defaultThreadFactory(), // 线程工厂
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);

public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " start, time = " + TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()));
try {
Thread.sleep(1000);
} catch (Exception e) {
log.error("异常:{}", e.getMessage(), e);
}
System.out.println(Thread.currentThread().getName() + " exit, time = " + TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()));
return "supplyAsync";
}, myExecutorService).thenApply((result) -> {
return result + " + thenApply";
}).thenAccept((result) -> {
// thenApply的返回值作为thenAccept的入参
System.out.println(result + " + " + "thenAccept");
});
}

/**
* 输出:
* pool-1-thread-1 start, time = 1706537548
* pool-1-thread-1 exit, time = 1706537549
* supplyAsync + thenApply + thenAccept
*/
}


  • exceptionlly:任务执行异常后,抛出异常作为参数传递给回调方法

import java.util.concurrent.*;

@SuppressWarnings("ALL")
public class MyCompletableFuture {

private final static ExecutorService myExecutorService = new ThreadPoolExecutor(
3, // 核心线程大小
5, // 最大线程大小
5, // 存活时间
TimeUnit.SECONDS, // 存活时间单位
new ArrayBlockingQueue<>(1000), // 阻塞队列
Executors.defaultThreadFactory(), // 线程工厂
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);

public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " start, time = " + TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()));
try {
Thread.sleep(1000);
} catch (Exception e) {
log.error("异常:{}", e.getMessage(), e);
}
int i = 1 / 0;
System.out.println(Thread.currentThread().getName() + " exit, time = " + TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()));
return "supplyAsync";
}, myExecutorService).exceptionally((e) -> {
// 类似于 try...catch
return "异常:" + e;
});
System.out.println(future.get());
System.out.println("end");
myExecutorService.shutdown();
}

/**
* 输出:
* pool-1-thread-1 start, time = 1706538066
* 异常:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
* end
*/
}


5 多任务组合

  • 图示:

    这才是 CompletableFuture 的正确打开方式!!!

  • allOf():等所有 Future 任务完成,再执行剩余逻辑

  • anyOf():只要有一个 Future 任务完成,就继续执行剩余逻辑

  • 说明:

    1、需要使用:get()join() 方法等所有任务完成
    2、他们的异同点在于:
          a. get() 会阻塞当前线程直到异步计算完成,而 join() 不阻塞当前线程
          b. 调用 get() 时,会抛出异常,并会传播到调用 get() 的线程
          c. 调用 join() 会返回一个包含异常的 CompletionException

  • 注意: 在使用 get() 方法时,由于其是阻塞的,强烈建议添加超时时间,如果在设置的超时时间内,异步计算还没有完成,那么就会抛出一个TimeoutException 异常

import java.util.concurrent.*;

@SuppressWarnings("ALL")
public class MyCompletableFuture {

private final static ExecutorService myExecutorService = new ThreadPoolExecutor(
3, // 核心线程大小
5, // 最大线程大小
5, // 存活时间
TimeUnit.SECONDS, // 存活时间单位
new ArrayBlockingQueue<>(1000), // 阻塞队列
Executors.defaultThreadFactory(), // 线程工厂
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);

public static void main(String[] args) {
System.out.println("开始时间:" + LocalDateTimeUtil.now());
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "1";
}, myExecutorService);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "2";
}, myExecutorService);
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
return "3";
}, myExecutorService);
CompletableFuture<Void> allFuture = CompletableFuture.allOf(future1, future2, future3);
try {
List<String> collect = Stream.of(future1, future2, future3).map(CompletableFuture::join).collect(Collectors.toList());
System.out.println(collect);
} catch (Exception e) {
log.error("异常:{}", e.getMessage(), e);
} finally {
myExecutorService.shutdown();
}
System.out.println("结束时间:" + LocalDateTimeUtil.now());
}
}

/**
* 输出:
* 开始时间:2024-01-29T22:30:07.535476700
* [1, 2, 3]
* 结束时间:2024-01-29T22:30:07.540478200
*/
  • 在项目的业务中有一个接口需要查询 20+ 张表的数据,并汇总后返回给前端,使用同步的方式性能太差了,因此我使用了上述 CompletableFuture 中的多任务组合的方式进行异步调用,当所有 Future 任务执行完成后,再进行剩余的数据处理逻辑,性能提高了不止一点,接口速度响应比之前快了好几倍。

  • 以上就是 CompletableFuture 的常用方法了,小伙伴们在真实场景中还使用过别的方法吗?可以给我留言讨论哦~

  • 创作不易,感谢阅读,若遇到问题,可以关注此微信公众号留言反馈,希望能够帮助到您。


原文始发于微信公众号(EzCoding):这才是 CompletableFuture 的正确打开方式!!!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/203626.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!