如何使用spark处理一系列hbase行?

 香港买iphone 发布于 2022-12-29 12:25

我正在尝试使用HBase作为spark的数据源.因此,第一步是从HBase表创建RDD.由于Spark使用hadoop输入格式,我可以通过创建rdd找到一种使用所有行的方法http://www.vidyasource.com/blog/Programming/Scala/Java/Data/Hadoop/Analytics/2014/01/25/lighting-a-spark-with-hbase但我们如何为范围扫描创建RDD?

欢迎所有建议.

2 个回答
  • 以下是在Spark中使用Scan的示例:

    import java.io.{DataOutputStream, ByteArrayOutputStream}
    import java.lang.String
    import org.apache.hadoop.hbase.client.Scan
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.util.Base64
    
    def convertScanToString(scan: Scan): String = {
      val out: ByteArrayOutputStream = new ByteArrayOutputStream
      val dos: DataOutputStream = new DataOutputStream(out)
      scan.write(dos)
      Base64.encodeBytes(out.toByteArray)
    }
    
    val conf = HBaseConfiguration.create()
    val scan = new Scan()
    scan.setCaching(500)
    scan.setCacheBlocks(false)
    conf.set(TableInputFormat.INPUT_TABLE, "table_name")
    conf.set(TableInputFormat.SCAN, convertScanToString(scan))
    val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    rdd.count
    

    您需要将相关库添加到Spark类路径,并确保它们与您的Spark兼容.提示:您可以使用hbase classpath它们来查找它们.

    2022-12-29 12:26 回答
  • 你可以在下面设置 conf

     val conf = HBaseConfiguration.create()//need to set all param for habse
     conf.set(TableInputFormat.SCAN_ROW_START, "row2");
     conf.set(TableInputFormat.SCAN_ROW_STOP, "stoprowkey");
    

    这将仅为那些reocrds加载rdd

    2022-12-29 12:26 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有