热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

【极简spark教程】RDD编程

入门valtextFilesc.textFile(test.csv)textFile为RDD类型,具有List的很多相似操作,可以进行循环遍历&#

入门

val textFile = sc.textFile("/test.csv")//textFile为RDD类型,具有List的很多相似操作,可以进行循环遍历,例如map,foreach,filter等

  1. map操作:对rdd中每行进行处理
  2. flatmap操作:对rdd中每行进行展开处理
  3. collect操作:将结果转换为Array类型
  4. cache操作:将rdd和dataset保存在内存,被session持有

RDD编程指引


  1. 创建rdd集合,可以将rdd看做是spark分布式环境下的list

    val data = Array(1, 2, 3, 4, 5)
    val distData = sc.parallelize(data, 5)// distData类型为ParallelCollectionRDD,且分片数为5

  2. 读取文件
    1. 若读取本地文件,本地文件需要在所有节点上可以被访问到
    2. 所有读取文件的方法都支持在目录上、通配符、压缩包上运行

      sc.textFile("/my/directory")
      sc.textFile("/my/directory/*.txt")
      sc.textFile("/my/directory/*.gz")

    3. 控制返回文件数量,通常情况下返回文件为一个文件夹下的多个文件,可以使用SparkContext.wholeTextFiles控制返回文件的个数,例如返回一个文件

      SparkContext.sequenceFile[Int, String]
      SparkContext.hadoopRDD
      SparkContext.objectFile

  3. RDD操作
    1. 转换transform:生成了新的RDD
      1. map:返回一个新的分布式数据集,该数据集是通过将源的每个元素传递给函数func形成的
      2. mapValues:返回一个新的分布式数据集,同map相似,mapValues在(K,V)对的数据集上调用,仅对V进行操作
      3. filter:返回一个新的数据集,该数据集是通过选择源中func返回true的那些元素形成的
      4. flatmap:与map相似,但是每个输入项都可以映射到0个或多个输出项(因此func应该返回Seq而不是单个项)
      5. mapPartitions:与map相似,但是分别在RDD的每个分区(块)上运行,因此func在类型T的RDD上运行时必须为Iterator => Iterator 类型
      6. mapPartitionsWithIndex:与mapPartitions相似,但它还为func提供表示分区索引的整数值,因此当在类型T的RDD上运行时,func必须为(Int,Iterator )=> Iterator 类型
      7. sample:使用给定的随机数发生器的种子进行抽样,共三个参数,WithReplacement为true表示有抽样放回,原数据集大小不变,为false表示无放回抽样,原数据集在抽样后减少百分比,fraction表示抽样比例,seed表示随机数种子,Long型整数,例如12345L
      8. union:返回一个新的数据集,其中包含源数据集中的元素的并集
      9. intersection:返回一个新的RDD,其中包含源数据集中的元素的交集
      10. distinct:返回一个新的数据集,其中包含源数据集的不同元素
      11. groupByKey:在(K,V)对的数据集上调用时,返回(K,Iterable )对的数据集。注意:如果要分组以便对每个键执行聚合(例如求和或平均值),则使用reduceByKey或aggregateByKey将产生更好的性能。注意:默认情况下,输出中的并行度取决于父RDD的分区数。您可以传递一个可选numPartitions参数来设置不同数量的任务。
      12. reduceByKey:在(K,V)对的数据集上调用时,返回(K,V)对的数据集,其中每个键的值使用给定的reduce函数func(其类型必须为(V,V)=>)进行汇总V.与in一样groupByKey,reduce任务的数量可以通过可选的第二个参数配置
      13. aggregateByKey:在(K,V)对的数据集上调用时,返回(K,U)对的数据集,其中每个键的值使用给定的Combine函数和中性的“零”值进行汇总。允许与输入值类型不同的聚合值类型,同时避免不必要的分配。像in中一样groupByKey,reduce任务的数量可以通过可选的第二个参数配置
      14. sortByKey:在由K实现Ordered的(K,V)对的数据集上调用时,返回(K,V)对的数据集,按布尔值指定,按键以升序或降序排序ascending
      15. join:在(K,V)和(K,W)类型的数据集上调用时,返回(K,(V,W))对的数据集,其中每个键都有所有成对的元素。外连接通过支持leftOuterJoin,rightOuterJoin和fullOuterJoin。注意:join之前最好确认rdd中元素的类型,防止出现Any类型,导致报错:but class RDD is invariant in type T.You may wish to define T as +T instead.
      16. cogroup:在(K,V)和(K,W)类型的数据集上调用时,返回(K,(Iterable ,Iterable ))元组的数据集。此操作也称为groupWith
      17. cartesian:笛卡尔积,在类型T和U的数据集上调用时,返回(T,U)对(所有元素对)的数据集
      18. pipe:通过shell命令(例如Perl或bash脚本)通过管道传输RDD的每个分区。将RDD元素写入进程的stdin,并将输出到其stdout的行作为字符串的RDD返回
      19. coalesce:将RDD中的分区数减少到numPartitions。筛选大型数据集后,对于更有效地运行操作很有用
      20. repartition:随机重排RDD中的数据以创建更多或更少的分区,并在整个分区之间保持平衡。这始终会拖曳网络上的所有数据
        1. repartition(1):重排RDD中的数据,合并为一个分区
        2. repartition(col("colName")):重排RDD中的数据,根据指定列的记录进行分区
      21. repartitionAndSortWithinPartitions:根据给定的分区程序对RDD重新分区,并在每个结果分区中,按其键对记录进行排序。这比repartition在每个分区内调用然后排序更为有效,因为它可以将排序推入洗牌机制
    2. 行动action:汇总所有结果返回驱动程序
      1. reduce:使用函数func(该函数接受两个参数并返回一个)来聚合数据集的元素。该函数应该是可交换的和关联的,以便可以并行正确地计算它
      2. collect:在驱动程序中将数据集的所有元素作为数组返回。这通常在返回足够小的数据子集的过滤器或其他操作之后很有用
      3. count:返回数据集中的元素数
      4. first:返回数据集的第一个元素(类似于take(1))
      5. take:返回数据集的前n个元素的数组
      6. takeSample:返回一个数组,该数组包含数据集num个元素的随机样本(是否替换),可以选择预先指定随机数生成器种子
      7. takeOrdered:使用自然顺序或自定义比较器返回RDD 的前n个元素
      8. saveAsTextFile:将数据集的元素以文本文件(或文本文件集)的形式写入本地文件系统,HDFS或任何其他Hadoop支持的文件系统中的给定目录中。Spark将在每个元素上调用toString,以将其转换为文件中的一行文本
      9. saveAsSequenceFile:在本地文件系统,HDFS或任何其他Hadoop支持的文件系统的给定路径中,将数据集的元素作为Hadoop SequenceFile写入。这在实现Hadoop的Writable接口的键/值对的RDD上可用。在Scala中,它也可用于隐式转换为Writable的类型(Spark包括对基本类型(如Int,Double,String等)的转换
      10. saveAsObjectFile:使用Java序列化以简单的格式编写数据集的元素,然后可以使用进行加载 SparkContext.objectFile()
      11. countByKey:仅在类型(K,V)的RDD上可用。返回(K,Int)对的哈希图以及每个键的计数
      12. foreach:在数据集的每个元素上运行函数func。通常这样做是出于副作用,例如更新累加器或与外部存储系统进行交互。注意:在之外修改除累加器以外的变量foreach()可能会导致不确定的行为。有关更多详细信息,请参见了解闭包。
    3. 缓存
      1. persist:可以根据参数进行不同级别的缓存MEMORY_ONLY、MEMORY_AND_DISK、MEMORY_ONLY_SER、MEMORY_AND_DISK_SER、DISK_ONLY
      2. cache:默认缓存级别MEMORY_ONLY
      3. 缓存级别选择:MEMORY_ONLY>MEMORY_ONLY_SER>MEMORY_AND_DISK
      4. unpersist:释放缓存
    4. 打印部分记录
      1. collect:将全部记录汇总到一台机器上,可能会耗尽内存
      2. take:获取部分记录
    5. 共享变量
      1. 广播变量:在所有节点上创建一个只读变量,在使用时不应该调用函数中的指定变量值,而是直接使用指定广播变量,而且防止修改节点上的广播变量,dataFrame和变量都可以使用broadcast进行广播,但是rdd不可以

        val broadcastVar = sc.broadcast(Array(1, 2, 3))val broadcastDF = functions.broadcast(df)

  4. 累加器
    1. 创建累加器

      val accum = sc.longAccumulator("My Accumulator")sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))println(accum.value)

    2. 构造累加器


      //继承AccumulatorV2class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
      private val myVector: MyVector = MyVector.createZeroVector
      def reset(): Unit = {myVector.reset()}def add(v: MyVector): Unit = {myVector.add(v)}
      }// 创建累加器对象val myVectorAcc = new VectorAccumulatorV2
      //在spark上下文中进行注册
      sc.register(myVectorAcc, "MyVectorAcc1")

    3. 留意惰性(spark2.4.0中疑似取消了,因为以下代码在spark2.4.0中测试返回了正常结果)

      val accum = sc.longAccumulator
      data.map { x => accum.add(x); x }
      // 这里累加器仍然为0,因为没有行动action操作触发执行map操作.


