通过键Spark写入多个输出 - 一个Spark作业

  发布于 2023-01-12 10:26

如何在单个作业中使用Spark写入依赖于键的多个输出.

相关:通过键Scalding Hadoop写入多个输出,一个MapReduce作业

例如

sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
.writeAsMultiple(prefix, compressionCodecOption)

确保cat prefix/1

a
b

cat prefix/2

c

编辑:我最近添加了一个新的答案,其中包括完整的导入,皮条客和压缩编解码器,请参阅/sf/ask/17360801/,除了之前的答案,这可能会有所帮助.

4 个回答
  • 如果您可能为给定密钥设置了许多值,我认为可扩展的解决方案是为每个分区的每个密钥写出一个文件.不幸的是,在Spark中没有内置的支持,但是我们可以鞭打一些东西.

    sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
      .mapPartitionsWithIndex { (p, it) =>
        val outputs = new MultiWriter(p.toString)
        for ((k, v) <- it) {
          outputs.write(k.toString, v)
        }
        outputs.close
        Nil.iterator
      }
      .foreach((x: Nothing) => ()) // To trigger the job.
    
    // This one is Local, but you could write one for HDFS
    class MultiWriter(suffix: String) {
      private val writers = collection.mutable.Map[String, java.io.PrintWriter]()
      def write(key: String, value: Any) = {
        if (!writers.contains(key)) {
          val f = new java.io.File("output/" + key + "/" + suffix)
          f.getParentFile.mkdirs
          writers(key) = new java.io.PrintWriter(f)
        }
        writers(key).println(value)
      }
      def close = writers.values.foreach(_.close)
    }
    

    (替换PrintWriter为您选择的分布式文件系统操作.)

    这使得一次通过RDD并且不执行随机播放.它为每个键提供一个目录,每个目录中包含许多文件.

    2023-01-12 10:28 回答
  • 这包括请求的编解码器,必要的导入和请求的pimp.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SQLContext
    
    // TODO Need a macro to generate for each Tuple length, or perhaps can use shapeless
    implicit class PimpedRDD[T1, T2](rdd: RDD[(T1, T2)]) {
      def writeAsMultiple(prefix: String, codec: String,
                          keyName: String = "key")
                         (implicit sqlContext: SQLContext): Unit = {
        import sqlContext.implicits._
    
        rdd.toDF(keyName, "_2").write.partitionBy(keyName)
        .format("text").option("codec", codec).save(prefix)
      }
    }
    
    val myRdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
    myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")
    

    与OP的一个细微差别是它将<keyName>=为目录名称添加前缀.例如

    myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")
    

    会给:

    prefix/key=1/part-00000
    prefix/key=2/part-00000
    

    哪里prefix/my_number=1/part-00000将包含行ab,prefix/my_number=2/part-00000并将包含该行c.

    myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec", "foo")
    

    会给:

    prefix/foo=1/part-00000
    prefix/foo=2/part-00000
    

    应该清楚如何编辑parquet.

    最后下面是一个例子Dataset,使用元组可能更好.

    implicit class PimpedDataset[T](dataset: Dataset[T]) {
      def writeAsMultiple(prefix: String, codec: String, field: String): Unit = {
        dataset.write.partitionBy(field)
        .format("text").option("codec", codec).save(prefix)
      }
    }
    

    2023-01-12 10:28 回答
  • 我会这样做,这是可扩展的

    import org.apache.hadoop.io.NullWritable
    
    import org.apache.spark._
    import org.apache.spark.SparkContext._
    
    import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
    
    class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
      override def generateActualKey(key: Any, value: Any): Any = 
        NullWritable.get()
    
      override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = 
        key.asInstanceOf[String]
    }
    
    object Split {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("Split" + args(1))
        val sc = new SparkContext(conf)
        sc.textFile("input/path")
        .map(a => (k, v)) // Your own implementation
        .partitionBy(new HashPartitioner(num))
        .saveAsHadoopFile("output/path", classOf[String], classOf[String],
          classOf[RDDMultipleTextOutputFormat])
        spark.stop()
      }
    }
    

    刚看到上面的类似答案,但实际上我们不需要自定义分区.MultipleTextOutputFormat将为每个键创建文件.具有相同键的多个记录可以归入同一分区.

    new HashPartitioner(num),其中num是您想要的分区号.如果您有大量不同的密钥,可以将数字设置为大.在这种情况下,每个分区都不会打开太多的hdfs文件处理程序.

    2023-01-12 10:29 回答
  • 如果您使用Spark 1.4+,由于DataFrame API,这变得更加容易.(DataFrames是在Spark 1.3中引入的,但partitionBy()我们需要的是1.4中引入的.)

    如果您刚开始使用RDD,则首先需要将其转换为DataFrame:

    val people_rdd = sc.parallelize(Seq((1, "alice"), (1, "bob"), (2, "charlie")))
    val people_df = people_rdd.toDF("number", "name")
    

    在Python中,相同的代码是:

    people_rdd = sc.parallelize([(1, "alice"), (1, "bob"), (2, "charlie")])
    people_df = people_rdd.toDF(["number", "name"])
    

    拥有DataFrame后,基于特定键写入多个输出很简单.更重要的是 - 这就是DataFrame API的美妙之处 - Python,Scala,Java和R的代码几乎相同:

    people_df.write.partitionBy("number").text("people")
    

    如果您需要,您可以轻松使用其他输出格式:

    people_df.write.partitionBy("number").json("people-json")
    people_df.write.partitionBy("number").parquet("people-parquet")
    

    在每个示例中,Spark将为我们对DataFrame进行分区的每个键创建一个子目录:

    people/
      _SUCCESS
      number=1/
        part-abcd
        part-efgh
      number=2/
        part-abcd
        part-efgh
    

    2023-01-12 10:29 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有