热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

SparkStreaming和Kafka整合之路(最新版本)

2019独角兽企业重金招聘Python工程师标准最近完成了SparkStreaming和Kafka的整合工作,耗时虽然不长,但是当中还是遇到了不少

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

最近完成了Spark Streaming和Kafka的整合工作,耗时虽然不长,但是当中还是遇到了不少的坑,记录下来,大家方便绕行。

先说一下环境:

Spark 2.0.0    kafka_2.11-0.10.0.0

之前的项目当中,已经在pom当中添加了需要的Spark Streaming的依赖,这次只需要添加Spark Streaming Kafka的以来就行了,问题来了。首先是我之前添加的Spark Streaming的依赖:

   
      org.apache.spark
      spark-streaming_2.11
      2.0.0
   

然后是找到的spark streaming对kafka的支持依赖:


    org.apache.spark
    spark-streaming-kafka_2.11
    1.6.2

请注意2个version部分,好像差的有点多。不管了,照着例子写写看,果然报了各种class not found的错误。基本可以判断是版本差异造成的问题。

可是,在http://mvnrepository.com上找不到更高版本的依赖怎么办呢?

考虑了一下,只有一个办法了,下载spark源码,自行编译打包需要的jar包。

在github上找到spark项目,clone下来,懒病又犯了,也没仔细看当中的说明,直接就clean compile等等。结果又是各种报错。好吧,好好看看吧,github上给了个地址:http://spark.apache.org/docs/latest/building-spark.html,照着做就没问题了。

然后把项目当中pom里面对streaming kafka的依赖删掉,引入我们自己生成的jar包:

spark-streaming-kafka-0-10_2.11-2.1.0-SNAPSHOT.jar

 

然后贴上代码:

    val conf = new SparkConf().setAppName("kafkastream").setMaster("spark://master:7077").
      set("spark.driver.host", "192.168.1.142").
      setJars(List("/src/git/msgstream/out/artifacts/msgstream_jar/msgstream.jar",
        "/src/git/msgstream/lib/kafka-clients-0.10.0.0.jar",
        "/src/git/msgstream/lib/kafka_2.11-0.10.0.0.jar",
        "/src/git/msgstream/lib/spark-streaming-kafka-0-10_2.11-2.1.0-SNAPSHOT.jar"))
    val ssc = new StreamingContext(conf, Seconds(2))

    val topics = List("woozoom")
    val kafkaParams = Map(("bootstrap.servers", "master:9092,slave01:9092,slave02:9092"),
      ("group.id", "sparkstreaming"), ("key.deserializer", classOf[StringDeserializer]),
      ("value.deserializer", classOf[StringDeserializer]))
    val preferredHosts = LocationStrategies.PreferConsistent
    val offsets = Map(new TopicPartition("woozoom", 0) -> 2L)

    val lines = KafkaUtils.createDirectStream[String, String](
      ssc,
      preferredHosts,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets))

    lines.foreachRDD(rdd => {
      rdd.foreach(x => {
        println(x)
      })
    })

    ssc.start()
    ssc.awaitTermination()

上面标红的部分,是需要注意的,而这些本来我也是不会写的,后来去到spark源码找到test代码

/src/git/spark/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala

测试,通过!!!

 

总结:

1、spark项目很多时候,资源不是很充分,想找例子的话,2个途径,一个spark安装包当中的example但是这个很多时候,版本是比较老的,不是很理想。更好地是从spark源码当中找他的测试用例,这个基本上和你用的最新版本是完全匹配的。

2、编译过很多开源项目,一般大的项目都会有相应的build说明,照着那个做,会为你节省很多时间。

3、从最开始遇到的版本号的问题来看,很多时候我们遇到的问题并不一定是我们自己的问题,不迷信,大胆的相信自己的推测,非常有助于问题的解决。


转:https://my.oschina.net/dongtianxi/blog/748590



