我使用以下命令用一堆包含2个字符串["filename","content"]的数组填充RDD.
现在,我希望迭代每个事件,以便对每个文件名和内容执行某些操作.
val someRDD = sc.wholeTextFiles("hdfs://localhost:8020/user/cloudera/*")
我似乎无法找到有关如何执行此操作的任何文档.
所以我想要的是:
foreach occurrence-in-the-rdd{ //do stuff with the array found on loccation n of the RDD }
Spiro Michay.. 26
您可以在RDD上调用接受函数作为参数的各种方法.
// set up an example -- an RDD of arrays val sparkConf = new SparkConf().setMaster("local").setAppName("Example") val sc = new SparkContext(sparkConf) val testData = Array(Array(1,2,3), Array(4,5,6,7,8)) val testRDD = sc.parallelize(testData, 2) // Print the RDD of arrays. testRDD.collect().foreach(a => println(a.size)) // Use map() to create an RDD with the array sizes. val countRDD = testRDD.map(a => a.size) // Print the elements of this new RDD. countRDD.collect().foreach(a => println(a)) // Use filter() to create an RDD with just the longer arrays. val bigRDD = testRDD.filter(a => a.size > 3) // Print each remaining array. bigRDD.collect().foreach(a => { a.foreach(e => print(e + " ")) println() }) }
请注意,您编写的函数接受单个RDD元素作为输入,并返回某种统一类型的数据,因此您创建后一种类型的RDD.例如,countRDD
是一个RDD[Int]
,而bigRDD
仍然是一个RDD[Array[Int]]
.
在某些时候写一个foreach
修改一些其他数据可能很诱人,但你应该抵制这个问题和答案中描述的原因.
编辑:不要尝试打印大RDD
s
一些读者询问了如何使用collect()
和println()
查看他们的结果,如上例所示.当然,这只适用于你在Spark REPL(read-eval-print-loop)之类的交互模式下运行.最好调用collect()
RDD来获得顺序打印的顺序数组.但是collect()
可能会带回太多数据,无论如何都可能会打印太多数据.RDD
如果它们很大,可以使用以下方法深入了解您的s:
RDD.take()
:这可以很好地控制你获得的元素数量,但不能控制它们的来源 - 定义为"第一"元素,这是一个概念,由其他各种问题和答案处理.
// take() returns an Array so no need to collect() myHugeRDD.take(20).foreach(a => println(a))
RDD.sample()
:这可以让你(粗略地)控制你得到的结果部分,无论采样是否使用替换,甚至是可选的随机数种子.
// sample() does return an RDD so you may still want to collect() myHugeRDD.sample(true, 0.01).collect().foreach(a => println(a))
RDD.takeSample()
:这是混合:使用您可以控制的随机抽样,但两者都允许您指定结果的确切数量并返回Array
.
// takeSample() returns an Array so no need to collect() myHugeRDD.takeSample(true, 20).foreach(a => println(a))
RDD.count()
:有时候最好的洞察力来自你最终获得了多少元素 - 我经常这样做.
println(myHugeRDD.count())
David.. 9
Spark中的基本操作是map
和filter
.
val txtRDD = someRDD filter { case(id, content) => id.endsWith(".txt") }
的txtRDD
意志现在只包含有扩展名为".txt"文件
如果你想对这些文件进行字数统计,你可以说
//split the documents into words in one long list val words = txtRDD flatMap { case (id,text) => text.split("\\s+") } // give each word a count of 1 val wordT = words map (x => (x,1)) //sum up the counts for each word val wordCount = wordsT reduceByKey((a, b) => a + b)
您希望在mapPartitions
需要执行一些昂贵的初始化时使用- 例如,如果您想使用像Stanford coreNLP工具这样的库执行命名实体识别.
硕士map
,filter
,flatMap
,和reduce
,你是用自己的方式来掌握的火花.
您可以在RDD上调用接受函数作为参数的各种方法.
// set up an example -- an RDD of arrays val sparkConf = new SparkConf().setMaster("local").setAppName("Example") val sc = new SparkContext(sparkConf) val testData = Array(Array(1,2,3), Array(4,5,6,7,8)) val testRDD = sc.parallelize(testData, 2) // Print the RDD of arrays. testRDD.collect().foreach(a => println(a.size)) // Use map() to create an RDD with the array sizes. val countRDD = testRDD.map(a => a.size) // Print the elements of this new RDD. countRDD.collect().foreach(a => println(a)) // Use filter() to create an RDD with just the longer arrays. val bigRDD = testRDD.filter(a => a.size > 3) // Print each remaining array. bigRDD.collect().foreach(a => { a.foreach(e => print(e + " ")) println() }) }
请注意,您编写的函数接受单个RDD元素作为输入,并返回某种统一类型的数据,因此您创建后一种类型的RDD.例如,countRDD
是一个RDD[Int]
,而bigRDD
仍然是一个RDD[Array[Int]]
.
在某些时候写一个foreach
修改一些其他数据可能很诱人,但你应该抵制这个问题和答案中描述的原因.
编辑:不要尝试打印大RDD
s
一些读者询问了如何使用collect()
和println()
查看他们的结果,如上例所示.当然,这只适用于你在Spark REPL(read-eval-print-loop)之类的交互模式下运行.最好调用collect()
RDD来获得顺序打印的顺序数组.但是collect()
可能会带回太多数据,无论如何都可能会打印太多数据.RDD
如果它们很大,可以使用以下方法深入了解您的s:
RDD.take()
:这可以很好地控制你获得的元素数量,但不能控制它们的来源 - 定义为"第一"元素,这是一个概念,由其他各种问题和答案处理.
// take() returns an Array so no need to collect() myHugeRDD.take(20).foreach(a => println(a))
RDD.sample()
:这可以让你(粗略地)控制你得到的结果部分,无论采样是否使用替换,甚至是可选的随机数种子.
// sample() does return an RDD so you may still want to collect() myHugeRDD.sample(true, 0.01).collect().foreach(a => println(a))
RDD.takeSample()
:这是混合:使用您可以控制的随机抽样,但两者都允许您指定结果的确切数量并返回Array
.
// takeSample() returns an Array so no need to collect() myHugeRDD.takeSample(true, 20).foreach(a => println(a))
RDD.count()
:有时候最好的洞察力来自你最终获得了多少元素 - 我经常这样做.
println(myHugeRDD.count())
Spark中的基本操作是map
和filter
.
val txtRDD = someRDD filter { case(id, content) => id.endsWith(".txt") }
的txtRDD
意志现在只包含有扩展名为".txt"文件
如果你想对这些文件进行字数统计,你可以说
//split the documents into words in one long list val words = txtRDD flatMap { case (id,text) => text.split("\\s+") } // give each word a count of 1 val wordT = words map (x => (x,1)) //sum up the counts for each word val wordCount = wordsT reduceByKey((a, b) => a + b)
您希望在mapPartitions
需要执行一些昂贵的初始化时使用- 例如,如果您想使用像Stanford coreNLP工具这样的库执行命名实体识别.
硕士map
,filter
,flatMap
,和reduce
,你是用自己的方式来掌握的火花.