线程池ThreadPoolExecutor

大家好,今天我们一起聊聊线程池ThreadPoolExecutor。

大纲

线程池ThreadPoolExecutor

线程池是程序设计中的一种多线程处理形式,它预先创建了多个工作线程,并将需要异步执行的任务放入队列中。当有新的任务提交到线程池时,线程池会根据预设的策略选择一个空闲的工作线程来执行该任务,而不是每次都新建线程。这样可以有效减少线程创建和销毁带来的系统开销,提高系统资源利用率,同时通过控制并发数防止过多线程导致的系统过载。

Java中,java.util.concurrent包提供了强大的线程池支持,主要通过ThreadPoolExecutor类实现自定义线程池,也可以通过Executors工具类创建线程池。

ThreadPoolExecutor创建线程池

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

corePoolSize

是 ThreadPoolExecutor 中的一个重要参数,它表示线程池中的常驻核心线程数。这个参数的值决定了线程池中始终保持在线程池中的线程数量,即即使这些线程处于空闲状态,也不会被销毁。

maximumPoolSize

线程池中的一个关键参数,表示线程池中允许的最大线程数量。当工作队列满了,并且当前线程数小于 maximumPoolSize 时,线程池会再创建新的线程来执行任务。这有助于确保当有大量任务提交到线程池时,线程池能够有足够的线程来处理这些任务。

keepAliveTime

当线程池中的线程数量超过核心线程数(corePoolSize)时,多余的空闲线程在终止前等待新任务的最长时间。

unit

为keepAliveTime参数的时间单位,有TimeUnit.NANOSECONDS(纳秒)、TimeUnit.MICROSECONDS(微秒)、TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分钟)、TimeUnit.HOURS(小时)和TimeUnit.DAYS(天)几种

workQueue

线程池中的一个关键组件,用于在任务执行之前存放待处理的任务。workQueue 是一个 BlockingQueue(阻塞队列),当线程池中的线程数量达到核心线程数时,新提交的任务会被放在这个队列中等待执行。

ArrayBlockingQueue

一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。如果试图添加一个元素而队列已满,则调用线程会阻塞;如果队列为空,则从队列中移除元素的线程会阻塞。有界队列有助于避免资源耗尽,但是可能需要更复杂的错误处理逻辑,因为当队列满时,新的任务会被拒绝。

LinkedBlockingQueue

一个由链表支持的可选有界阻塞队列。如果未指定容量,那么它就是一个无界队列。此队列也按 FIFO 排序元素。无界队列意味着它会尽可能多地接受新任务,而不会因为队列满而拒绝任务。但是,这也可能导致资源耗尽,因为线程池可能会无限制地增长。

SynchronousQueue

一个不存储元素的阻塞队列。每个插入操作必须等待一个相应的删除操作,反之亦然。这实际上是一个传递性队列,它在线程之间进行直接的手递手传递。使用 SynchronousQueue 通常要求 maximumPoolSize 被设置为一个大于或等于 corePoolSize 的值,这样当工作队列为空时,线程池可以创建额外的线程来处理任务。

PriorityBlockingQueue

一个支持优先级排序的无界阻塞队列。元素根据它们的自然顺序或者通过提供的 Comparator 在队列实例化时排序。这个队列不允许使用 null 元素。

DelayQueue

这些队列不是直接用于执行任务的,但是可以用于更复杂的任务调度场景,比如延迟任务或定时任务。

线程池ThreadPoolExecutor

选择哪种类型的 workQueue 取决于应用程序的具体需求。例如,如果希望限制线程池能够处理的任务数量,那么可以选择一个有界队列,如 ArrayBlockingQueue。如果希望线程池能够尽可能多地处理任务,而不考虑资源限制,那么可以选择一个无界队列,如 LinkedBlockingQueue。在创建线程池时,选择正确的 workQueue 是非常重要的,因为它会影响到线程池的行为、性能和资源使用情况。如果队列太小,可能会导致任务被拒绝;如果队列太大,可能会导致过多的内存使用或线程创建。因此,需要根据应用程序的具体需求来权衡这些因素。

threadFactory

