❝
紧接上一篇文章,我们一下来思考一下,下面两个问题:
1.spring中定时任务是谁来执行的?
2.如何动态调整调度策略。
接下来我们详细分析一下这两个问题。
为@Scheduled定义一个线程池
spring在执行调度任务前,会按照好一定的策略,寻找一个可用的线程池来执行调度任务,寻找这个线程池的过程如下:
private void finishRegistration() {
if (this.scheduler != null) {
this.registrar.setScheduler(this.scheduler);
}
if (this.beanFactory instanceof ListableBeanFactory) {
Map<String, SchedulingConfigurer> beans =
((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
AnnotationAwareOrderComparator.sort(configurers);
// 1.使用SchedulingConfigurer进行配置
for (SchedulingConfigurer configurer : configurers) {
configurer.configureTasks(this.registrar);
}
}
if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
try {
// Search for TaskScheduler bean... // 2.寻找一个类型为 TaskScheduler 的bean
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
}
catch (NoUniqueBeanDefinitionException ex) {
if (logger.isTraceEnabled()) {
logger.trace("Could not find unique TaskScheduler bean - attempting to resolve by name: " +
ex.getMessage());
}
try {
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
}
catch (NoSuchBeanDefinitionException ex2) {
if (logger.isInfoEnabled()) {
logger.info("More than one TaskScheduler bean exists within the context, and " +
"none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
"(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
"ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
ex.getBeanNamesFound());
}
}
}
catch (NoSuchBeanDefinitionException ex) {
if (logger.isTraceEnabled()) {
logger.trace("Could not find default TaskScheduler bean - attempting to find ScheduledExecutorService: " +
ex.getMessage());
}
// Search for ScheduledExecutorService bean next...
try {
this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
}
catch (NoUniqueBeanDefinitionException ex2) {
if (logger.isTraceEnabled()) {
logger.trace("Could not find unique ScheduledExecutorService bean - attempting to resolve by name: " +
ex2.getMessage());
}
try {
this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
}
catch (NoSuchBeanDefinitionException ex3) {
if (logger.isInfoEnabled()) {
logger.info("More than one ScheduledExecutorService bean exists within the context, and " +
"none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
"(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
"ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
ex2.getBeanNamesFound());
}
}
}
catch (NoSuchBeanDefinitionException ex2) {
if (logger.isTraceEnabled()) {
logger.trace("Could not find default ScheduledExecutorService bean - falling back to default: " +
ex2.getMessage());
}
// Giving up -> falling back to default scheduler within the registrar...
logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
}
}
}
// 3.在afterPropertiesSet中创建一个默认的线程池
this.registrar.afterPropertiesSet();
}
在上面的代码中,有三处可以完成对 线程池的设置:
❝
1.通过自定义的
SchedulingConfigurer
的实现类,实现对 registrar 中线程池的配置。
if (this.beanFactory instanceof ListableBeanFactory) {
Map<String, SchedulingConfigurer> beans =
((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
AnnotationAwareOrderComparator.sort(configurers);
for (SchedulingConfigurer configurer : configurers) {
configurer.configureTasks(this.registrar);
}
}
❝
2.如果第一步没有找到可用的线程池,那么就会从上下文中获取一个类型为
TaskScheduler
的bean作为可用线程池.
这个也是比较坑的地方:有可能你在业务代码中定义了一个 TaskScheduler
的bean,结果被spring的调度器给使用了,这样就会产生,资源的竞争。
如果你在上下文中没有定义或者定义了多个 TaskScheduler
的话,那么 spring会抛出一个找不到bean或者bean不唯一的异常,然后输出相关的日志提醒。
if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
try {
// Search for TaskScheduler bean...
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
}
catch (NoUniqueBeanDefinitionException ex) {
if (logger.isTraceEnabled()) {
logger.trace("Could not find unique TaskScheduler bean - attempting to resolve by name: " +
ex.getMessage());
}
try {
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
}
catch (NoSuchBeanDefinitionException ex2) {
...
}
}
catch (NoSuchBeanDefinitionException ex) {
if (logger.isTraceEnabled()) {
logger.trace("Could not find default TaskScheduler bean - attempting to find ScheduledExecutorService: " +
ex.getMessage());
}
// Search for ScheduledExecutorService bean next...
try {
this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
}
catch (NoUniqueBeanDefinitionException ex2) {
if (logger.isTraceEnabled()) {
logger.trace("Could not find unique ScheduledExecutorService bean - attempting to resolve by name: " +
ex2.getMessage());
}
try {
this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
}
catch (NoSuchBeanDefinitionException ex3) {
....
}
}
catch (NoSuchBeanDefinitionException ex2) {
...
}
}
}
❝
3.如果第二步也没有找到可用线程池的话,就会创建一个默认的线程池。
this.registrar.afterPropertiesSet();
public void afterPropertiesSet() {
scheduleTasks();
}
protected void scheduleTasks() {
if (this.taskScheduler == null) {
// 创建一个线程池
this.localExecutor = Executors.newSingleThreadScheduledExecutor();
this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
}
if (this.triggerTasks != null) {
for (TriggerTask task : this.triggerTasks) {
addScheduledTask(scheduleTriggerTask(task));
}
}
if (this.cronTasks != null) {
for (CronTask task : this.cronTasks) {
addScheduledTask(scheduleCronTask(task));
}
}
if (this.fixedRateTasks != null) {
for (IntervalTask task : this.fixedRateTasks) {
addScheduledTask(scheduleFixedRateTask(task));
}
}
if (this.fixedDelayTasks != null) {
for (IntervalTask task : this.fixedDelayTasks) {
addScheduledTask(scheduleFixedDelayTask(task));
}
}
}
默认会创建一个 newSingleThreadScheduledExecutor
作为默认的线程池。这个线程池的 最大线程池个数是 Interger
的最大值,队里大小也是 Integer
的最大值。这种线程池在业务开发中是不推荐,会导致系统资源被过多占用。
那么如何给spring提供一个自定义的线程池呢?按照上面spring获取线程池的流程,我们可以在第一步的时候,定义一个 SchedulingConfigurer
来完成对 register
的配置,具体代码如下:
@Configuration
public class SchedulerConfig implements SchedulingConfigurer {
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.setTaskScheduler(threadPoolTaskScheduler());
}
@Bean(destroyMethod="shutdown")
public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(10); // 设置线程池大小
scheduler.setThreadNamePrefix("my-scheduled-task-"); // 设置线程名前缀
scheduler.initialize();
return scheduler;
}
}
实现可动态修改的调度任务
使用spring的@Scheduled实现的定时任务可以动态修改调度策略吗?
其实是不可以的
看过上面的执行流程,可以发现,定时调度策略、定时执行的业务逻辑,是在spring容器启动的时候就设置好的,而且spring并没有提供相关的接口,让我们来修改。
所以要想动态修改 定时调度的策略,那么就只能自己实现一套简单的小框架。
当然不是直接使用jdk中原生的api,而是使用spring提供的现有工具,为了对现有使用 @Scheduled
方式产生最小影响,以下实现方案也是采用注解的方式来实现,具体实现流程分为4步。
1.自定义注解
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface DynamicScheduled {
String taskId(); // 执行任务的唯一标识符
String cron(); // 调度的cron表示,这里暂时只支持一种
}
2.解析自定义注解
@Component
public class DynamicScheduler {
private ThreadPoolTaskScheduler taskScheduler ;
@Autowired
private ApplicationContext applicationContext;
private Map<String, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
@Autowired
public DynamicScheduler(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
this.taskScheduler = createTaskScheduler();
}
// 自定义任务执行线程池
@Bean
public ThreadPoolTaskScheduler createTaskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(10);
taskScheduler.setThreadNamePrefix("DynamicTaskScheduler-");
taskScheduler.initialize();
return taskScheduler;
}
// 监听上下文刷新事件
@EventListener(ContextRefreshedEvent.class)
public void init() {
Map<String, Object> beans = applicationContext.getBeansWithAnnotation(Component.class);
for (Object bean : beans.values()) {
Method[] methods = bean.getClass().getDeclaredMethods();
for (Method method : methods) {
if (method.isAnnotationPresent(DynamicScheduled.class)) {
DynamicScheduled dynamicScheduled = method.getAnnotation(DynamicScheduled.class);
String taskId = dynamicScheduled.taskId();
String cron = dynamicScheduled.cron();
Runnable task = () -> {
try {
method.invoke(bean);
} catch (Exception e) {
e.printStackTrace();
}
};
ScheduledFuture<?> scheduledFuture = taskScheduler.schedule(task, new CronTrigger(cron));
scheduledTasks.put(taskId, scheduledFuture);
}
}
}
}
// 更新调度策略
public void updateCron(String taskId, String newCron) {
ScheduledFuture<?> scheduledFuture = scheduledTasks.get(taskId);
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
scheduledTasks.remove(taskId);
}
Map<String, Object> beans = applicationContext.getBeansWithAnnotation(Component.class);
for (Object bean : beans.values()) {
Method[] methods = bean.getClass().getDeclaredMethods();
for (Method method : methods) {
if (method.isAnnotationPresent(DynamicScheduled.class)) {
DynamicScheduled dynamicScheduled = method.getAnnotation(DynamicScheduled.class);
if (taskId.equals(dynamicScheduled.taskId())) {
Runnable task = () -> {
try {
method.invoke(bean);
} catch (Exception e) {
e.printStackTrace();
}
};
ScheduledFuture<?> newScheduledFuture = taskScheduler.schedule(task, new CronTrigger(newCron));
scheduledTasks.put(taskId, newScheduledFuture);
return;
}
}
}
}
}
}
3.修改调度策略
@PostMapping("/update-cron")
public String updateCron(@RequestParam("taskId") String taskId, @RequestParam("cron") String cronExpression) {
dynamicScheduler.updateCron(taskId, cronExpression);
return "Cron expression for task " + taskId + " updated to: " + cronExpression;
}
4.使用自定义的注解
// @Scheduled(cron="0/1 * * * * ?")
@DynamicScheduled(taskId = "retryFailJob",cron = "0/1 * * * * ?") // 每个一秒
public void schedule(){
System.out.println("scheduler" + new SimpleDateFormat("YYYY-MM-dd HH-mm:ss").format(new Date()) + " : "+Thread.currentThread().getName());
}
当项目启动以后,可以通过 /update-cron 接口在不停服的情况下更新任务的调度时间,如果有需要的小伙伴可以直接复制代码直接使用
❝
今日分享如果对你有帮助,帮忙点个在看
原文始发于微信公众号(小李哥编程):这样做,就可以修改spring中任务的调度时间了
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/253873.html