大家好,今天我们一起聊聊线程池ThreadPoolExecutor。
大纲

线程池是程序设计中的一种多线程处理形式,它预先创建了多个工作线程,并将需要异步执行的任务放入队列中。当有新的任务提交到线程池时,线程池会根据预设的策略选择一个空闲的工作线程来执行该任务,而不是每次都新建线程。这样可以有效减少线程创建和销毁带来的系统开销,提高系统资源利用率,同时通过控制并发数防止过多线程导致的系统过载。
在Java中,java.util.concurrent包提供了强大的线程池支持,主要通过ThreadPoolExecutor类实现自定义线程池,也可以通过Executors工具类创建线程池。
ThreadPoolExecutor创建线程池
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize
是 ThreadPoolExecutor 中的一个重要参数,它表示线程池中的常驻核心线程数。这个参数的值决定了线程池中始终保持在线程池中的线程数量,即即使这些线程处于空闲状态,也不会被销毁。
maximumPoolSize
线程池中的一个关键参数,表示线程池中允许的最大线程数量。当工作队列满了,并且当前线程数小于 maximumPoolSize 时,线程池会再创建新的线程来执行任务。这有助于确保当有大量任务提交到线程池时,线程池能够有足够的线程来处理这些任务。
keepAliveTime
当线程池中的线程数量超过核心线程数(corePoolSize)时,多余的空闲线程在终止前等待新任务的最长时间。
unit
为keepAliveTime参数的时间单位,有TimeUnit.NANOSECONDS(纳秒)、TimeUnit.MICROSECONDS(微秒)、TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分钟)、TimeUnit.HOURS(小时)和TimeUnit.DAYS(天)几种
workQueue
线程池中的一个关键组件,用于在任务执行之前存放待处理的任务。workQueue 是一个 BlockingQueue(阻塞队列),当线程池中的线程数量达到核心线程数时,新提交的任务会被放在这个队列中等待执行。
ArrayBlockingQueue
一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。如果试图添加一个元素而队列已满,则调用线程会阻塞;如果队列为空,则从队列中移除元素的线程会阻塞。有界队列有助于避免资源耗尽,但是可能需要更复杂的错误处理逻辑,因为当队列满时,新的任务会被拒绝。
LinkedBlockingQueue
一个由链表支持的可选有界阻塞队列。如果未指定容量,那么它就是一个无界队列。此队列也按 FIFO 排序元素。无界队列意味着它会尽可能多地接受新任务,而不会因为队列满而拒绝任务。但是,这也可能导致资源耗尽,因为线程池可能会无限制地增长。
SynchronousQueue
一个不存储元素的阻塞队列。每个插入操作必须等待一个相应的删除操作,反之亦然。这实际上是一个传递性队列,它在线程之间进行直接的手递手传递。使用 SynchronousQueue 通常要求 maximumPoolSize 被设置为一个大于或等于 corePoolSize 的值,这样当工作队列为空时,线程池可以创建额外的线程来处理任务。
PriorityBlockingQueue
一个支持优先级排序的无界阻塞队列。元素根据它们的自然顺序或者通过提供的 Comparator 在队列实例化时排序。这个队列不允许使用 null 元素。
DelayQueue
这些队列不是直接用于执行任务的,但是可以用于更复杂的任务调度场景,比如延迟任务或定时任务。

