热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

如何在Spark中读取嵌套集合-HowtoreadanestedcollectioninSpark

Ihaveaparquettablewithoneofthecolumnsbeing我有一张镶有桌子的镶木桌子,array<struct<col1,co

I have a parquet table with one of the columns being

我有一张镶有桌子的镶木桌子

, array>

Can run queries against this table in Hive using LATERAL VIEW syntax.

可以使用LATERAL VIEW语法在Hive中对此表运行查询。

How to read this table into an RDD, and more importantly how to filter, map etc this nested collection in Spark?

如何将此表读入RDD,更重要的是如何在Spark中过滤,映射等嵌套集合?

Could not find any references to this in Spark documentation. Thanks in advance for any information!

在Spark文档中找不到对此的任何引用。提前感谢您的任何信息!

ps. Felt might be helpful to give some stats on the table. Number of columns in main table ~600. Number of rows ~200m. Number of "columns" in nested collection ~10. Avg number of records in nested collection ~35.

PS。毛毡可能有助于在桌子上给出一些统计数据。主表~600中的列数。行数~200m。嵌套集合中的“列”数〜10。平均集合中的平均记录数~35。

4 个解决方案

#1


There is no magic in the case of nested collection. Spark will handle the same way a RDD[(String, String)] and a RDD[(String, Seq[String])].

在嵌套集合的情况下没有魔力。 Spark将以与RDD [(String,String)]和RDD [(String,Seq [String])]相同的方式处理。

Reading such nested collection from Parquet files can be tricky, though.

但是,从Parquet文件中读取这样的嵌套集合可能会很棘手。

Let's take an example from the spark-shell (1.3.1):

让我们以spark-shell(1.3.1)为例:

scala> import sqlContext.implicits._
import sqlContext.implicits._

scala> case class Inner(a: String, b: String)
defined class Inner

scala> case class Outer(key: String, inners: Seq[Inner])
defined class Outer

Write the parquet file:

写下镶木地板文件:

scala> val outers = sc.parallelize(List(Outer("k1", List(Inner("a", "b")))))
outers: org.apache.spark.rdd.RDD[Outer] = ParallelCollectionRDD[0] at parallelize at :25

scala> outers.toDF.saveAsParquetFile("outers.parquet")

Read the parquet file:

阅读镶木地板文件:

scala> import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.expressions.Row

scala> val dataFrame = sqlContext.parquetFile("outers.parquet")
dataFrame: org.apache.spark.sql.DataFrame = [key: string, inners: array>]   

scala> val outers = dataFrame.map { row =>
     |   val key = row.getString(0)
     |   val inners = row.getAs[Seq[Row]](1).map(r => Inner(r.getString(0), r.getString(1)))
     |   Outer(key, inners)
     | }
outers: org.apache.spark.rdd.RDD[Outer] = MapPartitionsRDD[8] at map at DataFrame.scala:848

The important part is row.getAs[Seq[Row]](1). The internal representation of a nested sequence of struct is ArrayBuffer[Row], you could use any super-type of it instead of Seq[Row]. The 1 is the column index in the outer row. I used the method getAs here but there are alternatives in the latest versions of Spark. See the source code of the Row trait.

重要的部分是row.getAs [Seq [Row]](1)。嵌套的struct序列的内部表示是ArrayBuffer [Row],你可以使用它的任何超类型而不是Seq [Row]。 1是外行中的列索引。我在这里使用了getAs方法,但最新版本的Spark中有其他选择。请参阅Row特征的源代码。

Now that you have a RDD[Outer], you can apply any wanted transformation or action.

现在您有了RDD [Outer],您可以应用任何想要的转换或操作。

// Filter the outers
outers.filter(_.inners.nonEmpty)

// Filter the inners
outers.map(outer => outer.copy(inners = outer.inners.filter(_.a == "a")))

Note that we used the spark-SQL library only to read the parquet file. You could for example select only the wanted columns directly on the DataFrame, before mapping it to a RDD.

请注意,我们仅使用spark-SQL库来读取镶木地板文件。例如,您可以在将数据映射到RDD之前直接在DataFrame上选择所需的列。

dataFrame.select('col1, 'col2).map { row => ... }

#2


I'll give a Python-based answer since that's what I'm using. I think Scala has something similar.