推荐阅读
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • 本文介绍了使用PHP实现断点续传乱序合并文件的方法和源码。由于网络原因,文件需要分割成多个部分发送,因此无法按顺序接收。文章中提供了merge2.php的源码,通过使用shuffle函数打乱文件读取顺序,实现了乱序合并文件的功能。同时,还介绍了filesize、glob、unlink、fopen等相关函数的使用。阅读本文可以了解如何使用PHP实现断点续传乱序合并文件的具体步骤。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • XML介绍与使用的概述及标签规则
    本文介绍了XML的基本概念和用途,包括XML的可扩展性和标签的自定义特性。同时还详细解释了XML标签的规则,包括标签的尖括号和合法标识符的组成,标签必须成对出现的原则以及特殊标签的使用方法。通过本文的阅读,读者可以对XML的基本知识有一个全面的了解。 ... [详细]
  • 本文介绍了在实现了System.Collections.Generic.IDictionary接口的泛型字典类中如何使用foreach循环来枚举字典中的键值对。同时还讨论了非泛型字典类和泛型字典类在foreach循环中使用的不同类型,以及使用KeyValuePair类型在foreach循环中枚举泛型字典类的优势。阅读本文可以帮助您更好地理解泛型字典类的使用和性能优化。 ... [详细]
  • 抽空写了一个ICON图标的转换程序
    抽空写了一个ICON图标的转换程序,支持png\jpe\bmp格式到ico的转换。具体的程序就在下面,如果看的人多,过两天再把思路写一下。 ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
  • android listview OnItemClickListener失效原因
    最近在做listview时发现OnItemClickListener失效的问题,经过查找发现是因为button的原因。不仅listitem中存在button会影响OnItemClickListener事件的失效,还会导致单击后listview每个item的背景改变,使得item中的所有有关焦点的事件都失效。本文给出了一个范例来说明这种情况,并提供了解决方法。 ... [详细]
  • 本文介绍了iOS数据库Sqlite的SQL语句分类和常见约束关键字。SQL语句分为DDL、DML和DQL三种类型,其中DDL语句用于定义、删除和修改数据表,关键字包括create、drop和alter。常见约束关键字包括if not exists、if exists、primary key、autoincrement、not null和default。此外,还介绍了常见的数据库数据类型,包括integer、text和real。 ... [详细]
  • 大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记
    本文介绍了大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记,包括outputFormat接口实现类、自定义outputFormat步骤和案例。案例中将包含nty的日志输出到nty.log文件,其他日志输出到other.log文件。同时提供了一些相关网址供参考。 ... [详细]
  • 本文介绍了如何使用OpenXML按页码访问文档内容,以及在处理分页符和XML元素时的一些挑战。同时,还讨论了基于页面的引用框架的局限性和超越基于页面的引用框架的方法。最后,给出了一个使用C#的示例代码来按页码访问OpenXML内容的方法。 ... [详细]
  • 本文详细介绍了Spring的JdbcTemplate的使用方法,包括执行存储过程、存储函数的call()方法,执行任何SQL语句的execute()方法,单个更新和批量更新的update()和batchUpdate()方法,以及单查和列表查询的query()和queryForXXX()方法。提供了经过测试的API供使用。 ... [详细]
  • ASP.NET2.0数据教程之十四:使用FormView的模板
    本文介绍了在ASP.NET 2.0中使用FormView控件来实现自定义的显示外观,与GridView和DetailsView不同,FormView使用模板来呈现,可以实现不规则的外观呈现。同时还介绍了TemplateField的用法和FormView与DetailsView的区别。 ... [详细]
  • php缓存ri,浅析ThinkPHP缓存之快速缓存(F方法)和动态缓存(S方法)(日常整理)
    thinkPHP的F方法只能用于缓存简单数据类型,不支持有效期和缓存对象。S()缓存方法支持有效期,又称动态缓存方法。本文是小编日常整理有关thinkp ... [详细]
author-avatar
海岛迷情
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有