热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

sparkstreaming小案例

sparkstreaming实时计算的案例数据{car:皖A9A7N2,city_code:340500,county_code:340522,card:1179880316030

spark streaming

实时计算的案例


数据

{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117988031603010,"camera_id":"00001","orientation":"西南","road_id":34052055,"time":1614711895,"speed":36.38}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117988031603010,"camera_id":"01001","orientation":"西南","road_id":34052056,"time":1614711904,"speed":35.38}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117985031601010,"camera_id":"01214","orientation":"西南","road_id":34052057,"time":1614711914,"speed":45.38}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117984031601010,"camera_id":"01024","orientation":"西北","road_id":34052058,"time":1614711924,"speed":45.29}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117970031606010,"camera_id":"01022","orientation":"西北","road_id":34052059,"time":1614712022,"speed":75.29}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117956031625010,"camera_id":"01132","orientation":"西北","road_id":34052060,"time":1614712120,"speed":46.29}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117925031638010,"camera_id":"00202","orientation":"西北","road_id":34052061,"time":1614712218,"speed":82.29}
{"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117902031651010,"camera_id":"01102","orientation":"西北","road_id":34052062,"time":1614712316,"speed":82.29}
{"car":"皖A9A7N2","city_code":"340100","county_code":"340181","card":117885031666010,"camera_id":"01221","orientation":"西北","road_id":34308114,"time":1614712414,"speed":48.5}
{"car":"皖A9A7N2","city_code":"340100","county_code":"340181","card":117855031704010,"camera_id":"00231","orientation":"西北","road_id":34308115,"time":1614712619,"speed":59.5}
{"car":"皖A9A7N2","city_code":"340100","county_code":"340181","card":117817031742010,"camera_id":"01130","orientation":"西北","road_id":34308116,"time":1614712824,"speed":52.5}
{"car":"皖A9A7N2","city_code":"340100","county_code":"340181","card":117784031777010,"camera_id":"00123","orientation":"西北","road_id":34308117,"time":1614713030,"speed":71.5}
{"car":"皖A9A7N2","city_code":"340100","county_code":"340181","card":117720031793010,"camera_id":"00132","orientation":"西北","road_id":34308118,"time":1614713235,"speed":65.5}
...
...
...

注意点:

* 将数据保存到数据库存在的问题
* 1、如果直接使用foreach。会为每一条数据创建一个链接,效率低,而且会导致数据库压力过大
* 2、如果将网络链接放在foreach算子的外面,会报错, 网络链接不能再网络中传输
*
* 正确写法
* 使用foreachPartition,只会为每一个分区创建一个数据库链接
*
* rdd的foreach和foreachPartition
* foreach一次处理一条数据
* foreachPartition: 一次处理一个分区的数据

处理方法:

package com.shujia.spark.streaming
import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.text.SimpleDateFormat
import java.util.Date
object Demo8Card {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName("ds")
.master("local[2]")
.config("spark.sql.shuffle.partitions", 1)
.getOrCreate()
val sc: SparkCOntext= spark.sparkContext
val ssc = new StreamingContext(sc, Durations.seconds(5))
/**
* 读取卡口过车数据
*/
val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)
/**
* 1、解析json格式的数据
*
*/
val cardAndSpeedDS: DStream[(Long, (Double, Int))] = linesDS.map(line => {
//使用fastjson工具解析json数据
val carJson: JSOnObject= JSON.parseObject(line)
//取出卡口编号和车速
val card: LOng= carJson.getLong("card")
val speed: Double = carJson.getDouble("speed")
(card, (speed, 1))
})
/**
* 2、实时统计每隔卡口的平均车速,和车的数量
* 统计最近15秒的车辆,每隔5秒统计一次
*
*/
val sumSpeedAndNUmDS: DStream[(Long, (Double, Int))] = cardAndSpeedDS
.reduceByKeyAndWindow((kv1: (Double, Int), kv2: (Double, Int)) => {
//计算总的测试
val sumSpeed: Double = kv1._1 + kv2._1
//计算车的数量
val num: Int = kv1._2 + kv2._2
(sumSpeed, num)
}, Durations.seconds(15), Durations.seconds(5))
/**
* 3、计算平均车速
*
*/
val avgSpeedAndNumDs: DStream[(Long, Int, Double)] = sumSpeedAndNUmDS.map {
case (card: Long, (sumSpeed: Double, num: Int)) =>
val avgSpeed: Double = sumSpeed / num
(card, num, avgSpeed)
}
/**
* 4、将统计的结果保存到mysql中
*
* 将数据保存到数据库存在的问题
* 1、如果直接使用foreach。会为每一条数据创建一个链接,效率低,而且会导致数据库压力过大
* 2、如果将网络链接放在foreach算子的外面,会报错, 网络链接不能再网络中传输
*
* 正确写法
* 使用foreachPartition,只会为每一个分区创建一个数据库链接
*
* rdd的foreach和foreachPartition
* foreach一次处理一条数据
* foreachPartition: 一次处理一个分区的数据
*
*/
avgSpeedAndNumDs.foreachRDD(rdd => {
rdd.foreachPartition(iter => {
//获取统计的时间
val date = new Date()
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val comDate: String = format.format(date)
//1、加载驱动
Class.forName("com.mysql.jdbc.Driver")
//2、创建链接
val con: COnnection= DriverManager.getConnection("jdbc:mysql://master:3306/bigdata", "root", "123456")
//3、编写插入数据的sql
val stat: PreparedStatement = con.prepareStatement("insert into card_avg_speed_and_num(card,com_date,num,avg_speed) values(?,?,?,?)")
//这里的foreach是迭代器的一个普通方法,不是一个算子
iter.foreach {
case (card: Long, num: Int, avgSpeed: Double) =>
//设置参数
stat.setLong(1, card)
stat.setString(2, comDate)
stat.setInt(3, num)
stat.setDouble(4, avgSpeed)
//插入数据
stat.execute()
}
stat.close()
con.close()
})
})
avgSpeedAndNumDs.print()
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
}


