I am new to Apache Spark. I am trying to define a schema and load data from HDFS. Here is my code:
// importing sqlcontext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD

// defining the schema
case class Author1(Author_Key: Long, Author_ID: Long, Author: String, First_Name: String,
                   Last_Name: String, Middle_Name: String, Full_Name: String,
                   Institution_Full_Name: String, Country: String, DIAS_ID: Int, R_ID: String)

val D_Authors1 = sc.textFile("hdfs:///user/D_Authors.txt")
  .map(_.split("\\|"))
  .map(auth => Author1(auth(0).trim.toLong, auth(1).trim.toLong, auth(2), auth(3), auth(4),
                       auth(5), auth(6), auth(7), auth(8), auth(9).trim.toInt, auth(10)))

// register the table
D_Authors1.registerAsTable("D_Authors1")
val auth = sqlContext.sql("SELECT * FROM D_Authors1")
sqlContext.sql("SELECT * FROM D_Authors").collect().foreach(println)
When I execute this code, it throws an array index out of bounds exception. Here is the error:
14/08/18 06:57:14 INFO Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
14/08/18 06:57:14 INFO Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
14/08/18 06:57:14 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange
14/08/18 06:57:14 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions
14/08/18 06:57:14 INFO FileInputFormat: Total input paths to process : 1
14/08/18 06:57:14 INFO SparkContext: Starting job: collect at <console>:24
14/08/18 06:57:14 INFO DAGScheduler: Got job 5 (collect at <console>:24) with 2 output partitions (allowLocal=false)
14/08/18 06:57:14 INFO DAGScheduler: Final stage: Stage 5 (collect at <console>:24)
14/08/18 06:57:14 INFO DAGScheduler: Parents of final stage: List()
14/08/18 06:57:14 INFO DAGScheduler: Missing parents: List()
14/08/18 06:57:14 INFO DAGScheduler: Submitting Stage 5 (SchemaRDD[26] at RDD at SchemaRDD.scala:98
== Query Plan ==
ExistingRdd [Author_Key#22L,Author_ID#23L,Author#24,First_Name#25,Last_Name#26,Middle_Name#27,Full_Name#28,Institution_Full_Name#29,Country#30,DIAS_ID#31,R_ID#32], MapPartitionsRDD[23] at mapPartitions at basicOperators.scala:174), which has no missing parents
14/08/18 06:57:14 INFO DAGScheduler: Submitting 2 missing tasks from Stage 5 (SchemaRDD[26] at RDD at SchemaRDD.scala:98
== Query Plan ==
ExistingRdd [Author_Key#22L,Author_ID#23L,Author#24,First_Name#25,Last_Name#26,Middle_Name#27,Full_Name#28,Institution_Full_Name#29,Country#30,DIAS_ID#31,R_ID#32], MapPartitionsRDD[23] at mapPartitions at basicOperators.scala:174)
14/08/18 06:57:14 INFO YarnClientClusterScheduler: Adding task set 5.0 with 2 tasks
14/08/18 06:57:14 INFO TaskSetManager: Starting task 5.0:0 as TID 38 on executor 1: orf-bat.int..com (NODE_LOCAL)
14/08/18 06:57:14 INFO TaskSetManager: Serialized task 5.0:0 as 4401 bytes in 1 ms
14/08/18 06:57:15 INFO TaskSetManager: Starting task 5.0:1 as TID 39 on executor 1: orf-bat.int..com (NODE_LOCAL)
14/08/18 06:57:15 INFO TaskSetManager: Serialized task 5.0:1 as 4401 bytes in 0 ms
14/08/18 06:57:15 WARN TaskSetManager: Lost TID 38 (task 5.0:0)
14/08/18 06:57:15 WARN TaskSetManager: Loss was due to java.lang.ArrayIndexOutOfBoundsException
java.lang.ArrayIndexOutOfBoundsException: 10
    at $line39.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:27)
    at $line39.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:27)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$1.next(Iterator.scala:853)
    at scala.collection.Iterator$$anon$1.head(Iterator.scala:840)
    at org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:179)
    at org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:174)
    at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559)
    at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.sql.SchemaRDD.compute(SchemaRDD.scala:110)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
14/08/18 06:57:15 WARN TaskSetManager: Lost TID 39 (task 5.0:1)
14/08/18 06:57:15 WARN TaskSetManager: Loss was due to java.lang.ArrayIndexOutOfBoundsException
java.lang.ArrayIndexOutOfBoundsException: 9
    at $line39.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:27)
    at $line39.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:27)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$1.next(Iterator.scala:853)
    at scala.collection.Iterator$$anon$1.head(Iterator.scala:840)
    at org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:179)
    at org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:174)
    at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559)
    at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.sql.SchemaRDD.compute(SchemaRDD.scala:110)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
samthebest · 5
Your problem has nothing to do with Spark.
Format your code properly (I have corrected it).
Don't mix camelCase and underscore naming: use underscores for SQL fields and camelCase for Scala vals.
When you get an exception, read it; it usually tells you what you did wrong. In your case it is probably that some of the records in hdfs:///user/D_Authors.txt are not in the format you expect.
When you are debugging an exception, try actually catching it and printing out the records that fail to parse.
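For instance, a minimal sketch of that idea (not the asker's original code; it reuses the Author1 case class and file path from the question and has not been run against the real data) keeps the raw line next to each parse attempt and prints the lines that fail instead of letting the whole job die:

import scala.util.{Failure, Try}

val raw = sc.textFile("hdfs:///user/D_Authors.txt")

// Pair every raw line with its parse attempt so failures can be inspected later.
val attempts = raw.map { line =>
  val parsed = Try {
    val a = line.split("\\|")
    Author1(a(0).trim.toLong, a(1).trim.toLong, a(2), a(3), a(4), a(5),
            a(6), a(7), a(8), a(9).trim.toInt, a(10))
  }
  (line, parsed)
}

// Print the first few records that could not be parsed, together with the exception.
attempts.filter(_._2.isFailure)
        .take(10)
        .foreach { case (line, result) => println(result.failed.get + "  <--  " + line) }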
_.split("\\|") drops trailing empty strings, so a record that ends with empty fields yields a shorter array than you expect; use _.split("\\|", -1) instead.
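A quick illustration of the difference, using a made-up pipe-delimited row whose last two fields are empty (not a line from the real file):

val row = "1|2|Smith|John||||Some University|US||"

row.split("\\|").length        // 9  - trailing empty fields are silently dropped
row.split("\\|", -1).length    // 11 - every field is kept, empty or not

This is consistent with the ArrayIndexOutOfBoundsException: 10 and 9 in the log above: rows ending in empty fields produce arrays too short for auth(9) and auth(10).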
In Scala you do not need to access array elements by magic index numbers; that is ugly and more error-prone. Use pattern matching instead ...
Here is a simple example that even handles malformed records:
case class Author(author: String, authorAge: Int)

myData.map(_.split("\t", -1) match {
  case Array(author, authorAge) => Author(author, authorAge.toInt)
  case unexpectedArrayForm =>
    throw new RuntimeException("Record did not have correct number of fields: " +
      unexpectedArrayForm.mkString("\t"))
})
Now if you write it this way, the exception will immediately tell you exactly what is wrong with your data.
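Adapted to the 11-field record from the question, a sketch could look like this (field order and types follow the Author1 case class above; this is illustrative, not a drop-in replacement):

val authors = sc.textFile("hdfs:///user/D_Authors.txt").map { line =>
  line.split("\\|", -1) match {
    case Array(authorKey, authorId, author, firstName, lastName, middleName,
               fullName, institutionFullName, country, diasId, rId) =>
      Author1(authorKey.trim.toLong, authorId.trim.toLong, author, firstName, lastName,
              middleName, fullName, institutionFullName, country, diasId.trim.toInt, rId)
    case unexpected =>
      throw new RuntimeException("Record did not have 11 fields: " + unexpected.mkString("|"))
  }
}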
One final point/concern: why are you using Spark SQL at all? Your data is plain text; are you trying to convert it to Parquet? If not, why not just do the analysis with the regular Scala API? It is also type-checked and compile-checked, unlike SQL.
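For example, a simple aggregation can be done directly on the typed RDD (a sketch, assuming the authors RDD parsed in the snippet above; Country is a field of Author1):

import org.apache.spark.SparkContext._   // brings reduceByKey into scope for pair RDDs

// Count authors per country; the SQL equivalent would be
// SELECT Country, COUNT(*) FROM D_Authors1 GROUP BY Country
val authorsPerCountry = authors.map(a => (a.Country, 1)).reduceByKey(_ + _)
authorsPerCountry.collect().foreach(println)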