XxlJob深度性能优化实践

一、背景

天画项目的数据工厂目前在与xxl-job对接自动化数据生成任务,另外我司也在使用该组件做业务,所以想深入了解下XxlJob。在跟进了社区的github等仓库issue发现开发迭代停滞了一段时间,思来想去准备开个下游分支做一些性能优化和特性开发等,于是fork了下源码,将其作为天画社区关于任务调度的组件来使用。

我花了几天阅读源码,同时做了一些简单的调试,发现了一些问题之后着手进行优化,本文将从XxlJob的原理出发深入其性能瓶颈,并提出一些解决方案。相关优化的代码已经上传至gitee.

二、调度流程解读

在进行性能和安全改造之前,需要对xxljob的调度执行原理要非常了解,所以这里我们先简单看一下xxljob的调度流程和线程模型。

2.1 调度流程

  1. 服务端启动流程
XxlJob深度性能优化实践
xxlJob服务端启动流程.png
  1. 客户端启动流程
XxlJob深度性能优化实践
客户端启动流程 (2).png
  1. 客户端与服务端交互流程

XxlJob深度性能优化实践

2.2 调度线程模型

通过上面的流程图我们可以梳理出如下服务端和客户端用到的线程和线程池,具体作用通过名称即可了解。服务端:

  1. 触发器线程池(fastTriggerPool,slowTriggerPool)
  2. 注册&停止注册线程池(registryOrRemoveThreadPool)
  3. 注册监听线程(registryMonitorThread)
  4. 任务失败监听线程(monitorThread)
  5. 任务完成回调线程池(callbackThreadPool)
  6. 任务完成回调监听线程(monitorThread)
  7. 统计日志线程(logrThread)
  8. 调度线程(scheduleThread)
  9. 时间轮线程(ringThread)

客户端:

  1. http 交互监听线程(thread)
  2. 注册器线程(registryThread)
  3. 任务线程(JobThread)
  4. 任务回调日志线程(triggerCallbackThread)
  5. 回调日志重试线程(triggerRetryCallbackThread)

2.3 客户端API

这里需要说明的是,在一定程度上客户端(也就是执行器,以下统称为客户端)也会作为服务端接受调度中心(即xxl job server 服务端)的回调,所以这里是一个比较令人迷惑的地方。这里列出服务端调用客户端的API 127.0.0.1:9999/beat 127.0.0.1:9999/idleBeat 127.0.0.1:9999/run 127.0.0.1:9999/kill 127.0.0.1:9999/log

2.4 服务端API

这里列出客户端调用服务端的API 127.0.0.1:8080/api/callback 127.0.0.1:8080/api/registry 127.0.0.1:8080/api/registryRemove

三、潜在问题

在反复看了xxljob 开源社区gitee/github上的issue之后,我对下面的几个问题比较关注,所以在业余时间中专门调试并且进行了一定的复现,这里简单回顾一下xxljob在高并发调度过程中可能产生的问题。

3.1 重复调度

重复调度的问题其实是偶发问题,问题的现象或者特征就是一个任务在一次调度中重复执行了两次,同时产生了 两条xxljob log,发生时间相同。

3.2 安全问题

xxljob控制台查询客户端日志的时候会返回accessToken,github上的issue讨论已经提供了复现步骤,这里目前作者已经提供升级,但是本文的重点不止于此,因为另外的安全问题就是accessToken在客户端与服务端之间的交互是明文传输的,另外也是服务端与所有客户端共用的,一旦泄露其实会比较危险。

3.3 并发调度变慢

XxlJob的调度在并发变高的时候从日志上可以看出调度会有一些延迟,出现这个问题的原因有以下几个方面:

  1. 一秒内调度量比较多,对客户端和服务端在一秒内会产生上千次调度
  2. 调度粒度是线程(JobThread)和单次粒度的,所以一秒内要处理2-3倍的数据库IO和网络IO
  3. 选用了busyover或者failover的调度策略,这两种需要在调度之前给对应的客户端发送心跳
  4. 日志处理,这里包括客户端的日志处理和服务端的日志处理,在并发比较高的情况下日志产生的量也比较大,

