线程池及其底层原理

ThreadPoolExecutor

  • 1、线程池简介

  • 2、三大方法

    • 2.1 固定大小的线程池

    • 2.2 一个大小的线程池

    • 2.3 动态调整的线程池

  • 3、七大参数

    • 3.1 阻塞队列

    • 3.2 线程工厂

    • 3.3 拒绝策略

  • 4、底层原理

    • 4.1 工作原理

    • 4.2 属性及线程池状态

    • 4.3 提交任务原理

    • 4.4 执行任务原理

    • 4.5 关闭线程池原理




1、线程池简介

开发者可以根据系统的需求和硬件环境灵活的控制线程的数量,且可以对所有线程进行统一的管理和控制,从而提高系统的运行效率,降低系统运行运行压力。

作用

  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 便于线程管理。线程池进行统一的资源分配和管理。


2、三大方法

2.1 固定大小的线程池

  • 该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有空闲线程时,便处理任务队列中的任务;
ExecutorService executorService = Executors.newFixedThreadPool(3);


/**
 * 创建一个含有固定数量线程的线程池
 */

public static ExecutorService newFixedThreadPool(int nThreads) {
  return new ThreadPoolExecutor(nThreads, nThreads,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>());
}

2.2 一个大小的线程池

  • 该线程池中有且仅有一个线程。若多余任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
ExecutorService executorService = Executors.newSingleThreadExecutor();


/**
 * 创建只有一个线程的线程池
 */

public static ExecutorService newSingleThreadExecutor() {
  return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(11,
                            0L, TimeUnit.MILLISECONDS,
                            new LinkedBlockingQueue<Runnable>()));
}

2.3 动态调整的线程池

  • 线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。
ExecutorService executorService = Executors.newCachedThreadPool();


/**
 * 创建动态调整大小的线程池
 */

public static ExecutorService newCachedThreadPool() {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                60L, TimeUnit.SECONDS,
                                new SynchronousQueue<Runnable>());
}

扩展:阿里巴巴规范

线程池及其底层原理



3、七大参数

public ThreadPoolExecutor(int corePoolSize,  // 核心线程池大小
                          int maximumPoolSize,  // 最大线程的数量 
                          long keepAliveTime,  // 存活时间
                          TimeUnit unit,  // 超时单位
                          BlockingQueue<Runnable> workQueue,  // 阻塞队列
                          ThreadFactory threadFactory,  // 线程工厂
                          RejectedExecutionHandler handler)
 
//  拒绝策略
  if (corePoolSize < 0 ||
      maximumPoolSize <= 0 ||
      maximumPoolSize < corePoolSize ||
      keepAliveTime < 0)
    throw new IllegalArgumentException();
  if (workQueue == null || threadFactory == null || handler == null)
    throw new NullPointerException();
  this.acc = System.getSecurityManager() == null ?
    null :
  AccessController.getContext();
  this.corePoolSize = corePoolSize;
  this.maximumPoolSize = maximumPoolSize;
  this.workQueue = workQueue;
  this.keepAliveTime = unit.toNanos(keepAliveTime);
  this.threadFactory = threadFactory;
  this.handler = handler;
}
  • corePoolSize:核心线程数
  • maximumPoolSize:最大线程数
  • keepAliveTime:多余的空闲线程(数量超过corePoolSize的线程)的存活时间
  • unit:keepAliveTime的单位
  • workQueue:阻塞队列
  • threadFactory:线程工厂
  • handler:拒绝策略

3.1 阻塞队列

线程池及其底层原理

阻塞队列 指的是被提交但未执行的任务队列。其中

  • SynchronousQueue直接提交的队列。没有容量,每一个插入操作都要等待一个相应的删除操作,反之,每一个删除操作都要等待对应的插入操作。
  • ArrayBlockingQueue有界的任务队列。其构造函数必须带一个容量参数,表示该队列的最大容量。
  • LinkedBlockingQueue无界的任务队列。除非系统资源耗尽,否则无界的任务队列不存在任务入队失败的情况。
  • PriorityBlockingQueue优先任务队列。根据任务的优先级顺序先后执行,在确保系统性能的同时,也能有很好的质量保证(优先级高的先执行)。

3.2 线程工厂

线程池工厂 有以下选项:

  • 推荐使用:Guava包下的 ThreadFactoryBuilder
<dependency>
  <groupId>com.google.guava</groupId>
  <artifactId>guava</artifactId>
  <version>19.0</version>
