热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

使用sqoop导入postgresql数据到Hbase

前言随着业务和大数据技术的发展,越来越多的公司需要在后端架设Hbase数据库,而原有的业务则需要从各种RDBMS数据库中迁移到Hbase当中。Appach的sqoop(发音:[skup

前言

随着业务和大数据技术的发展,越来越多的公司需要在后端架设Hbase数据库,而原有的业务则需要从各种RDBMS数据库中迁移到Hbase当中。Appach的sqoop(发音:[skup])就是基于这样的需求而诞生的,本文详细记录了一个通过sqoop将数据从postgresql迁移到Hbase的例子。

前期准备和假设

要完成数据的迁移,那前期毋庸置疑,目的集群上一定是已经安装好了:

  • java
  • hadoop
  • habase
  • ZoopKeeper

在我的例子中:

  • java的版本是oracle 1.7.0.25.
  • hadoop的版本是hadoop-2.7.3,10个机器的集群,2个namenode, 8个- datanode
  • hbase的版本是hbase-1.3.0, 7个机器的集群,2个source manager(一个backup master), 6个HRegionServer
  • zoopkepper的版本是zookeeper-3.4.8,7个节点

具体的集群安装,可以参考网上的各种文章。

这里需要注意的是,其实sqoop已经不支持最新版本的hbase了,但本文的操作至少是可以做的:
Sqoop does not support the latest versions of Hbase yet. The latest of Sqoop is compatible with versions of Hbase <= 0.95.2, there is an open issue (SQOOP-2759) for this hbase-sqoop integration.

sqoop的安装

下载:

http://mirrors.cnnic.cn/apache/sqoop/1.4.6/sqoop-1.4.6.bin__hadoop-2.0.4-alpha.tar.gz

安装

将sqoop解压到“/usr/lib/sqoop”目录.

$tar -xvf sqoop-1.4.6.bin__hadoop-2.0.4-alpha.tar.gz
# mv sqoop-1.4.6.bin__hadoop-2.0.4-alpha /usr/lib/sqoop

配置

把sqoop相关的环境变量配置到 ~/.bashrc 文件:

export SQOOP_HOME=/usr/lib/sqoop 
export PATH=$PATH:$SQOOP_HOME/bin

然后source ~/.bashrc 文件.

$ source ~/.bashrc

复制创建sqoop-env.sh

mv sqoop-env-template.sh       sqoop-env.sh

修改 sqoop-env.sh 加入下面三行,根据集群情况填写

HADOOP_COMMON_HOME=/opt/hadoop-2.7.3/

HADOOP_MAPRED_HOME=/opt/hadoop-2.7.3/

HBASE_HOME=/opt/hbase-1.3.0

添加jar包

下载postgresql的jdbc驱动,并放置到sqoop的lib目录

Crl -L 'http://jdbc.postgresql.org/download/postgresql-9.2-1002.jdbc4.jar' -o postgresql-9.2-1002.jdbc4.jar
mv postgresql-9.2-1002.jdbc4.jar /usr/lib/sqoop/lib/

测试

进入sqoop的bin目录下执行命令

./sqoop-list-tables --connect jdbc:mysql://your_postgresql_address:port/your_db_name --username mysqlusername --P

然后提示输入密码,输入数据库登录密码即可。然后终端显示该数据库下的所有表名称。表示Sqoop安装成功

使用sqoop导入postgresql的数据

现在我的postgresql里面有多张表:
这里写图片描述
假设准备导入aswu_operation这张表。因为sqoop不支持最新版的hbase,因此必须手动的在Hbase里面创建一张表来存储postgresql里的aswu_operation表。这里,仍然采用aswu_operation作为表名:

hbase(main):002:0> create 'aswu_operation' 'Op_info'

然后执行sqoop的import命令。

sqoop import --connect jdbc:postgresql://10.141.47.194/aswudb --table aswu_operation --hbase-table aswu_operation --column-family Op_info --hbase-row-key id --username postgres --P

该语句的意思是从jdbc:postgresql中读入aswu_operation表到Hbase的aswu_operation表,postgresql的aswu_operation表中的所有column,都归入到Hbase的aswu_operation表的‘Op_info’column family。而Hbase的aswu_operation表的row key采用postgresql的aswu_operation表中的id字段。

相关的命令的具体解释:

   --table <table-name>          Table to read
HBase arguments:
--column-family <family> Sets the target column family for the
import
--hbase-bulkload Enables HBase bulk loading
--hbase-create-table If specified, create missing HBase tables
--hbase-row-key <col> Specifies which input column to use as the row key
--hbase-table <table> Import to <table> in HBase

在该命令执行之后,sqoop会生成mapreduce的job,会有多个task并行的从postgresql上读取数据,并往hbase上插入数据。因为配置了集群的缘故,会有多个节点尝试去访问postgresql。因此,需要在postgresql上打开这些节点的访问权限。具体方法是编辑pg_hba.conf文件,加入各个hadoop节点的ip访问权限:

# IPv4 local connections:
host all all 127.0.0.1/32 md5
host all all 10.157.69.85/31 md5
host all all 10.157.69.216/31 md5
host all all 10.157.70.158/31 md5
host all all 10.157.70.38/31 md5
host all all 10.157.65.174/31 md5
host all all 10.157.68.60/31 md5
host all all 10.157.69.216/31 md5
host all all 10.157.66.49/31 md5
host all all 10.157.67.63/31 md5

完成之后,会看到如下输出:
这里写图片描述

打开hbase,查询数据是否已经插入:

hbase(main):002:0> get 'aswu_operation', '1'
COLUMN CELL
Op_info:end_time timestamp=1488176106248, value=2017-02-27 13:14:20.818
Op_info:failed_steps timestamp=1488176106248, value=0
Op_info:last_updated timestamp=1488176106248, value=2017-02-27 13:14:20.818
Op_info:progress timestamp=1488176106248, value=0
Op_info:restart_possible timestamp=1488176106248, value=false
Op_info:start_time timestamp=1488176106248, value=2017-02-27 13:14:00.193
Op_info:status timestamp=1488176106248, value=SUCCEED
Op_info:success_steps timestamp=1488176106248, value=1
Op_info:target timestamp=1488176106248, value={"externalFmReportId":1}
Op_info:target_name timestamp=1488176106248, value=
Op_info:total_steps timestamp=1488176106248, value=1
Op_info:type timestamp=1488176106248, value=FM_REPORT
Op_info:version timestamp=1488176106248, value=1
operation:end_time timestamp=1488183791964, value=2017-02-27 13:14:20.818
operation:failed_steps timestamp=1488183791964, value=0
operation:last_updated timestamp=1488183791964, value=2017-02-27 13:14:20.818
operation:progress timestamp=1488183791964, value=0
operation:restart_possible timestamp=1488183791964, value=false
operation:start_time timestamp=1488183791964, value=2017-02-27 13:14:00.193
operation:status timestamp=1488183791964, value=SUCCEED
operation:success_steps timestamp=1488183791964, value=1
operation:target timestamp=1488183791964, value={"externalFmReportId":1}
operation:target_name timestamp=1488183791964, value=
operation:total_steps timestamp=1488183791964, value=1
operation:type timestamp=1488183791964, value=FM_REPORT
operation:version timestamp=1488183791964, value=1
1 row(s) in 0.3200 seconds

可以看到对应的数据已经插入到Hbase里面。

修改Row Key

从上面已经完成的步骤,我们可以看到数据表已经被迁移到了Hbase当中,但我们知道在Hbase当中,row-key的设计非常重要。不可能简单的使用RDBMS中的主键(一般是big interger的id)作为row-key。row-key需要根据业务场景的需求,能够方便通过排序,筛选等操作得到我们需要读取或操作的数据集。因此,在导入的过程中,我们需要对Row key进行一定的变形或修饰。具体做法如下:

自定义PutTransformer类

sqoop在往hbase插入数据时,会调用类PutTransformer来生成插入hbase需要的Put类,该Put类定义了如何生成rowkey, cloumn name (column family name + column name)和column value。通过修改Put对象的行为,我们可以轻松的修改row key, column name和column value。
因此我们需要自定义PutTransformer的行为,并通过命令行参数,告诉sqoop使用我们自定义的PutTransformer类。一下是一个简单的定义。首先我们需要先extend PutTransformer。再对getPutCommand方法进行override。
具体这个类的行为,大部分代码可以参考sqoop本身自带的ToStringPutTransformer类。
这里的关键是这句:

 Put put = new Put( Bytes.toBytes( rowKey.toString() + ":operation" ) );

为rowkey加上了一个“operation”的后缀。
具体代码如下:

package com.sqoop.example;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.sqoop.hbase.PutTransformer;

/**
* @author Lex Li
* @date 27/02/2017
*/

public class AswuOperationTransFormat
extends PutTransformer
{

private Mapbyte[]> serializedFieldNames;

public AswuOperationTransFormat()
{
serializedFieldNames = new TreeMapbyte[]>();
}

/**
* Return the serialized bytes for a field name, using the cache if it's already in there.
*/

private byte[] getFieldNameBytes( String fieldName )
{
byte[] cachedName = serializedFieldNames.get( fieldName );
if( null != cachedName )
{
// Cache hit. We're done.
return cachedName;
}
// Do the serialization and memoize the result.
byte[] nameBytes = Bytes.toBytes( fieldName );
serializedFieldNames.put( fieldName, nameBytes );
return nameBytes;
}

@Override
public List getPutCommand( Map map ) throws IOException
{
String rowKeyCol = getRowKeyColumn();
String colFamily = getColumnFamily();
byte[] colFamilyBytes = Bytes.toBytes( colFamily );
Object rowKey = map.get( rowKeyCol );
if( null == rowKey )
{
// If the row-key column is null, we don't insert this row.
return null;
}
Put put = new Put( Bytes.toBytes( rowKey.toString() + ":operation" ) );
for( Map.Entry fieldEntry : map.entrySet() )
{
String colName = fieldEntry.getKey();
if( !colName.equals( rowKeyCol ) )
{
// This is a regular field, not the row key.
// Add it if it's not null.
Object val = fieldEntry.getValue();
if( null != val )
{
put.add( colFamilyBytes, getFieldNameBytes( colName ), Bytes.toBytes( val.toString() ) );
}
}
}
return Collections.singletonList( put );
}
}

编译自定义的PutTransformer类,并重新打包

  • 新建一个工程,加入如下依赖


    org.apache.sqoop
    sqoop
    1.4.6

  • 编译刚才的AswuOperationTransFormat类。
  • 解压sqoop-1.4.6.jar到sqoop-1.4.6目录
  • 将编译出来的AswuOperationTransFormat.class文件(连同目录结构。com.sqoop.example),一并加入到 sqoop-1.4.6目录中。
  • jar -cvf sqoop-1.4.6.jar sqoop-1.4.6/重新生成sqoop-1.4.6.jar文件。
  • 将sqoop-1.4.6.jar替换/usr/lib/sqoop下的sqoop-1.4.6.jar文件

指定需要运行的PutTransformer类

在sqoop import的命令行中加入:

 -D sqoop.hbase.insert.put.transformer.class=com.sqoop.example.AswuOperationTransFormat

具体的就是:

 sqoop import -D sqoop.hbase.insert.put.transformer.class=com.sqoop.example.AswuOperationTransFormat --connect jdbc:postgresql://10.141.47.194/aswudb --table aswu_operation --hbase-table aswu_operation --column-family Op_info --hbase-row-key id --username postgres --P

完成后,进入Hbase查看:

 10005:operation                      column=Op_info:end_time, timestamp=1488266002107, value=2017-02-28 12:47:08.509
10005:operation column=Op_info:error_code, timestamp=1488266002107, value=ASWUR01
10005:operation column=Op_info:error_description, timestamp=1488266002107, value=Unexpected error appears. Try to fail main o
peration.
10005:operation column=Op_info:failed_steps, timestamp=1488266002107, value=1
10005:operation column=Op_info:last_updated, timestamp=1488266002107, value=2017-02-28 12:47:08.509
10005:operation column=Op_info:progress, timestamp=1488266002107, value=0
10005:operation column=Op_info:restart_possible, timestamp=1488266002107, value=true
10005:operation column=Op_info:software_package_title, timestamp=1488266002107, value=4G
10005:operation column=Op_info:start_time, timestamp=1488266002107, value=2017-02-28 12:47:08.364
10005:operation column=Op_info:status, timestamp=1488266002107, value=FAILED
10005:operation column=Op_info:success_steps, timestamp=1488266002107, value=0
10005:operation column=Op_info:target, timestamp=1488266002107, value=CLUSTER-1269/PLMN-PLMN/MRBTS-567
10005:operation column=Op_info:target_name, timestamp=1488266002107, value=
10005:operation column=Op_info:total_steps, timestamp=1488266002107, value=2
10005:operation column=Op_info:type, timestamp=1488266002107, value=SW_UPLOAD
10005:operation column=Op_info:version, timestamp=1488266002107, value=1
10 row(s) in 0.4560 seconds

所有的row key加上了”:operation”的后缀


推荐阅读
  • 前言本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出, ... [详细]
  • 本文_大数据之非常详细Sqoop安装和基本操作
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了大数据之非常详细Sqoop安装和基本操作相关的知识,希望对你有一定的参考价值。大数据大数据之 ... [详细]
  • 阅读目录一、Hadoop简介二、Hadoop的特性三、hadoop组成与体系结构四、Hadoop安装方式五、Hadoop集群中的节点类型一、Hadoop简介Hadoop是Apac ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • MySQL中的MVVC多版本并发控制机制的应用及实现
    本文介绍了MySQL中MVCC的应用及实现机制。MVCC是一种提高并发性能的技术,通过对事务内读取的内存进行处理,避免写操作堵塞读操作的并发问题。与其他数据库系统的MVCC实现机制不尽相同,MySQL的MVCC是在undolog中实现的。通过undolog可以找回数据的历史版本,提供给用户读取或在回滚时覆盖数据页上的数据。MySQL的大多数事务型存储引擎都实现了MVCC,但各自的实现机制有所不同。 ... [详细]
  • Java 11相对于Java 8,OptaPlanner性能提升有多大?
    本文通过基准测试比较了Java 11和Java 8对OptaPlanner的性能提升。测试结果表明,在相同的硬件环境下,Java 11相对于Java 8在垃圾回收方面表现更好,从而提升了OptaPlanner的性能。 ... [详细]
  • MySQL数据库锁机制及其应用(数据库锁的概念)
    本文介绍了MySQL数据库锁机制及其应用。数据库锁是计算机协调多个进程或线程并发访问某一资源的机制,在数据库中,数据是一种供许多用户共享的资源,如何保证数据并发访问的一致性和有效性是数据库必须解决的问题。MySQL的锁机制相对简单,不同的存储引擎支持不同的锁机制,主要包括表级锁、行级锁和页面锁。本文详细介绍了MySQL表级锁的锁模式和特点,以及行级锁和页面锁的特点和应用场景。同时还讨论了锁冲突对数据库并发访问性能的影响。 ... [详细]
  • 我们在之前的文章中已经初步介绍了Cloudera。hadoop基础----hadoop实战(零)-----hadoop的平台版本选择从版本选择这篇文章中我们了解到除了hadoop官方版本外很多 ... [详细]
  • #python没有类似于java和C#的接口类(interface),需要使用抽象类和抽象方法来实现接口功能#!usrbinenvpython#_*_coding ... [详细]
  • Hadoop——Hive简介和环境配置
    一、Hive的简介和配置1.简介Hive是构建在Hadoop之上的数据操作平台lHive是一个SQL解析引擎,它将SQL转译成MapReduce作业,并 ... [详细]
  • 本文整理了Java中org.apache.hadoop.hbase.client.TableDescriptorBuilder.removeColumnFamily() ... [详细]
  • 【数据结构与算法】——快速排序
    Sqoop是一款开源的工具,主要用于在Hadoop(Hive)与传统的数据库(mysql、postgresql)间进行数据的传递,可以将一个关系型数据库(例如:MySQL,O ... [详细]
  • 大数据开发笔记(一):HDFS介绍
    ✨大数据开发笔记推荐:大数据开发面试知识点总结_GoAI的博客-CSDN博客_大数据开发面试​本文详细介绍大数据hadoop生态圈各部分知识,包括不限 ... [详细]
  • 怎么快速学好大数据开发?
    新如何学习大数据技术?大数据怎么入门?怎么做大数据分析?数据科学需要学习那些技术?大数据的应用前景等等问题,已成为热门大数据领域热门问题,以下是对新手如何学习大数据技术问题的解答! ... [详细]
  • hadoop常用操作命令https:www.cnblogs.comcerofangp10460494.htmlday3_day6https:www.cnblogs.comcerof ... [详细]
author-avatar
美好时光33_862
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有