文章目录
介绍
ScheduledThreadPoolExecutor
是JDK提供的一个基于内存的定时任务调度器继承于ThreadPoolExecutor
,需要注意的就是scheduleAtFixedRate()
方法和scheduleWithFixedDelay()
方法的区别。
详解
这里我们依旧将其分为三个模块:
- 调度器
- 任务队列
- 执行线程
任务队列负责管理用户提交的任务信息,执行线程负责去执行任务,而调度器居中协调和维护整个容器的状态和分配任务的执行线程。
调度器
首先ScheduledThreadPoolExecutor
继承自ThreadPoolExecutor
。
先说结论:ScheduledThreadPoolExecutor
相比较于ThreadPoolExecutor
,做出了以下的改动:
差异:
- 实现了任务队列(延迟队列)
DelayedWorkQueue
赋值给ThreadPoolExecutor.workQueue
,改变了队列存储和获取的条件 - 定义了新的Task包装类
ScheduledFutureTask
,改变了任务运行时的判断条件 - 重写了任务提交的方式,改变了任务提交行为
除了上述所说的差异点,当然存在着一些相同点。
相同点:
- 执行线程的创建方式
- 执行线程获取任务的方式
- 容器的生命周期变化
首先,本次的源码阅读起点先定在构造方法中(有上一章ThreadPoolExecutor源码详解的基础,不用在从变量或生命周期开始了)
构造方法
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
上述构造方法,是入参最多,自定义最多的但是相比于ThreadPoolExecutor
动则6,7个还是要少的可怜。
首先允许自定义的有:
- 核心线程数
- ThreadFactory
- 拒绝策略
不允许自定义的有:
- 最大线程数
- 线程等待时间
- 线程等待时间单位
- 任务队列(默认为
DelayedWorkQueue
)
任务提交
在ThreadPoolExecutor
中,我们的提交方式有submit()
,execute()
。
在ScheduledThreadPoolExecutor
中,依然有submit()
,execute()
,但是新增了schedule()
,scheduleAtFixedRate()
,scheduleWithFixedDelay()
方法
这几个方法的区别就在于
延迟性任务:
submit()
,execute()
,schedule()
其中submit()
,execute()
延迟时间默认为0,其实就是立即(底层也是调用的schedule()
),schedule()
就是设置的延迟时间。
周期性任务:
scheduleAtFixedRate()
,scheduleWithFixedDelay()
scheduleAtFixedRate()
:固定频率执行
scheduleWithFixedDelay()
:以任务结束为起点,周期延迟性执行
这样说不太直观,举个例子:
scheduleAtFixedRate()
设置频率为5s,前一个任务在15:50:00 开始执行,然后在15:50:03结束,那么下一次执行时间是在15:50:05。而如果结束时间是在15:50:06,那么任务结束后就会立即开始下一次的任务。
scheduleWithFixedDelay()
设置频率为5s,前一个任务在15:50:00 开始执行,然后在15:50:03结束,那么下一次执行时间是在15:50:08。
这里要先说下ScheduledThreadPoolExecutor
中的两个属性:
- continueExistingPeriodicTasksAfterShutdown,true:表示在SHUTDOWN状态下周期性任务继续执行
- executeExistingDelayedTasksAfterShutdown,true:表示在SHUTDOWN状态下延迟性任务继续执行
接下来就是看ScheduledThreadPoolExecutor
对于task的定义对象ScheduledFutureTask
ScheduledFutureTask
首先是它的成员属性
//自增序列号,保证FIFO
private final long sequenceNumber;
//下一次任务执行的时间
private long time;
//周期性执行间隔设置,0代表非周期,非0代表周期
private final long period;
RunnableScheduledFuture<V> outerTask = this;
//在延迟队列中的索引,用于快速从队列中获取
int heapIndex;
然后是它的执行方法
public void run() {
//是否是周期
boolean periodic = isPeriodic();
//SHUTDOWN并且不继续(那两个超级长的变量为false)则取消任务
//说人话就是容器未运行,并且SHUTDOWN的时候不允许任务在执行那就取消
if (!canRunInCurrentRunState(periodic))
cancel(false);
//非周期执行
else if (!periodic)
ScheduledFutureTask.super.run();
//周期执行
else if (ScheduledFutureTask.super.runAndReset()) {
//设置下一次的执行时间
setNextRunTime();
//创建线程
reExecutePeriodic(outerTask);
}
}
简单的说:非周期就执行一次,周期任务就需要计算下次执行的时间
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
//RUNNING 或者 SHUTDOWN并且continueExistingPeriodicTasksAfterShutdown为ture
if (canRunInCurrentRunState(true)) {
//任务加入延迟队列
super.getQueue().add(task);
//doubleCheck,防止这个时候容器状态变了。
//如果容器关闭,并且SHUTDOWN不运行执行,而且还把这个任务从任务队列中移除成功了。
if (!canRunInCurrentRunState(true) && remove(task))
//那我就直接取消任务
task.cancel(false);
else
//不然就创建个线程与其对应
ensurePrestart();
}
}
执行线程的创建:
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
//如果小于核心线程数,创建核心线程
if (wc < corePoolSize)
addWorker(null, true);
//如果核心线程数设置为0,那么创建普通线程,按照构造参数默认的Integer.MAX_VALUE
//相当于无限的线程
else if (wc == 0)
addWorker(null, false);
}
铛铛铛,喜闻乐见的提问时间到
问:是否能保证定时的任务一定会在我们规定的时间启动?
答:假设最大线程允许我们设置,核心线程5,这时候进行执行的定时任务10,那么只有5个线程去执行五个任务,剩下5个任务需要等前面的核心线程执行完了才会来执行,如果执行的慢,那就会出现定时设置的是1点执行,但是可能4,5点才开始跑任务,那就与我们的预期不同。
问:根据问题1是否能表示”最大线程数”没有任何意义?
答:不是,当”核心线程数”为0,那么就会创建创建普通线程与任务一一对应。
问:线程等待时间为什么是0?或者说为什么执行完毕后要立即关闭空闲的普通线程?
答:应该是因为定时任务的特点,会是一波一波的来,那么执行完毕后不关闭意义就不大,但这也面临着一些问题,每一次都可能创建线程然后销毁线程,这明显是比较浪费资源和消耗时间的。
通过上面两个问题已经了解了不允许自定义构造参数中的三个,还差最后一个 任务队列DelayedWorkQueue
,这个留在后面在分析。
十万个问什么ing:
问:有20个任务,并且任务的time都是500(虚构),那么任务的执行顺序是什么(任务执行时间一样)?
答:根据sequenceNumber,由小到大,每一个任务都会被分配一个单调递增不重复的sequenceNumber。
问:延迟性的executeExistingDelayedTasksAfterShutdown为true比较好理解,在SHUTDOWN执行完就没有了,但是假设周期性的continueExistingPeriodicTasksAfterShutdown设置为true,容器能停止下来吗?
答:在容器中的周期任务会一直反复执行,容器不接收新的任务,同时也不会停止。(默认为false)
好了,ScheduledThreadPoolExecutor
对于自己任务的定义和封装我们理解完了,接下来从最简单的任务提交方法schedule()
方法开始,let’s go!!!
schedule()
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
//通过triggerTime方法计算出下次执行的时间
//包装成RunnableScheduledFuture
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
//延迟执行
delayedExecute(t);
return t;
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
//如果状态不是RUNNING,直接拒绝
//这里不是判断是否是SHUTDOWN状态,而是RUNNING方法名有歧义
if (isShutdown())
reject(task);
else {
//向任务队列中添加任务
super.getQueue().add(task);
/*
*如果状态不是RUNNING,
*或者是SHUTDOWN并且continueExistingPeriodicTasksAfterShutdown(延迟性任务就是executeExistingDelayedTasksAfterShutdown)为true并且移除任务成功
*/
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
//取消任务的执行
task.cancel(false);
else
//创建工作线程
ensurePrestart();
}
}
先梳理先schedule()
的流程
简单理解:状态正常的情况下,每创建一个任务都会与之对应的创建一个线程。
scheduleAtFixedRate()
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
//构造的时候,设置下次执行时间和周期性时间
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
解释下几个入参:
command
:就是我们用户提交的任务initialDelay
:第一次执行延迟时间period
:周期性间隔时间unit
:间隔时间的单位
第一次的执行延迟时间通过在计算时间中设置来表示。
scheduleAtFixedRate()
和schedule()
区别点就在于:
period
不同
scheduleAtFixedRate()
将周期性间隔时间存放在ScheduledFutureTask
的period
成员属性中(正数)。
schedule()
的ScheduledFutureTask
的period
成员属性默认为0
scheduleWithFixedDelay()
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
//构造的时候,设置下次执行时间和周期性时间
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
scheduleWithFixedDelay()
和scheduleAtFixedRate()
区别点就在于:
period
不同
scheduleAtFixedRate()
的ScheduledFutureTask
的period
成员属性为正数。
scheduleWithFixedDelay()
的ScheduledFutureTask
的period
成员属性为负数。
源码实现:
private void setNextRunTime() {
long p = period;
//如果为正数代表是scheduleAtFixedRate,那么就在上一次时间+间隔时间
if (p > 0)
time += p;
else
//如果为负数代表是scheduleWithFixedDelay,那么就是现在时间+间隔时间
time = triggerTime(-p);
}
ok,调度器中任务提交已经阅读完毕,旅途已过半,加油ヾ(◍°∇°◍)ノ゙!
任务队列(延迟队列)
接下来我们就要来看看DelayedWorkQueue
任务的延迟队列是如何获取任务的了。
首先还是来看看其中的成员变量
//任务队列,使用的是数组,需要考虑扩容问题
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
//首个任务的执行线程
private Thread leader = null;
需要稍微理解下的就一个leader
,这个属性等后面看到相关使用得地方在来聊,这样比较好理解一些。
在DelayedWorkQueue
中主要看三个方法,入列:offer()
,出列:poll()
,take()
offer()
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>) x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
//扩容
if (i >= queue.length)
grow();
size = i + 1;
//如果队列为空,则直接放到头部
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
//不为空就需要判断,进行排序
} else {
siftUp(i, e);
}
//如果首个任务被替换成了用户提交的x
if (queue[0] == e) {
//要首个任务占有的线程清空,这里需要结合poll()和take()来看,后续在聊
leader = null;
//那么要唤醒线程,去尝试获取任务
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
接下来就看下排序问题
siftUp()
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
//获取当前索引元素的父节点
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
//如果加入的元素大于父节点,那就存放在K中不用动
if (key.compareTo(e) >= 0)
break;
//如果不是,那么就将父节点的数据和K当前插入的位置进行交换
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}
整体思路是这样的,数组是一个二叉树,这棵树呢,有规则,父节点比子节点要小(也叫小顶对),这样的特点也就产生了,数组中的第一个元素一定是最先执行的。
简单想象:siftUp
就相当于一个刚加入的元素不停向上比较并交换位置,一直到父节点比它还小为止。
举个例子:
这时候新增一个2,那么现在的索引就是7,根据int parent = (k - 1) >>> 1
算出父节点索引为3,也就是数值为4的那个节点,因为2<4所以交换位置,上升一位,由于1<2所以不在上升,停止。
问问问
问:为什么排序会存在,直接放数组中不行吗?
答:不行,效率太低,要时时去判断数组中所有任务现在该不该执行,使用小顶堆(二叉树)优化查询效率。
poll()
尝试获取任务:
public RunnableScheduledFuture<?> poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
RunnableScheduledFuture<?> first = queue[0];
//如果队列中没有任务,或者任务的执行时间没到,则返回null
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return finishPoll(first);
} finally {
lock.unlock();
}
}
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
int s = --size;
RunnableScheduledFuture<?> x = queue[s];
//将任务缩减一个
queue[s] = null;
//如果任务队列不为空,那么就进行重排序
if (s != 0)
siftDown(0, x);
setIndex(f, -1);
return f;
}
take()
获取任务,如果无任务则进行等待
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (; ; ) {
RunnableScheduledFuture<?> first = queue[0];
//如果队列是空的,则进行等待
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
//如果第一个任务现在可以执行,则返回重新对队列排序,并返回第一个任务
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
//如果第一个任务不能执行,并且leader不为空,则等待
if (leader != null)
available.await();
else {
//将leader设置为当前线程
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//并等待任务距离执行的间隔时间
available.awaitNanos(delay);
} finally {
//当唤醒时,需要看下是否leader被切换了,如果没被切换则设置为null
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
到这里延迟队列就大致梳理完毕了,接下来就是收尾看看leader
这个属性的作用。
问题是这样的,如果第一个任务被执行线程获取到并且不能执行,那么就会等待 该任务到执行时间的间隔时间 这么长的时间,但是第一个任务只有一个,那到时候唤醒之后大家还要来进行竞争,这就浪费资源了,所以解决方案就是,当一个线程回去到第一个任务,并且发现现在无法执行,那么他就说,这个任务我领了,到时候我醒过来执行,在唤醒你们执行后面的,你们直接等着就行。
文字太过苍白,举例子:
ok,本次旅程已结束,各位乘客请按序下车!(* ̄︶ ̄)
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/121856.html