热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

PythonMongoDB合表

一、原始表结构1、imsi表MongoDBEnterprisedb.trs_action_dzwl_zm.findOne(){_id:{imsi:46002938001

一、原始表结构

1、imsi表

MongoDB Enterprise > db.trs_action_dzwl_zm.findOne()
{
        "_id" : {
                "imsi" : "460029380018855",
                "start_time" : "2019-03-13 15:37:07"
        },
        "site_address" : "织里-大港路与G318交叉口",
        "xnetbar_wacode" : "EG-MIX-WL-4C-006",
        "imei" : "000000052052052",
        "device_longitude" : "120.275424",
        "device_latitude" : "30.838656",
        "tmsi" : "1552462627",
        "rssi" : "140",
        "band" : "40",
        "plmn" : "46000",
        "tel_number" : "1595028",
        "device_name" : "织里-大港路与G318交叉口-4G",
        "vendor_name" : "南京森根",
        "province" : "江苏省",
        "city" : "盐城市"
}

2、car表

MongoDB Enterprise > db.trs_action_car_info.findOne()
{
        "_id" : {
                "license_number" : "苏A39NX7",
                "start_time" : "2019-05-16 23:03:13"
        },
        "site_address" : "湖织大道-香圩桥东侧",
        "site_location_id" : "",
        "unlawful_act" : "",
        "driving_direct" : "其它",
        "lane_id" : "001",
        "netbar_wacode" : "904",
        "license_color" : "002",
        "photo_cnt" : "",
        "monitor_type" : "卡口式监控",
        "photo_path" : "/pic?did=12ffaa00-78a3-1037-921c-54c4150760be&bid=486472&pid=4294966623&ptime=1558018994",
        "speed" : "0",
        "stat" : "0",
        "vehicle_brand1" : "0",
        "vehicle_brand2" : "0",
        "car_length" : "",
        "car_color" : "其它颜色",
        "shade" : "000",
        "car_type" : "轿车",
        "license_type" : "92式民用车",
        "vehicle_feature_path" : "",
        "device_name" : "湖织大道-香圩桥东侧",
        "monitor_direct" : "未知",
        "lane" : "001",
        "device_longitude" : "120.308512",
        "device_latitude" : "30.881026",
        "site_name" : "湖织大道-香圩桥东侧",
        "road_segment_direct" : "未知",
        "site_longitude" : "120.308512",
        "site_latitude" : "30.881026"
}

3、face表

MongoDB Enterprise > db.trs_action_face_info.findOne()
{
        "_id" : {
                "pid" : "0120_1561570383884_d61beb5b9e644ed081f4ffc5e362ece7",
                "start_time" : "2019-06-13 12:32:59"
        },
        "site_address" : "融泰宾馆",
        "img_mode" : "",
        "obj_img_url" : "/pic?=d4=i778z096as091-706105m6ep=t1i5i*d1=*ipd7=*9s8=42b8i2d05*717540c14-a563e27-1579*d-d0i806d8e42",
        "quality_score" : "0.883593",
        "netbar_wacode" : "33052802001310942740",
        "device_name" : "融泰宾馆",
        "device_longitude" : "120.262211",
        "device_latitude" : "30.841749",
        "age" : "",
        "gender" : "1",
        "race" : "",
        "beard" : "",
        "eye_open" : "",
        "eye_glass" : "",
        "sun_glass" : "1",
        "mask" : "",
        "mouth_open" : "",
        "smile" : "1",
        "similarity" : "0.97059",
        "image_id" : "0120_1561570383884_d61beb5b9e644ed081f4ffc5e362ece7",
        "bkg_url" : "/pic?=d4=i778z096as091-706105m6ep=t1i5i*d1=*ipd7=*9s8=42b8i2d05*717540c14-a563e27-1579*d-d0i806d8e42"
}

4、MAC表

二、合表后collectionsitetime结构

要求:将imsi、car、face、MAC(MAC暂时不合)四张表,将表中一些关键字段提取出来

1)以站点

2)以两分钟为间隔

