基本上,我必须使用Spark分析HDFS上的一些复杂的JSON.
我使用"for comprehensions"来(预)过滤json的jSON和"extract"方法,将它包装成一个case类
这个工作正常!
def foo(rdd: RDD[String]) = { case class View(C: String,b: Option[Array[List[String]]], t: Time) case class Time($numberLong: String) implicit val formats = DefaultFormats rdd.map { jsonString => val jsonObj = parse(jsonString) val listsOfView = for { JObject(value) <- jsonObj JField(("v"), JObject(views)) <- value normalized <- views.map(x => (x._2)) } yield normalized }
到现在为止还挺好!
当我尝试将(预)过滤的JSON提取到我的CaseClass时,我得到这个:
线程"main"中的异常org.apache.spark.SparkException:由于阶段失败而中止作业:任务不可序列化:java.io.NotSerializableException:org.json4s.DefaultFormats $
这里带有提取的代码:
def foo(rdd: RDD[String]) = { case class View(C: String,b: Option[Array[List[String]]], t: Time) case class Time($numberLong: String) implicit val formats = DefaultFormats rdd.map { jsonString => val jsonObj = parse(jsonString) val listsOfView = for { JObject(value) <- jsonObj JField(("v"), JObject(views)) <- value normalized <- views.map(x => (x._2)) } yield normalized.extract[View] }
我已经在scala上尝试了我的代码,以及它的工作!我真的很擅长使用hdfs和spark,所以我会很感激.