ScheduleThreadPoolExecutor 是一个可以在给定延期时间后执行任务或者定期执行任务的 ThreadPoolExecutor 。这个类在需要多个工作线程或连续执行的场景下,比 Timer 更加合适。
-
相比 Timer 使用单个线程,ScheduleThreadPoolExecutor 是一个线程池来执行任务,所以 Timer 所运行的线程可能会导致业务的阻塞,而线程池在某个线程阻塞或抛出异常时,不会影响其他线程执行任务,并且提供了更好的线程复用能力。
-
另一个区别是,Timer 调度是基于操作系统的绝对时间进行的,而 ScheduleThreadPoolExecutor 是一个相对的时间,当修改系统的时间后,Timer 可能因为时间变化出现问题,而 ScheduleThreadPoolExecutor 不会。
-
ScheduleThreadPoolExecutor 中执行的任务会包装成 ScheduledFutureTask ,具有返回值(Future 定义的能力)。Timer 可执行类是 TimerTask ,TimerTask 实现了 Runnable ,没有 Future 的能力。
ScheduleThreadPoolExecutor 可以通过schedule 方法延期执行一个任务,使用 scheduleAtFixedRate 或 scheduleWithFixedDelay 方法进行执行周期的任务。
ScheduleThreadPoolExecutor 这个类专门实现了 ThreadPoolExecutor,但有一些自己的特色:
-
使用自定义的任务类型,ScheduledFutureTask ,ExecutorService 通过 execute 提交的任务即使不是 ScheduledFutureTask 也会被视为 ScheduledFutureTask 。
-
使用一个自定义队列,DelayedWorkQueue。它是无界 DelayQueue 的变体。与 ThreadPoolExecutor 相比,缺乏容量约束,以及 corePoolSize 和maximumPoolSize 实际上是相同的,以此来简化了一些执行机制(参见 delayedExecute ) 。
-
支持可选的 shutdown 后运行的参数,这导致重写 shutdown 方法来删除和取消 shutdown 后不应该运行的任务,以及当任务(重新)提交与 shutdown 重叠时不同的重新检查逻辑。
-
任务的 decoration 方法来允许拦截和插装一些逻辑,这是必需的,因为子类不能通过重写 submit 方法来获得这种效果。这些方法对线程池控制逻辑没有任何影响。
// 供子类实现的装饰方法
protected <V> RunnableScheduledFuture<V> decorateTask(
Runnable runnable, RunnableScheduledFuture<V> task) {
return task;
}
protected <V> RunnableScheduledFuture<V> decorateTask(
Callable<V> callable, RunnableScheduledFuture<V> task) {
return task;
}
使用
因为 ScheduleThreadPoolExecutor 继承自 ScheduledExecutorService ,Executors 工具类中提供了快速创建 ScheduledExecutorService 对象的方法:
val threadPoolExecutor: ScheduledExecutorService = Executors.newScheduledThreadPool(2)
另一种方式是直接创建 ScheduleThreadPoolExecutor 对象:
val threadPoolExecutor: ScheduledThreadPoolExecutor = ScheduledThreadPoolExecutor(2)
两种方式效果是一样的,都会创建一个具有 2 个核心线程的线程池,通过执行不同的方法,可以执行一个延时执行的任务或是周期性地执行任务。
3 秒后执行一个任务:
threadPoolExecutor?.schedule({
dataSource.add("Thread: ${Thread.currentThread().name}, time - ${System.currentTimeMillis()}")
},1, TimeUnit.SECONDS)
执行一个 1s 执行一次的周期任务:
threadPoolExecutor?.scheduleAtFixedRate({
dataSource.add("Thread: ${Thread.currentThread().name}, time - ${System.currentTimeMillis()}")
},1, 1, TimeUnit.SECONDS)
在 3 秒后启动第一个任务,然后以 1 秒为周期执行任务:
threadPoolExecutor?.scheduleWithFixedDelay({
dataSource.add("Thread: ${Thread.currentThread().name}, time - ${System.currentTimeMillis()}")
},3, 1, TimeUnit.SECONDS)
这里有一个困惑点,scheduleAtFixedRate 和 scheduleWithFixedDelay 效果好像一样的,但实际运行和查阅代码发现,这两个方法参数是不同的,并且效果也不同:
scheduleWithFixedDelay 方法的第三个参数是上一个任务结束到下一个任务开始的时间间隔;scheduleAtFixedRate 的第三个参数代表的是两个任务开始的时间间隔。
这可能不太好理解,经过实际的代码跑起来发现,scheduleAtFixedRate 的任务执行时间间隔基本上就是指定的时间间隔,例如 1s 执行一次,时间间隔基本上精准到 1000 ~ 1001 毫秒左右;而 scheduleWithFixedDelay 跑起来的效果则不是精确的每次都是 1s 间隔。
因为 scheduleAtFixedRate 只是关注每个任务启动的时间,即使上一个任务还没结束,也会在周期时间后立即启动下一个任务。
scheduleWithFixedDelay 则会等上一个任务结束后,再间隔指定的周期时间后,才启动下一个任务。
这里附上一个 Demo 代码,在 Android 中实现的:
class ScheduledThreadPoolActivity : AppCompatActivity() {
private lateinit var recyclerView: RecyclerView
private var threadPoolExecutor: ScheduledThreadPoolExecutor? = null
private var dataSource = ArrayList<String>()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_scheduled_thread_pool)
recyclerView = findViewById(R.id.recyclerView)
recyclerView.apply {
adapter = Adapter()
layoutManager = LinearLayoutManager(this@ScheduledThreadPoolActivity)
}
findViewById<Button>(R.id.addBtn).setOnClickListener {
startThreadPool()
}
findViewById<Button>(R.id.removeBtn).setOnClickListener {
stopThreadPool()
}
findViewById<Button>(R.id.clearScreenBtn).setOnClickListener {
dataSource.clear()
recyclerView.adapter?.notifyDataSetChanged()
}
}
private val mainHandler = Handler(Looper.getMainLooper())
private fun startThreadPool() {
// threadPoolExecutor = Executors.newScheduledThreadPool(2)
threadPoolExecutor = ScheduledThreadPoolExecutor(2)
threadPoolExecutor?.scheduleWithFixedDelay({
dataSource.add("Thread: ${Thread.currentThread().name}, time - ${System.currentTimeMillis()}")
mainHandler.post {
recyclerView.adapter?.notifyItemInserted(dataSource.size - 1)
}
},3, 1, TimeUnit.SECONDS)
// threadPoolExecutor?.scheduleAtFixedRate({
// dataSource.add("Thread: ${Thread.currentThread().name}, time - ${System.currentTimeMillis()}")
// mainHandler.post {
// recyclerView.adapter?.notifyItemInserted(dataSource.size - 1)
// }
// },3, 1, TimeUnit.SECONDS)
//
//
// threadPoolExecutor?.schedule({
// dataSource.add("Thread: ${Thread.currentThread().name}, time - ${System.currentTimeMillis()}")
// mainHandler.post {
// recyclerView.adapter?.notifyItemInserted(dataSource.size - 1)
// }
// },1, TimeUnit.SECONDS)
}
private fun stopThreadPool() {
dataSource.add("threadPoolExecutor Close, time - ${System.currentTimeMillis()}")
recyclerView.adapter?.notifyItemInserted(dataSource.size - 1)
threadPoolExecutor?.shutdown()
var isClosed = false
// 等待线程池终止
do {
isClosed = threadPoolExecutor?.awaitTermination(1, TimeUnit.DAYS) ?: false
dataSource.add("正在等待线程池中的任务执行完成")
recyclerView.adapter?.notifyItemInserted(dataSource.size - 1)
} while(!isClosed)
dataSource.add("所有线程执行结束,线程池关闭")
recyclerView.adapter?.notifyItemInserted(dataSource.size - 1)
}
inner class ViewHolder(val view: TextView) : RecyclerView.ViewHolder(view) {
fun setText(text: String) {
view.text = text
}
}
inner class Adapter: RecyclerView.Adapter<ViewHolder>() {
override fun onCreateViewHolder(parent: ViewGroup, viewType: Int): ViewHolder {
val itemView = TextView(parent.context).apply {
layoutParams = ViewGroup.LayoutParams(ViewGroup.LayoutParams.MATCH_PARENT, ViewGroup.LayoutParams.WRAP_CONTENT)
isSingleLine = false
gravity = Gravity.START
setPadding(10, 10, 10, 10)
}
return ViewHolder(itemView)
}
override fun onBindViewHolder(holder: ViewHolder, position: Int) {
holder.setText(dataSource[position])
}
override fun getItemCount(): Int {
return dataSource.size
}
}
}
原理
继承关系

