如何使saveAsTextFile NOT分割输出到多个文件?

 U友50122053 发布于 2023-01-08 00:27

在Spark中使用Scala时,每当我使用结果转储结果时saveAsTextFile,它似乎将输出分成多个部分.我只是将一个参数(路径)传递给它.

val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap)
year.saveAsTextFile("year")

    输出数量是否与其使用的减速器数量相对应?

    这是否意味着输出被压缩了?

    我知道我可以使用bash将输出组合在一起,但是有一个选项可以将输出存储在单个文本文件中,而不会拆分吗?我查看了API文档,但对此并没有太多说明.

aaronman.. 96

它将其保存为多个文件的原因是因为计算是分布式的.如果输出足够小,以至于您认为可以将它放在一台机器上,那么您可以使用

val arr = year.collect()

然后将生成的数组保存为文件,另一种方法是使用自定义分区程序partitionBy,然后使所有内容都转到一个分区,虽然这是不可取的,因为你不会得到任何并行化.

如果您需要保存文件,saveAsTextFile可以使用coalesce(1,true).saveAsTextFile().这基本上意味着计算然后合并到1分区.你也可以使用shuffle参数设置为true repartition(1)的包装器coalesce.看看RDD.scala的来源是我如何找出这些东西的大部分,你应该看一看.

3 个回答
  • 它将其保存为多个文件的原因是因为计算是分布式的.如果输出足够小,以至于您认为可以将它放在一台机器上,那么您可以使用

    val arr = year.collect()
    

    然后将生成的数组保存为文件,另一种方法是使用自定义分区程序partitionBy,然后使所有内容都转到一个分区,虽然这是不可取的,因为你不会得到任何并行化.

    如果您需要保存文件,saveAsTextFile可以使用coalesce(1,true).saveAsTextFile().这基本上意味着计算然后合并到1分区.你也可以使用shuffle参数设置为true repartition(1)的包装器coalesce.看看RDD.scala的来源是我如何找出这些东西的大部分,你应该看一看.

    2023-01-08 00:40 回答
  • 对于使用更大数据集的人:

    rdd.collect()不应该在这种情况下使用,因为它将收集Array驱动程序中的所有数据,这是最简单的内存不足的方法.

    rdd.coalesce(1).saveAsTextFile() 也不应该使用上游阶段的并行性将丢失,以便在单个节点上执行,其中将存储数据.

    rdd.coalesce(1, shuffle = true).saveAsTextFile() 是最简单的选项,因为它将保持上游任务的处理并行,然后只对一个节点执行shuffle(rdd.repartition(1).saveAsTextFile()是一个确切的同义词).

    rdd.saveAsSingleTextFile()另外,bellow还允许将rdd存储在具有特定名称的单个文件中,同时保持其并行性rdd.coalesce(1, shuffle = true).saveAsTextFile().

    可能不方便的rdd.coalesce(1, shuffle = true).saveAsTextFile("path/to/file.txt")是它实际上生成了一个路径是path/to/file.txt/part-00000和否的文件path/to/file.txt.

    以下解决方案rdd.saveAsSingleTextFile("path/to/file.txt")实际上将生成一个路径为的文件path/to/file.txt:

    package com.whatever.package
    
    import org.apache.spark.rdd.RDD
    import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
    import org.apache.hadoop.io.compress.CompressionCodec
    
    object SparkHelper {
    
      // This is an implicit class so that saveAsSingleTextFile can be attached to
      // SparkContext and be called like this: sc.saveAsSingleTextFile
      implicit class RDDExtensions(val rdd: RDD[String]) extends AnyVal {
    
        def saveAsSingleTextFile(path: String): Unit =
          saveAsSingleTextFileInternal(path, None)
    
        def saveAsSingleTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit =
          saveAsSingleTextFileInternal(path, Some(codec))
    
        private def saveAsSingleTextFileInternal(
            path: String, codec: Option[Class[_ <: CompressionCodec]]
        ): Unit = {
    
          // The interface with hdfs:
          val hdfs = FileSystem.get(rdd.sparkContext.hadoopConfiguration)
    
          // Classic saveAsTextFile in a temporary folder:
          hdfs.delete(new Path(s"$path.tmp"), true) // to make sure it's not there already
          codec match {
            case Some(codec) => rdd.saveAsTextFile(s"$path.tmp", codec)
            case None        => rdd.saveAsTextFile(s"$path.tmp")
          }
    
          // Merge the folder of resulting part-xxxxx into one file:
          hdfs.delete(new Path(path), true) // to make sure it's not there already
          FileUtil.copyMerge(
            hdfs, new Path(s"$path.tmp"),
            hdfs, new Path(path),
            true, rdd.sparkContext.hadoopConfiguration, null
          )
          // Working with Hadoop 3?: /sf/ask/17360801/
    
          hdfs.delete(new Path(s"$path.tmp"), true)
        }
      }
    }
    

    可以这样使用:

    import com.whatever.package.SparkHelper.RDDExtensions
    
    rdd.saveAsSingleTextFile("path/to/file.txt")
    

    该片段首先将rdd存储rdd.saveAsTextFile("path/to/file.txt")在临时文件夹中path/to/file.txt.tmp,就像我们不想将数据存储在一个文件中一样(这使得上游任务的处理保持并行).

    然后,只使用hadoop文件系统api,我们继续使用不同输出文件的merge(FileUtil.copyMerge())来创建我们的最终输出单个文件path/to/file.txt.

    2023-01-08 00:42 回答
  • 你可以打电话coalesce(1)然后saveAsTextFile()- 但如果你有很多数据可能是个坏主意.生成每个拆分的单独文件就像在Hadoop中一样,以便让单独的映射器和Reducer写入不同的文件.如果您的数据非常少,那么只有一个输出文件是个好主意,在这种情况下,您也可以执行collect(),就像@aaronman所说的那样.

    2023-01-08 00:44 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有