我正在尝试使用HBase作为spark的数据源.因此,第一步是从HBase表创建RDD.由于Spark使用hadoop输入格式,我可以通过创建rdd找到一种使用所有行的方法http://www.vidyasource.com/blog/Programming/Scala/Java/Data/Hadoop/Analytics/2014/01/25/lighting-a-spark-with-hbase但我们如何为范围扫描创建RDD?
欢迎所有建议.
以下是在Spark中使用Scan的示例:
import java.io.{DataOutputStream, ByteArrayOutputStream} import java.lang.String import org.apache.hadoop.hbase.client.Scan import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.client.Result import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.util.Base64 def convertScanToString(scan: Scan): String = { val out: ByteArrayOutputStream = new ByteArrayOutputStream val dos: DataOutputStream = new DataOutputStream(out) scan.write(dos) Base64.encodeBytes(out.toByteArray) } val conf = HBaseConfiguration.create() val scan = new Scan() scan.setCaching(500) scan.setCacheBlocks(false) conf.set(TableInputFormat.INPUT_TABLE, "table_name") conf.set(TableInputFormat.SCAN, convertScanToString(scan)) val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result]) rdd.count
您需要将相关库添加到Spark类路径,并确保它们与您的Spark兼容.提示:您可以使用hbase classpath
它们来查找它们.
你可以在下面设置 conf
val conf = HBaseConfiguration.create()//need to set all param for habse conf.set(TableInputFormat.SCAN_ROW_START, "row2"); conf.set(TableInputFormat.SCAN_ROW_STOP, "stoprowkey");
这将仅为那些reocrds加载rdd