Kafka Spark Streaming: cannot read messages

mobiledu2502886053 posted on 2022-12-09 12:52

I am integrating Kafka and Spark, using spark-streaming. I have created a topic as a Kafka producer:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test 
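
To double-check that the topic exists before producing to it, the same CLI can describe it:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test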

I am publishing messages in Kafka and trying to read them with spark-streaming Java code and display them on the screen.
All the daemons are up: the Spark master and worker, Zookeeper, and Kafka.
I am writing Java code that uses KafkaUtils.createStream.
The code is below:

public class SparkStream {
    public static void main(String args[])
    {
        if(args.length != 3)
        {
            System.out.println("SparkStream <zookeeper_ip> <group_nm> <topic1,topic2,...>");
            System.exit(1);
        }

        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topic = args[2].split(",");
        for(String t: topic)
        {
            topicMap.put(t, new Integer(1));
        }

        JavaStreamingContext jssc = new JavaStreamingContext("spark://192.168.88.130:7077", "SparkStream", new Duration(3000));
        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

        System.out.println("Connection done++++++++++++++");
        JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>()
                                                {
                                                    public String call(Tuple2<String, String> message)
                                                    {
                                                        System.out.println("NewMessage: "+message._2()+"++++++++++++++++++");
                                                        return message._2();
                                                    }
                                                }
                                                );
        data.print();

        jssc.start();
        jssc.awaitTermination();
    }
}

I run this job, and in another terminal I run the Kafka producer to publish messages:

Hi kafka
second message
another message
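
For reference, these messages were typed into the console producer, started along these lines (the broker address localhost:9092 is an assumption; adjust it to your setup):

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test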

But the output log on the spark-streaming console does not show the messages; it shows zero blocks received:

-------------------------------------------
Time: 1417438988000 ms
-------------------------------------------

2014-12-01 08:03:08,008 INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Starting job streaming job 1417438988000 ms.0 from job set of time 1417438988000 ms
2014-12-01 08:03:08,008 INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Finished job streaming job 1417438988000 ms.0 from job set of time 1417438988000 ms
2014-12-01 08:03:08,009 INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Total delay: 0.008 s for time 1417438988000 ms (execution: 0.000 s)
2014-12-01 08:03:08,010 INFO  [sparkDriver-akka.actor.default-dispatcher-15] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Added jobs for time 1417438988000 ms
2014-12-01 08:03:08,015 INFO  [sparkDriver-akka.actor.default-dispatcher-15] rdd.MappedRDD (Logging.scala:logInfo(59)) - Removing RDD 39 from persistence list
2014-12-01 08:03:08,024 INFO  [sparkDriver-akka.actor.default-dispatcher-4] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 39
2014-12-01 08:03:08,027 INFO  [sparkDriver-akka.actor.default-dispatcher-15] rdd.BlockRDD (Logging.scala:logInfo(59)) - Removing RDD 38 from persistence list
2014-12-01 08:03:08,031 INFO  [sparkDriver-akka.actor.default-dispatcher-2] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 38
2014-12-01 08:03:08,033 INFO  [sparkDriver-akka.actor.default-dispatcher-15] kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[38] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417438988000 ms
2014-12-01 08:03:09,002 INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.ReceiverTracker (Logging.scala:logInfo(59)) - Stream 0 received 0 blocks

Why is no block of data being received? I have tried the Kafka producer and consumer on the console (bin/kafka-console-producer... and bin/kafka-console-consumer...) and they work perfectly, so why doesn't my code? Any ideas?

1 Answer
  • Problem solved.

    The above code is correct. We just add two more lines to suppress the [INFO] and [WARN] output that was being generated. So the final code is:

    package com.spark;
    
    import scala.Tuple2;
    import org.apache.log4j.Logger;
    import org.apache.log4j.Level;
    import kafka.serializer.Decoder;
    import kafka.serializer.Encoder;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.*;
    import org.apache.spark.api.java.function.*;
    import org.apache.spark.api.java.*;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    import org.apache.spark.streaming.kafka.*;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
    import java.util.Map;
    import java.util.HashMap;
    
    public class SparkStream {
        public static void main(String args[])
        {
            if(args.length != 3)
            {
                System.out.println("SparkStream <zookeeper_ip> <group_nm> <topic1,topic2,...>");
                System.exit(1);
            }

            // Suppress the [INFO] and [WARN] log output from Spark and Akka
            Logger.getLogger("org").setLevel(Level.OFF);
            Logger.getLogger("akka").setLevel(Level.OFF);

            // Map each topic to the number of receiver threads to use for it
            Map<String,Integer> topicMap = new HashMap<String,Integer>();
            String[] topic = args[2].split(",");
            for(String t: topic)
            {
                topicMap.put(t, new Integer(3));
            }

            // local[4] runs with four local cores; spark-streaming needs at least two
            JavaStreamingContext jssc = new JavaStreamingContext("local[4]", "SparkStream", new Duration(1000));
            JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

            System.out.println("Connection done++++++++++++++");
            // Keep only the message value from each (key, value) tuple
            JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>()
                                                    {
                                                        public String call(Tuple2<String, String> message)
                                                        {
                                                            return message._2();
                                                        }
                                                    }
                                                    );
            data.print();

            jssc.start();
            jssc.awaitTermination();
        }
    }
    

    We also need to add the dependency in pom.xml:

    <dependency>
        <groupId>com.msiops.footing</groupId>
        <artifactId>footing-tuple</artifactId>
        <version>0.2</version>
    </dependency>
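
    The code also needs the Spark Streaming Kafka integration on the classpath. A sketch of the coordinates for a Spark 1.1.x / Scala 2.10 build (the exact version is an assumption; match it to your Spark install):

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.1.0</version>
    </dependency>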
    

    This dependency is used to make use of scala.Tuple2.
    The error Stream 0 received 0 blocks was due to the spark-worker not being available, with the worker's core count set to 1. For spark-streaming we need at least 2 cores, so we need to make changes in the spark config file. Refer to the installation manual: add the line export SPARK_WORKER_CORES=5, and change SPARK_MASTER='hostname' to SPARK_MASTER=<your local IP>. This local IP is the one you see in bold when you open the Spark UI web console, something like spark://192.168..:<port>. We don't need the port here, only the IP.
    Now restart your spark-master and spark-worker and start streaming :)
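
    A minimal sketch of the matching conf/spark-env.sh entries, assuming a stock Spark 1.x standalone install where the variables are SPARK_WORKER_CORES and SPARK_MASTER_IP (the IP below is the one from the question; use your own):

    # conf/spark-env.sh
    export SPARK_WORKER_CORES=5             # spark-streaming needs >= 2 cores per worker
    export SPARK_MASTER_IP=192.168.88.130   # the IP from the Spark UI, without the port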

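    With the jar built, the job can be launched through spark-submit; the jar path and group name here are hypothetical placeholders:

    bin/spark-submit --class com.spark.SparkStream target/spark-stream-0.0.1.jar localhost:2181 test-group test
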
    Output:

    -------------------------------------------
    Time: 1417443060000 ms
    -------------------------------------------
    message 1
    
    -------------------------------------------
    Time: 1417443061000 ms
    -------------------------------------------
    message 2
    
    -------------------------------------------
    Time: 1417443063000 ms
    -------------------------------------------
    message 3
    message 4
    
    -------------------------------------------
    Time: 1417443064000 ms
    -------------------------------------------
    message 5
    message 6
    messag 7
    
    -------------------------------------------
    Time: 1417443065000 ms
    -------------------------------------------
    message 8
    

    Answered 2022-12-11 02:57