热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

SparkBulkload(Java)

1、使用Spark通过Bulkload的方式导数据到Hbase在未用Bulkload写Hbase时,使用RDD进行封装为Tuple2

1、使用Spark通过Bulkload的方式导数据到Hbase


在未用Bulkload写Hbase时,使用RDD进行封装为Tuple2的KVRDD,然后通过saveAsNewAPIHadoopDataset写Hbase,非常慢,400G的数据大概写了2H+还没写完,后面没有办法就考虑使用Bulkload来导入数据。
在测试之前网上很多资料都是Scala版本的,并且实现都是单个列来操作,实际生产中会存在多个列族和列的情况,并且这里面有很多坑。
先上代码:

public class HbaseSparkUtils {private static Configuration hbaseConf;static {hbaseConf &#61; HBaseConfiguration.create();hbaseConf.set(ConfigUtils.getHbaseZK()._1(), ConfigUtils.getHbaseZK()._2());hbaseConf.set(ConfigUtils.getHbaseZKPort()._1(), ConfigUtils.getHbaseZKPort()._2());}public static void saveHDFSHbaseHFile(SparkSession spark, // spark session Dataset ds, // 数据集String table_name, //hbase表名Integer rowKeyIndex, //rowkey的索引idString fields) throws Exception { // 数据集的字段列表hbaseConf.setInt("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", 1024);hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, table_name);Job job &#61; Job.getInstance();job.setMapOutputKeyClass(ImmutableBytesWritable.class);job.setMapOutputValueClass(KeyValue.class);job.setOutputFormatClass(HFileOutputFormat2.class);Connection conn &#61; ConnectionFactory.createConnection(hbaseConf);TableName tableName &#61; TableName.valueOf(table_name);HRegionLocator regionLocator &#61; new HRegionLocator(tableName, (ClusterConnection) conn);Table realTable &#61; ((ClusterConnection) conn).getTable(tableName);HFileOutputFormat2.configureIncrementalLoad(job, realTable, regionLocator);JavaRDD javaRDD &#61; ds.toJavaRDD();JavaPairRDD javaPairRDD &#61;javaRDD.mapToPair(new PairFunction>>() {&#64;Overridepublic Tuple2>> call(Row row) throws Exception {List> tps &#61; new ArrayList<>();String rowkey &#61; row.getString(rowKeyIndex);ImmutableBytesWritable writable &#61; new ImmutableBytesWritable(Bytes.toBytes(rowkey));// sort columns。这里需要对列进行排序&#xff0c;不然会报错ArrayList> tuple2s &#61; new ArrayList<>();String[] columns &#61; fields.split(",");for (int i &#61; 0; i (i, columns[i]));}for (Tuple2 t : tuple2s) {String[] fieldNames &#61; row.schema().fieldNames();// 不将作为rowkey的字段存在列里面if (t._2().equals(fieldNames[rowKeyIndex])) {System.out.println(String.format("%s &#61;&#61; %s continue", t._2(), fieldNames[rowKeyIndex]));continue;}if ("main".equals(t._2())) {continue;}String value &#61; getRowValue(row, t._1(), tuple2s.size());KeyValue kv &#61; new KeyValue(Bytes.toBytes(rowkey),Bytes.toBytes(ConfigUtils.getFamilyInfo()._2()),Bytes.toBytes(t._2()), Bytes.toBytes(value));tps.add(new Tuple2<>(writable, kv));}for (Tuple2 t : tuple2s) {String value &#61; getRowValue(row, t._1(), tuple2s.size());if ("main".equals(t._2())) { // filed &#61;&#61; &#39;main&#39;KeyValue kv &#61; new KeyValue(Bytes.toBytes(rowkey),Bytes.toBytes(ConfigUtils.getFamilyMain()._2()),Bytes.toBytes(t._2()), Bytes.toBytes(value));tps.add(new Tuple2<>(writable, kv));break;}}return new Tuple2<>(writable, tps);}// 这里一定要按照rowkey进行排序&#xff0c;这个效率很低&#xff0c;目前没有找到优化的替代方案}).sortByKey().flatMapToPair(new PairFlatMapFunction>>,ImmutableBytesWritable, KeyValue>() {&#64;Overridepublic Iterator> call(Tuple2>> tuple2s) throws Exception {return tuple2s._2().iterator();}});// 创建HDFS的临时HFile文件目录String temp &#61; "/tmp/bulkload/"&#43;table_name&#43;"_"&#43;System.currentTimeMillis();javaPairRDD.saveAsNewAPIHadoopFile(temp, ImmutableBytesWritable.class,KeyValue.class, HFileOutputFormat2.class, job.getConfiguration());LoadIncrementalHFiles loader &#61; new LoadIncrementalHFiles(hbaseConf);Admin admin &#61; conn.getAdmin();loader.doBulkLoad(new Path(temp), admin, realTable, regionLocator);}
}

2、下面是一些遇到的异常问题

1、Can not create a Path from a null string

源码分析&#xff1a;


需要添加下面属性&#xff1a;
job.getConfiguration().set("mapred.output.dir","/user/wangwei/tmp/"&#43;tableName);
job.getConfiguration().set("mapreduce.output.fileoutputformat.outputdir", "/tmp/"&#43;tableName); // 推荐的参数

2、 Bulk load operation did not find any files to load in directory /tmp/wwtest. Does it contain files in subdirectories that correspond to column family names?
17/10/11 15:54:09 WARN LoadIncrementalHFiles: Skipping non-directory file:/tmp/wwtest/_SUCCESS
17/10/11 15:54:09 WARN LoadIncrementalHFiles: Bulk load operation did not find any files to load in directory /tmp/wwtest. Does it contain files in subdirectories that correspond to column family names?
1、查看输入数据是否为空
2、setMapOutputKeyClass 和 saveAsNewAPIHadoopFile 中class是否一致
3、代码BUG


3、 Added a key not lexically larger than previous
java.io.IOException: Added a key not lexically larger than previous key&#61;\x00\x02Mi\x0BsearchIndexuserId\x00\x00\x01>\xD5\xD6\xF3\xA3\x04, lastkey&#61;\x00\x01w\x0BsearchIndexuserId\x00\x00\x01>\xD5\xD6\xF3\xA3\x04
最主要原因&#xff0c;在制作HFile文件的时候&#xff0c;一定要主键排序。Put进去会自动排序。但自己做成HFile文件不会自动排序。
所有一定要排序好&#xff0c;从
主键
列族
都要手动排序好。然后生成HFile文件。不然只会报错。

4、Caused by: java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot be cast to org.apache.hadoop.hbase.Cell&#xff08;使用Put作为MapOutputKey出现&#xff0c;使用KeyValue不存在问题&#xff09;

没解决&#xff0c;使用KeyValue 放到一个List里面&#xff0c;然后FlatMap一下

5、java.io.IOException: Trying to load more than 32 hfiles to one family of one region

hbaseConf.setInt("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", 1024);




推荐阅读
author-avatar
艳斐儿M
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有