异步&线程池 CompletableFuture 异步编排 【下篇】

追求适度,才能走向成功;人在顶峰,迈步就是下坡;身在低谷,抬足既是登高;弦,绷得太紧会断;人,思虑过度会疯;水至清无鱼,人至真无友,山至高无树;适度,不是中庸,而是一种明智的生活态度。

导读:本篇文章讲解 异步&线程池 CompletableFuture 异步编排 【下篇】,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文


三连哦

1、创建异步对象

CompletableFuture 提供了四个静态方法来创建一个异步操作。

在这里插入图片描述

提示

  • 1、runXxxx 都是没有返回结果的,supplyXxx 都是可以获取返回结果的
  • 2、可以传入自定义的线程池,否则就用默认的线程池

1.1 不存在返回结果

public class ThreadTestDemo {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) {

        System.out.println("main.....start......");

        CompletableFuture.runAsync(()->{
            System.out.println("当前线程是:"+Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运算结果是:"+i);
        },executor);

        System.out.println("main.....end......");
    }
  }

测试结果

在这里插入图片描述


1.2 存在返回结果

public class ThreadTestDemo {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main.....start......");
        
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程是:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运算结果是:" + i);
            return i;

        }, executor);
        Integer integer = future.get();

        System.out.println("main.....end......"+integer);
    }

测试结果

在这里插入图片描述

2、计算完成时回调方法

whenComplete 可以处理正常和异常的计算结果,exceptionally 处理异常情况。
whenComplete 和 whenCompleteAsync 的区别:

  • whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
  • whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。

方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执

在这里插入图片描述

2.1 whenComplete

public class ThreadTestDemo {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        System.out.println("main.....start......");

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程是:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运算结果是:" + i);
            return i;

        }, executor).whenComplete((res,exception)->{
            System.out.println("异步任务完成了...结果是..."+res+";异常是:"+exception);
        });

        System.out.println("main.....end......");
    }
 }

在这里插入图片描述

如果出现了异常情况
在这里插入图片描述

2.2 出现异常、处理返回值

public class ThreadTestDemo {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        System.out.println("main.....start......");

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程是:" + Thread.currentThread().getId());
            int i = 10 / 0;
            System.out.println("运算结果是:" + i);
            return i;

        }, executor).whenComplete((res,exception)->{
            //可以得到异常信息,但是不能返回修改数据
            System.out.println("异步任务完成了...结果是..."+res+";异常是:"+exception);
        }).exceptionally(throwable -> {
            //可以感知异常,同时返回默认值
            return 666;

        });
        Integer integer = future.get();

        System.out.println("main.....end......"+integer);
    }
}

结果
在这里插入图片描述

3、handle 方法

在这里插入图片描述
和 complete 一样,可对结果做最后的处理(可处理异常),可改变返回值。

3.1 出现异常情况

public class ThreadTestDemo {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        System.out.println("main.....start......");


        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程是:" + Thread.currentThread().getId());
            int i = 10 / 0;
            System.out.println("运算结果是:" + i);
            return i;

        }, executor).handle((res,thr)->{
            if(res != null){
                return res*2;
            }

            if(thr != null){
                return 666;
            }

            return 0;
        });
        Integer integer = future.get();

        System.out.println("main.....end......"+integer);
    }
 }

结果
在这里插入图片描述

3.2 正常情况

public class ThreadTestDemo {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        System.out.println("main.....start......");


        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程是:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运算结果是:" + i);
            return i;

        }, executor).handle((res,thr)->{
            if(res != null){
                return res*2;
            }

            if(thr != null){
                return 666;
            }

            return 0;
        });
        Integer integer = future.get();

        System.out.println("main.....end......"+integer);
    }
 }

结果
在这里插入图片描述

4、线程串行化方法

在这里插入图片描述

thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。

thenAccept 方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。

thenRun 方法:只要上面的任务执行完成,就开始执行 thenRun,只是处理完任务后,执行thenRun 的后续操作

