线程使用Runnable还是Callable

Callable接口

Runnable的缺陷

在创建线程的时候不管是继承Thread类(Thread本身也是实现的Runnable接口)还是实现Runnable接口,所实现的run()方法都没有返回值,使得需要将返回值写到map中,等到线程结束再去从map中取数据,特别的不方便

而且Runnable还有另一个问题是不能抛出任何异常,必须在run()方法中自己处理异常。

由于Runnable存在的两个问题,所以Callable接口和Future接口应运而生,这里来介绍一下Callable接口和Future接口

Callable的改善

Callable与Runnable不同,提供的方法为call()方法,大家看到,该方法是有返回值的,且可以抛出异常

获取Callable的返回值需要FutureTask的支持

public interface Callable<V{
    
    call() throws Exception;
}

public interface Runnable {
    
    public abstract void run();
}

Runnable和Callable接口的区别

  • Runnable中提供的是run()方法,Callable中提供的是call()方法
  • Runnable中的run()方法返回值为void,Callable中的call()方法有返回值
  • Runnable的run()方法不能抛出异常,Callable中的call方法可以抛出异常
  • Callable可以和Future、FutureTask配合获取异步执行的结果

自旋锁执行时间短、线程数少的时候使用(由于占用CPU)  —–> Atomic和lock都是使用的自旋锁

FutureTask

Future接口

使用ExecutorService来执行Callable对象

// submit方法既可以传入Callable,也可以传入Runnable,如果传入Callable的话,可以使用get方法获取到值;如果传入Runnable的话,get方法不可以获取到值
<T> Future<T> submit(Callable<T> task);

Future<?> submit(Runnable task);
// result参数来作为执行结果的返回值
<T> Future<T> submit(Runnable task, T result);

看到该方法返回的是一个Future对象,Future对象是什么呢?

public interface Future<V{

    // 在任务正常结束之前可以尝试取消任务,如果任务已完成或已取消,则次尝试失败。如果调用成功,且此任务还没有启动,则该任务将不会执行;如果任务已经启动,则mayInterruptIfRunning参数确定是否应该以试图停止任务的方式来中断此任务的线程
  // 中断需要判断 if(Thread.currentThread().isInterrupted())
    boolean cancel(boolean mayInterruptIfRunning);

    // 如果任务正常结束之前被取消,isCancelled会返回true
    boolean isCancelled();

    // 检测任务是否结束,任务完成返回true,由于正常终止、异常或取消而完成,也会返回true
    boolean isDone();

    // get方法会将结果返回给调用方,会阻塞
    get() throws InterruptedException, ExecutionException;

    // get方法会将结果返回给调用方,可以限制限制超时时间
    get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException
;
}

但是线程中没有可以使用Callable接口和Future接口的地方,Thread唯一可用的参数是Runnable对象,那么如何才能用到这个好用的技术呢?

答案是FutureTask

FutureTask类

FutureTask类同时实现了Runnable接口和Future接口,同时拥有异步获取结果以及取消任务的能力,可以创建出可取消的任务。

通过传入Runnable或者Callable的任务给FutureTask,通过FutureTask的get方法异步获取执行结果


public class FutureTask<Vimplements RunnableFuture<V{
  //NEW -> COMPLETING -> NORMAL  异步任务的正常结束,COMPLETING是在执行set方法为outcome赋值时的一个过渡状态,赋值完成后状态变为NORMAL
  //NEW -> COMPLETING -> EXCEPTIONAL   异步任务执行过程中抛出异常,COMPLETING是在执行setException方法为outcome赋值时的一个过渡状态,赋值完成后状态变为EXCEPTIONAL
  //NEW -> CANCELLED 调用cancel(false)将状态设置为CANCELLED
  //NEW -> INTERRUPTING -> INTERRUPTED 调用了cancel(true)
  // 新建状态
  private static final int NEW          = 0;
  // 任务正在完成状态
    private static final int COMPLETING   = 1;
  // 正常执行结束
    private static final int NORMAL       = 2;
  // 非正常结束
    private static final int EXCEPTIONAL  = 3;
  // 任务被取消,对应cancel(false)
    private static final int CANCELLED    = 4;
  // 任务中断
    private static final int INTERRUPTING = 5;
  // 任务被中断,对应cancel(true)
    private static final int INTERRUPTED  = 6;
  
  /** The underlying callable; nulled out after running */
    private Callable<V> callable;
  // 异步任务的结果
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
  // 执行Callable任务的线程
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
  // 线程等待节点
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;
  
  // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long stateOffset;
    private static final long runnerOffset;
    private static final long waitersOffset;
  
  // get方法阻塞
  public V get() throws InterruptedException, ExecutionException {
        int s = state;
      // state小于等于COMPLETING表示还没开始执行
        if (s <= COMPLETING)
          // 休眠等待执行结果
            s = awaitDone(false0L);
     // 休眠返回,调用report获取结果
        return report(s);
    }
  
  private int awaitDone(boolean timed, long nanos)
        throws InterruptedException 
{
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
    // 线程等待节点
        WaitNode q = null;
    // 是否入队
        boolean queued = false;
    // 自旋
        for (;;) {
           // 中断之后移除自身对应的等待的节点
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
          // 状态大于COMPLETING,任务完成或者取消或者中断,则返回
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
          // 状态等于COMPLETING,任务正在执行,获取任务结果的线程让出cpu
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
          // 若当前线程还没有进入线程等待链表的WaitNode节点,此时新建一个WaitNode节点,并把当前线程赋值给WaitNode节点的thread属性,创建等待节点,在下一轮自旋时会入队
            else if (q == null)
                q = new WaitNode();
          // 还没有入队,则入队等待执行结果
            else if (!queued) 
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
          // 定时,则观察时间是否到了
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
              // 被finishCompletion唤醒,自旋回到for循环重新往下执行
                LockSupport.park(this);
        }
    }
  
