热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

日志服务(SLS)集成Spark流计算实战

日志,服务,sls,集成,spa

前言

日志服务作为一站式的日志的采集与分析平台,提供了各种用户场景的日志采集能力,通过日志服务提供的各种与·与SDK,采集客户端(Logtail),Producer,用户可以非常容易的把各种数据源中的数据采集到日志服务的Logstore中。同时为了便于用户对日志进行处理,提供了各种支持流式消费的SDK,如各种语言的消费组,与 Spark,Flink,Storm 等各种流计算技术无缝对接的Connector,以便于用户根据自己的业务场景非常便捷的处理海量日志。

从最早的Spark Streaming到最新的Stuctured Streaming,Spark 一直是最流行的流计算框架之一。使用日志服务的Spark SDK,可以非常方便的在Spark 中消费日志服务中的数据,同时也支持将 Spark 的计算结果写入日志服务。

日志服务基础概念

日志服务的存储层是一个类似Kafka的Append only的FIFO消息队列,包含如下基本概念:

  • 日志(Log):由时间、及一组不定个数的Key-Value对组成。
  • 日志组(LogGroup):一组日志的集合,包含相同Meta信息如Topic,Source,Tags等。是读写的基本单位。

image

图-1 Log与LogGroup的关系

  • Shard:分区,LogGroup读写基本单元,对应于Kafka的partition。
  • Logstore:日志库,用以存放同一类日志数据。Logstore会包含1个或多个Shard。
  • Project:Logstore存放容器,包含一个或者多个Logstore。

准备工作

1)添加Maven依赖:

 com.aliyun.emr emr-logservice_2.11 1.9.0 

Github源码下载。
2)计划消费的日志服务project,logstore以及对应的endpoint。
3)用于访问日志服务Open API的Access Key。

对 Spark Streaming 的支持

Spark Streaming是Spark最早推出的流计算技术,现在已经进入维护状态,不再会增加新的功能。但是考虑到Spark Streaming 的使用仍然非常广泛,我们先从Spark Streaming开始介绍。Spark Streaming 提供了一个DStream 的数据模型抽象,本质是把无界数据集拆分成一个一个的RDD,转化为有界数据集的流式计算。每个批次处理的数据就是这段时间内从日志服务消费到的数据。

image

图-2 DStream

Spark Streaming 从日志服务消费支持 Receiver 和 Direct 两种消费方式。

Receiver模式

Receivers的实现内部实现基于日志服务的消费组(Consumer Library)。数据拉取与处理完全分离。消费组自动均匀分配Logstore内的所有shard到所有的Receiver,并且自动提交checkpoint到SLS。这就意味着Logstore内的shard个数与Spark 实际的并发没有对应关系。
对于所有的Receiver,接收到的数据默认会保存在Spark Executors中,所以Failover的时候有可能造成数据丢失,这个时候就需要开启WAL日志,Failover的时候可以从WAL中恢复,防止丢失数据。

SDK将SLS中的每行日志解析为JSON字符串形式,Receiver使用示例如下所示:

object SLSReceiverSample { def main(args: Array[String]): Unit = { val project = "your project" val logstore = "your logstore" val cOnsumerGroup= "consumer group" val endpoint = "your endpoint" val accessKeyId = "access key id" val accessKeySecret = "access key secret" val batchInterval = Milliseconds(5 * 1000) val cOnf= new SparkConf().setAppName("Test SLS Loghub") val ssc = new StreamingContext(conf, batchInterval) val stream = LoghubUtils.createStream( ssc, project, logstore, consumerGroup, endpoint, accessKeyId, accessKeySecret, StorageLevel.MEMORY_AND_DISK, LogHubCursorPosition.END_CURSOR) stream.checkpoint(batchInterval * 2).foreachRDD(rdd => rdd.map(bytes => new String(bytes)).top(10).foreach(println) ) ssc.checkpoint("hdfs:///tmp/spark/streaming") ssc.start() ssc.awaitTermination() } }

除Project,Logstore,Access Key 这些基础配置外,还可以指定StorageLevel,消费开始位置等。

Direct模式

Direct模式不再需要Receiver,也不依赖于消费组,而是使用日志服务的低级API,在每个批次内直接从服务端拉取数据处理。对于Logstore中的每个Shard来说,每个批次都会读取指定位置范围内的数据。为了保证一致性,只有在每个批次确认正常结束之后才能把每个Shard的消费结束位置(checkpoint)保存到服务端。

