| 源数据表 | HDFS目录 | 对应EXT模式中的表 | 抽取模式 |
|---|---|---|---|
| customer | /data/ext/customer | customer | 整体、拉取 |
| product | /data/ext/product | product | 整体、拉取 |
| sales_order | /data/ext/sales_order | sales_order | 基于时间戳的CDC、拉取 |
| 参数 | 描述 |
|---|---|
| --check-column | 在确定应该导入哪些行时，指定被检查的列。列不能是CHAR/NCHAR/VARCHAR/VARNCHAR/LONGVARCHAR/LONGNVARCHAR数据类型。 |
| --incremental | 指定Sqoop怎样确定哪些行是新行。有效值是append和lastmodified。 |
| --last-value | 指定已经导入数据的被检查列的最大值。 |
#!/bin/bash
# Create the Sqoop incremental-import job for sales_order, using order_number
# as the check column with an initial last-value of 0.
# Delete any previous job of the same name first so this script is re-runnable
# (sqoop job --create fails if the job already exists).
sqoop job --delete myjob_incremental_import
# NOTE(review): the --where value was truncated in the source text
# ("entry_date--incremental append"); "entry_date < current_date()" is a
# reconstruction -- confirm against the original script.
# JDBC URL fixed: Connector/J expects the case-sensitive parameter "useSSL".
sqoop job --create myjob_incremental_import \
-- import \
--connect "jdbc:mysql://172.16.1.127:3306/source?useSSL=false&user=dwtest&password=123456" \
--table sales_order \
--target-dir /data/ext/sales_order \
--compress \
--where "entry_date < current_date()" \
--incremental append \
--check-column order_number \
--last-value 0
# Full (truncate-and-reload style) extraction of the customer table.
# The original text hard-wrapped these commands mid-word ("--targe/t-dir"),
# which made them invalid; each sqoop command is one line again.
sqoop import --connect jdbc:mysql://172.16.1.127:3306/source --username dwtest --password 123456 --table customer --target-dir /data/ext/customer --delete-target-dir --compress
# Full extraction of the product table.
sqoop import --connect jdbc:mysql://172.16.1.127:3306/source --username dwtest --password 123456 --table product --target-dir /data/ext/product --delete-target-dir --compress
# First extraction of the sales_order table, executed through the saved
# incremental job (effectively a full load since last-value starts at 0).
sqoop job --exec myjob_incremental_import
# Make the extraction script executable.
chmod 755 ~/init_extract.sh
| 源数据 | 源数据类型 | 文件名/表名 | 数据仓库中的目标表 |
|---|---|---|---|
| 客户 | MySQL表 | customer | customer_dim |
| 产品 | MySQL表 | product | product_dim |
| 销售订单 | MySQL表 | sales_order | order_dim、sales_order_fact |
-- init_load.sql: initial load of the data warehouse from the external (ext)
-- tables into the raw (rds) tables and then the warehouse (tds) tables.
-- (Restored from a text dump that collapsed the whole script onto one line.)

-- Analyze the external tables so the optimizer has fresh statistics.
analyze ext.customer;
analyze ext.product;
analyze ext.sales_order;

-- Load the external data into the raw data (rds) tables.
set search_path to rds;
truncate table customer;
truncate table product;
truncate table sales_order;
insert into customer select * from ext.customer;
insert into product select * from ext.product;
insert into sales_order select * from ext.sales_order;

-- Analyze the rds tables.
analyze rds.customer;
analyze rds.product;
analyze rds.sales_order;

-- Load the data warehouse (tds) tables.
set search_path to tds;
truncate table customer_dim;
truncate table product_dim;
truncate table order_dim;
truncate table sales_order_fact;

-- Reset the surrogate-key sequences so the initial load starts at 1.
alter sequence customer_dim_customer_sk_seq restart with 1;
alter sequence product_dim_product_sk_seq restart with 1;
alter sequence order_dim_order_sk_seq restart with 1;

-- Load the customer dimension table (version 1, fixed initial effective date).
insert into customer_dim
    (customer_number, customer_name, customer_street_address,
     customer_zip_code, customer_city, customer_state, version, effective_date)
select t1.customer_number,
       t1.customer_name,
       t1.customer_street_address,
       t1.customer_zip_code,
       t1.customer_city,
       t1.customer_state,
       1,
       '2016-03-01'
  from rds.customer t1
 order by t1.customer_number;

-- Load the product dimension table.
insert into product_dim
    (product_code, product_name, product_category, version, effective_date)
select product_code, product_name, product_category, 1, '2016-03-01'
  from rds.product t1
 order by t1.product_code;

-- Load the order dimension table; effective_date is the order's own date.
insert into order_dim (order_number, version, effective_date)
select order_number, 1, order_date
  from rds.sales_order t1
 order by t1.order_number;

-- Load the sales-order fact table.
-- Rewritten from implicit comma joins to explicit ANSI inner joins
-- (same result set; join conditions unchanged).
insert into sales_order_fact
select b.order_sk,
       c.customer_sk,
       d.product_sk,
       e.date_sk,
       e.year * 100 + e.month,
       a.order_amount
  from rds.sales_order a
       inner join order_dim b
           on a.order_number = b.order_number
       inner join customer_dim c
           on a.customer_number = c.customer_number
       inner join product_dim d
           on a.product_code = d.product_code
       inner join date_dim e
           on date(a.order_date) = e.date;

-- Analyze the tds tables.
analyze customer_dim;
analyze product_dim;
analyze order_dim;
analyze sales_order_fact;
#!/bin/bash
# init_etl.sh: run the whole initial ETL end to end.
# (Restored from a text dump that collapsed the script onto one line.)
# To make the initial-load process re-runnable, first remove the contents of
# the sales_order external-table directory as the hdfs user.
su - hdfs -c 'hdfs dfs -rm -r /data/ext/sales_order/*'
# Run the initial extraction script as the sqoop user.
su - sqoop -c '~/init_extract.sh'
# Run the initial load script as the gpadmin user.
su - gpadmin -c 'export PGPASSWORD=123456;psql -U dwtest -d dw -h hdp3 -f ~/init_load.sql'
# Make the combined ETL driver script executable, then run it.
chmod 755 ~/init_etl.sh
~/init_etl.sh
-- Verification query: list the loaded sales orders with their dimension
-- attributes. Rewritten from implicit comma joins to explicit ANSI inner
-- joins (identical result); all columns qualified for clarity.
select d.order_number,
       b.customer_name,
       c.product_name,
       e.date,
       a.order_amount as amount
  from sales_order_fact a
       inner join customer_dim b
           on a.customer_sk = b.customer_sk
       inner join product_dim c
           on a.product_sk = c.product_sk
       inner join order_dim d
           on a.order_sk = d.order_sk
       inner join date_dim e
           on a.order_date_sk = e.date_sk
 order by d.order_number;
共装载100条销售订单数据，最后20条如图1所示。