如何动态通过API的形式在XxlJob上创建任务

概述

根据官网的描述,XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。

背景

鉴于xxl-job官方并没有提供api的方式进行动态创建任务已经后台用api进行控制任务相关行为。基于次需求,本人编写了一个工具类,来实现以上需求。当然最主要的是还是xxl-job的restful api 设计,这才得益于我可以编一个http请求的工具类来实现

版本

  • 基于 xxl-job 2.3.0
  • springboot 2.6.13

特性

1、简单:支持通过Web页面对任务进行CRUD操作,操作简单,一分钟上手; 

2、动态:支持动态修改任务状态、启动/停止任务,以及终止运行中任务,即时生效;

3、调度中心HA(中心式):调度采用中心式设计,“调度中心”自研调度组件并支持集群部署,可保证调度中心HA; 

4、执行器HA(分布式):任务分布式执行,任务”执行器”支持集群部署,可保证任务执行HA; 

5、注册中心: 执行器会周期性自动注册任务, 调度中心将会自动发现注册的任务并触发执行。同时,也支持手动录入执行器地址;

6、弹性扩容缩容:一旦有新执行器机器上线或者下线,下次调度时将会重新分配任务;

7、触发策略:提供丰富的任务触发策略,包括:Cron触发、固定间隔触发、固定延时触发、API(事件)触发、人工触发、父子任务触发;

8、调度过期策略:调度中心错过调度时间的补偿处理策略,包括:忽略、立即补偿触发一次等;

9、阻塞处理策略:调度过于密集执行器来不及处理时的处理策略,策略包括:单机串行(默认)、丢弃后续调度、覆盖之前调度; 

10、任务超时控制:支持自定义任务超时时间,任务运行超时将会主动中断任务; 

11、任务失败重试:支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试;其中分片任务支持分片粒度的失败重试; 

12、任务失败告警;默认提供邮件方式失败告警,同时预留扩展接口,可方便的扩展短信、钉钉等告警方式;

13、路由策略:执行器集群部署时提供丰富的路由策略,包括:第一个、最后一个、轮询、随机、一致性HASH、最不经常使用、最近最久未使用、故障转移、忙碌转移等; 

14、分片广播任务:执行器集群部署时,任务路由策略选择”分片广播”情况下,一次任务调度将会广播触发集群中所有执行器执行一次任务,可根据分片参数开发分片任务; 

15、动态分片:分片广播任务以执行器为维度进行分片,支持动态扩容执行器集群从而动态增加分片数量,协同进行业务处理;在进行大数据量业务操作时可显著提升任务处理能力和速度。 

16、故障转移:任务路由策略选择”故障转移”情况下,如果执行器集群中某一台机器故障,将会自动Failover切换到一台正常的执行器发送调度请求。 

17、任务进度监控:支持实时监控任务进度; 

18、Rolling实时日志:支持在线查看调度结果,并且支持以Rolling方式实时查看执行器输出的完整的执行日志;

19、GLUE:提供Web IDE,支持在线开发任务逻辑代码,动态发布,实时编译生效,省略部署上线的过程。支持30个版本的历史版本回溯。 

20、脚本任务:支持以GLUE模式开发和运行脚本任务,包括Shell、Python、NodeJS、PHP、PowerShell等类型脚本; 

21、命令行任务:原生提供通用命令行任务Handler(Bean任务,”CommandJobHandler”);业务方只需要提供命令行即可; 

22、任务依赖:支持配置子任务依赖,当父任务执行结束且执行成功后将会主动触发一次子任务的执行, 多个子任务用逗号分隔; 

23、一致性:“调度中心”通过DB锁保证集群分布式调度的一致性, 一次任务调度只会触发一次执行; 

24、自定义任务参数:支持在线配置调度任务入参,即时生效; 

25、调度线程池:调度系统多线程触发调度运行,确保调度精确执行,不被堵塞; 

26、数据加密:调度中心和执行器之间的通讯进行数据加密,提升调度信息安全性; 

