我正在写一个关于Spark的程序,我只是按键进行聚合.该计划非常简单.我的输入数据只有2GB,在多核服务器(8核,32GB RAM)上运行,设置为local [2].那就是使用两个内核进行并行化.但是,我发现性能非常糟糕.它几乎需要两个小时才能完成.我正在使用KryoSerializer.我想这可能是由Serializer引起的.如何解决这个问题呢?
val dataPoints = SparkContextManager.textFile(dataLocation) .map(x => { val delimited = x.split(",") (delimited(ColumnIndices.HOME_ID_COLUMN).toLong, delimited(ColumnIndices.USAGE_READING_COLUMN).toDouble) }) def process(step: Int): Array[(Long, List[Double])] = { val resultRDD = new PairRDDFunctions(dataPoints.map(x =>(x._1, List[Double](x._2)))) resultRDD.reduceByKey((x, y) => x++y).collect() }
输出将是:
1, [1, 3, 13, 21, ..., 111] // The size of list is about 4000 2, [24,34,65, 24, ..., 245] ....
Josh Rosen.. 9
看起来您正在尝试编写一个Spark作业,该作业将与同一个键关联的值组合在一起.PairRDDFunctions有一个groupByKey
执行此操作的操作.Spark的实现groupByKey
利用了几个性能优化来创建更少的临时对象,并通过网络缓存更少的数据(因为每个值都不会包含在a中List
).
如果导入Spark的隐式转换,请使用
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._
那么你不需要手动包装你的映射RDD,PairRDDFunctions
以便访问像groupByKey
.这不会对性能产生影响,并使大型Spark程序更易于阅读.
使用groupByKey
,我认为你的process
功能可以改写为
def process(step: Int): Array[(Long, Seq[Double])] = { dataPoints.groupByKey().collect() }
我还考虑提高并行度:两者groupByKey
并reduceByKey
采用numTasks
控制减速器数量的可选参数; 默认情况下,星火仅用8个并行任务groupByKey
和reduceByKey
." Spark Scala编程指南"以及Scaladoc中对此进行了描述.
看起来您正在尝试编写一个Spark作业,该作业将与同一个键关联的值组合在一起.PairRDDFunctions有一个groupByKey
执行此操作的操作.Spark的实现groupByKey
利用了几个性能优化来创建更少的临时对象,并通过网络缓存更少的数据(因为每个值都不会包含在a中List
).
如果导入Spark的隐式转换,请使用
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._
那么你不需要手动包装你的映射RDD,PairRDDFunctions
以便访问像groupByKey
.这不会对性能产生影响,并使大型Spark程序更易于阅读.
使用groupByKey
,我认为你的process
功能可以改写为
def process(step: Int): Array[(Long, Seq[Double])] = { dataPoints.groupByKey().collect() }
我还考虑提高并行度:两者groupByKey
并reduceByKey
采用numTasks
控制减速器数量的可选参数; 默认情况下,星火仅用8个并行任务groupByKey
和reduceByKey
." Spark Scala编程指南"以及Scaladoc中对此进行了描述.