Scala Spark SQLContext程序抛出数组超出绑定的异常

 书友68570125 发布于 2022-12-27 13:04

我是Apache Spark的新手.我正在尝试创建一个架构并从hdfs加载数据.以下是我的代码:

// importing sqlcontext  
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD

//defining the schema
case class Author1(Author_Key: Long, Author_ID: Long, Author: String, First_Name: String, Last_Name: String, Middle_Name: String, Full_Name: String, Institution_Full_Name: String, Country: String, DIAS_ID: Int, R_ID: String)

val D_Authors1 = 
  sc.textFile("hdfs:///user/D_Authors.txt")
  .map(_.split("\\|"))
  .map(auth => Author1(auth(0).trim.toLong, auth(1).trim.toLong, auth(2), auth(3), auth(4), auth(5), auth(6), auth(7), auth(8), auth(9).trim.toInt, auth(10)))

//register the table
D_Authors1.registerAsTable("D_Authors1")
val auth = sqlContext.sql("SELECT * FROM D_Authors1")
sqlContext.sql("SELECT * FROM D_Authors").collect().foreach(println)

当我执行此代码时,它会将数组抛出绑定异常.以下是错误:

    14/08/18 06:57:14 INFO Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
    14/08/18 06:57:14 INFO Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
    14/08/18 06:57:14 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange
    14/08/18 06:57:14 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions
    14/08/18 06:57:14 INFO FileInputFormat: Total input paths to process : 1
    14/08/18 06:57:14 INFO SparkContext: Starting job: collect at :24
    14/08/18 06:57:14 INFO DAGScheduler: Got job 5 (collect at :24) with 2 output partitions (allowLocal=false)
    14/08/18 06:57:14 INFO DAGScheduler: Final stage: Stage 5(collect at :24)
    14/08/18 06:57:14 INFO DAGScheduler: Parents of final stage: List()
    14/08/18 06:57:14 INFO DAGScheduler: Missing parents: List()
    14/08/18 06:57:14 INFO DAGScheduler: Submitting Stage 5 (SchemaRDD[26] at RDD at SchemaRDD.scala:98
    == Query Plan ==
    ExistingRdd [Author_Key#22L,Author_ID#23L,Author#24,First_Name#25,Last_Name#26,Middle_Name#27,Full_Name#28,Institution_Full_Name#29,Country#30,DIAS_ID#31,R_ID#32], MapPartitionsRDD[23] at mapPartitions at basicOperators.scala:174), which has no missing parents
    14/08/18 06:57:14 INFO DAGScheduler: Submitting 2 missing tasks from Stage 5 (SchemaRDD[26] at RDD at SchemaRDD.scala:98
    == Query Plan ==
    ExistingRdd [Author_Key#22L,Author_ID#23L,Author#24,First_Name#25,Last_Name#26,Middle_Name#27,Full_Name#28,Institution_Full_Name#29,Country#30,DIAS_ID#31,R_ID#32], MapPartitionsRDD[23] at mapPartitions at basicOperators.scala:174)
    14/08/18 06:57:14 INFO YarnClientClusterScheduler: Adding task set 5.0 with 2 tasks
    14/08/18 06:57:14 INFO TaskSetManager: Starting task 5.0:0 as TID 38 on executor 1: orf-bat.int..com (NODE_LOCAL)
    14/08/18 06:57:14 INFO TaskSetManager: Serialized task 5.0:0 as 4401 bytes in 1 ms
    14/08/18 06:57:15 INFO TaskSetManager: Starting task 5.0:1 as TID 39 on executor 1: orf-bat.int..com (NODE_LOCAL)
    14/08/18 06:57:15 INFO TaskSetManager: Serialized task 5.0:1 as 4401 bytes in 0 ms
    14/08/18 06:57:15 WARN TaskSetManager: Lost TID 38 (task 5.0:0)
    14/08/18 06:57:15 WARN TaskSetManager: Loss was due to java.lang.ArrayIndexOutOfBoundsException
    java.lang.ArrayIndexOutOfBoundsException: 10
            at $line39.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(:27)
            at $line39.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(:27)
            at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
            at scala.collection.Iterator$$anon$1.next(Iterator.scala:853)
            at scala.collection.Iterator$$anon$1.head(Iterator.scala:840)
            at org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:179)
            at org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:174)
            at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559)
            at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559)
            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
            at org.apache.spark.sql.SchemaRDD.compute(SchemaRDD.scala:110)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
            at org.apache.spark.scheduler.Task.run(Task.scala:51)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
            at java.lang.Thread.run(Thread.java:745)
    14/08/18 06:57:15 WARN TaskSetManager: Lost TID 39 (task 5.0:1)
    14/08/18 06:57:15 WARN TaskSetManager: Loss was due to java.lang.ArrayIndexOutOfBoundsException
    java.lang.ArrayIndexOutOfBoundsException: 9
            at $line39.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(:27)
            at $line39.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(:27)
            at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
            at scala.collection.Iterator$$anon$1.next(Iterator.scala:853)
            at scala.collection.Iterator$$anon$1.head(Iterator.scala:840)
            at org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:179)
            at org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:174)
            at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559)
            at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559)
            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
            at org.apache.spark.sql.SchemaRDD.compute(SchemaRDD.scala:110)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
            at org.apache.spark.scheduler.Task.run(Task.scala:51)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
            at java.lang.Thread.run(Thread.java:745)