3)一个document中,两分钟内最多只存200个关键数据

MongoDB Enterprise > db.collecsites.findOne()
{
        "_id" : ObjectId("5e159ef831d840f9482b2adc"),
        "timeline" : "2019-03-13 15:34:00",
        "site" : "织里-大港路与G318交叉口",
        "face" : [ ],
        "lpn" : [ ],
        "mac" : [ ],
        "nsamples" : 200,
        "imsi" : [
                {
                        "start_time" : "2019-03-13 15:35:56",
                        "imsi" : "460078995442766"
                },
                {
                        "start_time" : "2019-03-13 15:35:56",
                        "imsi" : "460006254007976"
                }
        ]
}

三、开发脚本

1、使用到python模块

from multiprocessing import Pool(进程池)

from pymongo import MongoClient(python连接mongodb驱动)

import pandas as pd(将一段时间划分为多个时间段,本例子以2分钟一个时间段)

2、脚本

1)连接mongodb的脚本

[root@mongodb07 python3]# cat mongodbclient.py
#coding=utf-8
from multiprocessing import Pool
import os, time, random
import json
from datetime import datetime
from pymongo import MongoClient
import sys
import datetime
class Database(object):
    def __init__(self, address, port, database):
        self.cOnn= MongoClient(host=address, port=port)
        self.db = self.conn[database]
    def get_state(self):
        return self.conn is not None and self.db is not None
    def insert_one(self, collection, data):
        if self.get_state():
            ret = self.db[collection].insert_one(data)
            return ret.inserted_id
        else:
            return ""
    def insert_many(self, collection, data):
        if self.get_state():
            ret = self.db[collection].insert_many(data)
            return ret.inserted_id
        else:
            return ""
    def update(self, collection, data):
        # data format:
        # {key:[old_data,new_data]}
        data_filter = {}
        data_revised = {}
        for key in data.keys():
            data_filter[key] = data[key][0]
            data_revised[key] = data[key][1]
        if self.get_state():
            return self.db[collection].update_many(data_filter, {"$set": data_revised}).modified_count
        return 0
    def updateOne(self, collection, data_filter,data_revised):
        if self.get_state():
            return self.db[collection].update(data_filter,data_revised,True)
        return 0

    def find(self, col, condition, column=None):
        if self.get_state():
            if column is None:
                return self.db[col].find(condition)
            else:
                return self.db[col].find(condition, column)
        else:
            return None
    def aggregate(self, col, condition):
        if self.get_state():
            optiOns= {‘allowDiskUse‘:True}
            result=self.db[col].aggregate(condition,**options)
            return result
        else:
            return None

    def delete(self, col, condition):
        if self.get_state():
            return self.db[col].delete_many(filter=condition).deleted_count
        return 0
    def close_connect(self):
        self.conn.close()
        #return ‘mongo连接已关闭‘
2)对mongodb中collection做实际操作的脚本
[root@mongodb07 python3]# cat collection_curd.py
#coding:utf-8
from multiprocessing import Pool
import os, time, random
import json
from datetime import datetime
from pymongo import MongoClient
import sys
import datetime
import mongodbclient
import pandas as pd
def max_number(num1,num2,num3):   ##获取最大值
    max_num=max(num1,num2,num3)
    return max_num
def site_cursor_to_list(myresult,colum):  ##将mongodb输出的cursor转换为python的list
    sitelist=[]
    for i in myresult:
        sitelist.append(i[colum])
    return sitelist
def list_Duplicate_removal(inlist):   ##去除重复值
    outlist=list(set(inlist))
    return outlist
def get_time_interval(str_start_time,str_end_time):  ##以2分钟为单位,将输入的时间范围切分
    time_interval=pd.date_range(str_start_time, str_end_time,freq=‘2 Min‘)
    return time_interval
def get_site(collection_name,str_start_time,str_end_time):  ##获取2分钟内imsi/face/lpn/mac的站点名称
    db = mongodbclient.Database("172.16.102.15", 27017, "idpad_zl")
    myresult=db.find(collection_name, {"_id.start_time":{ "$gte":str_start_time,"$lt":str_end_time}})
    db.close_connect()
    return site_cursor_to_list(myresult,"site_address")