27、邮件报警:任务失败时支持邮件报警,支持配置多邮件地址群发报警邮件; 

28、推送maven中央仓库: 将会把最新稳定版推送到maven中央仓库, 方便用户接入和使用; 

29、运行报表:支持实时查看运行数据,如任务数量、调度次数、执行器数量等;以及调度报表,如调度日期分布图,调度成功分布图等; 

30、全异步:任务调度流程全异步化设计实现,如异步调度、异步运行、异步回调等,有效对密集调度进行流量削峰,理论上支持任意时长任务的运行; 

31、跨语言:调度中心与执行器提供语言无关的 RESTful API 服务,第三方任意语言可据此对接调度中心或者实现执行器。除此之外,还提供了 “多任务模式”和“httpJobHandler”等其他跨语言方案; 

32、国际化:调度中心支持国际化设置,提供中文、英文两种可选语言,默认为中文; 

33、容器化:提供官方docker镜像,并实时更新推送dockerhub,进一步实现产品开箱即用; 

34、线程池隔离:调度线程池进行隔离拆分,慢任务自动降级进入”Slow”线程池,避免耗尽调度线程,提高系统稳定性; 

35、用户管理:支持在线管理系统用户,存在管理员、普通用户两种角色; 

36、权限控制:执行器维度进行权限控制,管理员拥有全量权限,普通用户需要分配执行器权限后才允许相关操作;

HTTP 工具

引入依赖

   <properties>
<fastjson.version>2.0.33</fastjson.version>
</properties>
<dependencies>
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba.fastjson2/fastjson2 -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>${fastjson.version}</version>
</dependency>
</dependencies>

