热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

技术日志:深入探讨SparkStreaming与SparkSQL的融合应用

篇首语:本文由编程笔记#小编为大家整理,主要介绍了071 SparkStreaming与SparkSQL集成相关的知识,希望对你有一定的参考价值。 1.说明  虽然DStream可以转换成RDD,但是

篇首语:本文由编程笔记#小编为大家整理,主要介绍了071 SparkStreaming与SparkSQL集成相关的知识,希望对你有一定的参考价值。


1.说明

  虽然DStream可以转换成RDD,但是如果比较复杂,可以考虑使用SparkSQL。

 

2.集成方式

  Streaming和Core整合:
    transform或者foreachRDD方法
  Core和SQL整合:
    RDD <==> DataFrame 互换

 

3.程序


1 package com.sql.it
2 import org.apache.spark.sql.SQLContext
3 import org.apache.spark.storage.StorageLevel
4 import org.apache.spark.streaming.kafka.KafkaUtils
5 import org.apache.spark.streaming.{Seconds, StreamingContext}
6 import org.apache.spark.{SparkConf, SparkContext}
7 object StreamingSQL {
8 def main(args: Array[String]): Unit = {
9 val cOnf= new SparkConf()
10 .setAppName("StreamingWindowOfKafka22")
11 .setMaster("local[*]")
12 val sc = SparkContext.getOrCreate(conf)
13 val ssc = new StreamingContext(sc, Seconds(5))
14 // 当调用updateStateByKey函数API的时候,必须给定checkpoint dir
15 // 路径对应的文件夹不能存在
16 ssc.checkpoint("hdfs://linux-hadoop01.ibeifeng.com:8020/beifeng/spark/streaming/9421151351")
17
18 val kafkaParams = Map(
19 "group.id" -> "streaming-kafka-78912151",
20 "zookeeper.connect" -> "linux-hadoop01.ibeifeng.com:2181/kafka",
21 "auto.offset.reset" -> "smallest"
22 )
23 val topics = Map("beifeng" -> 4) // topics中value是读取数据的线程数量,所以必须大于等于1
24 val dstream = KafkaUtils.createStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder](
25 ssc, // 给定SparkStreaming上下文
26 kafkaParams, // 给定连接kafka的参数信息 ===> 通过Kafka HighLevelConsumerAPI连接
27 topics, // 给定读取对应topic的名称以及读取数据的线程数量
28 StorageLevel.MEMORY_AND_DISK_2 // 指定数据接收器接收到kafka的数据后保存的存储级别
29 ).map(_._2)
30
31 /**
32 * transform:将DStream的操作转换为RDD的操作,调用该api最终只需要返回一个新的RDD即可
33 */
34 dstream.transform(rdd => {
35 // 使用sql统计wordcoount
36 val sqlCOntext= SQLContextSingelton.getSQLContext(rdd.sparkContext)
37 import sqlContext.implicits._
38 val procedRDD = rdd.filter(_.nonEmpty).flatMap(_.split(" ").map((_, 1)))
39 procedRDD.toDF("word", "c").registerTempTable("tb_word")
40 val resultRDD = sqlContext.sql("select word, count(c) as vc from tb_word group by word").map(row => {
41 val word = row.getAs[String]("word")
42 val count = row.getAs[Long]("vc")
43 (word, count)
44 })
45
46 resultRDD
47 }).print()
48
49 // 启动开始处理
50 ssc.start()
51 ssc.awaitTermination() // 等等结束,监控一个线程的中断操作
52 }
53 }
54
55 object SQLContextSingelton {
56 @transient private var instance: SQLCOntext= _
57
58 def getSQLContext(sc: SparkContext): SQLCOntext= {
59 if (instance == null) {
60 synchronized[SQLContext] {
61 if (instance == null) {
62 instance = new SQLContext(sc)
63 }
64 instance
65 }
66 }
67 instance
68 }
69 }

 

4.效果

  

 



推荐阅读
  • 本文详细分析了Hive在启动过程中遇到的权限拒绝错误,并提供了多种解决方案,包括调整文件权限、用户组设置以及环境变量配置等。 ... [详细]
  • Hadoop入门与核心组件详解
    本文详细介绍了Hadoop的基础知识及其核心组件,包括HDFS、MapReduce和YARN。通过本文,读者可以全面了解Hadoop的生态系统及应用场景。 ... [详细]
  • 本文详细介绍了 Flink 和 YARN 的交互机制。YARN 是 Hadoop 生态系统中的资源管理组件,类似于 Spark on YARN 的配置方式。我们将基于官方文档,深入探讨如何在 YARN 上部署和运行 Flink 任务。 ... [详细]
  • 深入解析Spark核心架构与部署策略
    本文详细探讨了Spark的核心架构,包括其运行机制、任务调度和内存管理等方面,以及四种主要的部署模式:Standalone、Apache Mesos、Hadoop YARN和Kubernetes。通过本文,读者可以深入了解Spark的工作原理及其在不同环境下的部署方式。 ... [详细]
  • 本文详细介绍了如何配置Apache Flume与Spark Streaming,实现高效的数据传输。文中提供了两种集成方案,旨在帮助用户根据具体需求选择最合适的配置方法。 ... [详细]
  • 本文详细探讨了如何在 SparkSQL 中创建 DataFrame,涵盖了从基本概念到具体实践的各种方法。作为持续学习的一部分,本文将持续更新以提供最新信息。 ... [详细]
  • 全面解读Apache Flink的核心架构与优势
    Apache Flink作为大数据处理领域的新兴力量,凭借其独特的流处理能力和高效的批处理性能,迅速获得了广泛的关注。本文旨在深入探讨Flink的关键技术特点及其应用场景,为大数据处理提供新的视角。 ... [详细]
  • 深入浅出:Hadoop架构详解
    Hadoop作为大数据处理的核心技术,包含了一系列组件如HDFS(分布式文件系统)、YARN(资源管理框架)和MapReduce(并行计算模型)。本文将通过实例解析Hadoop的工作原理及其优势。 ... [详细]
  • 本文详细介绍如何使用Python进行配置文件的读写操作,涵盖常见的配置文件格式(如INI、JSON、TOML和YAML),并提供具体的代码示例。 ... [详细]
  • 1:有如下一段程序:packagea.b.c;publicclassTest{privatestaticinti0;publicintgetNext(){return ... [详细]
  • 本文详细介绍了 Dockerfile 的编写方法及其在网络配置中的应用,涵盖基础指令、镜像构建与发布流程,并深入探讨了 Docker 的默认网络、容器互联及自定义网络的实现。 ... [详细]
  • 本文探讨了如何优化和正确配置Kafka Streams应用程序以确保准确的状态存储查询。通过调整配置参数和代码逻辑,可以有效解决数据不一致的问题。 ... [详细]
  • 大数据SQL优化:全面解析数据倾斜解决方案
    本文深入探讨了大数据SQL优化中的数据倾斜问题,提供了多种解决策略和实际案例,旨在帮助读者理解和应对这一常见挑战。 ... [详细]
  • 面对众多的数据分析工具,如何选择最适合自己的那一个?对于初学者而言,了解并掌握几种核心工具是快速入门的关键。本文将从数据处理的不同阶段出发,推荐三种广泛使用的数据分析工具。 ... [详细]
  • 龙蜥社区开发者访谈:技术生涯的三次蜕变 | 第3期
    龙蜥社区的开发者们通过自己的实践和经验,推动着开源技术的发展。本期「龙蜥开发者说」聚焦于一位资深开发者的三次技术转型,分享他在龙蜥社区的成长故事。 ... [详细]
author-avatar
Liushan2502897753
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有