Author: mobiledu2502895753 | Source: Internet | 2023-05-19 06:37
I have a parquet table with one of the columns being array<struct<...>>.
I can run queries against this table in Hive using the LATERAL VIEW syntax.
How do I read this table into an RDD, and more importantly, how do I filter, map, etc. this nested collection in Spark?
I could not find any references to this in the Spark documentation. Thanks in advance for any information!
P.S. I felt it might be helpful to give some stats on the table. Number of columns in the main table: ~600. Number of rows: ~200M. Number of "columns" in the nested collection: ~10. Average number of records in the nested collection: ~35.
4 solutions
There is no magic in the case of a nested collection. Spark will handle a RDD[(String, String)] and a RDD[(String, Seq[String])] the same way.
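For instance (a minimal sketch with made-up data, not from the original answer), nested values behave like any other Scala collection inside RDD transformations:

// Assumed sample data, for illustration only
val pairs = sc.parallelize(Seq(("k1", Seq("a", "b")), ("k2", Seq.empty[String])))

// Keep keys whose nested sequence is non-empty, then count the values per key
val counts = pairs.filter(_._2.nonEmpty).mapValues(_.size)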
Reading such nested collection from Parquet files can be tricky, though.
Let's take an example from the spark-shell (1.3.1):
scala> import sqlContext.implicits._
import sqlContext.implicits._
scala> case class Inner(a: String, b: String)
defined class Inner
scala> case class Outer(key: String, inners: Seq[Inner])
defined class Outer
Write the parquet file:
scala> val outers = sc.parallelize(List(Outer("k1", List(Inner("a", "b")))))
outers: org.apache.spark.rdd.RDD[Outer] = ParallelCollectionRDD[0] at parallelize at <console>:25
scala> outers.toDF.saveAsParquetFile("outers.parquet")
Read the parquet file:
scala> import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.expressions.Row
scala> val dataFrame = sqlContext.parquetFile("outers.parquet")
dataFrame: org.apache.spark.sql.DataFrame = [key: string, inners: array<struct<a:string,b:string>>]
scala> val outers = dataFrame.map { row =>
| val key = row.getString(0)
| val inners = row.getAs[Seq[Row]](1).map(r => Inner(r.getString(0), r.getString(1)))
| Outer(key, inners)
| }
outers: org.apache.spark.rdd.RDD[Outer] = MapPartitionsRDD[8] at map at DataFrame.scala:848
The important part is row.getAs[Seq[Row]](1). The internal representation of a nested sequence of struct is ArrayBuffer[Row]; you could use any super-type of it instead of Seq[Row]. The 1 is the column index in the outer row. I used the method getAs here, but there are alternatives in the latest versions of Spark. See the source code of the Row trait.
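As a hedged sketch of one such alternative (Row.getSeq is available in newer releases; in Spark 2.x you would also map over dataFrame.rdd, since DataFrame.map requires an Encoder there):

// Sketch for newer Spark versions: a typed getter instead of getAs
val outersAlt = dataFrame.rdd.map { row =>
  val inners = row.getSeq[Row](1).map(r => Inner(r.getString(0), r.getString(1)))
  Outer(row.getString(0), inners)
}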
Now that you have a RDD[Outer], you can apply any wanted transformation or action.
// Filter the outers
outers.filter(_.inners.nonEmpty)
// Filter the inners
outers.map(outer => outer.copy(inners = outer.inners.filter(_.a == "a")))
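You could also flatten the nested collection into one record per inner element, which is the RDD equivalent of a LATERAL VIEW explode (a sketch reusing the classes above):

// Flatten: one (key, inner) pair per nested record
val flattened = outers.flatMap(outer => outer.inners.map(inner => (outer.key, inner)))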
Note that we used the spark-SQL library only to read the parquet file. You could for example select only the wanted columns directly on the DataFrame, before mapping it to a RDD.
dataFrame.select('col1, 'col2).map { row => ... }
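For example, since Spark 1.3 the DataFrame API offers explode (later deprecated in Spark 2.x in favour of the explode function in org.apache.spark.sql.functions), which mimics Hive's LATERAL VIEW; a hedged sketch reusing the inners column from above:

// Hedged sketch: DataFrame.explode (Spark 1.3+) for LATERAL VIEW-like flattening
case class Flat(a: String, b: String)
val exploded = dataFrame.explode($"inners") { row: Row =>
  row.getAs[Seq[Row]](0).map(r => Flat(r.getString(0), r.getString(1)))
}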