热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

20210111Spark35(SparkStreaming4)

聚类运算Redis版非聚类运算hbase版Phoenix版求最大值Redis单机版是支持事务的,但是集

聚类运算Redis版
非聚类运算 hbase版 Phoenix版求最大值

Redis单机版是支持事务的,但是集群版是不支持事务的

回顾Redis的事务操作

object JedisTransactionTest { def main(args: Array[String]): Unit = { //数据库连接 val jedis = new Jedis("linux03", 6379) jedis.auth("123456") //开启jedis的pipeline(整体来执行) var pipeline: Pipeline = null try { pipeline = jedis.pipelined() //开启多个操作 pipeline.multi() pipeline.hincrBy("wc", "aaa", 18) //Redis Hincrby 命令用于为哈希表中的字段值加上指定增量值。 //val i = 1 / 0 pipeline.hset("dot", "bbb", "19") //将哈希表 key 中的域 field 的值设为 value 。 //如果 key 不存在,一个新的哈希表被创建并进行HSET 操作。 //如果域 field 已经存在于哈希表中,旧值将被覆盖 //提交事务 pipeline.sync() pipeline.exec() } catch { case e: Exception => { e.printStackTrace() pipeline.discard() //回滚事务 } } finally { pipeline.close() jedis.close() } } }

Ridis版聚类运算

//连接池 object JedisConnectionPool { val cOnfig= new JedisPoolConfig() config.setMaxTotal(5) config.setMaxIdle(2) config.setTestOnBorrow(true) val pool = new JedisPool(config, "linux03", 6379, 10000, "123456") def getConnection(): Jedis = { pool.getResource } }

//获取历史偏移量 object OffsetUtils { def queryHistoryOffsetFromRedis(appName: String, groupId: String): Map[TopicPartition, Long] = { val historyOffset = new mutable.HashMap[TopicPartition, Long]() val jedis = JedisConnectionPool.getConnection() val res: util.Map[String, String] = jedis.hgetAll(appName + "_" + groupId) import scala.collection.JavaConverters._ for (t

//保证ExactlyOnce,精确一次性语义(要求数据处理且被处理一次) object KafkaWordCountToRedis { def main(args: Array[String]): Unit = { val appName = this.getClass.getSimpleName val groupId = "g008" val cOnf= new SparkConf().setAppName(appName).setMaster("local[*]") val ssc = new StreamingContext(conf, Seconds(5)) ssc.sparkContext.setLogLevel("WARN") val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "linux03:9092,linux04:9092,linux05:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> groupId, "auto.offset.reset" -> "earliest", "enable.auto.commit" -> (false: java.lang.Boolean) //不自动提交偏移量 ) val topics = Array("wordcount18") val historyOffset: Map[TopicPartition, Long] = OffsetUtils.queryHistoryOffsetFromRedis(appName, groupId) //直连:sparkstreaming用来读取数据的task(kafka的消费者),直接连接到kafka指定topic的leader分区中 val lines: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream( ssc, LocationStrategies.PreferConsistent, //位置策略 ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, historyOffset) //消费策略 ) //调用完createDirectStream,返回的第一个DStream进行操作,因为只有第一手的DStream有偏移量 lines.foreachRDD(rdd => { //在指定的时间周期内,kafka中有新的数据写入 if (!rdd.isEmpty()) { //获取偏移量 val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges val lines: RDD[String] = rdd.map(_.value()) val reduced: RDD[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) //将计算好的结果收集到Driver端,然后写入到Redis val res: Array[(String, Int)] = reduced.collect() val jedis = JedisConnectionPool.getConnection() var pipeline: Pipeline = null try { pipeline = jedis.pipelined() pipeline.multi() //目的:是为了实现ExactlyOnce,将计算好的结果和偏移量在同一个事物中同时写入到Redis res.foreach(t => { pipeline.hincrBy("dot", t._1, t._2) }) offsetRanges.foreach(o => { val topic = o.topic val partition = o.partition val untilOffset = o.untilOffset pipeline.hset(appName + "_" + groupId, topic + "_" + partition, untilOffset.toString) }) pipeline.exec() pipeline.sync() } catch { case e: Exception => { e.printStackTrace() pipeline.discard() } } finally { pipeline.close() jedis.close() } } }) ssc.start() ssc.awaitTermination() } }

非聚类运算

hbase 只支持行级事务

2021-01-11-Spark-35(SparkStreaming 4)
image.png

数据在kafka中已经有唯一的id,可作为hbase中的rowkey
每一行的最后一个数据记录偏移量,即可无需读取,查找以一个组内的最大的偏移量即可
但是hbase是不支持分组查询的
解决方案:
1.自定义协处理器
2.使用Phoenix来SQL查询
如果消费者多读了,但是hbase的id是唯一的,所以会把先前的数据给覆盖掉

//每一次启动该程序,都要从Hbase查询历史偏移量 def queryHistoryOffsetFromHbase(appid: String, groupid: String): Map[TopicPartition, Long] = { val offsets = new mutable.HashMap[TopicPartition, Long]() val cOnnection= DriverManager.getConnection("jdbc:phoenix:linux03,linux04,linux05:2181") //分组求max,就是求没有分分区最(大)新的偏移量 val ps = connection.prepareStatement("select "topic_partition", max("offset") from "t_orders" where "appid_groupid" = ? group by "topic_partition"") ps.setString(1, appid + "_" + groupid) //查询返回结果 val rs: ResultSet = ps.executeQuery() while(rs.next()) { val topicAndPartition = rs.getString(1) val fields = topicAndPartition.split("_") val topic = fields(0) val partition = fields(1).toInt val offset = rs.getLong(2) offsets.put(new TopicPartition(topic, partition), offset) } offsets.toMap }

/** * https://www.jianshu.com/p/f1340eaa3e06 * * spark.task.maxFailures * yarn.resourcemanager.am.max-attempts * spark.speculation * * create view "t_orders" (pk VARCHAR PRIMARY KEY, "offset"."appid_groupid" VARCHAR, "offset"."topic_partition" VARCHAR, "offset"."offset" UNSIGNED_LONG); * select max("offset") from "t_orders" where "appid_groupid" = 'g1' group by "topic_partition"; * */ object KafkaToHbase { def main(args: Array[String]): Unit = { //true a1 g1 ta,tb val Array(isLocal, appName, groupId, allTopics) = args val cOnf= new SparkConf() .setAppName(appName) if (isLocal.toBoolean) { conf.setMaster("local[*]") } val sc = new SparkContext(conf) sc.setLogLevel("WARN") val ssc: StreamingCOntext= new StreamingContext(sc, Milliseconds(5000)) val topics = allTopics.split(",") //SparkSteaming跟kafka整合的参数 val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "linux03:9092,linux04:9092,linux05:9092", "key.deserializer" -> classOf[StringDeserializer].getName, "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer", "group.id" -> groupId, "auto.offset.reset" -> "earliest", //如果没有记录偏移量,第一次从最开始读,有偏移量,接着偏移量读 "enable.auto.commit" -> (false: java.lang.Boolean) //消费者不自动提交偏移量 ) //查询历史偏移量【上一次成功写入到数据库的偏移量】 val historyOffsets: Map[TopicPartition, Long] = OffsetUtils.queryHistoryOffsetFromHbase(appName, groupId) //跟Kafka进行整合,需要引入跟Kafka整合的依赖 //createDirectStream更加高效,使用的是Kafka底层的消费API,消费者直接连接到Kafka的Leader分区进行消费 //直连方式,RDD的分区数量和Kafka的分区数量是一一对应的【数目一样】 val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String]( ssc, LocationStrategies.PreferConsistent, //调度task到Kafka所在的节点 ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, historyOffsets) //指定订阅Topic的规则, 从历史偏移量接着读取数据 ) // var offsetRanges: Array[OffsetRange] = null // // val ds2 = kafkaDStream.map(rdd => { // //获取KakfaRDD的偏移量(Driver) // offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // rdd.value() // }) kafkaDStream.foreachRDD(rdd => { if (!rdd.isEmpty()) { //获取KakfaRDD的偏移量(Driver) val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //获取KafkaRDD中的数据 val lines: RDD[String] = rdd.map(_.value()) val orderRDD: RDD[Order] = lines.map(line => { var order: Order = null try { order = JSON.parseObject(line, classOf[Order]) } catch { case e: JSOnException=> { //TODO } } order }) //过滤问题数据 val filtered: RDD[Order] = orderRDD.filter(_ != null) //调用Action filtered.foreachPartition(iter => { if(iter.nonEmpty) { //将RDD中每一个分区中的数据保存到Hbase中 //创建一个Hbase的连接 val connection: COnnection= HBaseUtil.getConnection("linux03,linux04,linux05", 2181) val htable = connection.getTable(TableName.valueOf("t_orders")) //定义一个ArrayList,并且指定长度,用于批量写入put val puts = new util.ArrayList[Put](100) //可以获取当前Task的PartitionID,然后到offsetRanges数组中取对应下标的偏移量,就是对应分区的偏移量 val offsetRange: OffsetRange = offsetRanges(TaskContext.get.partitionId()) //在Executor端获取到偏移量 //遍历分区中的每一条数据 iter.foreach(order => { //取出想要的数据 val oid = order.oid val mOney= order.money //封装到put中 val put = new Put(Bytes.toBytes(oid)) //data列族,offset列族 put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("money"), Bytes.toBytes(money)) //put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("province"), Bytes.toBytes(province)) //分区中的最后一条,将偏移量区出来,存到hbase的offset列族 if(!iter.hasNext) { put.addColumn(Bytes.toBytes("offset"), Bytes.toBytes("appid_groupid"), Bytes.toBytes(appName + "_" + groupId)) put.addColumn(Bytes.toBytes("offset"), Bytes.toBytes("topic_partition"), Bytes.toBytes(offsetRange.topic + "_" + offsetRange.partition)) put.addColumn(Bytes.toBytes("offset"), Bytes.toBytes("offset"), Bytes.toBytes(offsetRange.untilOffset)) } //将封装到的put添加到puts集合中 puts.add(put) //满足一定大小,批量写入 if(puts.size() == 100) { htable.put(puts) puts.clear() //清空puts集合 } }) //将不满足批量写入条数的数据在写入 htable.put(puts) //关闭连接 htable.close() connection.close() } }) } }) ssc.start() ssc.awaitTermination() } }

推荐阅读
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • 本文介绍了一种解析GRE报文长度的方法,通过分析GRE报文头中的标志位来计算报文长度。具体实现步骤包括获取GRE报文头指针、提取标志位、计算报文长度等。该方法可以帮助用户准确地获取GRE报文的长度信息。 ... [详细]
  • 本文介绍了iOS数据库Sqlite的SQL语句分类和常见约束关键字。SQL语句分为DDL、DML和DQL三种类型,其中DDL语句用于定义、删除和修改数据表,关键字包括create、drop和alter。常见约束关键字包括if not exists、if exists、primary key、autoincrement、not null和default。此外,还介绍了常见的数据库数据类型,包括integer、text和real。 ... [详细]
  • JDK源码学习之HashTable(附带面试题)的学习笔记
    本文介绍了JDK源码学习之HashTable(附带面试题)的学习笔记,包括HashTable的定义、数据类型、与HashMap的关系和区别。文章提供了干货,并附带了其他相关主题的学习笔记。 ... [详细]
  • 本文介绍了GregorianCalendar类的基本信息,包括它是Calendar的子类,提供了世界上大多数国家使用的标准日历系统。默认情况下,它对应格里高利日历创立时的日期,但可以通过调用setGregorianChange()方法来更改起始日期。同时,文中还提到了GregorianCalendar类为每个日历字段使用的默认值。 ... [详细]
  • 本文介绍了使用Spark实现低配版高斯朴素贝叶斯模型的原因和原理。随着数据量的增大,单机上运行高斯朴素贝叶斯模型会变得很慢,因此考虑使用Spark来加速运行。然而,Spark的MLlib并没有实现高斯朴素贝叶斯模型,因此需要自己动手实现。文章还介绍了朴素贝叶斯的原理和公式,并对具有多个特征和类别的模型进行了讨论。最后,作者总结了实现低配版高斯朴素贝叶斯模型的步骤。 ... [详细]
  • 阿,里,云,物,联网,net,core,客户端,czgl,aliiotclient, ... [详细]
  • 本文介绍了OC学习笔记中的@property和@synthesize,包括属性的定义和合成的使用方法。通过示例代码详细讲解了@property和@synthesize的作用和用法。 ... [详细]
  • 使用Ubuntu中的Python获取浏览器历史记录原文: ... [详细]
  • Redis底层数据结构之压缩列表的介绍及实现原理
    本文介绍了Redis底层数据结构之压缩列表的概念、实现原理以及使用场景。压缩列表是Redis为了节约内存而开发的一种顺序数据结构,由特殊编码的连续内存块组成。文章详细解释了压缩列表的构成和各个属性的含义,以及如何通过指针来计算表尾节点的地址。压缩列表适用于列表键和哈希键中只包含少量小整数值和短字符串的情况。通过使用压缩列表,可以有效减少内存占用,提升Redis的性能。 ... [详细]
  • Java中包装类的设计原因以及操作方法
    本文主要介绍了Java中设计包装类的原因以及操作方法。在Java中,除了对象类型,还有八大基本类型,为了将基本类型转换成对象,Java引入了包装类。文章通过介绍包装类的定义和实现,解答了为什么需要包装类的问题,并提供了简单易用的操作方法。通过本文的学习,读者可以更好地理解和应用Java中的包装类。 ... [详细]
  • React项目中运用React技巧解决实际问题的总结
    本文总结了在React项目中如何运用React技巧解决一些实际问题,包括取消请求和页面卸载的关联,利用useEffect和AbortController等技术实现请求的取消。文章中的代码是简化后的例子,但思想是相通的。 ... [详细]
  • Week04面向对象设计与继承学习总结及作业要求
    本文总结了Week04面向对象设计与继承的重要知识点,包括对象、类、封装性、静态属性、静态方法、重载、继承和多态等。同时,还介绍了私有构造函数在类外部无法被调用、static不能访问非静态属性以及该类实例可以共享类里的static属性等内容。此外,还提到了作业要求,包括讲述一个在网上商城购物或在班级博客进行学习的故事,并使用Markdown的加粗标记和语句块标记标注关键名词和动词。最后,还提到了参考资料中关于UML类图如何绘制的范例。 ... [详细]
  • Netty源代码分析服务器端启动ServerBootstrap初始化
    本文主要分析了Netty源代码中服务器端启动的过程,包括ServerBootstrap的初始化和相关参数的设置。通过分析NioEventLoopGroup、NioServerSocketChannel、ChannelOption.SO_BACKLOG等关键组件和选项的作用,深入理解Netty服务器端的启动过程。同时,还介绍了LoggingHandler的作用和使用方法,帮助读者更好地理解Netty源代码。 ... [详细]
  • 使用C++编写程序实现增加或删除桌面的右键列表项
    本文介绍了使用C++编写程序实现增加或删除桌面的右键列表项的方法。首先通过操作注册表来实现增加或删除右键列表项的目的,然后使用管理注册表的函数来编写程序。文章详细介绍了使用的五种函数:RegCreateKey、RegSetValueEx、RegOpenKeyEx、RegDeleteKey和RegCloseKey,并给出了增加一项的函数写法。通过本文的方法,可以方便地自定义桌面的右键列表项。 ... [详细]
author-avatar
KellylikePchy_224
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有