热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

【Spark2.0系列】:Catalog和自定义Optimizer

【Spark2.0系列】适合初学

Spark 2.0系列第一篇见【Spark 2.0系列】: Spark Session API和Dataset API,本文将讲解Spark 2.0 的Catalog 和Custom Optimizer。

首先,先了解下RDD 和Dataset 在开发中使用对比。

RDD 和Dataset 使用对比

Dataset API 是RDD 和DataFrame API 的统一,但大部分Dataset API 与RDD API使用方法看起来是相似的(其实实现方法是不同的)。所以RDD代码很容易转换成Dataset API。下面直接上代码:

WordCount
  • RDD

val rdd = sparkContext.textFile("src/main/resources/data.txt")

 

val wordsRDD = rdd.flatMap(value => value.split("\\s+"))

val wordsPair = wordsRDD.map(word => (word,1))

val wordCount = wordsPair.reduceByKey(_+_)

  • Dataset

val ds = sparkSession.read.text("src/main/resources/data.txt")

import sparkSession.implicits._

val wordsDs = ds.flatMap(value => value.split("\\s+"))

val wordsPairDs = wordsDs.groupByKey(value => value)

val wordCountDs = wordsPairDs.count()

其它


RDDDataset
Cachingrdd.cache()ds.cache()
Filter

val filteredRDD = wordsRDD.filter(value => value ==”hello”)

val filteredDS = wordsDs.filter(value => value ==”hello”)
Map Partition

val mapPartitiOnsRDD= rdd.mapPartitions(iterator => List(iterator.count(value => true)).iterator)

val mapPartitiOnsDs= ds.mapPartitions(iterator => List(iterator.count(value => true)).iterator)
reduceByKeyval reduceCountByRDD = wordsPair.reduceByKey(+)val reduceCountByDs = wordsPairDs.mapGroups((key,values) =>(key,values.length))
备注:此处表格横屏观看效果更佳。

Dataset 和RDD 相互转换
  • RDD

val dsToRDD = ds.rdd

  • Dataset

RDD 转换成Dataframe稍麻烦,需要指定schema。

val rddStringToRowRDD = rdd.map(value => Row(value))

val dfschema = StructType(Array(StructField("value",StringType)))

val rddToDF = sparkSession.createDataFrame(rddStringToRowRDD,dfschema)

val rDDToDataSet = rddToDF.as[String]

Catalog API

DataSet 和Dataframe API 支持结构化数据分析,而结构化数据重要的是管理metadata。这里的metadata包括temporary metadata(临时表);registered udfs;permanent metadata(Hive metadata或HCatalog)。

早期Spark版本并未提供标准的API访问metadata,开发者需要使用类似show tables的查询来查询metadata;而Spark 2.0 在Spark SQL中提供标准API 调用catalog来访问metadata。

访问Catalog

建立SparkSession,然后调用Catalog:

val catalog = sparkSession.catalog

查询数据库

catalog.listDatabases().select("name").show()

listDatabases可查询所有数据库。在Hive中,Catalog可以访问Hive metadata中的数据库。listDatabases返回一个dataset,所以你可以使用适用于dataset的所有操作去处理metadata。

用createTempView 注册Dataframe

早期版本Spark用registerTempTable注册dataframe,而Spark 2.0 用createTempView替代。

df.createTempView("sales")

一旦注册视图,即可使用listTables访问所有表。

查询表

catalog.listTables().select("name").show()

检查表缓存

通过Catalog可检查表是否缓存。访问频繁的表缓存起来是非常有用的。

catalog.isCached("sales")

默认表是不缓存的,所以你会得到false。

df.cache()

catalog.isCached("sales")

现在将会打印true。

删除视图

catalog.dropTempView("sales")

查询注册函数

catalog.listFunctions().

select("name","description","className","isTemporary").show(100)

Catalog不仅能查询表,也可以访问UDF。上面代码会显示Spark Session中所有的注册函数(包括内建函数)。

自定义 Optimizer
Catalyst optimizer

Spark SQL使用Catalyst优化所有的查询,优化之后的查询比直接操作RDD速度要快。Catalyst是基于rule的,每个rule都有一个特定optimization,比如,ConstantFolding rule用来移除常数表达式,具体可直接看Spark SQL源代码。

在早期版本Spark中,如果想自定义optimization,需要开发者修改Spark源代码。操作起来麻烦,而且要求开发者能读懂源码。在Spark 2.0中,已提供API自定义optimization。

访问Optimized plan

在开始编写自定义optimization之前,先来看看如何访问optimized plan:

val df = sparkSession.read.option("header","true").csv("src/main/resources/data.csv")

val multipliedDF = df.selectExpr("amountPaid * 1")

println(multipliedDF.queryExecution.optimizedPlan.numberedTreeString)

上面的代码是加载一个csv文件,并对某一行所有值乘以1。queryExecution 可访问查询相关的所有执行信息。 queryExecutionoptimizedPlan对象可以访问dataframe的optimized plan。

Spark中的执行计划以tree表示,所以用numberedTreeString打印optimized plan。打印结果如下:

