1. Operating ES from a Python script over HTTP
#!coding:utf-8
import json
import logging
import time
import requests
PAGE_RESULT_SCROLL_ID = 'scroll_id'
PAGE_RESULT_SCROLL_SIZE = 'scroll_size'
PAGE_RESULT_TOTAL_SIZE = 'total_size'
PAGE_RESULT_HITS = 'hits'
PAGE_RESULT_DATA = 'data'
PAGE_RESULT_CONVERT_DATA = 'convert_data'
CONVERT_DEST_KEY = 'dest_key'
CONVERT_DEFAULT_VALUE = 'default_value'
current_time = time.strftime("%Y-%m-%d-%H-%M", time.localtime(time.time()))
# Logging setup: write the log to a timestamped UTF-8 file
log_file = "operate_es_" + current_time + ".log"
handler = logging.FileHandler(filename=log_file, encoding='utf-8')
logging.basicConfig(level=logging.INFO, handlers=[handler])
# Create an index
def create_index(es_url, index_name, es_mapping):
logging.info("es_url:%s index_name:%s " % (es_url, index_name))
es_index_url = es_url + index_name
logging.info("es_index_url: %s" % (es_index_url))
r = requests.put(es_index_url, json.dumps(es_mapping), headers={"content-type": "application/json"})
response_json = json.loads(r.text)
if 200 == r.status_code:
if response_json['acknowledged']:
            logging.info("index_name: %s created successfully" % (index_name))
    else:
        logging.info("index_name: %s creation failed" % (index_name))
        logging.error("error message: %s" % r.text.encode(encoding="utf-8"))
return -1
logging.info("index create done!")
return 0
# Batch-create bucketed indexes with a mapping
def batch_create_index_with_mapping(es_url, es_index_prefix, es_mapping, batch_num=1, start_index=0):
logging.info("es_url:" + es_url)
new_index_array = []
for i in range(start_index, batch_num):
suffix = "%03d" % i
index_name = es_index_prefix + suffix
es_index_url = es_url + index_name
logging.info("es_index_url: %s" % (es_index_url))
r = requests.put(es_index_url, json.dumps(es_mapping), headers={"content-type": "application/json"})
response_json = json.loads(r.text)
if 200 == r.status_code:
if response_json['acknowledged']:
                logging.info("index_name: %s created successfully" % (index_name))
                new_index_array.append(index_name)
        else:
            logging.info("index_name: %s creation failed" % (index_name))
            logging.error("error message: %s" % r.text.encode(encoding="utf-8"))
break
logging.info("index create done!")
logging.info("new_index_array:%s" % (new_index_array))
# Delete an index
def delete_index(es_url, delete_index):
delete_es_url = es_url + delete_index
logging.info("es_url:%s delete_index:%s start..." % (delete_es_url, delete_index))
r = requests.delete(delete_es_url, headers={"content-type": "application/json"})
if 200 == r.status_code:
response_json = json.loads(r.text)
        logging.info("delete response: %s " % r.text.encode(encoding="utf-8"))
else:
        logging.info("es_url:%s delete_index: %s deletion failed" % (es_url, delete_index))
        logging.error("error message: %s" % r.text.encode(encoding="utf-8"))
return -1
return 0
# Delete indexes by prefix (wildcard)
def del_index_by_prefix(es_url, index_prefix):
delete_es_url = es_url + index_prefix + "*"
logging.info("es_url:%s start..." % (delete_es_url))
r = requests.delete(delete_es_url, headers={"content-type": "application/json"})
if 200 == r.status_code:
response_json = json.loads(r.text)
        logging.info("delete response: %s " % r.text.encode(encoding="utf-8"))
else:
        logging.info("delete_es_url:%s deletion failed" % (delete_es_url))
        logging.error("error message: %s" % r.text.encode(encoding="utf-8"))
return -1
return 0
# Batch-delete bucketed indexes by prefix
def batch_es_del_index(es_url, index_prefix, bucket_num, start_index=0):
logging.info("es_url:" + es_url)
delete_index_array = []
for i in range(start_index, bucket_num):
suffix = "%03d" % i
        del_index_name = index_prefix + suffix
        ret = delete_index(es_url, del_index_name)
        if 0 == ret:
            delete_index_array.append(del_index_name)
        else:
            logging.info("delete_index:%s failed" % (del_index_name))
break
logging.info("batch_es_del_index done!")
logging.info("delete_index_array:%s" % (delete_index_array))
# Update an index mapping (add properties)
def add_properties_to_index(es_url, index_name, doc_type, es_mapping):
logging.info("es_url:" + es_url)
es_index_url = es_url + index_name + '/' + doc_type + '/_mapping'
logging.info("es_index_url: %s" % (es_index_url))
r = requests.post(es_index_url, json.dumps(es_mapping), headers={"content-type": "application/json"})
response_json = json.loads(r.text)
if 200 == r.status_code:
if response_json['acknowledged']:
            logging.info("index_name: %s updated successfully" % (index_name))
else:
        logging.info("index_name: %s update failed" % (index_name))
        logging.error("error message: %s" % r.text.encode(encoding="utf-8"))
return -1
logging.info("index modify done!")
return 0
# Batch-update index mappings
def batch_add_properties_to_index(es_url, index_prefix, doc_type, es_mapping, bucket_num, start_index=0):
logging.info("es_url:" + es_url)
new_index_array = []
for i in range(start_index, bucket_num):
suffix = "%03d" % i
index_name = index_prefix + suffix
es_index_url = es_url + index_name + '/' + doc_type + '/_mapping'
logging.info("es_index_url: %s" % (es_index_url))
r = requests.post(es_index_url, json.dumps(es_mapping), headers={"content-type": "application/json"})
response_json = json.loads(r.text)
if 200 == r.status_code:
if response_json['acknowledged']:
                logging.info("index_name: %s updated successfully" % (index_name))
new_index_array.append(index_name)
else:
            logging.info("index_name: %s update failed" % (index_name))
            logging.error("error message: %s" % r.text.encode(encoding="utf-8"))
logging.info("index modify done new_index_array:%s" % (new_index_array))
return -1
logging.info("index modify done!")
logging.info("new_index_array:%s" % (new_index_array))
return 0
# Back up an index (structure + data), setting the routing field via a script
def es_reindex_with_routing(es_url, source_index, bak_index, query, routing_field):
    # copy the data with _reindex
    url = es_url + "_reindex"
    data = {
        "source": {
            "index": source_index,
            "query": query
        },
        "dest": {
            "index": bak_index
        },
        "script": {
            "inline": "ctx._routing = ctx._source." + routing_field,
            "lang": "painless"
        }
    }
    logging.info("source_index:%s to bak_index: %s start..." % (source_index, bak_index))
    r = requests.post(url, json.dumps(data), headers={"content-type": "application/json"})
    if 200 == r.status_code:
        response_json = json.loads(r.text)
        logging.info("reindex response: %s " % r.text.encode(encoding="utf-8"))
        logging.info("source_index:%s to bak_index: %s copied successfully" % (source_index, bak_index))
        logging.info("backup index: %s , documents copied: %d" % (bak_index, response_json['total']))
    else:
        logging.info("source_index:%s bak_index: %s backup failed" % (source_index, bak_index))
        logging.error("error message: %s" % r.text.encode(encoding="utf-8"))
        return -1
    return 0
# Back up an index (structure + data)
def es_reindex(es_url, source_index, bak_index, query):
    # copy the data with _reindex
    url = es_url + "_reindex"
    data = {
        "source": {
            "index": source_index,
            "query": query
        },
        "dest": {
            "index": bak_index
        }
    }
    logging.info("source_index:%s to bak_index: %s start..." % (source_index, bak_index))
    r = requests.post(url, json.dumps(data), headers={"content-type": "application/json"})
    if 200 == r.status_code:
        response_json = json.loads(r.text)
        logging.info("reindex response: %s " % r.text.encode(encoding="utf-8"))
        logging.info("source_index:%s to bak_index: %s copied successfully" % (source_index, bak_index))
        logging.info("backup index: %s , documents copied: %d" % (bak_index, response_json['total']))
    else:
        logging.info("source_index:%s bak_index: %s backup failed" % (source_index, bak_index))
        logging.error("error message: %s" % r.text.encode(encoding="utf-8"))
        return -1
    return 0
# Batch backup
def batch_es_reindex(es_url, source_index_prefix, bak_index_prefix, queryParam, bucket_num, start_index=0):
'''
:param es_url:
:param source_index_prefix:
:param bak_index_prefix:
:param queryParam:
queryParam = {
"query": {
"bool": {
"must": [
{
"match_all": {}
}
],
"must_not": [],
"should": []
}
}
}
:param bucket_num:
:param start_index:
:return:
'''
logging.info("es_url:" + es_url)
new_index_array = []
for i in range(start_index, bucket_num):
suffix = "%03d" % i
source_index = source_index_prefix + suffix
bak_index = bak_index_prefix + suffix
        ret = es_reindex(es_url, source_index, bak_index, queryParam["query"])
if 0 == ret:
new_index_array.append(bak_index)
else:
            logging.info("source_index:%s to bak_index: %s copy failed" % (source_index, bak_index))
logging.info("do new_index_array:%s over" % (new_index_array))
return -1
logging.info("batch_es_reindex done!")
logging.info("new_index_array:%s" % (new_index_array))
return 0
# Batch backup with a routing field
def batch_es_reindex_with_routing(es_url, source_index_prefix, bak_index_prefix, queryParam, routing_field, bucket_num,
                                  start_index=0):
'''
:param es_url:
:param source_index_prefix:
:param bak_index_prefix:
:param queryParam:
queryParam = {
"query": {
"bool": {
"must": [
{
"match_all": {}
}
],
"must_not": [],
"should": []
}
}
}
:param bucket_num:
:param routing_field:
:param start_index:
:return:
'''
logging.info("es_url:" + es_url)
new_index_array = []
for i in range(start_index, bucket_num):
suffix = "%03d" % i
source_index = source_index_prefix + suffix
bak_index = bak_index_prefix + suffix
        ret = es_reindex_with_routing(es_url, source_index, bak_index, queryParam["query"], routing_field)
if 0 == ret:
new_index_array.append(bak_index)
else:
            logging.info("source_index:%s to bak_index: %s copy failed" % (source_index, bak_index))
logging.info("do new_index_array:%s over" % (new_index_array))
return -1
    logging.info("batch_es_reindex_with_routing done!")
logging.info("new_index_array:%s" % (new_index_array))
return 0
# Create indexes according to business rules
def create_index_by_business_code_rel_type_dict(es_url, es_index_prefix, es_mapping,
business_codes_rel_type_dict,
do_reindex=False,
source_index='',
routing_field=''):
'''
:param es_url:
:param es_index_prefix:
:param es_mapping:
:param business_codes_rel_type_dict:
business_codes_rel_type_dict = {"003": ["fk_tl"],"004": ["bq_sb"],"005": [ "jd_zh", "jd_zz"]}
:param do_reindex:
:param source_index:
:param routing_field:
:return:
'''
logging.info("es_url:" + es_url)
new_index_array = []
business_codes = business_codes_rel_type_dict.keys()
for business_code in business_codes:
relation_types = business_codes_rel_type_dict.get(business_code)
for relation_type in relation_types:
index_name = es_index_prefix + business_code + "_" + relation_type
es_index_url = es_url + index_name
logging.info("es_index_url: %s" % (es_index_url))
r = requests.put(es_index_url, json.dumps(es_mapping), headers={"content-type": "application/json"})
response_json = json.loads(r.text)
if 200 == r.status_code:
if response_json['acknowledged']:
                    logging.info("index_name: %s created successfully" % (index_name))
new_index_array.append(index_name)
if do_reindex:
                        result = es_reindex_by_rel_type(es_url, source_index, index_name, relation_type, routing_field)
if 0 != result:
logging.info("do new_index_array:%s over" % (new_index_array))
return -1
else:
                logging.info("index_name: %s creation failed" % (index_name))
                logging.error("error message: %s" % r.text.encode(encoding="utf-8"))
logging.info("do new_index_array:%s over" % (new_index_array))
return -1
logging.info("index create done!")
logging.info("new_index_array:%s" % (new_index_array))
return 0
# Back up the documents of one rel_type (structure + data), setting routing via a script
def es_reindex_by_rel_type(es_url, source_index, bak_index, rel_type, routing_field):
    # copy the data with _reindex
    url = es_url + "_reindex"
    data = {
        "source": {
            "index": source_index,
            "query": {
                "term": {
                    "rel_type": rel_type.swapcase()
                }
            }
        },
        "dest": {
            "index": bak_index
        },
        "script": {
            "inline": "ctx._routing = ctx._source." + routing_field,
            "lang": "painless"
        }
    }
    r = requests.post(url, json.dumps(data), headers={"content-type": "application/json"})
    if 200 == r.status_code:
        response_json = json.loads(r.text)
        logging.info("reindex response: %s " % r.text.encode(encoding="utf-8"))
        logging.info("backup index: %s , documents copied: %d" % (bak_index, response_json['total']))
    else:
        logging.info("source_index:%s bak_index: %s backup failed" % (source_index, bak_index))
        logging.error("error message: %s" % r.text.encode(encoding="utf-8"))
        return -1
    return 0
# Paged query via the scroll API (fetch all pages)
def get_es_data_by_scroll(es_url, index, query={}, scroll_id=None, scroll="5m", batch_size=10000):
data = []
try:
while True:
if not scroll_id:
                # number of documents to fetch per batch
query["size"] = batch_size
curl_url = es_url + index + '/_search?scroll=' + scroll
logging.info("curl_url:%s" % (curl_url))
response = requests.post(curl_url, json.dumps(query), headers={'content-type': 'application/json'})
else:
curl_url = es_url + '_search/scroll?scroll=' + scroll + '&scroll_id=' + scroll_id
logging.info("curl_url:%s" % (curl_url))
response = requests.get(curl_url)
            # handle the response
if response:
if 200 == response.status_code:
response_json = json.loads(response.text)
scroll_id = response_json['_scroll_id']
# Update the scroll ID
if scroll_id is None:
break
# Get the number of results that we returned in the last scroll
if not response_json['hits']['hits']:
break
response_data = [doc["_source"] for doc in response_json['hits']['hits']]
data.extend(response_data)
else:
                    logging.info("curl_url:%s query failed" % (curl_url))
                    logging.error("error message: %s" % response.text.encode(encoding="utf-8"))
return None
else:
                logging.info("curl_url:%s query failed" % (curl_url))
return None
logging.info("get data size:%s" % (len(data)))
return data
except Exception as e:
logging.error(e)
logging.error("query fail!")
print("exception!")
return None
# Single query request (also opens a scroll context)
def query(es_url, index, query, batch_size=1000, scroll="3m"):
    try:
        curl_url = es_url + index + '/_search?scroll=' + scroll
        # number of documents to fetch per batch
        query["size"] = batch_size
        response = requests.post(curl_url, json.dumps(query), headers={'content-type': 'application/json'})
        if response:
            response_json = json.loads(response.text)
            return response_json
    except Exception as e:
        logging.error(e)
        logging.error("query fail!")
        print("exception!")
# Paged query (first scroll page)
def query_by_scroll(es_url, index, doc_type=None, query=None, scroll='5m', batch_size=1000):
'''
:param index:
:param doc_type:
:param query:
queryParam = {
"query": {
"bool": {
"must": [
{
"range": {
"import_es_time": {
"lt": "2019-07-31 00:00:00"
}
}
},
{
"term": {
"list_type": "01"
}
},
{
"term": {
"delete_status": "0"
}
}
],
"must_not": [],
"should": []
}
}
}
:param scroll:
:param batch_size:
:return:
'''
try:
logging.info("query: index:%s doc_type:%s scroll:%s batch_size:%s query:%s" % (
index, doc_type, scroll, batch_size, query))
        # number of documents to fetch per batch
query["size"] = batch_size
if doc_type:
curl_url = es_url + index + '/' + doc_type + '/_search?scroll=' + scroll
else:
curl_url = es_url + index + '/_search?scroll=' + scroll
response = requests.post(curl_url, json.dumps(query), headers={'content-type': 'application/json'})
if response:
if 200 == response.status_code:
response_json = json.loads(response.text)
return response_json
else:
                logging.info("curl_url:%s query: %s failed" % (curl_url, query))
                logging.error("error message: %s" % response.text.encode(encoding="utf-8"))
return None
except Exception as e:
logging.error(e)
logging.error("query fail!")
print("query_by_scroll exception!")
return None
# Continue a paged query by scroll_id
def query_by_scroll_id(es_url, index, scroll_id, scroll='5m'):
if scroll_id is None:
return
try:
curl_url = es_url + '_search/scroll?scroll=' + scroll + '&scroll_id=' + scroll_id
logging.info("curl_url:%s" % (curl_url))
response = requests.get(curl_url)
        # handle the response
if response:
if 200 == response.status_code:
response_json = json.loads(response.text)
return response_json
else:
                logging.info("curl_url:%s query failed" % (curl_url))
                logging.error("error message: %s" % response.text.encode(encoding="utf-8"))
return None
else:
            logging.info("curl_url:%s query failed" % (curl_url))
return None
except Exception as e:
logging.error(e)
logging.error("query fail! scroll_id:%s" % (scroll_id))
print("query_by_scroll_id exception!")
return None
# Fetch a scroll page and convert the hits
def get_and_parse_query_scroll_data(es_url, index, doc_type=None, query=None, scroll='5m', batch_size=1000,
convert_dict={}, add_date_time=False):
    page = query_by_scroll(es_url, index, doc_type=doc_type, query=query, scroll=scroll, batch_size=batch_size)
return convert_es_page_data(page, convert_dict, add_date_time)
# Parse a scroll response page
def parse_es_page_data(page):
result_data = {}
if not page or not page['_scroll_id']:
logging.warning("query_by_scroll return none")
print("query_by_scroll return none")
return result_data
if page['_scroll_id']:
scroll_id = page['_scroll_id']
result_data[PAGE_RESULT_SCROLL_ID] = scroll_id
print("Scrolling scroll_id:%s" % (scroll_id))
if page['hits']:
total_size = page['hits']['total']
print("Scrolling total_size:%s" % (total_size))
result_data[PAGE_RESULT_TOTAL_SIZE] = total_size
hits = page['hits']['hits']
scroll_size = len(hits)
result_data[PAGE_RESULT_SCROLL_SIZE] = scroll_size
result_data[PAGE_RESULT_HITS] = hits
return result_data
# Convert page data according to business needs
def convert_es_page_data(page, convert_dict={}, add_date_time=False):
'''
:param page:
:param convert_dict:
    convert_dict example:
{"key1": {"dest_key": ["key1","key2"], "default_value":""}}
:param add_date_time:
:return:
'''
result_data = parse_es_page_data(page)
if result_data and result_data['hits']:
result = result_data['hits']
# parse data
convert_data = []
for item in result:
if item['_source']:
source_data = item['_source']
convert_result = {}
keys = convert_dict.keys()
for source_key in keys:
dest_dict = convert_dict.get(source_key, [])
dst_keys = dest_dict.get(CONVERT_DEST_KEY, [])
default_value = dest_dict.get(CONVERT_DEFAULT_VALUE, '')
for dst_key in dst_keys:
convert_result[dst_key] = source_data.get(source_key, default_value)
if add_date_time:
date_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
convert_result["date_time"] = date_time
convert_str = json.dumps(convert_result, ensure_ascii=False)
convert_data.append(convert_str.encode('utf-8'))
result_data[PAGE_RESULT_CONVERT_DATA] = convert_data
return result_data
def main():
    # ES server address
    # ## dev environment
    es_url = "http://192.168.3.63:9200/"
    # ## test environment
    # es_url = "http://192.168.3.206:9200/"
    # ## staging environment
    # es_url = "http://100.1.1.1:9200/"
    # ## production environment
    # es_url = "http://10.1.1.1:9200/"
    BUCKET_NUM = 2
INDEX_NAME = 'zyc_test'
DOC_TYPE = 'relation'
BAK_INDEX_NAME = 'backup' + INDEX_NAME
INDEX_PREFIX = INDEX_NAME + '_'
BAK_INDEX_PREFIX = BAK_INDEX_NAME + '_'
ES_MAPPING = {
"settings": {
"index": {
"search": {
"slowlog": {
"threshold": {
"fetch": {
"warn": "100ms"
},
"query": {
"warn": "100ms"
}
}
}
},
"refresh_interval": "1s",
"indexing": {
"slowlog": {
"threshold": {
"index": {
"warn": "1s"
}
}
}
},
"number_of_shards": "6",
"translog": {
"flush_threshold_size": "1gb",
"sync_interval": "120s",
"durability": "async"
}
}
},
"aliases": {
"vii_relation": {
}
},
"mappings": {
"relation": {
"dynamic_date_formats": [
"yyyy-MM-dd HH:mm:ss",
"yyyy-MM-dd"
],
"dynamic_templates": [
{
"date_template": {
"match_pattern": "regex",
"mapping": {
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis",
"type": "date"
},
"match_mapping_type": "string",
"match": "(.*_date|.*_timestamp|.*_ts|.*_time)"
}
},
{
"keyword_template": {
"mapping": {
"type": "keyword"
},
"match_mapping_type": "string"
}
}
],
"_all": {
"enabled": "false"
},
"properties": {
"pk": {
"type": "keyword"
},
"in_obj_type": {
"type": "keyword"
},
"in_obj_value": {
"type": "keyword"
},
"in_obj_type_value": {
"type": "keyword"
},
"in_obj_tag": {
"type": "keyword"
},
"in_obj_tag_desc": {
"type": "keyword"
},
"out_obj_type": {
"type": "keyword"
},
"out_obj_value": {
"type": "keyword"
},
"out_obj_type_value": {
"type": "keyword"
},
"out_obj_tag": {
"type": "keyword"
},
"out_obj_tag_desc": {
"type": "keyword"
},
"start_time": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
},
"end_time": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
},
"rel_type": {
"type": "keyword"
},
"rel_detail": {
"type": "keyword",
"index": "false"
},
"count": {
"type": "long"
},
"similarity": {
"type": "double"
},
"tag_codes": {
"type": "keyword"
},
"delete_status": {
"type": "integer"
},
"tenant_code": {
"type": "keyword"
},
"business_code": {
"type": "keyword"
},
"import_es_time": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
}
}
}
}
}
ADD_ES_MAPPING = {
"properties": {
"delete_status": {
"type": "integer"
},
"start_time": {
"format": "yyyy-MM-dd HH:mm:ss",
"type": "date"
},
"end_time": {
"format": "yyyy-MM-dd HH:mm:ss",
"type": "date"
}
}
}
queryParam = {
"query": {
"bool": {
"must": [
],
"must_not": [],
"should": []
}
}
}
QUERY_INDEX_NAME = 'cysaas_object_basic_info'
QUERY_DOC_TYPE = 'basic'
logging.info("begin...")
create_index(es_url, INDEX_NAME, ES_MAPPING)
time.sleep(5)
delete_index(es_url, INDEX_NAME)
time.sleep(5)
    batch_create_index_with_mapping(es_url, INDEX_PREFIX, ES_MAPPING, BUCKET_NUM)
time.sleep(5)
add_properties_to_index(es_url, INDEX_NAME, DOC_TYPE, ADD_ES_MAPPING)
time.sleep(5)
    batch_add_properties_to_index(es_url, INDEX_PREFIX, DOC_TYPE, ADD_ES_MAPPING, BUCKET_NUM)
time.sleep(5)
result = query_by_scroll(es_url, index=QUERY_INDEX_NAME, query=queryParam)
convert_dict = {"obj_value": {"dest_key": ["obj_value", "value"], "default_value": ""}}
result_data = get_and_parse_query_scroll_data(es_url, index=QUERY_INDEX_NAME, query=queryParam,
convert_dict=convert_dict)
get_data = get_es_data_by_scroll(es_url, index=QUERY_INDEX_NAME, query=queryParam)
del_index_by_prefix(es_url, INDEX_NAME)
time.sleep(5)
    batch_es_del_index(es_url, INDEX_PREFIX, BUCKET_NUM)
logging.info("done")
print("done")
if __name__ == '__main__':
main()
2. Operating ES from a Python script via the elasticsearch client
#!coding:utf-8
import json
import logging
import time
from elasticsearch import Elasticsearch, helpers
PAGE_RESULT_SCROLL_ID = 'scroll_id'
PAGE_RESULT_SCROLL_SIZE = 'scroll_size'
PAGE_RESULT_TOTAL_SIZE = 'total_size'
PAGE_RESULT_HITS = 'hits'
PAGE_RESULT_DATA = 'data'
PAGE_RESULT_CONVERT_DATA = 'convert_data'
CONVERT_DEST_KEY = 'dest_key'
CONVERT_DEFAULT_VALUE = 'default_value'
current_time = time.strftime("%Y-%m-%d-%H-%M", time.localtime(time.time()))
# Logging setup: write the log to a timestamped UTF-8 file
log_file = "operate_es_client_" + current_time + ".log"
handler = logging.FileHandler(filename=log_file, encoding='utf-8')
logging.basicConfig(level=logging.INFO, handlers=[handler])
def query(es_client, index, query):
try:
return helpers.scan(es_client, index=index, scroll="3m", query=query)
except Exception as e:
logging.error(e)
logging.error("query fail!")
print("exception!")
def query_by_scroll(es_client, index, doc_type=None, query=None, scroll='5m', batch_size=1000, preserve_order=False,
**kwargs):
'''
:param index:
:param doc_type:
:param query:
queryParam = {
"query": {
"bool": {
"must": [
{
"range": {
"import_es_time": {
"lt": "2019-07-31 00:00:00"
}
}
},
{
"term": {
"list_type": "01"
}
},
{
"term": {
"delete_status": "0"
}
}
],
"must_not": [],
"should": []
}
}
}
:param scroll:
:param batch_size:
:param preserve_order:
:param kwargs:
:return:
'''
    if not preserve_order:  # whether scan-style (unordered) scrolling is wanted
kwargs['search_type'] = 'query_then_fetch'
try:
logging.info("query: index:%s doc_type:%s scroll:%s batch_size:%s query:%s" % (
index, doc_type, scroll, batch_size, query))
resp = es_client.search(index=index,
doc_type=doc_type,
scroll=scroll,
size=batch_size,
body=query,
**kwargs)
return resp
except Exception as e:
logging.error(e)
logging.error("query fail!")
print("exception!")
return None
def query_by_scroll_id(es_client, scroll_id, scroll='5m'):
if scroll_id is None:
return
try:
resp = es_client.scroll(scroll_id, scroll=scroll)
return resp
except Exception as e:
logging.error(e)
logging.error("query fail! scroll_id:%s" % (scroll_id))
print("exception!")
return None
def get_and_parse_query_scroll_data(es_client, index, doc_type=None, query=None, scroll='5m', batch_size=1000,
convert_dict={}, add_date_time=False):
    page = query_by_scroll(es_client, index, doc_type=doc_type, query=query, scroll=scroll, batch_size=batch_size)
return convert_es_page_data(page, convert_dict, add_date_time)
def parse_es_page_data(page):
result_data = {}
if not page or not page['_scroll_id']:
logging.warning("query_by_scroll return none")
print("query_by_scroll return none")
return result_data
if page['_scroll_id']:
scroll_id = page['_scroll_id']
result_data[PAGE_RESULT_SCROLL_ID] = scroll_id
print("Scrolling scroll_id:%s" % (scroll_id))
if page['hits']:
total_size = page['hits']['total']
print("Scrolling total_size:%s" % (total_size))
result_data[PAGE_RESULT_TOTAL_SIZE] = total_size
hits = page['hits']['hits']
scroll_size = len(hits)
result_data[PAGE_RESULT_SCROLL_SIZE] = scroll_size
result_data[PAGE_RESULT_HITS] = hits
return result_data
def convert_es_page_data(page, convert_dict={}, add_date_time=False):
'''
:param page:
:param convert_dict:
    convert_dict example:
{"key1": {"dest_key": ["key1","key2"], "default_value":""}}
:param add_date_time:
:return:
'''
result_data = parse_es_page_data(page)
result = []
if result_data and result_data['hits']:
result = result_data['hits']
# parse data
convert_data = []
for item in result:
if item['_source']:
source_data = item['_source']
convert_result = {}
keys = convert_dict.keys()
for source_key in keys:
dest_dict = convert_dict.get(source_key, [])
dst_keys = dest_dict.get(CONVERT_DEST_KEY, [])
default_value = dest_dict.get(CONVERT_DEFAULT_VALUE, '')
for dst_key in dst_keys:
convert_result[dst_key] = source_data.get(source_key, default_value)
if add_date_time:
date_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
convert_result["date_time"] = date_time
convert_str = json.dumps(convert_result, ensure_ascii=False)
convert_data.append(convert_str.encode('utf-8'))
result_data[PAGE_RESULT_CONVERT_DATA] = convert_data
return result_data
def main():
    # ES server address
    # ## dev environment
es_host = "192.168.3.63"
es_url = "http://192.168.3.63:9200/"
INDEX_NAME = 'cysaas_object_basic_info'
DOC_TYPE = 'basic'
queryParam = {
"query": {
"bool": {
"must": [],
"must_not": [],
"should": []
}
}
}
logging.info("begin...")
# es_client = Elasticsearch([{'host': es_host, 'port': '9200'}])
es_client = Elasticsearch([es_url], verify_certs=False)
result = query_by_scroll(es_client, index=INDEX_NAME, doc_type=DOC_TYPE, query=queryParam)
time.sleep(5)
result_data = get_and_parse_query_scroll_data(es_client, index=INDEX_NAME, doc_type=DOC_TYPE,
query=queryParam)
logging.info("done")
print("done")
if __name__ == '__main__':
main()
scan avoids the sorting cost that scroll otherwise pays. The from/size pagination mode sorts the whole result set globally, which is very expensive; if we switch sorting off, all documents can be returned with very little overhead. scan simply skips the sort and just returns data from every shard that has matching results.
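To make the contrast concrete, here is a minimal sketch (it reuses the dev cluster and the cysaas_object_basic_info index from the demos above; the offsets and sizes are illustrative only):
from elasticsearch import Elasticsearch, helpers

es = Elasticsearch(["http://192.168.3.63:9200/"])
q = {"query": {"match_all": {}}}

# from/size deep paging: every page re-sorts the whole result set across shards,
# so cost grows with the offset (and from+size is capped at 10000 by default).
page = es.search(index="cysaas_object_basic_info", body=dict(q, **{"from": 9000, "size": 1000}))

# scan: no global sort; each shard just streams its matching docs through a scroll cursor.
for hit in helpers.scan(es, index="cysaas_object_basic_info", query=q, scroll="5m"):
    pass  # process hit["_source"]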
Below is the source of helpers.scan from the Python elasticsearch client. If you compare it against basic elasticsearch scroll/scan usage, the code is easy to follow. elasticsearch-py bundles its high-performance features in the helpers module, for example helpers.scan, helpers.reindex, helpers.streaming_bulk, helpers.bulk and helpers.parallel_bulk.
elasticsearch.helpers.scan(client, query=None, scroll=u'5m', raise_on_error=True, preserve_order=False, **kwargs)
Parameter descriptions:
client – the elasticsearch connection object
query – the elasticsearch query DSL body
scroll – how long the server should keep the scroll context alive
raise_on_error – whether to raise an error when a shard fails during the scan
preserve_order – this actually maps to search_type, i.e. whether the results must be sorted
file: helpers/__init__.py
# scroll defaults to 5 minutes; search_type defaults to the scan mode
def scan(client, query=None, scroll='5m', preserve_order=False, **kwargs):
    if not preserve_order:  # whether scan mode is needed
kwargs['search_type'] = 'scan'
resp = client.search(body=query, scroll=scroll, **kwargs)
    scroll_id = resp.get('_scroll_id')  # the first search returns the _scroll_id token
if scroll_id is None:
return
first_run = True
while True:
        # if your search_type is not scan, the first response already contains data
if preserve_order and first_run:
first_run = False
else:
resp = client.scroll(scroll_id, scroll=scroll)
if not resp['hits']['hits']:
break
for hit in resp['hits']['hits']:
            yield hit  # hand the data back through a generator
scroll_id = resp.get('_scroll_id')
if scroll_id is None:
break
file: client/__init__.py
@query_params('scroll')
def scroll(self, scroll_id, params=None):
        # the follow-up scroll requests go straight to /_search/scroll, using GET
        _, data = self.transport.perform_request('GET', '/_search/scroll', params=params, body=scroll_id)
return data
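For comparison with the helper above, this is roughly the loop that the query_by_scroll / query_by_scroll_id functions in part 2 wrap when you drive scrolling by hand; a minimal sketch against the same dev cluster and index used in the demos (names are assumptions carried over from main()):
from elasticsearch import Elasticsearch

es = Elasticsearch(["http://192.168.3.63:9200/"])
body = {"query": {"match_all": {}}, "size": 1000}

# the first search opens the scroll context and already returns the first batch
resp = es.search(index="cysaas_object_basic_info", body=body, scroll="5m")
while True:
    hits = resp["hits"]["hits"]
    if not hits:
        break
    for hit in hits:
        pass  # process hit["_source"]
    # follow-up requests only carry the scroll_id (GET /_search/scroll under the hood)
    resp = es.scroll(resp["_scroll_id"], scroll="5m")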
When using elasticsearch scan/scroll, do pay attention to the error cases.
data = scan(es,
query={"query": {"match": {"domain": "xiaorui.cc"}}},
index="xiaorui_index",
doc_type="blog"
)
for one in data:
    print(one)
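One way to guard against those error cases, a minimal sketch: scan returns a generator, so failures (an expired scroll context, a shard error when raise_on_error is true, a dropped connection) surface while you iterate, not at the scan() call itself. ElasticsearchException is the base exception class of the pre-8.x Python client; the index and query below are the same illustrative ones as in the snippet above.
import logging
from elasticsearch import Elasticsearch, helpers
from elasticsearch.exceptions import ElasticsearchException

es = Elasticsearch(["http://192.168.3.63:9200/"])
data = helpers.scan(es,
                    query={"query": {"match": {"domain": "xiaorui.cc"}}},
                    index="xiaorui_index",
                    doc_type="blog")
try:
    for one in data:
        print(one)
except ElasticsearchException as e:
    # the scroll context may have expired or a shard may have failed mid-iteration
    logging.error("scan failed: %s" % e)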
Elasticsearch is vast and deep... In my offline and online tests, scroll scan performed quite well: it returns results noticeably faster than from/size paging and saves the cluster's search resources.