热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Spark:RDD简介及基础算子

本文主要介绍SparkCore的核心内容:RDD。包含以下章节和对应的内容章节内容1RDD简介2RDD分区3RDD的依赖关系4RDD的缓存机制和区别5RDD创建的两

本文主要介绍Spark Core的核心内容:RDD。包含以下章节和对应的内容

章节内容
1RDD简介
2RDD分区
3RDD的依赖关系
4RDD的缓存机制和区别
5RDD创建的两种方式
6RDD算子和总结
7RDD 算子操作案例

 


1、RDD简介

RDD(Resilient Distributed Datasets,弹性分布式数据集),是Spark最为核心的概念,自然也是理解Apache Spark 工作原理的最佳入口之一。

RDD的特点:

  1. 是一个分区的只读记录的集合;
  2. 一个具有容错机制的特殊集;
  3. 只能通过在稳定的存储器或其他RDD上的确定性操作(转换)来创建;
  4. 可以分布在集群的节点上,以函数式操作集合的方式,进行各种并行操作

RDD之所以为“弹性”的特点

  1. 基于Lineage的高效容错(第n个节点出错,会从第n-1个节点恢复,血统容错);
  2. Task如果失败会自动进行特定次数的重试(默认4次);
  3. Stage如果失败会自动进行特定次数的重试(可以值运行计算失败的阶段),只计算失败的数据分片;
  4. 数据调度弹性:DAG TASK 和资源管理无关;
  5. checkpoint;
  6. 自动的进行内存和磁盘数据存储的切换

2、RDD的分区

首先,RDD是一个逻辑概念,分区是一个物理概念,并且带有下标

举例:通过SparkContext.parallelize创建一个RDD: 

val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),3)      # 后面的数字3 表示3个分区

也就是说,RDD 内部的数据集合在逻辑上(以及物理上)被划分成多个小集合,这样的每一个小集合被称为分区。

那么引出两个问题:

  1. 如何查看分区运行在哪个Worker(机器)上?
  2. 如何查看每个分区中的数据?

针对问题1和2: 在源码级别,RDD 类内存储一个 Partition 列表。每个 Partition 对象都包含一个 index 成员,通过 RDD 编号 + index 就能从唯一确定分区的 Block 编号,持久化的 RDD 就能通过这个 Block 编号从存储介质中获得对应的分区数据。

 

3、RDD的依赖关系

RDD和它依赖的parent RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。

  1. 窄依赖指的是每一个parent RDD的Partition最多被子RDD的一个Partition使用
  2. 宽依赖指的是多个子RDD的Partition会依赖同一个parent RDD的Partition

 

4、RDD的缓存机制和区别

RDD缓存机制有两种,cache和pesist,两种区别如下:

  1. cache和persist都是用于将一个RDD进行缓存的,这样在之后使用的过程中就不需要重新计算了,可以大大节省程序运行时间;
  2.  cache只有一个默认的缓存级别MEMORY_ONLY ,cache调用了persist,而persist可以根据情况设置其它的缓存级别;3)
  3. executor执行的时候,默认60%做cache,40%做task操作,persist最根本的函数,最底层的函数

5、RDD创建的两种方式

  1:通过SparkContext.parallelize创建
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),3)    # 3个分区


  2:通过读取外部的数据源创建:比如:HDFS、本地目录

val rdd1 = sc.textFile("hdfs://bigdata:9000/input/data.txt")    # HDFS文件
val rdd2 = sc.textFile("/root/temp/data.txt")   # 本地文件

 

6、RDD算子

RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。

从大方向来说:RDD算子分为Transformation算子 Action 算子,其中Transformation 操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。Action 算子会触发 SparkContext 提交 Job 作业,并将数据输出 Spark系统

从小方向来说:RDD 算子大致可以分为以下三类:

  1. Value数据类型的Transformation算子,这种变换并不触发提交作业,针对处理的数据项是Value型的数据。
  2. Key-Value数据类型的Transfromation算子,这种变换并不触发提交 作业,针对处理的数据项是Key-Value型的数据对。
  3. Action算子,这类算子会触发SparkContext提交Job作业

