作者:changeverything77_262 | 来源:互联网 | 2023-02-05 15:38
我有一个大约5M行x20列的数据集,包含groupID和rowID.我的目标是检查(某些)列是否包含组内缺少(空)值的固定分数(例如,50%).如果找到,则该组的整个列设置为missing(null).
df = spark.read.parquet('path/to/parquet/')
check_columns = {'col1': ..., 'col2': ..., ...} # currently len(check_columns) = 8
for col, _ in check_columns.items():
total = (df
.groupBy('groupID').count()
.toDF('groupID', 'n_total')
)
missing = (df
.where(F.col(col).isNull())
.groupBy('groupID').count()
.toDF('groupID', 'n_missing')
)
# count_missing = count_missing.persist() # PERSIST TRY 1
# print('col {} found {} missing'.format(col, missing.count())) # missing.count() is b/w 1k-5k
poor_df = (total
.join(missing, 'groupID')
.withColumn('freq', F.col('n_missing') / F.col('n_total'))
.where(F.col('freq') > 0.5)
.select('groupID')
.toDF('poor_groupID')
)
df = (df
.join(poor_df, df['groupID'] == poor_df['poor_groupID'], 'left_outer')
.withColumn(col, (F.when(F.col('poor_groupID').isNotNull(), None)
.otherwise(df[col])
)
)
.select(df.columns)
)
stats = (missing
.withColumnRenamed('n_missing', 'cnt')
.collect() # FAIL 1
)
# df = df.persist() # PERSIST TRY 2
print(df.count()) # FAIL 2
我最初分配1G spark.driver.memory
和4G spark.executor.memory
,最终增加到spark.driver.memory
10G.
问题:
循环在第一次迭代期间像魅力一样运行,但到最后,在第6或第7次迭代时,我看到我的CPU利用率下降(使用1而不是6个核心).与此同时,一次迭代的执行时间显着增加.在某些时候,我得到一个OutOfMemory错误:
spark.driver.memory <4G
:at collect()
(FAIL 1
)
4G <= spark.driver.memory <10G
:在count()
步骤(FAIL 2
)
FAIL 1
案例的堆栈跟踪(相关部分):
[...]
py4j.protocol.Py4JJavaError: An error occurred while calling o1061.collectToPython.
: java.lang.OutOfMemoryError: Java heap space
[...]
执行程序UI不反映过多的内存使用情况(它显示驱动程序使用的内存<50k,执行程序<1G).Spark metrics system(app-XXX.driver.BlockManager.memory.memUsed_MB
)也没有:它显示600M到1200M的已用内存,但总是> 300M剩余内存.(这表明2G驱动程序内存应该这样做,但事实并非如此.)
首先处理哪个列也无关紧要(因为它是a上的循环dict()
,它可以是任意顺序).
我的问题是:
导致OutOfMemory错误的原因是什么并非所有可用的CPU核心都用于最终?
spark.driver.memory
当我从执行器转移到驱动程序只需几KB时,为什么需要10G ?
一些(一般)问题,以确保我理解正确的事情:
如果我收到OOM错误,正确看的地方几乎总是驱动程序(b/c执行程序溢出到磁盘)?
为什么会count()
导致OOM错误 - 我认为这个操作只会占用exector上的资源(向驱动程序提供几个字节)?
上面提到的内存指标(指标系统,UI)是否正确?
BTW:我在独立模式下运行Spark 2.1.0.
更新2017-04-28
为了进一步深入,我为驱动程序启用了堆转储:
cfg = SparkConfig()
cfg.set('spark.driver.extraJavaOptions', '-XX:+HeapDumpOnOutOfMemoryError')
我跑了它8G
的spark.driver.memory
和我分析与Eclipse MAT堆转储.事实证明,有两个相当大的类(每个~4G):
java.lang.Thread
- char (2G)
- scala.collection.IndexedSeqLike
- scala.collection.mutable.WrappedArray (1G)
- java.lang.String (1G)
org.apache.spark.sql.execution.ui.SQLListener
- org.apache.spark.sql.execution.ui.SQLExecutionUIData
(various of up to 1G in size)
- java.lang.String
- ...
我尝试关闭UI,使用
cfg.set('spark.ui.enabled', 'false')
这使得UI不可用,但对OOM错误没有帮助.此外,我尝试使用UI来保持较少的历史记录
cfg.set('spark.ui.retainedJobs', '1')
cfg.set('spark.ui.retainedStages', '1')
cfg.set('spark.ui.retainedTasks', '1')
cfg.set('spark.sql.ui.retainedExecutions', '1')
cfg.set('spark.ui.retainedDeadExecutors', '1')
这也没有帮助.
更新2017-05-18
我发现了Spark的pyspark.sql.DataFrame.checkpoint
方法.这就像是persist
摆脱了数据帧的血统.因此,它有助于规避上述问题.