如何在单个作业中使用Spark写入依赖于键的多个输出.
相关:通过键Scalding Hadoop写入多个输出,一个MapReduce作业
例如
sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"))) .writeAsMultiple(prefix, compressionCodecOption)
确保cat prefix/1
是
a b
并cat prefix/2
会
c
编辑:我最近添加了一个新的答案,其中包括完整的导入,皮条客和压缩编解码器,请参阅/sf/ask/17360801/,除了之前的答案,这可能会有所帮助.
如果您可能为给定密钥设置了许多值,我认为可扩展的解决方案是为每个分区的每个密钥写出一个文件.不幸的是,在Spark中没有内置的支持,但是我们可以鞭打一些东西.
sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"))) .mapPartitionsWithIndex { (p, it) => val outputs = new MultiWriter(p.toString) for ((k, v) <- it) { outputs.write(k.toString, v) } outputs.close Nil.iterator } .foreach((x: Nothing) => ()) // To trigger the job. // This one is Local, but you could write one for HDFS class MultiWriter(suffix: String) { private val writers = collection.mutable.Map[String, java.io.PrintWriter]() def write(key: String, value: Any) = { if (!writers.contains(key)) { val f = new java.io.File("output/" + key + "/" + suffix) f.getParentFile.mkdirs writers(key) = new java.io.PrintWriter(f) } writers(key).println(value) } def close = writers.values.foreach(_.close) }
(替换PrintWriter
为您选择的分布式文件系统操作.)
这使得一次通过RDD并且不执行随机播放.它为每个键提供一个目录,每个目录中包含许多文件.
这包括请求的编解码器,必要的导入和请求的pimp.
import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext // TODO Need a macro to generate for each Tuple length, or perhaps can use shapeless implicit class PimpedRDD[T1, T2](rdd: RDD[(T1, T2)]) { def writeAsMultiple(prefix: String, codec: String, keyName: String = "key") (implicit sqlContext: SQLContext): Unit = { import sqlContext.implicits._ rdd.toDF(keyName, "_2").write.partitionBy(keyName) .format("text").option("codec", codec).save(prefix) } } val myRdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"))) myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")
与OP的一个细微差别是它将<keyName>=
为目录名称添加前缀.例如
myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")
会给:
prefix/key=1/part-00000 prefix/key=2/part-00000
哪里prefix/my_number=1/part-00000
将包含行a
和b
,prefix/my_number=2/part-00000
并将包含该行c
.
和
myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec", "foo")
会给:
prefix/foo=1/part-00000 prefix/foo=2/part-00000
应该清楚如何编辑parquet
.
最后下面是一个例子Dataset
,使用元组可能更好.
implicit class PimpedDataset[T](dataset: Dataset[T]) { def writeAsMultiple(prefix: String, codec: String, field: String): Unit = { dataset.write.partitionBy(field) .format("text").option("codec", codec).save(prefix) } }
我会这样做,这是可扩展的
import org.apache.hadoop.io.NullWritable import org.apache.spark._ import org.apache.spark.SparkContext._ import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] { override def generateActualKey(key: Any, value: Any): Any = NullWritable.get() override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = key.asInstanceOf[String] } object Split { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Split" + args(1)) val sc = new SparkContext(conf) sc.textFile("input/path") .map(a => (k, v)) // Your own implementation .partitionBy(new HashPartitioner(num)) .saveAsHadoopFile("output/path", classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat]) spark.stop() } }
刚看到上面的类似答案,但实际上我们不需要自定义分区.MultipleTextOutputFormat将为每个键创建文件.具有相同键的多个记录可以归入同一分区.
new HashPartitioner(num),其中num是您想要的分区号.如果您有大量不同的密钥,可以将数字设置为大.在这种情况下,每个分区都不会打开太多的hdfs文件处理程序.
如果您使用Spark 1.4+,由于DataFrame API,这变得更加容易.(DataFrames是在Spark 1.3中引入的,但partitionBy()
我们需要的是1.4中引入的.)
如果您刚开始使用RDD,则首先需要将其转换为DataFrame:
val people_rdd = sc.parallelize(Seq((1, "alice"), (1, "bob"), (2, "charlie"))) val people_df = people_rdd.toDF("number", "name")
在Python中,相同的代码是:
people_rdd = sc.parallelize([(1, "alice"), (1, "bob"), (2, "charlie")]) people_df = people_rdd.toDF(["number", "name"])
拥有DataFrame后,基于特定键写入多个输出很简单.更重要的是 - 这就是DataFrame API的美妙之处 - Python,Scala,Java和R的代码几乎相同:
people_df.write.partitionBy("number").text("people")
如果您需要,您可以轻松使用其他输出格式:
people_df.write.partitionBy("number").json("people-json") people_df.write.partitionBy("number").parquet("people-parquet")
在每个示例中,Spark将为我们对DataFrame进行分区的每个键创建一个子目录:
people/ _SUCCESS number=1/ part-abcd part-efgh number=2/ part-abcd part-efgh