使用两个RDDs apache spark

 手机用户2602883115 发布于 2023-01-09 14:48

我正在使用calliope即spark插件来连接cassandra.我创建了两个看起来像的RDD

class A val persistLevel = org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK val cas1 = CasBuilder.cql3.withColumnFamily("cassandra_keyspace", "cassandra_coulmn_family 1") val sc1 = new SparkContext("local", "name it any thing ") var rdd1 = sc.cql3Cassandra[SCALACLASS_1](cas1) var rddResult1 = rdd1.persist(persistLevel)

class B val cas2 = CasBuilder.cql3.withColumnFamily("cassandra_keyspace", "cassandra_coulmn_family 2") var rdd2 = sc1.cql3Cassandra[SCALACLASS_2](cas2) var rddResult2 = rdd2.persist(persistLevel)

以某种方式跟随使用其他2创建新RDD的代码库不起作用.是否有可能我们不能一起迭代2个RDD?

这是不起作用的代码片段 -

case class Report(id: Long, anotherId: Long)

  var reportRDD = rddResult2.flatMap(f => {
    val buf = List[Report]()
    **rddResult1.collect().toList**.foldLeft(buf)((k, v) => {
      val buf1 = new ListBuffer[Report]
      buf ++ v.INSTANCE_VAR_FROM_SCALACLASS_1.foldLeft(buf1)((ik, iv) => {
        buf1 += Report(f.INSTANCE_VAR_FROM_SCALACLASS_1, iv.INSTANCE_VAR_FROM_SCALACLASS_2)
      })
    })
  })

如果我替换大胆的东西并为它初始化一个val像 -

val collection = rddResult1.collect().toList

var reportRDD = rddResult2.flatMap(f => {
    val buf = List[Report]()
    **collection**.foldLeft(buf)((k, v) => {
      val buf1 = new ListBuffer[Report]
      buf ++ v.INSTANCE_VAR_FROM_SCALACLASS_1.foldLeft(buf1)((ik, iv) => {
        buf1 += Report(f.INSTANCE_VAR_FROM_SCALACLASS_1, iv.INSTANCE_VAR_FROM_SCALACLASS_2)
      })
    })
  })

它有效,有没有解释?

1 个回答
  • 您正在将转换与动作混合.关闭rdd2.flatMap是在worker上执行的,而rdd1.collect在Spark术语中是一个"动作",并将数据传递给驱动程序.所以,非正式地说,当你尝试对它进行flatMap时,你可以说数据不存在.(我不太了解内部因素 - 确切地指出确切的根本原因)

    如果要分布式操作两个RDD,则应使用其中一个连接函数(join,leftOuterJoin,rightOuterJoin,cogroup)将它们连接起来.

    例如

    val mappedRdd1 = rdd1.map(x=> (x.id,x))
    val mappedRdd2 = rdd2.map(x=> (x.customerId, x))
    
    val joined = mappedRdd1.join(mappedRdd2)
    joined.flatMap(...reporting logic..).collect
    

    2023-01-09 14:51 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有