推荐阅读
  • springboot基于redis配置session共享项目环境配置pom.xml引入依赖application.properties配置Cookie序列化(高版本不需要)测试启 ... [详细]
  • Tomcat安装与配置教程及常见问题解决方法
    本文介绍了Tomcat的安装与配置教程,包括jdk版本的选择、域名解析、war文件的部署和访问、常见问题的解决方法等。其中涉及到的问题包括403问题、数据库连接问题、1130错误、2003错误、Java Runtime版本不兼容问题以及502错误等。最后还提到了项目的前后端连接代码的配置。通过本文的指导,读者可以顺利完成Tomcat的安装与配置,并解决常见的问题。 ... [详细]
  • 本文讨论了Alink回归预测的不完善问题,指出目前主要针对Python做案例,对其他语言支持不足。同时介绍了pom.xml文件的基本结构和使用方法,以及Maven的相关知识。最后,对Alink回归预测的未来发展提出了期待。 ... [详细]
  • 在IDEA中如何安装配置maven
    这篇文章主要介绍在IDEA中如何安装配置maven,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!一、下载maven:1、maven官网:h ... [详细]
  • 微软头条实习生分享深度学习自学指南
    本文介绍了一位微软头条实习生自学深度学习的经验分享,包括学习资源推荐、重要基础知识的学习要点等。作者强调了学好Python和数学基础的重要性,并提供了一些建议。 ... [详细]
  • 如何实现织梦DedeCms全站伪静态
    本文介绍了如何通过修改织梦DedeCms源代码来实现全站伪静态,以提高管理和SEO效果。全站伪静态可以避免重复URL的问题,同时通过使用mod_rewrite伪静态模块和.htaccess正则表达式,可以更好地适应搜索引擎的需求。文章还提到了一些相关的技术和工具,如Ubuntu、qt编程、tomcat端口、爬虫、php request根目录等。 ... [详细]
  • 本文介绍了在Mac上搭建php环境后无法使用localhost连接mysql的问题,并通过将localhost替换为127.0.0.1或本机IP解决了该问题。文章解释了localhost和127.0.0.1的区别,指出了使用socket方式连接导致连接失败的原因。此外,还提供了相关链接供读者深入了解。 ... [详细]
  • Android系统移植与调试之如何修改Android设备状态条上音量加减键在横竖屏切换的时候的显示于隐藏
    本文介绍了如何修改Android设备状态条上音量加减键在横竖屏切换时的显示与隐藏。通过修改系统文件system_bar.xml实现了该功能,并分享了解决思路和经验。 ... [详细]
  • 本文介绍了在Windows环境下如何配置php+apache环境,包括下载php7和apache2.4、安装vc2015运行时环境、启动php7和apache2.4等步骤。希望对需要搭建php7环境的读者有一定的参考价值。摘要长度为169字。 ... [详细]
  • 本文介绍了一些Java开发项目管理工具及其配置教程,包括团队协同工具worktil,版本管理工具GitLab,自动化构建工具Jenkins,项目管理工具Maven和Maven私服Nexus,以及Mybatis的安装和代码自动生成工具。提供了相关链接供读者参考。 ... [详细]
  • MyBatis多表查询与动态SQL使用
    本文介绍了MyBatis多表查询与动态SQL的使用方法,包括一对一查询和一对多查询。同时还介绍了动态SQL的使用,包括if标签、trim标签、where标签、set标签和foreach标签的用法。文章还提供了相关的配置信息和示例代码。 ... [详细]
  • 目录浏览漏洞与目录遍历漏洞的危害及修复方法
    本文讨论了目录浏览漏洞与目录遍历漏洞的危害,包括网站结构暴露、隐秘文件访问等。同时介绍了检测方法,如使用漏洞扫描器和搜索关键词。最后提供了针对常见中间件的修复方式,包括关闭目录浏览功能。对于保护网站安全具有一定的参考价值。 ... [详细]
  • 本文总结了初学者在使用dubbo设计架构过程中遇到的问题,并提供了相应的解决方法。问题包括传输字节流限制、分布式事务、序列化、多点部署、zk端口冲突、服务失败请求3次机制以及启动时检查。通过解决这些问题,初学者能够更好地理解和应用dubbo设计架构。 ... [详细]
  • IamsettingupApacheserverwithTortoiseSVNforalocalsourcecoderepository.Ihaveobservedt ... [详细]
  • Checklist[x]Ihaveincludedinformationaboutrelevantversions ... [详细]
author-avatar
mobiledu2502917797
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有