热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

【大数据开发】HBase高级应用之数据迁移方案及实战(四)

第五小节数据迁移 一.数据迁移方案  数据迁移,更多的场

第五小节 数据迁移


  一.数据迁移方案


    数据迁移,更多的场景是外部的数据源如何将数据写入到HBase


    1.数据库RDBMS

      1)sqoop 

      2)kettle ETL工具

      3)其他方式

        **写程序

        **导出文件加载


    2.数据文件(log)

      1)flume:实时数据收集,将数据的数据插入到HBase

source -> channel -> sink

      

     2)MapReduce

 input file -> mapreduce  -> hbase table


      3)completebulkload

input file -> mapreduce  -> hfile -> completebulkload -> hbase table



   二.数据迁移实施


     1.通过importtsv命令,将文件直接导入到HBase

export HADOOP_HOME=/opt/modules/hadoop-2.6.0-cdh5.5.0
export HBASE_HOME=/opt/modules/hbase-1.2.0
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-server-1.2.0.jar \
importtsv -Dimporttsv.columns=HBASE_ROW_KEY,info:username,info:age,info:address \  //列簇信息
stutsv \ //HBase目标表
hdfs://bigdata-pro-m01.kfk.com:9000/user/kfk/datas/stu.tsv //hdfs源文件


*必须是HDFS路径,本地路径不可以


     2. 通过-Dimporttsv.bulk.output命令,我们可以将外部的数据文件直接生成一个HFile文件,然后通过completebulkload直接加载到HBase数据表中。

log文件 -> HFfile文件 ->HBase table表中

    

    1)第一步:

export HADOOP_HOME=/opt/modules/hadoop-2.6.0-cdh5.5.0
export HBASE_HOME=/opt/modules/hbase-1.2.0
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-server-1.2.0.jar \
importtsv -Dimporttsv.columns=HBASE_ROW_KEY,info:username,info:age,info:address \
-Dimporttsv.bulk.output=hdfs://bigdata-pro-m01.kfk.com:9000/user/kfk/hfoutput \  //临时HFfile文件目录
stutsv \
hdfs://bigdata-pro-m01.kfk.com:9000/user/kfk/datas/stu.tsv  


    2)第二步:

export HADOOP_HOME=/opt/modules/hadoop-2.6.0-cdh5.5.0
export HBASE_HOME=/opt/modules/hbase-1.2.0
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-server-1.2.0.jar \
completebulkload \    //将临时文件加载到HBase表中
hdfs://bigdata-pro-m01.kfk.com:9000/user/kfk/hfoutput \
stutsv


    3.不同文件中数据分割符的处理

export HADOOP_HOME=/opt/modules/hadoop-2.6.0-cdh5.5.0
export HBASE_HOME=/opt/modules/hbase-1.2.0
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-server-1.2.0.jar \
importtsv -Dimporttsv.columns=HBASE_ROW_KEY,info:username,info:age,info:address \
-Dimporttsv.bulk.output=hdfs://bigdata-pro-m01.kfk.com:9000/user/kfk/hfoutputcsv \
-Dimporttsv.separator=, \  //源端以逗号分隔
stutsv \
hdfs://bigdata-pro-m01.kfk.com:9000/user/kfk/datas/stu.csv




export HADOOP_HOME=/opt/modules/hadoop-2.6.0-cdh5.5.0
export HBASE_HOME=/opt/modules/hbase-1.2.0
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-server-1.2.0.jar \
completebulkload \
hdfs://bigdata-pro-m01.kfk.com:9000//user/kfk/hfoutputcsv \
stutsv


    4.自定义MR程序生成 HFile文件(企业常用的方案)

    

    1)第一步:

          log文件  ->  HFfile文件 

import com.kfk.hbase.HBaseConstant;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


import java.io.IOException;


