我有一个类型的元组列表:(用户ID,名称,计数).
例如,
val x = sc.parallelize(List( ("a", "b", 1), ("a", "b", 1), ("c", "b", 1), ("a", "d", 1)) )
我正在尝试将此集合减少为计算每个元素名称的类型.
所以在上面val x被转换为:
(a,ArrayBuffer((d,1), (b,2))) (c,ArrayBuffer((b,1)))
这是我目前使用的代码:
val byKey = x.map({case (id,uri,count) => (id,uri)->count}) val grouped = byKey.groupByKey val count = grouped.map{case ((id,uri),count) => ((id),(uri,count.sum))} val grouped2: org.apache.spark.rdd.RDD[(String, Seq[(String, Int)])] = count.groupByKey grouped2.foreach(println)
我正在尝试使用reduceByKey,因为它比groupByKey执行得更快.
如何实现reduceByKey而不是上面的代码来提供相同的映射?
按照你的代码:
val byKey = x.map({case (id,uri,count) => (id,uri)->count})
你可以这样做:
val reducedByKey = byKey.reduceByKey(_ + _) scala> reducedByKey.collect.foreach(println) ((a,d),1) ((a,b),2) ((c,b),1)
PairRDDFunctions[K,V].reduceByKey
采用一个关联的reduce函数,可以应用于RDD [(K,V)]的V型.换句话说,你需要一个功能f[V](e1:V, e2:V) : V
.在这个特殊情况下,Ints总和:(x:Int, y:Int) => x+y
或_ + _
简短的下划线表示法.
对于记录:reduceByKey
表现更好,groupByKey
因为它试图在shuffle/reduce阶段之前在本地应用reduce函数.groupByKey
在分组之前将强制洗牌所有元素.
您的原始数据结构是:RDD [(String,String,Int)],并且reduceByKey
只能在数据结构为RDD [(K,V)]时使用.
val kv = x.map(e => e._1 -> e._2 -> e._3) // kv is RDD[((String, String), Int)] val reduced = kv.reduceByKey(_ + _) // reduced is RDD[((String, String), Int)] val kv2 = reduced.map(e => e._1._1 -> (e._1._2 -> e._2)) // kv2 is RDD[(String, (String, Int))] val grouped = kv2.groupByKey() // grouped is RDD[(String, Iterable[(String, Int)])] grouped.foreach(println)