我有一个包含+100,000个文件的输入文件夹.
我想对它们进行批量操作,即以某种方式重命名所有这些操作,或者根据每个文件名称中的信息将它们移动到新路径.
我想使用Spark来做到这一点,但不幸的是,当我尝试下面这段代码时:
final org.apache.hadoop.fs.FileSystem ghfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI(args[0]), new org.apache.hadoop.conf.Configuration()); org.apache.hadoop.fs.FileStatus[] paths = ghfs.listStatus(new org.apache.hadoop.fs.Path(args[0])); ListpathsList = new ArrayList<>(); for (FileStatus path : paths) { pathsList.add(path.getPath().toString()); } JavaRDD rddPaths = sc.parallelize(pathsList); rddPaths.foreach(new VoidFunction () { @Override public void call(String path) throws Exception { Path origPath = new Path(path); Path newPath = new Path(path.replace("taboola","customer")); ghfs.rename(origPath,newPath); } });
我得到一个错误,hadoop.fs.FileSystem不是Serializable(因此可能不能用于并行操作)
知道如何解决它或以其他方式完成它吗?