热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

使用Python将MongoDB数据同步到Elasticsearch

使用Python将MongoDB数据同步到Elasticsearch版本说明:Python3.7PyMongo:3.11.0Elasticsearch

使用Python将MongoDB数据同步到Elasticsearch

版本说明:Python 3.7 PyMongo:3.11.0 Elasticsearch:5.5.3
话不多说直接鲁码(如遇到什么问题欢迎留言讨论)

# coding:utf8
# 将mongodb中的数据同步到Es中
from pymongo import MongoClient
from elasticsearch import Elasticsearch, helpers
import json
import logging# mongodb 数据库地址
CONN_ADDR1 = '更换为自己的MongoDB地址'
USERNAME = 'MongoDB用户名'
PASSWORD = 'MongoDB密码'
DB = "MongoDB的库"
COLLECTION = "MongoDB集合"# Es 数据库地址
class ElasticObj:def __init__(self, index_name, index_type, ip):""":param index_name: 索引名称:param index_type: 索引类型"""self.index_name = index_nameself.index_type = index_type# 无用户名密码状态# self.es = Elasticsearch([ip])# 用户名密码状态# 连接ESself.es = Elasticsearch([ip], http_auth=('ES用户名', 'ES密码'), port=9200)# def chaxun(self):#查询所有数据# db = self.client['xcc_company_name']# collection = db['name_A']# data_qiyes = collection.find({}, no_cursor_timeout=True)# return data_qiyes# 创建索引def create_index(self):'''创建索引,创建索引名称为ott,类型为ott_type的索引:param ex: Elasticsearch对象:return:'''# 创建映射_index_mappings = {"mappings": {self.index_type: {"properties": {"name": {'type': 'text'},"password": {'type': 'text'},"birthplace": {'type': 'text'}}}}}if self.es.indices.exists(index=self.index_name) is not True:res = self.es.indices.create(index=self.index_name, body=_index_mappings, ignore=400)print(res)# 打印详细的日志def detailedlog(self):logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',level=logging.DEBUG)logging.debug('debug 信息')logging.info('info 信息')logging.warning('warning 信息')logging.error('error 信息')logging.critical('critial 信息')# 插入数据def insert_data(self, p=0):ACTIONS = []p = 1bulk_num = 200# 连接mongo库client = MongoClient(CONN_ADDR1, port=27017)client.admin.authenticate(USERNAME, PASSWORD)mydb = client[DB]mycol = mydb[COLLECTION]# find()最大限制是101条 使用mycol.find({}, no_cursor_timeout=True) 查询所有Cursor = mycol.find({}, no_cursor_timeout=True)n = 0for mongoRecordRes in Cursor:data = []# for mongoRecordRes in list(mycol.find({}, no_cursor_timeout=True)):n = n + 1print("开始处理数据" + str(n) + "条")data_value = []data_name = ["corporateAnnualInfoId", "companyId", "nbYear", "annualCount", "basicInfo", "onlineStoreInfo","shareContributive", "frimAssetInfo", "socialInfo"]。。。(这里编写自己处理数据的逻辑)# print(data_value)# print(data_name)data_name_value = dict(zip(data_name, data_value))data.append(data_name_value)# print(data)# 遍历data数据for list_line in data:# 去掉引号# list_line = eval(list_line)# print("-" * 10)# print(list_line)# print("-" * 10)action = {"_index": self.index_name,"_type": self.index_type,"_id": list_line["corporateAnnualInfoId"], # _id 也可以默认生成,不赋值"_source": {"corporateAnnualInfoId": list_line["corporateAnnualInfoId"],"companyId": list_line["companyId"],"nbYear": list_line["nbYear"],"annualCount": list_line["annualCount"],"basicInfo": list_line["basicInfo"],"onlineStoreInfo": list_line["onlineStoreInfo"],"shareContributive": list_line["shareContributive"],"frimAssetInfo": list_line["frimAssetInfo"],"socialInfo": list_line["socialInfo"]}}p += 1ACTIONS.append(action)print(ACTIONS)# success, _ = helpers.bulk(self.es, action, index=self.index_name, raise_on_error=True)# 批量处理if len(ACTIONS) == bulk_num:print('插入', p / bulk_num, '批数据')print(len(ACTIONS))success, _ = helpers.bulk(self.es, ACTIONS, index=self.index_name, raise_on_error=True)del ACTIONS[0:len(ACTIONS)]print(success)if len(ACTIONS) > 0:success, _ = helpers.bulk(self.es, ACTIONS, index=self.index_name, raise_on_error=True)del ACTIONS[0:len(ACTIONS)]print('Performed %d actions' % success)if __name__ == '__main__':#这里IP更换为自己的obj = ElasticObj("IndexName", "IndexType",ip="es-cn-********.com")#创建索引 本人这里没有使用只是测试了一下#obj.create_index()obj.detailedlog()#执行数据的插入obj.insert_data()


