热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

DataX

1.什么是DataX?DataX是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FT

1.什么是DataX

?       DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。

2. DataX的设计

为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

                                       技术图片

3.  框架设计

                                 技术图片

DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

Reader:数据采集模块,负责采集数据源的数据,将数据发送给Framework。

Writer:数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。

Framework:用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

4.  运行原理

                                     技术图片

1)  DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。

2)  DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。

3)  切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。

4)  每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。

5)  DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0

 5.快速入门

  前置要求
    - Linux
    - JDK(1.8以上,推荐1.8)
    - Python(推荐Python2.6.X)
   安装
    1)将下载好的datax.tar.gz上传到hadoop101的/opt/software
      [kris@hadoop101 software]$ ls
        datax.tar.gz
    2)解压datax.tar.gz到/opt/module
      [kris@hadoop101 software]$ tar -zxvf datax.tar.gz -C /opt/module/
    3)运行自检脚本
      [kris@hadoop101 bin]$ cd /opt/module/datax/bin/
      [kris@hadoop101 bin]$ python datax.py /opt/module/datax/job/job.json

[kris@hadoop101 bin]$ python datax.py /opt/module/datax/job/job.json 

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


2019-07-14 07:51:25.661 [main] INFO  VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2019-07-14 07:51:25.676 [main] INFO  Engine - the machine info  => 

        osInfo: Oracle Corporation 1.8 25.144-b01
        jvmInfo:        Linux amd64 2.6.32-642.el6.x86_64
        cpu num:        8

        totalPhysicalMemory:    -0.00G
        freePhysicalMemory:     -0.00G
        maxFileDescriptorCount: -1
        currentOpenFileDescriptorCount: -1

        GC Names        [PS MarkSweep, PS Scavenge]

        MEMORY_NAME                    | allocation_size                | init_size                      
        PS Eden Space                  | 256.00MB                       | 256.00MB                       
        Code Cache                     | 240.00MB                       | 2.44MB                         
        Compressed Class Space         | 1,024.00MB                     | 0.00MB                         
        PS Survivor Space              | 42.50MB                        | 42.50MB                        
        PS Old Gen                     | 683.00MB                       | 683.00MB                       
        Metaspace                      | -0.00MB                        | 0.00MB                         


2019-07-14 07:51:25.708 [main] INFO  Engine - 
{
        "content":[
                {
                        "reader":{
                                "name":"streamreader",
                                "parameter":{
                                        "column":[
                                                {
                                                        "type":"string",
                                                        "value":"DataX"
                                                },
                                                {
                                                        "type":"long",
                                                        "value":19890604
                                                },
                                                {
                                                        "type":"date",
                                                        "value":"1989-06-04 00:00:00"
                                                },
                                                {
                                                        "type":"bool",
                                                        "value":true
                                                },
                                                {
                                                        "type":"bytes",
                                                        "value":"test"
                                                }
                                        ],
                                        "sliceRecordCount":100000
                                }
                        },
                        "writer":{
                                "name":"streamwriter",
                                "parameter":{
                                        "encoding":"UTF-8",
                                        "print":false
                                }
                        }
                }
        ],
        "setting":{
                "errorLimit":{
                        "percentage":0.02,
                        "record":0
                },
                "speed":{
                        "byte":10485760
                }
        }
}


2019-07-14 07:51:35.892 [job-0] INFO  HookInvoker - No hook invoked, because base dir not exists or is a file: /opt/module/datax/hook
2019-07-14 07:51:35.896 [job-0] INFO  JobContainer - 
         [total cpu info] => 
                averageCpu                     | maxDeltaCpu                    | minDeltaCpu                    
                -1.00%                         | -1.00%                         | -1.00%
                        

         [total gc info] => 
                 NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime     
                 PS MarkSweep         | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s             
                 PS Scavenge          | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s             

