热门标签 | HotTags
当前位置:  开发笔记 > 后端 > 正文

Flink如何保证数据的一致性

当在分布式系统中引入状态时,自然也引入了一致性问题。一致性实际上是正确性级别的另一种说法,也就是说在成功处理故障并恢复之后得到的结果,与没有发生任何故障时得到的结果相比,前者到底

当在分布式系统中引入状态时,自然也引入了一致性问题。一致性实际上是"正确性级别"的另一种说法,也就是说在成功处理故障并恢复之后得到的结果,与没有发生任何故障时得到的结果相比,前者到底有多正确?举例来说,假设要对最近一小时登录的用户计数。在系统经历故障之后,计数结果是多少?如果有偏差,是有漏掉的计数还是重复计数?


一致性级别

在流处理中,一致性可以分为3个级别:



  • at-most-once: 这其实是没有正确性保障的委婉说法——故障发生之后,计数结果可能丢失。同样的还有udp。

  • at-least-once: 这表示计数结果可能大于正确值,但绝不会小于正确值。也就是说,计数程序在发生故障后可能多算,但是绝不会少算。

  • exactly-once: 这指的是系统保证在发生故障后得到的计数结果与正确值一致。

Flink的一个重大价值在于,它既保证了exactly-once,也具有低延迟和高吞吐的处理能力


端到端(end-to-end)状态一致性

目前我们看到的一致性保证都是由流处理器实现的,也就是说都是在 Flink 流处理器内部保证的;而在真实应用中,流处理应用除了流处理器以外还包含了数据源(例如 Kafka)和输出到持久化系统。

端到端的一致性保证,意味着结果的正确性贯穿了整个流处理应用的始终;每一个组件都保证了它自己的一致性,整个端到端的一致性级别取决于所有组件中一致性最弱的组件。具体可以划分如下:



  • 内部保证 —— 依赖checkpoint

  • source 端 —— 需要外部源可重设数据的读取位置

  • sink 端 —— 需要保证从故障恢复时,数据不会重复写入外部系统

而对于sink端,又有两种具体的实现方式:幂等(Idempotent)写入和事务性(Transactional)写入。



  • 幂等写入

    所谓幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了。

  • 事务写入

    需要构建事务来写入外部系统,构建的事务对应着 checkpoint,等到 checkpoint 真正完成的时候,才把所有对应的结果写入 sink 系统中。

不同Source和Sink的一致性保证可用下表说明:

不同数据流的一致性


检查点

检查点的代码实践


public class CheckpointApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 开启checkpoint
/**
* 不开启checkpoint: 不重启
* 配置了重启策略: 使用配置的重启策略
* 1. 使用默认的重启策略: Integer.MAX_VALUE
* 2. 配置了重启策略, 使用配置的重启策略覆盖默认的
*
* 重启策略的配置:
* 1. code
* 2. yaml
*/
env.enableCheckpointing(5000);
// env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
// 作业完成后是否保留
CheckpointConfig cOnfig= env.getCheckpointConfig();
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 设置状态后端
config.setCheckpointStorage("file:////Users/carves/workspace/imook-flink");
// 自定义设置我们需要的重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts, 正常运行之后,进入错误再运行的次数
Time.of(10, TimeUnit.SECONDS) // delay
));
DataStreamSource source = env.socketTextStream("localhost", 9527);
source.map(new MapFunction() {
@Override
public String map(String value) throws Exception {
if (value.contains("pk")) {
throw new RuntimeException("PK pk test!");
} else {
return value.toLowerCase();
}
}
}).flatMap(new FlatMapFunction() {
@Override
public void flatMap(String value, Collector out) throws Exception {
String[] splits = value.split(",");
for (String split:
splits) {
out.collect(split);
}
}
}).map(new MapFunction>() {
@Override
public Tuple2 map(String value) throws Exception {
return Tuple2.of(value, 1);
}
}).keyBy(value -> value.f0)
.sum(1)
.print();
env.execute("CheckpointApp");
}
}

检查点算法:

Flink检查点算法的正式名称是异步分界线快照(asynchronous barrier snapshotting)。该算法大致基于Chandy-Lamport分布式快照算法。