是一个实现了 ThreadFactory 接口的对象。ThreadFactory 接口中定义了一个方法 newThread(Runnable r),这个方法用于创建新的线程。当 ThreadPoolExecutor 需要创建一个新线程来执行任务时,它会调用 threadFactory 的 newThread 方法来生成线程。通过提供自定义的 ThreadFactory,可以控制新线程的创建方式,包括设置线程的名称、优先级、守护线程状态等。这在某些场景下非常有用,比如想要给线程池中的线程设置一个有意义的名字以便于调试,或者想要设置线程的优先级以影响它的调度。

handler

ThreadPoolExecutor 中的一个参数,它定义了当线程池中的线程数量达到最大值并且工作队列已满时,如何处理新提交的任务。这个参数是一个 RejectedExecutionHandler 接口的实现,它定义了一个 rejectedExecution 方法,用于处理无法执行的任务。RejectedExecutionHandler 接口有一个实现类 ThreadPoolExecutor.AbortPolicy,这是默认的策略。当无法执行新任务时,它会抛出一个 RejectedExecutionException。除了 AbortPolicy,还有其他几种策略可供选择:

ThreadPoolExecutor.AbortPolicy

这个策略会抛出异常。当线程池无法处理新任务时,该策略会在调用 execute 方法的线程中直接运行任务。        这意味着提交任务的线程将负责执行它。

ThreadPoolExecutor.CallerRunsPolicy

这个策略不会抛出异常,也不会丢弃任务。当线程池无法处理新任务时,该策略会在调用 execute 方法的线程中直接运行任务。这意味着提交任务的线程将负责执行它。

ThreadPoolExecutor.DiscardPolicy

这个策略会静默地丢弃无法执行的任务,不抛出任何异常。这意味着如果线程池无法处理新任务,任务将被简单地丢弃,没有任何反馈。

ThreadPoolExecutor.DiscardOldestPolicy

这个策略会丢弃队列中最老的任务,然后尝试重新提交新任务。这意味着新任务将优先于旧任务被执行。

线程池ThreadPoolExecutor

选择合适的拒绝策略取决于应用程序的需求。例如,如果你应用程序能够容忍丢失一些任务,那么 DiscardPolicy 或 DiscardOldestPolicy 可能是合适的。另一方面,如果希望知道何时任务被拒绝,并且想要对此进行特殊处理,那么 CallerRunsPolicy 或自定义的 RejectedExecutionHandler 可能更适合。

线程池状态

线程的状态

线程池ThreadPoolExecutor

线程池的状态

线程池的状态是线程池内部管理和控制线程时的一种重要属性。线程池的状态通常包括以下几种:

RUNNING

这是线程池的默认状态。在RUNNING状态下,线程池能够接受新的任务,并且也能处理阻塞队列中的任务。如果阻塞队列已满,尝试提交新任务将会失败,除非调用了allowCoreThreadTimeOut(true)方法。

SHUTDOWN

当调用线程池的shutdown()方法时,线程池进入SHUTDOWN状态。在SHUTDOWN状态下,线程池不能接受新的任务,但仍然会处理阻塞队列中的任务。当阻塞队列中的任务处理完毕后,线程池中的所有线程将会终止(除非调用了allowCoreThreadTimeOut(true)方法)。

STOP

当调用线程池的shutdownNow()方法时,线程池进入STOP状态。在STOP状态下,线程池不能接受新的任务,并且会中断正在处理的任务。然后,线程池中的所有线程将会尽快终止。

TIDYING

当线程池中的所有任务都执行完毕,并且线程池中的线程数量减少到corePoolSize(除非调用了allowCoreThreadTimeOut(true)方法),线程池进入TIDYING状态。在TIDYING状态下,线程池会执行一些清理工作,如关闭钩子(hooks)等。

TERMINATED

当线程池完成了所有的清理工作并成功终止后,线程池进入TERMINATED状态。在TERMINATED状态下,线程池已经彻底关闭,不能再接受新的任务,也不能处理阻塞队列中的任务。

线程池状态转换

线程池的状态转换通常遵循以下顺序:RUNNING -> SHUTDOWN -> STOP -> TIDYING -> TERMINATED。需要注意的是,线程池的状态转换是不可逆的,即一旦线程池进入某个状态,就不能再回到之前的状态。