对调度业务会产生一定的资源占用。

3.4 线程模型的性能瓶颈

这个问题其实与3.3类似,从上面的调度流程图可以看到xxljob其实是对每个xxl_job_info在对应的客户端上构建一个JobThread,如果执行器对应的任务比较多且存在一秒内并发触发的话就可能导致客户端本身出现性能问题。

3.5 回调并发变慢

这个问题也是高并发调度下产生的问题,xxljob在日志处理上有两种方式:一种是在调度job方法内通过XxlJobHelper.log去记录产生的业务日志,这部分是存在于客户端的,另外一种则是调度日志,即对该job在当前调度结果的日志信息推送到服务端做记录或者统计,这部分数据存储在服务端的xxl_job_log表中。在高并发调度的情况下,客户端的处理存在性能问题,一次回调推送的日志太少,客户端存在积压。

3.6 分页性能问题

在高并发或者大量任务的场景中xxlJob的管理控制台在任务管理和调度日志分页上存在性能问题,可能导致页面加载缓慢同时服务端进程崩溃,进而影响服务运行。

四、优化点

4.1 认证优化

在xxl job的源码中需要借助xxl.job.accessToken来完成客户端与服务端的认证,这个配置在客户端注册到服务端,以及客户端和服务端在相互调用过程中都会进行认证。由于配置是统一的,所以所有客户端都需要配置这个,另外认证过程中是accessToken是明文传输的,所以这种认证机制一定程度上是比较弱的。另外如果改动认证机制的话则客户端和服务端都要改,本次优化则花一定的精力来提升安全性。总结起来就是如下几点:

  1. 强制安全认证

客户端或者服务端不配置accessToken则启动失败

  1. 认证加密

客户端与服务端之间的交互均通过加密accessToken的方式

  1. 弱化accessToken在认证过程中的影响

对于每个客户端虽然都配置同样的accessToken,但是新生成的accessToken(原accessToken+appName)则是通过AES进行加密。这里我们看一下进行强认证过程的时序图:

XxlJob深度性能优化实践下面我们看一下具体实现源码:客户端:

 private String  initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {

        // fill ip port
        port = port>0?port: NetUtil.findAvailablePort(9999);
        ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();


        // generate address
        // registry-address:default use address to registry , otherwise use ip:port if address is null
        if (address==null || address.trim().length()==0) {
            String ip_port_address = IpUtil.getIpPort(ip, port);
            address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
        }

        // accessToken
        if (accessToken == null || accessToken.trim().length()==0) {
            throw new IllegalArgumentException("xxl-job accessToken need not  null. ");
            //logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
        }

        // start
        embedServer = new EmbedServer();
        //accessToken加密
        String accessCookie = AESUtils.encrypt(appname + accessToken,AESUtils.ENCRYPT_KEY);

        embedServer.start(address, port, appname, accessCookie, null);
        //后续的initAdminBizList和init TriggerCallbackThread均需要accessCookie
        return accessCookie;
    }

服务端

/**
  * v3.0.0-snapshot
  * 进行注册
  * @param registryParam
  * @return
  */

 public ReturnT<String> registryV3(RegistryParam registryParam) {

  // valid
  if (!StringUtils.hasText(registryParam.getRegistryGroup())
    || !StringUtils.hasText(registryParam.getRegistryKey())
    || !StringUtils.hasText(registryParam.getRegistryValue())) {
   return new ReturnT<>(ReturnT.FAIL_CODE, "Illegal Argument.");
  }

  //进行注册认证
  XxlJobGroup xxlJobGroup = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findRegistGroup(registryParam.getRegistryKey());
  if(xxlJobGroup == null){
   return new ReturnT<>(ReturnT.FAIL_CODE, "appName is not regist into jobGroup.");
  }

  // async execute
  registryOrRemoveThreadPool.execute(() -> {
   int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
   if (ret < 1) {
    XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());

    // fresh
    freshGroupRegistryInfo(registryParam);
   }
  });

  //加密
  String verifyCode = registryParam.getRegistryKey() + XxlJobAdminConfig.getAdminConfig().getAccessToken();
  String cookie = AESUtils.encrypt(verifyCode,XxlJobAdminConfig.ENCRYPT_KEY);
    //将加密key保存到服务端内存中,生命周期跟随regist,和registRemove
  XxlJobAdminConfig.getAdminConfig().putRegistCookie(registryParam.getRegistryValue(), cookie);

  return new ReturnT<>(cookie);
 }

