我正在使用Spark的Python API并运行Spark 0.8.
我正在存储一个大的RDD浮点向量,我需要对整个集合执行一个向量的计算.
RDD中的切片和分区之间有什么区别吗?
当我创建RDD时,我将其作为参数传递100,这使得它将RDD存储为100个切片并在执行计算时创建100个任务.我想知道,通过使系统更有效地处理数据,分区数据是否会提高切片之外的性能(即,在分区上执行操作与在切片RDD中的每个元素上操作之间是否存在差异).
例如,这两段代码之间是否有任何显着差异?
rdd = sc.textFile(demo.txt, 100)
VS
rdd = sc.textFile(demo.txt) rdd.partitionBy(100)
Nick Chammas.. 24
我相信slices
并且partitions
在Apache Spark中也是如此.
但是,您发布的两段代码之间存在微妙但可能存在显着差异.
此代码将尝试demo.txt
使用100个并发任务直接加载到100个分区:
rdd = sc.textFile('demo.txt', 100)
对于未压缩的文本,它将按预期工作.但是如果不是demo.txt
你有一个demo.gz
,你将得到一个只有1个分区的RDD.对gzip压缩文件的读取无法并行化.
另一方面,以下代码将首先打开demo.txt
具有默认分区数的RDD,然后它将显式地将数据重新分区为大小大致相等的100个分区.
rdd = sc.textFile('demo.txt') rdd = rdd.repartition(100)
因此,在这种情况下,即使使用了一个,demo.gz
您最终也会得到一个包含100个分区的RDD.
作为旁注,我替换了你partitionBy()
,repartition()
因为我认为你正在寻找.partitionBy()
要求RDD是元组的RDD.由于repartition()
在Spark 0.8.0中不可用,您应该可以使用coalesce(100, shuffle=True)
.
Spark可以为RDD的每个分区运行1个并发任务,最多可以为集群中的核心数量.因此,如果您有一个包含50个内核的群集,您希望您的RDD至少有50个分区(可能是该分区的2-3倍).
从Spark 1.1.0开始,您可以检查RDD具有的分区数,如下所示:
rdd.getNumPartitions() # Python API rdd.partitions.size // Scala API
在1.1.0之前,使用Python API执行此操作的方法是rdd._jrdd.splits().size()
.
我相信slices
并且partitions
在Apache Spark中也是如此.
但是,您发布的两段代码之间存在微妙但可能存在显着差异.
此代码将尝试demo.txt
使用100个并发任务直接加载到100个分区:
rdd = sc.textFile('demo.txt', 100)
对于未压缩的文本,它将按预期工作.但是如果不是demo.txt
你有一个demo.gz
,你将得到一个只有1个分区的RDD.对gzip压缩文件的读取无法并行化.
另一方面,以下代码将首先打开demo.txt
具有默认分区数的RDD,然后它将显式地将数据重新分区为大小大致相等的100个分区.
rdd = sc.textFile('demo.txt') rdd = rdd.repartition(100)
因此,在这种情况下,即使使用了一个,demo.gz
您最终也会得到一个包含100个分区的RDD.
作为旁注,我替换了你partitionBy()
,repartition()
因为我认为你正在寻找.partitionBy()
要求RDD是元组的RDD.由于repartition()
在Spark 0.8.0中不可用,您应该可以使用coalesce(100, shuffle=True)
.
Spark可以为RDD的每个分区运行1个并发任务,最多可以为集群中的核心数量.因此,如果您有一个包含50个内核的群集,您希望您的RDD至少有50个分区(可能是该分区的2-3倍).
从Spark 1.1.0开始,您可以检查RDD具有的分区数,如下所示:
rdd.getNumPartitions() # Python API rdd.partitions.size // Scala API
在1.1.0之前,使用Python API执行此操作的方法是rdd._jrdd.splits().size()
.