热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

开发笔记:数据分析工具篇——pyspark应用详解

篇首语:本文由编程笔记#小编为大家整理,主要介绍了数据分析工具篇——pyspark应用详解相关的知识,希望对你有一定的参考价值。前面

篇首语:本文由编程笔记#小编为大家整理,主要介绍了数据分析工具篇——pyspark应用详解相关的知识,希望对你有一定的参考价值。


前面几篇文章我们讲解了大数据计算的主要架构:hadoopspark,从离线和实时解决了大数据分析过程中遇到的大部分问题,但是这是否是就代表了大数据计算引擎?

不是的~

现阶段流批一体盛行,Flink也逐渐进入大家的视野,大有发展壮大的趋势,我们后面会单独讲解这一工具,这篇文章我们重点讲解一下基于spark运算的pyspark工具。

pyspark不是所有的代码都在spark环境应用,可以将一些主要的运算单元切到spark环境运算完成,然后输出运算结果到本地,最后在本地运行一些简单的数据处理逻辑。

pyspark主要的功能为:

1)可以直接进行机器学习的训练,其中内嵌了机器学习的算法,也就是遇到算法类的运算可以直接调用对应的函数,将运算铺在spark上训练。

2)有一些内嵌的常规函数,这些函数可以在spark环境下处理完成对应的运算,然后将运算结果呈现在本地。

个人理解pyspark是本地环境和spark环境的结合用法,spark中的函数是打开本地环境到spark环境的大门,本地的数据和逻辑按照spark运算规则整理好之后,通过spark函数推到spark环境中完成运算。

所以关键在于有多少计算方式是可以放在spark环境计算的,有多少常用的pyspark函数;











pyspark原理介绍








数据分析工具篇——pyspark应用详解



原理图如下:

数据分析工具篇——pyspark应用详解

上图中,python中调用sparkcontext。

sparkcontext会通过py4j启动jvm中的javasparkcontextjavasparkcontext再将数据逻辑推到集群中完成运算。

数据分析工具篇——pyspark应用详解

结合上图,pyspark的运算逻辑为:

运算job时pyspark会通过py4j将写好的代码映射到jvm中,jvm调用自身的sparkcontext.runjob()函数,实现job的分发,分发的方式是spark的,每个job分发到集群各个executor上之后,各个executor中jvm会调用自身服务器的python进程,完成自身部分的运算,运算完成后再将结果集返回给jvm,原路返回,最终呈现在python的界面上。

有没有感觉jvm只是一个通道?

是的~

简单讲他的功能就是将python分发到各个节点上,然后再将运算结果收回来。











pyspark的常用函数






数据分析工具篇——pyspark应用详解


数据分析工具篇——pyspark应用详解






1)parallelize():list数据序列化成RDD格式,方便spark进行运算;

2)collect():将RDD格式数据转化成list数据,方便数据输出;

3)glom():显示出RDD被分配到哪个分区节点(exector)中进行计算;

4)map():针对RDD对应的列表的每一个元素,进行map()函数里面的函数;






mydata =mydata1.map(lambda x : (x[0], x[1]**2)).collect()

5)reduce(fun(a, b)):合并相同key值的数据。

是针对RDD对应的列表中的元素,递归地选择第一个和第二个元素进行操作,操作的结果作为一个元素用来替换这两个元素,其中函数需要有两个参数。






reduce() :rdd.reduce(func)

对同类型的数据的RDD进行聚合操作,返回值是一个同类型的数值结果:







num=sc.parallelize([1,2,3,4])sum=num.reduce(lambda x,y: x+y)

理解x,y: x指的是返回值,而y是对rdd各元素的遍历。所以,x+y表示对num中数据进行累加:






print(sum) #10

另外函数为:






reduceByKey(fun(a, b))

类似于hive中的groupby函数,按照key值a进行分组,对b进行聚合计算,返回的是list






reduceByKeyLocally(fun(a, b))

类似于hive中的groupby函数,按照key值a进行分组,对b进行聚合计算,返回的是字典;