XxlJobRemotingUtil

com.xxl.job.core.util.XxlJobRemotingUtil#postBody

//这里将XXL_JOB_ACCESS_TOKEN替换为XXL_JOB_ACCESS_COOKIE
connection.setRequestProperty(XXL_JOB_ACCESS_COOKIE, accessCookie);

4.2 API优化

  1. 分页

这里针对分页的优化有两个模块,一个是任务管理的分页和调度日志的分页,下面看一下改造前后的底层SQL,基本上是采用了Mysql ID连续大数据量深度分页的典型解法:

 <select id="pageList" resultMap="XxlJobLog">
  SELECT <include refid="Base_Column_List" />
  FROM xxl_job_log AS t
  <trim prefix="WHERE" prefixOverrides="AND | OR" >
   <if test="jobId==0 and jobGroup gt 0">
    AND t.job_group = #{jobGroup}
   </if>
   <if test="jobId gt 0">
    AND t.job_id = #{jobId}
   </if>
   <if test="triggerTimeStart != null">
    AND t.trigger_time <![CDATA[ >= ]]> #{triggerTimeStart}
   </if>
   <if test="triggerTimeEnd != null">
    AND t.trigger_time <![CDATA[ <= ]]> #{triggerTimeEnd}
   </if>
   <if test="logStatus == 1" >
    AND t.handle_code = 200
   </if>
   <if test="logStatus == 2" >
    AND (
     t.trigger_code NOT IN (0, 200) OR
     t.handle_code NOT IN (0, 200)
    )
   </if>
   <if test="logStatus == 3" >
    AND t.trigger_code = 200
    AND t.handle_code = 0
   </if>
  </trim>
  ORDER BY t.trigger_time DESC
  LIMIT #{offset}, #{pagesize}
 </select>

 <select id="pageListV2" resultMap="XxlJobLog">
  SELECT <include refid="Base_Column_List" /> from xxl_job_log as t inner join
      (SELECT a.id FROM xxl_job_log AS a
  <trim prefix="WHERE" prefixOverrides="AND | OR" >
   <if test="jobId==0 and jobGroup gt 0">
    AND a.job_group = #{jobGroup}
   </if>
   <if test="jobId gt 0">
    AND a.job_id = #{jobId}
   </if>
   <if test="triggerTimeStart != null">
    AND a.trigger_time <![CDATA[ >= ]]> #{triggerTimeStart}
   </if>
   <if test="triggerTimeEnd != null">
    AND a.trigger_time <![CDATA[ <= ]]> #{triggerTimeEnd}
   </if>
   <if test="logStatus == 1" >
    AND a.handle_code = 200
   </if>
   <if test="logStatus == 2" >
    AND (
    a.trigger_code NOT IN (0, 200) OR
    a.handle_code NOT IN (0, 200)
    )
   </if>
   <if test="logStatus == 3" >
    AND a.trigger_code = 200
    AND a.handle_code = 0
   </if>
  </trim>
  ORDER BY a.trigger_time DESC
  LIMIT #{offset}, #{pagesize}) as a on a.id = t.id;
 </select>

经过改造后百万数据量级的分页也都比较顺畅,且延迟和卡顿显著降低。

  1. 批量API

在xxljob的线程模型和其设计上来看,当调度任务上千,每秒调度上百任务的时候就会显得有点吃力,所以这里在交互api中增加了以下几个客户端api: 127.0.0.1:9999/runpool,客户端不再是单任务单线程方式,而是采用线程池的模式 127.0.0.1:9999/runbatch,服务端批量发起调度,客户端采用线程池模式调用任务 服务端API:127.0.0.1:8088/api/registryV3,

4.3 数据库模型优化&SQL优化

在研究完xxljob源码之后准备修改源码的时候首先也是将底层模型重构了一遍,同时集合一些场景简化了一些sql

  1. 使用bigint类型
  2. 相关表增加索引
  3. xxl_job_log表增加uuid
  4. 相关查询去掉order by (源码上看并不需要排序)

具体内容可以看一下源码:https://gitee.com/sky-painting/xxl-job/blob/3.0.0/doc/db/tables_xxl_job3.0.0.sql

4.4 并发优化

  1. 调度优化

xxljob每秒可调度的任务数有个公式,如下:

    preReadCount=(XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;

但是在调度核心代码中有个并发问题,就是会产生重复调度,也就是说1秒中一个任务有可能会调度两次,排查的方向就是查xxl_job_log表是否存在一个任务在同一秒内产生两条调度日志,sql如下:

select job_id,trigger_time  from xxl_job.xxl_job_log group by job_id,trigger_time having count(*)>1;

出问题的核心代码如下:

   
   private void pushTimeRing(int ringSecond, int jobId){
        // push async ring
        List<Integer> ringItemData = ringData.get(ringSecond);
        if (ringItemData == null) {
            ringItemData = new ArrayList<Integer>();
            ringData.put(ringSecond, ringItemData);
        }
        ringItemData.add(jobId);

        logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) );
    }

    try {
                        // second data
                        List<Integer> ringItemData = new ArrayList<>();
                        int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
                        for (int i = 0; i < 2; i++) {
                            List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                            if (tmpData != null) {
                                ringItemData.addAll(tmpData);
                            }
                        }

                        // ring trigger
                        logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
                        if (ringItemData.size() > 0) {
                            // do trigger
                            for (int jobId: ringItemData) {
                                // do trigger
                                JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1nullnullnull);
                            }
                            // clear
                            ringItemData.clear();
                        }
                    } catch (Exception e) {
                        if (!ringThreadToStop) {
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                        }
                    }

