如何将多个文本文件读入单个RDD?

 手机用户2502938985 发布于 2023-01-11 18:31

我想从hdfs位置读取一堆文本文件,并使用spark在迭代中对其执行映射.

JavaRDD records = ctx.textFile(args[1], 1); 能够一次只读取一个文件.

我想读取多个文件并将它们作为单个RDD处理.怎么样?

4 个回答
  • 您可以指定整个目录,使用通配符甚至CSV目录和通配符.例如:

    sc.textFile("/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file")
    

    正如Nick Chammas所指出的,这是Hadoop的曝光,FileInputFormat因此这也适用于Hadoop(和Scalding).

    2023-01-11 18:33 回答
  • 您可以使用单个textFile调用来读取多个文件.斯卡拉:

    sc.textFile(','.join(files)) 
    

    2023-01-11 18:33 回答
  • 你可以用它

    首先,您可以获得S3路径的缓冲区/列表:

    import scala.collection.JavaConverters._
    import java.util.ArrayList
    import com.amazonaws.services.s3.AmazonS3Client
    import com.amazonaws.services.s3.model.ObjectListing
    import com.amazonaws.services.s3.model.S3ObjectSummary
    import com.amazonaws.services.s3.model.ListObjectsRequest
    
    def listFiles(s3_bucket:String, base_prefix : String) = {
        var files = new ArrayList[String]
    
        //S3 Client and List Object Request
        var s3Client = new AmazonS3Client();
        var objectListing: ObjectListing = null;
        var listObjectsRequest = new ListObjectsRequest();
    
        //Your S3 Bucket
        listObjectsRequest.setBucketName(s3_bucket)
    
        //Your Folder path or Prefix
        listObjectsRequest.setPrefix(base_prefix)
    
        //Adding s3:// to the paths and adding to a list
        do {
          objectListing = s3Client.listObjects(listObjectsRequest);
          for (objectSummary <- objectListing.getObjectSummaries().asScala) {
            files.add("s3://" + s3_bucket + "/" + objectSummary.getKey());
          }
          listObjectsRequest.setMarker(objectListing.getNextMarker());
        } while (objectListing.isTruncated());
    
        //Removing Base Directory Name
        files.remove(0)
    
        //Creating a Scala List for same
        files.asScala
      }
    

    现在将此List对象传递给下面的代码段,注意:sc是SQLContext的对象

    var df: DataFrame = null;
      for (file <- files) {
        val fileDf= sc.textFile(file)
        if (df!= null) {
          df= df.unionAll(fileDf)
        } else {
          df= fileDf
        }
      }
    

    现在你有一个最终的统一RDD即df

    可选,您也可以在一个BigRDD中重新分配它

    val files = sc.textFile(filename, 1).repartition(1)
    

    重新分区始终有效:D

    2023-01-11 18:34 回答
  • 使用union方法如下:

    val sc = new SparkContext(...)
    val r1 = sc.textFile("xxx1")
    val r2 = sc.textFile("xxx2")
    ...
    val rdds = Seq(r1, r2, ...)
    val bigRdd = sc.union(rdds)
    

    然后bigRdd是包含所有文件的RDD.

    2023-01-11 18:34 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有