如何使用Bulk API通过Python将关键字存储在ES中

  发布于 2023-02-13 19:02

我必须在ElasticSearch中存储一些与我的python程序集成的消息.现在我尝试存储消息的是:

d={"message":"this is message"}
    for index_nr in range(1,5):
        ElasticSearchAPI.addToIndex(index_nr, d)
        print d

这意味着如果我有10条消息,那么我必须重复我的代码10次.所以我想做的是尝试制作脚本文件或批处理文件.我已经检查了ElasticSearch指南,可以使用BULK API.格式应如下所示:

{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_type" : "type1", "_id" : "2" } }
{ "create" : { "_index" : "test", "_type" : "type1", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_type" : "type1", "_index" : "index1"} }
{ "doc" : {"field2" : "value2"} }

我做的是:

{"index":{"_index":"test1","_type":"message","_id":"1"}}
{"message":"it is red"}
{"index":{"_index":"test2","_type":"message","_id":"2"}}
{"message":"it is green"}

我还使用curl工具来存储doc.

$ curl -s -XPOST localhost:9200/_bulk --data-binary @message.json

现在我想使用我的Python代码将文件存储到Elastic Search.

3 个回答
  • (此线程中提到的其他方法使用python列表进行ES更新,这在今天不是一个好的解决方案,特别是当您需要向ES添加数百万个数据时)

    更好的方法是使用python生成器 - 处理数据演出,而不会出现内存不足或速度受损的情况.

    下面是一个实际用例的示例片段 - 将数据从nginx日志文件添加到ES进行分析.

    def decode_nginx_log(_nginx_fd):
        for each_line in _nginx_fd:
            # Filter out the below from each log line
            remote_addr = ...
            timestamp   = ...
            ...
    
            # Index for elasticsearch. Typically timestamp.
            idx = ...
    
            es_fields_keys = ('remote_addr', 'timestamp', 'url', 'status')
            es_fields_vals = (remote_addr, timestamp, url, status)
    
            # We return a dict holding values from each line
            es_nginx_d = dict(zip(es_fields_keys, es_fields_vals))
    
            # Return the row on each iteration
            yield idx, es_nginx_d   # <- Note the usage of 'yield'
    
    def es_add_bulk(nginx_file):
        # The nginx file can be gzip or just text. Open it appropriately.
        ...
    
        es = Elasticsearch(hosts = [{'host': 'localhost', 'port': 9200}])
    
        # NOTE the (...) round brackets. This is for a generator.
        k = ({
                "_index": "nginx",
                "_type" : "logs",
                "_id"   : idx,
                "_source": es_nginx_d,
             } for idx, es_nginx_d in decode_nginx_log(_nginx_fd))
    
        helpers.bulk(es, k)
    
    # Now, just run it.
    es_add_bulk('./nginx.1.log.gz')
    

    这个骨架演示了生成器的用法.如果需要,您甚至可以在裸机上使用它.您可以继续扩展以快速定制您的需求.

    Python Elasticsearch 在这里引用.

    2023-02-13 19:05 回答
  • from datetime import datetime
    
    from elasticsearch import Elasticsearch
    from elasticsearch import helpers
    
    es = Elasticsearch()
    
    actions = [
      {
        "_index": "tickets-index",
        "_type": "tickets",
        "_id": j,
        "_source": {
            "any":"data" + str(j),
            "timestamp": datetime.now()}
      }
      for j in range(0, 10)
    ]
    
    helpers.bulk(es, actions)
    

    2023-02-13 19:05 回答
  • 虽然@justinachen的代码帮助我从py-elasticsearch开始,但在查看源代码之后让我做一个简单的改进:

    es = Elasticsearch()
    j = 0
    actions = []
    while (j <= 10):
        action = {
            "_index": "tickets-index",
            "_type": "tickets",
            "_id": j,
            "_source": {
                "any":"data" + str(j),
                "timestamp": datetime.now()
                }
            }
        actions.append(action)
        j += 1
    
    helpers.bulk(es, actions)
    

    helpers.bulk()已经为你做了分割.通过分段,我的意思是每次发送到服务器的chucks.如果要减少发送文件的块数,请执行以下操作:helpers.bulk(es, actions, chunk_size=100)

    一些方便的信息开始:

    helpers.bulk()只是一个包装,helpers.streaming_bulk但第一个接受一个使其方便的列表.

    helpers.streaming_bulk已基于此,Elasticsearch.bulk()所以你不必担心选择什么.

    所以在大多数情况下,helpers.bulk()应该就是你所需要的.

    2023-02-13 19:05 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有