I want to convert a DStream into an array, list, etc., so that I can then convert it to JSON and serve it at an endpoint. I'm using Apache Spark and ingesting Twitter data. How do I perform this operation on the DStream `statuses`? I can't seem to get anything other than `print()` to work.
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
import TutorialHelper._

object Tutorial {
  def main(args: Array[String]) {
    // Location of the Spark directory
    val sparkHome = "/opt/spark"
    // URL of the Spark cluster
    val sparkUrl = "local[8]"
    // Location of the required JAR files
    val jarFile = "target/scala-2.10/tutorial_2.10-0.1-SNAPSHOT.jar"
    // HDFS directory for checkpointing
    val checkpointDir = "/tmp"
    // Configure Twitter credentials using twitter.txt
    TutorialHelper.configureTwitterCredentials()
    val ssc = new StreamingContext(sparkUrl, "Tutorial", Seconds(1), sparkHome, Seq(jarFile))
    val filters = Array("#americasgottalent", "iamawesome")
    val tweets = TwitterUtils.createStream(ssc, None, filters)
    val statuses = tweets.map(status => status.getText())
    val arry = Array("firstval")
    statuses.foreachRDD { arr :+ _.collect() }
    ssc.checkpoint(checkpointDir)
    ssc.start()
    ssc.awaitTermination()
  }
}
aaronman · 10
If `statuses` is your DStream, you can do:

import scala.collection.mutable.ArrayBuffer

val arr = new ArrayBuffer[String]()
statuses.foreachRDD {
  arr ++= _.collect() // you can now put it in an array or do w/e you want with it
  ...
}

Keep in mind that since a DStream can be large, you may end up with more data in the driver than you want.
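One way to bound that driver-side growth is to replace `collect()` with `take(n)`, which ships at most n elements back to the driver. The sketch below assumes `statuses` is a `DStream[String]` as in the question; the `maxStatuses` cap and the guard logic are illustrative additions, not part of the original answer:

```scala
import scala.collection.mutable.ArrayBuffer

val maxStatuses = 1000 // hypothetical cap on how much data the driver keeps
val arr = new ArrayBuffer[String]()

statuses.foreachRDD { rdd =>
  if (arr.length < maxStatuses) {
    // take(n) returns at most n elements to the driver, unlike collect(),
    // so arr can never grow beyond maxStatuses entries
    arr ++= rdd.take(maxStatuses - arr.length)
  }
}
```

Note that `foreachRDD` runs this closure on the driver for each batch, so mutating a driver-local `ArrayBuffer` here is safe in local mode, but the buffer is only ever visible on the driver.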
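Since the question's end goal is to serve the data as JSON, here is a minimal sketch of turning the collected statuses into a JSON array string without any JSON library. The `toJson` helper and its naive escaping (backslashes and quotes only) are illustrative; a real endpoint should use a proper JSON library:

```scala
// Naive JSON serialization of collected statuses (illustration only:
// escapes backslashes and double quotes, nothing else)
def toJson(statuses: Seq[String]): String =
  statuses
    .map(s => "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"")
    .mkString("[", ",", "]")

// toJson(Seq("hello", "world")) produces ["hello","world"]
```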