def get_site_data(collection_name,str_start_time,str_end_time,site,colums):  ##根据条件:2分钟的起止时间、站点名、集合名、字段名,获取所需数据
    db = mongodbclient.Database("172.16.102.15", 27017, "idpad_zl")
    myresult=db.find(collection_name, {"_id.start_time":{ "$gte":str_start_time,"$lt":str_end_time},"site_address":site},colums)
    db.close_connect()
    return myresult

def sitetime_insert(collection_name,site,str_start_time,imsi_sitetime,face_sitetime,car_sitetime,mac_sitetime):  ##将数据插入集合
    db = mongodbclient.Database("172.16.102.15", 27017, "idpad_zl")
    db.insert_one(collection_name,{"site":site,"timeline":str_start_time,"nsamples":200,"imsi":imsi_sitetime,"face":face_sitetime,"lpn":car_sitetime,"mac":mac_sitetime})
    db.close_connect()
def sitetime_updateOne(collection_name,site,str_start_time,key,value):   ##将数据更新到集合中
    db = mongodbclient.Database("172.16.102.15", 27017, "idpad_zl")
    db.updateOne(collection_name,{"site":site,"timeline":str_start_time,"nsamples":200,key:[]},{"$set":{key:value}})
    db.close_connect()
#def sit_colse():
#    db.close_connect()
 
3)操作脚本
[root@mongodb07 python3]# cat collection_insert.py
#coding:utf-8
from multiprocessing import Pool
import os, time, random
import json
from datetime import datetime
from pymongo import MongoClient
import sys
import datetime
import mongodbclient
import pandas as pd
import collection_curd as curd
from multiprocessing import Pool
#update_exec(imsi_outlen_flo,"collecsites",‘imsi‘,imsidata,imsi_outlen_int,imsi_max_len)
def update_exec(type_outlen_flo,collectionname,site,str_start_time,typelist,datalist,type_outlen_int,type_max_len):
    if type_outlen_flo <=1.0:
        curd.sitetime_updateOne(collectionname,site,str_start_time,typelist,datalist)
    else:
        for x in range(type_outlen_int+1):
            if x==type_outlen_int:
                curd.sitetime_updateOne(collectionname,site,str_start_time,typelist,datalist[x*200:type_max_len])
                #print(typelist)
            else:
                curd.sitetime_updateOne(collectionname,site,str_start_time,typelist,datalist[x*200:(x+1)*200])
                #print(typelist)
