如何在Spark RDD中选择一系列元素?

 手机用户2702933712 发布于 2023-01-04 10:52

我想在Spark RDD中选择一系列元素.例如,我有一个带有一百个元素的RDD,我需要选择60到80之间的元素.我该怎么做?

我看到RDD有一个take(i:int)方法,它返回第一个i元素.但是没有相应的方法来获取最后的i元素,或者从某个索引开始的中间元素.

3 个回答
  • 我认为还没有一种有效的方法可以做到这一点.但是简单的方法是使用filter(),假设你有一个RDD,pairs具有键值对,你只需要60到80之间的元素.

    val 60to80 = pairs.filter {
        _ match {
            case (k,v) => k >= 60 && k <= 80
            case _ => false //incase of invalid input
        }
    }
    

    我认为通过使用sortByKey和保存有关映射到每个分区的值范围的信息,将来可能会更有效地完成此操作.请记住,如果您计划多次查询范围,这种方法只会保存任何内容,因为排序显然很昂贵.

    通过查看火花源,绝对可以使用RangePartitioner以下方法进行有效的范围查询:

    // An array of upper bounds for the first (partitions - 1) partitions
      private val rangeBounds: Array[K] = {
    

    这是一个私有成员,RangePartitioner知道分区的所有上限,只需查询必要的分区.看起来这是火花用户未来可能会看到的东西:SPARK-911

    更新:基于我正在为SPARK-911写的拉取请求的方式更好的答案.如果RDD已排序并且您多次查询它,它将高效运行.

    val sorted = sc.parallelize((1 to 100).map(x => (x, x))).sortByKey().cache()
    val p: RangePartitioner[Int, Int] = sorted.partitioner.get.asInstanceOf[RangePartitioner[Int, Int]];
    val (lower, upper) = (10, 20)
    val range = p.getPartition(lower) to p.getPartition(upper)
    println(range)
    val rangeFilter = (i: Int, iter: Iterator[(Int, Int)]) => {
      if (range.contains(i))
        for ((k, v) <- iter if k >= lower && k <= upper) yield (k, v)
      else
        Iterator.empty
    }
    for((k,v) <- sorted.mapPartitionsWithIndex(rangeFilter, preservesPartitioning = true).collect()) println(s"$k, $v")
    

    如果在内存中拥有整个分区是可以接受的,你甚至可以做这样的事情.
    val glommedAndCached = sorted.glom()cache(); glommedAndCached.map(a => a.slice(a.search(lower),a.search(upper)+1)).collect()

    search 不是BTW的成员我刚刚创建了一个具有二进制搜索功能的隐式类,这里没有显示

    2023-01-04 10:54 回答
  • 您的数据集有多大?您可以通过以下方式完成所需操作:

    data.take(80).drop(59)
    

    这似乎效率低下,但对于中小型数据,应该可行.

    是否有可能以另一种方式解决这个问题?从数据中间选择一个特定范围的情况是什么?会takeSample更好地为你服务吗?

    2023-01-04 10:54 回答
  • 以下应该能够获得范围.请注意,缓存将为您节省一些开销,因为内部zipWithIndex需要扫描RDD分区以获取每个分区中的元素数量.

    scala>val r1 = sc.parallelize(List("a", "b", "c", "d", "e", "f", "g"), 3).cache
    scala>val r2 = r1.zipWithIndex
    scala>val r3 = r2.filter(x=> {x._2>2 && x._2 < 4}).map(x=>x._1)
    scala>r3.foreach(println)
    d
    

    2023-01-04 10:54 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有