</dependency>
// 自定义线程池工厂
private final static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("AsyncPool-pool-").build();
  • Executors 类里提供了默认的线程池工厂,即 Executors.defaultThreadFactory()
/**
 * The default thread factory
 * 默认的线程工厂
 */

static class DefaultThreadFactory implements ThreadFactory {
  // 静态的原子变量,用来统计线程工厂的个数
  private static final AtomicInteger poolNumber = new AtomicInteger(1);
  private final ThreadGroup group;
  // 用来记录每个线程工厂创建了多少线程,和上述的poolNumber分别作为线程池和线程的名称的一部分
  private final AtomicInteger threadNumber = new AtomicInteger(1);
  private final String namePrefix;

  DefaultThreadFactory() {
    SecurityManager s = System.getSecurityManager();
    group = (s != null) ? s.getThreadGroup() :
    Thread.currentThread().getThreadGroup();
    namePrefix = "pool-" +
      poolNumber.getAndIncrement() +
      "-thread-";
  }

  // 该方法是对线程的一个修饰
  public Thread newThread(Runnable r) {
    Thread t = new Thread(group, r,
                          namePrefix + threadNumber.getAndIncrement(),
                          0);
    if (t.isDaemon())
      t.setDaemon(false);
    if (t.getPriority() != Thread.NORM_PRIORITY)
      t.setPriority(Thread.NORM_PRIORITY);
    return t;
  }
}

3.3 拒绝策略

线程池及其底层原理

  • AbortPolicy : 抛出异常
public static class AbortPolicy implements RejectedExecutionHandler {

  public AbortPolicy() { }

  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " +
                                         e.toString());
  }
}
  • CallerRunsPolicy : 使用调用者所在线程来运行任务
public static class CallerRunsPolicy implements RejectedExecutionHandler {

  public CallerRunsPolicy() { }

  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
      r.run();
    }
  }
}
  • DiscardOldestPolicy : 调用poll丢弃一个任务,执行当前任务
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
 
  public DiscardOldestPolicy() { }

  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
      e.getQueue().poll();
      e.execute(r);
    }
  }
}
  • DiscardPolicy : 默默丢弃,不抛出异常
public static class DiscardPolicy implements RejectedExecutionHandler {

  public DiscardPolicy() { }

  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  }
}

创建线程池的综合案例

import java.util.concurrent.*;

public class MyExecutorPool {
    public static void main(String[] args) {
      
        ExecutorService myThreadPool = new ThreadPoolExecutor(
                2,
                5,
                3,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()); //超载则抛出异常

        try{
            //最大负载为8(max + BlockingQueue)
            for(int i = 0; i < 9; i++){
                myThreadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName() + " Running... ");
                });
            }

        }catch(Exception e){
            e.printStackTrace();
        }finally{
            //关闭线程池
            myThreadPool.shutdown();
        }
    }
}

4、底层原理

4.1 工作原理

线程池及其底层原理

  1. 在创建了线程池后,等待提交过来的任务请求。

  2. 当调用 execute() 方法添加一个请求任务时,线程池会做如下判断:

    1. 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;

    2. 如果正在运行的线程数量大于或等于 corePoolsize,那么将这个任务放入队列;

    3. 如果这时候队列满了且正在运行的线程数量还小于 maximumPoolSize,那么还是要创建非核心线程立即运行这个任务;

    4. 如果队列满了且正在运行的线程数量大于或等于 maximunPoolSize,那么线程池会启动饱和拒绝策略来执行

  3. 当一个线程完成任务时,它会从队列中取下一个任务来执行。

  4. 当一个线程无事可做超过一定的时间(keepAliveTime)时,线程池会判断:

    1. 如果当前运行的线程数大于 corePoolSize,那么这个线程就会被停掉。
    2. 所以线程池的所有任务完成后它最终会收缩到 corePoolSize 的大小

4.2 属性及线程池状态

/**
 * Since:1.5
 * Author:Doug Lea
 */

public class ThreadPoolExecutor extends AbstractExecutorService {
  
    // 用来标记线程池状态(高三位),线程个数(低29位)
    // 默认是RUNNING状态,线程个数为0
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  
   // 线程个数掩码位数,并不是所有平台int类型是32位,所以准确说是具体平台下Integer的二进制位数-3后的剩余位数才数线程的个数
    private static final int COUNT_BITS = Integer.SIZE - 3// Integer.SIZE = 32
  