检查点是Flink最有价值的创新之一,因为它使Flink可以保证exactly-once,并且不需要牺牲性能


我们知道,端到端的状态一致性的实现,需要每一个组件都实现,对于Flink + Kafka的数据管道系统(Kafka进、Kafka出)而言,各组件怎样保证exactly-once语义呢?利用checkpoint机制,把状态存盘,发生故障的时候可以恢复,保证内部的状态一致性



  • source —— kafka consumer作为source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性

  • sink —— kafka producer作为sink,采用两阶段提交 sink,需要实现一个 TwoPhaseCommitSinkFunction

    内部的checkpoint机制我们已经有了了解,那source和sink具体又是怎样运行的呢?接下来我们逐步做一个分析。

    我们知道Flink由JobManager协调各个TaskManager进行checkpoint存储,checkpoint保存在 StateBackend中,默认StateBackend是内存级的,也可以改为文件级的进行持久化保存。


2阶段提交

执行过程实际上是一个两段式提交,每个算子执行完成,会进行“预提交”,直到执行完sink操作,会发起“确认提交”,如果执行失败,预提交会放弃掉。

当 checkpoint 启动时,JobManager 会将检查点分界线(barrier)注入数据流;barrier会在算子间传递下去。

2阶段提交流程

每个算子会对当前的状态做个快照,保存到状态后端。对于source任务而言,就会把当前的offset作为状态保存起来。下次从checkpoint恢复时,source任务可以重新提交偏移量,从上次保存的位置开始重新消费数据。



具体的两阶段提交步骤总结如下:第一条数据来了之后,开启一个 kafka 的事务(transaction),正常写入 kafka 分区日志但标记为未提交,这就是“预提交”。jobmanager 触发 checkpoint 操作,barrier 从 source 开始向下传递,遇到 barrier 的算子将状态存入状态后端,并通知 jobmanager。sink 连接器收到 barrier,保存当前状态,存入 checkpoint,通知 jobmanager,并开启下一阶段的事务,用于提交下个检查点的数据。jobmanager 收到所有任务的通知,发出确认信息,表示 checkpoint 完成。sink 任务收到 jobmanager 的确认信息,正式提交这段时间的数据。外部kafka关闭事务,提交的数据可以正常消费了。


2阶段提交步骤



  1. 第一条数据来了之后,开启一个 kafka 的事务(transaction),正常写入 kafka 分区日志但标记为未提交,这就是“预提交”jobmanager 触发 checkpoint 操作,barrier 从 source 开始向下传递,遇到 barrier 的算子将状态存入状态后端,并通知 jobmanager

  2. sink 连接器收到 barrier,保存当前状态,存入 checkpoint,通知 jobmanager,并开启下一阶段的事务,用于提交下个检查点的数据

  3. jobmanager 收到所有任务的通知,发出确认信息,表示 checkpoint 完成

  4. sink 任务收到 jobmanager 的确认信息,正式提交这段时间的数据

  5. 外部kafka关闭事务,提交的数据可以正常消费了。

state 

checkpointing 

状态后端

流式数据的处理



