在Apache Spark(Scala)中使用reduceByKey

 Yao2502880171 发布于 2023-01-11 13:42

我有一个类型的元组列表:(用户ID,名称,计数).

例如,

val x = sc.parallelize(List(
    ("a", "b", 1),
    ("a", "b", 1),
    ("c", "b", 1),
    ("a", "d", 1))
)

我正在尝试将此集合减少为计算每个元素名称的类型.

所以在上面val x被转换为:

(a,ArrayBuffer((d,1), (b,2)))
(c,ArrayBuffer((b,1)))

这是我目前使用的代码:

val byKey = x.map({case (id,uri,count) => (id,uri)->count})

val grouped = byKey.groupByKey
val count = grouped.map{case ((id,uri),count) => ((id),(uri,count.sum))}
val grouped2: org.apache.spark.rdd.RDD[(String, Seq[(String, Int)])] = count.groupByKey

grouped2.foreach(println)

我正在尝试使用reduceByKey,因为它比groupByKey执行得更快.

如何实现reduceByKey而不是上面的代码来提供相同的映射?

2 个回答
  • 按照你的代码:

    val byKey = x.map({case (id,uri,count) => (id,uri)->count})
    

    你可以这样做:

    val reducedByKey = byKey.reduceByKey(_ + _)
    
    scala> reducedByKey.collect.foreach(println)
    ((a,d),1)
    ((a,b),2)
    ((c,b),1)
    

    PairRDDFunctions[K,V].reduceByKey采用一个关联的reduce函数,可以应用于RDD [(K,V)]的V型.换句话说,你需要一个功能f[V](e1:V, e2:V) : V.在这个特殊情况下,Ints总和:(x:Int, y:Int) => x+y_ + _简短的下划线表示法.

    对于记录:reduceByKey表现更好,groupByKey因为它试图在shuffle/reduce阶段之前在本地应用reduce函数.groupByKey在分组之前将强制洗牌所有元素.

    2023-01-11 13:43 回答
  • 您的原始数据结构是:RDD [(String,String,Int)],并且reduceByKey只能在数据结构为RDD [(K,V)]时使用.

    val kv = x.map(e => e._1 -> e._2 -> e._3) // kv is RDD[((String, String), Int)]
    val reduced = kv.reduceByKey(_ + _)       // reduced is RDD[((String, String), Int)]
    val kv2 = reduced.map(e => e._1._1 -> (e._1._2 -> e._2)) // kv2 is RDD[(String, (String, Int))]
    val grouped = kv2.groupByKey()            // grouped is RDD[(String, Iterable[(String, Int)])]
    grouped.foreach(println)
    

    2023-01-11 13:43 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有