选择哪种类型的 workQueue 取决于应用程序的具体需求。例如,如果希望限制线程池能够处理的任务数量,那么可以选择一个有界队列,如 ArrayBlockingQueue。如果希望线程池能够尽可能多地处理任务,而不考虑资源限制,那么可以选择一个无界队列,如 LinkedBlockingQueue。在创建线程池时,选择正确的 workQueue 是非常重要的,因为它会影响到线程池的行为、性能和资源使用情况。如果队列太小,可能会导致任务被拒绝;如果队列太大,可能会导致过多的内存使用或线程创建。因此,需要根据应用程序的具体需求来权衡这些因素。
threadFactory
是一个实现了 ThreadFactory 接口的对象。ThreadFactory 接口中定义了一个方法 newThread(Runnable r),这个方法用于创建新的线程。当 ThreadPoolExecutor 需要创建一个新线程来执行任务时,它会调用 threadFactory 的 newThread 方法来生成线程。通过提供自定义的 ThreadFactory,可以控制新线程的创建方式,包括设置线程的名称、优先级、守护线程状态等。这在某些场景下非常有用,比如想要给线程池中的线程设置一个有意义的名字以便于调试,或者想要设置线程的优先级以影响它的调度。
handler
ThreadPoolExecutor 中的一个参数,它定义了当线程池中的线程数量达到最大值并且工作队列已满时,如何处理新提交的任务。这个参数是一个 RejectedExecutionHandler 接口的实现,它定义了一个 rejectedExecution 方法,用于处理无法执行的任务。RejectedExecutionHandler 接口有一个实现类 ThreadPoolExecutor.AbortPolicy,这是默认的策略。当无法执行新任务时,它会抛出一个 RejectedExecutionException。除了 AbortPolicy,还有其他几种策略可供选择:
ThreadPoolExecutor.AbortPolicy
这个策略会抛出异常。当线程池无法处理新任务时,该策略会在调用 execute 方法的线程中直接运行任务。 这意味着提交任务的线程将负责执行它。
ThreadPoolExecutor.CallerRunsPolicy
这个策略不会抛出异常,也不会丢弃任务。当线程池无法处理新任务时,该策略会在调用 execute 方法的线程中直接运行任务。这意味着提交任务的线程将负责执行它。
ThreadPoolExecutor.DiscardPolicy
这个策略会静默地丢弃无法执行的任务,不抛出任何异常。这意味着如果线程池无法处理新任务,任务将被简单地丢弃,没有任何反馈。
ThreadPoolExecutor.DiscardOldestPolicy
这个策略会丢弃队列中最老的任务,然后尝试重新提交新任务。这意味着新任务将优先于旧任务被执行。

选择合适的拒绝策略取决于应用程序的需求。例如,如果你应用程序能够容忍丢失一些任务,那么 DiscardPolicy 或 DiscardOldestPolicy 可能是合适的。另一方面,如果希望知道何时任务被拒绝,并且想要对此进行特殊处理,那么 CallerRunsPolicy 或自定义的 RejectedExecutionHandler 可能更适合。
线程池状态
线程的状态

线程池的状态
线程池的状态是线程池内部管理和控制线程时的一种重要属性。线程池的状态通常包括以下几种:
RUNNING
这是线程池的默认状态。在RUNNING状态下,线程池能够接受新的任务,并且也能处理阻塞队列中的任务。如果阻塞队列已满,尝试提交新任务将会失败,除非调用了allowCoreThreadTimeOut(true)方法。
SHUTDOWN
当调用线程池的shutdown()方法时,线程池进入SHUTDOWN状态。在SHUTDOWN状态下,线程池不能接受新的任务,但仍然会处理阻塞队列中的任务。当阻塞队列中的任务处理完毕后,线程池中的所有线程将会终止(除非调用了allowCoreThreadTimeOut(true)方法)。
STOP
当调用线程池的shutdownNow()方法时,线程池进入STOP状态。在STOP状态下,线程池不能接受新的任务,并且会中断正在处理的任务。然后,线程池中的所有线程将会尽快终止。
TIDYING
当线程池中的所有任务都执行完毕,并且线程池中的线程数量减少到corePoolSize(除非调用了allowCoreThreadTimeOut(true)方法),线程池进入TIDYING状态。在TIDYING状态下,线程池会执行一些清理工作,如关闭钩子(hooks)等。
TERMINATED
当线程池完成了所有的清理工作并成功终止后,线程池进入TERMINATED状态。在TERMINATED状态下,线程池已经彻底关闭,不能再接受新的任务,也不能处理阻塞队列中的任务。
线程池状态转换
线程池的状态转换通常遵循以下顺序:RUNNING -> SHUTDOWN -> STOP -> TIDYING -> TERMINATED。需要注意的是,线程池的状态转换是不可逆的,即一旦线程池进入某个状态,就不能再回到之前的状态。

