Callable接口
Runnable的缺陷
在创建线程的时候不管是继承Thread类(Thread本身也是实现的Runnable接口)还是实现Runnable接口,所实现的run()方法都没有返回值,使得需要将返回值写到map中,等到线程结束再去从map中取数据,特别的不方便
而且Runnable还有另一个问题是不能抛出任何异常,必须在run()方法中自己处理异常。
由于Runnable存在的两个问题,所以Callable接口和Future接口应运而生,这里来介绍一下Callable接口和Future接口
Callable的改善
Callable与Runnable不同,提供的方法为call()方法,大家看到,该方法是有返回值的,且可以抛出异常
获取Callable的返回值需要FutureTask的支持
public interface Callable<V> {
V call() throws Exception;
}
public interface Runnable {
public abstract void run();
}
Runnable和Callable接口的区别
-
Runnable中提供的是run()方法,Callable中提供的是call()方法 -
Runnable中的run()方法返回值为void,Callable中的call()方法有返回值 -
Runnable的run()方法不能抛出异常,Callable中的call方法可以抛出异常 -
Callable可以和Future、FutureTask配合获取异步执行的结果
自旋锁执行时间短、线程数少的时候使用(由于占用CPU) —–> Atomic和lock都是使用的自旋锁
FutureTask
Future接口
使用ExecutorService来执行Callable对象
// submit方法既可以传入Callable,也可以传入Runnable,如果传入Callable的话,可以使用get方法获取到值;如果传入Runnable的话,get方法不可以获取到值
<T> Future<T> submit(Callable<T> task);
Future<?> submit(Runnable task);
// result参数来作为执行结果的返回值
<T> Future<T> submit(Runnable task, T result);
看到该方法返回的是一个Future对象,Future对象是什么呢?
public interface Future<V> {
// 在任务正常结束之前可以尝试取消任务,如果任务已完成或已取消,则次尝试失败。如果调用成功,且此任务还没有启动,则该任务将不会执行;如果任务已经启动,则mayInterruptIfRunning参数确定是否应该以试图停止任务的方式来中断此任务的线程
// 中断需要判断 if(Thread.currentThread().isInterrupted())
boolean cancel(boolean mayInterruptIfRunning);
// 如果任务正常结束之前被取消,isCancelled会返回true
boolean isCancelled();
// 检测任务是否结束,任务完成返回true,由于正常终止、异常或取消而完成,也会返回true
boolean isDone();
// get方法会将结果返回给调用方,会阻塞
V get() throws InterruptedException, ExecutionException;
// get方法会将结果返回给调用方,可以限制限制超时时间
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
但是线程中没有可以使用Callable接口和Future接口的地方,Thread唯一可用的参数是Runnable对象,那么如何才能用到这个好用的技术呢?
答案是FutureTask
FutureTask类
FutureTask类同时实现了Runnable接口和Future接口,同时拥有异步获取结果以及取消任务的能力,可以创建出可取消的任务。
通过传入Runnable或者Callable的任务给FutureTask,通过FutureTask的get方法异步获取执行结果
public class FutureTask<V> implements RunnableFuture<V> {
//NEW -> COMPLETING -> NORMAL 异步任务的正常结束,COMPLETING是在执行set方法为outcome赋值时的一个过渡状态,赋值完成后状态变为NORMAL
//NEW -> COMPLETING -> EXCEPTIONAL 异步任务执行过程中抛出异常,COMPLETING是在执行setException方法为outcome赋值时的一个过渡状态,赋值完成后状态变为EXCEPTIONAL
//NEW -> CANCELLED 调用cancel(false)将状态设置为CANCELLED
//NEW -> INTERRUPTING -> INTERRUPTED 调用了cancel(true)
// 新建状态
private static final int NEW = 0;
// 任务正在完成状态
private static final int COMPLETING = 1;
// 正常执行结束
private static final int NORMAL = 2;
// 非正常结束
private static final int EXCEPTIONAL = 3;
// 任务被取消,对应cancel(false)
private static final int CANCELLED = 4;
// 任务中断
private static final int INTERRUPTING = 5;
// 任务被中断,对应cancel(true)
private static final int INTERRUPTED = 6;
/** The underlying callable; nulled out after running */
private Callable<V> callable;
// 异步任务的结果
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
// 执行Callable任务的线程
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
// 线程等待节点
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
// get方法阻塞
public V get() throws InterruptedException, ExecutionException {
int s = state;
// state小于等于COMPLETING表示还没开始执行
if (s <= COMPLETING)
// 休眠等待执行结果
s = awaitDone(false, 0L);
// 休眠返回,调用report获取结果
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 线程等待节点
WaitNode q = null;
// 是否入队
boolean queued = false;
// 自旋
for (;;) {
// 中断之后移除自身对应的等待的节点
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// 状态大于COMPLETING,任务完成或者取消或者中断,则返回
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// 状态等于COMPLETING,任务正在执行,获取任务结果的线程让出cpu
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 若当前线程还没有进入线程等待链表的WaitNode节点,此时新建一个WaitNode节点,并把当前线程赋值给WaitNode节点的thread属性,创建等待节点,在下一轮自旋时会入队
else if (q == null)
q = new WaitNode();
// 还没有入队,则入队等待执行结果
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 定时,则观察时间是否到了
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
// 被finishCompletion唤醒,自旋回到for循环重新往下执行
LockSupport.park(this);
}
}
// 执行
public void run() {
// 不是新建的任务 并且不能将执行线程由null变为当前线程,则直接返回
// 如果任务状态为NEW且runner为null,说明还未有线程执行过异步任务,则满足条件,可以执行
// 如果任务状态不为NEW,说明已经有线程执行过异步任务,没有必要再次执行,直接返回
// 如果任务状态为NEW且runner不为null,说明异步任务正在执行,直接返回
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
// 异步任务结果
V result;
// 异步任务执行成功还是失败标志
boolean ran;
try {
// 执行任务的call方法
// Callable.call
result = c.call();
// 正常执行完成,设置为true
ran = true;
} catch (Throwable ex) { // 执行失败,也会修改状态,然后唤醒get方法阻塞的线程
result = null;
// 抛出异常,设置为false
ran = false;
// 异常结果赋值给outcome
setException(ex);
}
// 执行成功,则设置结果
if (ran)
// 赋值给outcome
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
// 异步任务正在执行过程中,runner一直都是非空的,防止并发执行
// 任务执行结束后,不管成功还是失败,都将runner设置为null
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
// 表示被中断了,调用了cancel(true),调用handlePossibleCancellationInterrupt处理中断
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
// 设置结果并进行通知
protected void set(V v) {
// 将状态由NEW 变为COMPLETING
// 任务不会被并发执行,导致状态不为NEW的原因是中断
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 将结果赋给outcome
outcome = v;
// 将状态变为NORMAL
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
// 唤醒调用get方法阻塞等待结果的线程
finishCompletion();
}
}
// 设置异常
protected void setException(Throwable t) {
// 任务不会被并发执行,导致状态不为NEW的原因是中断
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 将异常赋给outcome
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
private void finishCompletion() {
// assert state > COMPLETING;
// waiters中存储的是调用了get方法的线程集合,遍历
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
// waitNode节点的线程
Thread t = q.thread;
if (t != null) {
q.thread = null;
// 唤醒阻塞的线程
LockSupport.unpark(t);
}
// 继续取下一个节点
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
// 正常执行完成还是抛出异常都会执行done方法
done();
callable = null; // to reduce footprint
}
}
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
示例:
public class TestCallable {
public static void main(String[] args) {
CallDemo call = new CallDemo();
FutureTask<Integer> futureTask = new FutureTask<>(call);
new Thread(futureTask).start();
try {
// get方法是一个阻塞方法 在没有执行结束之前一直阻塞,直到执行完毕
int sum = futureTask.get();
System.out.println("---------");
System.out.println(sum);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
/**
* Callable相较于Runnable有返回值和异常
*/
class CallDemo implements Callable<Integer>{
@Override
public Integer call() throws Exception {
int sum = 0;
for(int i = 0;i<1000;i++){
sum +=i;
}
return sum;
}
}
CompletionService
Future具有阻塞同步性的特点,由于阻塞代码的运行效率会比较差,而CompletionService可以解决这样的问题
CompletionService的功能是以异步的方式一边生产新的任务,一边处理已完成任务的结果,这样可以将执行任务与处理任务分离开进行处理,使用submit执行任务,使用take取得已完成的任务,并按照完成这些任务的时间顺序处理它们的结果,把线程池Executor和阻塞队列BlockingQueue融合在一起
ExecutorCompletionService是CompletionService的唯一实现类
// 依赖于Executor对象,completionQueue作为完成队列
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
源码
take
用来取得completionQueue队列中的Future对象,最先完成的会先进入completionQueue队列中,执行时间最短的最先返回,虽然调用completionService.take().get()也会进行阻塞,但是并不会阻塞后续的任务,哪个任务先执行完,哪个任务的返回值就先打印
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
poll
获取并移除表示下一个已完成任务的Future,如果不存在这样的任务,则返回null,无阻塞效果
public Future<V> poll() {
return completionQueue.poll();
}
示例
public class TestCompletionService {
public static void main(String[] args) {
try{
ExecutorService executorService = Executors.newFixedThreadPool(2);
ExecutorCompletionService<String> executorCompletionService = new ExecutorCompletionService<>(executorService);
for (int i = 0; i < 10; i++) {
executorCompletionService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
long sleep = (long) (Math.random() * 1000);
System.out.println("sleep=" + sleep + " " + Thread.currentThread().getName());
Thread.sleep(sleep);
return Thread.currentThread().getName() + " " + sleep;
}
});
}
for (int i = 0; i < 10; i++) {
System.out.println(executorCompletionService.take().get());
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
https://zhhll.icu/2020/多线程/基础/2.Callable接口/
原文始发于微信公众号(bug生产基地):线程使用Runnable还是Callable
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/234264.html