为了实现Direct模式,SDK依赖一个本地的ZooKeeper,每个shard的checkpoint会临时保存到本地的ZooKeeper,等用户手动提交checkpoint时,再从ZooKeeper中同步到服务端。Failover时也是先从本地ZooKeeper中尝试读上一次的checkpoint,如果没有读到再从服务端获取。

object SLSDirectSample { def main(args: Array[String]): Unit = { val project = "your project" val logstore = "your logstore" val cOnsumerGroup= "consumerGroup" val endpoint = "endpoint" val accessKeyId = "access key id" val accessKeySecret = "access key secret" val batchInterval = Milliseconds(5 * 1000) val zkAddress = "localhost:2181" val cOnf= new SparkConf().setAppName("Test Direct SLS Loghub") val ssc = new StreamingContext(conf, batchInterval) val zkParas = Map("zookeeper.connect" -> zkAddress) val loghubStream = LoghubUtils.createDirectStream( ssc, project, logstore, consumerGroup, accessKeyId, accessKeySecret, endpoint, zkParas, LogHubCursorPosition.END_CURSOR) loghubStream.checkpoint(batchInterval).foreachRDD(rdd => { println(s"count by key: ${rdd.map(s => { s.sorted (s.length, s) }).countByKey().size}") // 手动更新checkpoint loghubStream.asInstanceOf[CanCommitOffsets].commitAsync() }) ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory ssc.start() ssc.awaitTermination() } }

Direct模式示例

如何限速

在Receiver中,如果需要限制消费速度,我们只需要调整 Consumer Library 本身的参数即可。而Direct方式是在每个批次开始时从SLS拉取数据,这就涉及到一个问题:一个批次内拉取多少数据才合适。如果太多,一个批次内处理不完,造成处理延时。如果太少会导worker空闲,工作不饱和,消费延时。这个时候我们就需要合理配置拉取的速度和行数,实现一个批次尽可能多处理又能及时完成的目标。理想状态下Spark 消费的整体速率应该与SLS采集速率一致,才能实现真正的实时处理。

由于SLS的数据模型是以LogGroup作为读写的基本单位,而一个LogGroup中可能包含上万行日志,这就意味着Spark中直接限制每个批次的行数难以实现。因此,Direct限流涉及到两个配置参数:

参数 说明 默认值
spark.streaming.loghub.maxRatePerShard 每个批次每个Shard读取行数,决定了限流的下限 10000
spark.loghub.batchGet.step 每次请求读取LogGroup个数,决定了限流的粒度 100

可以通过适当缩小spark.loghub.batchGet.step来控制限流的精度,但是即便如此,在某些情况下还是会存在较大误差,如一个LogGroup中存在10000行日志,spark.streaming.loghub.maxRatePerShard设置为100,spark.loghub.batchGet.step设置为1,那一个批次内该shard还是会拉取10000行日志。

两种模式的对比

和Receiver相比,Direct有如下的优势:

  1. 降低资源消耗,不需要占用Executor资源来作为Receiver的角色。
  2. 鲁棒性更好,在计算的时候才会从服务端真正消费数据,降低内存使用,不再需要WAL,Failover 直接在读一次就行了,更容易实现exactly once语义。
  3. 简化并行。Spark partition 与 Logstore 的 shard 个数对应,增加shard个数就能提高Spark任务处理并发上限。

但是也存在一些缺点:

  1. 在SLS场景下,需要依赖本地的 ZooKeeper 来保存临时 checkpoint,当调用 commitAsync 时从 ZooKeeper同步到日志服务服务端。所以当需要重置 checkpoint 时,也需要先删除本地 ZooKeeper 中的 checkpoint 才能生效。
  2. 上一个批次保存 checkpoint 之前,下一个批次无法真正开始,否则 ZooKeeper 中的 checkpoint 可能会被更新成一个中间状态。目前SDK在每个批次会检查是否上一个批次的 checkpoint 还没有提交,如果没有提交则生成一个空批次,而不是继续从服务端消费。
  3. 在 SLS 场景下,限流方式不够精确。

Spark Streaming结果写入SLS

与消费SLS相反,Spark Streaming的处理结果也可以直接写入SLS。使用示例:

... val lines = loghubStream.map(x => x) // 转换函数把结果中每条记录转为一行日志 def transformFunc(x: String): LogItem = { val r = new LogItem() r.PushBack("key", x) r } val callback = new Callback with Serializable { override def onCompletion(result: Result): Unit = { println(s"Send result ${result.isSuccessful}") } } // SLS producer config val producerCOnfig= Map( "sls.project" -> loghubProject, "sls.logstore" -> targetLogstore, "access.key.id" -> accessKeyId, "access.key.secret" -> accessKeySecret, "sls.endpoint" -> endpoint, "sls.ioThreadCount" -> "2" ) lines.writeToLoghub( producerConfig, "topic", "streaming", transformFunc, Option.apply(callback)) ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory ssc.start() ssc.awaitTermination()

对Structured Streaming的支持

Structured  Streaming 并不是最近才出现的技术,而是早在16年就已经出现,但是直到 Spark 2.2.0 才正式推出。其数据模型是基于无界表的概念,流数据相当于往一个表上不断追加行。

image

图-3 无界表模型

与Spark Streaming相比,Structured Streaming主要有如下特点:

  1. 底层实现基于Spark SQL引擎,可以使用大多数Spark SQL的函数。和Spark SQL共用大部分API,如果对Spark SQL熟悉的用户,非常容易上手。复用Spark SQL的执行引用,性能更佳。
  2. 支持 Process time 和 Event time,而Spark Streaming只支持 Process Time。
  3. 批流同一的API。Structured Streaming 复用Spark SQL的 DataSet/DataFrame模型,和 RDD/DStream相比更High level,易用性更好。
  4. 实时性更好,默认基于micro-batch模式。在 Spark 2.3 中,还增加了连续处理模型,号称可以做到毫秒级延迟。
  5. API 对用户更友好,只保留了SparkSession一个入口,不需要创建各种Context对象,使用起来更简单。

SDK使用示例

import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.{StringType, StructField, StructType} object StructuredStreamingDemo { def main(args: Array[String]) { val spark = SparkSession .builder .appName("StructuredLoghubWordCount") .master("local") .getOrCreate() import spark.implicits._ val schema = new StructType( Array(StructField("content", StringType))) val lines = spark .readStream .format("loghub") .schema(schema) .option("sls.project", "your project") .option("sls.store", "your logstore") .option("access.key.id", "your access key id") .option("access.key.secret", "your access key secret") .option("endpoint", "your endpoint") .option("startingoffsets", "latest") .load() .select("content") .as[String] val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count() val query = wordCounts.writeStream .outputMode("complete") .format("loghub") .option("sls.project", "sink project") .option("sls.store", "sink logstore") .option("access.key.id", "your access key id") .option("access.key.secret", "your access key secret") .option("endpoint", "your endpoint") .option("checkpointLocation", "your checkpoint dir") .start() query.awaitTermination() } }

代码解释:
1)schema 声明了我们需要的字段,除了日志中的字段外,还有如下的内部字段:

__logProject__ __logStore__ __shard__ __time__ __topic__ __source__ __sequence_number__ // 每行日志唯一id

如果没有指定schema,SDK默认提供一个__value__字段,其内容为由所有字段组成的一个JSON字符串。

2)lines 定义了一个流。
startingoffsets:开始位置,支持:

  • latest :日志服务最新写入位置。强烈建议从latest开始,从其他位置开始意味着需要先处理历史数据,可能需要等待较长时间才能结束。
  • earliest:日志服务中最早的日志对应的位置。
  • 或者为每个shard指定一个开始时间,以JSON形式指定。

maxOffsetsPerTrigger:批次读取行数,SDK中默认是64*1024 。

3)结果写入到日志服务
format 指定为Loghub即可。

不足之处

  1. 不支持手动提交checkpoint,SDK内部自动保存checkpoint到checkpointLocation中。
  2. 不再需要提供consumerGroup名称,也就是说checkpoint没有保存到SLS服务端,无法在日志服务里面监控消费延迟,只能通过Spark 任务日志观察消费进度。

参考资料

官方文档:https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
SLS SDK例子:https://github.com/aliyun/aliyun-emapreduce-sdk/tree/master-2.x/examples/src/main/scala/com/aliyun/emr/examples/sql/streaming
日志服务实时消费:https://help.aliyun.com/document_detail/28998.html

欢迎扫群加入日志服务技术交流钉钉群
image


推荐阅读
  • 云原生边缘计算之KubeEdge简介及功能特点
    本文介绍了云原生边缘计算中的KubeEdge系统,该系统是一个开源系统,用于将容器化应用程序编排功能扩展到Edge的主机。它基于Kubernetes构建,并为网络应用程序提供基础架构支持。同时,KubeEdge具有离线模式、基于Kubernetes的节点、群集、应用程序和设备管理、资源优化等特点。此外,KubeEdge还支持跨平台工作,在私有、公共和混合云中都可以运行。同时,KubeEdge还提供数据管理和数据分析管道引擎的支持。最后,本文还介绍了KubeEdge系统生成证书的方法。 ... [详细]
  • 电话号码的字母组合解题思路和代码示例
    本文介绍了力扣题目《电话号码的字母组合》的解题思路和代码示例。通过使用哈希表和递归求解的方法,可以将给定的电话号码转换为对应的字母组合。详细的解题思路和代码示例可以帮助读者更好地理解和实现该题目。 ... [详细]
  • 本文介绍了设计师伊振华受邀参与沈阳市智慧城市运行管理中心项目的整体设计,并以数字赋能和创新驱动高质量发展的理念,建设了集成、智慧、高效的一体化城市综合管理平台,促进了城市的数字化转型。该中心被称为当代城市的智能心脏,为沈阳市的智慧城市建设做出了重要贡献。 ... [详细]
  • SpringBoot uri统一权限管理的实现方法及步骤详解
    本文详细介绍了SpringBoot中实现uri统一权限管理的方法,包括表结构定义、自动统计URI并自动删除脏数据、程序启动加载等步骤。通过该方法可以提高系统的安全性,实现对系统任意接口的权限拦截验证。 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 使用在线工具jsonschema2pojo根据json生成java对象
    本文介绍了使用在线工具jsonschema2pojo根据json生成java对象的方法。通过该工具,用户只需将json字符串复制到输入框中,即可自动将其转换成java对象。该工具还能解析列表式的json数据,并将嵌套在内层的对象也解析出来。本文以请求github的api为例,展示了使用该工具的步骤和效果。 ... [详细]
  • sklearn数据集库中的常用数据集类型介绍
    本文介绍了sklearn数据集库中常用的数据集类型,包括玩具数据集和样本生成器。其中详细介绍了波士顿房价数据集,包含了波士顿506处房屋的13种不同特征以及房屋价格,适用于回归任务。 ... [详细]
  • XML介绍与使用的概述及标签规则
    本文介绍了XML的基本概念和用途,包括XML的可扩展性和标签的自定义特性。同时还详细解释了XML标签的规则,包括标签的尖括号和合法标识符的组成,标签必须成对出现的原则以及特殊标签的使用方法。通过本文的阅读,读者可以对XML的基本知识有一个全面的了解。 ... [详细]
  • 本文介绍了Web学习历程记录中关于Tomcat的基本概念和配置。首先解释了Web静态Web资源和动态Web资源的概念,以及C/S架构和B/S架构的区别。然后介绍了常见的Web服务器,包括Weblogic、WebSphere和Tomcat。接着详细讲解了Tomcat的虚拟主机、web应用和虚拟路径映射的概念和配置过程。最后简要介绍了http协议的作用。本文内容详实,适合初学者了解Tomcat的基础知识。 ... [详细]
  • 本文详细介绍了Spring的JdbcTemplate的使用方法,包括执行存储过程、存储函数的call()方法,执行任何SQL语句的execute()方法,单个更新和批量更新的update()和batchUpdate()方法,以及单查和列表查询的query()和queryForXXX()方法。提供了经过测试的API供使用。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • CF:3D City Model(小思维)问题解析和代码实现
    本文通过解析CF:3D City Model问题,介绍了问题的背景和要求,并给出了相应的代码实现。该问题涉及到在一个矩形的网格上建造城市的情景,每个网格单元可以作为建筑的基础,建筑由多个立方体叠加而成。文章详细讲解了问题的解决思路,并给出了相应的代码实现供读者参考。 ... [详细]
  • 标题: ... [详细]
  • 本文讨论了如何使用IF函数从基于有限输入列表的有限输出列表中获取输出,并提出了是否有更快/更有效的执行代码的方法。作者希望了解是否有办法缩短代码,并从自我开发的角度来看是否有更好的方法。提供的代码可以按原样工作,但作者想知道是否有更好的方法来执行这样的任务。 ... [详细]
author-avatar
mobiledu2502861465
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有