Apache火花消息理解

 那时候的我和你_173 发布于 2022-12-11 19:33

请求帮助以了解此消息..

INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 2 is **2202921** bytes

2202921在这里意味着什么?

我的工作是一个随机操作,当从前一个阶段读取随机文件时,它首先给出消息,然后在一段时间之后失败并出现以下错误:

14/11/12 11:09:46 WARN scheduler.TaskSetManager: Lost task 224.0 in stage 4.0 (TID 13938, ip-xx-xxx-xxx-xx.ec2.internal): FetchFailed(BlockManagerId(11, ip-xx-xxx-xxx-xx.ec2.internal, 48073, 0), shuffleId=2, mapId=7468, reduceId=224)
14/11/12 11:09:46 INFO scheduler.DAGScheduler: Marking Stage 4 (coalesce at :49) as failed due to a fetch failure from Stage 3 (map at :42)
14/11/12 11:09:46 INFO scheduler.DAGScheduler: Stage 4 (coalesce at :49) failed in 213.446 s
14/11/12 11:09:46 INFO scheduler.DAGScheduler: Resubmitting Stage 3 (map at :42) and Stage 4 (coalesce at :49) due to fetch failure
14/11/12 11:09:46 INFO scheduler.DAGScheduler: Executor lost: 11 (epoch 2)
14/11/12 11:09:46 INFO storage.BlockManagerMasterActor: Trying to remove executor 11 from BlockManagerMaster.
14/11/12 11:09:46 INFO storage.BlockManagerMaster: Removed 11 successfully in removeExecutor
14/11/12 11:09:46 INFO scheduler.Stage: Stage 3 is now unavailable on executor 11 (11893/12836, false)
14/11/12 11:09:46 INFO scheduler.DAGScheduler: Resubmitting failed stages
14/11/12 11:09:46 INFO scheduler.DAGScheduler: Submitting Stage 3 (MappedRDD[13] at map at :42), which has no missing parents
14/11/12 11:09:46 INFO storage.MemoryStore: ensureFreeSpace(25472) called with curMem=474762, maxMem=11113699737
14/11/12 11:09:46 INFO storage.MemoryStore: Block broadcast_6 stored as values in memory (estimated size 24.9 KB, free 10.3 GB)
14/11/12 11:09:46 INFO storage.MemoryStore: ensureFreeSpace(5160) called with curMem=500234, maxMem=11113699737
14/11/12 11:09:46 INFO storage.MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 5.0 KB, free 10.3 GB)
14/11/12 11:09:46 INFO storage.BlockManagerInfo: Added broadcast_6_piece0 in memory on ip-xx.ec2.internal:35571 (size: 5.0 KB, free: 10.4 GB)
14/11/12 11:09:46 INFO storage.BlockManagerMaster: Updated info of block broadcast_6_piece0
14/11/12 11:09:46 INFO scheduler.DAGScheduler: Submitting 943 missing tasks from Stage 3 (MappedRDD[13] at map at :42)
14/11/12 11:09:46 INFO cluster.YarnClientClusterScheduler: Adding task set 3.1 with 943 tasks

我的代码看起来像这样,

(rdd1 ++ rdd2).map { t => ((t.id), t) }.groupByKey(1280).map {
  case ((id), sequence) =>
    val newrecord = sequence.maxBy {
      case Fact(id, key, type, day, group, c_key, s_key, plan_id,size,
        is_mom, customer_shipment_id, customer_shipment_item_id, asin, company_key, product_line_key, dw_last_updated, measures) => dw_last_updated.toLong
    }
    ((PARTITION_KEY + "=" + newrecord.day.toString + "/part"), (newrecord))
}.coalesce(2048,true).saveAsTextFile("s3://myfolder/PT/test20nodes/")```

我派生了1280,因为我有20个节点,每个节点有32个核心.我派它像2*32*20.

1 个回答
  • 对于Shuffle阶段,它将创建一些ShuffleMapTasks,将中间结果输出到磁盘.位置信息将存储在MapStatuses中并发送给MapOutputTrackerMaster(驱动程序).

    然后当下一个阶段开始运行时,它需要这些位置状态.因此执行者会要求MapOutputTrackerMaster获取它们.MapOutputTrackerMaster将这些状态序列化为字节并将它们发送给执行程序.这是以字节为单位的这些状态的大小.

    这些状态将通过Akka发送.并且Akka对最大邮件大小有限制.您可以通过spark.akka.frameSize以下方式设置:

    "控制平面"通信中允许的最大消息大小(对于序列化任务和任务结果),以MB为单位.如果您的任务需要将大量结果发送回驱动程序(例如,在大型数据集上使用collect()),请增加此值.

    如果大小超过spark.akka.frameSize,Akka将拒绝传递消息,您的工作将失败.因此,它可以帮助您适应spark.akka.frameSize最好的一个.

    2022-12-11 19:36 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有