分布式调度中间件xxl-job(三):执行器Executor—任务注册

人生苦短,不如养狗

  在任务调度中,我们经常能看到执行器和任务一起的身影,两者的关系相当的紧密,在xxl-job中也是如此。


分布式调度中间件xxl-job(三):执行器Executor—任务注册


  可以看到,执行器是以机器为单位,一个应用中可能包含多个执行器(单机模式下,只会有一个执行器)。而每个执行器中会注册多个任务,其中属于同一个应用的执行器中注册的任务执行程序应当是相同的。

  下面我们就来具体看一下在xxl-job中任务是如何注册的。

一、从示例代码开始

  话不多说,我们直接来看一下示例代码是如何进行任务编写的。首先,我们来看下示例模块的目录:


分布式调度中间件xxl-job(三):执行器Executor—任务注册


  在 xxl-job-executor-samples 模块中提供了诸多框架的使用示例,这里我们选择使用SpringBoot版本的示例代码进行演示。可以看到在 jobhandler 目录下有一个 SampleXxlJob.class ,在这个类中有一个 demoJobHandler(String param) 方法,这个方法也就是上一章节中我们在调度中心注册的任务。我们来看一下这里的代码:

@XxlJob("demoJobHandler")
public ReturnT<String> demoJobHandler(String param) throws Exception {
XxlJobLogger.log("XXL-JOB, Hello World.");

for (int i = 0; i < 5; i++) {
XxlJobLogger.log("beat at:" + i);
TimeUnit.SECONDS.sleep(2);
}
return ReturnT.SUCCESS;
}

  方法体内的逻辑十分简单,这里就不多加赘述了。具体来看下方法的注释、返回参数和参数列表。
  可以看到,方法上使用了 @XxlJob("demoJobHandler") 注解来进行任务标识,标明了该方法是一个xxl-job任务,并且任务名为 demoJobHandler 。该方法的返回值是xxl-job提供的一个封装类 ReturnT<T> ,请求参数只有一个,类型是String。
  这里需要提的一点是在之前的版本中是通过继承 IJobHandler 和在类上加注解的方式进行任务标识,在最新版中则抛弃了原有的做法,将任务的粒度细化到了方法级别。
  前者的好处是任务编写的范式已经规定好,只需要重写对应抽象类中的方法并加上注解,但每一次编写新的任务执行程序都需要创建新的类来重新实现接口。后者的好处是在于细化了任务的粒度,将注解细化到了方法级别,不需要再重复地继承方法,很好地实现了类的复用。但这种方式同样存在问题,虽然只需要加上注解就可以进行方法标识,但是在进行方法编写时仍然还是需要按照 IJobHandler 中规定好的编写范式来进行方法的编写,至于这个编写范式在这种方式就需要去翻阅文档,无形中还是增加了一些步骤。
  总体来看,这两种方式各有其优势,选择何种方式来实现只能说是看场景和编写者的喜好了。

二、JobHandler注册

  既然我们选择的是SpringBoot版本的示例,那么任务注册基本也逃不出Spring加载Bean的基本套路。
  上面我们提到了,任务需要注册到执行器中,也就是 XxlJobSpringExecutor 中。这也就意味 XxlJobSpringExecutor 在Spring容器中进行Bean初始化时需要用到任务注册信息,这就需要在Bean加载过程中将获取到 Application 中的信息。这种情况,一般会选择实现 ApplicationContextAware 接口来进行来从应用上下文中获取Bean加载所需的容器内的服务。
  下面我们来看下具体的任务注册代码:

public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean

  首先看下 XxlJobSpringExecutor 实现的几个接口,除了 ApplicationContextAware ,它还实现了 SmartInitializingSingletonDisposableBean 两个接口。前者说明 XxlJobSpringExecutor 在Bean初始化完成后进行回调时要搞些事情,后者则说明在Bean销毁时要搞事情。同时 XxlJobSpringExecutor 还继承了 XxlJobExecutor ,关于 XxlJobExecutor 的具体细节闲鱼会在下一章进行分析。

