热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

20210111Spark35(SparkStreaming4)

聚类运算Redis版非聚类运算hbase版Phoenix版求最大值Redis单机版是支持事务的,但是集

聚类运算Redis版
非聚类运算 hbase版 Phoenix版求最大值

Redis单机版是支持事务的,但是集群版是不支持事务的

回顾Redis的事务操作

object JedisTransactionTest { def main(args: Array[String]): Unit = { //数据库连接 val jedis = new Jedis("linux03", 6379) jedis.auth("123456") //开启jedis的pipeline(整体来执行) var pipeline: Pipeline = null try { pipeline = jedis.pipelined() //开启多个操作 pipeline.multi() pipeline.hincrBy("wc", "aaa", 18) //Redis Hincrby 命令用于为哈希表中的字段值加上指定增量值。 //val i = 1 / 0 pipeline.hset("dot", "bbb", "19") //将哈希表 key 中的域 field 的值设为 value 。 //如果 key 不存在,一个新的哈希表被创建并进行HSET 操作。 //如果域 field 已经存在于哈希表中,旧值将被覆盖 //提交事务 pipeline.sync() pipeline.exec() } catch { case e: Exception => { e.printStackTrace() pipeline.discard() //回滚事务 } } finally { pipeline.close() jedis.close() } } }

Ridis版聚类运算

//连接池 object JedisConnectionPool { val cOnfig= new JedisPoolConfig() config.setMaxTotal(5) config.setMaxIdle(2) config.setTestOnBorrow(true) val pool = new JedisPool(config, "linux03", 6379, 10000, "123456") def getConnection(): Jedis = { pool.getResource } }

//获取历史偏移量 object OffsetUtils { def queryHistoryOffsetFromRedis(appName: String, groupId: String): Map[TopicPartition, Long] = { val historyOffset = new mutable.HashMap[TopicPartition, Long]() val jedis = JedisConnectionPool.getConnection() val res: util.Map[String, String] = jedis.hgetAll(appName + "_" + groupId) import scala.collection.JavaConverters._ for (t

//保证ExactlyOnce,精确一次性语义(要求数据处理且被处理一次) object KafkaWordCountToRedis { def main(args: Array[String]): Unit = { val appName = this.getClass.getSimpleName val groupId = "g008" val cOnf= new SparkConf().setAppName(appName).setMaster("local[*]") val ssc = new StreamingContext(conf, Seconds(5)) ssc.sparkContext.setLogLevel("WARN") val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "linux03:9092,linux04:9092,linux05:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> groupId, "auto.offset.reset" -> "earliest", "enable.auto.commit" -> (false: java.lang.Boolean) //不自动提交偏移量 ) val topics = Array("wordcount18") val historyOffset: Map[TopicPartition, Long] = OffsetUtils.queryHistoryOffsetFromRedis(appName, groupId) //直连:sparkstreaming用来读取数据的task(kafka的消费者),直接连接到kafka指定topic的leader分区中 val lines: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream( ssc, LocationStrategies.PreferConsistent, //位置策略 ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, historyOffset) //消费策略 ) //调用完createDirectStream,返回的第一个DStream进行操作,因为只有第一手的DStream有偏移量 lines.foreachRDD(rdd => { //在指定的时间周期内,kafka中有新的数据写入 if (!rdd.isEmpty()) { //获取偏移量 val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges val lines: RDD[String] = rdd.map(_.value()) val reduced: RDD[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) //将计算好的结果收集到Driver端,然后写入到Redis val res: Array[(String, Int)] = reduced.collect() val jedis = JedisConnectionPool.getConnection() var pipeline: Pipeline = null try { pipeline = jedis.pipelined() pipeline.multi() //目的:是为了实现ExactlyOnce,将计算好的结果和偏移量在同一个事物中同时写入到Redis res.foreach(t => { pipeline.hincrBy("dot", t._1, t._2) }) offsetRanges.foreach(o => { val topic = o.topic val partition = o.partition val untilOffset = o.untilOffset pipeline.hset(appName + "_" + groupId, topic + "_" + partition, untilOffset.toString) }) pipeline.exec() pipeline.sync() } catch { case e: Exception => { e.printStackTrace() pipeline.discard() } } finally { pipeline.close() jedis.close() } } }) ssc.start() ssc.awaitTermination() } }

非聚类运算

hbase 只支持行级事务

2021-01-11-Spark-35(SparkStreaming 4)
image.png

数据在kafka中已经有唯一的id,可作为hbase中的rowkey
每一行的最后一个数据记录偏移量,即可无需读取,查找以一个组内的最大的偏移量即可
但是hbase是不支持分组查询的
解决方案:
1.自定义协处理器
2.使用Phoenix来SQL查询
如果消费者多读了,但是hbase的id是唯一的,所以会把先前的数据给覆盖掉

//每一次启动该程序,都要从Hbase查询历史偏移量 def queryHistoryOffsetFromHbase(appid: String, groupid: String): Map[TopicPartition, Long] = { val offsets = new mutable.HashMap[TopicPartition, Long]() val cOnnection= DriverManager.getConnection("jdbc:phoenix:linux03,linux04,linux05:2181") //分组求max,就是求没有分分区最(大)新的偏移量 val ps = connection.prepareStatement("select "topic_partition", max("offset") from "t_orders" where "appid_groupid" = ? group by "topic_partition"") ps.setString(1, appid + "_" + groupid) //查询返回结果 val rs: ResultSet = ps.executeQuery() while(rs.next()) { val topicAndPartition = rs.getString(1) val fields = topicAndPartition.split("_") val topic = fields(0) val partition = fields(1).toInt val offset = rs.getLong(2) offsets.put(new TopicPartition(topic, partition), offset) } offsets.toMap }

/** * https://www.jianshu.com/p/f1340eaa3e06 * * spark.task.maxFailures * yarn.resourcemanager.am.max-attempts * spark.speculation * * create view "t_orders" (pk VARCHAR PRIMARY KEY, "offset"."appid_groupid" VARCHAR, "offset"."topic_partition" VARCHAR, "offset"."offset" UNSIGNED_LONG); * select max("offset") from "t_orders" where "appid_groupid" = 'g1' group by "topic_partition"; * */ object KafkaToHbase { def main(args: Array[String]): Unit = { //true a1 g1 ta,tb val Array(isLocal, appName, groupId, allTopics) = args val cOnf= new SparkConf() .setAppName(appName) if (isLocal.toBoolean) { conf.setMaster("local[*]") } val sc = new SparkContext(conf) sc.setLogLevel("WARN") val ssc: StreamingCOntext= new StreamingContext(sc, Milliseconds(5000)) val topics = allTopics.split(",") //SparkSteaming跟kafka整合的参数 val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "linux03:9092,linux04:9092,linux05:9092", "key.deserializer" -> classOf[StringDeserializer].getName, "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer", "group.id" -> groupId, "auto.offset.reset" -> "earliest", //如果没有记录偏移量,第一次从最开始读,有偏移量,接着偏移量读 "enable.auto.commit" -> (false: java.lang.Boolean) //消费者不自动提交偏移量 ) //查询历史偏移量【上一次成功写入到数据库的偏移量】 val historyOffsets: Map[TopicPartition, Long] = OffsetUtils.queryHistoryOffsetFromHbase(appName, groupId) //跟Kafka进行整合,需要引入跟Kafka整合的依赖 //createDirectStream更加高效,使用的是Kafka底层的消费API,消费者直接连接到Kafka的Leader分区进行消费 //直连方式,RDD的分区数量和Kafka的分区数量是一一对应的【数目一样】 val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String]( ssc, LocationStrategies.PreferConsistent, //调度task到Kafka所在的节点 ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, historyOffsets) //指定订阅Topic的规则, 从历史偏移量接着读取数据 ) // var offsetRanges: Array[OffsetRange] = null // // val ds2 = kafkaDStream.map(rdd => { // //获取KakfaRDD的偏移量(Driver) // offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // rdd.value() // }) kafkaDStream.foreachRDD(rdd => { if (!rdd.isEmpty()) { //获取KakfaRDD的偏移量(Driver) val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //获取KafkaRDD中的数据 val lines: RDD[String] = rdd.map(_.value()) val orderRDD: RDD[Order] = lines.map(line => { var order: Order = null try { order = JSON.parseObject(line, classOf[Order]) } catch { case e: JSOnException=> { //TODO } } order }) //过滤问题数据 val filtered: RDD[Order] = orderRDD.filter(_ != null) //调用Action filtered.foreachPartition(iter => { if(iter.nonEmpty) { //将RDD中每一个分区中的数据保存到Hbase中 //创建一个Hbase的连接 val connection: COnnection= HBaseUtil.getConnection("linux03,linux04,linux05", 2181) val htable = connection.getTable(TableName.valueOf("t_orders")) //定义一个ArrayList,并且指定长度,用于批量写入put val puts = new util.ArrayList[Put](100) //可以获取当前Task的PartitionID,然后到offsetRanges数组中取对应下标的偏移量,就是对应分区的偏移量 val offsetRange: OffsetRange = offsetRanges(TaskContext.get.partitionId()) //在Executor端获取到偏移量 //遍历分区中的每一条数据 iter.foreach(order => { //取出想要的数据 val oid = order.oid val mOney= order.money //封装到put中 val put = new Put(Bytes.toBytes(oid)) //data列族,offset列族 put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("money"), Bytes.toBytes(money)) //put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("province"), Bytes.toBytes(province)) //分区中的最后一条,将偏移量区出来,存到hbase的offset列族 if(!iter.hasNext) { put.addColumn(Bytes.toBytes("offset"), Bytes.toBytes("appid_groupid"), Bytes.toBytes(appName + "_" + groupId)) put.addColumn(Bytes.toBytes("offset"), Bytes.toBytes("topic_partition"), Bytes.toBytes(offsetRange.topic + "_" + offsetRange.partition)) put.addColumn(Bytes.toBytes("offset"), Bytes.toBytes("offset"), Bytes.toBytes(offsetRange.untilOffset)) } //将封装到的put添加到puts集合中 puts.add(put) //满足一定大小,批量写入 if(puts.size() == 100) { htable.put(puts) puts.clear() //清空puts集合 } }) //将不满足批量写入条数的数据在写入 htable.put(puts) //关闭连接 htable.close() connection.close() } }) } }) ssc.start() ssc.awaitTermination() } }

推荐阅读
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • 本文介绍了九度OnlineJudge中的1002题目“Grading”的解决方法。该题目要求设计一个公平的评分过程,将每个考题分配给3个独立的专家,如果他们的评分不一致,则需要请一位裁判做出最终决定。文章详细描述了评分规则,并给出了解决该问题的程序。 ... [详细]
  • 本文详细介绍了Java中vector的使用方法和相关知识,包括vector类的功能、构造方法和使用注意事项。通过使用vector类,可以方便地实现动态数组的功能,并且可以随意插入不同类型的对象,进行查找、插入和删除操作。这篇文章对于需要频繁进行查找、插入和删除操作的情况下,使用vector类是一个很好的选择。 ... [详细]
  • Java学习笔记之面向对象编程(OOP)
    本文介绍了Java学习笔记中的面向对象编程(OOP)内容,包括OOP的三大特性(封装、继承、多态)和五大原则(单一职责原则、开放封闭原则、里式替换原则、依赖倒置原则)。通过学习OOP,可以提高代码复用性、拓展性和安全性。 ... [详细]
  • 开发笔记:实验7的文件读写操作
    本文介绍了使用C++的ofstream和ifstream类进行文件读写操作的方法,包括创建文件、写入文件和读取文件的过程。同时还介绍了如何判断文件是否成功打开和关闭文件的方法。通过本文的学习,读者可以了解如何在C++中进行文件读写操作。 ... [详细]
  • Java中包装类的设计原因以及操作方法
    本文主要介绍了Java中设计包装类的原因以及操作方法。在Java中,除了对象类型,还有八大基本类型,为了将基本类型转换成对象,Java引入了包装类。文章通过介绍包装类的定义和实现,解答了为什么需要包装类的问题,并提供了简单易用的操作方法。通过本文的学习,读者可以更好地理解和应用Java中的包装类。 ... [详细]
  • 本文介绍了使用Spark实现低配版高斯朴素贝叶斯模型的原因和原理。随着数据量的增大,单机上运行高斯朴素贝叶斯模型会变得很慢,因此考虑使用Spark来加速运行。然而,Spark的MLlib并没有实现高斯朴素贝叶斯模型,因此需要自己动手实现。文章还介绍了朴素贝叶斯的原理和公式,并对具有多个特征和类别的模型进行了讨论。最后,作者总结了实现低配版高斯朴素贝叶斯模型的步骤。 ... [详细]
  • 本文介绍了使用kotlin实现动画效果的方法,包括上下移动、放大缩小、旋转等功能。通过代码示例演示了如何使用ObjectAnimator和AnimatorSet来实现动画效果,并提供了实现抖动效果的代码。同时还介绍了如何使用translationY和translationX来实现上下和左右移动的效果。最后还提供了一个anim_small.xml文件的代码示例,可以用来实现放大缩小的效果。 ... [详细]
  • Java序列化对象传给PHP的方法及原理解析
    本文介绍了Java序列化对象传给PHP的方法及原理,包括Java对象传递的方式、序列化的方式、PHP中的序列化用法介绍、Java是否能反序列化PHP的数据、Java序列化的原理以及解决Java序列化中的问题。同时还解释了序列化的概念和作用,以及代码执行序列化所需要的权限。最后指出,序列化会将对象实例的所有字段都进行序列化,使得数据能够被表示为实例的序列化数据,但只有能够解释该格式的代码才能够确定数据的内容。 ... [详细]
  • Python语法上的区别及注意事项
    本文介绍了Python2x和Python3x在语法上的区别,包括print语句的变化、除法运算结果的不同、raw_input函数的替代、class写法的变化等。同时还介绍了Python脚本的解释程序的指定方法,以及在不同版本的Python中如何执行脚本。对于想要学习Python的人来说,本文提供了一些注意事项和技巧。 ... [详细]
  • IOS开发之短信发送与拨打电话的方法详解
    本文详细介绍了在IOS开发中实现短信发送和拨打电话的两种方式,一种是使用系统底层发送,虽然无法自定义短信内容和返回原应用,但是简单方便;另一种是使用第三方框架发送,需要导入MessageUI头文件,并遵守MFMessageComposeViewControllerDelegate协议,可以实现自定义短信内容和返回原应用的功能。 ... [详细]
  • Java SE从入门到放弃(三)的逻辑运算符详解
    本文详细介绍了Java SE中的逻辑运算符,包括逻辑运算符的操作和运算结果,以及与运算符的不同之处。通过代码演示,展示了逻辑运算符的使用方法和注意事项。文章以Java SE从入门到放弃(三)为背景,对逻辑运算符进行了深入的解析。 ... [详细]
  • Netty源代码分析服务器端启动ServerBootstrap初始化
    本文主要分析了Netty源代码中服务器端启动的过程,包括ServerBootstrap的初始化和相关参数的设置。通过分析NioEventLoopGroup、NioServerSocketChannel、ChannelOption.SO_BACKLOG等关键组件和选项的作用,深入理解Netty服务器端的启动过程。同时,还介绍了LoggingHandler的作用和使用方法,帮助读者更好地理解Netty源代码。 ... [详细]
  • 本文介绍了在Cpp中将字符串形式的数值转换为int或float等数值类型的方法,主要使用了strtol、strtod和strtoul函数。这些函数可以将以null结尾的字符串转换为long int、double或unsigned long类型的数值,且支持任意进制的字符串转换。相比之下,atoi函数只能转换十进制数值且没有错误返回。 ... [详细]
  • 本文介绍了解决java开源项目apache commons email简单使用报错的方法,包括使用正确的JAR包和正确的代码配置,以及相关参数的设置。详细介绍了如何使用apache commons email发送邮件。 ... [详细]
author-avatar
KellylikePchy_224
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有