由于ringData内部存储的列表List非并发安全,同时list无法识别重复元素,另外在时间轮驱动的时候是驱动了两格度,所以在某些情况下会出现重复调度。本次优化采用了CopyOnWriteArraySet+cache的方式确保每秒调度的时候不存在相同的任务ID在调度器中,经过本地多轮高并发调度测试后不再产生该问题。

  1. 回调优化

在进行单元测试压测的过程中发现回调日志延迟比较高,触发时间和执行时间基本相差很多,另外执行时间失真,这里在测试过程中发现回调线程的效率很慢,虽然一直在跑,但是每次基本上就往服务端推了一条日志,这里通过 线程的睡眠机制,让回调线程可以一次推送更多回调日志,降低网络开销,减少客户端回调日志积压。如下是改造效果:

XxlJob深度性能优化实践
image.png
XxlJob深度性能优化实践
image.png

4.5 批量调度优化

本次进行优化的一个重点内容就是批量调度优化,调度流程如下:XxlJob深度性能优化实践需要注意的是,本次调度在调度过程中进行批量调度是同步的,可能存在对某个客户端进行批量调度的过程中存在网络延迟,导致后续调度存在阻塞,这个后续会进行进一步优化。

4.6 单元测试

本次优化过程增强了对xxljob的单元测试,更加方便测试和应用xxljob.

4.7 其他

上述几个优化点是本次改造的核心功能,另外还有一些小的细节性的优化这里不再继续阐述,感兴趣的可以看一下改造后的源码。

五、后续规划

代码地址:https://gitee.com/sky-painting/xxl-job.git因为本次改造主要关注性能方面和安全方面,所以一次性改了很多代码,同时在一些流程上的改进会比较激进,如果要应用的话还需要做好测试,创建本下游分支主要是为了学习xxljob,另外会根据合适的时机将本分支的一些改进反馈到社区。

就目前看很多公司还在用xxljob,也有不少公司在进行二次开发,所以如果你有想法或者已经在公司实践了也欢迎加我讨论,后续的迭代将专注于功能性扩展和我司对于xxljob的需求方面,相关内容会在本公众号更新,敬请关注。


原文始发于微信公众号(神帅的架构实战):XxlJob深度性能优化实践

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/241332.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!