    // 线程最大个数(低29位)0001 1111 1111 1111 1111 1111 1111 1111
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    
    // 线程池的主要状态
    // 高3位 1110 0000 0000 0000 0000 0000 0000 0000(接收新任务并且处理阻塞队列里的任务)
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 高3位 0000 0000 0000 0000 0000 0000 0000 0000(拒绝新任务但是处理处理阻塞队列里的任务)
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 高3位 0010 0000 0000 0000 0000 0000 0000 0000(拒绝新任务并且抛弃阻塞队列里的任务,同时中断正在处理的任务)
    private static final int STOP       =  1 << COUNT_BITS;
    // 高3位 0100 0000 0000 0000 0000 0000 0000 0000(所有任务都执行完(包含阻塞队列里的任务),当前线程池活动线程为0,将要调用terminated方法)
    private static final int TIDYING    =  2 << COUNT_BITS;
    // 高3位 0110 0000 0000 0000 0000 0000 0000 0000(终止状态。terminated方法调用完成以后的状态)
    private static final int TERMINATED =  3 << COUNT_BITS;
  
   
    // 获取高三位,也即线程池的运行状态
    private static int runStateOf(int c)     return c & ~CAPACITY; }
   // 获取低29位,线程个数  
   private static int workerCountOf(int c)  return c & CAPACITY; }
    // 计算ctl新值,线程状态 和 线程个数
    private static int ctlOf(int rs, int wc) return rs | wc; }
  
  
    // 独占锁,用来控制新增Worker线程时的原子性
    private final ReentrantLock mainLock = new ReentrantLock();
  
   // 该锁对应的条件队列,在线程调用awaitTermination时用来存放阻塞的线程
    private final Condition termination = mainLock.newCondition();

  ......  
}

线程池及其底层原理

线程池及其底层原理

扩展:通过原子的修改ctl这个值来维护线程池的状态。

4.3 提交任务原理

(1)Worker 类原理解析

线程池及其底层原理

private final class Worker
  extends AbstractQueuedSynchronizer  // 继承了AQS,实现了简单不可重入独占锁
  implements Runnable
{
  // 序列化id
  private static final long serialVersionUID = 6138294804551838833L;

  // 具体执行任务的线程
  final Thread thread;
  
  // 记录该工作线程执行的第一个任务
  Runnable firstTask;
  
  /** Per-thread task counter */
  volatile long completedTasks;

  /**
   * Creates with given first task and thread from ThreadFactory.
   * @param firstTask the first task (null if none)
   */

  Worker(Runnable firstTask) {
    setState(-1); // 为了避免该线程在运行runWorker()方法前被中断,
    // state=0表示锁未被获取的状态,state=1表示锁已经被获取的状态,state=-1是创建Worker时默认的状态
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
  }

  /** Delegates main run loop to outer runWorker  */
  public void run() {
    runWorker(this);
  }

  // Lock methods
  //
  // The value 0 represents the unlocked state.
  // The value 1 represents the locked state.

  protected boolean isHeldExclusively() {
    return getState() != 0;
  }

  protected boolean tryAcquire(int unused) {
    if (compareAndSetState(01)) {
      setExclusiveOwnerThread(Thread.currentThread());
      return true;
    }
    return false;
  }

  protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
  }

  public void lock()        { acquire(1); }
  public boolean tryLock()  return tryAcquire(1); }
  public void unlock()      { release(1); }
  public boolean isLocked() return isHeldExclusively(); }

  void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
      try {
        t.interrupt();
      } catch (SecurityException ignore) {
      }
    }
  }
}

Worker继承了AQS,实现了简单不可重入独占锁,其中,

  • state = 0,表示锁未被获取的状态
  • state = 1,表示锁已经被获取的状态
  • state = -1,是创建Worker时默认的状态

(2)ThreadPoolExecutor 提交任务到线程池的方法有以下几种:

方法 任务类型 返回值
public void execute(Runnable command) 可以接受Runnable类型的任务 void
public Future<?> submit(Runnable task) 可以接受Runnable类型的任务 Future
public  <T>  Future<T>  submit(Runnable task, T result) 接收Runnable类型任务,以及特定参数作为返回值 Future
public  <T>  Future<T>  submit(Callable<T> task) 接收Callable类型任务 Future,具体内容为任务执行的结果

线程池及其底层原理

说明execute() 来自于 Executor 接口submit() 来自于 ExecutorService 接口。

