dolphinscheduler调度器接入注意事项等信息可参考我的上一篇博客进行了解,地址在这里 ->
文章目录
一、功能清单
二、可拖拽spark任务管理
说明:任务管理实际是操作dolphinscheduler调度器中的项目中的用户下唯一工作流中的各种类型节点的管理操作
共用的依赖
<!--httpclient-->
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.1</version>
</dependency>
共用配置文件
dolphinscheduler.token=xxx
dolphinscheduler.address=http://IP:12345
共用代码
@Autowired
private RestTemplate restTemplate;
@Value("${dolphinscheduler.token}")
String token;
@Value("${dolphinscheduler.address}")
String address;
public static final int ZERO = 0;
public static final int SUCCESS = 200;
@Autowired
private DragSparkTaskService dragSparkTaskService;
@Value("${spark.main.class}")
String mainClass;
public static final String CREATE = "create";
public static final String UPDATE = "update";
public static final String ADD = "add";
public static final String DELETE = "delete";
public static final String ONLINE = "ONLINE";
public static final String OFFLINE = "OFFLINE";
public static final int ONE_THOUSAND_AND_FIVE_HUNDRED = 1500;
public static final int SIX = 6;
public static final int EIGHTY = 80;
public static final int THREE = 3;
@Autowired
private StringRedisTemplate redisTemplate;
@Value("${drag.task.state}")
String dragTaskState;
@Autowired
private DragSparkTaskMapper dragSparkTaskMapper;
1.创建可拖拽模型spark任务
{
"name": "测试任务模型1",
"describeInfo": "测试任务模型1",
"projectName": "spark线性回归模型",
"task": {
"edges": [
{
"id": "13889c99",
"index": 2,
"source": "356bc2be",
"target": "33838a0d",
"sourceAnchor": 2,
"targetAnchor": 0
}
],
"nodes": [
{
"x": 482.671875,
"y": 89.125,
"id": "356bc2be",
"size": "72*72",
"type": "node",
"color": "#1890FF",
"index": 0,
"label": "数据源",
"shape": "flow-circle",
"config": {
"sourceType": "mysql",
"targetTable": "machine_learning_house_info2"
},
"inputs": [],
"outputs": [],
"modelType": "dataSource"
},
{
"x": 502.171875,
"y": 269.125,
"id": "33838a0d",
"size": "110*42",
"type": "node",
"color": "#66C35D",
"index": 1,
"label": "全表统计",
"shape": "flow-capsule",
"config": {
"selectColumns": "*"
},
"inputs": [],
"outputs": [],
"modelType": "fullTableStatistics"
}
]
}
}
/**
* 创建任务-创建用户下唯一工作流,无则创建有则并排添加
* @param request request
* @param vo 任务参数
* @author liudz
* @date 2021/5/8
* @return 执行结果
**/
@PostMapping("/project/process")
@Transactional(rollbackFor = Exception.class)
public Response operatorDragSparkTask(HttpServletRequest request, @RequestBody DragSparkTaskVo vo) {
Long userId = Long.valueOf(request.getUserPrincipal().getName());
vo.setCreateId(userId);
vo.setUpdateId(userId);
if (vo == null || org.apache.commons.lang3.StringUtils.isBlank(vo.getName()) || vo.getDescribeInfo() == null
|| vo.getCreateId() == null) {
log.error("--DragSparkTaskController--addDragSparkTask--PARAM_ERROR!--");
return Response.error(Msg.PARAM_ERROR);
}
Response<DragSparkTaskVo> dragSparkTaskResponse = dragSparkTaskService.addDragSparkTask(vo);
log.info("--(1)addDragSparkTask--success");
if (dragSparkTaskResponse.getCode() == SUCCESS) {
Boolean verifyResult = verifyProcessExist(userId + "-dragSparkTask", vo.getProjectName());
log.info("--(2)verifyProcessExist--success:{}", verifyResult);
if (!verifyResult) {
ProcessDto processDto = packageProcessParam(
"create", userId + "-dragSparkTask", vo, null);
log.info("--(3)packageProcessParam--success");
dragSparkTaskResponse = createProcess(vo, processDto);
} else {
//获取用户下唯一工作流ID
DolphinschedulerResponse processInfoList = getUserProcess(vo.getProjectName());
JSONObject processJson = new JSONObject();
log.info("--(3)getUserProcess--success:{}", processInfoList);
List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData();
for (Map<String, Object> map : list) {
if (map.get("name").equals(userId + "-dragSparkTask")) {
processJson.fluentPutAll(map);
}
}
ProcessDto processDto = packageProcessParam(
"add", userId + "-dragSparkTask", vo, processJson);
processDto.setId(processJson.getInteger("id"));
log.info("--(4)packageProcessParam--success");
if (processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()).equals(ONLINE)) {
releaseProcessDefinition(vo.getProjectName(), userId + "-dragSparkTask",
processDto.getId(), 0);
log.info("--(5)releaseProcessDefinition--OFFLINE--success");
}
dragSparkTaskResponse = updateProcess(vo, processDto);
}
}
return dragSparkTaskResponse;
}
/**
* 校验工作流是否存在
*
* @param processName
* 工作流名称
* @param projectName 项目名称
* @author liudz
* @date 2021/5/8
* @return boolean
**/
public Boolean verifyProcessExist(String processName, String projectName) {
HttpHeaders headers = new HttpHeaders();
headers.set("token", token);
headers.set("Content-Type", "application/json");
HttpEntity requestEntity = new HttpEntity(headers);
ResponseEntity<DolphinschedulerResponse> response =
restTemplate.exchange(address + "/dolphinscheduler/projects/" + projectName
+ "/process/verify-name?name=" + processName,
HttpMethod.GET, requestEntity, DolphinschedulerResponse.class);
if (response.getBody().getCode() == ZERO) {
return false;
}
return true;
}
/**
* 获取dolphinscheduler上的某用户下唯一工作流
* @param projectName 项目名称
* @author liudz
* @date 2021/5/8
* @return id
**/
public DolphinschedulerResponse getUserProcess(String projectName) {
HttpHeaders headers = new HttpHeaders();
headers.set("token", token);
headers.set("Content-Type", "application/json");
HttpEntity requestEntity = new HttpEntity(headers);
ResponseEntity<DolphinschedulerResponse> response =
restTemplate.exchange(address + "/dolphinscheduler/projects/" + projectName + "/process/list",
HttpMethod.GET, requestEntity, DolphinschedulerResponse.class);
return response.getBody();
}
/**
* 封装参数
* @param type 操作类型
* @param processName 用户工作流名称
* @param vo 任务参数
* @param processJson 工作流json
* @author liudz
* @date 2021/5/13
* @return ProcessDto
**/
public ProcessDto packageProcessParam(String type, String processName, DragSparkTaskVo vo, JSONObject processJson) {
ProcessDto processDto = new ProcessDto();
processDto.setConnects("[]");
processDto.setName(processName);
JSONObject locationsOne = new JSONObject();
JSONObject locationsTwo = new JSONObject();
locationsTwo.fluentPut("name", "spark-" + vo.getId()).fluentPut("targetarr", "").fluentPut("nodenumber", "0");
locationsTwo.fluentPut("x", 0).fluentPut("y", 0);
locationsOne.put("tasks-" + vo.getId(), locationsTwo);
// 创建工作流
if (CREATE.equals(type)) {
processDto = packageProcessParamOfCreate(processDto, vo, locationsOne);
} else if (ADD.equals(type)) {
//工作流添加节点
processDto = packageProcessParamOfAdd(processDto, vo, processJson, locationsOne, locationsTwo);
} else if (UPDATE.equals(type)) {
//更新工作流-只更新参数processDefinitionJson的tasks参数
processDto = packageProcessParamOfUpdate(processDto, processJson, vo);
} else if (DELETE.equals(type)) {
//更新工作流或删除工作流-更新则删除参数processDefinitionJson的tasks参数
processDto = packageProcessParamOfDelete(processDto, processJson, vo);
}
return processDto;
}
/**
* packageProcessParamOfCreate
* @param processDto 工作流参数
* @param vo 任务参数
* @param locationsOne locationsOne
* @author liudz
* @date 2021/5/7
* @return ProcessDto
**/
public ProcessDto packageProcessParamOfCreate(ProcessDto processDto, DragSparkTaskVo vo, JSONObject locationsOne) {
processDto.setLocations(locationsOne.toString());
JSONObject processOne = new JSONObject();
processOne.fluentPut("globalParams", new ArrayList<>()).fluentPut("tenantId", THREE).fluentPut("timeout", 0);
JSONObject processTwo = new JSONObject();
processTwo.fluentPut("type", "SPARK").fluentPut("id", "tasks-" + vo.getId());
processTwo.fluentPut("name", "spark-" + vo.getId()).fluentPut("description", "");
JSONArray nodesArray = parseLineAndNode(vo.getTask());
JSONObject json = new JSONObject();
json.put("id", vo.getId());
json.put("name", vo.getName());
json.put("describeInfo", vo.getDescribeInfo());
json.put("nodes", nodesArray);
String taskJsonString = json.toString().replace("}}", "} }").replace("{{", "{ {");
processTwo.put("params", JSONObject.parseObject("{\"mainClass\":\"" + mainClass + "\","
+ "\"mainJar\":{\"id\":" + getSparkResourceJarId() + "},\"deployMode\":\"cluster\","
+ "\"resourceList\":[],\"localParams\":[],\"driverCores\":1,\"driverMemory\":\"1G\","
+ "\"numExecutors\":\"1\",\"executorMemory\":\"1G\",\"executorCores\":\"1\",\"mainArgs\":\"\\\""
+ taskJsonString.replace("\"", "\\\\\\\"") + "\\\"\",\"others\":\"\","
+ "\"programType\":\"JAVA\",\"sparkVersion\":\"SPARK2\"}"));
JSONObject jsonTimeout = new JSONObject();
jsonTimeout.fluentPut("strategy", "").fluentPut("interval", null).fluentPut("enable", false);
processTwo.fluentPut("timeout", jsonTimeout).fluentPut("runFlag", "NORMAL");
JSONObject processTree = new JSONObject();
processTree.fluentPut("successNode", new ArrayList<>()).fluentPut("failedNode", new ArrayList<>());
JSONObject jsonconditionResult = new JSONObject();
jsonconditionResult.put("successNode", new ArrayList<>());
jsonconditionResult.put("failedNode", new ArrayList<>());
processTwo.fluentPut("conditionResult", jsonconditionResult).fluentPut("dependence", new JSONObject());
processTwo.fluentPut("maxRetryTimes", "0").fluentPut("retryInterval", "1");
processTwo.fluentPut("taskInstancePriority", "MEDIUM").fluentPut("workerGroup", "default");
processTwo.fluentPut("preTasks", new ArrayList<>());
JSONArray processTaskArray = new JSONArray();
processTaskArray.add(processTwo);
processOne.put("tasks", processTaskArray);
processDto.setProcessDefinitionJson(processOne.toString());
return processDto;
}
/**
* packageProcessParamOfAdd
* @param processDto 工作流参数
* @param locationsOne locationsOne
* @param locationsTwo locationsTwo
* @param vo 任务参数
* @param processJson 工作流json
* @author liudz
* @date 2021/5/7
* @return ProcessDto
**/
public ProcessDto packageProcessParamOfAdd(ProcessDto processDto, DragSparkTaskVo vo, JSONObject processJson,
JSONObject locationsOne, JSONObject locationsTwo) {
String maxTaskKey = JsonTools.getJsonStringMaxKey(processJson.getString("locations"));
Integer x = processJson.getJSONObject("locations").getJSONObject(maxTaskKey).getInteger("x");
Integer y = processJson.getJSONObject("locations").getJSONObject(maxTaskKey).getInteger("y");
if (x < ONE_THOUSAND_AND_FIVE_HUNDRED) {
locationsTwo.fluentPut("x", x + EIGHTY).fluentPut("y", y);
} else if (x >= ONE_THOUSAND_AND_FIVE_HUNDRED) {
locationsTwo.fluentPut("y", y + EIGHTY).fluentPut("x", 0);
}
locationsOne = processJson.getJSONObject("locations").fluentPut("tasks-" + vo.getId(), locationsTwo);
processDto.setLocations(locationsOne.toString());
processDto.setId(processJson.getInteger("id"));
JSONObject processTwo = new JSONObject();
processTwo.fluentPut("type", "SPARK").fluentPut("id", "tasks-" + vo.getId());
processTwo.fluentPut("name", "spark-" + vo.getId()).fluentPut("description", "");
JSONArray nodesArray = parseLineAndNode(vo.getTask());
JSONObject json = new JSONObject();
json.put("id", vo.getId());
json.put("name", vo.getName());
json.put("describeInfo", vo.getDescribeInfo());
json.put("nodes", nodesArray);
String taskJsonString = json.toString().replace("}}", "} }").replace("{{", "{ {");
processTwo.put("params", JSONObject.parseObject("{\"mainClass\":\"" + mainClass + "\","
+ "\"mainJar\":{\"id\":" + getSparkResourceJarId() + "},\"deployMode\":\"cluster\","
+ "\"resourceList\":[],\"localParams\":[],\"driverCores\":1,\"driverMemory\":\"1G\","
+ "\"numExecutors\":\"1\",\"executorMemory\":\"1G\",\"executorCores\":\"1\",\"mainArgs\":\"\\\""
+ taskJsonString.replace("\"", "\\\\\\\"") + "\\\"\",\"others\":\"\","
+ "\"programType\":\"JAVA\",\"sparkVersion\":\"SPARK2\"}"));
JSONObject jsonTimeout = new JSONObject();
jsonTimeout.fluentPut("strategy", "").fluentPut("interval", null).fluentPut("enable", false);
processTwo.fluentPut("timeout", jsonTimeout).fluentPut("runFlag", "NORMAL");
JSONObject processTree = new JSONObject();
processTree.fluentPut("successNode", new ArrayList<>()).fluentPut("failedNode", new ArrayList<>());
JSONObject jsonconditionResult = new JSONObject();
jsonconditionResult.put("successNode", new ArrayList<>());
jsonconditionResult.put("failedNode", new ArrayList<>());
processTwo.fluentPut("conditionResult", jsonconditionResult).fluentPut("dependence", new JSONObject());
processTwo.fluentPut("maxRetryTimes", "0").fluentPut("retryInterval", "1");
processTwo.fluentPut("taskInstancePriority", "MEDIUM").fluentPut("workerGroup", "default");
processTwo.fluentPut("preTasks", new ArrayList<>());
JSONObject jsonNew = processJson.getJSONObject("processDefinitionJson");
JSONArray jsonArray = jsonNew.getJSONArray("tasks");
jsonArray.add(processTwo);
jsonNew.put("tasks", jsonArray);
processDto.setProcessDefinitionJson(jsonNew.toString());
return processDto;
}
/**
* 工作流【上线或者下线】
* @param projectName 项目名称
* @param processName 用户工作流名称
* @param processId 工作流ID
* @param releaseState 上下线状态操作【0:下线,1:上线】
* @author liudz
* @date 2021/5/7
* @return 执行结果
**/
public Response releaseProcessDefinition(String projectName, String processName, Integer processId,
Integer releaseState) {
try {
String postURL = address + "/dolphinscheduler/projects/"
+ URLEncoder.encode(projectName, "utf-8") + "/process/release";
PostMethod postMethod = new PostMethod(postURL);
postMethod.setRequestHeader("Content-Type",
"application/x-www-form-urlencoded;charset=utf-8");
postMethod.setRequestHeader("token", token);
// 参数设置,需要注意的就是里边不能传NULL,要传空字符串
NameValuePair[] data = {new NameValuePair("name", processName),
new NameValuePair("processId", processId.toString()),
new NameValuePair("releaseState", releaseState.toString())};
postMethod.setRequestBody(data);
org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();
httpClient.executeMethod(postMethod);
JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());
if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) {
return Response.error(result.getInteger("code"), result.getString("msg"));
}
} catch (Exception e) {
log.info("请求异常:{}", e);
}
return Response.success();
}
2.更新可拖拽模型spark任务
{
"name": "测试任务模型1",
"describeInfo": "测试任务模型1",
"projectName": "spark线性回归模型",
"id": 111,
"task": {
"edges": [
{
"id": "13889c99",
"index": 2,
"source": "356bc2be",
"target": "33838a0d",
"sourceAnchor": 2,
"targetAnchor": 0
}
],
"nodes": [
{
"x": 482.671875,
"y": 89.125,
"id": "356bc2be",
"size": "72*72",
"type": "node",
"color": "#1890FF",
"index": 0,
"label": "数据源",
"shape": "flow-circle",
"config": {
"sourceType": "mysql",
"targetTable": "machine_learning_house_info2"
},
"inputs": [],
"outputs": [],
"modelType": "dataSource"
},
{
"x": 502.171875,
"y": 269.125,
"id": "33838a0d",
"size": "110*42",
"type": "node",
"color": "#66C35D",
"index": 1,
"label": "全表统计",
"shape": "flow-capsule",
"config": {
"selectColumns": "*"
},
"inputs": [],
"outputs": [],
"modelType": "fullTableStatistics"
}
]
}
}
/**
* 更新任务
* @param request request
* @param vo 任务参数
* @author liudz
* @date 2021/5/8
* @return 执行结果
**/
@PutMapping("/project/process")
@Transactional(rollbackFor = Exception.class)
public Response updateDragSparkTask(HttpServletRequest request, @RequestBody DragSparkTaskVo vo) {
Long userId = Long.valueOf(request.getUserPrincipal().getName());
vo.setCreateId(userId);
vo.setUpdateId(userId);
if (vo == null || org.apache.commons.lang3.StringUtils.isBlank(vo.getName()) || vo.getDescribeInfo() == null
|| vo.getCreateId() == null) {
log.error("--DragSparkTaskController--updateDragSparkTask--PARAM_ERROR!--");
return Response.error(Msg.PARAM_ERROR);
}
Response<DragSparkTaskVo> dragSparkTaskResponse = dragSparkTaskService.updateDragSparkTask(vo);
log.info("--(1)updateDragSparkTask--mysql--success");
if (dragSparkTaskResponse.getCode() == SUCCESS) {
//获取用户下唯一工作流ID
DolphinschedulerResponse processInfoList = getUserProcess(vo.getProjectName());
JSONObject processJson = new JSONObject();
log.info("--(2)getUserProcess--success:{}", processInfoList);
List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData();
for (Map<String, Object> map : list) {
if (map.get("name").equals(userId + "-dragSparkTask")) {
processJson.fluentPutAll(map);
}
}
ProcessDto processDto = packageProcessParam(
"update", userId + "-dragSparkTask", vo, processJson);
processDto.setId(processJson.getInteger("id"));
log.info("--(3)packageProcessParam--success");
if (processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()).equals(ONLINE)) {
releaseProcessDefinition(vo.getProjectName(), userId + "-dragSparkTask",
processDto.getId(), 0);
log.info("--(4)releaseProcessDefinition--OFFLINE--success");
}
return updateProcess(vo, processDto);
}
return dragSparkTaskResponse;
}
/**
* packageProcessParamOfUpdate
* @param processDto 工作流参数
* @param vo 任务参数
* @param processJson 工作流json
* @author liudz
* @date 2021/5/7
* @return ProcessDto
**/
public ProcessDto packageProcessParamOfUpdate(ProcessDto processDto, JSONObject processJson, DragSparkTaskVo vo) {
processDto.setLocations(processJson.getString("locations"));
processDto.setId(processJson.getInteger("id"));
JSONArray jsonTasksArray = processJson.getJSONObject("processDefinitionJson").getJSONArray("tasks");
JSONArray copyJsonTasksArray = new JSONArray();
copyJsonTasksArray.addAll(jsonTasksArray);
JSONObject processDefinitionJson = new JSONObject();
JSONArray nodesArray = parseLineAndNode(vo.getTask());
JSONObject json = new JSONObject();
json.put("id", vo.getId());
json.put("name", vo.getName());
json.put("describeInfo", vo.getDescribeInfo());
json.put("nodes", nodesArray);
String taskJsonString = json.toString().replace("}}", "} }").replace("{{", "{ {");
for (Object object : jsonTasksArray) {
JSONObject jsonObject = JSONObject.parseObject(object.toString());
if (Long.valueOf(jsonObject.getString("id").substring(SIX)) == vo.getId()) {
String mainArgs = jsonObject.getString("mainArgs");
mainArgs = "\"" + taskJsonString.replace("\"", "\\\"") + "\"";
copyJsonTasksArray.remove(jsonObject);
jsonObject.getJSONObject("params").put("mainArgs", mainArgs);
copyJsonTasksArray.add(jsonObject);
processDefinitionJson = processJson.getJSONObject("processDefinitionJson");
processDefinitionJson.put("tasks", copyJsonTasksArray);
}
}
processDto.setProcessDefinitionJson(processDefinitionJson.toString());
return processDto;
}
/**
* 更新工作流
* @param vo vo
* @param processDto processDto
* @author liudz
* @date 2021/5/7
* @return 执行结果
**/
public Response updateProcess(DragSparkTaskVo vo, ProcessDto processDto) {
try {
String postURL = address + "/dolphinscheduler/projects/"
+ URLEncoder.encode(vo.getProjectName(), "utf-8") + "/process/update";
PostMethod postMethod = new PostMethod(postURL);
postMethod.setRequestHeader("Content-Type",
"application/x-www-form-urlencoded;charset=utf-8");
postMethod.setRequestHeader("token", token);
// 参数设置,需要注意的就是里边不能传NULL,要传空字符串
NameValuePair[] data = {new NameValuePair("connects", processDto.getConnects()),
new NameValuePair("name", processDto.getName()),
new NameValuePair("locations", processDto.getLocations()),
new NameValuePair("id", processDto.getId().toString()),
new NameValuePair("processDefinitionJson", processDto.getProcessDefinitionJson())};
postMethod.setRequestBody(data);
org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();
httpClient.executeMethod(postMethod);
JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());
log.info("--(5)httpUpdateProcess:{}", result);
if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) {
return Response.error(result.getInteger("code"), result.getString("msg"));
}
} catch (Exception e) {
log.info("请求异常:{}", e);
}
return Response.success();
}
3.删除可拖拽模型spark任务
/**
* 删除任务
* @param request request
* @param projectName 项目名称
* @param id 任务ID
* @author liudz
* @date 2021/5/8
* @return 执行结果
**/
@DeleteMapping("/project/process/{projectName}/{id}")
@Transactional(rollbackFor = Exception.class)
public Response deleteDragSparkTask(HttpServletRequest request, @PathVariable("projectName") String projectName,
@PathVariable("id") Long id) {
DragSparkTaskVo vo = new DragSparkTaskVo();
Long userId = Long.valueOf(request.getUserPrincipal().getName());
vo.setId(id);
vo.setCreateId(userId);
vo.setProjectName(projectName);
if (vo == null || vo.getId() == null || vo.getCreateId() == null) {
log.error("--deleteDragSparkTask--PARAM_ERROR!--");
return Response.error(Msg.PARAM_ERROR);
}
Response<DragSparkTaskVo> dragSparkTaskResponse = dragSparkTaskService.deleteDragSparkTask(vo);
log.info("--(1)deleteDragSparkTask--mysql--success");
if (dragSparkTaskResponse.getCode() == SUCCESS) {
//获取用户下唯一工作流ID
DolphinschedulerResponse processInfoList = getUserProcess(vo.getProjectName());
JSONObject processJson = new JSONObject();
log.info("--(2)getUserProcess--success:{}", processInfoList);
List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData();
for (Map<String, Object> map : list) {
if (map.get("name").equals(userId + "-dragSparkTask")) {
processJson.fluentPutAll(map);
}
}
ProcessDto processDto = packageProcessParam(
"delete", userId + "-dragSparkTask", vo, processJson);
processDto.setId(processJson.getInteger("id"));
log.info("--(3)packageProcessParam--success");
if (processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()).equals(ONLINE)) {
releaseProcessDefinition(vo.getProjectName(), userId + "-dragSparkTask",
processDto.getId(), 0);
log.info("--(4)releaseProcessDefinition--OFFLINE--success");
}
if (JSONObject.parseObject(processDto.getLocations()).keySet().size() == 0) {
//删除工作流
deleteProcess(vo, processDto);
} else {
//更新工作流
updateProcess(vo, processDto);
}
}
return dragSparkTaskResponse;
}
/**
* 删除工作流
* @param vo vo
* @param processDto processDto
* @author liudz
* @date 2021/5/7
* @return 执行结果
**/
public DolphinschedulerResponse deleteProcess(DragSparkTaskVo vo, ProcessDto processDto) {
HttpHeaders headers = new HttpHeaders();
headers.set("token", token);
headers.set("Content-Type", "application/json");
HttpEntity requestEntity = new HttpEntity(headers);
ResponseEntity<DolphinschedulerResponse> response =
restTemplate.exchange(address + "/dolphinscheduler/projects/" + vo.getProjectName()
+ "/process/delete?processDefinitionId=" + processDto.getId(),
HttpMethod.GET, requestEntity, DolphinschedulerResponse.class);
log.info("--(5)httpDeleteProcess:{}", response);
return response.getBody();
}
三、本人相关其他文章链接
1.springboot项目集成dolphinscheduler调度器 可拖拽spark任务管理:
https://blog.csdn.net/a924382407/article/details/117119831
2.springboot项目集成dolphinscheduler调度器 实现datax数据同步任务:
https://blog.csdn.net/a924382407/article/details/120951230
3.springboot项目集成dolphinscheduler调度器 项目管理:
https://blog.csdn.net/a924382407/article/details/117118931
4.springboot项目集成大数据第三方dolphinscheduler调度器 执行/停止任务
https://blog.csdn.net/a924382407/article/details/117121181
5.springboot项目集成大数据第三方dolphinscheduler调度器
https://blog.csdn.net/a924382407/article/details/117113848
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/106269.html