热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

手把手带你入门PySpark!

PySpark数据科学入门PySpark是一种很好的语言,可以大规模地进行探索性数据分析、构建机器学习管道以及为数据平台创建ETL。如果您已经熟悉Python和Pandas等库,那

PySpark数据科学入门

PySpark是一种很好的语言,可以大规模地进行探索性数据分析、构建机器学习管道以及为数据平台创建ETL。如果您已经熟悉Python和Pandas等库,那么PySpark是一种很好的语言,可以用来创建更具扩展性的分析和管道。这篇文章的目的是展示如何启动和运行PySpark并执行常见任务。

我们将使用Databricks作为Spark环境,将Kaggle的NHL数据集用作分析的数据源。这篇文章展示了如何在Spark 数据帧写入数据,创建这些帧的转换和聚合,可视化结果以及执行线性回归。我还将展示如何使用Pandas UDF以可扩展的方式将常规Python代码与PySpark混合。为了简单起见,我们将专注于批处理并避免流数据管道出现的一些复杂问题。

这篇文章的完整笔记本可以在github上找到。(https://github.com/bgweber/StartupDataScience/blob/master/EDA/PySpark_NHL.ipynb)

环境

启动和运行Spark有许多不同的选项:



  • 自托管:您可以使用裸机或虚拟机自行设置群集。 Apache Ambari是这个选项的一个有用的项目,但它不是我推荐的快速启动和运行的方法。

  • 云提供商:大多数云提供商都提供Spark集群:AWS具有EMR,GCP具有DataProc,它可以比自托管更快地进入互动环境。

  • 供应商解决方案:包括Databricks和Cloudera在内的公司提供Spark解决方案,使Spark易于启动和运行。

使用的解决方案因安全性、成本和现有基础架构而异。如果您正在尝试使用并运行一个需要的环境来学习,那么我建议您使用Databricks Community Edition。


在Databricks Community Edition中创建PySpark集群

使用此环境,可以轻松启动并运行Spark群集和笔记本环境。在本教程中,我使用Spark 2.4运行时和Python 3创建了一个集群。要运行本文中的代码,您至少需要Spark版本2.3,才能获得Pandas UDF功能。

Spark 数据帧

PySpark中使用的关键数据类型是Spark 数据帧。此对象可以被视为分布在集群中的表,其功能类似于R和Pandas中的数据帧。如果你想使用PySpark进行分布式计算,那么你需要对Spark 数据帧执行操作,而不是其他python数据类型。

使用Spark时,也可以通过在Spark数据帧上调用toPandas()来使用Pandas数据帧,该数据帧返回一个pandas对象。但是,除了处理小型数据帧之外,通常应该避免使用此函数,因为它将整个对象拉入单个节点的内存中。

Pandas和Spark数据帧之间的主要区别之一是急切执行和延迟执行。在PySpark中,操作被延迟,直到管道中实际需要结果。例如,您可以指定从S3加载数据集并对数据帧应用多个转换的操作,但不会立即应用这些操作。相反,记录转换图,并且一旦实际需要数据,例如当将结果写回S3时,则转换被应用为单个管道操作。此方法用于避免将完整数据帧拉入内存,并在整个计算机集群中实现更有效的处理。使用Pandas数据帧,一切都被拉入内存,并立即应用每个Pandas操作。

通常,如果可能的话,***避免在Spark中进行急切操作,因为它限制了有效分配的管道数量。

阅读数据

使用Spark时学习的第一步是将数据集加载到数据帧中。将数据加载到数据帧后,您可以应用转换、执行分析和建模,创建可视化并保留结果。在Python中,您可以使用Pandas直接从本地文件系统加载文件:


 

在PySpark中,加载CSV文件要复杂一些。在分布式环境中,没有本地存储,因此需要使用分布式文件系统(如HDFS,Databricks文件存储(DBFS)或S3)来指定文件的路径。

通常,在使用PySpark时,我使用S3中的数据。许多数据库都提供了对S3功能的卸载,并且还可以使用AWS控制台将文件从本地计算机移动到S3。对于这篇文章,我将使用Databricks文件系统(DBFS),它以/ FileStore的形式提供路径。第一步是上传您要处理的CSV文件。


将文件上载到Databricks文件存储

下一步是将CSV文件读入Spark数据帧,如下所示。此代码段指定CSV文件的路径,并将许多参数传递给read函数以处理该文件。最后一步显示加载的数据帧的子集,类似于Pandas中的df.head()。


 

在使用Spark时,我更喜欢使用 parquet 格式,因为它是一种文件格式,包含有关列数据类型的元数据,提供文件压缩,并且是一种旨在与Spark配合使用的文件格式。 AVRO是另一种适用于Spark的格式。下面的代码段显示了如何从过去的代码段中获取数据帧并将其保存为DBFS上的parquet文件,然后从保存的parquet文件中重新加载数据帧。


 

此步骤的结果是相同的,但执行流程明显不同。当将CSV文件读入数据帧时,Spark以急切模式执行操作,这意味着在下一步开始执行之前将所有数据加载到内存中,已经为大家精心准备了大数据的系统学习资料,从Linux-Hadoop-spark-......,需要的小伙伴可以点击而在读取parquet格式的文件时使用惰性方法。通常,您希望在使用Spark时避免急切操作,如果我需要处理大型CSV文件,将首先把数据集转换为parquet格式,然后再执行其余的管道。

通常,您需要处理大量文件,例如位于DBFS中某个路径或目录的数百个parquet文件。使用Spark,您可以在路径中包含通配符来处理文件集合。例如,您可以从S3加载一批parquet文件,如下所示:


 

如果您每天都有一个单独的parquet文件,或者如果您的管道中有一个先前的步骤会输出数百个parquet文件,则此方法很有用。

如果要从数据库(例如Redshift)读取数据,***先将数据卸载到S3,然后再使用Spark进行处理。在Redshift中,卸载命令可用于将数据导出到S3进行处理:


 

还有用于数据库的库,例如spark-redshift,使这个过程更容易执行。

写数据

与使用Spark读取数据类似,不建议在使用PySpark时将数据写入本地存储。相反,您应该使用分布式文件系统,如S3或HDFS。如果您要使用Spark处理结果,则parquet是用于保存数据框架的良好格式。下面的代码段显示了如何将数据帧保存为DBFS和S3作为parquet。


 

以parquet格式保存数据帧时,通常将其划分为多个文件,如下图所示。


将数据帧保存到DBFS时生成的parquet文件

如果您需要CSV文件中的结果,则需要稍微不同的输出步骤。这种方法的主要区别之一是所有数据在输出到CSV之前将被拉到单个节点。当您需要保存小型数据帧并在Spark之外的系统中处理它时,建议使用此方法。下面的代码段显示了如何将数据帧保存为DBFS和S3上的单个CSV文件。


 

Spark脚本的另一个常见输出是NoSQL数据库,如Cassandra、DynamoDB或Couchbase。这超出了本文的范围,但我过去看过的一种方法是将数据帧写入S3,已经为大家精心准备了大数据的系统学习资料,从Linux-Hadoop-spark-......,需要的小伙伴可以点击然后启动一个加载过程,告诉NoSQL系统从S3上的指定路径加载数据。

我也省略了对流式输出源的写入,如Kafka或Kinesis。这些系统在使用Spark流时更有用。

转换数据

可以在Spark数据帧上执行许多不同类型的操作,就像可以在Pandas数据帧上应用的各种操作一样。在Spark数据帧上执行操作的方法之一是通过Spark SQL,它可以像查询表一样查询数据帧。下面的代码段显示了如何在数据集中找到最高得分的玩家。


 

结果是玩家ID、游戏次数和这些游戏中的总进球数列表。如果我们想要显示播放器的名称,那么我们需要加载一个额外的文件,使其可作为临时视图,然后使用Spark SQL加入它。


数据集中得分最高的玩家

在上面的代码片段中,我使用display命令输出数据集的样本,但也可以将结果分配给另一个数据帧,这可以在管道的后续步骤中使用。下面的代码显示了如何执行这些步骤,其中第一个查询结果被分配给新的数据帧,然后将其分配给临时视图并与一组播放器名称连接。


 

这个过程的结果如下所示,根据Kaggle数据集确定Alex Ovechkin是NHL中的得分最高的球员。


使用Spark SQL连接数据帧的进程的输出

对于常见任务,有Spark数据帧操作,例如添加新列、删除列、执行连接以及计算聚合和分析统计信息,但是在开始使用时,使用Spark 已经为大家精心准备了大数据的系统学习资料,从Linux-Hadoop-spark-......,需要的小伙伴可以点击SQL执行这些操作可能更容易。此外,如果您已经使用PandaSQL或framequery等库来使用SQL操作Pandas数据帧,那么将代码从Python移植到PySpark会更容易。

与Spark数据帧上的大多数操作一样,Spark SQL操作以延迟执行模式执行,这意味着在需要结果之前不会评估SQL步骤。 Spark SQL提供了一种挖掘PySpark的好方法,而无需首先学习数据帧的新库。

如果您正在使用Databricks,您还可以直接在笔记本中创建可视化,而无需显式使用可视化库。例如,我们可以使用下面的Spark SQL代码绘制每个游戏的平均目标数。


 

Databricks笔记本中显示的初始输出是结果表,但我们可以使用绘图功能将输出转换为不同的可视化,例如下面显示的条形图。这种方法不支持数据科学家可能需要的每个可视化,但它确实使得在Spark中执行探索性数据分析变得更加容易。如果需要,我们可以使用toPandas()函数在驱动程序节点上创建Pandas数据帧,这意味着任何Python绘图库都可用于可视化结果。但是,这种方法应仅用于小型数据帧,因为所有数据都被急切地提取到驱动程序节点上的内存中。


2月和3月的平均每场比赛进球数

对于至少进5球的球员,我也考察了每次射门的平均进球数。


 

此转换的结果如下图所示。大多数至少有5个进球的球员在4%到12%的时间内完成投篮。


Kaggle数据集中玩家的每次射门目标

MLlib

用于数据科学家的Python的常见用例之一是构建预测模型。虽然scikit-learn在使用pandas时非常棒,但它不能扩展到分布式环境中的大型数据集(尽管有很多方法可以与Spark并行化)。在使用PySpark和海量数据集构建预测模型时,MLlib是首选库,因为它本身可以在Spark数据帧上运行。并非所有scikit-learn中的算法都可以在MLlib中使用,但是有很多选项可以涵盖许多用例。

为了在MLib中使用一种监督算法,您需要使用特征向量和标签作为标量来设置数据帧。准备好后,您可以使用fit函数来训练模型。下面的代码段显示了如何使用VectorAssembler将数据框中的多个列组合成单个要素向量。我们使用结果数据帧调用fit函数,然后生成模型的摘要统计信息。


 

该模型根据射击次数、游戏时间和其他因素预测玩家将获得多少目标。然而,该模型的性能较差,导致均方根误差(RMSE)为0.375,R平方值为0.125。具有最大值的系数是镜头列,但是这并没有提供足够的信号以使模型准确。

使用PySpark构建ML管道时需要考虑许多其他步骤,包括训练和测试数据集,超参数调整和模型存储。上面的代码段只是开始使用MLlib的起点。

Pandas UDF

我最近使用的Spark中的一个功能是Pandas用户定义函数(UDF),它使您能够在Spark环境中使用Pandas数据帧执行分布式计算。这些UDF的一般工作方式是首先使用groupby语句对Spark数据帧进行分区,并将每个分区发送到工作节点并转换为传递给UDF的Pandas数据帧。然后,UDF返回转换后的Pandas数据帧,该数据帧与所有其他分区组合,然后转换回Spark数据帧。最终结果非常有用,您可以使用需要Pandas的Python库,现在也可以扩展到海量数据集,只要您有一种分区数据帧的好方法。 Pandas UDF是在Spark 2.3中引入的,我将讨论在Spark Summit 2019期间我们如何在Zynga使用此功能。

曲线拟合是我作为数据科学家执行的常见任务。下面的代码片段显示了如何执行曲线拟合来描述玩家在游戏过程中记录的击球次数和击球次数之间的关系。该片段显示了我们如何通过在过滤到单个播放器的数据集上调用toPandas()来为单个玩家执行此任务。该步骤的输出是两个参数(线性回归系数),他们试图描述这些变量之间的关系。


 

如果我们想为每个玩家计算这条曲线并拥有一个海量数据集,那么由于内存不足异常,toPandas()调用将失败。我们可以通过调用player_id上的groupby(),然后应用下面显示的Pandas UDF,将此操作扩展到整个数据集。该函数将Pandas数据框作为输入,描述单个玩家的游戏统计数据,并返回包含player_id和拟合系数的摘要数据帧。然后将每个摘要Pandas数据帧组合成一个Spark数据帧,该数据帧显示在代码片段的末尾。使用Pandas UDF的另一个设置是为结果数据帧定义模式,其中模式描述了从应用步骤生成的Spark数据帧的格式。


 

此过程的输出如下所示。我们现在有一个数据帧,总结了每个玩家的曲线拟合,并且可以在海量数据集上运行此操作。在处理大量数据集时,选择或生成分区**,以在数据分区的数量和大小之间实现良好的权衡。


来自Pandas UDF的输出,显示每位玩家的曲线拟合

***实践

我已经介绍了使用PySpark的一些常见任务,但也希望提供一些建议,使其更容易从Python到PySpark。以下是我根据在这些环境之间移植一些项目的经验收集的一些***实践:



  • 避免使用库,使用数据帧:使用诸如字典之类的Python数据类型意味着代码可能无法在分布式模式下执行。与其使用键来索引字典中的值,不如考虑将另一列添加到可用作过滤器的数据帧中。

  • 谨慎使用toPandas:调用toPandas()将导致所有数据被加载到驱动程序节点上的内存中,并阻止在分布式模式下执行操作。当数据已经聚合并且您想要使用熟悉的Python绘图工具时,可以使用此函数,但它不应该用于大型数据帧。

  • 避免for循环:如果可能,***使用groupby-apply模式重写for循环逻辑以支持并行化代码执行。我注意到,专注于在Python中使用这种模式也导致清理代码更容易转换为PySpark。

  • 尝试最小化急切操作:为了使您的管道尽可能具有可扩展性,***避免将整个数据帧拉入内存的急切操作。我注意到用CSV读取是一个急切的操作,我的工作是将数据帧保存为parquet,然后从parquet重新加载以构建更具可扩展性的管道。

  • 使用framequery / pandasql可以更轻松地进行移植:如果您正在使用其他人的Python代码,那么解读一些Pandas操作正在实现的内容可能会很棘手。如果您计划将代码从Python移植到PySpark,那么使用Pandas的SQL库可以使这种转换更容易。

我发现在PySpark中编写代码的时间也因Python编码技巧而得到改善。

结论

PySpark是数据科学家学习的理想语言,因为它支持可扩展的分析和ML管道。如果您已经熟悉Python和Pandas,那么您的大部分知识都可以应用于Spark。我已经展示了如何使用PySpark执行一些常见的操作来引导学习过程。我还展示了一些最近使用Pandas UDF的Spark功能,它使Python代码能够以分布式模式执行。有很好的环境可以让你轻松启动和运行Spark集群,现在正是学习PySpark的好时机!



推荐阅读
  • Oracle优化新常态的五大禁止及其性能隐患
    本文介绍了Oracle优化新常态中的五大禁止措施,包括禁止外键、禁止视图、禁止触发器、禁止存储过程和禁止JOB,并分析了这些禁止措施可能带来的性能隐患。文章还讨论了这些禁止措施在C/S架构和B/S架构中的不同应用情况,并提出了解决方案。 ... [详细]
  • CDH4简介
    原文地址:CDH4简介作者:HadoopChinaWebelievethatduring2012,enterprisedistributionsofHa ... [详细]
  • Oozie任务调度框架详解及使用简介(一)
    摘要:个人最近一段时间一直在使用oozie,从刚开始的各种别扭到现在越来越觉得有意思的情况下,想整理一下关于oozie的认知,整理出来一个oozie系列,本来市面上关于oozie的 ... [详细]
  • 本文介绍了Oracle数据库中tnsnames.ora文件的作用和配置方法。tnsnames.ora文件在数据库启动过程中会被读取,用于解析LOCAL_LISTENER,并且与侦听无关。文章还提供了配置LOCAL_LISTENER和1522端口的示例,并展示了listener.ora文件的内容。 ... [详细]
  • 计算机存储系统的层次结构及其优势
    本文介绍了计算机存储系统的层次结构,包括高速缓存、主存储器和辅助存储器三个层次。通过分层存储数据可以提高程序的执行效率。计算机存储系统的层次结构将各种不同存储容量、存取速度和价格的存储器有机组合成整体,形成可寻址存储空间比主存储器空间大得多的存储整体。由于辅助存储器容量大、价格低,使得整体存储系统的平均价格降低。同时,高速缓存的存取速度可以和CPU的工作速度相匹配,进一步提高程序执行效率。 ... [详细]
  • 原创 | 大数据入门基础系列之ClouderaManager版本的Hive安装部署
    添加服务,一 ... [详细]
  • Flume 开源分布式日志收集系统
    为什么80%的码农都做不了架构师?Flume--开源分布式日志收集系统Flume是Cloudera提供的一个高可用的、高可靠的开源分布式海量日志收集系统 ... [详细]
  • 关于hadoop及相关模块的安装,自己下载模块安装的话较为麻烦,有配置、版本对应的些许问题,使用cloudera集成好的平台也不错 ... [详细]
  • Spark学习之路(一)Spark概述
    一,什么是spark定义:Spark一种基于内存的快速,通用,可扩展的大数据分析引擎.官网地址:http:spark.apache.org历史:2009年诞生于加州伯 ... [详细]
  • 2017-11-05卓明_开源中国开源中国开源中国微信号oschina2013功能介绍OSChina开源中国官方微信账号业务系统中,通常会遇到这些场景:A系统向B系统主动推送一个处 ... [详细]
  • 一、四种存储格式介绍1、TestFileTextFile文件不支持块压缩,默认格式,数据不做压缩,磁盘开销大,数据解析开销 ... [详细]
  • CDH 版 Hadoop 下载
    --昨夜西风凋碧树,独上高楼,望尽天涯路下载地址:http:archive.cloudera.comcdh5网页显示如下: ... [详细]
  • 大数据基础(八)Spark2.0.0下IPython和Notebook的安装配置,Go语言社区,Golang程序员人脉社 ... [详细]
  • 经过多年的发展,Hadoop生态系统不断完善和成熟,目前已经包括了多个子项目,除了核心的HDFS和MapReduce以外,Hadoop生态系统还包括要ZoopKer、HBase、H ... [详细]
  • hue访问mysql,【原创】大叔经验分享(50)hue访问mysql(librdbms)
    clouderamanager安装hue后想开启访问mysql(librdbms)需要在这里配置(hue_safety_valve.ini)添加配置如下[librdbms]#The ... [详细]
author-avatar
入骨红豆撕不撕
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有