6)在spark环境下构建dataframe数据块;






data = spark.createDataFrame(data, ["A""B"])











常用算子






数据分析工具篇——pyspark应用详解


数据分析工具篇——pyspark应用详解






1)data.show()显示dataframe中的数据;

2)mydata.rdd.map():将dataframe转化成rdd,然后进行map运算;

   map运算是每行进行单独计算,返回每行的计算结果值,形成一个新的rdd

   一般map会与lambda结合使用,通过lambda函数对map中的每行数据进行计算,例如:





















from pyspark.sql import SparkSessionspark = SparkSession\ .builder\ .appName("PythonWordCount")\ .master("local") \ .getOrCreate()spark.conf.set("spark.executor.memory", "500M")sc = spark.sparkContext
print('see the difference of flatmap and map:')L = [1,2,3,4]rdd_1 = sc.parallelize(L, 2)rdd_2 = rdd_1.map(lambda x: (x, x**2))rdd_3 = rdd_1.flatMap(lambda x: (x, x**2))print('map:', rdd_2.glom().collect())print('flatmap:', rdd_3.glom().collect())

3)flatmap():将map中的数据元组展平到一个list中;

上图中的数据是一个parallelize,即为一个rdd结构的list值,其运算基本符合numpy的运算结构,map的每次运算都会取出一个元素进行计算;另外除了parallelize之外pyspark还提供了dataframe结构,这一结构在进行map运算时需要先转化成rdd,然后按照每次一行的结构将数据传入到map中进行运算,map中用lambda函数对每行进行深度计算,每行是一个dataframe格式,切记取其中某个值时需要使用x['a']结构。

map的运算结果为:[[(1,2),(3,4)],[(5,6),(7,8)]]

flatmap的运算结果为:[[1,2,3,4], [5,6,7,8]]

4)filter():用于删除/过滤,即删除不满足条件的元素,这个条件以lambda函数的形式作为参数传入filter()函数中;






rdd1.filter(lambda x : x>=2).collect()

5)distinct():用于去重,没有参数;

6)join():将两两具有相同key的元素的值,组成一个tuple作为这个keyvalue;

左连接:






print (kvRDD1.leftOuterJoin(kvRDD2).collect())

右链接:






print (kvRDD1.rightOuterJoin(kvRDD2).collect())

7)RDD1.union(RDD2):求两个RDD对象的所有元素的并,不去掉重复元素;

求交集:






intRDD1.intersection(intRDD2).collect()

求差集:






intRDD1.subtract(intRDD2).collect()

求笛卡尔积:






intRDD1.cartesian(intRDD2).collect()

8)randomsplit():将RDD按照一定的比例拆分成多个;






intRDD.randomSplit([0.4,0.6])

9)sortByKey():按照key进行排序;






kvRDD1.sortByKey().collect()

10)keys()/values():对键值对的数据获取;






print(kvRDD1.keys().collect()) print (kvRDD1.values().collect()

11)读取前2条数据;






kvRDD1.take(2)

12)按照key计数;






print (kvRDD1.countByKey().collect())

13)根据输入的key值来查找对应的Value值:






print (kvRDD1.lookup(3))











pyspark环境下的类SQL操作






数据分析工具篇——pyspark应用详解


数据分析工具篇——pyspark应用详解






pyspark环境下的类SQL操作主要是对spark—dataframe的操作:

1)查询一列或多列数据:








df.select(“name”)df.select(df[‘name’]df[‘age’]+1)df.select(df.a, df.b, df.c)

2)按照条件显示某一组数据:






df.where("income = 50" ).show()

3)新增一列数据:






df.withColumn('income1', df.income+10).show(5)

4)修改列名:






df.withColumnRenamed( "income" , "income2" ).show(3)

5)union实现的横向合并:






df.union(df).show()

6)两个表做连接:






df_join = df_left.join(df_right, df_left.key == df_right.key, "inner")

7)调用多个函数:






df.groupBy(“A”).agg(functions.avg(“B”), functions.min(“B”), functions.max(“B”)).show()

8)列表转化成dataframe的方法:






df = sc.parallelize([('india','japan'),('usa','uruguay')]).toDF(['col1','col2'])

9)单列求和(可以和分组求和比较):








from pyspark.sql.functions import sum as spark_sumresult = spark_sum(filter_df['_2'] * filter_df['_3'])df.select(result).show()








pyspark常用方法集合






数据分析工具篇——pyspark应用详解


数据分析工具篇——pyspark应用详解






1)构建字典结构,pyspark中没有对应的字典结构,如果需要可以用to_json()的方式实现:











from pyspark.sql.functions import udffrom pyspark.sql import types as T@udf(T.MapType(T.StringType(), T.StringType()))def create_struct(zip_code, dma):    return {zip_code: dma}data.withColumn('struct', create_struct(data.zip_code, data.dma)).toJSON().collect()

2)将一行中list的部分转化成列:笛卡尔积操作








import pyspark.sql.functions as Fexploded_df = df.select("*", F.explode("res").alias("exploded_data"))exploded_df.show(truncate=False)

修改对应列名:









exploded_df = exploded_df.withColumn(      "Budget", F.col("exploded_data").getItem("Budget")      )

取出对应的列:






exploded_df.select("Person", "Amount", "Budget", "Month", "Cluster").show(10, False)

3)RDD中需要以map、lambda和自定义函数来进行循环操作






sample2 = sample.rdd.map(lambda x: (x.name, x.age, x.city))

4)pyspark的文件读写:









from pyspark.sql import SQLContextfrom pyspark import SparkContextsc = SparkContext() # 只能运行一次sqlCOntext= SQLContext(sc)

# 读取数据:






raw_data = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('./data/train.csv')

# 写入csv文件:






save_data_test.write.csv('./data/small_train.csv')

5)pyspark中对循环有不便利,rdd无法直接进行循环,需要进行转化:

使用DataFrame.collect()方法,将Spark-SQL来自所有执行程序的查询结果聚合到驱动程序中。

collect()方法将返回Python list,其中每个元素都是Spark Row。

然后,你可以在for-loop中迭代此列表。

代码段:







data1 = hive_context.sql("select col_name from schema_def where data_type<>'string'")colum_names_as_python_list_of_rows = data1.collect()

6)如何按照一定的条件选择某一list中的值:

数据分析工具篇——pyspark应用详解

转变成:

这一思路有如下两种方法:

第一种:






df.select("index", f.expr("valuelist[CAST(index AS integer)]").alias("value")).show()

第二种:










import pyspark.sql.functions as fdf.select("index", f.posexplode("valuelist").alias("pos", "value"))\  .where(f.col("index").cast("int") == f.col("pos"))\ .select("index", "value")\  .show()

其中:

f.col("index")col 方法接收一个字符串列名作为参数, 根据指定的列名返回一个Column。作用和df.columnName相同。

df.selectExpr()/f.expr()用来选择某列并对某列进行变换,返回变换后的值;

df.selectExpr('length(key)').show()计算key列中每个元素的长度;

df.withColumn(colName, col):用来对某一列进行操作,如转换数据类型,根据某一列创建新列等:








add1 = udf(lambda x: x+1)df.withColumn('val1', add1('val')).show()df.withColumn('val', df.val.cast('float')).show()



欢迎大家关注公众号:





来都来了,点个关注再走呗~