方法1

public void execute(Runnable command) {
  // 1.如果任务为null,则抛出NPE异常
  if (command == null)
    throw new NullPointerException();
 
  // 2.获取当前线程池的状态 和 线程个数变量的组合值
  int c = ctl.get();
  
  // 3.当前线程池中线程个数小于corePoolSize,则新增一个新的核心线程执行该任务
  if (workerCountOf(c) < corePoolSize) {
    // 新增一个核心线程执行该任务,true表示为核心线程
    if (addWorker(command, true))
      return;
    c = ctl.get();
  }
  
  // 4.如果线程池处于RUNNING状态,则添加当前任务到阻塞队列;非RUNNING状态下需要抛弃新任务
  if (isRunning(c) && workQueue.offer(command)) {
    // 4.1 二次检查,添加任务到阻塞队列后线程池的状态有可能已经发生变化了,如果线程池为非RUNNING状态,则把任务从阻塞队列移除,并且执行拒绝策略
    int recheck = ctl.get();
    // 4.2 如果当前线程池状态不是RUNNING状态,则从任务队列删除任务,并执行拒绝策略
    if (! isRunning(recheck) && remove(command))
      reject(command);
    // 4.3 二次校验通过,即线程池为RUNNING状态,那么需要判断当前线程池里面是否还有线程,如果没有则新增一个线程
    else if (workerCountOf(recheck) == 0)
      addWorker(nullfalse);
  }
  // 5.如果队列满了,则新增线程执行该任务,如果当前线程池里线程的个数大于maximumPoolSize,则新增失败,执行拒绝策略
  else if (!addWorker(command, false))
    reject(command);
}

总结 :

  • 判断核心线程池是否已满,如果不是,则创建线程执行任务
  • 如果核心线程池满了,判断队列是否满了,如果队列没满,将任务放在队列中
  • 如果队列满了,则判断线程池是否已满,如果没满,创建线程执行任务
  • 如果线程池也满了,则按照拒绝策略对任务进行处理

方法2

public Future<?> submit(Runnable task) {
  // 1.NPE判断
  if (task == nullthrow new NullPointerException();

  // 2.包装任务为FutureTask
  RunnableFuture<Void> ftask = newTaskFor(task, null);

  // 3.投递到线程池执行,底层调用的是上述的execute()方法
  execute(ftask);

  // 4.返回ftask
  return ftask;
}


// 上述的 newTaskFor() 底层就是 new FutureTask<T>()  
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
  // 内部创建了一个FutureTask对象
  return new FutureTask<T>(runnable, value);
}


// 上述的new FutureTask<T>()的构造函数如下
public FutureTask(Runnable runnable, V result) {
  // 将Runnable适配为Callable类型任务,并且让result作为执行结果
  this.callable = Executors.callable(runnable, result);
  this.state = NEW;       // ensure visibility of callable
}


// 上述代码中的FutureTask会在运行时执行给定的Runnable,并将在任务Runnable执行完成后,把给定的结果value通过FutureTask的get方法返回

方法3