推荐阅读
  • 关键词:Golang, Cookie, 跟踪位置, net/http/cookiejar, package main, golang.org/x/net/publicsuffix, io/ioutil, log, net/http, net/http/cookiejar ... [详细]
  • 开发笔记:加密&json&StringIO模块&BytesIO模块
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了加密&json&StringIO模块&BytesIO模块相关的知识,希望对你有一定的参考价值。一、加密加密 ... [详细]
  • sklearn数据集库中的常用数据集类型介绍
    本文介绍了sklearn数据集库中常用的数据集类型,包括玩具数据集和样本生成器。其中详细介绍了波士顿房价数据集,包含了波士顿506处房屋的13种不同特征以及房屋价格,适用于回归任务。 ... [详细]
  • 本文介绍了计算机网络的定义和通信流程,包括客户端编译文件、二进制转换、三层路由设备等。同时,还介绍了计算机网络中常用的关键词,如MAC地址和IP地址。 ... [详细]
  • 拥抱Android Design Support Library新变化(导航视图、悬浮ActionBar)
    转载请注明明桑AndroidAndroid5.0Loollipop作为Android最重要的版本之一,为我们带来了全新的界面风格和设计语言。看起来很受欢迎࿰ ... [详细]
  • Android JSON基础,音视频开发进阶指南目录
    Array里面的对象数据是有序的,json字符串最外层是方括号的,方括号:[]解析jsonArray代码try{json字符串最外层是 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • 也就是|小窗_卷积的特征提取与参数计算
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了卷积的特征提取与参数计算相关的知识,希望对你有一定的参考价值。Dense和Conv2D根本区别在于,Den ... [详细]
  • Android开发实现的计时器功能示例
    本文分享了Android开发实现的计时器功能示例,包括效果图、布局和按钮的使用。通过使用Chronometer控件,可以实现计时器功能。该示例适用于Android平台,供开发者参考。 ... [详细]
  • Linux如何安装Mongodb的详细步骤和注意事项
    本文介绍了Linux如何安装Mongodb的详细步骤和注意事项,同时介绍了Mongodb的特点和优势。Mongodb是一个开源的数据库,适用于各种规模的企业和各类应用程序。它具有灵活的数据模式和高性能的数据读写操作,能够提高企业的敏捷性和可扩展性。文章还提供了Mongodb的下载安装包地址。 ... [详细]
  • 本文介绍了在Linux下安装和配置Kafka的方法,包括安装JDK、下载和解压Kafka、配置Kafka的参数,以及配置Kafka的日志目录、服务器IP和日志存放路径等。同时还提供了单机配置部署的方法和zookeeper地址和端口的配置。通过实操成功的案例,帮助读者快速完成Kafka的安装和配置。 ... [详细]
  • 本文讨论了在openwrt-17.01版本中,mt7628设备上初始化启动时eth0的mac地址总是随机生成的问题。每次随机生成的eth0的mac地址都会写到/sys/class/net/eth0/address目录下,而openwrt-17.01原版的SDK会根据随机生成的eth0的mac地址再生成eth0.1、eth0.2等,生成后的mac地址会保存在/etc/config/network下。 ... [详细]
  • 解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法
    本文介绍了解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法,包括检查location配置是否正确、pass_proxy是否需要加“/”等。同时,还介绍了修改nginx的error.log日志级别为debug,以便查看详细日志信息。 ... [详细]
  • mac php错误日志配置方法及错误级别修改
    本文介绍了在mac环境下配置php错误日志的方法,包括修改php.ini文件和httpd.conf文件的操作步骤。同时还介绍了如何修改错误级别,以及相应的错误级别参考链接。 ... [详细]
  • 移动端常用单位——rem的使用方法和注意事项
    本文介绍了移动端常用的单位rem的使用方法和注意事项,包括px、%、em、vw、vh等其他常用单位的比较。同时还介绍了如何通过JS获取视口宽度并动态调整rem的值,以适应不同设备的屏幕大小。此外,还提到了rem目前在移动端的主流地位。 ... [详细]
author-avatar
临冬将至
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有