Spark Streaming中的Kafka消费者

 树上的女爷 发布于 2022-12-13 16:13

尝试编写消耗来自Kafka的消息的Spark Streaming作业.这是我到目前为止的情况:

1)启动Zookeeper.
2)启动Kafka Server.
3)向服务器发送了一些消息.当我运行以下内容时,我可以看到它们:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 - topic mytopic --from-beginning

4)现在尝试编写一个程序来计算在5分钟内进入的消息数量.

代码看起来像这样:

    Map map = new HashMap();
    map.put("mytopic", new Integer(1));

    JavaStreamingContext ssc = new JavaStreamingContext(
            sparkUrl, " Spark Streaming", new Duration(60 * 5 * 1000), sparkHome, new String[]{jarFile});


    JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "localhost:2181", "1", map);

不确定第3个参数(使用者组)使用什么值.当我运行它时,我得到"无法连接到zookeeper服务器".但Zookeeper正在2181端口上运行; 否则步骤#3就行不通了.

好像我没有正确使用KafkaUtils.createStream.有任何想法吗?

撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有