spark-sql中的NullPointerException

 思南sn99 发布于 2023-01-02 11:14

我正在编写一个程序,使用spark-sql在一个公共参数上连接两个文件.我认为我的代码很好但是当我试图将其保存为文本文件时,我收到错误.我把我的代码如下: -

import java.util.regex.Pattern;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;



import java.io.Serializable;


public class JoinCSV {
    @SuppressWarnings("serial")
    public static class CompleteSample implements Serializable {
        private String ASSETNUM;
        private String ASSETTAG;
        private String CALNUM;



        public String getASSETNUM() {
            return ASSETNUM;
        }
        public void setASSETNUM(String aSSETNUM) {
            ASSETNUM = aSSETNUM;
        }
        public String getASSETTAG() {
            return ASSETTAG;
        }
        public void setASSETTAG(String aSSETTAG) {
            ASSETTAG = aSSETTAG;
        }
        public String getCALNUM() {
            return CALNUM;
        }
        public void setCALNUM(String cALNUM) {
            CALNUM = cALNUM;
        }


      }

    @SuppressWarnings("serial")
    public static class ExtendedSample implements Serializable {

        private String ASSETNUM;
        private String CHANGEBY;
        private String CHANGEDATE;


        public String getASSETNUM() {
            return ASSETNUM;
        }
        public void setASSETNUM(String aSSETNUM) {
            ASSETNUM = aSSETNUM;
        }
        public String getCHANGEBY() {
            return CHANGEBY;
        }
        public void setCHANGEBY(String cHANGEBY) {
            CHANGEBY = cHANGEBY;
        }
        public String getCHANGEDATE() {
            return CHANGEDATE;
        }
        public void setCHANGEDATE(String cHANGEDATE) {
            CHANGEDATE = cHANGEDATE;
        }
    }

    private static final Pattern comma = Pattern.compile(",");
    @SuppressWarnings("serial")
    public static void main(String[] args) throws Exception {
        String path="C:/Users/cyg_server/Documents/bigDataExample/AssetsImportCompleteSample.csv";
        String path1="C:/Users/cyg_server/Documents/bigDataExample/AssetsImportExtendedSample.csv";

          JavaSparkContext ctx = new JavaSparkContext("local[2]", "JavaSparkSQL");
          JavaSQLContext sqlCtx = new JavaSQLContext(ctx);

          JavaRDD cs = ctx.textFile("C:/Users/cyg_server/Documents/bigDataExample/AssetsImportCompleteSample.csv").map(
                  new Function() {
                    public CompleteSample call(String line) throws Exception {
                      String[] parts = line.split(",");

                      CompleteSample cs = new CompleteSample();
                      cs.setASSETNUM(parts[0]);
                      cs.setASSETTAG(parts[1]);
                      cs.setCALNUM(parts[2]);

                      return cs;
                    }
                  });

          JavaRDD es = ctx.textFile("C:/Users/cyg_server/Documents/bigDataExample/AssetsImportExtendedSample.csv").map(
                  new Function() {
                    public ExtendedSample call(String line) throws Exception {
                      String[] parts = line.split(",");

                      ExtendedSample es = new ExtendedSample();
                      es.setASSETNUM(parts[0]);
                      es.setCHANGEBY(parts[1]);
                      es.setCHANGEDATE(parts[2]);

                      return es;
                    }
                  });

          JavaSchemaRDD complete = sqlCtx.applySchema(cs, CompleteSample.class);
            complete.registerAsTable("cs");

          JavaSchemaRDD extended = sqlCtx.applySchema(es, ExtendedSample.class);
          extended.registerAsTable("es");

          JavaSchemaRDD fs= sqlCtx.sql("SELECT ASSETTAG, CALNUM FROM cs INNER JOIN es ON cs.ASSETNUM=es.ASSETNUM;");
          fs.saveAsTextFile("result");                   //Here I am getting error
    }

}

我的错误如下: -

    14/07/19 00:40:13 INFO TaskSchedulerImpl: Cancelling stage 0
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:0 failed 1 times, most recent failure: Exception failure in TID 4 on host localhost: java.lang.NullPointerException
            java.lang.ProcessBuilder.start(Unknown Source)
            org.apache.hadoop.util.Shell.runCommand(Shell.java:404)
            org.apache.hadoop.util.Shell.run(Shell.java:379)
            org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
            org.apache.hadoop.util.Shell.execCommand(Shell.java:678)
------------
------------

 14/07/19 00:40:11 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
    java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
        at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:278)
        at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:300)
        at org.apache.hadoop.util.Shell.(Shell.java:293)
        at org.apache.hadoop.util.StringUtils.(StringUtils.java:76)
        at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:362)
        at org.apache.spark.SparkContext$$anonfun$22.apply(SparkContext.scala:546)
        at org.apache.spark.SparkContext$$anonfun$22.apply(SparkContext.scala:546)
-----------------
-----------------

无论我使用spark,spark-sql还是spark-streaming,第二个错误都会出现.我不知道这个错误是什么.但似乎第二个错误对代码没有影响,因为即使在此错误之后,结果也会变得很好.但是每次运行程序时看到一个未知错误仍然非常恼人.

有人可以帮我理解这个问题吗?我非常困惑.谢谢

撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有