线程池线程的执行过程
提交任务
当客户端提交一个任务给线程池时,线程池首先会检查当前的工作线程数量是否小于核心线程数。如果是,那么直接调用addWorker()方法创建一个核心线程去执行任务。
任务队列
如果工作线程数已经等于核心线程数,但线程池中的核心线程都还在忙碌(即都在执行任务),那么新任务会被添加到阻塞队列中等待执行。在添加任务到队列之前,线程池会判断队列是否已满。
创建非核心线程
如果阻塞队列也已经满了,那么线程池会检查当前线程数是否已经达到了最大线程数maximumPoolSize。如果还没有达到,那么会再次调用addWorker()方法创建一个非核心线程去执行任务。
拒绝策略
如果当前线程的数量已经达到了最大线程数,并且阻塞队列也已经满了,那么当再有新的任务提交过来时,就需要执行拒绝策略。拒绝策略有多种,例如直接丢弃任务、抛出异常、由调用者运行任务等。具体选择哪种策略,可以根据应用场景和需求来决定。
线程销毁
非核心线程在空闲时会等待一定的时间(keepAliveTime),如果在这段时间内没有新的任务到来,它们就会被销毁,以释放系统资源。
在线程池执行任务的过程中,工作线程会直接从任务队列中获取任务并执行。线程池通过workers工作线程集合来管理这些工作线程,提供整个线程池从创建到执行任务,再到消亡的整个流程方法。此外,ThreadPoolExecutor类中提供了execute(Runnable)方法来提交任务,addWorker(Runnable, boolean)方法来创建工作线程,addWorkerFailed(Worker)方法来处理创建工作线程失败的情况,以及rejectedExecutionHandler来处理任务被拒绝的情况。这些方法和组件共同协作,实现了线程池的高效管理和控制。

由线程池线程执行过程图来分析,如果BlockQueue用的无界阻塞队列,会存在什么问题?是的,线程池中的线程数量始终为corePoolSize,新的Task,始终会添加到BlockQueue中,在内存资源有限的情况,随着Task的增加,可能很快会出现OOM,选择阻塞队列时,应尽量避免使用无界阻塞队列。
Executors创建线程池
Executors创建线程池,底层使用ThreadPoolExecutor的构造方法创建。以常用的创建方式举例说明:
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
核心线程数和非核心线程数相同,无法创建非核心线程,创建的阻塞队列为无界阻塞队列,内存资源OOM风险。
newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
核心线程数为0,最大线程数无限制,非存储阻塞队列,同样存在内存资源OOM风险
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
使用无界阻塞队列,在提交的任务足够多的情况下,同样会存在内存资源OOM风险。
虽然Executors提供了一种简便的线程池创建方式,在实际使用中需要谨慎考虑其优缺点,并根据具体场景和需求选择更合适的线程池实现方式。对于复杂的线程池需求,建议直接使用ThreadPoolExecutor类进行更精细的配置和管理。在阿里的开发手册中,明确禁止使用Executors创建线程池。
总结
ThreadPoolExecutor创建线程池时,有多个参数,非核心线程数参数,依赖于阻塞队列是否为无界阻塞队列,当线程池满时,应根据场景使用不同的策略处理,一般使用异常抛出的拒绝策略。线程池有5种状态,在线程池创建后,处于RUNNING状态,调用shutdown(),线程池由RUNNING状态变为SHUTDOWN状态,RUNNING或SHUTDOWN状态下,调用shutdownNow(),线程池状态变更为STOP状态,线程池中的所有任务执行完成并且线程数量等于核心线程数量时,状态由SHUTDOWN或STOP状态变更为TIDYING状态,待核心线程数为0,并线程池清理完后,变为TERMINATED状态。创建线程池时,应使用ThreadPoolExecutor的方式,避免使用Executors方式,同时应充分评估阻塞队列的使用,尽可能使用有界阻塞队列。
原文始发于微信公众号(扬哥手记):线程池ThreadPoolExecutor
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/239532.html