带有 Async 默认是异步执行的。同之前。
以上都要前置任务成功完成。

Function<? super T,? extends U>
    T:上一个任务返回结果的类型
    U:当前任务的返回值类型

4.1 thenRun

不能获取到上一步的执行结果

public class ThreadTestDemo {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        System.out.println("main.....start......");


        CompletableFuture<Void> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程是:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运算结果是:" + i);
            return i;

        }, executor).thenRunAsync(()->{
            System.out.println("任务二启动了");
        },executor);

        System.out.println("main.....end......");
    }
   }

结果
在这里插入图片描述

4.2 thenAccept

能接收上一步的返回值,但是无返回值

public class ThreadTestDemo {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        System.out.println("main.....start......");

        CompletableFuture<Void> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程是:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运算结果是:" + i);
            return i;

        }, executor).thenAcceptAsync(res->{
            System.out.println("任务二启动了,获取上一步的结果:"+res);
        },executor);

        System.out.println("main.....end......");
    }
}

结果
在这里插入图片描述

4.3 thenApply

获取上一步的返回结果,同时返回结果

public class ThreadTestDemo {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        System.out.println("main.....start......");


        CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程是:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运算结果是:" + i);
            return i;

        }, executor).thenApplyAsync(res->{
            System.out.println("任务二启动了,上一步的结果是:"+res);
            return "Hello" + res;
        },executor);

        System.out.println("main.....end......"+future1.get());
    }
}

结果
在这里插入图片描述

5、两任务组合 – 都要完成

在这里插入图片描述
在这里插入图片描述

两个任务必须都完成,触发该任务。

thenCombine:组合两个 future,获取两个 future 的返回结果,并返回当前任务的返回值

thenAcceptBoth:组合两个 future,获取两个 future 任务的返回结果,然后处理任务,没有返回值。

runAfterBoth:组合两个 future,不需要获取 future 的结果,只需两个 future 处理完任务后,处理该任务。

5.1 runAfterBoth

组合两个future,不接收值,也不返回值

public class ThreadTestDemo {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        System.out.println("main.....start......");


        CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1线程开始..." + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("任务1线程结束..." + i);
            return i;

        }, executor);

        CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务2线程开始..." + Thread.currentThread().getId());
            System.out.println("任务2线程结束..." );
            return "hello";

        }, executor);

        future01.runAfterBothAsync(future02,()->{
            System.out.println("任务三开始");
        },executor);

        System.out.println("main.....end......"+future01.get());
    }
}

结果

在这里插入图片描述

5.2 thenAcceptBoth

接收前两个的值,不返回值

public class ThreadTestDemo {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        System.out.println("main.....start......");

        CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1线程开始..." + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("任务1线程结束..." + i);
            return i;

        }, executor);

        CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务2线程开始..." + Thread.currentThread().getId());
            System.out.println("任务2线程结束..." );
            return "hello";

        }, executor);

        future01.thenAcceptBothAsync(future02,(f1,f2)->{
            System.out.println("任务三开始...之前的结果是:"+f1 +"......"+f2);
        },executor);

        System.out.println("main.....end......"+future01.get());
    }

结果
在这里插入图片描述

5.3 thenCombine

接收值同时返回值

public class ThreadTestDemo {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        System.out.println("main.....start......");

        CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1线程开始..." + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("任务1线程结束..." + i);
            return i;

        }, executor);

        CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务2线程开始..." + Thread.currentThread().getId());
            System.out.println("任务2线程结束..." );
            return "hello";

        }, executor);

        CompletableFuture<Object> future = future01.thenCombineAsync(future02, (f1, f2) -> {
            return "任务三开始:"+f1+"..."+f2+"---->";
        }, executor);

        System.out.println("main.....end......"+future.get());
    }

结果
在这里插入图片描述

6、两任务组合 – 一个完成

在这里插入图片描述
在这里插入图片描述
当两个任务中,任意一个 future 任务完成的时候,执行任务。

applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。

acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。

