一、Future&FutureTask
在《Java线程详解》这篇文章中,介绍了创建一个Java线程的三种方式,其中继承Thread
类或实现Runnable
接口都可以创建线程,但这两种方法都有一个问题就是:没有返回值,不能获取执行完的结果。因此后面在JDK1.5才新增了一个Callable
接口来解决上面的问题,而Future
和FutureTask
就可以与Callable
配合起来使用。
而Callable
只能在线程池中提交任务使用,且只能在submit()
和invokeAnay()
以及invokeAll()
这三个任务提交的方法中使用,如果需要直接使用Thread
的方式启动线程,则需要使用FutureTask
对象作为Thread
的构造参数,而FutureTask
的构造参数又是Callable
的对象
下面展示线程中的使用,以submit()
为例,其源码如下:
在该方法中,只是把Callable
封装成了FutureTask
而已,任务执行依然是execute()
方法
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
1.1 Callable和Runnable的区别
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
Callable的call方法可以有返回值,可以声明抛出异常。和 Callable
配合的有一个Future
类,通过Future
可以了解任务执行情况,或者取消任务的执行,还可获取任务执行的结果,这些功能都是Runnable
做不到的,Callable
的功能要比Runnable
强大。
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("通过Runnable方式执行任务");
}
}).start();
// 需要借助FutureTask
FutureTask task = new FutureTask(new Callable() {
@Override
public Object call() throws Exception {
System.out.println("通过Callable方式执行任务");
Thread.sleep(3000);
return "返回任务结果";
}
});
new Thread(task).start();
1.2 从Future到FutureTask
先看一下Future
的源码及其定义:
源码中接口上面的注释已经解释地很清楚了,Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。
/**
* A {@code Future} represents the result of an asynchronous
* computation. Methods are provided to check if the computation is
* complete, to wait for its completion, and to retrieve the result of
* the computation. The result can only be retrieved using method
* {@code get} when the computation has completed, blocking if
* necessary until it is ready. Cancellation is performed by the
* {@code cancel} method. Additional methods are provided to
* determine if the task completed normally or was cancelled. Once a
* computation has completed, the computation cannot be cancelled.
* If you would like to use a {@code Future} for the sake
* of cancellability but not provide a usable result, you can
* declare types of the form {@code Future<?>} and
* return {@code null} as a result of the underlying task.
*/
public interface Future<V> {
// 取消任务的执行,参数表示是否立即中断任务执行,或者等任务结束
boolean cancel(boolean mayInterruptIfRunning);
// 任务是否已经取消,任务完成前将其取消,则返回true
boolean isCancelled();
// 任务是否已经完成
boolean isDone();
// 等待任务执行结束,返回泛型结果.中断或任务执行异常都会抛出异常
V get() throws InterruptedException, ExecutionException;
// 同上面的get功能一样,多了设置超时时间。参数timeout指定超时时间,uint指定时间的单位,在枚举类TimeUnit中有相关的定义。如果计算超时,将抛出TimeoutException
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
其工作过程大概如下:
CompletionService
内部通过阻塞队列+FutureTask
,实现了任务先完成可优先获取到,即结果按照完成先后顺序排序,内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的take()
或poll()
可以获取到一个已经执行完成的Future
,进而通过调用Future
接口实现类的get方法获取最终的结果
2.1 使用案例分析
2.1.1 电商询价
直接Future
方式
向不同电商平台询价,并保存价格,采用ThreadPoolExecutor+Future
的方案:异步执行询价然后再保存
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(2);
// 异步向电商S1询价
Future<Integer> f1 = executor.submit(()->getPriceByS1());
// 异步向电商S2询价
Future<Integer> f2= executor.submit(()->getPriceByS2());
// 获取电商S1报价并异步保存
executor.execute(()->save(f1.get()));
// 获取电商S2报价并异步保存
executor.execute(()->save(f2.get())
如果获取电商S1报价的耗时很长,那么即便获取电商S2报价的耗时很短,也无法让保存S2报价的操作先执行,因为这个主线程都阻塞 在了f1.get()
操作上。
CompletionService
方式
使用CompletionService
实现先获取的报价先保存到数据库
//创建线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
//创建CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
//异步向电商S1询价
cs.submit(() -> getPriceByS1());
//异步向电商S2询价
cs.submit(() -> getPriceByS2());
//将询价结果异步保存到数据库
for (int i = 0; i < 2; i++) {
Integer r = cs.take().get();
executor.execute(() -> save(r));
}
2.1.2 Dobbo的Forking Cluter场景
Dubbo
中有一种叫做Forking
的集群模式,这种集群模式下,支持并行地调用多个服务实例,只要有一个成功就返回结果。
geocoder(addr) {
//并行执行以下3个查询服务,
r1=geocoderByS1(addr);
r2=geocoderByS2(addr);
r3=geocoderByS3(addr);
//只要r1,r2,r3有一个返回
//则返回
return r1|r2|r3;
}
通过CompletionService
来实现这种Forking
集群模式
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 创建CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
// 用于保存Future对象
List<Future<Integer>> futures = new ArrayList<>(3);
//提交异步任务,并保存future到futures
futures.add(cs.submit(()->geocoderByS1()));
futures.add(cs.submit(()->geocoderByS2()));
futures.add(cs.submit(()->geocoderByS3()));
// 获取最快返回的任务执行结果
Integer r = 0;
try {
// 只要有一个成功返回,则break
for (int i = 0; i < 3; ++i) {
r = cs.take().get();
//简单地通过判空来检查是否成功返回
if (r != null) {
break;
}
}
} finally {
//取消所有任务
for(Future<Integer> f : futures)
f.cancel(true);
}
// 返回结果
return r;
应用场景总结
- 当需要批量提交异步任务的时候建议使用
CompletionService
。CompletionService
将线程池Executor
和阻塞队列BlockingQueue
的功能融合在了一起,能够让批量异步任务的管理更简单。 CompletionService
能够让异步任务的执行结果有序化。先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如Forking Cluster
这样的需求。- 线程池隔离。
CompletionService
支持自己创建线程池,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。
三、源码分析
3.1 构造方法
CompletionService
只是一个接口,它定义了一套提交任务和获取结果的方法,而它唯一的实现类ExecutorCompletionService
来实现了这些方法,该类构造如下:
public class ExecutorCompletionService<V> implements CompletionService<V> {
// 执行任务的线程池
private final Executor executor;
private final AbstractExecutorService aes;
// 任务完成会记录在该队列中
private final BlockingQueue<Future<V>> completionQueue;
}
ExecutorCompletionService
提供了两个构造方法,其中线程池是一定要有的参数,任务完成的记录队列默认使用的是LinkedBlockingQueue
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
public ExecutorCompletionService(Executor executor,
BlockingQueue<Future<V>> completionQueue) {
if (executor == null || completionQueue == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
}
3.2 任务执行与结果记录
提供了两个任务提交的方法,可以传Callable
和Runnable
类型的参数,这内部都会将其转换为RunnableFutuer
实例,然后再封装成QueueingFuture
实例作为任务来执行
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}
public Future<V> submit(Runnable task, V result) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task, result);
executor.execute(new QueueingFuture(f));
return f;
}
3.2.1 封装RunnableFuture
不论aes
是否为null,它们最终调用的都是FutureTask
的构造方法
private RunnableFuture<V> newTaskFor(Callable<V> task) {
if (aes == null)
return new FutureTask<V>(task);
else
return aes.newTaskFor(task);
}
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
if (aes == null)
return new FutureTask<V>(task, result);
else
return aes.newTaskFor(task, result);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
在前面的已经看过FutureTask
的结构,它的任务是使用Callable
实例表示的,所以对于Runnable
类型的任务,会将其封装成一个Callable
类型任务
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
// 线程池静态方法,将runnable封装成一个callable
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
// 构造一个适配对象
return new RunnableAdapter<T>(task, result);
}
// 继承了Callble,内部存储Runnablle,调用call方法时,内部调用runnable的run方法
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
3.2.2 任务提交
QueueingFuture
是ExecutorCompletionService
的内部类,其实现了FutureTask
接口,当任务执行时,会去调用FutureTask
的run()
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
下面是FutureTask
的run()
public void run() {
// 任务已经被执行,直接退出
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 执行任务
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// 记录异常
setException(ex);
}
// 任务执行成功,则记录返回结果
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
在任务执行成功,记录返回记录结果的时候,会调用finishCompletion()
去唤醒所有阻塞的线程并调用done()
方法
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
// 唤醒后面所有等待的线程
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
// 任务执行完了,将其置为null
callable = null; // to reduce footprint
}
而QueueingFuture
内部类就实现了done()
方法,它将执行完的FutureTask
放入到阻塞队列中,当调用take()
方法时就可以取到任务的执行结果,如果任务都还没有执行完,就阻塞。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/112106.html