如何在apache spark(scala)中迭代RDD

 psgsd57424 发布于 2022-12-20 17:52

我使用以下命令用一堆包含2个字符串["filename","content"]的数组填充RDD.

现在,我希望迭代每个事件,以便对每个文件名和内容执行某些操作.

val someRDD = sc.wholeTextFiles("hdfs://localhost:8020/user/cloudera/*")

我似乎无法找到有关如何执行此操作的任何文档.

所以我想要的是:

foreach occurrence-in-the-rdd{
   //do stuff with the array found on loccation n of the RDD
} 

Spiro Michay.. 26

您可以在RDD上调用接受函数作为参数的各种方法.

// set up an example -- an RDD of arrays
val sparkConf = new SparkConf().setMaster("local").setAppName("Example")
val sc = new SparkContext(sparkConf)
val testData = Array(Array(1,2,3), Array(4,5,6,7,8))
val testRDD = sc.parallelize(testData, 2)

// Print the RDD of arrays.
testRDD.collect().foreach(a => println(a.size))

// Use map() to create an RDD with the array sizes.
val countRDD = testRDD.map(a => a.size)

// Print the elements of this new RDD.
countRDD.collect().foreach(a => println(a))

// Use filter() to create an RDD with just the longer arrays.
val bigRDD = testRDD.filter(a => a.size > 3)

// Print each remaining array.
bigRDD.collect().foreach(a => {
    a.foreach(e => print(e + " "))
    println()
  })
}

请注意,您编写的函数接受单个RDD元素作为输入,并返回某种统一类型的数据,因此您创建后一种类型的RDD.例如,countRDD是一个RDD[Int],而bigRDD仍然是一个RDD[Array[Int]].

在某些时候写一个foreach修改一些其他数据可能很诱人,但你应该抵制这个问题和答案中描述的原因.

编辑:不要尝试打印大RDDs

一些读者询问了如何使用collect()println()查看他们的结果,如上例所示.当然,这只适用于你在Spark REPL(read-eval-print-loop)之类的交互模式下运行.最好调用collect()RDD来获得顺序打印的顺序数组.但是collect()可能会带回太多数据,无论如何都可能会打印太多数据.RDD如果它们很大,可以使用以下方法深入了解您的s:

    RDD.take():这可以很好地控制你获得的元素数量,但不能控制它们的来源 - 定义为"第一"元素,这是一个概念,由其他各种问题和答案处理.

    // take() returns an Array so no need to collect()
    myHugeRDD.take(20).foreach(a => println(a))
    

    RDD.sample():这可以让你(粗略地)控制你得到的结果部分,无论采样是否使用替换,甚至是可选的随机数种子.

    // sample() does return an RDD so you may still want to collect()
    myHugeRDD.sample(true, 0.01).collect().foreach(a => println(a))
    

    RDD.takeSample():这是混合:使用您可以控制的随机抽样,但两者都允许您指定结果的确切数量并返回Array.

    // takeSample() returns an Array so no need to collect() 
    myHugeRDD.takeSample(true, 20).foreach(a => println(a))
    

    RDD.count():有时候最好的洞察力来自你最终获得了多少元素 - 我经常这样做.

    println(myHugeRDD.count())       
    


David.. 9

Spark中的基本操作是mapfilter.

val txtRDD = someRDD filter { case(id, content) => id.endsWith(".txt") }

txtRDD意志现在只包含有扩展名为".txt"文件

如果你想对这些文件进行字数统计,你可以说

//split the documents into words in one long list
val words = txtRDD flatMap { case (id,text) => text.split("\\s+") }
// give each word a count of 1
val wordT = words map (x => (x,1))  
//sum up the counts for each word
val wordCount = wordsT reduceByKey((a, b) => a + b)

您希望在mapPartitions需要执行一些昂贵的初始化时使用- 例如,如果您想使用像Stanford coreNLP工具这样的库执行命名实体识别.

硕士map,filter,flatMap,和reduce,你是用自己的方式来掌握的火花.

2 个回答
  • 您可以在RDD上调用接受函数作为参数的各种方法.

    // set up an example -- an RDD of arrays
    val sparkConf = new SparkConf().setMaster("local").setAppName("Example")
    val sc = new SparkContext(sparkConf)
    val testData = Array(Array(1,2,3), Array(4,5,6,7,8))
    val testRDD = sc.parallelize(testData, 2)
    
    // Print the RDD of arrays.
    testRDD.collect().foreach(a => println(a.size))
    
    // Use map() to create an RDD with the array sizes.
    val countRDD = testRDD.map(a => a.size)
    
    // Print the elements of this new RDD.
    countRDD.collect().foreach(a => println(a))
    
    // Use filter() to create an RDD with just the longer arrays.
    val bigRDD = testRDD.filter(a => a.size > 3)
    
    // Print each remaining array.
    bigRDD.collect().foreach(a => {
        a.foreach(e => print(e + " "))
        println()
      })
    }
    

    请注意,您编写的函数接受单个RDD元素作为输入,并返回某种统一类型的数据,因此您创建后一种类型的RDD.例如,countRDD是一个RDD[Int],而bigRDD仍然是一个RDD[Array[Int]].

    在某些时候写一个foreach修改一些其他数据可能很诱人,但你应该抵制这个问题和答案中描述的原因.

    编辑:不要尝试打印大RDDs

    一些读者询问了如何使用collect()println()查看他们的结果,如上例所示.当然,这只适用于你在Spark REPL(read-eval-print-loop)之类的交互模式下运行.最好调用collect()RDD来获得顺序打印的顺序数组.但是collect()可能会带回太多数据,无论如何都可能会打印太多数据.RDD如果它们很大,可以使用以下方法深入了解您的s:

      RDD.take():这可以很好地控制你获得的元素数量,但不能控制它们的来源 - 定义为"第一"元素,这是一个概念,由其他各种问题和答案处理.

      // take() returns an Array so no need to collect()
      myHugeRDD.take(20).foreach(a => println(a))
      

      RDD.sample():这可以让你(粗略地)控制你得到的结果部分,无论采样是否使用替换,甚至是可选的随机数种子.

      // sample() does return an RDD so you may still want to collect()
      myHugeRDD.sample(true, 0.01).collect().foreach(a => println(a))
      

      RDD.takeSample():这是混合:使用您可以控制的随机抽样,但两者都允许您指定结果的确切数量并返回Array.

      // takeSample() returns an Array so no need to collect() 
      myHugeRDD.takeSample(true, 20).foreach(a => println(a))
      

      RDD.count():有时候最好的洞察力来自你最终获得了多少元素 - 我经常这样做.

      println(myHugeRDD.count())       
      

    2022-12-20 17:55 回答
  • Spark中的基本操作是mapfilter.

    val txtRDD = someRDD filter { case(id, content) => id.endsWith(".txt") }
    

    txtRDD意志现在只包含有扩展名为".txt"文件

    如果你想对这些文件进行字数统计,你可以说

    //split the documents into words in one long list
    val words = txtRDD flatMap { case (id,text) => text.split("\\s+") }
    // give each word a count of 1
    val wordT = words map (x => (x,1))  
    //sum up the counts for each word
    val wordCount = wordsT reduceByKey((a, b) => a + b)
    

    您希望在mapPartitions需要执行一些昂贵的初始化时使用- 例如,如果您想使用像Stanford coreNLP工具这样的库执行命名实体识别.

    硕士map,filter,flatMap,和reduce,你是用自己的方式来掌握的火花.

    2022-12-20 17:55 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有