runAfterEither:两个任务有一个执行完成,不需要获取 future 的结果,处理任务,也没有返回值。

6.1 runAfterEither

两个有一个执行结束,就执行。不接收返回值,本身没有返回值。休息几秒钟,看效果

public class ThreadTestDemo {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        System.out.println("main.....start......");


        CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1线程开始..." + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("任务1线程结束..." + i);
            return i;

        }, executor);

        CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务2线程开始..." + Thread.currentThread().getId());
            try {
                Thread.sleep(6000);
                System.out.println("任务2线程结束..." );
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";

        }, executor);

        future01.runAfterEitherAsync(future02, () -> {
            System.out.println("任务三开始");
        }, executor);

        System.out.println("main.....end......");
    }
   }

效果
在这里插入图片描述

6.2 acceptEither

其中一个执行完,就执行,得到值,本身无返回值。两个future的返回值类型要相同。

public class ThreadTestDemo {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        System.out.println("main.....start......");


        CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1线程开始..." + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("任务1线程结束..." + i);
            return i;

        }, executor);

        CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务2线程开始..." + Thread.currentThread().getId());
            try {
                Thread.sleep(6000);
                System.out.println("任务2线程结束..." );
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";

        }, executor);

        future01.acceptEitherAsync(future02,(res)->{
            System.out.println("任务三开始...之前的结果:"+res);
        },executor);

        System.out.println("main.....end......");
    }

在这里插入图片描述

6.3 applyToEither

接收返回值,同时返回值

public class ThreadTestDemo {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        System.out.println("main.....start......");


        CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1线程开始..." + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("任务1线程结束..." + i);
            return i;

        }, executor);

        CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务2线程开始..." + Thread.currentThread().getId());
            try {
                Thread.sleep(6000);
                System.out.println("任务2线程结束..." );
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";

        }, executor);


        CompletableFuture<String> future = future01.applyToEitherAsync(future02, res -> {
            System.out.println("任务三开始....之前的结果:" + res);
            return res.toString() + " -->哈喽";
        }, executor);

        System.out.println("main.....end......"+future.get());
    }

结果
在这里插入图片描述

7、多任务组合

在这里插入图片描述

allOf:等待所有任务完成
anyOf:只要有一个任务完成

7.1 allOf

public class ThreadTestDemo {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        System.out.println("main.....start......");


        CompletableFuture<String> futureImage = CompletableFuture.supplyAsync(() -> {
            System.out.println("商品图片查询");
            return "水杯.png";
        }, executor);

        CompletableFuture<String> futureAttr = CompletableFuture.supplyAsync(() -> {
            System.out.println("商品属性查询");
            return "黑色+128G";
        }, executor);

        CompletableFuture<String> futureDesc = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
                System.out.println("商品介绍查询");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "华为手机好用呢";
        }, executor);

        CompletableFuture<Void> allOf = CompletableFuture.allOf(futureImage, futureAttr, futureDesc);
        allOf.get();//阻塞等待


        System.out.println("main.....end......");
    }
}

在这里插入图片描述

如果没有allOf.get(); 查看结果

在这里插入图片描述

7.2 anyOf

任意一个执行成功,就返回

public class ThreadTestDemo {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        System.out.println("main.....start......");


        CompletableFuture<String> futureImage = CompletableFuture.supplyAsync(() -> {
            System.out.println("商品图片查询");
            return "水杯.png";
        }, executor);

        CompletableFuture<String> futureAttr = CompletableFuture.supplyAsync(() -> {
            System.out.println("商品属性查询");
            return "黑色+128G";
        }, executor);

        CompletableFuture<String> futureDesc = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
                System.out.println("商品介绍查询");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "华为手机好用呢";
        }, executor);

        CompletableFuture<Object> anyOf = CompletableFuture.anyOf(futureImage, futureAttr, futureDesc);

        System.out.println("main.....end......查看哪个执行成功"+anyOf.get());
    }

结果
在这里插入图片描述

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/131447.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!