热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

一个作业,多个TTL——FlinkSQL细粒度TTL配置的实现(一)

(转自我的微信公众号 KAMI说 )Flink是当前最流行的分布式计算框架,其提供的TableAPI和SQL特性,使得开发者可以通过成熟,直观、简洁、表达力强的标准SQL描述计算逻

(转自我的微信公众号 KAMI说 )

Flink 是当前最流行的分布式计算框架,其提供的 Table API 和 SQL 特性,使得开发者可以通过成熟,直观、简洁、表达力强的标准 SQL 描述计算逻辑,大大减少其学习、开发和维护成本。

 

Flink SQL 支持面向无边界输入流的流处理。然而。聚合统计、窗口统计等计算是有状态的。在流处理中,若这些状态数据随时间不断堆积、不断膨胀,会导致因为OOM频繁发生导致的作业崩溃、重启。

 

从 Flink 1.6 版本开始,社区引入了状态 TTL(Time-To-Live) 特性。在通过Flink SQL 实现流处理时,开发者可以为作业 SQL 设置TTL,实现过期状态的自动清理,从而防止作业状态无限膨胀。

 

然而,目前Flink SQL 只支持粗粒度的TTL设置,即一段 SQL 只能设置一个TTL。在一些常见的应用场景中,这不足够。

 

 

下面是一段计算DAU指标的 SQL 代码


SELECT

  t_date

, COUNT(DISTINCT user_id) AS cnt_login

, COUNT(DISTINCT CASE WHEN t_date = t_debut THEN user_id END) AS cnt_new

FROM

  (

    SELECT

       t_date

     , user_id

     , MIN(t_date) OVER (

             PARTITION BY user_id

             ORDER BY proctime

             ROWS BETWEEN 1 PRECEDING AND CURRENT ROW

       ) AS t_debut

    FROM Login

) AS t

GROUP BY t_date


这段SQL的业务意义很直观,就是计算实时每日登陆用户和新增登陆用户。

  • 第一层的窗口统计,计算每个用户有史以来最小的登陆日期,即其新增日期
  • 第二层的聚合统计,按天进行聚合,计算每天的登陆用户数和新增用户数

 

然而,在TTL的设置上,我们面临两难状况:

  • 不设置TTL。那么在第二层按天进行的聚合统计,COUNT DISTINCT计算带来的状态会随着天数近乎线性增长,状态会不断膨胀,带来OOM等一系列问题
  • 设置TTL,例如 n 天未访问的状态自动清理。那么在第一层的窗口统计,n天不活跃的用户的登陆日期状态就可能被清除,导致其后续再次登录时被误判为新增

 

要解决这个矛盾,我们实际上需要 Flink SQL 提供 TTL 的细粒度配置,即为一段SQL设置多个 TTL :

  • 第一层的窗口统计不设置TTL,所有用户的登陆日期状态永久保留
  • 第二层的聚合统计设置 n 天的 TTL,保证其状态不会无限增长

下面给大家介绍,如何实现Flink SQL的细粒度 TTL 配置。

 

 

大家都知道,在 Flink 中,通过 Table API 和 SQL 实现的流处理逻辑,最终会翻译为基于 DataStream API 实现的 DataStream 作业,返回这个作业输出的 DataStream (writeToSink 本质上也是先得到 DataStream 作业,再为其输出 DataStream 加上一个 DataStreamSink) 。

从一段 SQL 到 DataStream 作业,其过程简单描述如下:

  1. 在 TableEnvironment,即“表环境”,将数据源注册为动态表。例如,通过表环境的接口`registerDataStream`, 作为源的DataStream,即数据流, 在表环境注册为动态表
  2. 通过表环境的接口 `sqlQuery`,将 SQL 构造为 Table 对象
  3. 通过toAppendStream/toRetractedStream接口,即翻译接口,将 Table 对象表达的作业逻辑,翻译为 DataStream 作业。

一个作业,多个TTL——Flink SQL 细粒度TTL配置的实现(一)

在调用翻译接口,将 Table 对象翻译为 DataStream 作业时,通过翻译接口传入的 TTL 配置,递归传递到各个计算节点的翻译、构造逻辑里,使得翻译出来的 DataStream 算子的内部状态按照该 TTL 配置及时清理。