更多详细算子介绍可以参考:Scala官方API

以下表格是关于Transformation算子-会延时加载(计算)

转换

含义

map(func)

返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成

filter(func)

返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成

flatMap(func)

类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)

mapPartitions(func)

类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]

mapPartitionsWithIndex(func)

类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]

sample(withReplacement, fraction, seed)

根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子

union(otherDataset)

对源RDD和参数RDD求并集后返回一个新的RDD

intersection(otherDataset)

对源RDD和参数RDD求交集后返回一个新的RDD

distinct([numTasks]))

对源RDD进行去重后返回一个新的RDD

groupByKey([numTasks])   

在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD

reduceByKey(func, [numTasks])

在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置

aggregateByKey(zeroValue)(seqOp,combOp,[numTasks])

 

sortByKey([ascending], [numTasks])

在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD

sortBy(func,[ascending], [numTasks])

与sortByKey类似,但是更灵活

join(otherDataset, [numTasks])

在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD

cogroup(otherDataset, [numTasks])

在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD

cartesian(otherDataset)

笛卡尔积

pipe(command, [envVars])

 

coalesce(numPartitions)    

 

repartition(numPartitions)

 

repartitionAndSortWithinPartitions(partitioner)

 

 

以下表格是关于Action算子- 触发计算

动作

含义

reduce(func)

通过func函数聚集RDD中的所有元素,这个功能必须是课交换且可并联的

collect()

在驱动程序中,以数组的形式返回数据集的所有元素

count()

返回RDD的元素个数

first()

返回RDD的第一个元素(类似于take(1))

take(n)

返回一个由数据集的前n个元素组成的数组

takeSample(withReplacement,num, [seed])

返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子

takeOrdered(n[ordering])

 

saveAsTextFile(path)

将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本

saveAsSequenceFile(path

将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。

saveAsObjectFile(path

 

countByKey()

针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。

foreach(func)

在数据集的每一个元素上,运行函数func进行更新。

 

7、RDD 算子操作案例:

案例1:映射和过滤

创建一个RDD,也可以使用List
val rdd1 = sc.parallelize(List(5,6,7,8,1,2,3,100,30))每个元素乘以2,再排序
val rdd2 = rdd1.map(_*2).sortBy(x=>x,true) #降序过滤出大于20的元素
val rdd3 = rdd2.filter(_ > 20)输出结果
rdd3.collect

 

案例2:字符串操作和字符计数

val rdd4 = sc.parallelize(Array("a b c","d b c","a y c"))val rdd5 = rdd4.flatMap(_.split(" ")) # 切分字符并切平val rdd6 = rdd5.map((_,1)) # 逐一统计val rdd7 = rdd6.reduceByKey(_+_).collect # 计数输出结果

 

 

案例3:集合操作

集合运算、去重
val rdd6 = sc.parallelize(List(5,6,7,8,1,2,3,100,30))
val rdd7 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))并集:union(如果是SQL中的集合运算,对集合是有要求的)
val rdd8 = rdd6.union(rdd7)
去重
rdd8.distinct.collect交集: intersect(如果是SQL中的集合运算,对集合是有要求的)
val rdd9 = rdd6.intersection(rdd7)

 

 

案例4:key-value操作

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))
val rdd3 = rdd1.union(rdd2)
//按key进行聚合
val rdd4 = rdd3.reduceByKey(_ + _)
rdd4.collect
//按value的降序排序
val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))
rdd5.collect提示:交换了key-value的位置,并且交换了两次


