大家好,我是栗子为
上次我们介绍了CompletableFuture的一些基本方法,我们也知道CompletableFuture具有Future接口的优点,同时又提供了更强大的API,这一篇我们就来彻底学完CompletableFuture的方法,小为也会举些🌰来帮助大家理解。
我们一起来学今天的内容
01
—
什么是同步任务和异步任务?
-
同步任务是指程序执行一个请求的时候,会一直等待其返回结果后,才继续执行 -
异步任务是程序执行一个方法时,不会阻塞执行流程,程序正常往下执行。当执行完毕时,能够获得执行完毕的返回结果 -
同步任务会阻塞,异步任务不会有阻塞
02
—
CompletableFuture实现异步编程
复习一下上一讲JUC之CompletableFuture(一)的内容,我们先看一个🌰
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程是:" + Thread.currentThread().getName() + ",开始运行咯~");
int result = ThreadLocalRandom.current().nextInt(10);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("一秒后输出结果为:" + result);
return result;
}).whenComplete((v, e) -> {
if (e == null) {
System.out.println("whenComplete方法拿到的结果是:" + v);
}
}).exceptionally(e -> {
e.printStackTrace();
System.out.println("异常情况:" + e.getCause() + "t" + e.getMessage());
return null;
});
System.out.println(Thread.currentThread().getName() + "线程在执行其他任务");
System.out.println(completableFuture.get());
}
}
输出结果
当前线程是:ForkJoinPool.commonPool-worker-9,开始运行咯~
main线程在执行其他任务
一秒后输出结果为:0
whenComplete方法拿到的结果是:0
0
这是异步编程的思想,利用了whenComplete方法处理完成后的逻辑,对前端有一定了解的同学,这一块很像Promise中的then和catch
利用这种方式,就可以减少轮询带来的资源损耗,当任务处理完成后,会返回结果;当发生异常的时候也能捕获异常
CompletableFuture的优点
异步任务结束时,会回调正常的业务逻辑 主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行 异步任务出错时,会回调出错后的业务逻辑
03
—
CompletableFuture常用的方法
将CompletableFuture常用的方法按如下分类进行介绍
1.获得结果和触发计算
获取结果
-
public T get() -
public T get(long timeout,TimeUnit unit) -
public T join() -
public T getNow(T valueIfAbsent):计算完返回计算完的结果,否则返回默认值
主动触发计算
-
public boolean complete(T value):当get方法被阻塞的时候,complete方法结束阻塞并将值赋给get方法
举个🌰
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程是:" + Thread.currentThread().getName() + ",开始运行咯~");
int result = ThreadLocalRandom.current().nextInt(10);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("一秒后输出结果为:" + result);
return result;
}).whenComplete((v, e) -> {
if (e == null) {
System.out.println("whenComplete方法拿到的结果是:" + v);
}
}).exceptionally(e -> {
e.printStackTrace();
System.out.println("异常情况:" + e.getCause() + "t" + e.getMessage());
return null;
});
System.out.println(Thread.currentThread().getName() + "线程在执行其他任务");
System.out.println(completableFuture.get());
System.out.println(completableFuture.complete(21));
}
运行结果如下
当前线程是:ForkJoinPool.commonPool-worker-9,开始运行咯~
main线程在执行其他任务
一秒后输出结果为:3
whenComplete方法拿到的结果是:3
3
false
可以发现,get方法执行完,程序未被阻塞,所以complete方法返回false;否则若程序被get方法阻塞,complete方法返回true,且get方法的返回值为complete方法的参数
System.out.println(completableFuture.complete(21) + "----" + completableFuture.get());
其结果如下
true----21
2.对计算结果进行处理
-
thenApply -
handle
相同点:计算结果存在依赖关系,任务串行执行
不同点:thenApply当前步骤有异常就会停止执行;handle有异常也可以往下走,根据带的异常参数可以进一步处理
举个🌰
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("第一阶段完成,返回1");
return 1;
}, threadPool).thenApply(f -> {
System.out.println("第二阶段完成,结果加2");
return f + 2;
}).thenApply(f -> {
System.out.println("第三阶段完成,结果加3");
return f + 3;
}).
whenComplete((v, e) -> {
if (e == null) {
System.out.println("whenComplete方法拿到的结果是:" + v);
}
}).exceptionally(e -> {
e.printStackTrace();
System.out.println("异常情况:" + e.getCause() + "t" + e.getMessage());
return null;
});
System.out.println(Thread.currentThread().getName() + "线程在执行其他任务");
System.out.println("异步任务得到的最终结果为:" + completableFuture.get());
threadPool.shutdown();
}
结果如下
main线程在执行其他任务
第一阶段完成,返回1
第二阶段完成,结果加2
第三阶段完成,结果加3
whenComplete方法拿到的结果是:6
异步任务得到的最终结果为:6
3.对计算结果进行消费
接收任务的处理结果,并进行消费,无返回值
-
thenAccept
4.对计算速度进行选择
比较两个异步任务,处理速度较快的那个任务,该方法返回一个CompletableFuture,同样可以用join方法拿到异步处理的结果
-
applyToEither
举个🌰
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> player1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "一号玩家";
});
CompletableFuture<String> player2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "二号玩家";
});
CompletableFuture<Object> result = player1.applyToEither(player2, f -> f + "速度更快");
System.out.println("异步任务拿到先前任务的结果为:" + result.join());
}
结果如下
异步任务拿到先前任务的结果为:一号玩家速度更快
5.对计算结果进行合并
两个CompletionStage(对CompletionStage不清楚的小伙伴可以先看上一讲内容)任务都完成后,最终能把两个任务的结果一起交给thenCombine来处理,先完成的任务等待其他分支任务完成
-
thenCombine
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "线程开始启动");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
});
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "线程开始启动");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 20;
});
CompletableFuture<Integer> result = completableFuture1.thenCombine(completableFuture2, (x, y) -> {
System.out.println("开始合并两个异步任务的结果");
return x + y;
});
System.out.println(Thread.currentThread().getName() + "线程处理最终异步任务合并的结果:" + result.join());
}
输出结果为
ForkJoinPool.commonPool-worker-9线程开始启动
ForkJoinPool.commonPool-worker-2线程开始启动
开始合并两个异步任务的结果
main线程处理最终异步任务合并的结果:30
补充
三种方法顺序执行的问题:
thenRun:任务A执行完执行B,B不需要A的结果 thenAccept:任务A执行完执行B,B需要A的结果,但是B无返回值 thenApply:任务A执行完执行B,B需要A的结果,同时B有返回值 关于线程池的选择(thenXxx方法与thenXxxAsync方法区别):
如果没有指定自定义线程池,则采用默认的线程池ForkJoinPool
当给第一个任务指定了自定义线程池时,
a. 调用thenRun方法执行第二个任务时,第二个任务和第一个任务共用一个线程池
b. 调用thenRunAsync方法执行第二个任务时,则第一个任务使用自定义的线程池,第二个任务使用默认的ForkJoinPool
有时候处理太快,系统优化切换原则,直接采用main线程进行处理
04
—
实战
需求
一个异步的场景,我们需要在不同电商平台搜索出同一款商品的价格
返回
返回是同一个产品在不同平台的价格列表,即返回了一个List<String>
思路
-
在一个平台查完再查另一个平台 -
利用异步编程的思想,各任务同时执行
具体实现
首先我们搭建出业务场景的基本框架
import lombok.Getter;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
public class MallDemo {
static List<Mall> mallList = Arrays.asList(new Mall("淘宝"),
new Mall("京东"),
new Mall("唯品会"),
new Mall("天猫"));
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
List<String> list1 = xxx(mallList, "六只栗子"); // xxx处填写静态方法名
for (String s : list1) {
System.out.println(s);
}
long endTime = System.currentTimeMillis();
System.out.println("搜索一共花费了" + (endTime - startTime) + "毫秒");
}
}
class Mall {
@Getter
private String mallName;
public Mall(String mallName) {
this.mallName = mallName;
}
public double getPrice(String productName) {
try {
// 模拟搜索到该商品要花1s
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 模拟价格
return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
}
}
Mall类代表各大电商平台,定义了一个方法用来模拟从该平台中查询得到的价格,并将其返回
Step-by-step的方式
我们可以采用一个接着一个搜索的方式,其静态方法如下:
public static List<String> handleByStep(List<Mall> mallList, String productName) {
return mallList
.stream()
.map(mall -> String.format("在%s,%s的售价是%.2f", mall.getMallName(), productName, mall.getPrice(productName)))
.collect(Collectors.toList());
}
返回结果如下:
在淘宝,六只栗子的售价是20845.95
在京东,六只栗子的售价是20846.36
在唯品会,六只栗子的售价是20845.86
在天猫,六只栗子的售价是20846.52
搜索一共花费了4109毫秒
可以看到,在每个平台大约是1秒,总共花费时间是在4秒左右
异步编程的方式
利用所学的CompletableFuture实现异步来解决这一业务场景
public static List<String> handleByCompletableFuture(List<Mall> mallList, String productName) {
return mallList.stream()
.map(mall -> CompletableFuture.supplyAsync(() -> String.format("在%s,%s的售价是%.2f", mall.getMallName(), productName, mall.getPrice(productName))))
.collect(Collectors.toList())
.stream()
.map(s -> s.join()) // 此处join方法等同于get方法
.collect(Collectors.toList());
}
同样对异步编程的方式,查看其运行时间,结果如下:
在淘宝,六只栗子的售价是20845.10
在京东,六只栗子的售价是20846.03
在唯品会,六只栗子的售价是20846.40
在天猫,六只栗子的售价是20845.30
搜索一共花费了1105毫秒
可以发现异步编程的方式,其用时不会随着任务数变多而增加
补充:
join() 与get() 区别在于join() 返回计算的结果或者抛出一个unchecked异常(CompletionException),而get() 返回一个具体的异常.
05
—
总结
好了,关于CompletableFuture的内容就是以上这些了,一些特殊的场景如同时进行多个任务、多个结果完成后再一起进行处理等都需要我们对并发编程有一定的了解,这能大大提高我们开发的效率,相信大家看完能够学会在项目中使用并发编程。今天就给大家分享到这,我们下次再见~
关注六只栗子,面试不迷路。
作者 栗子为
编辑 一口栗子
原文始发于微信公众号(六只栗子):JUC之CompletableFuture(二)
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/88553.html