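I am using Apache Spark 1.0.1. I have many files whose records are delimited by the UTF-8 character \u0001 rather than the usual newline \n. How can I read such files in Spark? That is, the default delimiter of sc.textFile("hdfs:///myproject/*") is \n, and I want to change it to \u0001.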
You can use textinputformat.record.delimiter to set the delimiter for TextInputFormat, for example:
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

    // Copy the SparkContext's Hadoop configuration and override the record delimiter.
    val conf = new Configuration(sc.hadoopConfiguration)
    conf.set("textinputformat.record.delimiter", "X")

    // Each record is a (key, text) pair; keep only the text.
    val input = sc.newAPIHadoopFile("file_path", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
    val lines = input.map { case (_, text) => text.toString }

    // collect() returns an Array[String]; mkString renders its contents.
    println(lines.collect().mkString("Array(", ", ", ")"))
For example, my input is a file containing a single line, aXbXcXd. The code above will output:

    Array(a, b, c, d)
In the Spark shell, I extracted the data following "Setting textinputformat.record.delimiter in spark":
    $ spark-shell
    ...
    scala> import org.apache.hadoop.io.LongWritable
    import org.apache.hadoop.io.LongWritable

    scala> import org.apache.hadoop.io.Text
    import org.apache.hadoop.io.Text

    scala> import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.conf.Configuration

    scala> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

    scala> val conf = new Configuration
    conf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml

    scala> conf.set("textinputformat.record.delimiter", "\u0001")

    scala> val data = sc.newAPIHadoopFile("mydata.txt", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf).map(_._2.toString)
    data: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at map at <console>:19
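sc.newAPIHadoopFile("mydata.txt", ...) is an RDD[(LongWritable, Text)], where the first part of each element is the byte offset at which the record starts in the file, and the second part is the actual text delimited by "\u0001". The trailing .map(_._2.toString) keeps only that text.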
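To make the shape of those pairs concrete, here is a minimal plain-Python sketch of what splitting on a custom record delimiter produces. It is not Hadoop's implementation and it ignores input splits entirely; `split_records` is a hypothetical helper name, and the sample bytes are made up for illustration.

```python
def split_records(data, delimiter):
    """Return (start_offset, text) pairs for each delimiter-separated record.

    Illustrative only: mimics the (key, value) shape described above,
    where the key is the position at which the record starts.
    """
    records = []
    start = 0
    while start < len(data):
        end = data.find(delimiter, start)
        if end == -1:
            # No further delimiter: the rest of the data is the last record.
            end = len(data)
        records.append((start, data[start:end].decode("utf-8")))
        start = end + len(delimiter)
    return records


# Sample input: four records separated by the \u0001 control character.
sample = "a\u0001b\u0001c\u0001d".encode("utf-8")
pairs = split_records(sample, "\u0001".encode("utf-8"))
print(pairs)  # [(0, 'a'), (2, 'b'), (4, 'c'), (6, 'd')]
```

Dropping the first element of each pair, as the `.map(_._2.toString)` call does, leaves just the record text.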
In Python, this can be achieved with:
    rdd = sc.newAPIHadoopFile(
        YOUR_FILE,
        "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "org.apache.hadoop.io.Text",
        conf={"textinputformat.record.delimiter": YOUR_DELIMITER}
    ).map(lambda l: l[1])
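If you read several such files, the call can be wrapped in a small function. The sketch below assumes `sc` is an existing SparkContext from a PySpark version that provides `newAPIHadoopFile`; `read_delimited` and the default delimiter are hypothetical choices for illustration, not part of Spark.

```python
# Fully qualified Hadoop class names expected by newAPIHadoopFile.
TEXT_INPUT_FORMAT = "org.apache.hadoop.mapreduce.lib.input.TextInputFormat"
LONG_WRITABLE = "org.apache.hadoop.io.LongWritable"
TEXT = "org.apache.hadoop.io.Text"


def read_delimited(sc, path, delimiter="\u0001"):
    """Return an RDD of record strings split on `delimiter` instead of newline.

    `sc` is assumed to be a live SparkContext; nothing here starts one.
    """
    conf = {"textinputformat.record.delimiter": delimiter}
    pairs = sc.newAPIHadoopFile(
        path, TEXT_INPUT_FORMAT, LONG_WRITABLE, TEXT, conf=conf
    )
    # Each element is a (key, text) pair; keep only the text.
    return pairs.map(lambda kv: kv[1])
```

With a running context, `read_delimited(sc, "hdfs:///myproject/*").take(5)` would then return the first few records as strings.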