推荐阅读
  • SpringBoot整合SpringSecurity+JWT实现单点登录
    SpringBoot整合SpringSecurity+JWT实现单点登录,Go语言社区,Golang程序员人脉社 ... [详细]
  • 在Android开发中,使用Picasso库可以实现对网络图片的等比例缩放。本文介绍了使用Picasso库进行图片缩放的方法,并提供了具体的代码实现。通过获取图片的宽高,计算目标宽度和高度,并创建新图实现等比例缩放。 ... [详细]
  • 本文介绍了UVALive6575题目Odd and Even Zeroes的解法,使用了数位dp和找规律的方法。阶乘的定义和性质被介绍,并给出了一些例子。其中,部分阶乘的尾零个数为奇数,部分为偶数。 ... [详细]
  • CF:3D City Model(小思维)问题解析和代码实现
    本文通过解析CF:3D City Model问题,介绍了问题的背景和要求,并给出了相应的代码实现。该问题涉及到在一个矩形的网格上建造城市的情景,每个网格单元可以作为建筑的基础,建筑由多个立方体叠加而成。文章详细讲解了问题的解决思路,并给出了相应的代码实现供读者参考。 ... [详细]
  • 也就是|小窗_卷积的特征提取与参数计算
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了卷积的特征提取与参数计算相关的知识,希望对你有一定的参考价值。Dense和Conv2D根本区别在于,Den ... [详细]
  • 成功安装Sabayon Linux在thinkpad X60上的经验分享
    本文分享了作者在国庆期间在thinkpad X60上成功安装Sabayon Linux的经验。通过修改CHOST和执行emerge命令,作者顺利完成了安装过程。Sabayon Linux是一个基于Gentoo Linux的发行版,可以将电脑快速转变为一个功能强大的系统。除了作为一个live DVD使用外,Sabayon Linux还可以被安装在硬盘上,方便用户使用。 ... [详细]
  • [大整数乘法] java代码实现
    本文介绍了使用java代码实现大整数乘法的过程,同时也涉及到大整数加法和大整数减法的计算方法。通过分治算法来提高计算效率,并对算法的时间复杂度进行了研究。详细代码实现请参考文章链接。 ... [详细]
  • 本文介绍了南邮ctf-web的writeup,包括签到题和md5 collision。在CTF比赛和渗透测试中,可以通过查看源代码、代码注释、页面隐藏元素、超链接和HTTP响应头部来寻找flag或提示信息。利用PHP弱类型,可以发现md5('QNKCDZO')='0e830400451993494058024219903391'和md5('240610708')='0e462097431906509019562988736854'。 ... [详细]
  • 前景:当UI一个查询条件为多项选择,或录入多个条件的时候,比如查询所有名称里面包含以下动态条件,需要模糊查询里面每一项时比如是这样一个数组条件:newstring[]{兴业银行, ... [详细]
  • 本文讨论了如何在codeigniter中识别来自angularjs的请求,并提供了两种方法的代码示例。作者尝试了$this->input->is_ajax_request()和自定义函数is_ajax(),但都没有成功。最后,作者展示了一个ajax请求的示例代码。 ... [详细]
  • 学习Java异常处理之throws之抛出并捕获异常(9)
    任务描述本关任务:在main方法之外创建任意一个方法接收给定的两个字符串,把第二个字符串的长度减1生成一个整数值,输出第一个字符串长度是 ... [详细]
  • 本文介绍了OpenStack的逻辑概念以及其构成简介,包括了软件开源项目、基础设施资源管理平台、三大核心组件等内容。同时还介绍了Horizon(UI模块)等相关信息。 ... [详细]
  • 本文讨论了如何使用GStreamer来删除H264格式视频文件中的中间部分,而不需要进行重编码。作者提出了使用gst_element_seek(...)函数来实现这个目标的思路,并提到遇到了一个解决不了的BUG。文章还列举了8个解决方案,希望能够得到更好的思路。 ... [详细]
  • 本文整理了Java中org.apache.solr.common.SolrDocument.setField()方法的一些代码示例,展示了SolrDocum ... [详细]
  • 本博文基于《Amalgamationofproteinsequence,structureandtextualinformationforimprovingprote ... [详细]
author-avatar
河南王修华
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有