我想在Spark RDD中选择一系列元素.例如,我有一个带有一百个元素的RDD,我需要选择60到80之间的元素.我该怎么做?
我看到RDD有一个take(i:int)方法,它返回第一个i元素.但是没有相应的方法来获取最后的i元素,或者从某个索引开始的中间元素.
我认为还没有一种有效的方法可以做到这一点.但是简单的方法是使用filter()
,假设你有一个RDD,pairs
具有键值对,你只需要60到80之间的元素.
val 60to80 = pairs.filter {
_ match {
case (k,v) => k >= 60 && k <= 80
case _ => false //incase of invalid input
}
}
我认为通过使用sortByKey
和保存有关映射到每个分区的值范围的信息,将来可能会更有效地完成此操作.请记住,如果您计划多次查询范围,这种方法只会保存任何内容,因为排序显然很昂贵.
通过查看火花源,绝对可以使用RangePartitioner
以下方法进行有效的范围查询:
// An array of upper bounds for the first (partitions - 1) partitions
private val rangeBounds: Array[K] = {
这是一个私有成员,RangePartitioner
知道分区的所有上限,只需查询必要的分区.看起来这是火花用户未来可能会看到的东西:SPARK-911
更新:基于我正在为SPARK-911写的拉取请求的方式更好的答案.如果RDD已排序并且您多次查询它,它将高效运行.
val sorted = sc.parallelize((1 to 100).map(x => (x, x))).sortByKey().cache()
val p: RangePartitioner[Int, Int] = sorted.partitioner.get.asInstanceOf[RangePartitioner[Int, Int]];
val (lower, upper) = (10, 20)
val range = p.getPartition(lower) to p.getPartition(upper)
println(range)
val rangeFilter = (i: Int, iter: Iterator[(Int, Int)]) => {
if (range.contains(i))
for ((k, v) <- iter if k >= lower && k <= upper) yield (k, v)
else
Iterator.empty
}
for((k,v) <- sorted.mapPartitionsWithIndex(rangeFilter, preservesPartitioning = true).collect()) println(s"$k, $v")
如果在内存中拥有整个分区是可以接受的,你甚至可以做这样的事情.
val glommedAndCached = sorted.glom()cache();
glommedAndCached.map(a => a.slice(a.search(lower),a.search(upper)+1)).collect()
search
不是BTW的成员我刚刚创建了一个具有二进制搜索功能的隐式类,这里没有显示
您的数据集有多大?您可以通过以下方式完成所需操作:
data.take(80).drop(59)
这似乎效率低下,但对于中小型数据,应该可行.
是否有可能以另一种方式解决这个问题?从数据中间选择一个特定范围的情况是什么?会takeSample
更好地为你服务吗?
以下应该能够获得范围.请注意,缓存将为您节省一些开销,因为内部zipWithIndex需要扫描RDD分区以获取每个分区中的元素数量.
scala>val r1 = sc.parallelize(List("a", "b", "c", "d", "e", "f", "g"), 3).cache scala>val r2 = r1.zipWithIndex scala>val r3 = r2.filter(x=> {x._2>2 && x._2 < 4}).map(x=>x._1) scala>r3.foreach(println) d