public <T> Future<T> submit(Runnable task, T result) {
  // 1.NPE判断
  if (task == nullthrow new NullPointerException();
  
  // 2.包装任务为FutureTask
  RunnableFuture<T> ftask = newTaskFor(task, result);
  
  // 3.投递到线程池执行,底层调用的是上述的execute()方法
  execute(ftask);
  
  // 4.返回ftask
  return ftask;
}

方法4

public <T> Future<T> submit(Callable<T> task) {
  // 1.NPE判断
  if (task == nullthrow new NullPointerException();
  
  // 2.包装任务为FutureTask
  RunnableFuture<T> ftask = newTaskFor(task);
  
  // 3.投递到线程池执行,底层调用的是上述的execute()方法
  execute(ftask);
  
  // 4.返回ftask
  return ftask;
}

方法3和方法4可知,两个参数的 submit 方法类似,不同在于该方法接收的是含有返回值的Callable类型的任务,最终也是转换为FutureTask后提交到线程池,并返回。

4.4 执行任务原理

线程池及其底层原理

当用户线程提交任务到线程池后,在线程池没有执行拒绝策略的情况下,用户线程会马上返回,而提交的任务要么直接切换到线程池中的Worker线程来执行要么先放入线程池的阻塞队列里面,稍后再由Worker线程来执行。

private final class Worker
  extends AbstractQueuedSynchronizer
  implements Runnable
{
  // 序列化id
  private static final long serialVersionUID = 6138294804551838833L;

  /** Thread this worker is running in.  Null if factory fails. */
  final Thread thread;
  /** Initial task to run.  Possibly null. */
  Runnable firstTask;
  /** Per-thread task counter */
  volatile long completedTasks;

  // 构造函数
  Worker(Runnable firstTask) {
    setState(-1); // state为AQS中的同步状态字段,设置为-1是为了避免当前Worker在调用runWorker方法前被中断(当其他线程调用了线程池的shutdownNow时,如果Worker状态 >= 0则会中断该线程)
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this); // 创建一个线程
  }

  /** Delegates main run loop to outer runWorker  */
  // 该类实现了Runnable接口,内部委托给了runWorker()方法
  public void run() {
    runWorker(this);
  }

  // ......
}

runWorker() 方法如下:

final void runWorker(Worker w) {
  Thread wt = Thread.currentThread();
  Runnable task = w.firstTask;
  w.firstTask = null;
  w.unlock(); // allow interrupts; 1. 设置为0表示允许中断,这时如果调用shutdownNow()会中断Worker线程
  boolean completedAbruptly = true;
  try {
    // 2. 如果当前task==null 或者从任务队列获取的任务返回则null,则执行清理工作,当前Worker也就退出执行了
    while (task != null || (task = getTask()) != null) {
      // 2.1 获取工作线程内部持有的独占锁,执行扩展接口代码2.2
      w.lock();
      // If pool is stopping, ensure thread is interrupted;
      // if not, ensure thread is not interrupted.  This
      // requires a recheck in second case to deal with
      // shutdownNow race while clearing interrupt
      if ((runStateAtLeast(ctl.get(), STOP) ||
           (Thread.interrupted() &&
            runStateAtLeast(ctl.get(), STOP))) &&
          !wt.isInterrupted())
        wt.interrupt();
      try {
        // 2.2 任务执行前干一些事情
        beforeExecute(wt, task);
        Throwable thrown = null;
        try {
          // 2.3 执行具体的任务
          task.run();
        } catch (RuntimeException x) {
          thrown = x; throw x;
        } catch (Error x) {
          thrown = x; throw x;
        } catch (Throwable x) {
          thrown = x; throw new Error(x);
        } finally {
          // 2.4 任务执行完毕后干一些事情
          afterExecute(task, thrown);
        }
      } finally {
        task = null;
        // 2.5 统计当前Worker完成了多少个任务,并释放锁
        w.completedTasks++;
        w.unlock();
      }
    }
    completedAbruptly = false;
  } finally {
    // 3.执行清理工作
    processWorkerExit(w, completedAbruptly);
  }
}

说明:

  • 这里执行具体任务期间加锁,是为了避免任务运行期间,其他线程调用了shutdown方法关闭线程池时中断正在执行任务的线程

processWorkerExit() 执行清理工作:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
  if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
    decrementWorkerCount();

  // 1. 统计整个线程池完成的任务的个数,并从工作集里面删除当前Worker
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    completedTaskCount += w.completedTasks;
    workers.remove(w);
  } finally {
    mainLock.unlock();
  }

  
  // 2.如果当前是shutdown状态并且工作队列为空,或者当前是stop状态且当前线程池里面没有活动线程,则设置线程池状态为TERMINATED,
  tryTerminate();

  // 3. 如果当前线程个数小于核心线程个数,则增加
  int c = ctl.get();
  if (runStateLessThan(c, STOP)) {
    if (!completedAbruptly) {
      int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
      if (min == 0 && ! workQueue.isEmpty())
        min = 1;
      if (workerCountOf(c) >= min)
        return// replacement not needed
    }
    addWorker(nullfalse);
  }
}

4.5 关闭线程池原理

线程池中有两种模式的线程池关闭方法,如下所示

方法 作用 返回值
public void shutdown() 调用该方法后,线程池就不会再接收新的任务,但是工作队列里面的任务还是要执行的,该方法是立刻返回的,并不同步等待队列任务完成再返回 Void
public List<Runnable> shutdownNow() 调用该方法后,线程池就不会再接收新的任务,并且会丢弃工作队列里面的任务,正在执行的任务也会被中断,该方法是立刻返回的,并不同步等待激活的任务执行完成再返回 返回值为这时任务队列里面被丢弃的任务列表

方法1:

public void shutdown() {
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    // 1. 权限检查
    checkShutdownAccess();
    // 2. 设置当前线程池状态为SHUTDOWN,如果已经是了则直接返回
    advanceRunState(SHUTDOWN);
    // 3. 设置中断标志
    interruptIdleWorkers();
    onShutdown(); // hook for ScheduledThreadPoolExecutor
  } finally {
    mainLock.unlock();
  }
  // 4. 尝试状态变为TERMINATED
  tryTerminate();
}


// 上述代码2的内容
// 如果当前状态>=SHUTDOWN则直接返回,否则设置当前状态为SHTDOWN 
private void advanceRunState(int targetState) {
  for (;;) {
    int c = ctl.get();
    if (runStateAtLeast(c, targetState) ||
        ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
      break;
  }
}


// 上述代码3的内容
private void interruptIdleWorkers() {
  interruptIdleWorkers(false);
}

// 设置所有空闲线程的中断标志,首先会加全局锁,同时只有一个线程可以调用shutdown设置中断标志。
// 然后尝试获取Worker本身的锁,获取成功则设置中断标识,由于正在执行的任务已经获取了锁,所以正在执行的任务没有被中断。
// 这里中断的是阻塞到getTask()方法,企图从队列里获取任务的线程,也就是空闲线程。
private void interruptIdleWorkers(boolean onlyOne) {
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    for (Worker w : workers) {
      Thread t = w.thread;
      // 如果工作线程没有被中断,并且没有正在运行则设置中断
      if (!t.isInterrupted() && w.tryLock()) {
        try {
          t.interrupt();
        } catch (SecurityException ignore) {
        } finally {
          w.unlock();
        }
      }
      if (onlyOne)
        break;
    }
  } finally {
    mainLock.unlock();
  }
}


// 上述代码4的内容,尝试将线程池的状态变为TERMINATED
// 首先使用CAS设置当前线程池状态为TIDYING,如果成功则执行扩展接口terminated在线程池状态变为TERMINATED前做一些事情,然后设置当前线程池状态为TERMINATED,最后调用termination.signalAll()来激活调用线程池的awaitTermination系列方法被阻塞的所有线程
final void tryTerminate() {
  for (;;) {
    int c = ctl.get();
    if (isRunning(c) ||
        runStateAtLeast(c, TIDYING) ||
        (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
      return;
    if (workerCountOf(c) != 0) { // Eligible to terminate
      interruptIdleWorkers(ONLY_ONE);
      return;
    }

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
      // 设置当前线程池状态为TIDYING
      if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
        try {
          terminated();
        } finally {
          // 设置当前线程池状态为TERMINATED
          ctl.set(ctlOf(TERMINATED, 0));
          // 激活调用条件变量termination的await系列方法被阻塞的所有线程
          termination.signalAll();
        }
        return;
      }
    } finally {
      mainLock.unlock();
    }
    // else retry on failed CAS
  }
}

方法2:

public List<Runnable> shutdownNow() {
  List<Runnable> tasks;
  final ReentrsantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    // 1.权限检查
    checkShutdownAccess();
    // 2.设置线程池状态为STOP
    advanceRunState(STOP);
    // 3.中断所有的线程,包括空闲线程和正在执行任务的线程
    interruptWorkers();
    // 4.移动队列任务到tasks
    tasks = drainQueue();
  } finally {
    mainLock.unlock();
  }
  tryTerminate();
  return tasks;
}


// 上述3的内容,中断所有线程,包括空闲线程和执行任务的线程
private void interruptWorkers() {
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    for (Worker w : workers)
      w.interruptIfStarted();
  } finally {
    mainLock.unlock();
  }
}


// 上述4的内容,将当前任务队列的任务移动到tasks列表
private List<Runnable> drainQueue() {
  // 1.获取任务队列
  BlockingQueue<Runnable> q = workQueue;
  // 2.从任务队列移除任务到taskList列表
  ArrayList<Runnable> taskList = new ArrayList<Runnable>();
  q.drainTo(taskList);
  // 3.如果q还不为空,则说明drainQueue接口调用失败,则循环移除
  if (!q.isEmpty()) {
    for (Runnable r : q.toArray(new Runnable[0])) {
      if (q.remove(r))
        taskList.add(r);
    }
  }
  // 4.返回异常的任务列表
  return taskList;
}


原文始发于微信公众号(AJCoder):线程池及其底层原理

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/24316.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!