2019-07-14 07:51:35.897 [job-0] INFO  JobContainer - PerfTrace not enable!
2019-07-14 07:51:35.898 [job-0] INFO  StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.018s |  All Task WaitReaderTime 0.047s | Percentage 100.00%
2019-07-14 07:51:35.899 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2019-07-14 07:51:25
任务结束时刻                    : 2019-07-14 07:51:35
任务总计耗时                    :                 10s
任务平均流量                    :          253.91KB/s
记录写入速度                    :          10000rec/s
读出记录总数                    :              100000
读写失败总数                    :                   0

6. 使用案例

①从stream流读取数据并打印到控制台

1)查看配置模板

[kris@hadoop101 bin]$ python datax.py -r streamreader -w streamwriter
技术图片技术图片
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

Please refer to the streamreader document:
     https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md 

Please refer to the streamwriter document:
     https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md 
 
Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json 
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader", 
                    "parameter": {
                        "column": [], 
                        "sliceRecordCount": ""
                    }
                }, 
                "writer": {
                    "name": "streamwriter", 
                    "parameter": {
                        "encoding": "", 
                        "print": true
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}
View Code

②从MySQL的导入和导出

读取MySQL中的数据存放到HDFS

    查看官方模板

技术图片技术图片
[kris@hadoop101 job]$ python /opt/module/datax/bin/datax.py -r mysqlreader -w hdfswriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Please refer to the mysqlreader document:
     https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md 

Please refer to the hdfswriter document:
     https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md 
 
Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json 
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": [], 
                        "connection": [
                            {
                                "jdbcUrl": [], 
                                "table": []
                            }
                        ], 
                        "password": "", 
                        "username": "", 
                        "where": ""
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [], 
                        "compress": "", 
                        "defaultFS": "", 
                        "fieldDelimiter": "", 
                        "fileName": "", 
                        "fileType": "", 
                        "path": "", 
                        "writeMode": ""
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}
View Code

技术图片

技术图片

[kris@hadoop101 job]$ vim mysql2hdfs 
技术图片技术图片
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": [
                            "id",
                            "name"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://hadoop101:3306/test"
                                ], 
                                "table": [
                                    "stu1"
                                ]
                            }
                        ], 
                        "username": "root", 
                        "password": "123456"
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "INT"
                            },
                            {
                                "name": "name",
                                "type": "STRING"
                            }
                        ],  
                        "defaultFS": "hdfs://hadoop101:9000", 
                        "fieldDelimiter": "\t", 
                        "fileName": "student.txt", 
                        "fileType": "text", 
                        "path": "/", 
                        "writeMode": "append"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "2"
            }
        }
    }
}
View Code
2019-07-14 09:02:55.924 [job-0] INFO  JobContainer - 
         [total cpu info] => 
                averageCpu                     | maxDeltaCpu                    | minDeltaCpu                    
                -1.00%                         | -1.00%                         | -1.00%
                        

         [total gc info] => 
                 NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime     
                 PS MarkSweep         | 1                  | 1                  | 1                  | 0.071s             | 0.071s             | 0.071s             
                 PS Scavenge          | 1                  | 1                  | 1                  | 0.072s             | 0.072s             | 0.072s             

2019-07-14 09:02:55.924 [job-0] INFO  JobContainer - PerfTrace not enable!
2019-07-14 09:02:55.925 [job-0] INFO  StandAloneJobContainerCommunicator - Total 3 records, 30 bytes | Speed 3B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2019-07-14 09:02:55.927 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2019-07-14 09:02:43
任务结束时刻                    : 2019-07-14 09:02:55
任务总计耗时                    :                 12s
任务平均流量                    :                3B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   3
读写失败总数                    :                   0

技术图片

注意:HdfsWriter实际执行时会在该文件名后添加随机的后缀作为每个线程写入实际文件名。

[kris@hadoop101 job]$ python /opt/module/datax/bin/datax.py -r mongodbreader -w hdfswriter  
技术图片技术图片
[kris@hadoop101 job]$ python /opt/module/datax/bin/datax.py -r mongodbreader -w hdfswriter     

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Please refer to the mongodbreader document:
     https://github.com/alibaba/DataX/blob/master/mongodbreader/doc/mongodbreader.md 