def data_exec(nums,time_interval):
    #start = time.time()
    #print("start_time : ",start)
    #time_interval=curd.get_time_interval(‘20190310‘,‘20191230‘)
    #for i in range(len(time_interval)-1):  ##从时间切片中,选取每一个切片时间段
    #print("start : ",nums)
    str_start_time = datetime.datetime.strftime(time_interval[nums],‘%Y-%m-%d %H:%M:%S‘)  ##时间切片,每个切片的开始时间
    str_end_time = datetime.datetime.strftime(time_interval[nums+1],‘%Y-%m-%d %H:%M:%S‘)  ##时间切片,每个切片的结束时间
    #print(str_start_time,‘   ‘,str_end_time)
    #print("########################")
    #time.sleep(5)
    #exit()
    #sitelist=[]
    myresult_imsi_sit=curd.get_site("trs_action_dzwl_zm",str_start_time,str_end_time) ##获取2分钟内imsi的站点名称,并将站点名带入下面的循环
    myresult_car_sit=curd.get_site("trs_action_car_info",str_start_time,str_end_time) ##获取2分钟内car的站点名称,并将站点名带入下面的循环
    myresult_face_sit=curd.get_site("trs_action_face_info",str_start_time,str_end_time) ##获取2分钟内face的站点名称,并将站点名带入下面的循环
    myresult=myresult_imsi_sit+myresult_car_sit+myresult_face_sit
    #print(myresult)
    myresult=curd.list_Duplicate_removal(myresult) ##获取去重后的所有站点
    #print(myresult)
    #exit()
    if not myresult:
        pass
    else:
        for i in range(len(myresult)):
            site=myresult[i]
            #print(site)
            my_imsi_site_data=curd.get_site_data("trs_action_dzwl_zm",str_start_time,str_end_time,site,{"_id"}) ##获取这个站点、这段时间内的数据imsi
            my_car_site_data=curd.get_site_data("trs_action_car_info",str_start_time,str_end_time,site,{"_id"}) ##获取这个站点、这段时间内的数据car
            my_face_site_data=curd.get_site_data("trs_action_face_info",str_start_time,str_end_time,site,{"_id"}) ##获取这个站点、这段时间内的数据face
            imsidata=curd.site_cursor_to_list(my_imsi_site_data,"_id")
            cardata=curd.site_cursor_to_list(my_car_site_data,"_id")
            facedata=curd.site_cursor_to_list(my_face_site_data,"_id")
            #print(imsidata)
            imsi_outlen_int=len(imsidata)/200
            imsi_outlen_flo=len(imsidata)/200.0
            car_outlen_int=len(cardata)/200
            face_outlen_int=len(facedata)/200
            car_outlen_flo=len(cardata)/200.0
            face_outlen_flo=len(facedata)/200.0
            car_max_len=len(cardata)
            face_max_len=len(facedata)
            imsi_max_len=len(imsidata)
            #print("car_max_len:",car_outlen_int," ","face_max_len:",face_outlen_int," ","imsi_max_len:",imsi_outlen_int)
            max_mod_200=max(imsi_outlen_int,car_outlen_int,face_outlen_int)+1
            #print(max_mod_200)
            if imsi_outlen_flo>imsi_outlen_int or car_outlen_flo>car_outlen_int or face_outlen_flo>face_outlen_int:
                for i in range(max_mod_200):
                    curd.sitetime_insert("collecsites",site,str_start_time,[],[],[],[])
            else:
                for i in range(max_mod_200-1):
                    curd.sitetime_insert("collecsites",site,str_start_time,[],[],[],[])
            update_exec(imsi_outlen_flo,"collecsites",site,str_start_time,‘imsi‘,imsidata,imsi_outlen_int,imsi_max_len)
            update_exec(car_outlen_flo,"collecsites",site,str_start_time,‘lpn‘,cardata,car_outlen_int,car_max_len)
            update_exec(face_outlen_flo,"collecsites",site,str_start_time,‘face‘,facedata,face_outlen_int,face_max_len)
            #print(site)
            #exit()
            #curd.sit_colse
    #def update_exec(type_outlen_flo,collectionname,site,str_start_time,typelist,datalist,type_outlen_int,type_max_len):
    #end = time.time()
    #print("end_time : ",end)
    #print(‘ALL Insert Task runs %s(ms).‘ % ((end - start)*1000))

if __name__ == ‘__main__‘:
    start = time.time()
    p=Pool(30)
    #print("start_time : ",start)
    time_interval=curd.get_time_interval(‘20190310‘,‘20191230‘)
    for i in range(len(time_interval)-1):  ##从时间切片中,选取每一个切片时间段
        #print(i)
        #res=p.apply_async(data_exec,args=(i,))
        result=p.apply_async(data_exec, args=(i,time_interval))
    p.close()
    p.join()
    end = time.time()
    print("end_time : ",end)
    print(‘ALL Insert Task runs %s(ms).‘ % ((end - start)*1000))
 

四、开发中遇到的问题

1、如何将一段时间按照两分钟进行划分
2、实例化mongodb连接后,在脚本运行中,连接如何close
3、进程线程池的使用

Python MongoDB 合表


