我在HBase中有一个名为"orders"的表,它有列族'o',列为{id,fname,lname,email},行键为id.我试图只使用spark从hbase获取fname和email的值.目前,我正在做的是下面给出的
override def put(params: scala.collection.Map[String, Any]): Boolean = { var sparkConfig = new SparkConf().setAppName("Connector") var sc: SparkContext = new SparkContext(sparkConfig) var hbaseConfig = HBaseConfiguration.create() hbaseConfig.set("hbase.zookeeper.quorum", ZookeeperQourum) hbaseConfig.set("hbase.zookeeper.property.clientPort", zookeeperPort) hbaseConfig.set(TableInputFormat.INPUT_TABLE, schemdto.tableName); hbaseConfig.set(TableInputFormat.SCAN_COLUMNS, "o:fname,o:email"); var hBaseRDD = sc.newAPIHadoopRDD(hbaseConfig, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) try { hBaseRDD.map(tuple => tuple._2).map(result => result.raw()) .map(f => KeyValueToString(f)).saveAsTextFile(sink) return true; } catch { case ex: Exception => { println(ex.getMessage()) return false } } } def KeyValueToString(keyValues: Array[KeyValue]): String = { var it = keyValues.iterator var res = new StringBuilder while (it.hasNext) { res.append( Bytes.toString(it.next.getValue()) + ",") } res.substring(0, res.length-1); }
但没有返回任何内容,如果我尝试只获取一个列,如
hbaseConfig.set(TableInputFormat.SCAN_COLUMNS, "o:fname");
然后它返回列fname的所有值
所以我的问题是如何使用spark从hbase获取多个列
任何帮助将不胜感激.
根据文档,要扫描的列的列表需要以空格分隔.
hbaseConfig.set(TableInputFormat.SCAN_COLUMNS, "o:fname o:email");