推荐阅读
  • 修复一个 Bug 竟耗时两天?真的有那么复杂吗?
    修复一个 Bug 竟然耗费了两天时间?这背后究竟隐藏着怎样的复杂性?本文将深入探讨这个看似简单的 Bug 为何会如此棘手,从代码层面剖析问题根源,并分享解决过程中遇到的技术挑战和心得。 ... [详细]
  • 本文详细介绍了HDFS的基础知识及其数据读写机制。首先,文章阐述了HDFS的架构,包括其核心组件及其角色和功能。特别地,对NameNode进行了深入解析,指出其主要负责在内存中存储元数据、目录结构以及文件块的映射关系,并通过持久化方案确保数据的可靠性和高可用性。此外,还探讨了DataNode的角色及其在数据存储和读取过程中的关键作用。 ... [详细]
  • 深入理解Spark框架:RDD核心概念与操作详解
    RDD是Spark框架的核心计算模型,全称为弹性分布式数据集(Resilient Distributed Dataset)。本文详细解析了RDD的基本概念、特性及其在Spark中的关键操作,包括创建、转换和行动操作等,帮助读者深入理解Spark的工作原理和优化策略。通过具体示例和代码片段,进一步阐述了如何高效利用RDD进行大数据处理。 ... [详细]
  • 深入理解Java事务编程:可串行化隔离级别的快照隔离机制解析
    深入理解Java事务编程:可串行化隔离级别的快照隔离机制解析 ... [详细]
  • 如何正确配置与使用日志组件:Log4j、SLF4J及Logback的连接与整合方法
    在当前的软件开发实践中,无论是开源项目还是日常工作中,日志框架都是不可或缺的工具之一。本文详细探讨了如何正确配置与使用Log4j、SLF4J及Logback这三个流行的日志组件,并深入解析了它们之间的连接与整合方法,旨在帮助开发者高效地管理和优化日志记录流程。 ... [详细]
  • ActiveMQ是由Apache开发的一款广受欢迎且功能强大的开源消息中间件。作为完全符合JMS 1.1和J2EE 1.4规范的JMS Provider实现,尽管JMS规范已问世多年,但ActiveMQ依然保持了其在消息队列领域的领先地位。本文将带你初步了解ActiveMQ的核心概念及其应用场景,帮助你快速入门这一重要的消息传递技术。 ... [详细]
  • 比特币的成功为区块链技术构建了可信货币的基石,标志着区块链1.0时代的到来。以太坊通过引入智能合约,极大地推动了去中心化应用的开发和普及,开启了区块链2.0时代。本文深入探讨了侧链技术在提升区块链扩展性方面的潜力和应用,分析了其在提高交易速度、降低成本和增强安全性等方面的优势,并讨论了当前面临的技术挑战和未来的发展方向。 ... [详细]
  • 2019年斯坦福大学CS224n课程笔记:深度学习在自然语言处理中的应用——Word2Vec与GloVe模型解析
    本文详细解析了2019年斯坦福大学CS224n课程中关于深度学习在自然语言处理(NLP)领域的应用,重点探讨了Word2Vec和GloVe两种词嵌入模型的原理与实现方法。通过具体案例分析,深入阐述了这两种模型在提升NLP任务性能方面的优势与应用场景。 ... [详细]
  • 利用Redis HyperLogLog高效统计微博日活跃和月活跃用户数
    本文探讨了如何利用Redis的HyperLogLog数据结构高效地统计微博平台的日活跃用户(DAU)和月活跃用户(MAU)数量。通过HyperLogLog的高精度和低内存消耗特性,可以实现对大规模用户数据的实时统计与分析,为平台运营提供有力的数据支持。 ... [详细]
  • 本体获邀入驻公益在线教育平台,主讲“区块链助力慈善事业”核心课程
    近日,本体全球生态合作负责人Gloria Wu受公益在线教育平台邀请,参与了“新媒体新技术新公益”系列直播课程,深入探讨了区块链技术在慈善公益领域的应用前景及其潜在影响。 ... [详细]
  • 开发心得:利用 Redis 构建分布式系统的轻量级协调机制
    开发心得:利用 Redis 构建分布式系统的轻量级协调机制 ... [详细]
  • 基于Matlab的区域多微电网社区分层协同能源管理系统设计与实现
    本文提出了一种创新的分层协同能源管理系统(EMS),专为多微电网社区设计与实现。该系统利用Matlab进行建模和仿真,详细探讨了微电网社区的结构及其在能源管理中的应用。通过高效的算法和优化策略,该系统能够有效提高能源利用效率,实现能源的智能分配与调度。实验结果验证了该系统的可行性和优越性。 ... [详细]
  • Gear 月度进展报告:2023年7月最新动态与技术升级
    Gear 月度进展报告:2023年7月最新动态与技术升级 ... [详细]
  • NoSQL数据库,即非关系型数据库,有时也被称作Not Only SQL,是一种区别于传统关系型数据库的管理系统。这类数据库设计用于处理大规模、高并发的数据存储与查询需求,特别适用于需要快速读写大量非结构化或半结构化数据的应用场景。NoSQL数据库通过牺牲部分一致性来换取更高的可扩展性和性能,支持分布式部署,能够有效应对互联网时代的海量数据挑战。 ... [详细]
  • Linux学习精华:程序管理、终端种类与命令帮助获取方法综述 ... [详细]
author-avatar
泰有趣
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有