00 Project [(cast(amountPaid#3 as double) * 1.0) AS (amountPaid * 1)#5]01 +- Relation[transactionId#0,customerId#1,itemId#2,amountPaid#3] csv

所有执行计划是由底向上读取:

  • 01 Relation - 从csv 文件建立一个dataframe

  • 00 Project - 投影操作

编写自定义optimizer rule

从上面的执行计划可以清晰的看到:对一列的每个值乘以1 这里并没有优化。我们知道,乘以1 这个操作应该返回的是值本身,所以可以利用这个特点来增加只能点的optimizer。代码如下:

object MultiplyOptimizationRule extends Rule[LogicalPlan] {

   def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {

     case Multiply(left,right) if right.isInstanceOf[Literal] &&

       right.asInstanceOf[Literal].value.asInstanceOf[Double] == 1.0 =>

       println("optimization of one applied")

       left

   }

 }

这里MultiplyOptimizationRule扩展自Rule类,采用Scala的模式匹配编写。检测右操作数是否是 1,如果是1 则直接返回左节点。

把MultiplyOptimizationRule加入进optimizer:

sparkSession.experimental.extraOptimizatiOns= Seq(MultiplyOptimizationRule)

你可以使用extraOptimizations将定义好的Rule加入 catalyst。

下面实际使用看看效果:

val multipliedDFWithOptimization = df.selectExpr("amountPaid * 1")

println("after optimization")

println(multipliedDFWithOptimization.queryExecution.

optimizedPlan.numberedTreeString)

我们看到打印结果:

00 Project [cast(amountPaid#3 as double) AS (amountPaid * 1)#7]01 +- Relation[transactionId#0,customerId#1,itemId#2,amountPaid#3] csv

说明自定义Optimizer已生效。


侠天,专注于大数据、机器学习和数学相关的内容,并有个人公众号:bigdata_ny分享相关技术文章。

若发现以上文章有任何不妥,请联系我。





推荐阅读
  • 本文介绍了Hive常用命令及其用途,包括列出数据表、显示表字段信息、进入数据库、执行select操作、导出数据到csv文件等。同时还涉及了在AndroidManifest.xml中获取meta-data的value值的方法。 ... [详细]
  • Java序列化对象传给PHP的方法及原理解析
    本文介绍了Java序列化对象传给PHP的方法及原理,包括Java对象传递的方式、序列化的方式、PHP中的序列化用法介绍、Java是否能反序列化PHP的数据、Java序列化的原理以及解决Java序列化中的问题。同时还解释了序列化的概念和作用,以及代码执行序列化所需要的权限。最后指出,序列化会将对象实例的所有字段都进行序列化,使得数据能够被表示为实例的序列化数据,但只有能够解释该格式的代码才能够确定数据的内容。 ... [详细]
  • 本文介绍了在Win10上安装WinPythonHadoop的详细步骤,包括安装Python环境、安装JDK8、安装pyspark、安装Hadoop和Spark、设置环境变量、下载winutils.exe等。同时提醒注意Hadoop版本与pyspark版本的一致性,并建议重启电脑以确保安装成功。 ... [详细]
  • IT方面的论坛太多了,有综合,有专业,有行业,在各个论坛里混了几年,体会颇深,以前是论坛哪里人多 ... [详细]
  • 超级简单加解密工具的方案和功能
    本文介绍了一个超级简单的加解密工具的方案和功能。该工具可以读取文件头,并根据特定长度进行加密,加密后将加密部分写入源文件。同时,该工具也支持解密操作。加密和解密过程是可逆的。本文还提到了一些相关的功能和使用方法,并给出了Python代码示例。 ... [详细]
  • Python的参数解析argparse模块的学习
    本文介绍了Python中参数解析的重要模块argparse的学习内容。包括位置参数和可选参数的定义和使用方式,以及add_argument()函数的详细参数关键字解释。同时还介绍了命令行参数的操作和可接受数量的设置,其中包括整数类型的参数。通过学习本文内容,可以更好地理解和使用argparse模块进行参数解析。 ... [详细]
  • OpenMap教程4 – 图层概述
    本文介绍了OpenMap教程4中关于地图图层的内容,包括将ShapeLayer添加到MapBean中的方法,OpenMap支持的图层类型以及使用BufferedLayer创建图像的MapBean。此外,还介绍了Layer背景标志的作用和OMGraphicHandlerLayer的基础层类。 ... [详细]
  • 如何优化Webpack打包后的代码分割
    本文介绍了如何通过优化Webpack的代码分割来减小打包后的文件大小。主要包括拆分业务逻辑代码和引入第三方包的代码、配置Webpack插件、异步代码的处理、代码分割重命名、配置vendors和cacheGroups等方面的内容。通过合理配置和优化,可以有效减小打包后的文件大小,提高应用的加载速度。 ... [详细]
  • 本文整理了Java中org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc.getTypeInfo()方法的一些代码示例,展 ... [详细]
  • 前言:拿到一个案例,去分析:它该是做分类还是做回归,哪部分该做分类,哪部分该做回归,哪部分该做优化,它们的目标值分别是什么。再挑影响因素,哪些和分类有关的影响因素,哪些和回归有关的 ... [详细]
  • 什么是大数据lambda架构
    一、什么是Lambda架构Lambda架构由Storm的作者[NathanMarz]提出,根据维基百科的定义,Lambda架构的设计是为了在处理大规模数 ... [详细]
  • 基于词向量计算文本相似度1.测试数据:链接:https:pan.baidu.coms1fXJjcujAmAwTfsuTg2CbWA提取码:f4vx2.实验代码:imp ... [详细]
  • 【Python 爬虫】破解按照顺序点击验证码(非自动化浏览器)
    #请求到验证码base64编码json_img_datajson_raw.get(Vimage)#获取到验证码编码 #保存验证码图片到本地defbase64_to_img(bstr ... [详细]
  • Flink使用java实现读取csv文件简单实例首先我们来看官方文档中给出的几种方法:首先我们来看官方文档中给出的几种方法:第一种:Da ... [详细]
  • 简介数组、CSV、表格、东西将一个数组转化为逗号为支解符的字符串(CSV)即表格数据。该源码来自于https:30secondsofcode.orgconstarrayToCSV( ... [详细]
author-avatar
xeyuxing369
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有