热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

PySpark通过Arrow加速

PySpark是Spark实现UnifyBigData&&MachineLearni

前言

PySpark是Spark 实现 Unify BigData && Machine Learning目标的基石之一。通过PySpark,我们可以用 Python 在一个脚本里完成数据加载,处理,训练,预测等完整Pipeline,加上DB良好的notebook的支持,数据科学家们会觉得非常开心。当然缺点也是有的,就是带来了比较大的性能损耗。

性能损耗点分析

如果使用PySpark,大概处理流程是这样的(注意,这些都是对用户透明的)

  1. python通过socket调用Spark API(py4j完成),一些计算逻辑,python会在调用时将其序列化,一并发送给Spark。
  2. Spark 触发计算,比如加载数据,然后把数据转成内部存储格式InternalRow,接着启动Python Deamon, Python Deamon再启动多个Worker,
  3. 数据通过socket协议发送给Python Worker(不跨网络),期间需要将InternalRow转化为 Java 对象,然后再用Java Pickle进行序列化(一次),这个时候才能通过网络发送给Worker
  4. Worker接收后,一条一条反序列化(python pickle,两次),然后转化为Python对象进行处理。拿到前面序列化好的函数反序列化,接着用这个函数对这些数据处理,处理完成后,再用pickle进行序列化(三次),发送给Java Executor.
  5. Java Executor获取数据后,需要反序列化(四次),然后转化为InternalRow继续进行处理。

所以可以看到,前后需要四次编码/解码动作。序列化反序列化耗时应该占用额外耗时的70%左右。我们说,有的时候把序列化框架设置为Kyro之后,速度明显快了很多,可见序列化的额外耗时是非常明显的。

前面是一个点,第二个点是,数据是按行进行处理的,一条一条,显然性能不好。

第三个点是,Socket协议通讯其实还是很快的,而且不跨网络,只要能克服前面两个问题,那么性能就会得到很大的提升。 另外可以跟大家说的是,Python如果使用一些C库的扩展,比如Numpy,本身也是非常快的。

如何开启Arrow进行加速,以及背后原理

开启方式很简单,启动时加上一个配置即可:

if __name__ == '__main__':
    cOnf= SparkConf()
    conf.set("spark.sql.execution.arrow.enabled", "true")

你也可以在submit命令行里添加。

那么Arrow是如何加快速度的呢?主要是有两点:

  1. 序列化友好
  2. 向量化

序列化友好指的是,Arrow提供了一个内存格式,该格式本身是跨应用的,无论你放到哪,都是这个格式,中间如果需要网络传输这个格式,那么也是序列化友好的,只要做下格式调整(不是序列化)就可以将数据发送到另外一个应用里。这样就大大的降低了序列化开销。

向量化指的是,首先Arrow是将数据按block进行传输的,其次是可以对立面的数据按列进行处理的。这样就极大的加快了处理速度。

实测效果

为了方便测试,我定义了一个基类:

from pyspark import SQLContext
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
import os

os.environ["PYSPARK_PYTHON"] = "/Users/allwefantasy/deepavlovpy3/bin/python3"

class _SparkBase(object):
    @classmethod
    def start(cls, cOnf=SparkConf()):
        cls.sc = SparkContext(master='local[*]', appName=cls.__name__, cOnf=conf)
        cls.sql = SQLContext(cls.sc)
        cls.session = SparkSession.builder.getOrCreate()
        cls.dataDir = "/Users/allwefantasy/CSDNWorkSpace/spark-deep-learning_latest"

    @classmethod
    def shutdown(cls):
        cls.session.stop()
        cls.session = None
        cls.sc.stop()
        cls.sc = None

接着提供了一个性能测试辅助类:

import time
from functools import wraps
import logging

logger = logging.getLogger(__name__)

PROF_DATA = {}


def profile(fn):
    @wraps(fn)
    def with_profiling(*args, **kwargs):
        start_time = time.time()

        ret = fn(*args, **kwargs)

        elapsed_time = time.time() - start_time

        if fn.__name__ not in PROF_DATA:
            PROF_DATA[fn.__name__] = [0, []]
        PROF_DATA[fn.__name__][0] += 1
        PROF_DATA[fn.__name__][1].append(elapsed_time)

        return ret

    return with_profiling


def print_prof_data(clear):
    for fname, data in PROF_DATA.items():
        max_time = max(data[1])
        avg_time = sum(data[1]) / len(data[1])
        logger.warn("Function %s called %d times. " % (fname, data[0]))
        logger.warn('Execution time max: %.3f, average: %.3f' % (max_time, avg_time))
    if clear:
        clear_prof_data()


def clear_prof_data():
    global PROF_DATA
    PROF_DATA = {}

很简单,就是wrap一下实际的函数,然后进行时间计算。现在,我们写一个PySpark的类:

import logging
from random import Random

import pyspark.sql.functions as F
from pyspark import SparkConf
from pyspark.sql.types import *

from example.allwefantasy.base.spark_base import _SparkBase
import example.allwefantasy.time_profile as TimeProfile
import pandas as pd

logger = logging.getLogger(__name__)
class PySparkOptimize(_SparkBase):
    def trick1(self):   
        pass 

if __name__ == '__main__':
    cOnf= SparkConf()
    conf.set("spark.sql.execution.arrow.enabled", "true")
    PySparkOptimize.start(cOnf=conf)
    PySparkOptimize().trick1()
    PySparkOptimize.shutdown()

