I see that the following API performs delete by query in Elasticsearch: http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-delete-by-query.html
But I want to do the same thing with the Elasticsearch bulk API. I can already bulk upload documents using
es.bulk(body=json_batch)
but I don't know how to use the Python bulk API for Elasticsearch to perform a delete by query.
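The elasticsearch-py bulk API lets you delete records in bulk by including '_op_type': 'delete' in each record. However, if you want to delete by query, you still need two requests: one to find the records to delete, and another to delete them.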
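As a minimal sketch of what such a record looks like, the function below turns one search hit into a bulk delete action. The name `hit_to_delete_action` is hypothetical; it keeps only the metadata that identifies the document (`_index`, `_id`, and `_type` when the hit carries one) and adds `'_op_type': 'delete'`. Adding `_op_type` to the raw hit, as the code in this answer does, also works; this variant just keeps the action small and explicit.

```python
def hit_to_delete_action(hit):
    """Turn one search hit into a delete action for elasticsearch.helpers.bulk()."""
    # Hypothetical helper: copy only the fields that identify the document.
    action = {"_op_type": "delete", "_index": hit["_index"], "_id": hit["_id"]}
    # Older clusters use mapping types; newer hits may not include "_type".
    if "_type" in hit:
        action["_type"] = hit["_type"]
    return action
```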
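The easiest way to do this in bulk is to use the module's scan() helper, which wraps the Elasticsearch Scroll API so you don't have to keep track of _scroll_ids. Use it together with the bulk() helper in place of the deprecated delete_by_query():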
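from elasticsearch.helpers import bulk, scan

# es (the client), es_query_body, ES_INDEX and ES_DOC are assumed to be defined already.
bulk_deletes = []
for result in scan(es,
                   query=es_query_body,  # same as the search() body parameter
                   index=ES_INDEX,
                   doc_type=ES_DOC,
                   _source=False,
                   track_scores=False,
                   scroll='5m'):
    # Mark each hit as a delete action for the bulk helper.
    result['_op_type'] = 'delete'
    bulk_deletes.append(result)

# Send all delete actions through the client.
bulk(es, bulk_deletes)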
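Because _source=False is passed, the document bodies are not returned, so each result is very small. However, if you have memory constraints, you can batch this quite easily: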
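BATCH_SIZE = 100000
i = 0
bulk_deletes = []
for result in scan(...):  # same scan() arguments as in the previous example
    if i == BATCH_SIZE:
        # Flush a full batch and start a new one.
        bulk(es, bulk_deletes)
        bulk_deletes = []
        i = 0
    result['_op_type'] = 'delete'
    bulk_deletes.append(result)
    i += 1
# Delete whatever is left in the final, partial batch.
if bulk_deletes:
    bulk(es, bulk_deletes)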
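The counter-based loop above can also be written with a small chunking generator. This is a sketch using only the standard library, and `batched_actions` is a hypothetical name (Python 3.12 ships a similar `itertools.batched`, which yields tuples instead of lists). It is also worth knowing that `elasticsearch.helpers.bulk()` accepts any iterable of actions and already sends them in chunks (its `chunk_size` parameter, 500 by default), so passing a generator over the `scan()` results straight to `bulk()` avoids building the full list in memory at all.

```python
from itertools import islice


def batched_actions(actions, size):
    """Yield lists of at most `size` items from any iterable, e.g. a scan() generator."""
    iterator = iter(actions)
    while True:
        # Pull the next `size` items without materializing the rest.
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch
```

With the names from the answer, usage would look like `for batch in batched_actions(delete_actions, BATCH_SIZE): bulk(es, batch)`, where `delete_actions` is a hypothetical generator of the `'_op_type': 'delete'` dicts.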
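Seeing as Elasticsearch has deprecated the delete by query API, I created this Python script using the bindings to do the same thing. First, define the ES connection: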
import elasticsearch

es = elasticsearch.Elasticsearch(['localhost'])
Now you can use it to run a query for the results you want to delete.
search = es.search(
    q='The Query to ES.',
    index="*logstash-*",
    size=10,
    search_type="scan",  # only available in older Elasticsearch; removed in 5.0
    scroll='5m',
)
Now you can loop over that query with scroll, building the bulk request as you go.
import json

scroll_id = search['_scroll_id']
while True:
    try:
        # Get the next page of results.
        scroll = es.scroll(
            scroll_id=scroll_id,
            scroll='5m',
        )
    # scroll() raises NotFoundError once the scroll context is gone, so stop there.
    except elasticsearch.exceptions.NotFoundError:
        break
    hits = scroll['hits']['hits']
    # An empty page also means there is nothing left to delete.
    if not hits:
        break
    # The scroll ID can change between calls, so always continue from the latest one.
    scroll_id = scroll['_scroll_id']
    # Build the newline-delimited bulk body: one delete action line per hit.
    bulk_body = ""
    for result in hits:
        bulk_body += json.dumps({"delete": {
            "_index": result['_index'],
            "_type": result['_type'],
            "_id": result['_id'],
        }}) + "\n"
    # Finally, do the deleting.
    es.bulk(body=bulk_body)
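The paging part of that loop can be pulled out into a generator. This is a sketch under the assumption that `es` is anything with a `scroll(scroll_id=..., scroll=...)` method returning the usual response dict (such as an `elasticsearch.Elasticsearch` client); `iter_scroll_pages` is a hypothetical name. Unlike the loop above, it stops on the first empty page instead of catching `NotFoundError`, and it always continues from the most recent `_scroll_id`.

```python
def iter_scroll_pages(es, scroll_id, scroll="5m"):
    """Yield each non-empty page of hits from an already opened scroll."""
    while True:
        page = es.scroll(scroll_id=scroll_id, scroll=scroll)
        hits = page["hits"]["hits"]
        # An empty page means the scroll is exhausted.
        if not hits:
            return
        # Elasticsearch may hand back a new scroll ID; continue from the latest one.
        scroll_id = page["_scroll_id"]
        yield hits
```

With the answer's variables, the loop becomes `for hits in iter_scroll_pages(es, search['_scroll_id']):` followed by building and sending the bulk body for `hits`.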
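To use the bulk API, you need to make sure of two things:
Each action identifies the document you want to delete (index, type, id).
Each request line is terminated with a newline character (\n).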
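Both rules can be seen in a small body builder. This is a sketch, not part of the answer: `build_bulk_delete_body` is a hypothetical name, and it uses `json.dumps` instead of string concatenation so that IDs containing quotes stay valid JSON. A delete action needs no document line after it, unlike an index action, so each hit produces exactly one line.

```python
import json


def build_bulk_delete_body(hits):
    """Build a newline-delimited bulk body with one delete action per hit."""
    lines = []
    for hit in hits:
        # Rule 1: identify the document (index, id, and type if present).
        meta = {"_index": hit["_index"], "_id": hit["_id"]}
        if "_type" in hit:
            meta["_type"] = hit["_type"]
        lines.append(json.dumps({"delete": meta}))
    # Rule 2: every line, including the last one, ends with "\n".
    return "".join(line + "\n" for line in lines)
```

The result can be passed as `es.bulk(body=...)`; an empty list of hits yields an empty string, which should not be sent.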