场景
Java中ExecutorService线程池的使用(Runnable和Callable多线程实现):
Java中ExecutorService线程池的使用(Runnable和Callable多线程实现)_霸道流氓气质的博客-CSDN博客_executorservice线程池使用
Java中使用CountDownLatch实现并发流程控制:
Java中使用CountDownLatch实现并发流程控制_霸道流氓气质的博客-CSDN博客_countdownlatch模拟100w并发
在上面使用Future以及进行并发流程控制之外,可使用Java8新增的CompletableFuture。
Future的局限性,它没法直接对多个任务进行链式、组合等处理,需要借助并发工具类才能完成,
实现逻辑比较复杂。
CompletableFuture是对Future的扩展和增强。CompletableFuture实现了Future接口,
并在此基础上进行了丰富的扩展,完美弥补了Future的局限性,同时CompletableFuture实现了对任务编排的能力。
借助这项能力,可以轻松地组织不同任务的运行顺序、规则以及方式。
注:
博客:
https://blog.csdn.net/badao_liumang_qizhi
实现
创建异步任务
1、runAsync是创建没有返回值的异步任务
ExecutorService executorService = Executors.newSingleThreadExecutor();
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
try {
System.out.println("执行异步操作。。。");
Thread.sleep((long) (Math.random() * 10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}, executorService);
System.out.println("结果:"+voidCompletableFuture.get());
2、supplyAsync是创建带有返回值的异步任务
ExecutorService executorService = Executors.newSingleThreadExecutor();
CompletableFuture<String> uCompletableFuture = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("执行异步操作。。。");
Thread.sleep((long) (Math.random() * 10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "badao";
},executorService);
System.out.println("结果:"+uCompletableFuture.get());
3、注意上面两个新建时使用的线程池用的两种方式
使用默认线程池或者使用自定义线程池的重载方法可以自己选择。
一个是使用默认线程池(ForkJoinPool.commonPool())的方法,一个是带有自定义线程池的重载方法
获取任务结果的方法
1、get()如果完成则返回结果,否则就抛出具体的异常
ExecutorService executorService = Executors.newSingleThreadExecutor();
// CompletableFuture<Integer> uCompletableFuture = CompletableFuture.supplyAsync(() -> {
// System.out.println("执行异步操作。。。");
// return 1/0;
// },executorService);
//System.out.println("结果:"+uCompletableFuture.get());//抛出异常
CompletableFuture<String> uCompletableFuture1 = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("执行异步操作。。。");
Thread.sleep((long) (Math.random() * 10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "badao";
},executorService);
2、get(long timeout, TimeUnit unit) 最大时间等待返回结果,否则就抛出具体异常
3、join()
join()和get()方法都是用来获取CompletableFuture异步之后的返回值。
join()方法抛出的是uncheck异常(即未经检查的异常),不会强制开发者抛出。
get()方法抛出的是经过检查的异常。
异步回调处理
1、thenApply和thenApplyAsync
thenApply 表示某个任务执行完成后执行的动作,即回调方法,
会将该任务的执行结果即方法返回值作为入参传递到回调方法中,并且回调方法带有返回值。
CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("张三100米用时13秒");
return 13;
});
CompletableFuture<Integer> integerCompletableFuture1 = integerCompletableFuture.thenApply((result) -> {
System.out.println("张三的平均速度为:" + 100 / result);
return 100 / result;
});
System.out.println(integerCompletableFuture.get());//13
System.out.println(integerCompletableFuture1.get());//7
2、thenAccept/thenAcceptAsync
thenAccept方法表示,第一个任务执行完成后,执行第二个回调方法任务,
会将该任务的执行结果,作为入参,传递到回调方法中,但是回调方法是没有返回值的。
CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("张三100米用时13秒");
return 13;
});
CompletableFuture<Void> integerCompletableFuture1 = integerCompletableFuture.thenAccept((result) -> {
System.out.println("张三的平均速度为:" + 100 / result);
});
System.out.println(integerCompletableFuture.get());//13
System.out.println(integerCompletableFuture1.get());//null
3、thenRun/thenRunAsync
thenRun方法,通俗点讲就是,做完第一个任务后,再做第二个任务。
某个任务执行完成后,执行回调方法;但是前后两个任务没有参数传递,第二个任务也没有返回值。
CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("张三100米用时13秒");
return 13;
});
CompletableFuture<Void> integerCompletableFuture1 = integerCompletableFuture.thenRun(() -> {
System.out.println("裁判记录张三分数");
});
System.out.println(integerCompletableFuture.get());//13
System.out.println(integerCompletableFuture1.get());//null
4、 whenComplete 和 whenCompleteAsync
whenComplete是当某个任务执行完成后执行的回调方法,会将执行结果或者执行期间抛出的异常传递给回调方法,
如果是正常执行则异常为null,回调方法对应的CompletableFuture的result和该任务一致,如果该任务正常执行,
则get方法返回执行结果,如果是执行异常,则get方法抛出异常。
CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("张三100米用时13秒");
//int a = 1/0;
return 13;
});
CompletableFuture<Integer> integerCompletableFuture1 = integerCompletableFuture.whenComplete((result,exception) -> {
System.out.println("张三100的结果为:"+result);
System.out.println("张三100的异常为:"+exception);
});
System.out.println(integerCompletableFuture.get());//13
System.out.println(integerCompletableFuture1.get());//13
5、 handle和handleAsync
与whenComplete基本一致,区别在于handle的回调方法有返回值。
CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("张三100米用时13秒");
return 13;
});
CompletableFuture<String> integerCompletableFuture1 = integerCompletableFuture.handle((result,exception) -> {
System.out.println("张三100的结果为:"+result);
System.out.println("张三100的异常为:"+exception);//张三100的异常为:null
return "比赛结束";
});
System.out.println(integerCompletableFuture.get());//13
System.out.println(integerCompletableFuture1.get());//比赛结束
6、加不加Async的区别
thenApply和thenApplyAsync区别在于,使用thenApply方法时子任务与父任务使用的是同一个线程,
而thenApplyAsync在子任务中是另起一个线程执行任务,并且thenApplyAsync可以自定义线程池,
默认的使用ForkJoinPool.commonPool()线程池。
同理thenAccept/thenAcceptAsync 与 thenRun/thenRunAsyncye whenComplete
和 whenCompleteAsync handle和handleAsync 也是这种区别。
任务组合
1、thenCombine 合并两个线程任务的结果,并进一步处理。
CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("A组张三100米用时13秒");
return 13;
});
CompletableFuture<Integer> integerCompletableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println("A组李四100米用时13秒");
return 14;
});
CompletableFuture<Integer> integerCompletableFutureResult = integerCompletableFuture.thenCombine(integerCompletableFuture1, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) {
return integer + integer2;
}
});
System.out.println("A组张三和李四一共用时:"+integerCompletableFutureResult.get());
thenAcceptBoth 和runAfterBoth
thenCombine会将两个任务的执行结果作为所提供函数的参数,且该方法有返回值;
thenAcceptBoth同样将两个任务的执行结果作为方法入参,但是无返回值;
runAfterBoth没有入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。
2、applyToEither
两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的转化操作
CompletableFuture<Long> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
long l = (long) (Math.random() * 10000);
try {
Thread.sleep(l);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("张三100米用时"+l+"毫秒");
return l;
});
CompletableFuture<Long> integerCompletableFuture1 = CompletableFuture.supplyAsync(() -> {
long l = (long) (Math.random() * 10000);
try {
Thread.sleep(l);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("李四100米用时"+l+"毫秒");
return l;
});
CompletableFuture<Long> longCompletableFuture = integerCompletableFuture.applyToEither(integerCompletableFuture1, new Function<Long, Long>() {
@Override
public Long apply(Long aLong) {
return aLong;
}
});
System.out.println("用时最快为:"+longCompletableFuture.get());
applyToEither:会将已经执行完成的任务,作为方法入参,传递到指定方法中,且有返回值
acceptEither: 会将已经执行完成的任务,作为方法入参,传递到指定方法中,且无返回值
runAfterEither:不会把执行结果当做方法入参,且没有返回值。
3、anyOf() 的参数是多个给定的 CompletableFuture,当其中的任何一个完成时,
方法返回这个 CompletableFuture。
CompletableFuture<Long> zhangsan = CompletableFuture.supplyAsync(() -> {
long l = (long) (Math.random() * 10000);
try {
Thread.sleep(l);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("张三100米用时"+l+"毫秒");
return l;
});
CompletableFuture<Long> lisi = CompletableFuture.supplyAsync(() -> {
long l = (long) (Math.random() * 10000);
try {
Thread.sleep(l);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("李四100米用时"+l+"毫秒");
return l;
});
CompletableFuture<Long> wangwu = CompletableFuture.supplyAsync(() -> {
long l = (long) (Math.random() * 10000);
try {
Thread.sleep(l);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("王五100米用时"+l+"毫秒");
return l;
});
CompletableFuture<Object> anyofCompletableFuture = CompletableFuture.anyOf(zhangsan, lisi, wangwu);
System.out.println("100米个人赛最先完成的耗时:"+anyofCompletableFuture.get());
4、allOf方法用来实现多 CompletableFuture 的同时返回。
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(zhangsan, lisi, wangwu);
System.out.println("100米比赛总过程:"+voidCompletableFuture.get());
使用示例
下面记录Java实战第二版的使用示例代码-模拟查询多个商店获取商品最佳价格
新建Shop商品类
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.util.Locale;
import java.util.Random;
import static com.ruoyi.demo.java8demo.utils.delay;
import static com.ruoyi.demo.java8demo.utils.format;
public class Shop {
private final String name;
private final Random random;
public Shop(String name) {
this.name = name;
random = new Random(name.charAt(0) * name.charAt(1) * name.charAt(2));
}
public String getPrice(String product) {
double price = calculatePrice(product);
Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().length)];
return name + ":" + price + ":" + code;
}
public double calculatePrice(String product) {
delay();
return format(random.nextDouble() * product.charAt(0) + product.charAt(1));
}
public String getName() {
return name;
}
}
新建Discount折扣类
import static com.ruoyi.demo.java8demo.utils.delay;
import static com.ruoyi.demo.java8demo.utils.format;
public class Discount {
public enum Code {
NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);
private final int percentage;
Code(int percentage) {
this.percentage = percentage;
}
}
public static String applyDiscount(Quote quote) {
return quote.getShopName() + " price is " + Discount.apply(quote.getPrice(), quote.getDiscountCode());
}
private static double apply(double price, Code code) {
delay();
return format(price * (100 - code.percentage) / 100);
}
}
其中又用到两个工具类方法
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.util.Locale;
public class utils {
private static final DecimalFormat formatter = new DecimalFormat("#.##", new DecimalFormatSymbols(Locale.US));
public static void delay() {
int delay = 1000;
//int delay = 500 + RANDOM.nextInt(2000);
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public static double format(double number) {
synchronized (formatter) {
return new Double(formatter.format(number));
}
}
}
新建报价类Quote
public class Quote {
private final String shopName;
private final double price;
private final Discount.Code discountCode;
public Quote(String shopName, double price, Discount.Code discountCode) {
this.shopName = shopName;
this.price = price;
this.discountCode = discountCode;
}
public static Quote parse(String s) {
String[] split = s.split(":");
String shopName = split[0];
double price = Double.parseDouble(split[1]);
Discount.Code discountCode = Discount.Code.valueOf(split[2]);
return new Quote(shopName, price, discountCode);
}
public String getShopName() {
return shopName;
}
public double getPrice() {
return price;
}
public Discount.Code getDiscountCode() {
return discountCode;
}
}
新建查询商品价格发现类BestPriceFinder
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class BestPriceFinder {
//构造商店模拟数据
private final List<Shop> shops = Arrays.asList(
new Shop("店铺1"),
new Shop("店铺2"),
new Shop("店铺3"),
new Shop("店铺4"),
new Shop("店铺5")
);
private final Executor executor = Executors.newFixedThreadPool(shops.size(),(Runnable r)->{
Thread t = new Thread(r);
t.setDaemon(true);
return t;
});
public List<String> findPricesFuture(String product) {
List<CompletableFuture<String>> priceFutures = findPricesStream(product)
.collect(Collectors.<CompletableFuture<String>>toList());
return priceFutures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
//thenCompose方法会在某个任务执行完成后,将该任务的执行结果,作为方法入参,
//去执行指定的方法。该方法会返回一个新的CompletableFuture实例
public Stream<CompletableFuture<String>> findPricesStream(String product){
return shops.stream()
.map(shop -> CompletableFuture.supplyAsync(()->shop.getPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)));
}
}
调用客户端
import java.util.List;
import java.util.function.Supplier;
public class BestPriceFinderMain {
private static BestPriceFinder bestPriceFinder = new BestPriceFinder();
public static void main(String[] args) {
execute("composed CompletableFuture",()-> bestPriceFinder.findPricesFuture("手机"));
}
private static void execute(String msg, Supplier<List<String>> s){
long start = System.nanoTime();
System.out.println(s.get());
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println(msg + "done in" + duration +"msecs");
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/135837.html