如果我们将上述计算DAU的SQL拆分成两段,前者作为一个中间结果,提供给后者调用。

 

SQL1:


    SELECT

       t_date

     , user_id

     , MIN(t_date) OVER (

             PARTITION BY user_id

             ORDER BY proctime

             ROWS BETWEEN 1 PRECEDING AND CURRENT ROW

       ) AS t_debut

    FROM Login


 

SQL2:


  SELECT

      t_date

    , COUNT(DISTINCT user_id) AS cnt_login

    , COUNT(DISTINCT CASE WHEN t_date = t_debut THEN user_id END) AS cnt_new

  FROM t_middle

  GROUP BY t_date


 

从第一段 SQL 构建对应 Table 对象,再调用翻译接口,翻译成 DataStream 作业,其输出数据流为 `s_middle`。其可以使用 Row 作为流数据类型,各个字段的名称和类型可以通过 Table 对象的 Schema获得。显然,这个 DataStream 作业是原来完整DAU计算 DataStream 作业的一部分,其输出为一个中间结果。

然后,将这个中间结果数据流 `s_middle` 在表环境重新注册为动态表 `t_middle` ,各个字段的名称和类型可以通过 Table 对象的 Schema获得。这是第二段 SQL 需要调用的中间结果动态表。

最后,从第二段 SQL 构建对应 Table 对象,再调用翻译接口,加上 n 天的 TTL 配置,翻译成 DataStream 作业。显然,这个 DataStream 作业是原来完整DAU计算 DataStream 作业的另外一部分,其输出为完整的 DAU 计算结果。

显然,第一段 SQL 对应的计算节点,其状态 TTL 为永不过期。第二段 SQL 对应的计算节点,其状态 TTL 为 n 天后过期!TTL的细粒度配置实现!

 

归纳一下,如果要给 Flink SQL 设置细粒度TTL配置,我们只需要:

  1. 将原来一段 SQL 代码,按照不同的TTL,改写为前后依赖的多个子 SQL。
  2. 对于每个子 SQL,若不是最下游的,进行“翻译-重注册”:
    • 加上对应的 TTL 配置,翻译为 DataStream 作业,得到其输出数据流,其中,流数据类型使用 Row,各个字段的名称和类型可以通过 Table 对象的 Schema获得
    • 将中间结果数据流在表环境重新注册,表名为下游子SQL调用的表名,各个字段的名称和类型可以通过 Table 对象的 Schema获得
  3. 最后一个子 SQL,加上对应的 TTL 配置,翻译成 DataStream 作业,其输出数据流即为完整计算的输出。

一个作业,多个TTL——Flink SQL 细粒度TTL配置的实现(一)

需要注意的是,处理时间(Process-Time)和事件时间(Event-Time)字段,对应的数据类型在Flink Table API & SQL 的包 `flink-table` 中是私有的,在外部访问会出错。

所以,在“翻译-重注册”过程中,需要特殊处理时间和事件时间字段:

  1. 通过 Table 对象的 Schema 找出时间特性字段,然后通过 Table.select 方法,剔除时间特性字段,再翻译成 DataStream 作业,得到中间结果数据流。
  2. 为中间结果数据流重新构造时间特性字段,在重注册为动态表时,按照原字段名重新声明。

总结一下,整个细粒度TTL配置的实现过程实施:

  1. 按 TTL 的不同,将 SQL 拆解为多个子 SQL
  2. 对每个子 SQL 进行“翻译-重注册”,包括时间特性字段的处理
  3. 最后一个子 SQL 完成翻译,得到的 DataStream 作业的输出便是完整计算逻辑的输出

 

细心的读者会发现,如果中间的计算过程包含聚合计算,翻译出的 DataStream 作业的输出数据流只能是带撤回标志位的数据流(简称撤回流)`DataStream>`,无法直接重注册到表环境中。上述的方法无法应用于有多层 TTL 配置不一样的聚合操作的 Flink SQL 中。

也就是说,要实现所有场景下的 Flink SQL 的细粒度 TTL 配置,我们必须实现撤回流注册为动态表这一特性。

