接着上面的项目,添加工具类的方法
项目gitee https://gitee.com/gangye/elasticsearch_demo
一、向es的指定索引中新增文档
/**
* @Description 单条记录插入
* @param indexName
* @param sourceMap 传入的记录的键值对
* @return
*/
public static Map<String, Object> singleInsert (String indexName, Map<String, Object> sourceMap) {
Map<String, Object> insertResult = new HashMap<>();
RestHighLevelClient client = EsClient.getInstance();
IndexRequest indexRequest = new IndexRequest(indexName, DefineConstant.SEARCH_REQUEST_TYPE);
//es对应的索引文档的字段一定要有id_s,便于与_id相对应,且一定要逻辑控制id的唯一
String id = StringUtils.isEmpty(sourceMap.get("id_s")) ? UUID.randomUUID().toString() : (String) sourceMap.get("id_s");
indexRequest.id(id);
indexRequest.source(sourceMap);
//写入后立即刷新
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
try {
String lineSeparator = System.lineSeparator();
log.info(lineSeparator + "index:" + indexName + lineSeparator + "insert:"+ indexRequest.toString() + lineSeparator);
IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
log.info("es insert response: {}", indexResponse.toString());
insertResult.put("id", indexResponse.getId());
insertResult.put("status", indexResponse.status().toString());
} catch (IOException e) {
log.error(e.getMessage());
}
return insertResult;
}
向income_expense_analysis索引中插入一条记录
对应的响应报文以及日志打印
成功插入一条记录,这里的id_s与_id一致,便于后续修改操作的时候传入id
二、向es指定的索引中,根据文档id修改记录
/**
* @Description 修改记录数据
* @param indexName
* @param updateMap
* @return
*/
public static Map<String, Object> updateEsRecord (String indexName, Map<String, Object> updateMap){
Map<String, Object> updateResult = new HashMap<>();
RestHighLevelClient client = EsClient.getInstance();
//此时之前指定的id_s字段就起作用了
if (StringUtils.isEmpty(updateMap.get("id_s"))){
log.error("尚未指定修改的记录id");
return null;
}
String id = (String) updateMap.get("id_s");
UpdateRequest updateRequest = new UpdateRequest(indexName, DefineConstant.SEARCH_REQUEST_TYPE, id);
updateRequest.doc(updateMap);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
try {
String lineSeparator = System.lineSeparator();
log.info(lineSeparator + "index:" + indexName + lineSeparator + "update:"+ updateRequest.toString() + lineSeparator);
UpdateResponse updateResponse = client.update(updateRequest, RequestOptions.DEFAULT);
log.info("es update response: {}", updateResponse.toString());
updateResult.put("id", updateResponse.getId());
updateResult.put("status", updateResponse.status().toString());
} catch (IOException e) {
log.error(e.getMessage());
}
return updateResult;
}
修改刚新增id为2020-03-313186482的文档的txn_remark_s字段为”03月31日商户交易入账”
对应的响应报文以及日志打印
成功修改文档记录
三、根据指定的id删除文档
/**
* @Description 删除记录数据
* @param indexName
* @param id
* @return
*/
public static Map<String, Object> deleteEsRecord (String indexName, String id){
Map<String, Object> deleteResult = new HashMap<>();
RestHighLevelClient client = EsClient.getInstance();
//此时之前指定的id_s字段就起作用了
if (StringUtils.isEmpty(id)){
log.error("尚未指定删除的记录id");
return null;
}
DeleteRequest deleteRequest = new DeleteRequest(indexName, DefineConstant.SEARCH_REQUEST_TYPE, id);
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
try {
String lineSeparator = System.lineSeparator();
log.info(lineSeparator + "index:" + indexName + lineSeparator + "delete:"+ deleteRequest.toString() + lineSeparator);
DeleteResponse deleteResponse = client.delete(deleteRequest, RequestOptions.DEFAULT);
log.info("es update response: {}", deleteResponse.toString());
deleteResult.put("id", deleteResponse.getId());
deleteResult.put("status", deleteResponse.status().toString());
} catch (IOException e) {
log.error(e.getMessage());
}
return deleteResult;
}
删除刚刚新增的id为2020-03-313186482 的文档记录
响应的结果以及日志打印
再结合工具查看income_expense_analysis索引下的文档数据记录,成功删除
四、批量插入数据,使用bulk操作
/**
* @Description 批量向es插入数据,最好将批量数据分批向es插入,而且将客户端的超时时间延长(此处我使用的单记录的操作的5秒对于批量操作就不合适了)
* @param indexName
* @param sourceList
* @return
*/
public static Map<String, Object> batchInsert(String indexName, List<Map<String, Object>> sourceList){
List<Map<String, Object>> insertResults = new ArrayList<>();
Map<String, Object> result = new HashMap<>();
RestHighLevelClient client = EsClient.getInstance();
BulkRequest bulkRequest = new BulkRequest();
for (Map<String, Object> source : sourceList){
IndexRequest indexRequest = new IndexRequest(indexName,DefineConstant.SEARCH_REQUEST_TYPE);
indexRequest.id(StringUtils.isEmpty(source.get("id_s")) ? UUID.randomUUID().toString() : (String) source.get("id_s"));
indexRequest.source(source);
bulkRequest.add(indexRequest);
}
bulkRequest.timeout(TimeValue.timeValueMintue(2));
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
try {
BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
BulkItemResponse[] itemResponses = bulkResponse.getItems();
for (int i = 0;i < itemResponses.length; i++){
BulkItemResponse response = itemResponses[i];
Map<String, Object> item = new HashMap<>();
item.put("id", response.getId());
item.put("status", response.status().toString());
item.put("failure", response.getFailureMessage());
if (response.isFailed()){
IndexRequest ireq = (IndexRequest) bulkRequest.requests().get(i);
log.error("Failed while indexing to " + response.getIndex() + "type " +response.getType() + " " +
"request: [" + ireq + "]: [" + response.getFailureMessage() + "]");
item.put("isSuccess", "FAIL");
}
log.info("-------bulk---batchInsert------->{}", item);
insertResults.add(item);
}
} catch (IOException e) {
log.error(e.getMessage());
}
result.put("insertResults", insertResults);
return result;
}
一般是从文件中读取记录,此处我就不使用请求中展示,就在代码中模拟插入的记录
响应结果以及执行的日志打印
查看es插入的记录
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/12292.html