I am trying to run the following FlumeEventCount example:
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._
import org.apache.spark.util.IntParam
import org.apache.spark.streaming.flume.FlumeUtils

object FlumeEventCount {
  def main(args: Array[String]) {
    val batchInterval = Milliseconds(2000)

    // Create the context and set the batch size
    val sparkConf = new SparkConf().setAppName("FlumeEventCount")
      .set("spark.cleaner.ttl", "3")
    val ssc = new StreamingContext(sparkConf, batchInterval)

    // Create a flume stream
    val stream = FlumeUtils.createStream(ssc, "192.168.1.5", 3564, StorageLevel.MEMORY_ONLY_SER_2)

    // Print out the count of events received from this server in each batch
    stream.count().map(cnt => "Received flume events." + cnt).print()
    stream.count.print()
    stream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
My sbt build file is as follows:
import AssemblyKeys._

assemblySettings

name := "flume-test"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-streaming-flume" % "1.0.0" exclude("org.apache.spark", "spark-core") exclude("org.apache.spark", "spark-streaming_2.10")

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
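(For completeness: the import of AssemblyKeys._ above assumes the sbt-assembly plugin is enabled in project/plugins.sbt, roughly like the line below; the plugin version shown is only an example.)

// project/plugins.sbt (assumed; the version is only an example)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")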
I run the program with the following command:
/tmp/spark-1.0.0-bin-hadoop2/bin/spark-submit --class FlumeEventCount --master local --deploy-mode client /tmp/fooproj/target/scala-2.10/cert-log-manager-assembly-1.0.jar
On the other side, the Flume application is sending everything correctly, and I can see in its logs that the data has been received.
I haven't changed any of Spark's configuration or set any environment variables; I just downloaded and extracted the release.
Can anyone tell me what I'm doing wrong?
//edit: when I run Spark's bundled FlumeEventCount example, it works.
//edit2: if I remove awaitTermination and add an ssc.stop, it prints everything at once; I guess this is because something gets flushed.
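For reference, the change described in edit2 looks roughly like the sketch below. The Thread.sleep before ssc.stop() is my assumption, since stopping immediately after start would give the receiver no time to run:

    // Variant from edit2 (sketch; the sleep duration is assumed)
    ssc.start()
    Thread.sleep(10000)  // let a few batches run (assumed pause)
    ssc.stop()           // stopping then appears to flush the buffered output all at once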