I am writing a program that uses Spark SQL to join two files on a common column. I think my code is fine, but when I try to save the result as a text file I get an error. My code is below:
import java.util.regex.Pattern;
import java.io.Serializable;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;

public class JoinCSV {

    // Bean describing a row of AssetsImportCompleteSample.csv
    @SuppressWarnings("serial")
    public static class CompleteSample implements Serializable {
        private String ASSETNUM;
        private String ASSETTAG;
        private String CALNUM;

        public String getASSETNUM() { return ASSETNUM; }
        public void setASSETNUM(String aSSETNUM) { ASSETNUM = aSSETNUM; }
        public String getASSETTAG() { return ASSETTAG; }
        public void setASSETTAG(String aSSETTAG) { ASSETTAG = aSSETTAG; }
        public String getCALNUM() { return CALNUM; }
        public void setCALNUM(String cALNUM) { CALNUM = cALNUM; }
    }

    // Bean describing a row of AssetsImportExtendedSample.csv
    @SuppressWarnings("serial")
    public static class ExtendedSample implements Serializable {
        private String ASSETNUM;
        private String CHANGEBY;
        private String CHANGEDATE;

        public String getASSETNUM() { return ASSETNUM; }
        public void setASSETNUM(String aSSETNUM) { ASSETNUM = aSSETNUM; }
        public String getCHANGEBY() { return CHANGEBY; }
        public void setCHANGEBY(String cHANGEBY) { CHANGEBY = cHANGEBY; }
        public String getCHANGEDATE() { return CHANGEDATE; }
        public void setCHANGEDATE(String cHANGEDATE) { CHANGEDATE = cHANGEDATE; }
    }

    private static final Pattern comma = Pattern.compile(","); // defined but not used below

    @SuppressWarnings("serial")
    public static void main(String[] args) throws Exception {
        String path = "C:/Users/cyg_server/Documents/bigDataExample/AssetsImportCompleteSample.csv";
        String path1 = "C:/Users/cyg_server/Documents/bigDataExample/AssetsImportExtendedSample.csv";

        JavaSparkContext ctx = new JavaSparkContext("local[2]", "JavaSparkSQL");
        JavaSQLContext sqlCtx = new JavaSQLContext(ctx);

        // Parse the first CSV into CompleteSample beans
        JavaRDD<CompleteSample> cs = ctx.textFile(path).map(
            new Function<String, CompleteSample>() {
                public CompleteSample call(String line) throws Exception {
                    String[] parts = line.split(",");
                    CompleteSample cs = new CompleteSample();
                    cs.setASSETNUM(parts[0]);
                    cs.setASSETTAG(parts[1]);
                    cs.setCALNUM(parts[2]);
                    return cs;
                }
            });

        // Parse the second CSV into ExtendedSample beans
        JavaRDD<ExtendedSample> es = ctx.textFile(path1).map(
            new Function<String, ExtendedSample>() {
                public ExtendedSample call(String line) throws Exception {
                    String[] parts = line.split(",");
                    ExtendedSample es = new ExtendedSample();
                    es.setASSETNUM(parts[0]);
                    es.setCHANGEBY(parts[1]);
                    es.setCHANGEDATE(parts[2]);
                    return es;
                }
            });

        // Register both RDDs as tables and join them on ASSETNUM
        JavaSchemaRDD complete = sqlCtx.applySchema(cs, CompleteSample.class);
        complete.registerAsTable("cs");
        JavaSchemaRDD extended = sqlCtx.applySchema(es, ExtendedSample.class);
        extended.registerAsTable("es");

        JavaSchemaRDD fs = sqlCtx.sql(
            "SELECT ASSETTAG, CALNUM FROM cs INNER JOIN es ON cs.ASSETNUM=es.ASSETNUM;");
        fs.saveAsTextFile("result"); // Here I am getting the error
    }
}
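For what it's worth, the join itself does not seem to be the problem: even a stripped-down save with no SQL at all (a minimal sketch of my setup, same local[2] master, hypothetical output directory) appears to go through the same code path and hit the same failure on my machine:

import java.util.Arrays;
import org.apache.spark.api.java.JavaSparkContext;

public class MinimalSave {
    public static void main(String[] args) {
        JavaSparkContext ctx = new JavaSparkContext("local[2]", "MinimalSave");
        // Save a trivial RDD; this uses the same Hadoop output machinery
        // as fs.saveAsTextFile("result") in the program above.
        ctx.parallelize(Arrays.asList("a", "b", "c")).saveAsTextFile("minimal-result");
        ctx.stop();
    }
}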
My error is as follows:
14/07/19 00:40:13 INFO TaskSchedulerImpl: Cancelling stage 0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:0 failed 1 times, most recent failure: Exception failure in TID 4 on host localhost: java.lang.NullPointerException
        java.lang.ProcessBuilder.start(Unknown Source)
        org.apache.hadoop.util.Shell.runCommand(Shell.java:404)
        org.apache.hadoop.util.Shell.run(Shell.java:379)
        org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
        org.apache.hadoop.util.Shell.execCommand(Shell.java:678)
        ...
and
14/07/19 00:40:11 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
        at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:278)
        at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:300)
        at org.apache.hadoop.util.Shell.<clinit>(Shell.java:293)
        at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
        at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:362)
        at org.apache.spark.SparkContext$$anonfun$22.apply(SparkContext.scala:546)
        at org.apache.spark.SparkContext$$anonfun$22.apply(SparkContext.scala:546)
        ...
This second error appears no matter whether I use plain Spark, Spark SQL, or Spark Streaming. I have no idea what it means, but it does not seem to affect the code: even after the error, the results come out fine. Still, it is quite annoying to see an unexplained error on every run.
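From the "null\bin\winutils.exe" in the message, my guess is that Hadoop builds this path from HADOOP_HOME (or the hadoop.home.dir system property), which is unset on my Windows machine, hence the null. If that is the cause, would setting it before creating the context, along the lines of the sketch below, be the right workaround? (C:/hadoop is a hypothetical directory; it assumes winutils.exe has been placed in C:/hadoop/bin.)

// Hypothetical path; assumes winutils.exe exists at C:/hadoop/bin/winutils.exe
System.setProperty("hadoop.home.dir", "C:/hadoop");
JavaSparkContext ctx = new JavaSparkContext("local[2]", "JavaSparkSQL");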
Can someone help me understand this problem? I am very confused. Thanks in advance.