Spark:写入Avro文件

 robinqianqcs521 发布于 2023-02-10 10:35

我在Spark,我有一个Avro文件的RDD.我现在想对该RDD进行一些转换并将其保存为Avro文件:

val job = new Job(new Configuration())
AvroJob.setOutputKeySchema(job, getOutputSchema(inputSchema))

rdd.map(elem => (new SparkAvroKey(doTransformation(elem._1)), elem._2))
   .saveAsNewAPIHadoopFile(outputPath, 
  classOf[AvroKey[GenericRecord]], 
  classOf[org.apache.hadoop.io.NullWritable], 
  classOf[AvroKeyOutputFormat[GenericRecord]], 
  job.getConfiguration)

运行时,Spark会抱怨Schema $ recordSchema不可序列化.

如果我取消注释.map调用(并且只有rdd.saveAsNewAPIHadoopFile),则调用成功.

我在这做错了什么?

任何的想法?

1 个回答
  • 此处的问题与作业中使用的avro.Schema类的不可序列化有关.当您尝试从map函数内的代码引用架构对象时,抛出异常.

    例如,如果您尝试执行以下操作,您将获得"Task not serializable"异常:

    val schema = new Schema.Parser().parse(new File(jsonSchema))
    ...
    rdd.map(t => {
      // reference to the schema object declared outside
      val record = new GenericData.Record(schema)
    })
    

    您可以通过在功能块内创建模式的新实例来使一切工作正常:

    val schema = new Schema.Parser().parse(new File(jsonSchema))
    // The schema above should not be used in closures, it's for other purposes
    ...
    rdd.map(t => {
      // create a new Schema object
      val innserSchema = new Schema.Parser().parse(new File(jsonSchema))
      val record = new GenericData.Record(innserSchema)
      ...
    })
    

    由于您不希望为您处理的每条记录解析avro架构,因此更好的解决方案是在分区级别解析架构.以下也有效:

    val schema = new Schema.Parser().parse(new File(jsonSchema))
    // The schema above should not be used in closures, it's for other purposes
    ...
    rdd.mapPartitions(tuples => {
      // create a new Schema object
      val innserSchema = new Schema.Parser().parse(new File(jsonSchema))
    
      tuples.map(t => {
        val record = new GenericData.Record(innserSchema)
        ...
        // this closure will be bundled together with the outer one 
        // (no serialization issues)
      })
    })
    

    只要您提供对jsonSchema文件的可移植引用,上面的代码就可以工作,因为map函数将由多个远程执行程序执行.它可以是对HDFS中文件的引用,也可以与JAR中的应用程序一起打包(在后一种情况下,您将使用类加载器函数来获取其内容).

    对于那些试图将Avro与Spark一起使用的人,请注意仍然存在一些未解决的编译问题,您必须在Maven POM上使用以下导入:

    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-mapred</artifactId>
      <version>1.7.7</version>
      <classifier>hadoop2</classifier>
    <dependency>
    

    注意"hadoop2"分类器.您可以在https://issues.apache.org/jira/browse/SPARK-3039上跟踪该问题.

    2023-02-10 10:38 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有