ScheduledThreadPoolExecutor 继承自 ThreadPoolExecutor 和 ScheduledExecutorService 接口,前者是线程池类,后者则是定义任务调度能力的接口:
public interface ScheduledExecutorService extends ExecutorService {
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
schedule 方法的作用是创建并执行在给定延迟后启用的一次性操作。
而其他两个方法在上文中提到过,用来周期执行任务,这里不再重复说明。
除了父类关系,ScheduledThreadPoolExecutor 还有两个内部类 ScheduledFutureTask 和 DelayedWorkQueue 。
内部类
ScheduledFutureTask

这是一个 FutureTask ,并实现了 RunnableScheduledFuture 接口:
public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {
boolean isPeriodic();
}
RunnableScheduledFuture 定义了是否周期执行的能力,所以 ScheduledFutureTask 是一个可周期执行的 FutureTask 。并且在继承关系中,RunnableScheduledFuture 又继承自 Delayed ,具备延时时间的能力,Delayed 继承自 Comparable ,延时时间是可以进行比较的。在 ScheduledFutureTask 中实现了对延时时间进行比较的方法,可以对队列进行按时间的排序:
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
ScheduledFutureTask 中的 sequenceNumber 属性表示了先进先出的序号,通过原子类保证序号是同步的:
private static final AtomicLong sequencer = new AtomicLong();
// ScheduledFutureTask 的构造方法中调用
this.sequenceNumber = sequencer.getAndIncrement();
// Creates a one-shot action with given nanoTime-based trigger time.
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
// Creates a periodic action with given nano time and period.
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
// Creates a one-shot action with given nanoTime-based trigger time.
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
ScheduledFutureTask 是 FutureTask ,所以具备取消的能力:
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = super.cancel(mayInterruptIfRunning);
if (cancelled && removeOnCancel && heapIndex >= 0)
remove(this);
return cancelled;
}
剩下的部分是 ScheduledFutureTask 执行的主要逻辑:
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
// ...
private long time;
private final long period;
/** The actual task to be re-enqueued by reExecutePeriodic */
RunnableScheduledFuture<V> outerTask = this;
// Index into delay queue, to support faster cancellation.
int heapIndex;
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
// Returns {@code true} if this is a periodic (not a one-shot) action.
public boolean isPeriodic() {
return period != 0;
}
// Sets the next time to run for a periodic task.
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
// Overrides FutureTask version so as to reset/requeue if periodic.
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
}
DelayedWorkQueue