  // 执行
  public void run() {
    // 不是新建的任务 并且不能将执行线程由null变为当前线程,则直接返回
    // 如果任务状态为NEW且runner为null,说明还未有线程执行过异步任务,则满足条件,可以执行
    // 如果任务状态不为NEW,说明已经有线程执行过异步任务,没有必要再次执行,直接返回
    // 如果任务状态为NEW且runner不为null,说明异步任务正在执行,直接返回
   
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
              // 异步任务结果
                V result;
              // 异步任务执行成功还是失败标志
                boolean ran;
                try {
                  // 执行任务的call方法
                  // Callable.call
                    result = c.call();
                  // 正常执行完成,设置为true
                    ran = true;
                } catch (Throwable ex) { // 执行失败,也会修改状态,然后唤醒get方法阻塞的线程
                    result = null;
                  // 抛出异常,设置为false
                    ran = false;
                  // 异常结果赋值给outcome
                    setException(ex);
                }
              // 执行成功,则设置结果
                if (ran)
                  // 赋值给outcome
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
          // 异步任务正在执行过程中,runner一直都是非空的,防止并发执行
          // 任务执行结束后,不管成功还是失败,都将runner设置为null
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
          // 表示被中断了,调用了cancel(true),调用handlePossibleCancellationInterrupt处理中断
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
  // 设置结果并进行通知
  protected void set(V v) {
    // 将状态由NEW 变为COMPLETING
    // 任务不会被并发执行,导致状态不为NEW的原因是中断
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
      // 将结果赋给outcome
      outcome = v;
      // 将状态变为NORMAL
      UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
      // 唤醒调用get方法阻塞等待结果的线程
      finishCompletion();
    }
  }
  
  // 设置异常
  protected void setException(Throwable t) {
    // 任务不会被并发执行,导致状态不为NEW的原因是中断
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
          // 将异常赋给outcome
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }
  
  private void finishCompletion() {
        // assert state > COMPLETING;
    // waiters中存储的是调用了get方法的线程集合,遍历
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                  // waitNode节点的线程
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                      // 唤醒阻塞的线程
                        LockSupport.unpark(t);
                    }
                  // 继续取下一个节点
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null// unlink to help gc
                    q = next;
                }
                break;
            }
        }
   // 正常执行完成还是抛出异常都会执行done方法
        done();

        callable = null;        // to reduce footprint
    }
}
  
public interface RunnableFuture<Vextends RunnableFuture<V{
    
    void run();
}

示例:

public class TestCallable {

    public static void main(String[] args) {
        CallDemo call = new CallDemo();
        FutureTask<Integer> futureTask = new FutureTask<>(call);
        new Thread(futureTask).start();

        try {
            // get方法是一个阻塞方法  在没有执行结束之前一直阻塞,直到执行完毕
            int sum = futureTask.get();
            System.out.println("---------");
            System.out.println(sum);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

    }
}

/**
 * Callable相较于Runnable有返回值和异常
 */

class CallDemo implements Callable<Integer>{

    @Override
    public Integer call() throws Exception {
        int sum = 0;
        for(int i = 0;i<1000;i++){
            sum +=i;
        }
        return sum;
    }
}

CompletionService

Future具有阻塞同步性的特点,由于阻塞代码的运行效率会比较差,而CompletionService可以解决这样的问题

CompletionService的功能是以异步的方式一边生产新的任务,一边处理已完成任务的结果,这样可以将执行任务与处理任务分离开进行处理,使用submit执行任务,使用take取得已完成的任务,并按照完成这些任务的时间顺序处理它们的结果,把线程池Executor和阻塞队列BlockingQueue融合在一起

ExecutorCompletionService是CompletionService的唯一实现类

// 依赖于Executor对象,completionQueue作为完成队列
public ExecutorCompletionService(Executor executor) {
    if (executor == null)
        throw new NullPointerException();
    this.executor = executor;
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

源码

take

用来取得completionQueue队列中的Future对象,最先完成的会先进入completionQueue队列中,执行时间最短的最先返回,虽然调用completionService.take().get()也会进行阻塞,但是并不会阻塞后续的任务,哪个任务先执行完,哪个任务的返回值就先打印

public Future<V> take() throws InterruptedException {
    return completionQueue.take();
}
poll

获取并移除表示下一个已完成任务的Future,如果不存在这样的任务,则返回null,无阻塞效果

public Future<V> poll() {
    return completionQueue.poll();
}

示例

public class TestCompletionService {
    public static void main(String[] args) {

        try{
            ExecutorService executorService = Executors.newFixedThreadPool(2);

            ExecutorCompletionService<String> executorCompletionService = new ExecutorCompletionService<>(executorService);

            for (int i = 0; i < 10; i++) {
                executorCompletionService.submit(new Callable<String>() {
                    @Override
                    public String call() throws Exception {
                        long sleep = (long) (Math.random() * 1000);
                        System.out.println("sleep=" + sleep + " " + Thread.currentThread().getName());
                        Thread.sleep(sleep);
                        return Thread.currentThread().getName() + " " + sleep;
                    }
                });
            }

            for (int i = 0; i < 10; i++) {
                System.out.println(executorCompletionService.take().get());
            }
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

    }

}

https://zhhll.icu/2020/多线程/基础/2.Callable接口/


原文始发于微信公众号(bug生产基地):线程使用Runnable还是Callable

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/234264.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!