Spark按照值排序集合

 围脖上的博博_771 发布于 2023-01-04 12:56

我正在尝试本教程http://spark.apache.org/docs/latest/quick-start.html 我首先从文件创建了一个集合

textFile = sc.textFile("README.md")

然后我尝试了一个命令来解决这些问题:

wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)

要打印集合:

 wordCounts.collect()

我找到了如何使用命令sortByKey逐字排序.我想知道如何通过值进行排序可以做同样的事情,在这种情况下,文档中出现一个单词的数字.

4 个回答
  • 排序通常应该在调用collect()之前完成,因为它会将数据集返回给驱动程序,这也就是在java中编写hadoop map-reduce作业的方式,以便编写所需的最终输出(通常)到HDFS.使用spark API,这种方法可以灵活地以"原始"形式将输出写入您想要的位置,例如可以将其用作进一步处理的输入的文件.

    使用spark的scala API在collect()之前进行排序可以按照eliasah的建议完成并使用Tuple2.swap()两次,一次在排序之前和之后一次,以便生成按第二个字段的递增或递减顺序排序的元组列表(其中名为_2)并包含第一个字段(名为_1)中的字数.下面是一个如何在spark-shell中编写脚本的示例:

    // this whole block can be pasted in spark-shell in :paste mode followed by <Ctrl>D
    val file = sc.textFile("some_local_text_file_pathname")
    val wordCounts = file.flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _, 1)  // 2nd arg configures one task (same as number of partitions)
      .map(item => item.swap) // interchanges position of entries in each tuple
      .sortByKey(true, 1) // 1st arg configures ascending sort, 2nd arg configures one task
      .map(item => item.swap)
    

    为了颠倒排序的顺序,使用sortByKey(false,1),因为它的第一个arg是升序的布尔值.它的第二个参数是任务数(等于分区数),它被设置为1,用于测试一个只需要一个输出数据文件的小输入文件; reduceByKey也接受这个可选参数.

    在此之后,wordCounts RDD可以作为文本文件保存到具有saveAsTextFile(directory_pathname)的目录中,其中将存放一个或多个part-xxxxx文件(从part-00000开始),具体取决于为作业配置的reducer的数量(1每个reducer输出数据文件),_SUCCESS文件取决于作业是否成功以及.crc文件.

    使用pyspark一个非常类似于上面显示的scala脚本的python脚本产生的输出实际上是相同的.以下是pyspark版本,演示按值排序集合:

    file = sc.textFile("file:some_local_text_file_pathname")
    wordCounts = file.flatMap(lambda line: line.strip().split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b, 1) \ # last arg configures one reducer task
        .map(lambda (a, b): (b, a)) \
        .sortByKey(1, 1) \ # 1st arg configures ascending sort, 2nd configures 1 task
        .map(lambda (a, b): (b, a))
    

    为了按降序排序bybyKey,它的第一个arg应为0.由于python捕获前导和尾随空格作为数据,因此在分割空格上的每一行之前插入strip(),但这不是必须使用spark-shell/scala.

    wordCount的spark和python版本输出的主要区别在于spark输出(word,3)python输出(u'word',3).

    有关spark RDD方法的更多信息,请参阅http://spark.apache.org/docs/1.1.0/api/python/pyspark.rdd.RDD-class.html for python和https://spark.apache.org/ scala的docs/latest/api/scala /#org.apache.spark.rdd.RDD.

    在spark-shell中,在wordCounts上运行collect()将它从RDD转换为Array [(String,Int)] = Array [Tuple2(String,Int)],它本身可以在每个Tuple2元素的第二个字段上排序使用:

    Array.sortBy(_._2) 
    

    sortBy还采用了一个可选的隐式math.Ordering参数,如Romeo Kienzler在前一个问题的答案中显示.Array.sortBy(_._ 2)将在其_2字段上对Array Tuple2元素进行反向排序,只需在运行map-reduce脚本之前定义隐式反向排序,因为它会覆盖Int的预先存在的排序.反向int由Romeo Kienzler定义的订购是:

    // for reverse order
    implicit val sortIntegersByString = new Ordering[Int] {
      override def compare(a: Int, b: Int) = a.compare(b)*(-1)
    }
    

    定义此反向排序的另一种常用方法是反转a和b的顺序并删除比较定义右侧的(-1):

    // for reverse order
    implicit val sortIntegersByString = new Ordering[Int] {
      override def compare(a: Int, b: Int) = b.compare(a)
    }   
    

    2023-01-04 12:58 回答
  • 你可以这样

    // for reverse order
    implicit val sortIntegersByString = new Ordering[Int] {
        override def compare(a: Int, b: Int) = a.compare(b)*(-1)
    }
    
    counts.collect.toSeq.sortBy(_._2)
    

    因此,基本上,您可以将RDD转换为序列,并使用sort方法对其进行排序。

    上面的代码块全局更改了排序行为,以获取降序排序。

    2023-01-04 12:58 回答
  • 以更加pythonic的方式做它.

    # In descending order
    ''' The first parameter tells number of elements
        to be present in output.
    ''' 
    data.takeOrdered(10, key=lambda x: -x[1])
    # In Ascending order
    data.takeOrdered(10, key=lambda x: x[1])
    

    2023-01-04 12:58 回答
  • 对于那些希望按值排序的前N个元素的人:

    theRDD.takeOrdered(N, lambda (key, value): -1 * len(value))
    

    如果您希望按字符串长度排序.

    另一方面,如果值已经采用适合您所需订购的形式,那么:

    theRDD.takeOrdered(N, lambda (key, value): -1 * value)
    

    就够了

    2023-01-04 12:58 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有