推荐阅读
  • 第四章高阶函数(参数传递、高阶函数、lambda表达式)(python进阶)的讲解和应用
    本文主要讲解了第四章高阶函数(参数传递、高阶函数、lambda表达式)的相关知识,包括函数参数传递机制和赋值机制、引用传递的概念和应用、默认参数的定义和使用等内容。同时介绍了高阶函数和lambda表达式的概念,并给出了一些实例代码进行演示。对于想要进一步提升python编程能力的读者来说,本文将是一个不错的学习资料。 ... [详细]
  • 本文介绍了在Python3中如何使用选择文件对话框的格式打开和保存图片的方法。通过使用tkinter库中的filedialog模块的asksaveasfilename和askopenfilename函数,可以方便地选择要打开或保存的图片文件,并进行相关操作。具体的代码示例和操作步骤也被提供。 ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
  • 开发笔记:加密&json&StringIO模块&BytesIO模块
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了加密&json&StringIO模块&BytesIO模块相关的知识,希望对你有一定的参考价值。一、加密加密 ... [详细]
  • 本文介绍了使用PHP实现断点续传乱序合并文件的方法和源码。由于网络原因,文件需要分割成多个部分发送,因此无法按顺序接收。文章中提供了merge2.php的源码,通过使用shuffle函数打乱文件读取顺序,实现了乱序合并文件的功能。同时,还介绍了filesize、glob、unlink、fopen等相关函数的使用。阅读本文可以了解如何使用PHP实现断点续传乱序合并文件的具体步骤。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • XML介绍与使用的概述及标签规则
    本文介绍了XML的基本概念和用途,包括XML的可扩展性和标签的自定义特性。同时还详细解释了XML标签的规则,包括标签的尖括号和合法标识符的组成,标签必须成对出现的原则以及特殊标签的使用方法。通过本文的阅读,读者可以对XML的基本知识有一个全面的了解。 ... [详细]
  • 本文介绍了一个在线急等问题解决方法,即如何统计数据库中某个字段下的所有数据,并将结果显示在文本框里。作者提到了自己是一个菜鸟,希望能够得到帮助。作者使用的是ACCESS数据库,并且给出了一个例子,希望得到的结果是560。作者还提到自己已经尝试了使用"select sum(字段2) from 表名"的语句,得到的结果是650,但不知道如何得到560。希望能够得到解决方案。 ... [详细]
  • android listview OnItemClickListener失效原因
    最近在做listview时发现OnItemClickListener失效的问题,经过查找发现是因为button的原因。不仅listitem中存在button会影响OnItemClickListener事件的失效,还会导致单击后listview每个item的背景改变,使得item中的所有有关焦点的事件都失效。本文给出了一个范例来说明这种情况,并提供了解决方法。 ... [详细]
  • 判断数组是否全为0_连续子数组的最大和的解题思路及代码方法一_动态规划
    本文介绍了判断数组是否全为0以及求解连续子数组的最大和的解题思路及代码方法一,即动态规划。通过动态规划的方法,可以找出连续子数组的最大和,具体思路是尽量选择正数的部分,遇到负数则不选择进去,遇到正数则保留并继续考察。本文给出了状态定义和状态转移方程,并提供了具体的代码实现。 ... [详细]
  • 2020年第十一届蓝桥杯决赛JAVA B G题“皮亚诺曲线距离“的个人题解目录
    本文是2020年第十一届蓝桥杯决赛JAVA B G题“皮亚诺曲线距离“的个人题解目录。文章介绍了皮亚诺曲线的概念和特点,并提供了计算皮亚诺曲线上两点距离的方法。通过给定的两个点的坐标,可以计算出它们之间沿着皮亚诺曲线走的最短距离。本文还提供了个人题解的目录,供读者参考。 ... [详细]
  • 也就是|小窗_卷积的特征提取与参数计算
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了卷积的特征提取与参数计算相关的知识,希望对你有一定的参考价值。Dense和Conv2D根本区别在于,Den ... [详细]
  • [大整数乘法] java代码实现
    本文介绍了使用java代码实现大整数乘法的过程,同时也涉及到大整数加法和大整数减法的计算方法。通过分治算法来提高计算效率,并对算法的时间复杂度进行了研究。详细代码实现请参考文章链接。 ... [详细]
  • Oracle优化新常态的五大禁止及其性能隐患
    本文介绍了Oracle优化新常态中的五大禁止措施,包括禁止外键、禁止视图、禁止触发器、禁止存储过程和禁止JOB,并分析了这些禁止措施可能带来的性能隐患。文章还讨论了这些禁止措施在C/S架构和B/S架构中的不同应用情况,并提出了解决方案。 ... [详细]
author-avatar
念念不忘的叶子公寓
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有