我正在使用calliope即spark插件来连接cassandra.我创建了两个看起来像的RDD
class A
val persistLevel = org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
val cas1 = CasBuilder.cql3.withColumnFamily("cassandra_keyspace", "cassandra_coulmn_family 1")
val sc1 = new SparkContext("local", "name it any thing ")
var rdd1 = sc.cql3Cassandra[SCALACLASS_1](cas1)
var rddResult1 = rdd1.persist(persistLevel)
class B
val cas2 = CasBuilder.cql3.withColumnFamily("cassandra_keyspace", "cassandra_coulmn_family 2")
var rdd2 = sc1.cql3Cassandra[SCALACLASS_2](cas2)
var rddResult2 = rdd2.persist(persistLevel)
以某种方式跟随使用其他2创建新RDD的代码库不起作用.是否有可能我们不能一起迭代2个RDD?
这是不起作用的代码片段 -
case class Report(id: Long, anotherId: Long) var reportRDD = rddResult2.flatMap(f => { val buf = List[Report]() **rddResult1.collect().toList**.foldLeft(buf)((k, v) => { val buf1 = new ListBuffer[Report] buf ++ v.INSTANCE_VAR_FROM_SCALACLASS_1.foldLeft(buf1)((ik, iv) => { buf1 += Report(f.INSTANCE_VAR_FROM_SCALACLASS_1, iv.INSTANCE_VAR_FROM_SCALACLASS_2) }) }) })
如果我替换大胆的东西并为它初始化一个val像 -
val collection = rddResult1.collect().toList var reportRDD = rddResult2.flatMap(f => { val buf = List[Report]() **collection**.foldLeft(buf)((k, v) => { val buf1 = new ListBuffer[Report] buf ++ v.INSTANCE_VAR_FROM_SCALACLASS_1.foldLeft(buf1)((ik, iv) => { buf1 += Report(f.INSTANCE_VAR_FROM_SCALACLASS_1, iv.INSTANCE_VAR_FROM_SCALACLASS_2) }) }) })
它有效,有没有解释?
您正在将转换与动作混合.关闭rdd2.flatMap
是在worker上执行的,而rdd1.collect
在Spark术语中是一个"动作",并将数据传递给驱动程序.所以,非正式地说,当你尝试对它进行flatMap时,你可以说数据不存在.(我不太了解内部因素 - 确切地指出确切的根本原因)
如果要分布式操作两个RDD,则应使用其中一个连接函数(join,leftOuterJoin,rightOuterJoin,cogroup)将它们连接起来.
例如
val mappedRdd1 = rdd1.map(x=> (x.id,x)) val mappedRdd2 = rdd2.map(x=> (x.customerId, x)) val joined = mappedRdd1.join(mappedRdd2) joined.flatMap(...reporting logic..).collect