创建相关类

  • 创建一个bean,与XxlJobInfoBO基本类似,主要当作接收参数转换使用

    package com.example.jobdemo;

    /**
    * @Author zuiyu 创建任务,API 接口 使用
    * @Date 2023/6/2 17:20
    */

    public class XxlJobInfoBO {
    private int id; // 主键ID

    private int jobGroup; // 执行器主键ID
    private String jobDesc;
    private String author; // 负责人
    private String alarmEmail; // 报警邮件
    private String scheduleType; // 调度类型
    private String scheduleConf;
    private String executorHandler; // 执行器,任务Handler名称
    private String executorParam; // 执行器,任务参数
    private String executorRouteStrategy; // 执行器路由策略
    private String misfireStrategy; // 调度过期策略
    private String executorBlockStrategy; // 阻塞处理策略
    private int executorTimeout; // 任务执行超时时间,单位秒
    private int executorFailRetryCount; // 失败重试次数


    private String glueType; // GLUE类型 #com.xxl.job.core.glue.GlueTypeEnum
    private String childJobId; // 子任务ID,多个逗号分隔
    private String glueSource; // GLUE源代码
    private String glueRemark; // GLUE备注

    public String getGlueType() {
    return glueType;
    }

    public void setGlueType(String glueType) {
    this.glueType = glueType;
    }

    public String getChildJobId() {
    return childJobId;
    }

    public void setChildJobId(String childJobId) {
    this.childJobId = childJobId;
    }

    public String getGlueSource() {
    return glueSource;
    }

    public void setGlueSource(String glueSource) {
    this.glueSource = glueSource;
    }

    public String getGlueRemark() {
    return glueRemark;
    }

    public void setGlueRemark(String glueRemark) {
    this.glueRemark = glueRemark;
    }

    public int getId() {
    return id;
    }

    public void setId(int id) {
    this.id = id;
    }

    public int getJobGroup() {
    return jobGroup;
    }

    public void setJobGroup(int jobGroup) {
    this.jobGroup = jobGroup;
    }

    public String getJobDesc() {
    return jobDesc;
    }

    public void setJobDesc(String jobDesc) {
    this.jobDesc = jobDesc;
    }

    public String getAuthor() {
    return author;
    }

    public void setAuthor(String author) {
    this.author = author;
    }

    public String getAlarmEmail() {
    return alarmEmail;
    }

    public void setAlarmEmail(String alarmEmail) {
    this.alarmEmail = alarmEmail;
    }

    public String getScheduleType() {
    return scheduleType;
    }

    public void setScheduleType(String scheduleType) {
    this.scheduleType = scheduleType;
    }

    public String getScheduleConf() {
    return scheduleConf;
    }

    public void setScheduleConf(String scheduleConf) {
    this.scheduleConf = scheduleConf;
    }

    public String getExecutorHandler() {
    return executorHandler;
    }

    public void setExecutorHandler(String executorHandler) {
    this.executorHandler = executorHandler;
    }

    public String getExecutorParam() {
    return executorParam;
    }

    public void setExecutorParam(String executorParam) {
    this.executorParam = executorParam;
    }

    public String getExecutorRouteStrategy() {
    return executorRouteStrategy;
    }

    public void setExecutorRouteStrategy(String executorRouteStrategy) {
    this.executorRouteStrategy = executorRouteStrategy;
    }

    public String getMisfireStrategy() {
    return misfireStrategy;
    }

    public void setMisfireStrategy(String misfireStrategy) {
    this.misfireStrategy = misfireStrategy;
    }

    public String getExecutorBlockStrategy() {
    return executorBlockStrategy;
    }

    public void setExecutorBlockStrategy(String executorBlockStrategy) {
    this.executorBlockStrategy = executorBlockStrategy;
    }

    public int getExecutorTimeout() {
    return executorTimeout;
    }

    public void setExecutorTimeout(int executorTimeout) {
    this.executorTimeout = executorTimeout;
    }

    public int getExecutorFailRetryCount() {
    return executorFailRetryCount;
    }

    public void setExecutorFailRetryCount(int executorFailRetryCount) {
    this.executorFailRetryCount = executorFailRetryCount;
    }
    }

  • 创建一个参数类,接收springboot配置文件中的配置参数值,当然,最重要的就是调度中心地址的配置

    package com.example.jobdemo;

    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Configuration;

    /**
    * @Author zuiyu
    * @Date 2023/6/8 11:42
    */
    @Configuration
    public class XxlJobClientConfigProperties {

    @Value("${xxl.job.client.loginUrl:${xxl.job.admin.addresses}/login?userName=admin&password=123456}")
    private String loginUrl;
    @Value("${xxl.job.client.addUrl:${xxl.job.admin.addresses}/jobinfo/add}")
    private String jobInfoAddUrl;
    @Value("${xxl.job.client.deleteUrl:${xxl.job.admin.addresses}/jobinfo/remove?id=%s}")
    private String jobInfoDeleteUrl;
    @Value("${xxl.job.client.startJobUrl:${xxl.job.admin.addresses}/jobinfo/start?id=%s}")
    private String jobInfoStartJobUrl;
    @Value("${xxl.job.client.stopJobUrl:${xxl.job.admin.addresses}/jobinfo/stop?id=%s}")
    private String jobInfoStopJobUrl;
    @Value("${xxl.job.client.updateUrl:${xxl.job.admin.addresses}/jobinfo/update}")
    private String jobInfoUpdateUrl;
    @Value("${xxl.job.client.loadByIdUrl:${xxl.job.admin.addresses}/jobinfo/loadById/%s}")
    private String jobInfoLoadByIdUrl;
    /**
    * 任务列表
    */
    @Value("${xxl.job.client.jobInfoPageListUrl:${xxl.job.admin.addresses}/jobinfo/pageList}")
    private String jobInfoPageListUrl;
    /**
    * 执行器列表
    */
    @Value("${xxl.job.client.jobGroupPageListUrl:${xxl.job.admin.addresses}/jobgroup/pageList}")
    private String jobGroupPageListUrl;
    /**
    * 执行器创建 URL
    */
    @Value("${xxl.job.client.jobGroupSaveUrl:${xxl.job.admin.addresses}/jobgroup/save")
    private String jobGroupSaveUrl;
    public String getLoginUrl() {
    return loginUrl;
    }

    public void setLoginUrl(String loginUrl) {
    this.loginUrl = loginUrl;
    }

    public String getJobInfoAddUrl() {
    return jobInfoAddUrl;
    }

    public void setJobInfoAddUrl(String jobInfoAddUrl) {
    this.jobInfoAddUrl = jobInfoAddUrl;
    }

    public String getJobInfoDeleteUrl() {
    return jobInfoDeleteUrl;
    }

    public void setJobInfoDeleteUrl(String jobInfoDeleteUrl) {
    this.jobInfoDeleteUrl = jobInfoDeleteUrl;
    }

    public String getJobInfoStartJobUrl() {
    return jobInfoStartJobUrl;
    }

    public void setJobInfoStartJobUrl(String jobInfoStartJobUrl) {
    this.jobInfoStartJobUrl = jobInfoStartJobUrl;
    }

    public String getJobInfoStopJobUrl() {
    return jobInfoStopJobUrl;
    }

    public void setJobInfoStopJobUrl(String jobInfoStopJobUrl) {
    this.jobInfoStopJobUrl = jobInfoStopJobUrl;
    }

    public String getJobInfoUpdateUrl() {
    return jobInfoUpdateUrl;
    }

    public void setJobInfoUpdateUrl(String jobInfoUpdateUrl) {
    this.jobInfoUpdateUrl = jobInfoUpdateUrl;
    }

    public String getJobInfoLoadByIdUrl() {
    return jobInfoLoadByIdUrl;
    }

    public void setJobInfoLoadByIdUrl(String jobInfoLoadByIdUrl) {
    this.jobInfoLoadByIdUrl = jobInfoLoadByIdUrl;
    }

    public String getJobInfoPageListUrl() {
    return jobInfoPageListUrl;
    }

    public void setJobInfoPageListUrl(String jobInfoPageListUrl) {
    this.jobInfoPageListUrl = jobInfoPageListUrl;
    }

    public String getJobGroupPageListUrl() {
    return jobGroupPageListUrl;
    }

    public void setJobGroupPageListUrl(String jobGroupPageListUrl) {
    this.jobGroupPageListUrl = jobGroupPageListUrl;
    }

    public String getJobGroupSaveUrl() {
    return jobGroupSaveUrl;
    }

    public void setJobGroupSaveUrl(String jobGroupSaveUrl) {
    this.jobGroupSaveUrl = jobGroupSaveUrl;
    }

    @Override
    public String toString() {
    return "XxlJobClientConfigProperties{" +
    "loginUrl='" + loginUrl + ''' +
    ", jobInfoAddUrl='" + jobInfoAddUrl + ''' +
    ", jobInfoDeleteUrl='" + jobInfoDeleteUrl + ''' +
    ", jobInfoStartJobUrl='" + jobInfoStartJobUrl + ''' +
    ", jobInfoStopJobUrl='" + jobInfoStopJobUrl + ''' +
    ", jobInfoUpdateUrl='" + jobInfoUpdateUrl + ''' +
    ", jobInfoLoadByIdUrl='" + jobInfoLoadByIdUrl + ''' +
    ", jobInfoPageListUrl='" + jobInfoPageListUrl + ''' +
    ", jobGroupPageListUrl='" + jobGroupPageListUrl + ''' +
    '}';
    }
    }

  • 创建核心调用类XxlJobClient

    package com.example.jobdemo;


    import com.alibaba.fastjson2.JSON;
    import com.alibaba.fastjson2.JSONObject;
    import org.apache.commons.httpclient.*;
    import org.apache.commons.httpclient.methods.GetMethod;
    import org.apache.commons.httpclient.methods.PostMethod;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    import javax.annotation.PostConstruct;
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Optional;
    import java.util.stream.Collectors;

    /**
    * @Author zuiyu
    * @Date 2023/5/31 11:12
    */
    @Component
    public class XxlJobClient {
    private String COOKIE ="";
    private HttpClient httpClient;
    private static final String POST_FORM_CONTENT_TYPE = "application/x-www-form-urlencoded; charset=UTF-8";
    private static final Header POST_FORM_CONTENT_TYPE_HEADER = new Header("Content-Type",POST_FORM_CONTENT_TYPE);
    @Autowired
    private XxlJobClientConfigProperties clientConfigProperties;

    private final Logger log = LoggerFactory.getLogger(getClass());


    @PostConstruct
    public void init() throws IOException {
    log.debug("xxl JOB 初始化配置:{}",clientConfigProperties.toString());
    httpClient = new HttpClient();
    login();
    }

    /**
    * 登录获取 cookie
    * @throws IOException
    */
    private void login() throws IOException {
    HttpMethod postMethod = new PostMethod(clientConfigProperties.getLoginUrl());
    httpClient.executeMethod(postMethod);
    if (postMethod.getStatusCode() == 200) {
    Cookie[] cookies = httpClient.getState().getCookies();
    StringBuilder tmpCookies = new StringBuilder();
    for (Cookie c : cookies) {
    tmpCookies.append(c.toString()).append(";");
    }
    COOKIE = tmpCookies.toString();
    log.debug("xxlJob 登录成功");
    }else {
    log.debug("xxlJob 登录失败:{}",postMethod.getStatusCode());
    }
    }

    /**
    * 创建任务
    * @param params
    * @return
    * @throws IOException
    */
    public JSONObject createJob(JSONObject params) throws IOException {
    return doPost(clientConfigProperties.getJobInfoAddUrl(), params);
    }

    /**
    * 更新任务
    * @param params
    * @return
    * @throws IOException
    */
    public JSONObject updateJob(JSONObject params) throws IOException {
    return doPost(clientConfigProperties.getJobInfoUpdateUrl(), params);
    }

    /**
    * 根据任务 ID 加载
    * @param id
    * @return
    * @throws IOException
    */
    public JSONObject loadById(int id) throws IOException {
    log.info("loadById: {}",id);
    return doGet(String.format(clientConfigProperties.getJobInfoLoadByIdUrl(),id));
    }
    /**
    * 删除任务
    * @param id 任务 ID
    * @return
    * @throws IOException
    */
    public JSONObject deleteJob(int id) throws IOException {
    log.info("deleteJob: {}",id);
    return doGet(String.format(clientConfigProperties.getJobInfoDeleteUrl(),id));
    }

    /**
    * 开启任务
    * @param id 任务 ID
    * @return
    * @throws IOException
    */
    public JSONObject startJob(int id) throws IOException {
    log.info("startJob: {}",id);
    return doGet(String.format(clientConfigProperties.getJobInfoStartJobUrl(),id));
    }

    /**
    * 停止任务
    * @param id 任务 ID
    * @return
    * @throws IOException
    */
    public JSONObject stopJob(int id) throws IOException {
    log.info("stopJob: {}",id);
    return doGet(String.format(clientConfigProperties.getJobInfoStopJobUrl(),id));
    }




    /**
    * 创建执行器
    * @param params
    * @return
    * @throws IOException
    */
    public JSONObject createJobGroup(JSONObject params) throws IOException {
    return doPost(clientConfigProperties.getJobGroupSaveUrl(), params);
    }

    /**
    * 执行器列表
    * @param params
    * @return
    * @throws IOException
    */
    public JSONObject jobGroupPageList(JSONObject params) throws IOException {
    params.put("start",Optional.ofNullable(params.getInteger("start")).orElse(0));
    params.put("length", Optional.ofNullable(params.getInteger("length")).orElse(10));
    return doPost(clientConfigProperties.getJobGroupPageListUrl(),params);
    }

    /**
    * 任务列表
    * @param params
    * @return
    * @throws IOException
    */
    public JSONObject jobInfoPageList(JSONObject params) throws IOException {
    params.put("start",Optional.ofNullable(params.getInteger("start")).orElse(0));
    params.put("length", Optional.ofNullable(params.getInteger("length")).orElse(10));
    return doPost(clientConfigProperties.getJobInfoPageListUrl(),params);
    }

    /**
    * 发起 GET 请求
    * @param url
    * @return
    * @throws IOException
    */
    private JSONObject doGet(String url) throws IOException {
    GetMethod get = new GetMethod(url);
    get.setRequestHeader("cookie", COOKIE);
    httpClient.executeMethod(get);
    return readResponse(get);
    }

    /**
    * post 请求
    * @param url
    * @param params
    * @return
    * @throws IOException
    */
    private JSONObject doPost(String url,JSONObject params) throws IOException {
    PostMethod post = new PostMethod(url);
    post.setRequestHeader("cookie", COOKIE);
    List<NameValuePair> pairList = new ArrayList<>();
    params.forEach((k,v)-> pairList.add(new NameValuePair(k, v.toString())));
    NameValuePair[] arr = pairList.toArray(new NameValuePair[0]);
    post.setRequestBody(arr);
    post.setRequestHeader(POST_FORM_CONTENT_TYPE_HEADER);
    httpClient.executeMethod(post);
    return readResponse(post);
    }

    /**
    * 处理响应内容
    * @param httpMethod
    * @return
    * @throws IOException
    */
    private JSONObject readResponse(HttpMethod httpMethod) {
    if (httpMethod.getStatusCode() == HttpStatus.SC_OK) {
    try (InputStream inputStream = httpMethod.getResponseBodyAsStream();
    InputStreamReader inputStreamReader = new InputStreamReader(inputStream);
    BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) {
    return JSON.parseObject(bufferedReader.lines().collect(Collectors.joining(System.lineSeparator())));
    } catch (IOException e) {
    log.error("读取响应失败:{}", e.getMessage(), e);
    JSONObject error = new JSONObject();
    error.put("code",HttpStatus.SC_INTERNAL_SERVER_ERROR);
    error.put("msg","响应内容读取失败:"+e.getMessage());
    return error;
    }
    }
    return new JSONObject();
    }
    }

  • application.yml配置

    xxl:
    job:
    admin:
    addresses: http://192.168.80.88:8761/xxl-job-admin
    client:
    userName: admin
    password: 123456
    loginUrl: ${xxl.job.admin.addresses}/login?userName=${xxl.job.client.userName}&password=${xxl.job.client.password}
    addUrl: ${xxl.job.admin.addresses}/jobinfo/add
    deleteUrl: ${xxl.job.admin.addresses}/jobinfo/remove?id=%s
    startJobUrl: ${xxl.job.admin.addresses}/jobinfo/start?id=%s
    stopJobUrl: ${xxl.job.admin.addresses}/jobinfo/stop?id=%s

示例

  • 创建任务

            XxlJobInfoBO info = new XxlJobInfoBO();
    JSONObject executorParams = new JSONObject();
    executorParams.put("id",id);
    info.setExecutorParam(JSONObject.toJSONString(executorParams));
    info.setScheduleConf(cronExpression);
    // 省略其他参数
    JSONObject jobInfo = JSONObject.parseObject(com.alibaba.fastjson2.JSON.toJSONString(info));
    JSONObject creatResult = xxlJobClient.createJob(jobInfo);
  • 开启任务

            final String xxlJobAdminContentKey = "content";
    Integer jobId = creatResult.getInteger(xxlJobAdminContentKey);
    xxlJobClient.startJob(jobId);

总结

在使用时,直接注入XxlJobClient的bean对象,然后封装调用参数即可,具体参考不同版本可能会有变化,具体以使用版本为主。如遇到问题欢迎评论区留言。感觉对你有帮助的话欢迎点赞关注转发。


原文始发于微信公众号(醉鱼Java):如何动态通过API的形式在XxlJob上创建任务

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/231536.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!