如何使用Avro文件中的架构在Spark中加载Avros?

 李林1108_965 发布于 2023-01-12 15:27

我正在使用Cloudera包裹中的Spark 0.9.0运行CDH 4.4.

我有一堆通过Pig的AvroStorage UDF创建的Avro文件.我想在Spark中加载这些文件,使用通用记录或Avro文件上的架构.到目前为止,我试过这个:

import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.commons.lang.StringEscapeUtils.escapeCsv

import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.conf.Configuration
import java.net.URI
import java.io.BufferedInputStream
import java.io.File
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.specific.SpecificDatumReader
import org.apache.avro.file.DataFileStream
import org.apache.avro.io.DatumReader
import org.apache.avro.file.DataFileReader
import org.apache.avro.mapred.FsInput

val input = "hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/29/22/part-m-00016.avro"
val inURI = new URI(input)
val inPath = new Path(inURI)

val fsInput = new FsInput(inPath, sc.hadoopConfiguration)
val reader =  new GenericDatumReader[GenericRecord]
val dataFileReader = DataFileReader.openReader(fsInput, reader)
val schemaString = dataFileReader.getSchema

val buf = scala.collection.mutable.ListBuffer.empty[GenericRecord]
while(dataFileReader.hasNext)  {
  buf += dataFileReader.next
}
sc.parallelize(buf)

这适用于一个文件,但它无法扩展 - 我将所有数据加载到本地RAM,然后从那里分配到火花节点.

2 个回答
  • 回答我自己的问题:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    
    import org.apache.avro.generic.GenericRecord
    import org.apache.avro.mapred.AvroKey
    import org.apache.avro.mapred.AvroInputFormat
    import org.apache.avro.mapreduce.AvroKeyInputFormat
    import org.apache.hadoop.io.NullWritable
    import org.apache.commons.lang.StringEscapeUtils.escapeCsv
    
    import org.apache.hadoop.fs.FileSystem
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.conf.Configuration
    import java.io.BufferedInputStream
    import org.apache.avro.file.DataFileStream
    import org.apache.avro.io.DatumReader
    import org.apache.avro.file.DataFileReader
    import org.apache.avro.file.DataFileReader
    import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
    import org.apache.avro.mapred.FsInput
    import org.apache.avro.Schema
    import org.apache.avro.Schema.Parser
    import org.apache.hadoop.mapred.JobConf
    import java.io.File
    import java.net.URI
    
    // spark-shell -usejavacp -classpath "*.jar"
    
    val input = "hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/29/22/part-m-00016.avro"
    
    val jobConf= new JobConf(sc.hadoopConfiguration)
    val rdd = sc.hadoopFile(
      input,
      classOf[org.apache.avro.mapred.AvroInputFormat[GenericRecord]],
      classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]],
      classOf[org.apache.hadoop.io.NullWritable],
      10)
    val f1 = rdd.first
    val a = f1._1.datum
    a.get("rawLog") // Access avro fields
    

    2023-01-12 15:28 回答
  • 这对我有用:

    import org.apache.avro.generic.GenericRecord
    import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
    import org.apache.hadoop.io.NullWritable
    
    ...
    val path = "hdfs:///path/to/your/avro/folder"
    val avroRDD = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](path)
    

    2023-01-12 15:28 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有