Author: 亚丶喃7_789 | Source: Internet | 2023-01-31 14:31
When using the dropDuplicates function on a Spark DataFrame, which row is kept? The Spark documentation does not say. Is it:
the first (according to row order)?
the last (according to row order)?
random?
P.S. Assume a distributed YARN environment (not a local master).
1> Jacek Laskow..:
TL;DR: the first is kept (according to row order).
The dropDuplicates operator in Spark SQL creates a logical plan with a Deduplicate operator.
That Deduplicate operator is translated to a First logical operator by Spark SQL's Catalyst Optimizer, which answers your question nicely (!)
You can see the Deduplicate operator in the logical plan below.
// create datasets with duplicates
val dups = spark.range(9).map(_ % 3)
val q = dups.dropDuplicates
The following is the logical plan of the q dataset.
scala> println(q.queryExecution.logical.numberedTreeString)
00 Deduplicate [value#64L], false
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L]
02 +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint
03 +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, cast(id#58L as bigint), true), obj#62: java.lang.Long
04 +- Range (0, 9, step=1, splits=Some(8))
The Deduplicate operator is then translated to a First logical operator (which shows itself as an Aggregate operator after optimization).
scala> println(q.queryExecution.optimizedPlan.numberedTreeString)
00 Aggregate [value#64L], [value#64L]
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L]
02 +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint
03 +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#58L, true), obj#62: java.lang.Long
04 +- Range (0, 9, step=1, splits=Some(8))
After spending some time reviewing the code of Apache Spark, the dropDuplicates operator is equivalent to groupBy followed by the first function.
first(columnName: String, ignoreNulls: Boolean): Column
Aggregate function: returns the first value of a column in a group.
import org.apache.spark.sql.functions.first
val firsts = dups.groupBy("value").agg(first("value") as "value")
scala> println(firsts.queryExecution.logical.numberedTreeString)
00 'Aggregate [value#64L], [value#64L, first('value, false) AS value#139]
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L]
02 +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint
03 +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, cast(id#58L as bigint), true), obj#62: java.lang.Long
04 +- Range (0, 9, step=1, splits=Some(8))
scala> firsts.explain
== Physical Plan ==
*HashAggregate(keys=[value#64L], functions=[first(value#64L, false)])
+- Exchange hashpartitioning(value#64L, 200)
+- *HashAggregate(keys=[value#64L], functions=[partial_first(value#64L, false)])
+- *SerializeFromObject [input[0, bigint, false] AS value#64L]
+- *MapElements <function1>, obj#63: bigint
+- *DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#58L, true), obj#62: java.lang.Long
+- *Range (0, 9, step=1, splits=8)
I also think that the dropDuplicates operator may therefore be the more performant choice.
Spark's groupBy() and first() aggregation does not preserve ordering. If this is the implementation of dropDuplicates, then you should not expect any ordering to be preserved.
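Given that caveat, if you need a deterministic "keep the first row", the usual workaround (a sketch under assumed column names `id` and `ts`; not part of the answer above) is a window function with an explicit ordering:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val spark = SparkSession.builder().master("local[*]").appName("deterministic-dedup").getOrCreate()
import spark.implicits._

// Illustrative data: duplicate ids, with a timestamp column to order by.
val df = Seq((1, 10L, "a"), (1, 5L, "b"), (2, 7L, "c")).toDF("id", "ts", "payload")

// row_number over an explicit ordering makes "keep the first row" well defined:
// for each id, the row with the smallest ts gets rn = 1 and is kept.
val w = Window.partitionBy("id").orderBy(col("ts").asc)
val deduped = df
  .withColumn("rn", row_number().over(w))
  .where(col("rn") === 1)
  .drop("rn")

deduped.show()  // keeps (1, 5, "b") and (2, 7, "c")
```

Unlike dropDuplicates, this makes the choice of surviving row explicit instead of depending on unspecified row order.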