推荐阅读
  • 本文介绍了在开发Android新闻App时,搭建本地服务器的步骤。通过使用XAMPP软件,可以一键式搭建起开发环境,包括Apache、MySQL、PHP、PERL。在本地服务器上新建数据库和表,并设置相应的属性。最后,给出了创建new表的SQL语句。这个教程适合初学者参考。 ... [详细]
  • 本文介绍了如何使用php限制数据库插入的条数并显示每次插入数据库之间的数据数目,以及避免重复提交的方法。同时还介绍了如何限制某一个数据库用户的并发连接数,以及设置数据库的连接数和连接超时时间的方法。最后提供了一些关于浏览器在线用户数和数据库连接数量比例的参考值。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 本文介绍了在Mac上搭建php环境后无法使用localhost连接mysql的问题,并通过将localhost替换为127.0.0.1或本机IP解决了该问题。文章解释了localhost和127.0.0.1的区别,指出了使用socket方式连接导致连接失败的原因。此外,还提供了相关链接供读者深入了解。 ... [详细]
  • Todayatworksomeonetriedtoconvincemethat:今天在工作中有人试图说服我:{$obj->getTableInfo()}isfine ... [详细]
  • 本文介绍了C#中数据集DataSet对象的使用及相关方法详解,包括DataSet对象的概述、与数据关系对象的互联、Rows集合和Columns集合的组成,以及DataSet对象常用的方法之一——Merge方法的使用。通过本文的阅读,读者可以了解到DataSet对象在C#中的重要性和使用方法。 ... [详细]
  • 本文介绍了OC学习笔记中的@property和@synthesize,包括属性的定义和合成的使用方法。通过示例代码详细讲解了@property和@synthesize的作用和用法。 ... [详细]
  • 后台获取视图对应的字符串
    1.帮助类后台获取视图对应的字符串publicclassViewHelper{将View输出为字符串(注:不会执行对应的ac ... [详细]
  • 标题: ... [详细]
  • 本文介绍了一种在PHP中对二维数组根据某个字段进行排序的方法,以年龄字段为例,按照倒序的方式进行排序,并给出了具体的代码实现。 ... [详细]
  • 如何在php文件中添加图片?
    本文详细解答了如何在php文件中添加图片的问题,包括插入图片的代码、使用PHPword在载入模板中插入图片的方法,以及使用gd库生成不同类型的图像文件的示例。同时还介绍了如何生成一个正方形文件的步骤。希望对大家有所帮助。 ... [详细]
  • Activiti7流程定义开发笔记
    本文介绍了Activiti7流程定义的开发笔记,包括流程定义的概念、使用activiti-explorer和activiti-eclipse-designer进行建模的方式,以及生成流程图的方法。还介绍了流程定义部署的概念和步骤,包括将bpmn和png文件添加部署到activiti数据库中的方法,以及使用ZIP包进行部署的方式。同时还提到了activiti.cfg.xml文件的作用。 ... [详细]
  • Hibernate延迟加载深入分析-集合属性的延迟加载策略
    本文深入分析了Hibernate延迟加载的机制,特别是集合属性的延迟加载策略。通过延迟加载,可以降低系统的内存开销,提高Hibernate的运行性能。对于集合属性,推荐使用延迟加载策略,即在系统需要使用集合属性时才从数据库装载关联的数据,避免一次加载所有集合属性导致性能下降。 ... [详细]
  • 大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记
    本文介绍了大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记,包括outputFormat接口实现类、自定义outputFormat步骤和案例。案例中将包含nty的日志输出到nty.log文件,其他日志输出到other.log文件。同时提供了一些相关网址供参考。 ... [详细]
  • 目录浏览漏洞与目录遍历漏洞的危害及修复方法
    本文讨论了目录浏览漏洞与目录遍历漏洞的危害,包括网站结构暴露、隐秘文件访问等。同时介绍了检测方法,如使用漏洞扫描器和搜索关键词。最后提供了针对常见中间件的修复方式,包括关闭目录浏览功能。对于保护网站安全具有一定的参考价值。 ... [详细]
author-avatar
孤鹜1101
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有