samthebest.. 5

你的问题与Spark无关.

    正确格式化代码(我已更正)

    不要混合使用camel和下划线命名 - 对SQL字段使用下划线,对Scala val使用camel,

    当你得到一个异常读取它通常告诉你你做错了什么,在你的情况下,可能是你的一些记录hdfs:///user/D_Authors.txt不是你期望他们

    当您获得异常调试时,请尝试实际捕获异常并打印出无法解析的记录

    _.split("\\|") 忽略空的前导和尾随字符串,使用 _.split("\\|", -1)

    在Scala中,您不需要手动访问数组元素的幻数,它很丑陋且更容易出错,使用模式匹配...

这是一个简单的例子,包括不寻常的记录处理!:

case class Author(author: String, authorAge: Int)

myData.map(_.split("\t", -1) match {
  case Array(author, authorAge) => Author(author, authorAge.toInt)
  case unexpectedArrayForm => 
    throw new RuntimeException("Record did not have correct number of fields: " +
      unexpectedArrayForm.mkString("\t"))
})

现在,如果你这样编码,你的例外会立即告诉你确切的数据有什么问题.

最后一点/关注; 你为什么使用Spark SQL?您的数据是以文本形式存在的,您是否正在尝试将其转换为镶木地板?如果没有,为什么不使用常规的Scala API来执行分析,而且它的类型检查和编译检查,与SQL不同.

1 个回答
  • 你的问题与Spark无关.

      正确格式化代码(我已更正)

      不要混合使用camel和下划线命名 - 对SQL字段使用下划线,对Scala val使用camel,

      当你得到一个异常读取它通常告诉你你做错了什么,在你的情况下,可能是你的一些记录hdfs:///user/D_Authors.txt不是你期望他们

      当您获得异常调试时,请尝试实际捕获异常并打印出无法解析的记录

      _.split("\\|") 忽略空的前导和尾随字符串,使用 _.split("\\|", -1)

      在Scala中,您不需要手动访问数组元素的幻数,它很丑陋且更容易出错,使用模式匹配...

    这是一个简单的例子,包括不寻常的记录处理!:

    case class Author(author: String, authorAge: Int)
    
    myData.map(_.split("\t", -1) match {
      case Array(author, authorAge) => Author(author, authorAge.toInt)
      case unexpectedArrayForm => 
        throw new RuntimeException("Record did not have correct number of fields: " +
          unexpectedArrayForm.mkString("\t"))
    })
    

    现在,如果你这样编码,你的例外会立即告诉你确切的数据有什么问题.

    最后一点/关注; 你为什么使用Spark SQL?您的数据是以文本形式存在的,您是否正在尝试将其转换为镶木地板?如果没有,为什么不使用常规的Scala API来执行分析,而且它的类型检查和编译检查,与SQL不同.

    2022-12-27 13:08 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有