这样骨架就搭建好了。

我们写第一个方法,trick1,做一个简单的计数:

def trick1(self):
        df = self.session.range(0, 1000000).select("id", F.rand(seed=10).alias("uniform"),
                                                   F.randn(seed=27).alias("normal"))
        # 更少的内存和更快的速度
        TimeProfile.profile(lambda: df.toPandas())()
        TimeProfile.print_prof_data(clear=True)

并且将前面的arrow设置为false.结果如下:

Function  called 1 times. 
Execution time max: 6.716, average: 6.716

然后同样的代码,我们把arrow设置为true,是不是会好一些呢?

Function  called 1 times. 
Execution time max: 2.067, average: 2.067

当然我这个测试并不严谨,但是对于这种非常简单的示例,提升还是有效三倍的,不是么?而这,只是改个配置就可以达成了。

分组聚合使用Pandas处理

另外值得一提的是,PySpark是不支持自定义聚合函数的,现在如果是数据处理,可以把group by的小集合发给pandas处理,pandas再返回,比如

def trick7(self):
        df = self.session.createDataFrame(
            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

        @F.pandas_udf("id long", F.PandasUDFType.GROUPED_MAP)  
        def normalize(pdf):
            v = pdf.v
            return pdf.assign(v=(v - v.mean()) / v.std())[["id"]]

        df.groupby("id").apply(normalize).show()

这里是id进行gourp by ,这样就得到一张id列都是1的小表,接着呢把这个小表转化为pandas dataframe处理,处理完成后,还是返回一张小表,表结构则在注解里定义,比如只返回id字段,id字段是long类型。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 我们


推荐阅读
  • 开发笔记:加密&json&StringIO模块&BytesIO模块
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了加密&json&StringIO模块&BytesIO模块相关的知识,希望对你有一定的参考价值。一、加密加密 ... [详细]
  • Python如何调用类里面的方法
    本文介绍了在Python中调用同一个类中的方法需要加上self参数,并且规范写法要求每个函数的第一个参数都为self。同时还介绍了如何调用另一个类中的方法。详细内容请阅读剩余部分。 ... [详细]
  • 展开全部下面的代码是创建一个立方体Thisexamplescreatesanddisplaysasimplebox.#Thefirstlineloadstheinit_disp ... [详细]
  • 不同优化算法的比较分析及实验验证
    本文介绍了神经网络优化中常用的优化方法,包括学习率调整和梯度估计修正,并通过实验验证了不同优化算法的效果。实验结果表明,Adam算法在综合考虑学习率调整和梯度估计修正方面表现较好。该研究对于优化神经网络的训练过程具有指导意义。 ... [详细]
  • 本文介绍了Python对Excel文件的读取方法,包括模块的安装和使用。通过安装xlrd、xlwt、xlutils、pyExcelerator等模块,可以实现对Excel文件的读取和处理。具体的读取方法包括打开excel文件、抓取所有sheet的名称、定位到指定的表单等。本文提供了两种定位表单的方式,并给出了相应的代码示例。 ... [详细]
  • 本文介绍了闭包的定义和运转机制,重点解释了闭包如何能够接触外部函数的作用域中的变量。通过词法作用域的查找规则,闭包可以访问外部函数的作用域。同时还提到了闭包的作用和影响。 ... [详细]
  • 本文介绍了lua语言中闭包的特性及其在模式匹配、日期处理、编译和模块化等方面的应用。lua中的闭包是严格遵循词法定界的第一类值,函数可以作为变量自由传递,也可以作为参数传递给其他函数。这些特性使得lua语言具有极大的灵活性,为程序开发带来了便利。 ... [详细]
  • 本文介绍了在Python3中如何使用选择文件对话框的格式打开和保存图片的方法。通过使用tkinter库中的filedialog模块的asksaveasfilename和askopenfilename函数,可以方便地选择要打开或保存的图片文件,并进行相关操作。具体的代码示例和操作步骤也被提供。 ... [详细]
  • 本文介绍了一个Java猜拳小游戏的代码,通过使用Scanner类获取用户输入的拳的数字,并随机生成计算机的拳,然后判断胜负。该游戏可以选择剪刀、石头、布三种拳,通过比较两者的拳来决定胜负。 ... [详细]
  • Commit1ced2a7433ea8937a1b260ea65d708f32ca7c95eintroduceda+Clonetraitboundtom ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • 本文讨论了如何优化解决hdu 1003 java题目的动态规划方法,通过分析加法规则和最大和的性质,提出了一种优化的思路。具体方法是,当从1加到n为负时,即sum(1,n)sum(n,s),可以继续加法计算。同时,还考虑了两种特殊情况:都是负数的情况和有0的情况。最后,通过使用Scanner类来获取输入数据。 ... [详细]
  • 本文介绍了OC学习笔记中的@property和@synthesize,包括属性的定义和合成的使用方法。通过示例代码详细讲解了@property和@synthesize的作用和用法。 ... [详细]
  • 本文介绍了如何在给定的有序字符序列中插入新字符,并保持序列的有序性。通过示例代码演示了插入过程,以及插入后的字符序列。 ... [详细]
  • 本文介绍了计算机网络的定义和通信流程,包括客户端编译文件、二进制转换、三层路由设备等。同时,还介绍了计算机网络中常用的关键词,如MAC地址和IP地址。 ... [详细]
author-avatar
无石笑_987
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有