热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

推荐引擎数据导入模块的实现

毕设做到后半部分,需要实现将用户在一段时间(比如1天)内产生的新数据导入HDFS的功能,这样数据仓库中的数据才能和数据库中的数据同步在新建了一个PyDev项目后,需要如下操作(拣最

毕设做到后半部分,需要实现将用户在一段时间(比如1天)内产生的新数据导入HDFS的功能,这样数据仓库中的数据才能和数据库中的数据同步

在新建了一个PyDev项目后,需要如下操作(拣最主要的写):

 

模块的环境变量:

# -*- coding:UTF-8 -*-
#
!/usr/bin/python # FileName:pro_env.py

#*************************************************** # 项目的路径 PROJECT_DIR = "/usr/local/EclipseProjects/MyBI" # 项目配置文件的路径 PROJECT_CONF_DIR = PROJECT_DIR + "/conf/" # 项目第三方库的路径 PROJECT_LIB_DIR = PROJECT_DIR + "/lib" # 项目临时文件的路径 PROJECT_TMP_DIR = PROJECT_DIR + "/temp" #*************************************************** # Hadoop的安装路径 HADOOP_HOME = "/usr/local/hadoop/" # Hadoop的命令路径 HADOOP_PATH = HADOOP_HOME + "bin/" # HIVE的安装路径 HIVE_HOME = "/opt/hive-0.9.0/" # HIVE的命令路径 HIVE_PATH = HIVE_HOME + "bin/" # Sqoop的安装路径 SQOOP_HOME = "/opt/Sqoop/" # Sqoop的命令路径 SQOOP_PATH = SQOOP_HOME + "bin/" #*************************************************** # Java的安装路径 Java_HOME = "/usr/lib/jvm/jdk1.7.0_75"

配置文件:

导入模块的配置文件主要的目的是告诉Sqoop,导入哪些表,怎么导入,我暂时需要一张表,新建一个XML文件Import.xml,type="add"表示增量导入

xml version="1.0" encoding="UTF-8"?>
<root>
    <task type="add">
        <table>ModifyRecordstable>
    task>
root>

需要对每张表进行更细一步的配置,新建ModifyRecords.xml

xml version="1.0" encoding="UTF-8"?>
<root>
    <sqoop-shell type="import">
        <param key="connect">jdbc:mysql://localhost:3306/Recommendparam>
        <param key="username">${username}param>
        <param key="password">${password}param>
        <param key="target-dir">/user/hadoop/Recommend/$dtparam>
        <param key="query">‘select userID,movieID,pref from Recommend.ModifyRecords where modifyDate$flag"\$CONDITIONS" and $CONDITIONS‘param>
        <param key="m">1param>
        <param key="fields-terminated-by">‘,‘param>
    sqoop-shell>
root>

剩下的工作就是解析配置文件:

# -*- coding:UTF-8 -*-
#!/usr/bin/python
# FileName:import.py
from com.utls.pro_env import PROJECT_CONF_DIR
import time
from com.utls.sqoop import SqoopUtil
import xml.etree.ElementTree as ET

# 其中dt为昨天的日期,将由调度模块传入
def resolve_conf(dt):
    
    # 获得配置文件名
    conf_file = PROJECT_CONF_DIR + "Import.xml"
    
    # 解析配置文件
    xml_tree = ET.parse(conf_file)
    # 获得task元素
    tasks = xml_tree.findall(./task)
    
    for task in tasks:
        # 获得导入类型,增量导入或者全量导入
        import_type = task.attrib["type"]
        
        # 获得表名集合
        tables = task.findall(./table)
        
        # 用来保存待执行的Sqoop命令的集合
        cmds = []

        # 迭代表名集合,解析表配置文件
        for i in range(len(tables)):
            # 表名
            table_name = tables[i].text
            # 表配置文件名
            table_conf_file = PROJECT_CONF_DIR + table_name + ".xml"
            
            # 解析表配置文件
            xmlTree = ET.parse(table_conf_file)
            
            # 获取sqoop-shell节点
            sqoopNodes = xmlTree.findall("./sqoop-shell")
            
            # 获取sqoop-shell节点
            sqoop_cmd_type = sqoopNodes[0].attrib["type"]
            # 获取
            praNodes = sqoopNodes[0].findall("./param")
            
            # 用来保存param信息的字典
            cmap = {}
            
            for i in range(len(praNodes)):
                # 获得key属性的值
                key = praNodes[i].attrib["key"]
                # 获得param标签中间的值
                value = praNodes[i].text
                # 保存到字典中
                cmap[key] = value
                
            # 首先组装成sqoop命令头
            command = "sqoop " + sqoop_cmd_type
                
            # 如果为全量导入
            if(import_type == "all"):
                # query的查询条件为
                import_cOndition= dt
                flag = "<"
            # 如果为增量导入
            elif (import_type == "add"):
                # query的查询条件为=dt
                import_cOndition= dt
                flag = "="
            else:
                raise Exception
                
            # #迭代字典将param的信息拼装成字符串
            for key in cmap.keys():
                    
                value = cmap[key]
                    
                # 如果不是键值对形式的命令选项
                if(value == None or value == "" or value == " "):
                    value = ""
                    
                # 将query的CONDITIONS替换为查询条件
                if(key == "query"):
                    value = value.replace("\$CONDITIONS", import_condition)
                    value = value.replace("$flag", flag)
                        
                # 将导入分区替换为传入的时间
                if(key == "target-dir"):
                    value = value.replace("$dt", dt)
                    
                # 拼装为命令
                if key == "fields-terminated-by":
                    command += " --" + key + " " + value
                else:
                    command += " --" + key + " " + value + "\\" + "\n"
                
            # 将命令加入至待执行的命令集合
            cmds.append(command)
        
    return cmds

# Python模块的入口:main函数
if __name__ == __main__:
    
    # 调度模块将昨天的时间传入
    dt = time.strftime("%Y-%m-%d", time.localtime(time.time()))
    # 解析配置文件,获得sqoop命令集合
    cmds = resolve_conf(dt)
    
    # 迭代集合,执行命令
    for i in range(len(cmds)):
        cmd = cmds[i]
        
        # 执行导入过程
        SqoopUtil.execute_shell(cmd)

拼装出来的命令如下:

sqoop import --username xxxx --target-dir /user/hadoop/Recommend/2015-04-26 --m 1 --connect jdbc:mysql://localhost:3306/Recommend --query select userID,movieID,pref from Recommend.ModifyRecords where modifyDate="2015-04-26" and $CONDITIONS --password xxxx --fields-terminated-by ,

最后新建一个模块(不过当然写在import.py的main函数之前...),编写一个类,为该类编写一个函数,目的是用Python调用Sqoop命令:

#!/usr/bin/python
# FileName sqoop.py
# -*- coding:UTF-8 -*-
import os
class SqoopUtil(object):
    ‘‘‘
    sqoop operation
    ‘‘‘
    def __init__(self):
        pass
    
    @staticmethod
    def execute_shell(shell):
        print shell
        os.system(shell)
        

推荐引擎数据导入模块的实现


推荐阅读
  • 本文讨论了Alink回归预测的不完善问题,指出目前主要针对Python做案例,对其他语言支持不足。同时介绍了pom.xml文件的基本结构和使用方法,以及Maven的相关知识。最后,对Alink回归预测的未来发展提出了期待。 ... [详细]
  • 本文内容为asp.net微信公众平台开发的目录汇总,包括数据库设计、多层架构框架搭建和入口实现、微信消息封装及反射赋值、关注事件、用户记录、回复文本消息、图文消息、服务搭建(接入)、自定义菜单等。同时提供了示例代码和相关的后台管理功能。内容涵盖了多个方面,适合综合运用。 ... [详细]
  • 本文介绍了lua语言中闭包的特性及其在模式匹配、日期处理、编译和模块化等方面的应用。lua中的闭包是严格遵循词法定界的第一类值,函数可以作为变量自由传递,也可以作为参数传递给其他函数。这些特性使得lua语言具有极大的灵活性,为程序开发带来了便利。 ... [详细]
  • 基于layUI的图片上传前预览功能的2种实现方式
    本文介绍了基于layUI的图片上传前预览功能的两种实现方式:一种是使用blob+FileReader,另一种是使用layUI自带的参数。通过选择文件后点击文件名,在页面中间弹窗内预览图片。其中,layUI自带的参数实现了图片预览功能。该功能依赖于layUI的上传模块,并使用了blob和FileReader来读取本地文件并获取图像的base64编码。点击文件名时会执行See()函数。摘要长度为169字。 ... [详细]
  • Mac OS 升级到11.2.2 Eclipse打不开了,报错Failed to create the Java Virtual Machine
    本文介绍了在Mac OS升级到11.2.2版本后,使用Eclipse打开时出现报错Failed to create the Java Virtual Machine的问题,并提供了解决方法。 ... [详细]
  • 在说Hibernate映射前,我们先来了解下对象关系映射ORM。ORM的实现思想就是将关系数据库中表的数据映射成对象,以对象的形式展现。这样开发人员就可以把对数据库的操作转化为对 ... [详细]
  • 本文介绍了在SpringBoot中集成thymeleaf前端模版的配置步骤,包括在application.properties配置文件中添加thymeleaf的配置信息,引入thymeleaf的jar包,以及创建PageController并添加index方法。 ... [详细]
  • 本文详细介绍了Linux中进程控制块PCBtask_struct结构体的结构和作用,包括进程状态、进程号、待处理信号、进程地址空间、调度标志、锁深度、基本时间片、调度策略以及内存管理信息等方面的内容。阅读本文可以更加深入地了解Linux进程管理的原理和机制。 ... [详细]
  • 1,关于死锁的理解死锁,我们可以简单的理解为是两个线程同时使用同一资源,两个线程又得不到相应的资源而造成永无相互等待的情况。 2,模拟死锁背景介绍:我们创建一个朋友 ... [详细]
  • 后台获取视图对应的字符串
    1.帮助类后台获取视图对应的字符串publicclassViewHelper{将View输出为字符串(注:不会执行对应的ac ... [详细]
  • 《数据结构》学习笔记3——串匹配算法性能评估
    本文主要讨论串匹配算法的性能评估,包括模式匹配、字符种类数量、算法复杂度等内容。通过借助C++中的头文件和库,可以实现对串的匹配操作。其中蛮力算法的复杂度为O(m*n),通过随机取出长度为m的子串作为模式P,在文本T中进行匹配,统计平均复杂度。对于成功和失败的匹配分别进行测试,分析其平均复杂度。详情请参考相关学习资源。 ... [详细]
  • 本文介绍了通过ABAP开发往外网发邮件的需求,并提供了配置和代码整理的资料。其中包括了配置SAP邮件服务器的步骤和ABAP写发送邮件代码的过程。通过RZ10配置参数和icm/server_port_1的设定,可以实现向Sap User和外部邮件发送邮件的功能。希望对需要的开发人员有帮助。摘要长度:184字。 ... [详细]
  • 本文介绍了在Python3中如何使用选择文件对话框的格式打开和保存图片的方法。通过使用tkinter库中的filedialog模块的asksaveasfilename和askopenfilename函数,可以方便地选择要打开或保存的图片文件,并进行相关操作。具体的代码示例和操作步骤也被提供。 ... [详细]
  • 本文介绍了OC学习笔记中的@property和@synthesize,包括属性的定义和合成的使用方法。通过示例代码详细讲解了@property和@synthesize的作用和用法。 ... [详细]
  • Java验证码——kaptcha的使用配置及样式
    本文介绍了如何使用kaptcha库来实现Java验证码的配置和样式设置,包括pom.xml的依赖配置和web.xml中servlet的配置。 ... [详细]
author-avatar
ik82jht
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有