@Override
public void afterSingletonsInstantiated() {

// 从方法级别进行任务处理程序仓库初始化,这里进行了优化,将任务的粒度由原先的类转变为了方法级别
initJobHandlerMethodRepository(applicationContext);

// 刷新GlueFactory,此处获取SpringGlueFactory
GlueFactory.refreshInstance(1);

// super start
try {
super.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

  可以看到在 afterSingletonsInstantiated 方法做了这样几件事情:

  • 初始化任务执行程序仓库,即进行任务注册
  • 刷新GlueFactory,获取SpringGlueFactory(用于动态脚本任务)
  • 启动执行器,此处调用父类start()方法

  我们具体来看下 initJobHandlerMethodRepository(applicationContext); 方法:

private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
if (applicationContext == null) {
return;
}
// 进行任务处理程序初始化
// 获取bean名称列表
String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
for (String beanDefinitionName : beanDefinitionNames) {
// 从上下文中根据bean元数据名称获取bean对象
Object bean = applicationContext.getBean(beanDefinitionName);
// referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
// 获取被@XxlJob注解的方法
Map<Method, XxlJob> annotatedMethods = null;
try {
// 根据bean类元信息获取被@XxlJob注解的方法
annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
new MethodIntrospector.MetadataLookup<XxlJob>() {
@Override
public XxlJob inspect(Method method) {
return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
}
});
} catch (Throwable ex) {
logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
}
if (annotatedMethods == null || annotatedMethods.isEmpty()) {
continue;
}

for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
// 被注解的方法,即具体的任务
Method method = methodXxlJobEntry.getKey();
// XxlJob注解类,获取设置的注解信息
XxlJob xxlJob = methodXxlJobEntry.getValue();
if (xxlJob == null) {
continue;
}
// 任务名称
String name = xxlJob.value();
if (name.trim().length() == 0) {
throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
}
// 检查当前任务名是否已经被使用,注意这里是通过任务名称来进行任务判重的
if (loadJobHandler(name) != null) {
throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
}

// 进行方法参数类型数量和参数类型判断,这里要求方法的参数列表只能有一个参数,并且参数类型为String
if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {
throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
"The correct method format like " public ReturnT<String> execute(String param) " .");
}
// 判断方法返回参数类型是否为ReturnT,如果不为ReturnT则为非法
if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {
throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
"The correct method format like " public ReturnT<String> execute(String param) " .");
}
method.setAccessible(true);

// 设置初始化方法和销毁方法
Method initMethod = null;
Method destroyMethod = null;

if (xxlJob.init().trim().length() > 0) {
try {
initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
initMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
}
}
if (xxlJob.destroy().trim().length() > 0) {
try {
destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
destroyMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
}
}

// 进行任务处理程序注册
registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod));
}
}

}

  方法的入参 ApplicationContext 就是通过实现 ApplicationContextAware 接口获取的应用上下文。具体注册过程如下:

  1. ApplicationContext中获取所有Bean元数据名称,通过Bean元数据名称获取所有Bean;
  2. 遍历获取到的Bean,找到有XxlJob注解的类,获取类中被注解的所有方法;
  3. 获取被注解方法的相应信息,根据注解中的任务名称,调用loadJobHandler(name)方法检查该任务是否已经注册;
  4. 进行方法编写范式检查,主要检查方法名称、入参类型以及返回值类型是否符合要求;
  5. 设置被注解方法的初始化方法和销毁方法;
  6. 最后,将上述被注解方法注册到任务处理程序仓库中;

  至此,我们编写的任务就已经注册到了执行器Executor中了。

三、总结

  总体来看,将任务注册到执行器中的过程就是Bean加载过程中加载其他服务的过程,整体的代码和流程还是相对简单流畅的。下一章我们会开始学习执行器Executor注册和执行任务的过程。


原文始发于微信公众号(Brucebat的伪技术鱼塘):分布式调度中间件xxl-job(三):执行器Executor—任务注册

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/90196.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!