public class MRHFHBase extends Configured implements Tool {




//
//    //复杂版本,不用循环(不适用于多字段)
// public static class HFMapper extends Mapper {
//
// // rowkey username age address
// // 00001, henry, 20, city-10
//
// ImmutableBytesWritable rowkey = new ImmutableBytesWritable();
// @Override
// protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//
// String[] values = value.toString().split(",");
// rowkey.set(Bytes.toBytes(values[0]));
//
// Put put = new Put(rowkey.get());
// put.addColumn(Bytes.toBytes(HBaseConstant.HBASE_STU_CF_INFO),Bytes.toBytes("username"),Bytes.toBytes(values[1]));
// put.addColumn(Bytes.toBytes(HBaseConstant.HBASE_STU_CF_INFO),Bytes.toBytes("age"),Bytes.toBytes(values[2]));
// put.addColumn(Bytes.toBytes(HBaseConstant.HBASE_STU_CF_INFO),Bytes.toBytes("address"),Bytes.toBytes(values[3]));
//
// context.write(rowkey,put);
// }
// }




//简单版本,采用循环(适用于字段多)
public static class HFMapper extends Mapper{
// rowkey username age address
// 00001, henry, 20, city-10


ImmutableBytesWritable rowkey = new ImmutableBytesWritable();
String[] COLUMN = new String[]{"username","age","address"};


@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] values = value.toString().split(",");
rowkey.set(Bytes.toBytes(values[0]));
Put put = new Put(rowkey.get());


for(int index = 1 ; index
put.addColumn(Bytes.toBytes(HBaseConstant.HBASE_STU_CF_INFO),Bytes.toBytes(COLUMN[index]),Bytes.toBytes(values[index]));
}
context.write(rowkey,put);
}
}






//driver组装
public int run(String args[]) throws Exception{


//get configuration
Configuration cOnfiguration= this.getConf();


//create job
Job job = Job.getInstance(configuration,this.getClass().getSimpleName());
job.setJarByClass(this.getClass());


//input
Path inputpath = new Path(args[0]);
FileInputFormat.addInputPath(job,inputpath);


//map
job.setMapperClass(HFMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);


//获取链接信息
TableName tableName = TableName.valueOf(HBaseConstant.HBASE_STU);
Connection cOnnection= ConnectionFactory.createConnection();
Table table = connection.getTable(tableName);
RegionLocator regiOnLocator= connection.getRegionLocator(tableName);
HFileOutputFormat2.configureIncrementalLoad(job,table,regionLocator);


//标示
boolean isSuc = job.waitForCompletion(true);
return (isSuc) ? 0 : 1 ;


}




public static void main(String[] args) {


//demo
// args = new String[]{
// "hdfs://bigdata-pro-m01.kfk.com:9000/user/kfk/datas/stu.csv", //hdfs上源文件路径
// "hdfs://bigdata-pro-m01.kfk.com:9000/user/kfk/datas/hfcsv-output" //hdfs上HF临时文件路径
// };


Configuration cOnfiguration= HBaseConfiguration.create();


try{
Path outputFilePath = new Path(args[1]);
FileSystem fileSystem = FileSystem.get(configuration);
if(fileSystem.exists(outputFilePath)){
fileSystem.delete(outputFilePath,true);
}
int status = ToolRunner.run(configuration,new MRHFHBase(),args);
System.exit(status);
}catch (Exception e){
e.printStackTrace();
}


}
}


    2)第二步:上传Jar包并执行

export HADOOP_HOME=/opt/modules/hadoop-2.6.0-cdh5.5.0
export HBASE_HOME=/opt/modules/hbase-1.2.0
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
${HADOOP_HOME}/bin/hadoop jar /opt/jars/student.jar hdfs://bigdata-pro-m01.kfk.com:9000/user/kfk/datas/stu.csv hdfs://bigdata-pro-m01.kfk.com:9000/user/kfk/datas/hfcsv-output1


    3)第三步:加载HBase数据

HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-server-1.2.0.jar completebulkload \
hdfs://bigdata-pro-m01.kfk.com:9000/user/kfk/datas/hfcsv-output1 \
stutsv




推荐阅读
author-avatar
木木的亦尘_283
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有