线程池ThreadPoolExecutor

线程池线程的执行过程

提交任务

当客户端提交一个任务给线程池时,线程池首先会检查当前的工作线程数量是否小于核心线程数。如果是,那么直接调用addWorker()方法创建一个核心线程去执行任务。

任务队列

如果工作线程数已经等于核心线程数,但线程池中的核心线程都还在忙碌(即都在执行任务),那么新任务会被添加到阻塞队列中等待执行。在添加任务到队列之前,线程池会判断队列是否已满。

创建非核心线程

如果阻塞队列也已经满了,那么线程池会检查当前线程数是否已经达到了最大线程数maximumPoolSize。如果还没有达到,那么会再次调用addWorker()方法创建一个非核心线程去执行任务。

拒绝策略

如果当前线程的数量已经达到了最大线程数,并且阻塞队列也已经满了,那么当再有新的任务提交过来时,就需要执行拒绝策略。拒绝策略有多种,例如直接丢弃任务、抛出异常、由调用者运行任务等。具体选择哪种策略,可以根据应用场景和需求来决定。

线程销毁

非核心线程在空闲时会等待一定的时间(keepAliveTime),如果在这段时间内没有新的任务到来,它们就会被销毁,以释放系统资源。

在线程池执行任务的过程中,工作线程会直接从任务队列中获取任务并执行。线程池通过workers工作线程集合来管理这些工作线程,提供整个线程池从创建到执行任务,再到消亡的整个流程方法。此外,ThreadPoolExecutor类中提供了execute(Runnable)方法来提交任务,addWorker(Runnable, boolean)方法来创建工作线程,addWorkerFailed(Worker)方法来处理创建工作线程失败的情况,以及rejectedExecutionHandler来处理任务被拒绝的情况。这些方法和组件共同协作,实现了线程池的高效管理和控制。

线程池ThreadPoolExecutor

由线程池线程执行过程图来分析,如果BlockQueue用的无界阻塞队列,会存在什么问题?是的,线程池中的线程数量始终为corePoolSize,新的Task,始终会添加到BlockQueue中,在内存资源有限的情况,随着Task的增加,可能很快会出现OOM,选择阻塞队列时,应尽量避免使用无界阻塞队列。

Executors创建线程池

Executors创建线程池,底层使用ThreadPoolExecutor的构造方法创建。以常用的创建方式举例说明:

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}

核心线程数和非核心线程数相同,无法创建非核心线程,创建的阻塞队列为无界阻塞队列,内存资源OOM风险。

newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}

核心线程数为0,最大线程数无限制,非存储阻塞队列,同样存在内存资源OOM风险

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(11,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(11,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}

使用无界阻塞队列,在提交的任务足够多的情况下,同样会存在内存资源OOM风险。

虽然Executors提供了一种简便的线程池创建方式,在实际使用中需要谨慎考虑其优缺点,并根据具体场景和需求选择更合适的线程池实现方式。对于复杂的线程池需求,建议直接使用ThreadPoolExecutor类进行更精细的配置和管理。在阿里的开发手册中,明确禁止使用Executors创建线程池。

总结

ThreadPoolExecutor创建线程池时,有多个参数,非核心线程数参数,依赖于阻塞队列是否为无界阻塞队列,当线程池满时,应根据场景使用不同的策略处理,一般使用异常抛出的拒绝策略。线程池有5种状态,在线程池创建后,处于RUNNING状态,调用shutdown(),线程池由RUNNING状态变为SHUTDOWN状态,RUNNING或SHUTDOWN状态下,调用shutdownNow(),线程池状态变更为STOP状态,线程池中的所有任务执行完成并且线程数量等于核心线程数量时,状态由SHUTDOWN或STOP状态变更为TIDYING状态,待核心线程数为0,并线程池清理完后,变为TERMINATED状态。创建线程池时,应使用ThreadPoolExecutor的方式,避免使用Executors方式,同时应充分评估阻塞队列的使用,尽可能使用有界阻塞队列。

点击这里给我留言吧

原文始发于微信公众号(扬哥手记):线程池ThreadPoolExecutor

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/239532.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!