本系列文的第二篇《Flink SQL 细粒度TTL配置的实现(二)》将给大家介绍具体的实现方法,需要对Flink Table API & SQL 的包 `flink-table` 的源码进行一点修改,尽情期待。

 

扫描下方二维码关注公众号“KAMI说”,获取更多精彩原创内容~

一个作业,多个TTL——Flink SQL 细粒度TTL配置的实现(一)

 


推荐阅读
  • 2022.4.2学习成果
    Flink中的编程模型4.1编程模型在Flink,编程模型的抽象层级主要分为以下4种,越往下抽象度越低,编程越复杂,灵活度越高。这里先不一一介绍,后续会做详细说明。这4层中,一般用 ... [详细]
  • 微信商户扫码支付 java开发 [从零开发]
    这个教程可以用作了解扫码支付的整体运行过程,已经实现了前端扫码,记录订单,回调等一套完整的微信扫码支付。相关链接:微信支 ... [详细]
  • 马蜂窝数据总监分享:从数仓到数据中台,大数据演进技术选型最优解
    大家好,今天分享的议题主要包括几大内容:带大家回顾一下大数据在国内的发展,从传统数仓到当前数据中台的演进过程;我个人认为数 ... [详细]
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • 本文介绍了OpenStack的逻辑概念以及其构成简介,包括了软件开源项目、基础设施资源管理平台、三大核心组件等内容。同时还介绍了Horizon(UI模块)等相关信息。 ... [详细]
  • 本文介绍了Oracle存储过程的基本语法和写法示例,同时还介绍了已命名的系统异常的产生原因。 ... [详细]
  • 微信官方授权及获取OpenId的方法,服务器通过SpringBoot实现
    主要步骤:前端获取到code(wx.login),传入服务器服务器通过参数AppID和AppSecret访问官方接口,获取到OpenId ... [详细]
  • TableAPI报一下异常:FieldtypesofqueryresultandregisteredTableSink
    报错信息如下:Exceptioninthread“main”org.apache.flink.table.api.ValidationException:Fieldtypesofq ... [详细]
  • 目录摘要SQL的现在NoSQL,NotOnlySQL要分布式,也要SQL总结引用摘要毫不夸张的说,关系数据库是企业软件系统的核心,企业形形色色信息行为的背后,都有关系数据库的支撑。 ... [详细]
  • 实践解析可视化开发平台FlinkSever优势
    实践,解析,可,视,化,开发,平台,fli ... [详细]
  • 从Oracle安全移植到国产达梦数据库的DBA实践与攻略
    随着我国对信息安全和自主可控技术的重视,国产数据库在党政机关、军队和大型央企等行业中得到了快速应用。本文介绍了如何降低从Oracle到国产达梦数据库的技术门槛,保障用户现有业务系统投资。具体包括分析待移植系统、确定移植对象、数据迁移、PL/SQL移植、校验移植结果以及应用系统的测试和优化等步骤。同时提供了移植攻略,包括待移植系统分析和准备移植环境的方法。通过本文的实践与攻略,DBA可以更好地完成Oracle安全移植到国产达梦数据库的工作。 ... [详细]
  • 本文介绍了关系型数据库和NoSQL数据库的概念和特点,列举了主流的关系型数据库和NoSQL数据库,同时描述了它们在新闻、电商抢购信息和微博热点信息等场景中的应用。此外,还提供了MySQL配置文件的相关内容。 ... [详细]
  • Sleuth+zipkin链路追踪SpringCloud微服务的解决方案
    在庞大的微服务群中,随着业务扩展,微服务个数增多,系统调用链路复杂化。Sleuth+zipkin是解决SpringCloud微服务定位和追踪的方案。通过TraceId将不同服务调用的日志串联起来,实现请求链路跟踪。通过Feign调用和Request传递TraceId,将整个调用链路的服务日志归组合并,提供定位和追踪的功能。 ... [详细]
  • Python15行代码实现免费发送手机短信,推送消息「建议收藏」
    Python15行代码实现免费发 ... [详细]
  • 如何使用人人账号进行快捷登录
    在人人开放平台的技术架构中,一个人人Connect站点也相当于一个人人网应用(App),所以在安装之前你需要申请创建一个应用 ... [详细]
author-avatar
melanie_0409
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有