我正在尝试本教程http://spark.apache.org/docs/latest/quick-start.html 我首先从文件创建了一个集合
textFile = sc.textFile("README.md")
然后我尝试了一个命令来解决这些问题:
wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
要打印集合:
wordCounts.collect()
我找到了如何使用命令sortByKey逐字排序.我想知道如何通过值进行排序可以做同样的事情,在这种情况下,文档中出现一个单词的数字.
排序通常应该在调用collect()之前完成,因为它会将数据集返回给驱动程序,这也就是在java中编写hadoop map-reduce作业的方式,以便编写所需的最终输出(通常)到HDFS.使用spark API,这种方法可以灵活地以"原始"形式将输出写入您想要的位置,例如可以将其用作进一步处理的输入的文件.
使用spark的scala API在collect()之前进行排序可以按照eliasah的建议完成并使用Tuple2.swap()两次,一次在排序之前和之后一次,以便生成按第二个字段的递增或递减顺序排序的元组列表(其中名为_2)并包含第一个字段(名为_1)中的字数.下面是一个如何在spark-shell中编写脚本的示例:
// this whole block can be pasted in spark-shell in :paste mode followed by <Ctrl>D val file = sc.textFile("some_local_text_file_pathname") val wordCounts = file.flatMap(line => line.split(" ")) .map(word => (word, 1)) .reduceByKey(_ + _, 1) // 2nd arg configures one task (same as number of partitions) .map(item => item.swap) // interchanges position of entries in each tuple .sortByKey(true, 1) // 1st arg configures ascending sort, 2nd arg configures one task .map(item => item.swap)
为了颠倒排序的顺序,使用sortByKey(false,1),因为它的第一个arg是升序的布尔值.它的第二个参数是任务数(等于分区数),它被设置为1,用于测试一个只需要一个输出数据文件的小输入文件; reduceByKey也接受这个可选参数.
在此之后,wordCounts RDD可以作为文本文件保存到具有saveAsTextFile(directory_pathname)的目录中,其中将存放一个或多个part-xxxxx文件(从part-00000开始),具体取决于为作业配置的reducer的数量(1每个reducer输出数据文件),_SUCCESS文件取决于作业是否成功以及.crc文件.
使用pyspark一个非常类似于上面显示的scala脚本的python脚本产生的输出实际上是相同的.以下是pyspark版本,演示按值排序集合:
file = sc.textFile("file:some_local_text_file_pathname") wordCounts = file.flatMap(lambda line: line.strip().split(" ")) \ .map(lambda word: (word, 1)) \ .reduceByKey(lambda a, b: a + b, 1) \ # last arg configures one reducer task .map(lambda (a, b): (b, a)) \ .sortByKey(1, 1) \ # 1st arg configures ascending sort, 2nd configures 1 task .map(lambda (a, b): (b, a))
为了按降序排序bybyKey,它的第一个arg应为0.由于python捕获前导和尾随空格作为数据,因此在分割空格上的每一行之前插入strip(),但这不是必须使用spark-shell/scala.
wordCount的spark和python版本输出的主要区别在于spark输出(word,3)python输出(u'word',3).
有关spark RDD方法的更多信息,请参阅http://spark.apache.org/docs/1.1.0/api/python/pyspark.rdd.RDD-class.html for python和https://spark.apache.org/ scala的docs/latest/api/scala /#org.apache.spark.rdd.RDD.
在spark-shell中,在wordCounts上运行collect()将它从RDD转换为Array [(String,Int)] = Array [Tuple2(String,Int)],它本身可以在每个Tuple2元素的第二个字段上排序使用:
Array.sortBy(_._2)
sortBy还采用了一个可选的隐式math.Ordering参数,如Romeo Kienzler在前一个问题的答案中显示.Array.sortBy(_._ 2)将在其_2字段上对Array Tuple2元素进行反向排序,只需在运行map-reduce脚本之前定义隐式反向排序,因为它会覆盖Int的预先存在的排序.反向int由Romeo Kienzler定义的订购是:
// for reverse order implicit val sortIntegersByString = new Ordering[Int] { override def compare(a: Int, b: Int) = a.compare(b)*(-1) }
定义此反向排序的另一种常用方法是反转a和b的顺序并删除比较定义右侧的(-1):
// for reverse order implicit val sortIntegersByString = new Ordering[Int] { override def compare(a: Int, b: Int) = b.compare(a) }
你可以这样
// for reverse order implicit val sortIntegersByString = new Ordering[Int] { override def compare(a: Int, b: Int) = a.compare(b)*(-1) } counts.collect.toSeq.sortBy(_._2)
因此,基本上,您可以将RDD转换为序列,并使用sort方法对其进行排序。
上面的代码块全局更改了排序行为,以获取降序排序。
以更加pythonic的方式做它.
# In descending order ''' The first parameter tells number of elements to be present in output. ''' data.takeOrdered(10, key=lambda x: -x[1]) # In Ascending order data.takeOrdered(10, key=lambda x: x[1])
对于那些希望按值排序的前N个元素的人:
theRDD.takeOrdered(N, lambda (key, value): -1 * len(value))
如果您希望按字符串长度排序.
另一方面,如果值已经采用适合您所需订购的形式,那么:
theRDD.takeOrdered(N, lambda (key, value): -1 * value)
就够了