一.回顾多线程
目录
1、初始化线程的 4 种方式
1)、继承 Thread
Thread thread = new Thread01();
thread.start();
2)、实现 Runnable 接口
Runnable01 runnable01 = new Runnable01();
new Thread(runnable01).start();
3)、实现 Callable 接口 + FutureTask (可以拿到返回结果,可以处理异常)
FutureTask futureTask = new FutureTask<>(new Callable01());
new Thread(futureTask).start();
// 阻塞等待线程执行完成,获取返回结果
Integer i = (Integer) futureTask.get();
4)、线程池
public static ExecutorService service = Executors.newFixedThreadPool(10);
service.execute(new Runnable01());
通过如下两种方式初始化线程池:
Executors.newFiexedThreadPool(3);
//或者
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit unit, workQueue, threadFactory,
区别: Thread与Runnable无法获取返回值,Callable可获取返回值 Thread与Runnable、Callable可不能控制资源,线程池可控制资源 通过线程池性能稳定,也可以获取执行结果,并捕获异常。但是,在业务复杂情况下,一个异步调用可能会依赖于另一个异步调用的执行结果。
2.线程池的七大参数
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
七大核心参数:
corePoolSize:核心线程数,在线程池创建完后就准备就绪的线程数量,等待接收异步任务。
maximumPoolSize:最大线程数量,控制资源
keepAliveTime:存活时间。如果当前线程数量大于核心线程数,如果线程的空闲时间大于指定的keepAliveTime则会释放线程(最大线程数-核心线程数)
unit:时间单位
BlockingQueue<Runnable> workQueue:阻塞队列。如果任务过多,就会将当前多的任务置入队列中(使用Runable execute提交的任务),当有空闲的线程时,则会去队列中取出任务并执行。
ThreadFactory:创建线程的工厂
RejectedExecutionHandler handler:如果队列满了,按照指定的拒绝策略拒绝执行任务
阻塞队列中的无界队列与有界队列:
有界队列:就是有固定大小的队列。比如设定了固定大小的 LinkedBlockingQueue,又或者大小为 0,只是在生产者和消费者中做中转用的 SynchronousQueue。
无界队列:指的是没有设置固定大小的队列。这些队列的特点是可以直接入列,直到溢出。当然现实几乎不会有到这么大的容量(超过 Integer.MAX_VALUE),所以从使用者的体验上,就相当于 “无界”。比如没有设定固定大小的 LinkedBlockingQueue。
*运行流程:
1、线程池创建,准备好 core 数量的核心线程,准备接受任务。
2、新的任务进来,用 core 准备好的空闲线程执行。
(1) 、如果core 满了,就将再进来的任务放入阻塞队列中。空闲的 core 就会自己去阻塞队 列获取任务执行。
(2) 、阻塞队列满了,就直接开新线程执行,最大只能开到 max 指定的数量。
(3) 、max 都执行好了。Max-core 数量空闲的线程会在 keepAliveTime 指定的时间后自 动销毁。最终保持到 core 大小。
(4) 、如果线程数开到了 max 的数量,还有新任务进来,就会使用 reject 指定的拒绝策 略进行处理。
3、所有的线程创建都是由指定的 factory
3、常见的 4 种线程池
newCachedThreadPool:创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若 无可回收,则新建线程。
newFixedThreadPool:创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newScheduledThreadPool:创建一个定长线程池,支持定时及周期性任务执行。
newSingleThreadExecutor:创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务 按照指定顺序(FIFO, LIFO, 优先级)执行
4、开发中为什么使用线程池
- 降低资源的消耗:通过重复利用已经创建好的线程降低线程的创建和销毁带来的损耗
- 提高响应速度:因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于等待分配任务的状态,当任务来时无需创建新的线程就能执行
- 提高线程的可管理性:线程池会根据当前系统特点对池内的线程进行优化处理,减少创建和销毁线程带来 的系统开销。无限的创建和销毁线程不仅消耗系统资源,还降低系统的稳定性,使 用线程池进行统一分配
二、CompletableFuture 异步编排
业务场景: 查询商品详情页的逻辑比较复杂,有些数据还需要远程调用,必然需要花费更多的时间。
假如商品详情页的每个查询,需要如下标注的时间才能完成 那么,用户需要 5.5s 后才能看到商品详情页的内容。很显然是不能接受的。 如果有多个线程同时完成这 6 步操作,也许只需要 1.5s 即可完成相应。
Future 是 Java 5 添加的类,用来描述一个异步计算的结果。你可以使用`isDone`方法检查计算是否完成,或者使用`get`阻塞住调用线程,直到计算完成返回结果,你也可以使用`cancel` 方法停止任务的执行。
虽然`Future`以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?
很多语言,比如 Node.js,采用回调的方式实现异步编程。Java 的一些框架,比如 Netty,自 己扩展了 Java 的 `Future`接口,提供了`addListener`等多个扩展方法;Google guava 也提供了 通用的扩展 Future;Scala 也提供了简单易用且功能强大的 Future/Promise 异步编程模式。
作为正统的 Java 类库,是不是应该做点什么,加强一下自身库的功能呢?
在 Java 8 中, 新增加了一个包含 50 个方法左右的类: CompletableFuture,提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合 CompletableFuture 的方法。 CompletableFuture 类实现了 Future 接口,所以你还是可以像以前一样通过`get`方法阻塞或 者轮询的方式获得结果,但是这种方式不推荐使用。
CompletableFuture 和 FutureTask 同属于 Future 接口的实现类,都可以获取线程的执行结果。
1、创建异步对象
CompletableFuture 提供了四个静态方法来创建一个异步操作。
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
1、runXxxx 都是没有返回结果的,supplyXxx 都是可以获取返回结果的
2、可以传入自定义的线程池,否则就用默认的线程池;
2、计算完成时回调方法
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
whenComplete 可以处理正常和异常的计算结果(成功回调),exceptionally 处理异常情况(异常回调)。
whenComplete 和 whenCompleteAsync 的区别:
whenComplete:是执行当前任务的线程继续执行 whenComplete 的任务。
whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。
方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程 执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("Main...Start...."+Thread.currentThread());
// supplyAsync 可获取返回值
CompletableFuture<Integer> result = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程" + Thread.currentThread());
int i = 10 / 0;
System.out.println("运行结果:" + i);
return i;
}, threadPool).whenCompleteAsync((res,exception)->{
// 当上面的任务执行完成,能得到结果和异常信息,但无法修改返回值
System.out.println("异步任务完成,结果为:"+res+",异常为:"+exception);
}).exceptionally(throwable -> {
// 可以感知异常,同时返回默认值
return 101;
});
Integer integer = result.get();
System.out.println("Main...End...."+integer);
}
结果:
Main...Start....Thread[main,5,main]
当前线程Thread[pool-1-thread-1,5,main]
异步任务完成,结果为:null,异常为:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
Main...End....101
类似与ES6中 Promise的 then 和 reject;
3、handle 方法
public static void main(String[] args) throws ExecutionException, InterruptedException {
/**
* 方法执行完成后的处理
*/
System.out.println("Main...Start...."+Thread.currentThread());
CompletableFuture<Integer> result = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程" + Thread.currentThread());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
}, threadPool).handle((res,throwable)->{
// handle可获取到返回值并可进行修改,且可以感知异常,并修改返回值
if(res!=null && throwable==null){
return res;
}else{
return 0;
}
});
Integer integer = result.get();
System.out.println("Main...End...."+integer);
}
结果:
Main...Start....Thread[main,5,main]
当前线程Thread[pool-1-thread-1,5,main]
Main...End....0
4、线程串行化方法
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor)
public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor)
thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。
thenAccept 方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。
thenRun 方法:只要上面的任务执行完成,就开始执行 thenRun,只是处理完任务后,执行 thenRun 的后续操作
带有 Async 默认是异步执行的。同之前。
以上都要前置任务成功完成。
Function<? super T,? extends U>
T:上一个任务返回结果的类型
U:当前任务的返回值类型
public static void main(String[] args) throws ExecutionException, InterruptedException {
/**
* 线程串行化
* 1、thenRunAsync:无法获取上一步的执行结果,无返回值
* .thenRunAsync(() -> {
* System.out.println("任务2启动了");
* },threadPool)
* 2、thenAcceptAsync:能获取到上一步的返回值,无返回值
* .thenAcceptAsync(res -> {
* System.out.println("任务2启动了"+res);
* },threadPool)
* 3、thenApplyAsync:既能获取到上一步的返回值,有返回值
*/
System.out.println("Main...Start...."+Thread.currentThread());
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程" + Thread.currentThread());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
}, threadPool).thenApplyAsync(res->{
System.out.println("任务2启动了"+res);
return "hello,"+res;
},threadPool);
//future.get() 获取返回值,会阻塞
System.out.println("Main...End...."+future.get());
}
结果:
Main...Start....Thread[main,5,main]
当前线程Thread[pool-1-thread-1,5,main]
运行结果:5
任务2启动了5
Main...End....hello,5
5、两任务组合 – 都要完成
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor)
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor)
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action)
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action)
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor)
两个任务必须都完成,触发该任务。
thenCombine:组合两个 future,获取两个 future 的返回结果,并返回当前任务的返回值。
thenAcceptBoth:组合两个 future,获取两个 future 任务的返回结果,然后处理任务,没有返回值。
runAfterBoth:组合两个 future,不需要获取 future 的结果,只需两个 future 处理完任务后,处理该任务。
示例:
先创建两个任务
/**
* 两个都完成
*/
System.out.println("Main...Start...."+Thread.currentThread());
CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1开始" + Thread.currentThread().getId());
int i = 10 / 2 ;
System.out.println("任务1结束");
return i;
}, threadPool);
CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2开始"+Thread.currentThread().getId());
System.out.println("任务2结束");
return "Hello";
}, threadPool);
使用 runAfterBothAsync (无法获取之前任务执行的结果):
future01.runAfterBothAsync(future02,()->{
System.out.println("任务3启动");
},threadPool);
// 结果:
Main...Start....Thread[main,5,main]
任务1开始12
任务1结束
任务2开始13
任务2结束
任务3启动
使用 thenAcceptBothAsync(可获取之前任务执行的结果,但不能返回):
future01.thenAcceptBothAsync(future02,(f1,f2)->{
System.out.println("任务3开始----f1结果:"+f1+"===f2结果"+f2);
},threadPool);
// 结果:
Main...Start....Thread[main,5,main]
任务1开始12
任务1结束
任务2开始13
任务2结束
任务3开始----f1结果:5===f2结果Hello
使用 thenCombineAsync(可获取之前任务执行的结果,可修改并返回结果):
CompletableFuture<String> future = future01.thenCombineAsync(future02, (f1, f2) -> {
return f1 +":"+ f2 + "--->Hello";
}, threadPool);
System.out.println("Main...End...."+future.get());
// 结果:
Main...Start....Thread[main,5,main]
任务1开始12
任务1结束
任务2开始13
任务2结束
Main...End....5:Hello--->Hello
6、两任务组合 – 一个完成
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action)
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action)
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor)
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor)
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor)
当两个任务中,任意一个 future 任务完成的时候,执行任务。
applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。 acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。 runAfterEither:两个任务有一个执行完成,不需要获取 future 的结果,处理任务,也没有返回值。
7、多任务组合
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
allOf:等待所有任务完成
anyOf:只要有一个任务完成
示例:
创建三个任务模拟用户载入首页时的数据获取
CompletableFuture<String> futureImg = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品图片信息");
return "hello.jpg";
}, threadPool);
CompletableFuture<String> futureAttr = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品属性");
return "白色+1TB";
}, threadPool);
CompletableFuture<String> futureDesc = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品介绍");
return "Apple";
}, threadPool);
使用 allOf :
CompletableFuture<Void> allOf = CompletableFuture.allOf(futureImg, futureAttr, futureDesc);
allOf.get(); // 等待所有任务执行完成 返回结果
// 获取所有任务的执行结果
System.out.println("Main...End...."+futureImg.get()+"===>"+futureAttr.get()+"===>"+futureDesc.get());
// 结果:
Main...Start....Thread[main,5,main]
查询商品图片信息
查询商品属性
查询商品介绍
Main...End....hello.jpg===>白色+1TB===>Apple
使用 anyOf :
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(futureImg, futureAttr, futureDesc);
anyOf.get(); // 有一个任务执行完成 返回结果
// 获取所有任务的执行结果
System.out.println("Main...End...."+anyOf.get());
// 结果:
Main...Start....Thread[main,5,main]
查询商品图片信息
查询商品属性
Main...End....hello.jpg
查询商品介绍
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/154445.html