ElasticJob分布式调度,使用注册中心zookeeper开启动态定时任务附源码(三)
问题背景
上一篇介绍了ElasticJob分布式固态定时任务,这个篇章介绍一下分布式动态定时任务
注意事项:
- 默认安装zookeeper,可以参考我的另一篇文章zookeeper单机及集群部署,附安装包下载(二)
- 默认安装JDK
- 可以复制文章的代码自己创建工程,也可以自己下载源码进行参考
- 依赖是有两个版本的,以前是当当网的开源项目,现在也有Apache的版本,本文使用的当当网的开源
项目搭建
1 引入pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.yg</groupId>
<artifactId>elastic-job</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>elastic-job</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- elastic-job -->
<!-- <dependency>-->
<!-- <groupId>org.apache.shardingsphere.elasticjob</groupId>-->
<!-- <artifactId>elasticjob-lite-core</artifactId>-->
<!-- <version>3.0.1</version>-->
<!-- </dependency>-->
<!-- elastic-job -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.5</version>
</dependency>
<!-- elastic-job end -->
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.6.0</version>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
2 动态添加定时任务句柄
package com.yg.elasticjob.elasticjob;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
//import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
//import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @Author suolong
* @Date 2022/4/14 11:24
* @Version 1.5
*/
@Component
public class ElasticJobHandler {
@Resource
private ZookeeperRegistryCenter registryCenter;
@Resource
private ElasticJobListener elasticJobListener;
/**
* @param jobName:任务的命名空间
* @param jobClass:执行的定时任务对象
* @param shardingTotalCount:分片个数
* @param cron:定时周期表达式
* @param id:自定义参数
* @return
*/
private static LiteJobConfiguration.Builder simpleJobConfigBuilder(String jobName, Class<? extends SimpleJob> jobClass, int shardingTotalCount, String cron, String id) {
//创建任务构建对象
LiteJobConfiguration.Builder builder = LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(
JobCoreConfiguration.
//任务命名空间名字、任务执行周期表达式、分片个数
newBuilder(jobName, cron, shardingTotalCount).
//自定义参数
jobParameter(id).
build(),
jobClass.getCanonicalName()));
//本地配置是否可覆盖注册中心配置
builder.overwrite(true);
return builder;
}
/**
* 添加一个定时任务
*
* @param cron:周期执行表达式
* @param id:自定义参数
* @param jobName:命名空间
* @param instance:任务对象
*/
public void addPublishJob(String cron, String id, String jobName, SimpleJob instance) {
LiteJobConfiguration jobConfig = simpleJobConfigBuilder(
jobName,
instance.getClass(),
1,
cron,
id).overwrite(true).build();
//DynamicTask为具体的任务执行逻辑类
new SpringJobScheduler(instance, registryCenter, jobConfig, elasticJobListener).init();
}
/***
* Date转cron表达式
*/
public static final String CRON_DATE_FORMAT = "ss mm HH dd MM ? yyyy";
/**
* 获得定时
*
* @param date
* @return
*/
public static String getCron(final Date date) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(CRON_DATE_FORMAT);
return simpleDateFormat.format(date);
}
}
3 application配置文件
server.port=1995
#动态定时任务
#zookeeper地址
zkserver=10.10.195.193:2181
# 任务命名空间
zknamespace=zknamesp
4 zookeeper注册中心配置
package com.yg.elasticjob.config;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import com.yg.elasticjob.elasticjob.ElasticJobListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author suolong
* @Date 2022/4/14 11:11
* @Version 2.0
*/
@Configuration
public class ElasticJobConfig {
//配置文件中的zookeeper的ip和端口
@Value(value = "${zkserver}")
private String serverlists;
//指定一个命名空间
@Value("${zknamespace}")
private String namespace;
/***
* 配置Zookeeper和namespace
* @return
*/
@Bean
public ZookeeperConfiguration zkConfig() {
return new ZookeeperConfiguration(serverlists, namespace);
}
/***
* 向zookeeper注册初始化信息
* @param
* @return
*/
@Bean(initMethod = "init")
public ZookeeperRegistryCenter regCenter(@Value("${zkserver}") final String serverList, @Value("${zknamespace}") final String namespace) {
ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(serverList, namespace);
// zookeeperConfiguration.setSessionTimeoutMilliseconds(10000);
return new ZookeeperRegistryCenter(zookeeperConfiguration);
}
5 实现SimpleJob
package com.yg.elasticjob.elasticjob;
//import org.apache.shardingsphere.elasticjob.api.ShardingContext;
//import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
/**
* @Author suolong
* @Date 2022/4/14 12:36
* @Version 1.5
*/
public class DynamicTask implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
//传递的参数
String id = shardingContext.getJobParameter();
try {
//具体任务逻辑
System.out.println("执行你的逻辑代码!param:" + id + " jobName: " + shardingContext.getJobName());
} catch (Exception e) {
e.printStackTrace();
}
}
}
6 使用Getmapping动态添加定时任务
package com.yg.elasticjob.controller;
import com.yg.elasticjob.elasticjob.DynamicTask;
import com.yg.elasticjob.elasticjob.ElasticJobHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.PostConstruct;
import java.util.Date;
/**
* @Author suolong
* @Date 2022/4/14 12:36
* @Version 1.5
*/
@RestController
@RequestMapping(value = "/test")
public class TestController {
@Autowired
ElasticJobHandler elasticJobHandler;
/***
* 动态创建任务
* @param times:延迟时间,为了测试到效果,所以在当前时间往后延迟
* @param jobname:任务名字
* @param param:自定义参数
* @return
*/
@GetMapping
public String add(@RequestHeader Long times, @RequestHeader String jobname, @RequestHeader String param) {
//在当前指定时间内延迟times毫秒执行任务
Date date = new Date(System.currentTimeMillis() + times);
//需要传递给定时任务的参数
// String cron = ElasticJobHandler.getCron(date);
String cron = "0/2 * * * * ?";
//执行任务
elasticJobHandler.addPublishJob(cron, param, jobname, new DynamicTask());
return "添加任务成功!";
}
}
7 任务启动类
package com.yg.elasticjob;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ElasticJobApplication {
public static void main(String[] args) {
SpringApplication.run(ElasticJobApplication.class, args);
}
}
代码测试
总结
- 可以根据需求设置自己需要的任务参数
作为程序员第 112 篇文章,每次写一句歌词记录一下,看看人生有几首歌的时间,wahahaha …
Lyric: 你说给过我纵容
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/110763.html