前言
上一章节我们介绍了ElasticJob框架中最基本也是使用最广的一种作业类型SimpleJob整合JavaApi来实现基本的定时任务,本章节我们将介绍第二种相对来说不怎么常用的作业,DataflowJob流式作业。总而言之就是,完整的一次流式作业执行过程包括取数据和处理数据的过程。这种作业常用于导入第三方订单等业务场景之中。
一、 开发环境准备
1.Eclipse
2.zookeeper
3.JDK1.8
4.maven3.6.3
5.mysql8.0
二、使用步骤
1.编写代码
在App.java中编写DataflowJob作业配置方法:
/*
* Job配置--DataflowJob
*/
public static LiteJobConfiguration configurationDataFlow() {
//第一步job核心配置
JobCoreConfiguration jcc = JobCoreConfiguration
.newBuilder("myDataFlowJob", "0/10 * * * * ?", 2)
.build();
//第二步job类型配置
JobTypeConfiguration jtc = new DataflowJobConfiguration(jcc,
MyDataFlowJob.class.getCanonicalName(), true);
//第三步job根的配置
LiteJobConfiguration ljc = LiteJobConfiguration
.newBuilder(jtc)
.overwrite(true)
.build();
return ljc;
}
zookeeper注册中心:
public static CoordinatorRegistryCenter zkCenter() {
ZookeeperConfiguration zc = new ZookeeperConfiguration("localhost:2181"
, "java-dataflow-job");
CoordinatorRegistryCenter crc = new ZookeeperRegistryCenter(zc);
//注册中心初始化
crc.init();
return crc;
}
将作业注册到JobSchedule:
new JobScheduler(zkCenter(), configurationDataFlow()).init();
创建订单实体类:
package com.example.java_simple_job.model;
public class Order {
private Integer orderId;
private Integer status;
public Integer getOrderId() {
return orderId;
}
public void setOrderId(Integer orderId) {
this.orderId = orderId;
}
public Integer getStatus() {
return status;
}
public void setStatus(Integer status) {
this.status = status;
}
@Override
public String toString() {
return "Order [orderId=" + orderId + ", status=" + status + "]";
}
}
DataFlowJob流式作业分为两个部分,第一部分是取数据,第二部分是处理数据。
具体逻辑实现如下:
package com.example.java_simple_job.job;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import com.example.java_simple_job.model.Order;
public class MyDataFlowJob implements DataflowJob<Order>{
/**
1.创建订单列表
*/
List<Order> orders = new ArrayList<Order>();
{
for(int i = 0; i < 100; i++) {
Order order = new Order();
order.setOrderId(i+1);
//数据未处理
order.setStatus(0);
orders.add(order);
}
}
/**
1.取数据
*/
@Override
public List<Order> fetchData(ShardingContext shardingContext) {
// TODO Auto-generated method stub
// 订单号 % 分片总数 == 当前分片项
List<Order> orderList = orders.stream().filter(o->o.getStatus() == 0)
.filter(o->o.getOrderId() % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem())
.collect(Collectors.toList());
List<Order> subList = null;
if(orderList != null && orderList.size() > 0) {
subList = orderList.subList(0, 10);
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
LocalTime time = LocalTime.now();
System.out.println(time + ",我是分片项:"+ shardingContext.getShardingItem() + ",我抓取的数据:" + subList);
return subList;
}
/**
1.数据处理
*/
@Override
public void processData(ShardingContext shardingContext, List<Order> data) {
// TODO Auto-generated method stub
data.forEach(o->o.setStatus(1));
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
LocalTime time = LocalTime.now();
System.out.println(time + "我是分片项" + shardingContext.getShardingItem() +",我正在处理数据!");
}
}
总结
本章介绍了ElasticJob中第二种流式作业DataFlowJob的基本使用,下章节介绍SimpleJob结合Spring框架进行整合
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/71713.html