我正在对存储在S3中的一些LZO压缩日志文件运行EMR Spark作业.有几个日志文件存储在同一个文件夹中,例如:
... s3://mylogfiles/2014-08-11-00111.lzo s3://mylogfiles/2014-08-11-00112.lzo ...
在spark-shell中,我正在运行一个计算文件中行数的作业.如果我为每个文件单独计算行数,则没有问题,例如:
// Works fine ... sc.textFile("s3://mylogfiles/2014-08-11-00111.lzo").count() sc.textFile("s3://mylogfiles/2014-08-11-00112.lzo").count() ...
如果我使用通配符加载带有单行的所有文件,我会得到两种异常.
// One-liner throws exceptions sc.textFile("s3://mylogfiles/*.lzo").count()
例外情况是:
java.lang.InternalError: lzo1x_decompress_safe returned: -6 at com.hadoop.compression.lzo.LzoDecompressor.decompressBytesDirect(Native Method)
和
java.io.IOException: Compressed length 1362309683 exceeds max block size 67108864 (probably corrupt file) at com.hadoop.compression.lzo.LzopInputStream.getCompressedData(LzopInputStream.java:291)
在我看来,解决方案是由最后一个例外给出的文字暗示,但我不知道如何继续.LZO文件允许的大小是有限制的,或者问题是什么?
我的问题是:我可以运行Spark查询加载S3文件夹中的所有LZO压缩文件,而不会获得与I/O相关的异常吗?
每个文件有66个大约200MB的文件.
编辑:只有在使用Hadoop2核心库(ami 3.1.0)运行Spark时才会出现异常.当使用Hadoop1核心库(ami 2.4.5)运行时,一切正常.两个案例都使用Spark 1.0.1进行了测试.
kgeyti的答案很好,但是:
LzoTextInputFormat
引入性能命中,因为它检查每个LZO文件的.index文件.对于S3上的许多LZO文件,这可能会特别痛苦(我经历了长达几分钟的延迟,这是由数千个对S3的请求引起的).
如果您事先知道您的LZO文件不可拆分,那么更高效的解决方案是创建自定义的不可拆分输入格式:
import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.JobContext import org.apache.hadoop.mapreduce.lib.input.TextInputFormat class NonSplittableTextInputFormat extends TextInputFormat { override def isSplitable(context: JobContext, file: Path): Boolean = false }
并读取这样的文件:
context.newAPIHadoopFile("s3://mylogfiles/*.lzo", classOf[NonSplittableTextInputFormat], classOf[org.apache.hadoop.io.LongWritable], classOf[org.apache.hadoop.io.Text]) .map(_._2.toString)