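When scrolling in Elasticsearch, it is important to provide the latest scroll_id with each scroll request:

The initial search request and each subsequent scroll request returns a new scroll_id; only the most recent scroll_id should be used.

The following example (taken from here) confuses me. First, the scroll is initialized: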
rs = es.search(index=['tweets-2014-04-12', 'tweets-2014-04-13'],
               scroll='10s',
               search_type='scan',
               size=100,
               preference='_primary_first',
               body={
                   "fields": ["created_at", "entities.urls.expanded_url", "user.id_str"],
                   "query": {
                       "wildcard": {"entities.urls.expanded_url": "*.ru"}
                   }
               })
sid = rs['_scroll_id']
Then the loop:
tweets = []
while (1):
    try:
        rs = es.scroll(scroll_id=sid, scroll='10s')
        tweets += rs['hits']['hits']
    except:
        break
It works, but I can't see where sid is updated. I assume it happens inside the Python client, but I don't understand how it works...
Using the Python requests library:
import json

import requests

elastic_url = 'http://localhost:9200/my_index/_search?scroll=1m'
scroll_api_url = 'http://localhost:9200/_search/scroll'
headers = {'Content-Type': 'application/json'}

payload = {
    "size": 100,
    "sort": ["_doc"],
    "query": {
        "match": {
            "title": "elasticsearch"
        }
    }
}

r1 = requests.request(
    "POST",
    elastic_url,
    data=json.dumps(payload),
    headers=headers
)

# first batch of data
try:
    res_json = r1.json()
    data = res_json['hits']['hits']
    _scroll_id = res_json['_scroll_id']
except KeyError:
    data = []
    _scroll_id = None
    print('Error: Elastic Search: %s' % str(r1.json()))

while data:
    print(data)
    # scroll to get the next batch, sending the scroll_id from the latest response
    scroll_payload = json.dumps({
        'scroll': '1m',
        'scroll_id': _scroll_id
    })
    scroll_res = requests.request(
        "POST",
        scroll_api_url,
        data=scroll_payload,
        headers=headers
    )

    try:
        res_json = scroll_res.json()
        data = res_json['hits']['hits']
        _scroll_id = res_json['_scroll_id']
    except KeyError:
        data = []
        _scroll_id = None
        err_msg = 'Error: Elastic Search Scroll: %s'
        print(err_msg % str(scroll_res.json()))
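For the elasticsearch-py client used in the question, the same idea might look like the sketch below. A call to es.scroll cannot rebind the caller's sid variable, so the loop in the question keeps sending the first ID. It presumably works only because the server happened to return the same ID each time. The helper name scroll_all and its parameters are hypothetical, and es is assumed to be a client exposing scroll(scroll_id=..., scroll=...) as in the question.

```python
def scroll_all(es, first_response, keep_alive="10s"):
    """Collect every hit of a scroll, always sending the newest scroll ID.

    `scroll_all` is a hypothetical helper; `es` is assumed to behave like an
    elasticsearch-py client, and `first_response` is the result of the
    initial `es.search(..., scroll=...)` call.
    """
    # With search_type='scan' the first response carries no hits, which is fine.
    hits = list(first_response["hits"]["hits"])
    sid = first_response["_scroll_id"]
    while True:
        rs = es.scroll(scroll_id=sid, scroll=keep_alive)
        sid = rs["_scroll_id"]  # refresh the ID from the latest response
        batch = rs["hits"]["hits"]
        if not batch:  # an empty page means the scroll is exhausted
            break
        hits.extend(batch)
    return hits
```

With the rs returned by the initial search, this would be called as tweets = scroll_all(es, rs). Ending the loop on an empty page, rather than on a bare except, also avoids hiding unrelated errors.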
Reference: https://www.elastic.co/guide/zh-CN/elasticsearch/reference/current/search-request-scroll.html#search-request-scroll