作者:壹花壹浄土 | 来源:互联网 | 2023-06-12 12:23
DataX定位:是一个异构数据源之间的离线同步工具架构:采用Framework+Plugin架构构建,将数据源读取和写入抽象成为ReaderWriter插件核心模块:DataX完成
DataX
定位:是一个异构数据源之间的离线同步工具 架构:采用Framework + Plugin架构构建,将数据源读取和写入抽象成为Reader/Writer插件
核心模块:
DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。
DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
DataX Job启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。 Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。 每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0
DataX调度流程:
举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。DataX的调度决策思路是: DataXJob根据分库分表切分成了100个Task。 根据20个并发,DataX计算共需要分配4个TaskGroup。 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。
demo :
{ "job": { "setting": { "speed": { "channel": 3 }, "errorLimit": { "record": 0, "percentage": 0.02 } }, "content": [{ "reader": { "name": "mysqlreader", "parameter": { "username": "root", "password": "root", "column": ['id', 'name' ], "where":"gmt_created>='$bizdate' and gmt_created "splitPk": "id", "connection": [{ "table": [ "table" ], "jdbcUrl": [ "jdbc:mysql://127.0.0.1:3306/database" ] }] } }, "writer": { "name": "hdfswriter", "parameter": { "defaultFS": "hdfs://xxx:port", "fileType": "orc", "path": "/user/hive/warehouse/writerorc.db/orcfull", "fileName": "xxx", "column": [{ "name": "id", "type": "BIGINT" }, { "name": "name", "type": "STRING" } ], "writeMode": "append", "fieldDelimiter": "\t", "compress": "GZIP" } } }] } }