尝试编写消耗来自Kafka的消息的Spark Streaming作业.这是我到目前为止的情况:
1)启动Zookeeper.
2)启动Kafka Server.
3)向服务器发送了一些消息.当我运行以下内容时,我可以看到它们:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 - topic mytopic --from-beginning
4)现在尝试编写一个程序来计算在5分钟内进入的消息数量.
代码看起来像这样:
Mapmap = new HashMap (); map.put("mytopic", new Integer(1)); JavaStreamingContext ssc = new JavaStreamingContext( sparkUrl, " Spark Streaming", new Duration(60 * 5 * 1000), sparkHome, new String[]{jarFile}); JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "localhost:2181", "1", map);
不确定第3个参数(使用者组)使用什么值.当我运行它时,我得到"无法连接到zookeeper服务器".但Zookeeper正在2181端口上运行; 否则步骤#3就行不通了.
好像我没有正确使用KafkaUtils.createStream.有任何想法吗?