推荐阅读
  • 本文介绍了OC学习笔记中的@property和@synthesize,包括属性的定义和合成的使用方法。通过示例代码详细讲解了@property和@synthesize的作用和用法。 ... [详细]
  • YOLOv7基于自己的数据集从零构建模型完整训练、推理计算超详细教程
    本文介绍了关于人工智能、神经网络和深度学习的知识点,并提供了YOLOv7基于自己的数据集从零构建模型完整训练、推理计算的详细教程。文章还提到了郑州最低生活保障的话题。对于从事目标检测任务的人来说,YOLO是一个熟悉的模型。文章还提到了yolov4和yolov6的相关内容,以及选择模型的优化思路。 ... [详细]
  • 本文介绍了lua语言中闭包的特性及其在模式匹配、日期处理、编译和模块化等方面的应用。lua中的闭包是严格遵循词法定界的第一类值,函数可以作为变量自由传递,也可以作为参数传递给其他函数。这些特性使得lua语言具有极大的灵活性,为程序开发带来了便利。 ... [详细]
  • 本文介绍了使用Java实现大数乘法的分治算法,包括输入数据的处理、普通大数乘法的结果和Karatsuba大数乘法的结果。通过改变long类型可以适应不同范围的大数乘法计算。 ... [详细]
  • HDU 2372 El Dorado(DP)的最长上升子序列长度求解方法
    本文介绍了解决HDU 2372 El Dorado问题的一种动态规划方法,通过循环k的方式求解最长上升子序列的长度。具体实现过程包括初始化dp数组、读取数列、计算最长上升子序列长度等步骤。 ... [详细]
  • 本文讨论了Alink回归预测的不完善问题,指出目前主要针对Python做案例,对其他语言支持不足。同时介绍了pom.xml文件的基本结构和使用方法,以及Maven的相关知识。最后,对Alink回归预测的未来发展提出了期待。 ... [详细]
  • 本文讨论了如何优化解决hdu 1003 java题目的动态规划方法,通过分析加法规则和最大和的性质,提出了一种优化的思路。具体方法是,当从1加到n为负时,即sum(1,n)sum(n,s),可以继续加法计算。同时,还考虑了两种特殊情况:都是负数的情况和有0的情况。最后,通过使用Scanner类来获取输入数据。 ... [详细]
  • 本文介绍了C#中数据集DataSet对象的使用及相关方法详解,包括DataSet对象的概述、与数据关系对象的互联、Rows集合和Columns集合的组成,以及DataSet对象常用的方法之一——Merge方法的使用。通过本文的阅读,读者可以了解到DataSet对象在C#中的重要性和使用方法。 ... [详细]
  • 使用Ubuntu中的Python获取浏览器历史记录原文: ... [详细]
  • 本文详细介绍了Linux中进程控制块PCBtask_struct结构体的结构和作用,包括进程状态、进程号、待处理信号、进程地址空间、调度标志、锁深度、基本时间片、调度策略以及内存管理信息等方面的内容。阅读本文可以更加深入地了解Linux进程管理的原理和机制。 ... [详细]
  • 后台获取视图对应的字符串
    1.帮助类后台获取视图对应的字符串publicclassViewHelper{将View输出为字符串(注:不会执行对应的ac ... [详细]
  • 《数据结构》学习笔记3——串匹配算法性能评估
    本文主要讨论串匹配算法的性能评估,包括模式匹配、字符种类数量、算法复杂度等内容。通过借助C++中的头文件和库,可以实现对串的匹配操作。其中蛮力算法的复杂度为O(m*n),通过随机取出长度为m的子串作为模式P,在文本T中进行匹配,统计平均复杂度。对于成功和失败的匹配分别进行测试,分析其平均复杂度。详情请参考相关学习资源。 ... [详细]
  • 高质量SQL书写的30条建议
    本文提供了30条关于优化SQL的建议,包括避免使用select *,使用具体字段,以及使用limit 1等。这些建议是基于实际开发经验总结出来的,旨在帮助读者优化SQL查询。 ... [详细]
  • 一、需求:        将MongoDB表中的数据按照时间戳增量抽取到Mysql表中。二、实现方式:   1.kettle    2.pytho ... [详细]
  • 用Python在Windows上安装MongoDB原文 ... [详细]
author-avatar
君君6789_903
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有