我需要处理分散在各个目录中的多个文件.我想在一个RDD中加载所有这些,然后对它执行map/reduce.我看到SparkContext能够使用通配符从单个目录加载多个文件.我不知道如何从多个文件夹加载文件.
以下代码段失败:
for fileEntry in files: fileName = basePath + "/" + fileEntry lines = sc.textFile(fileName) if retval == None: retval = lines else: retval = sc.union(retval, lines)
这在第三个循环上失败,并显示以下错误消息:
retval = sc.union(retval, lines) TypeError: union() takes exactly 2 arguments (3 given)
鉴于我只提供了2个参数,这是奇怪的.任何指针赞赏.
这句话怎么样呢?
sc.union([sc.textFile(basepath + "/" + f) for f in files])
在Scala中SparkContext.union()
有两个变体,一个采用vararg参数,另一个采用列表.只有第二个存在于Python中(因为Python没有多态性).
UPDATE
您可以使用单个textFile
调用来读取多个文件.
sc.textFile(','.join(files))
我使用通配符解决了类似的问题.
例如,我在火花中加载的文件中发现了一些特征,
DIR
subdir1 /文件夹1/x.txt
subdir2 /文件夹2/y.txt
你可以使用下面的句子
sc.textFile("dir/*/*/*.txt")
加载所有相关文件.
通配符'*'仅适用于单级目录,不是递归的.