我想从hdfs位置读取一堆文本文件,并使用spark在迭代中对其执行映射.
JavaRDD
能够一次只读取一个文件.
我想读取多个文件并将它们作为单个RDD处理.怎么样?
您可以指定整个目录,使用通配符甚至CSV目录和通配符.例如:
sc.textFile("/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file")
正如Nick Chammas所指出的,这是Hadoop的曝光,FileInputFormat
因此这也适用于Hadoop(和Scalding).
您可以使用单个textFile调用来读取多个文件.斯卡拉:
sc.textFile(','.join(files))
你可以用它
首先,您可以获得S3路径的缓冲区/列表:
import scala.collection.JavaConverters._ import java.util.ArrayList import com.amazonaws.services.s3.AmazonS3Client import com.amazonaws.services.s3.model.ObjectListing import com.amazonaws.services.s3.model.S3ObjectSummary import com.amazonaws.services.s3.model.ListObjectsRequest def listFiles(s3_bucket:String, base_prefix : String) = { var files = new ArrayList[String] //S3 Client and List Object Request var s3Client = new AmazonS3Client(); var objectListing: ObjectListing = null; var listObjectsRequest = new ListObjectsRequest(); //Your S3 Bucket listObjectsRequest.setBucketName(s3_bucket) //Your Folder path or Prefix listObjectsRequest.setPrefix(base_prefix) //Adding s3:// to the paths and adding to a list do { objectListing = s3Client.listObjects(listObjectsRequest); for (objectSummary <- objectListing.getObjectSummaries().asScala) { files.add("s3://" + s3_bucket + "/" + objectSummary.getKey()); } listObjectsRequest.setMarker(objectListing.getNextMarker()); } while (objectListing.isTruncated()); //Removing Base Directory Name files.remove(0) //Creating a Scala List for same files.asScala }
现在将此List对象传递给下面的代码段,注意:sc是SQLContext的对象
var df: DataFrame = null; for (file <- files) { val fileDf= sc.textFile(file) if (df!= null) { df= df.unionAll(fileDf) } else { df= fileDf } }
现在你有一个最终的统一RDD即df
可选,您也可以在一个BigRDD中重新分配它
val files = sc.textFile(filename, 1).repartition(1)
重新分区始终有效:D
使用union
方法如下:
val sc = new SparkContext(...) val r1 = sc.textFile("xxx1") val r2 = sc.textFile("xxx2") ... val rdds = Seq(r1, r2, ...) val bigRdd = sc.union(rdds)
然后bigRdd
是包含所有文件的RDD.