DelayedWorkQueue 是一种 BlockingQueue , 并且继承 Abstract Queue 实现了默认的 Queue 能力。
DelayedWorkQueue 是专门用于 ScheduleThreadPoolExecutor 的延迟队列。为了与ThreadPoolExecutor 的声明相匹配,这个类必须声明为BlockingQueue,尽管它只能保存 RunnableScheduledFutures。
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
private static final int INITIAL_CAPACITY = 16;
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
// ...
}
DelayedWorkQueue 的数据结构是一个 RunnableScheduledFuture 数组。每个ScheduledFutureTask 都会记录它自己在 DelayedWorkQueue 数组中的索引,以便快速索引。
数组中的元素变化都会通过 siftUp 和 siftDown 中进行记录;删除时,ScheduledFutureTask 的 heapIndex 设置为 -1。注意,ScheduledFutureTask 在队列中最多只能出现一次(其他类型的任务或工作队列不需要这样),因此由heapIndex 作为唯一标识。
private void setIndex(RunnableScheduledFuture<?> f, int idx) {
if (f instanceof ScheduledFutureTask)
((ScheduledFutureTask)f).heapIndex = idx;
}
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}
private void siftDown(int k, RunnableScheduledFuture<?> key) {
int half = size >>> 1;
while (k < half) {
int child = (k << 1) + 1;
RunnableScheduledFuture<?> c = queue[child];
int right = child + 1;
if (right < size && c.compareTo(queue[right]) > 0)
c = queue[child = right];
if (key.compareTo(c) <= 0)
break;
queue[k] = c;
setIndex(c, k);
k = child;
}
queue[k] = key;
setIndex(key, k);
}
DelayedWorkQueue 的数组每次扩容 50% :
/**
* Resizes the heap array. Call only when holding lock.
*/
private void grow() {
int oldCapacity = queue.length;
int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
if (newCapacity < 0) // overflow
newCapacity = Integer.MAX_VALUE;
queue = Arrays.copyOf(queue, newCapacity);
}
DelayedWorkQueue 中的操作是安全的,操作前通过 ReentrantLock 加锁,操作完成后再解锁,例如:
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return size;
} finally {
lock.unlock();
}
}
添加和删除等方法的源码逻辑可自行查看源码。
构造函数
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
ScheduledThreadPoolExecutor 的构造函数主要有三个参数:
-
corePoolSize -
ThreadFactory -
RejectedExecutionHandler
其中只有 ThreadFactory 是 ScheduledThreadPoolExecutor 独有的,用来支持可以自定义创建线程的工厂类。
另一个需要关注的点是,ScheduledThreadPoolExecutor 将 maximumPoolSize 设置为 Integer.MAX_VALUE ,keepAliveTime 设置为 0,workQueue 设置为 DelayedWorkQueue ,没有任何变种的余地。
核心属性
// 如果应该在 shutdown 时cancel /suppress 周期性任务,则为False。
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
// 如果在 shutdown 时取消非周期性任务,则为False。
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
// 如果 ScheduledFutureTask.cancel 应该从队列中移除时为 true
private volatile boolean removeOnCancel = false;
// 序号可以打破调度关联,进而保证绑定条目之间的FIFO顺序。
private static final AtomicLong sequencer = new AtomicLong();
ScheduledThreadPoolExecutor 的属性没有太多,都是用来做标志位的,只有 sequencer 是用来给每个 ScheduledFutureTask 排序的。
核心方法
从 Executor 继承来的 execute 方法和 AbstractExecutorService 继承而来的 submit 方法都是直接使用 schedule 执行:
public void execute(Runnable command) {
schedule(command, 0, NANOSECONDS);
}
public Future<?> submit(Runnable task) {
return schedule(task, 0, NANOSECONDS);
}
public <T> Future<T> submit(Runnable task, T result) {
return schedule(Executors.callable(task, result), 0, NANOSECONDS);
}
public <T> Future<T> submit(Callable<T> task) {
return schedule(task, 0, NANOSECONDS);
}
schedule
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
延时执行单个任务的方法,核心逻辑在 delayedExecute 方法中:
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
delayedExecute 是延迟或周期执行任务的主要执行方法。如果 pool 关闭,则拒绝任务。否则,将 task 添加到队列并启动一个线程(如果需要的话)来运行它。(我们不能提前启动线程来运行任务,因为任务(可能)还不应该运行。)如果在添加任务时关闭了线程池,根据线程池的状态和 run-after-shutdown 参数,来决定是否取消并删除它。
预启动线程的方法是 ensurePrestart()
:
// 与prestartCoreThread相同,不同的是,即使corePoolSize为0,至少有一个线程被启动。
void ensurePrestart() {
int wc = workerCountOf(ctl.get()); // 当前池中线程数量
if (wc < corePoolSize) // 当前池中线程数小于核心线程数,初始化核心线程
addWorker(null, true);
else if (wc == 0)
addWorker(null, false); // 在没有线程的时候启动一个线程
}
新建的工作线程会保存到 workers ,供后续快速访问。创建新的 Worker 后,会调用 Worker 中的 Thread 的 start 方法,Thread 的参数 Runnable 是 Worker 自己,从而调用到 Worker 的 run 方法,工作线程开始工作。
addWorker 方法:
private boolean addWorker(Runnable firstTask, boolean core) {
// ...
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
// ...
if (workerAdded) {
t.start();
workerStarted = true;
}
// ...
}
Worker 初始化时,创建线程并将自身作为 Runnable 放到线程中执行:
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
Worker 的 run 方法:
public void run() {
runWorker(this);
}
scheduleAtFixedRate
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
scheduleAtFixedRate 逻辑和 schedule 方法类似,ScheduledFutureTask 多设置了一个 period 参数,因此可以周期运行。周期运行主要是在 ScheduledFutureTask 的 run 方法中的 setNextRunTime()
方法:
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
scheduleWithFixedDelay
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
scheduleWithFixedDelay 方法逻辑和 scheduleAtFixedRate 方法基本一致,唯一的不同点是 ScheduledFutureTask 的第三个参数 period,scheduleWithFixedDelay 方法设置的是 unit.toNanos(-delay))
, 这里做了一个取负值操作,为什么会造成不同的效果呢,还是要去 ScheduledFutureTask 的 setNextRunTime 方法中 :
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
当 p 小于 0 时, time 取的是 triggerTime(-p)
:
long triggerTime(long delay) {
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
-
time:开始执行时间 -
period:周期任务时间间隔
也就是说 scheduleAtFixedRate 下一次任务是 time += p
,以第一个任务启动后,period 时间后启动第二个。
而 scheduleWithFixedDelay 每次取当前时间,再加上 period ,这也是两个方法的根本区别。
overflowFree 方法的作用是将延时时间限制在 Long.,MAX_VALUE 以内,避免溢出:
private long overflowFree(long delay) {
Delayed head = (Delayed) super.getQueue().peek();
if (head != null) {
long headDelay = head.getDelay(NANOSECONDS);
if (headDelay < 0 && (delay - headDelay < 0))
delay = Long.MAX_VALUE + headDelay;
}
return delay;
}
shutdown 和 shutdownNow
public void shutdown() {
super.shutdown();
}
public List<Runnable> shutdownNow() {
return super.shutdownNow();
}
关闭线程池方法都是使用 ThreadPoolExecutor 的逻辑,这里不再展开介绍,唯一的区别是 ScheduleThreadPoolExecutor 实现了 onShutdown 方法:
@Override void onShutdown() {
BlockingQueue<Runnable> q = super.getQueue();
boolean keepDelayed =
getExecuteExistingDelayedTasksAfterShutdownPolicy();
boolean keepPeriodic =
getContinueExistingPeriodicTasksAfterShutdownPolicy();
if (!keepDelayed && !keepPeriodic) {
for (Object e : q.toArray())
if (e instanceof RunnableScheduledFuture<?>)
((RunnableScheduledFuture<?>) e).cancel(false);
q.clear();
}
else {
// Traverse snapshot to avoid iterator exceptions
for (Object e : q.toArray()) {
if (e instanceof RunnableScheduledFuture) {
RunnableScheduledFuture<?> t =
(RunnableScheduledFuture<?>)e;
if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
t.isCancelled()) { // also remove if already cancelled
if (q.remove(t))
t.cancel(false);
}
}
}
}
tryTerminate();
}
在 onShutdown 中,取消并清除由于关闭策略而不应该运行的所有任务的队列。
onShutdown 的调用时机:
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
总结
-
ScheduleThreadPoolExecutor 是用来执行延时任务或周期执行任务的线程池。 -
scheduleAtFixedRate 和 scheduleWithFixedDelay 的区别是 scheduleAtFixedRate 是定时开启任务执行;scheduleWithFixedDelay 则是当上一个结束后,延迟一定时间后,再启动下一个任务。 -
ScheduleThreadPoolExecutor 中运行的任务都会被包装成 ScheduledFutureTask 。 -
ScheduleThreadPoolExecutor 设置了 Integer.MAX_VALUE 为 maximumPoolSize ,阻塞队列设置为 DelayedWorkQueue 。 -
DelayedWorkQueue 的底层数据结构是数组,每次扩容 0.5 容量。 -
and 其他的看文章吧。
原文始发于微信公众号(八千里路山与海):Java 多线程并发【17】ScheduleThreadPoolExecutor
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/85074.html