I have to store some messages in Elasticsearch, integrated with my Python program. What I currently do to store a message is:
d={"message":"this is message"} for index_nr in range(1,5): ElasticSearchAPI.addToIndex(index_nr, d) print d
That means if I have 10 messages, I have to repeat my code 10 times. So what I want to do instead is write a script file or batch file. I have checked the Elasticsearch Guide, and the BULK API can be used for this. The format should be something like this:
{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } } { "field1" : "value1" } { "delete" : { "_index" : "test", "_type" : "type1", "_id" : "2" } } { "create" : { "_index" : "test", "_type" : "type1", "_id" : "3" } } { "field1" : "value3" } { "update" : {"_id" : "1", "_type" : "type1", "_index" : "index1"} } { "doc" : {"field2" : "value2"} }
What I did is:
{"index":{"_index":"test1","_type":"message","_id":"1"}} {"message":"it is red"} {"index":{"_index":"test2","_type":"message","_id":"2"}} {"message":"it is green"}
I also use the curl tool to store the doc:

    $ curl -s -XPOST localhost:9200/_bulk --data-binary @message.json
Now I want to store the file into Elasticsearch using my Python code.
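Something like the following is what I have in mind, though I am not sure it is the right call (a sketch assuming the official elasticsearch-py client, whose Elasticsearch.bulk() should accept the same raw newline-delimited payload as the curl call above):

    from elasticsearch import Elasticsearch

    es = Elasticsearch(hosts=[{'host': 'localhost', 'port': 9200}])

    # Read the pre-built bulk file as-is and hand the raw payload to ES
    with open('message.json') as f:
        body = f.read()

    response = es.bulk(body=body)
    print(response['errors'])  # False if every action succeeded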
(The other approaches mentioned in this thread use a Python list for the ES update, which is not a good solution today, especially when you need to add millions of rows of data to ES.)
The better approach is to use Python generators: process gigs of data without going out of memory or compromising much on speed.
Below is an example snippet from a practical use case: adding data from an nginx log file to ES for analysis.
    from elasticsearch import Elasticsearch, helpers

    def decode_nginx_log(_nginx_fd):
        for each_line in _nginx_fd:
            # Filter out the below from each log line
            remote_addr = ...
            timestamp   = ...
            ...

            # Index for elasticsearch. Typically timestamp.
            idx = ...

            es_fields_keys = ('remote_addr', 'timestamp', 'url', 'status')
            es_fields_vals = (remote_addr, timestamp, url, status)

            # We return a dict holding values from each line
            es_nginx_d = dict(zip(es_fields_keys, es_fields_vals))

            # Return the row on each iteration
            yield idx, es_nginx_d   # <- Note the usage of 'yield'

    def es_add_bulk(nginx_file):
        # The nginx file can be gzip or just text. Open it appropriately.
        ...

        es = Elasticsearch(hosts=[{'host': 'localhost', 'port': 9200}])

        # NOTE the (...) round brackets. This is for a generator.
        k = ({
                "_index": "nginx",
                "_type": "logs",
                "_id": idx,
                "_source": es_nginx_d,
             } for idx, es_nginx_d in decode_nginx_log(_nginx_fd))

        helpers.bulk(es, k)

    # Now, just run it.
    es_add_bulk('./nginx.1.log.gz')
This skeleton demonstrates the usage of generators. You can use this even on a bare machine if you need to, and you can go on expanding on it to tailor it to your needs quickly.
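If it helps, here is one hypothetical way to fill in the skeleton's parsing step for the stock nginx "combined" log format (the regex and the choice of the timestamp as the document id are my assumptions, not part of the skeleton):

    import re

    # Matches the default nginx "combined" log format (an assumption;
    # adjust the pattern to your own log_format directive)
    NGINX_LINE = re.compile(
        r'(?P<remote_addr>\S+) \S+ \S+ \[(?P<timestamp>[^\]]+)\] '
        r'"\S+ (?P<url>\S+)[^"]*" (?P<status>\d{3})'
    )

    def decode_nginx_log(_nginx_fd):
        # Assumes the file object yields text lines, not bytes
        for each_line in _nginx_fd:
            m = NGINX_LINE.match(each_line)
            if m is None:
                continue                  # skip lines that do not parse
            es_nginx_d = m.groupdict()
            # Reuse the timestamp as the document id, as the skeleton suggests
            yield es_nginx_d['timestamp'], es_nginx_d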
Python Elasticsearch reference here.
    from datetime import datetime
    from elasticsearch import Elasticsearch
    from elasticsearch import helpers

    es = Elasticsearch()

    actions = [
      {
        "_index": "tickets-index",
        "_type": "tickets",
        "_id": j,
        "_source": {
            "any": "data" + str(j),
            "timestamp": datetime.now()
        }
      }
      for j in range(0, 10)
    ]

    helpers.bulk(es, actions)
Although @justinachen's code helped me get started with py-elasticsearch, after looking at the source code let me suggest a simple improvement:
    from datetime import datetime
    from elasticsearch import Elasticsearch, helpers

    es = Elasticsearch()
    j = 0
    actions = []
    while j <= 10:
        action = {
            "_index": "tickets-index",
            "_type": "tickets",
            "_id": j,
            "_source": {
                "any": "data" + str(j),
                "timestamp": datetime.now()
            }
        }
        actions.append(action)
        j += 1

    helpers.bulk(es, actions)
helpers.bulk() already does the chunking for you. By chunking I mean the batches sent to the server each time. If you want to reduce the number of documents sent per batch, do:

    helpers.bulk(es, actions, chunk_size=100)
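helpers.bulk() also reports what happened, so you can verify the chunked upload (a small sketch reusing the actions list above; note that by default failures raise BulkIndexError, so raise_on_error=False is passed here to get them back as a list instead):

    # Returns (number of successful actions, list of errors)
    success, errors = helpers.bulk(es, actions, chunk_size=100,
                                   raise_on_error=False)
    print("indexed %d documents" % success)
    for err in errors:
        print(err)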
Some handy info to get started:
helpers.bulk() is just a wrapper of helpers.streaming_bulk, but the first accepts a list, which makes it handy.

helpers.streaming_bulk is based on Elasticsearch.bulk(), so you don't need to worry about what to pick.
So in most cases, helpers.bulk() should be all you need.
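If you do want per-document feedback while still streaming from a generator, helpers.streaming_bulk can also be used directly (a minimal sketch; the gen_actions() helper is made up for illustration):

    from elasticsearch import Elasticsearch, helpers

    es = Elasticsearch()

    def gen_actions():
        # Hypothetical generator producing actions in the same shape as above
        for j in range(10):
            yield {
                "_index": "tickets-index",
                "_type": "tickets",
                "_id": j,
                "_source": {"any": "data" + str(j)},
            }

    # streaming_bulk yields one (ok, result) pair per action as chunks finish
    for ok, result in helpers.streaming_bulk(es, gen_actions(), chunk_size=100,
                                             raise_on_error=False):
        if not ok:
            print(result)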