在Spark中加入并行化很好吗?

 军魂永驻1971 发布于 2023-01-11 11:27

我在一个小于400MB的非常小的数据集上运行一个相当小的Spark程序,其中包含一些map和reduceByKey操作.

在某些时候,我有一个我要排序的元组的RDD,我调用sortByKey.这是我程序中最慢的部分.其他一切似乎几乎立即运行,但这需要20秒.

问题是,我的笔记本电脑以及AWS m3.large机器群集需要20秒.我尝试过1,2和3个奴隶,执行时间的差异非常小.Ganglia和spark web控制台表明CPU和内存正在被用于所有从站的最大容量,所以我认为配置是可以的.

我还在我预期之前发现了执行问题,但后来我读到了这个线程,它指向Sp​​ark中的一个未解决的问题.我不认为这完全相关.

它是sortByKey固有的慢,无论我添加多少节点,它将决定我的程序的最短执行时间?希望不是,而且我只是做错了什么并且可以修复.

编辑

事实证明,我所看到的与我发布的链接有关.sortByKey恰好是第一个动作(记录为转换),看起来好像程序在排序时很慢,但实际排序速度非常快.问题出在先前的连接操作中.

我说的所有内容都适用于通过连接更改排序.当我添加更多节点(或numTask到连接函数)时,为什么执行时间不会下降,为什么它甚至不比普通的SQL连接更好?我之前发现其他人有这个问题,但除了建议调整序列化之外没有答案,我真的不认为是我的情况.

1 个回答
  • 连接本质上是一个繁重的操作,因为具有相同键的值必须移动到同一台机器(网络混洗).添加更多节点只会增加额外的IO开销.

    我能想到两件事:

    选项1

    如果要使用较小的数据集加入大型数据集,则可以通过广播较小的数据集来获得回报:

    val large = sc.textFile("large.txt").map(...) 
    val smaller = sc.textFile("smaller.txt").collect().toMap() 
    val bc = sc.broadcast(smaller)
    

    然后做一个'手动加入':

    large.map(x => (x.value, bc.value(x.value)))
    

    这在Advanced Spark演示中有更详细的描述.

    选项2

    您可以使用与大数据集相同的分区来重新分区小数据集(即确保类似的键位于同一台计算机上).因此,调整小集的分区以匹配大集的分区.

    这将仅触发小套装的随机播放.一旦分区正确,连接应该相当快,因为​​它将在每个群集节点上本地运行.

    2023-01-11 11:28 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有