下面的代码将从hbase读取,然后将其转换为json结构并转换为schemaRDD,但问题是我using List
要存储json字符串然后传递给javaRDD,对于大约100 GB的数据,master将被加载内存中的数据.从hbase加载数据然后执行操作然后转换为JavaRDD的正确方法是什么.
package hbase_reader;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import scala.Function1;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;
import com.google.common.collect.Lists;
public class hbase_reader {
public static void main(String[] args) throws IOException, ParseException {
List jars = Lists.newArrayList("");
SparkConf spconf = new SparkConf();
spconf.setMaster("local[2]");
spconf.setAppName("HBase");
//spconf.setSparkHome("/opt/human/opt/spark-0.9.0-hdp1");
spconf.setJars(jars.toArray(new String[jars.size()]));
JavaSparkContext sc = new JavaSparkContext(spconf);
//spconf.set("spark.executor.memory", "1g");
JavaSQLContext jsql = new JavaSQLContext(sc);
HBaseConfiguration conf = new HBaseConfiguration();
String tableName = "HBase.CounData1_Raw_Min1";
HTable table = new HTable(conf,tableName);
try {
ResultScanner scanner = table.getScanner(new Scan());
List jsonList = new ArrayList();
String json = null;
for(Result rowResult:scanner) {
json = "";
String rowKey = Bytes.toString(rowResult.getRow());
for(byte[] s1:rowResult.getMap().keySet()) {
String s1_str = Bytes.toString(s1);
String jsonSame = "";
for(byte[] s2:rowResult.getMap().get(s1).keySet()) {
String s2_str = Bytes.toString(s2);
for(long s3:rowResult.getMap().get(s1).get(s2).keySet()) {
String s3_str = new String(rowResult.getMap().get(s1).get(s2).get(s3));
jsonSame += "\""+s2_str+"\":"+s3_str+",";
}
}
jsonSame = jsonSame.substring(0,jsonSame.length()-1);
json += "\""+s1_str+"\""+":{"+jsonSame+"}"+",";
}
json = json.substring(0,json.length()-1);
json = "{\"RowKey\":\""+rowKey+"\","+json+"}";
jsonList.add(json);
}
JavaRDD jsonRDD = sc.parallelize(jsonList);
JavaSchemaRDD schemaRDD = jsql.jsonRDD(jsonRDD);
System.out.println(schemaRDD.take(2));
} finally {
table.close();
}
}
}
Murtaza Kanc.. 50
使用Spark(Scala)读取HBase数据的基本示例,您还可以在Java中使用它:
import org.apache.hadoop.hbase.client.{HBaseAdmin, Result} import org.apache.hadoop.hbase.{ HBaseConfiguration, HTableDescriptor } import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.spark._ object HBaseRead { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("HBaseRead").setMaster("local[2]") val sc = new SparkContext(sparkConf) val conf = HBaseConfiguration.create() val tableName = "table1" System.setProperty("user.name", "hdfs") System.setProperty("HADOOP_USER_NAME", "hdfs") conf.set("hbase.master", "localhost:60000") conf.setInt("timeout", 120000) conf.set("hbase.zookeeper.quorum", "localhost") conf.set("zookeeper.znode.parent", "/hbase-unsecure") conf.set(TableInputFormat.INPUT_TABLE, tableName) val admin = new HBaseAdmin(conf) if (!admin.isTableAvailable(tableName)) { val tableDesc = new HTableDescriptor(tableName) admin.createTable(tableDesc) } val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result]) println("Number of Records found : " + hBaseRDD.count()) sc.stop() } }
从Spark 1.0.x +开始,现在您也可以使用Spark-HBase Connector:
Maven依赖包括:
it.nerdammer.bigdata spark-hbase-connector_2.10 1.0.3 // Version can be changed as per your Spark version, I am using Spark 1.6.x
并找到相同的以下示例代码:
import org.apache.spark._ import it.nerdammer.spark.hbase._ object HBaseRead extends App { val sparkConf = new SparkConf().setAppName("Spark-HBase").setMaster("local[4]") sparkConf.set("spark.hbase.host", "") //e.g. 192.168.1.1 or localhost or your hostanme val sc = new SparkContext(sparkConf) // For Example If you have an HBase Table as 'Document' with ColumnFamily 'SMPL' and qualifier as 'DocID, Title' then: val docRdd = sc.hbaseTable[(Option[String], Option[String])]("Document") .select("DocID", "Title").inColumnFamily("SMPL") println("Number of Records found : " + docRdd .count()) }
从Spark 1.6.x +开始,现在您也可以使用SHC Connector(Hortonworks或HDP用户):
Maven依赖包括:
com.hortonworks shc 1.0.0-2.0-s_2.11 // Version depends on the Spark version and is supported upto Spark 2.x
使用此连接器的主要优点是它在Schema定义中具有灵活性,并且不需要像nerdammer/spark-hbase-connector那样的Hardcoded params.还要记住它支持Spark 2.x,因此这个连接器非常灵活,可以在问题和PR中提供端到端的支持.
找到以下存储库路径以获取最新的自述文件和示例:
Hortonworks Spark HBase连接器
您还可以将此RDD转换为DataFrame并在其上运行SQL,或者您可以将这些Dataset或DataFrame映射到用户定义的Java Pojo或Case类.它很棒.
如果您还有其他需要,请在下面评论.
使用Spark(Scala)读取HBase数据的基本示例,您还可以在Java中使用它:
import org.apache.hadoop.hbase.client.{HBaseAdmin, Result} import org.apache.hadoop.hbase.{ HBaseConfiguration, HTableDescriptor } import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.spark._ object HBaseRead { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("HBaseRead").setMaster("local[2]") val sc = new SparkContext(sparkConf) val conf = HBaseConfiguration.create() val tableName = "table1" System.setProperty("user.name", "hdfs") System.setProperty("HADOOP_USER_NAME", "hdfs") conf.set("hbase.master", "localhost:60000") conf.setInt("timeout", 120000) conf.set("hbase.zookeeper.quorum", "localhost") conf.set("zookeeper.znode.parent", "/hbase-unsecure") conf.set(TableInputFormat.INPUT_TABLE, tableName) val admin = new HBaseAdmin(conf) if (!admin.isTableAvailable(tableName)) { val tableDesc = new HTableDescriptor(tableName) admin.createTable(tableDesc) } val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result]) println("Number of Records found : " + hBaseRDD.count()) sc.stop() } }
从Spark 1.0.x +开始,现在您也可以使用Spark-HBase Connector:
Maven依赖包括:
<dependency> <groupId>it.nerdammer.bigdata</groupId> <artifactId>spark-hbase-connector_2.10</artifactId> <version>1.0.3</version> // Version can be changed as per your Spark version, I am using Spark 1.6.x </dependency>
并找到相同的以下示例代码:
import org.apache.spark._ import it.nerdammer.spark.hbase._ object HBaseRead extends App { val sparkConf = new SparkConf().setAppName("Spark-HBase").setMaster("local[4]") sparkConf.set("spark.hbase.host", "<YourHostnameOnly>") //e.g. 192.168.1.1 or localhost or your hostanme val sc = new SparkContext(sparkConf) // For Example If you have an HBase Table as 'Document' with ColumnFamily 'SMPL' and qualifier as 'DocID, Title' then: val docRdd = sc.hbaseTable[(Option[String], Option[String])]("Document") .select("DocID", "Title").inColumnFamily("SMPL") println("Number of Records found : " + docRdd .count()) }
从Spark 1.6.x +开始,现在您也可以使用SHC Connector(Hortonworks或HDP用户):
Maven依赖包括:
<dependency> <groupId>com.hortonworks</groupId> <artifactId>shc</artifactId> <version>1.0.0-2.0-s_2.11</version> // Version depends on the Spark version and is supported upto Spark 2.x </dependency>
使用此连接器的主要优点是它在Schema定义中具有灵活性,并且不需要像nerdammer/spark-hbase-connector那样的Hardcoded params.还要记住它支持Spark 2.x,因此这个连接器非常灵活,可以在问题和PR中提供端到端的支持.
找到以下存储库路径以获取最新的自述文件和示例:
Hortonworks Spark HBase连接器
您还可以将此RDD转换为DataFrame并在其上运行SQL,或者您可以将这些Dataset或DataFrame映射到用户定义的Java Pojo或Case类.它很棒.
如果您还有其他需要,请在下面评论.
只是添加关于如何添加扫描的注释:
TableInputFormat具有以下属性:
SCAN_ROW_START
SCAN_ROW_STOP
conf.set(TableInputFormat.SCAN_ROW_START, "startrowkey"); conf.set(TableInputFormat.SCAN_ROW_STOP, "stoprowkey");
由于这个问题不是新问题,现在还有其他一些替代方案:
hbase-spark,一个直接在HBase 仓库中提供的模块
Hortonworks的Spark-on-HBase
我对第一个项目了解不多,但看起来它不支持Spark 2.x. 但是,它在Spark 1.6.x的RDD级别上提供了丰富的支持.
另一方面,Spark-on-HBase拥有Spark 2.0和即将推出的Spark 2.1的分支.该项目非常有前途,因为它专注于数据集/数据框架API.在幕后,它实现了标准的Spark Datasource API,并利用Spark Catalyst引擎进行查询优化.开发人员在此声称它能够进行分区修剪,列修剪,谓词下推和实现数据局部性.
下面将介绍一个使用com.hortonworks:shc:1.0.0-2.0-s_2.11
此repo和Spark 2.0.2中的工件的简单示例:
case class Record(col0: Int, col1: Int, col2: Boolean) val spark = SparkSession .builder() .appName("Spark HBase Example") .master("local[4]") .getOrCreate() def catalog = s"""{ |"table":{"namespace":"default", "name":"table1"}, |"rowkey":"key", |"columns":{ |"col0":{"cf":"rowkey", "col":"key", "type":"int"}, |"col1":{"cf":"cf1", "col":"col1", "type":"int"}, |"col2":{"cf":"cf2", "col":"col2", "type":"boolean"} |} |}""".stripMargin val artificialData = (0 to 100).map(number => Record(number, number, number % 2 == 0)) // write spark .createDataFrame(artificialData) .write .option(HBaseTableCatalog.tableCatalog, catalog) .option(HBaseTableCatalog.newTable, "5") .format("org.apache.spark.sql.execution.datasources.hbase") .save() // read val df = spark .read .option(HBaseTableCatalog.tableCatalog, catalog) .format("org.apache.spark.sql.execution.datasources.hbase") .load() df.count()
我更喜欢从hbase读取并在spark中执行json操作.
Spark提供了JavaSparkContext.newAPIHadoopRDD函数来从hadoop存储中读取数据,包括HBase.您必须在配置参数和表输入格式中提供HBase配置,表名和扫描,并且它的键值
您可以使用表输入格式类及其作业参数来提供表名和扫描配置
例:
conf.set(TableInputFormat.INPUT_TABLE, "tablename"); JavaPairRDD<ImmutableBytesWritable, Result> data = jsc.newAPIHadoopRDD(conf, TableInputFormat.class,ImmutableBytesWritable.class, Result.class);
然后你可以在spark中进行json操作.由于spark可以在内存已满时重新计算,它只会加载重新计算部分所需的数据(cmiiw),因此您不必担心数据大小