我会给出一个基于Python的答案,因为那是我正在使用的。我认为Scala有类似的东西。

The explode function was added in Spark 1.4.0 to handle nested arrays in DataFrames, according to the Python API docs.

根据Python API文档,Spark 1.4.0中添加了explode函数来处理DataFrames中的嵌套数组。

Create a test dataframe:

创建测试数据框:

from pyspark.sql import Row

df = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3]), Row(a=2, intlist=[4,5,6])])
df.show()

## +-+--------------------+
## |a|             intlist|
## +-+--------------------+
## |1|ArrayBuffer(1, 2, 3)|
## |2|ArrayBuffer(4, 5, 6)|
## +-+--------------------+

Use explode to flatten the list column:

使用explode展平列表列:

from pyspark.sql.functions import explode

df.select(df.a, explode(df.intlist)).show()

## +-+---+
## |a|_c0|
## +-+---+
## |1|  1|
## |1|  2|
## |1|  3|
## |2|  4|
## |2|  5|
## |2|  6|
## +-+---+

#3


Another approach would be using pattern matching like this:

另一种方法是使用这样的模式匹配:

val rdd: RDD[(String, List[(String, String)]] = dataFrame.map(_.toSeq.toList match { 
  case List(key: String, inners: Seq[Row]) => key -> inners.map(_.toSeq.toList match {
    case List(a:String, b: String) => (a, b)
  }).toList
})

You can pattern match directly on Row but it is likely to fail for a few reasons.

您可以直接在Row上进行模式匹配,但由于某些原因可能会失败。

#4


Above answers are all great answers and tackle this question from different sides; Spark SQL is also quite useful way to access nested data.

以上答案都是很好的答案,并从不同方面解决这个问题; Spark SQL也是访问嵌套数据的非常有用的方法。

Here's example how to use explode() in SQL directly to query nested collection.

下面是如何在SQL中直接使用explode()来查询嵌套集合的示例。

SELECT hholdid, tsp.person_seq_no 
FROM (  SELECT hholdid, explode(tsp_ids) as tsp 
        FROM disc_mrt.unified_fact uf
     )

tsp_ids is a nested of structs, which has many attributes, including person_seq_no which I'm selecting in the outer query above.

tsp_ids是一个嵌套的结构体,它有许多属性,包括我在上面的外部查询中选择的person_seq_no。

Above was tested in Spark 2.0. I did a small test and it doesn't work in Spark 1.6. This question was asked when Spark 2 wasn't around, so this answer adds nicely to the list of available options to deal with nested structures.

以上是在Spark 2.0中测试的。我做了一个小测试,它在Spark 1.6中不起作用。当Spark 2不在时问这个问题,所以这个答案很好地补充了处理嵌套结构的可用选项列表。

Noticable not resolved JIRAs on explode() for SQL access:

在用于SQL访问的explode()上,无法解析JIRA:

  • SPARK-13721: Add support for LATERAL VIEW OUTER explode()
  • SPARK-13​​721:添加对LATERAL VIEW OUTER爆炸的支持()

  • SPARK-7549: Support aggregating over nested fields
  • SPARK-7549:支持在嵌套字段上进行聚合


推荐阅读
  • Go Cobra命令行工具入门教程
    本文介绍了Go语言实现的命令行工具Cobra的基本概念、安装方法和入门实践。Cobra被广泛应用于各种项目中,如Kubernetes、Hugo和Github CLI等。通过使用Cobra,我们可以快速创建命令行工具,适用于写测试脚本和各种服务的Admin CLI。文章还通过一个简单的demo演示了Cobra的使用方法。 ... [详细]
  • 本文介绍了一种轻巧方便的工具——集算器,通过使用集算器可以将文本日志变成结构化数据,然后可以使用SQL式查询。集算器利用集算语言的优点,将日志内容结构化为数据表结构,SPL支持直接对结构化的文件进行SQL查询,不再需要安装配置第三方数据库软件。本文还详细介绍了具体的实施过程。 ... [详细]
  • MySQL多表数据库操作方法及子查询详解
    本文详细介绍了MySQL数据库的多表操作方法,包括增删改和单表查询,同时还解释了子查询的概念和用法。文章通过示例和步骤说明了如何进行数据的插入、删除和更新操作,以及如何执行单表查询和使用聚合函数进行统计。对于需要对MySQL数据库进行操作的读者来说,本文是一个非常实用的参考资料。 ... [详细]
  • 本文详细介绍了SQL日志收缩的方法,包括截断日志和删除不需要的旧日志记录。通过备份日志和使用DBCC SHRINKFILE命令可以实现日志的收缩。同时,还介绍了截断日志的原理和注意事项,包括不能截断事务日志的活动部分和MinLSN的确定方法。通过本文的方法,可以有效减小逻辑日志的大小,提高数据库的性能。 ... [详细]
  • Linux服务器密码过期策略、登录次数限制、私钥登录等配置方法
    本文介绍了在Linux服务器上进行密码过期策略、登录次数限制、私钥登录等配置的方法。通过修改配置文件中的参数,可以设置密码的有效期、最小间隔时间、最小长度,并在密码过期前进行提示。同时还介绍了如何进行公钥登录和修改默认账户用户名的操作。详细步骤和注意事项可参考本文内容。 ... [详细]
  • IOS开发之短信发送与拨打电话的方法详解
    本文详细介绍了在IOS开发中实现短信发送和拨打电话的两种方式,一种是使用系统底层发送,虽然无法自定义短信内容和返回原应用,但是简单方便;另一种是使用第三方框架发送,需要导入MessageUI头文件,并遵守MFMessageComposeViewControllerDelegate协议,可以实现自定义短信内容和返回原应用的功能。 ... [详细]
  • 本文整理了315道Python基础题目及答案,帮助读者检验学习成果。文章介绍了学习Python的途径、Python与其他编程语言的对比、解释型和编译型编程语言的简述、Python解释器的种类和特点、位和字节的关系、以及至少5个PEP8规范。对于想要检验自己学习成果的读者,这些题目将是一个不错的选择。请注意,答案在视频中,本文不提供答案。 ... [详细]
  • 大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记
    本文介绍了大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记,包括outputFormat接口实现类、自定义outputFormat步骤和案例。案例中将包含nty的日志输出到nty.log文件,其他日志输出到other.log文件。同时提供了一些相关网址供参考。 ... [详细]
  • 使用C++编写程序实现增加或删除桌面的右键列表项
    本文介绍了使用C++编写程序实现增加或删除桌面的右键列表项的方法。首先通过操作注册表来实现增加或删除右键列表项的目的,然后使用管理注册表的函数来编写程序。文章详细介绍了使用的五种函数:RegCreateKey、RegSetValueEx、RegOpenKeyEx、RegDeleteKey和RegCloseKey,并给出了增加一项的函数写法。通过本文的方法,可以方便地自定义桌面的右键列表项。 ... [详细]
  • 本文介绍了使用readlink命令获取文件的完整路径的简单方法,并提供了一个示例命令来打印文件的完整路径。共有28种解决方案可供选择。 ... [详细]
  • 【重识云原生】第四章云网络4.8.3.2节——Open vSwitch工作原理详解
    2OpenvSwitch架构2.1OVS整体架构ovs-vswitchd:守护程序,实现交换功能,和Linux内核兼容模块一起,实现基于流的交换flow-basedswitchin ... [详细]
  • linux进阶50——无锁CAS
    1.概念比较并交换(compareandswap,CAS),是原⼦操作的⼀种,可⽤于在多线程编程中实现不被打断的数据交换操作࿰ ... [详细]
  • Python脚本编写创建输出数据库并添加模型和场数据的方法
    本文介绍了使用Python脚本编写创建输出数据库并添加模型数据和场数据的方法。首先导入相应模块,然后创建输出数据库并添加材料属性、截面、部件实例、分析步和帧、节点和单元等对象。接着向输出数据库中添加场数据和历程数据,本例中只添加了节点位移。最后保存数据库文件并关闭文件。文章还提供了部分代码和Abaqus操作步骤。另外,作者还建立了关于Abaqus的学习交流群,欢迎加入并提问。 ... [详细]
  • python3 logging
    python3logginghttps:docs.python.org3.5librarylogging.html,先3.5是因为我当前的python版本是3.5之所 ... [详细]
  • 【技术分享】一个 ELF 蠕虫分析
    【技术分享】一个 ELF 蠕虫分析 ... [详细]
author-avatar
mobiledu2502895753
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有