Please refer to the hdfswriter document:
     https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md 
 
Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json 
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mongodbreader", 
                    "parameter": {
                        "address": [], 
                        "collectionName": "", 
                        "column": [], 
                        "dbName": "", 
                        "userName": "", 
                        "userPassword": ""
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [], 
                        "compress": "", 
                        "defaultFS": "", 
                        "fieldDelimiter": "", 
                        "fileName": "", 
                        "fileType": "", 
                        "path": "", 
                        "writeMode": ""
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}
View Code

读取HDFS数据写入MySQL

将上个案例上传的文件改名

  [kris@hadoop101 datax]$ hadoop fs -mv /student.txt* /student.txt

查看官方模板

  [kris@hadoop101 datax]$ python bin/datax.py -r hdfsreader -w mysqlwriter

创建配置文件

技术图片技术图片
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hdfsreader", 
                    "parameter": {
                        "column": ["*"], 
                        "defaultFS": "hdfs://hadoop102:9000", 
                        "encoding": "UTF-8", 
                        "fieldDelimiter": "\t", 
                        "fileType": "text", 
                        "path": "/student.txt"
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": [
                            "id",
                            "name"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://hadoop102:3306/datax", 
                                "table": ["student2"]
                            }
                        ], 
                        "password": "000000", 
                        "username": "root", 
                        "writeMode": "insert"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
View Code

在MySQL的datax数据库中创建student2

  mysql> use test;

  mysql> create table stu1(id int,name varchar(20));

执行任务

  [kris@hadoop101 datax]$ bin/datax.py job/hdfs2mysql.json

③DataX导入导出案例

读取MongoDB的数据导入到HDFS

编写配置文件

