I have a Parquet table in which one of the columns is an array of structs (array<struct<...>>).

I can run queries against this table in Hive using the LATERAL VIEW syntax.


How can I read this table into an RDD and, more importantly, how do I filter, map, etc. this nested collection in Spark?


I could not find any references to this in the Spark documentation. Thanks in advance for any information!


P.S. I thought it might be helpful to give some stats on the table. Number of columns in the main table: ~600. Number of rows: ~200M. Number of "columns" in the nested collection: ~10. Average number of records in the nested collection: ~35.


There is no magic in the case of nested collections. Spark handles an RDD[(String, String)] and an RDD[(String, Seq[String])] in the same way.


Reading such a nested collection from Parquet files can be tricky, though.


Let's take an example from the spark-shell (1.3.1):


scala> import sqlContext.implicits._
import sqlContext.implicits._

scala> case class Inner(a: String, b: String)
defined class Inner

scala> case class Outer(key: String, inners: Seq[Inner])
defined class Outer

Write the parquet file:


scala> val outers = sc.parallelize(List(Outer("k1", List(Inner("a", "b")))))
outers: org.apache.spark.rdd.RDD[Outer] = ParallelCollectionRDD[0] at parallelize at <console>:25

scala> outers.toDF.saveAsParquetFile("outers.parquet")

Read the parquet file:


scala> import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.expressions.Row

scala> val dataFrame = sqlContext.parquetFile("outers.parquet")
dataFrame: org.apache.spark.sql.DataFrame = [key: string, inners: array<struct<a:string,b:string>>]

scala> val outers = dataFrame.map { row =>
     |   val key = row.getString(0)
     |   val inners = row.getAs[Seq[Row]](1).map(r => Inner(r.getString(0), r.getString(1)))
     |   Outer(key, inners)
     | }
outers: org.apache.spark.rdd.RDD[Outer] = MapPartitionsRDD[8] at map at DataFrame.scala:848

The important part is row.getAs[Seq[Row]](1). The internal representation of a nested sequence of structs is ArrayBuffer[Row], so you could use any super-type of it instead of Seq[Row]. The 1 is the column index in the outer row. I used the getAs method here, but there are alternatives in the latest versions of Spark. See the source code of the Row trait.


Now that you have an RDD[Outer], you can apply any transformation or action you want.


// Filter the outers
outers.filter(_.inners.nonEmpty)

// Filter the inners
outers.map(outer => outer.copy(inners = outer.inners.filter(_.a == "a")))
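
The difference between filtering the outer records and filtering inside each nested collection can be modelled without Spark. The following is only a plain-Python sketch: the Inner and Outer dataclasses mirror the Scala case classes above, the sample data is made up, and ordinary lists stand in for the RDD.

```python
from dataclasses import dataclass, replace
from typing import List


@dataclass(frozen=True)
class Inner:
    a: str
    b: str


@dataclass(frozen=True)
class Outer:
    key: str
    inners: List[Inner]


# Made-up sample data; a plain list stands in for the RDD[Outer].
outers = [
    Outer("k1", [Inner("a", "b"), Inner("x", "y")]),
    Outer("k2", []),
]

# Filter the outers: drop whole records whose nested collection is empty.
non_empty = [o for o in outers if o.inners]

# Filter the inners: keep every record, but prune its nested collection.
only_a = [
    replace(o, inners=[i for i in o.inners if i.a == "a"])
    for o in outers
]
```

The first filter changes how many outer records survive; the second keeps the record count and only shrinks each nested list.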

Note that we used the Spark SQL library only to read the Parquet file. You could, for example, select only the wanted columns directly on the DataFrame before mapping it to an RDD.


dataFrame.select('col1, 'col2).map { row => ... }


I'll give a Python-based answer since that's what I'm using. I think Scala has something similar.


The explode function was added in Spark 1.4.0 to handle nested arrays in DataFrames, according to the Python API docs.


Create a test dataframe:


from pyspark.sql import Row

df = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3]), Row(a=2, intlist=[4,5,6])])

## +-+--------------------+
## |a|             intlist|
## +-+--------------------+
## |1|ArrayBuffer(1, 2, 3)|
## |2|ArrayBuffer(4, 5, 6)|
## +-+--------------------+

Use explode to flatten the list column:


from pyspark.sql.functions import explode

df.select(df.a, explode(df.intlist)).show()

## +-+---+
## |a|_c0|
## +-+---+
## |1|  1|
## |1|  2|
## |1|  3|
## |2|  4|
## |2|  5|
## |2|  6|
## +-+---+
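
To make the row-multiplying behaviour of explode concrete, here is a plain-Python model of it. This is not the Spark API: explode_rows is a hypothetical helper, rows are modelled as dicts, and the output column name _c0 is simply copied from the output shown above.

```python
def explode_rows(rows, array_col, out_col):
    """Yield one flat row per element of row[array_col].

    Rows whose array is empty or None produce no output rows,
    which is also how Spark's explode behaves.
    """
    for row in rows:
        # Copy every column except the array column being exploded.
        base = {k: v for k, v in row.items() if k != array_col}
        for element in row[array_col] or []:
            flat = dict(base)
            flat[out_col] = element
            yield flat


rows = [
    {"a": 1, "intlist": [1, 2, 3]},
    {"a": 2, "intlist": [4, 5, 6]},
]

flat_rows = list(explode_rows(rows, "intlist", "_c0"))
for r in flat_rows:
    print(r)
```

Each input row contributes as many output rows as its list has elements, so the two rows above become six.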


Another approach would be using pattern matching like this:


val rdd: RDD[(String, List[(String, String)])] = dataFrame.map(_.toSeq.toList match {
  case List(key: String, inners: Seq[Row]) => key -> inners.map(_.toSeq.toList match {
    case List(a: String, b: String) => (a, b)
  }).toList
})

You can pattern match directly on Row but it is likely to fail for a few reasons.



The answers above are all great and tackle this question from different sides; Spark SQL is also quite a useful way to access nested data.


Here's an example of how to use explode() directly in SQL to query a nested collection.


SELECT hholdid, tsp.person_seq_no
FROM (  SELECT hholdid, explode(tsp_ids) as tsp
        FROM disc_mrt.unified_fact uf
     )

tsp_ids is a nested collection of structs with many attributes, including person_seq_no, which I'm selecting in the outer query above.


The above was tested in Spark 2.0. In a small test I ran, it did not work in Spark 1.6. This question was asked before Spark 2 was around, so this answer adds to the list of available options for dealing with nested structures.


Notable unresolved JIRAs on explode() for SQL access:


  • SPARK-13721: Add support for LATERAL VIEW OUTER explode()

  • SPARK-7549: Support aggregating over nested fields