[kris@hadoop101 datax]$ vim job/mongdb2hdfs.json
技术图片技术图片
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mongodbreader", 
                    "parameter": {
                        "address": ["127.0.0.1:27017"], 
                        "collectionName": "atguigu", 
                        "column": [
                            {
                                "name":"name",
                                "type":"string"
                            },
                            {
                                "name":"url",
                                "type":"string"
                            }
                        ], 
                        "dbName": "test", 
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [
                            {
                                "name":"name",
                                "type":"string"
                            },
                            {
                                "name":"url",
                                "type":"string"
                            }
                        ], 
                        "defaultFS": "hdfs://hadoop102:9000", 
                        "fieldDelimiter": "\t", 
                        "fileName": "mongo.txt", 
                        "fileType": "text", 
                        "path": "/", 
                        "writeMode": "append"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
View Code

mongodbreader参数解析

  address: MongoDB的数据地址信息,因为MonogDB可能是个集群,则ip端口信息需要以Json数组的形式给出。【必填】

  userName:MongoDB的用户名。【选填】

  userPassword: MongoDB的密码。【选填】

  collectionName: MonogoDB的集合名。【必填】

  column:MongoDB的文档列名。【必填】

  name:Column的名字。【必填】

  type:Column的类型。【选填】

  splitter:因为MongoDB支持数组类型,但是Datax框架本身不支持数组类型,所以mongoDB读出来的数组类型要通过这个分隔符合并成字符串。【选填】

执行
[kris@hadoop101 datax]$ bin/datax.py job/mongdb2hdfs.json

读取MongoDB的数据导入MySQL

  在MySQL中创建表 mysql> create table kris(name varchar(20),url varchar(20));

编写DataX配置文件

  [kris@hadoop101 datax]$ vim job/mongodb2mysql.json

DataX


推荐阅读
  • 本文介绍了lua语言中闭包的特性及其在模式匹配、日期处理、编译和模块化等方面的应用。lua中的闭包是严格遵循词法定界的第一类值,函数可以作为变量自由传递,也可以作为参数传递给其他函数。这些特性使得lua语言具有极大的灵活性,为程序开发带来了便利。 ... [详细]
  • 基于layUI的图片上传前预览功能的2种实现方式
    本文介绍了基于layUI的图片上传前预览功能的两种实现方式:一种是使用blob+FileReader,另一种是使用layUI自带的参数。通过选择文件后点击文件名,在页面中间弹窗内预览图片。其中,layUI自带的参数实现了图片预览功能。该功能依赖于layUI的上传模块,并使用了blob和FileReader来读取本地文件并获取图像的base64编码。点击文件名时会执行See()函数。摘要长度为169字。 ... [详细]
  • 本文介绍了使用Java实现大数乘法的分治算法,包括输入数据的处理、普通大数乘法的结果和Karatsuba大数乘法的结果。通过改变long类型可以适应不同范围的大数乘法计算。 ... [详细]
  • HDU 2372 El Dorado(DP)的最长上升子序列长度求解方法
    本文介绍了解决HDU 2372 El Dorado问题的一种动态规划方法,通过循环k的方式求解最长上升子序列的长度。具体实现过程包括初始化dp数组、读取数列、计算最长上升子序列长度等步骤。 ... [详细]
  • Mac OS 升级到11.2.2 Eclipse打不开了,报错Failed to create the Java Virtual Machine
    本文介绍了在Mac OS升级到11.2.2版本后,使用Eclipse打开时出现报错Failed to create the Java Virtual Machine的问题,并提供了解决方法。 ... [详细]
  • 在说Hibernate映射前,我们先来了解下对象关系映射ORM。ORM的实现思想就是将关系数据库中表的数据映射成对象,以对象的形式展现。这样开发人员就可以把对数据库的操作转化为对 ... [详细]
  • 后台获取视图对应的字符串
    1.帮助类后台获取视图对应的字符串publicclassViewHelper{将View输出为字符串(注:不会执行对应的ac ... [详细]
  • 《数据结构》学习笔记3——串匹配算法性能评估
    本文主要讨论串匹配算法的性能评估,包括模式匹配、字符种类数量、算法复杂度等内容。通过借助C++中的头文件和库,可以实现对串的匹配操作。其中蛮力算法的复杂度为O(m*n),通过随机取出长度为m的子串作为模式P,在文本T中进行匹配,统计平均复杂度。对于成功和失败的匹配分别进行测试,分析其平均复杂度。详情请参考相关学习资源。 ... [详细]
  • 本文介绍了通过ABAP开发往外网发邮件的需求,并提供了配置和代码整理的资料。其中包括了配置SAP邮件服务器的步骤和ABAP写发送邮件代码的过程。通过RZ10配置参数和icm/server_port_1的设定,可以实现向Sap User和外部邮件发送邮件的功能。希望对需要的开发人员有帮助。摘要长度:184字。 ... [详细]
  • 动态规划算法的基本步骤及最长递增子序列问题详解
    本文详细介绍了动态规划算法的基本步骤,包括划分阶段、选择状态、决策和状态转移方程,并以最长递增子序列问题为例进行了详细解析。动态规划算法的有效性依赖于问题本身所具有的最优子结构性质和子问题重叠性质。通过将子问题的解保存在一个表中,在以后尽可能多地利用这些子问题的解,从而提高算法的效率。 ... [详细]
  • Java验证码——kaptcha的使用配置及样式
    本文介绍了如何使用kaptcha库来实现Java验证码的配置和样式设置,包括pom.xml的依赖配置和web.xml中servlet的配置。 ... [详细]
  • 高质量SQL书写的30条建议
    本文提供了30条关于优化SQL的建议,包括避免使用select *,使用具体字段,以及使用limit 1等。这些建议是基于实际开发经验总结出来的,旨在帮助读者优化SQL查询。 ... [详细]
  • 本文介绍了指针的概念以及在函数调用时使用指针作为参数的情况。指针存放的是变量的地址,通过指针可以修改指针所指的变量的值。然而,如果想要修改指针的指向,就需要使用指针的引用。文章还通过一个简单的示例代码解释了指针的引用的使用方法,并思考了在修改指针的指向后,取指针的输出结果。 ... [详细]
  • 在project.properties添加#Projecttarget.targetandroid-19android.library.reference.1..Sliding ... [详细]
  • 猜字母游戏
    猜字母游戏猜字母游戏——设计数据结构猜字母游戏——设计程序结构猜字母游戏——实现字母生成方法猜字母游戏——实现字母检测方法猜字母游戏——实现主方法1猜字母游戏——设计数据结构1.1 ... [详细]
author-avatar
手机用户2502936521
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有