热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flink源码:从KeyGroup到Rescale

通过阅读本文你能get到以下点:KeyGroup、KeyGroupRange介绍maxParallelism介绍及采坑记数据如何映射到每个subtask上ÿ

通过阅读本文你能 get 到以下点:

  • KeyGroup、KeyGroupRange 介绍
  • maxParallelism 介绍及采坑记
  • 数据如何映射到每个 subtask 上?
  • 任务改并发时,KeyGroup rescale 的过程

一、 KeyGroup、KeyGroupRange 介绍

Flink 中 KeyedState 恢复时,是按照 KeyGroup 为最小单元恢复的,每个 KeyGroup 负责一部分 key 的数据。这里的 key 指的就是 Flink 中 keyBy 中提取的 key。

每个 Flink 的 subtask 负责一部分相邻 KeyGroup 的数据,即一个 KeyGroupRange 的数据,有个 start 和 end(这里是闭区间)。

看到这里可能有点蒙,没关系后面有例子帮助读者理解这两个概念。

二、 maxParallelism 介绍及采坑记


1、最大并行度的概念

maxParallelism 表示当前算子设置的 maxParallelism,而不是 Flink 任务的并行度。maxParallelism 为 KeyGroup 的个数。

当设置算子的并行度大于 maxParallelism 时,有些并行度就分配不到 KeyGroup,此时 Flink 任务是无法从 Checkpoint 处恢复的。

2、maxParallelism 到底是多少呢?

如果设置了,就是设定的值。当然设置了,也需要检测合法性。如下图所示,Flink 要求 maxParallelism 应该介于 1 到 Short.MAX_VALUE 之间。

/*** Sets the maximum degree of parallelism defined for the program. The upper limit (inclusive)* is Short.MAX_VALUE.**

The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also* defines the number of key groups used for partitioned state.** &#64;param maxParallelism Maximum degree of parallelism to be used for the program.,* with 0 public StreamExecutionEnvironment setMaxParallelism(int maxParallelism) {Preconditions.checkArgument(maxParallelism > 0 &&maxParallelism <&#61; KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,"maxParallelism is out of bounds 0 &#43;KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM &#43; ". Found: " &#43; maxParallelism);config.setMaxParallelism(maxParallelism);return this;}

如果没有设置&#xff0c;则 Flink 引擎会自动通过 KeyGroupRangeAssignment 类的 computeDefaultMaxParallelism 方法计算得出&#xff0c;computeDefaultMaxParallelism 源码如下所示&#xff1a;

/**根据算子的并行度计算 maxParallelism* 计算规则&#xff1a;* 1. 将算子并行度 * 1.5 后&#xff0c;向上取整到 2 的 n 次幂* 2. 跟 DEFAULT_LOWER_BOUND_MAX_PARALLELISM 相比&#xff0c;取 max* 3. 跟 UPPER_BOUND_MAX_PARALLELISM 相比&#xff0c;取 min*/
public static int computeDefaultMaxParallelism(int operatorParallelism) {checkParallelismPreconditions(operatorParallelism);return Math.min(Math.max(MathUtils.roundUpToPowerOfTwo(operatorParallelism &#43; (operatorParallelism / 2)),DEFAULT_LOWER_BOUND_MAX_PARALLELISM),UPPER_BOUND_MAX_PARALLELISM);
}

computeDefaultMaxParallelism 会根据算子的并行度计算 maxParallelism&#xff0c;计算规则&#xff1a;将算子并行度 * 1.5 后&#xff0c;向上取整到 2 的 n 次幂&#xff0c;同时保证计算的结果在最小值和最大值之间。

最小值 DEFAULT_LOWER_BOUND_MAX_PARALLELISM 是 2 的 7 次方 &#61; 128。

最大值 UPPER_BOUND_MAX_PARALLELISM 是 2 的 15 次方 &#61; 32768。

即&#xff1a;Flink 自动生成的 maxParallelism 介于 128 和 32768 之间。

采坑记

新开发的 Job 业务数据量较小&#xff0c;所以初期设置的并行度也会很小。同时没有给每个 Job 主动设置 maxParallelism&#xff0c;根据上面的规则&#xff0c;Flink 自动生成的 maxParallelism 为 128&#xff0c;后期随着业务数据量暴涨&#xff0c;当 Job 的并发数调大 128 以上时&#xff0c;发现 Job 无法从 Checkpoint 或 Savepoint 中恢复了&#xff0c;这就是所谓的 “并发调不上去了”。当然可以选择不从状态恢复&#xff0c;选择直接启动的方式去启动任务。但是有些 Flink 任务对状态是强依赖的&#xff0c;即&#xff1a;必须从 State 中恢复&#xff0c;对于这样的 Job 就不好办了。

所以按照开发规范&#xff0c;应该结合业务场景主动为每个 Job 设置合理的 maxParallelism&#xff0c;防止出现类似情况。

三、每个 key 应该分配到哪个 subtask 上运行&#xff1f;

根据 key 计算其对应的 subtaskIndex&#xff0c;即应该分配给哪个 subtask 运行&#xff0c;计算过程包括以下两步&#xff0c;源码都在相应的 KeyGroupRangeAssignment 类中:

  • 第一步&#xff1a;根据 key 计算其对应哪个 KeyGroup
  • 第二步&#xff1a;计算 KeyGroup 属于哪个并行度

第一步&#xff1a;根据 key 计算其对应哪个 KeyGroup

computeKeyGroupForKeyHash 源码如下所示&#xff1a;

/*** Assigns the given key to a key-group index.** &#64;param keyHash the hash of the key to assign* &#64;param maxParallelism the maximum supported parallelism, aka the number of key-groups.* &#64;return the key-group to which the given key is assigned* 根据 Key 的 hash 值来计算其对应的 KeyGroup 的 index*/
public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {return MathUtils.murmurHash(keyHash) % maxParallelism;
}

根据 Key 的 hash 值进行 murmurHash 后&#xff0c;对 maxParallelism 进行求余&#xff0c;就是对应的 KeyGroupIndex。

第二步&#xff1a;计算 KeyGroup 属于哪个并行度

computeOperatorIndexForKeyGroup 源码如下所示&#xff1a;

// 根据 maxParallelism、算子的并行度 parallelism 和 keyGroupId&#xff0c;
// 计算 keyGroupId 对应的 subtask 的 index
public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {return keyGroupId * parallelism / maxParallelism;
}

示例

假如 maxParallelism 为 50&#xff0c;parallelism 为 10&#xff0c;那么数据是如何分布的&#xff1f;

MathUtils.murmurHash(key.hashCode()) % maxParallelism&#xff1a;所有 key 的 hashCode 通过 Murmurhash 对 50 求余得到的范围为 0~49&#xff0c;也就是说&#xff1a;总共有 keyGroupId 为 0~49 的这 50 个 KeyGroup。

subtask 与 KeyGroupId 对应关系&#xff1a;

  • 0~4 号 KeyGroup 位于第 0 个 subtask。即&#xff1a;subtask0 处理 KeyGroupRange(0,4 ) 的数据
  • 5~9 号 KeyGroup 位于第 1 个 subtask。即&#xff1a;subtask1 处理 KeyGroupRange(5,9 ) 的数据
  • 10~14 号 KeyGroup 位于第 2 个 subtask。即&#xff1a;subtask2 处理 KeyGroupRange(10,14 ) 的数据
  • 15~19 号 KeyGroup 位于第 3 个 subtask。即&#xff1a;subtask3 处理 KeyGroupRange(15,19 ) 的数据
  • 。。。以此类推

这里我们看到了每个 subtask 对应一个 KeyGroupRange 的数据&#xff0c;且是闭区间。

计算某个并行度上负载哪些 KeyGroup&#xff1f;

计算某个并行度上负载哪些 KeyGroup&#xff1f;等价于求某个 subtask 负载的 KeyGroupRange。

在 KeyGroupRangeAssignment 类中有 computeKeyGroupRangeForOperatorIndex 方法可以完成这个操作&#xff1a;

// 根据 maxParallelism, parallelism 计算 operatorIndex 对应的 subtask 负责哪个范围的 KeyGroup
public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(int maxParallelism,int parallelism,int operatorIndex) {checkParallelismPreconditions(parallelism);checkParallelismPreconditions(maxParallelism);Preconditions.checkArgument(maxParallelism >&#61; parallelism,"Maximum parallelism must not be smaller than parallelism.");int start &#61; ((operatorIndex * maxParallelism &#43; parallelism - 1) / parallelism);int end &#61; ((operatorIndex &#43; 1) * maxParallelism - 1) / parallelism;return new KeyGroupRange(start, end);
}

四、 Rescale 过程

如下图所示是 Flink 依赖 KeyGroup 修改并发的 Rescale 过程&#xff08;并发度从 3 改成 4&#xff09;&#xff1a;
KeyGroup Rescale 过程图示
由图中可得知 key 的范围是 0~19&#xff0c; maxParallelism &#61; 10。

0->{0,10} 表示 key 为 0 和 10 的数据&#xff0c;对应的 KeyGroupId 为 0。

1->{1,11} 表示 key 为 1 和 11 的数据&#xff0c;对应的 KeyGroupId 为 1。

以此类推。。。

修改并发前的映射关系

并发度是 3:

  • Subtask0 负责 KeyGroupRange(0,3)
  • Subtask1 负责 KeyGroupRange(4,6)
  • Subtask2 负责 KeyGroupRange(7,9)

修改并发后的映射关系

并发度是 4:

  • subtask0 负责 KeyGroupRange(0,2)
  • Subtask1 负责 KeyGroupRange(3,4)
  • Subtask2 负责 KeyGroupRange(5,7)
  • Subtask3 负责 KeyGroupRange(8,9)

maxParallelism 修改则任务不能恢复

KeyGroup 的数量为 maxParallelism&#xff0c;一旦 maxParallelism 变了&#xff0c;说明 KeyGroup 的分组完全变了&#xff0c;而 KeyedState 恢复是以 KeyGroup 为最小单元的&#xff0c;所以 maxParallelism 改变后&#xff0c;任务将无法恢复。在 Checkpoint 恢复过程中也会对新旧 Job 的 maxParallelism 进行检查匹配&#xff0c;如果某个算子的 maxParallelism 变了&#xff0c;则任务将不能恢复。

五、总结

本文主要介绍了 KeyGroup、KeyGroupRange 和 maxParallelism 的一些概念&#xff0c;及他们之间的关系。最后讲述了改并发的情况状态的 Rescale 流程。其实在 Flink 内部不只是状态恢复时需要用到 KeyGroup&#xff0c;数据 keyBy 后进行 shuffle 数据传输时也需要按照 KeyGroup 的规则来将分配数据&#xff0c;将数据分发到对应的 subtask 上。

本文比较简单&#xff0c;主要是为后续 State 恢复流程做一个铺垫。


推荐阅读
  • 本文介绍了在rhel5.5操作系统下搭建网关+LAMP+postfix+dhcp的步骤和配置方法。通过配置dhcp自动分配ip、实现外网访问公司网站、内网收发邮件、内网上网以及SNAT转换等功能。详细介绍了安装dhcp和配置相关文件的步骤,并提供了相关的命令和配置示例。 ... [详细]
  • SpringBoot uri统一权限管理的实现方法及步骤详解
    本文详细介绍了SpringBoot中实现uri统一权限管理的方法,包括表结构定义、自动统计URI并自动删除脏数据、程序启动加载等步骤。通过该方法可以提高系统的安全性,实现对系统任意接口的权限拦截验证。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • 阿,里,云,物,联网,net,core,客户端,czgl,aliiotclient, ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • eclipse学习(第三章:ssh中的Hibernate)——11.Hibernate的缓存(2级缓存,get和load)
    本文介绍了eclipse学习中的第三章内容,主要讲解了ssh中的Hibernate的缓存,包括2级缓存和get方法、load方法的区别。文章还涉及了项目实践和相关知识点的讲解。 ... [详细]
  • HDFS2.x新特性
    一、集群间数据拷贝scp实现两个远程主机之间的文件复制scp-rhello.txtroothadoop103:useratguiguhello.txt推pushscp-rr ... [详细]
  • 本文介绍了iOS数据库Sqlite的SQL语句分类和常见约束关键字。SQL语句分为DDL、DML和DQL三种类型,其中DDL语句用于定义、删除和修改数据表,关键字包括create、drop和alter。常见约束关键字包括if not exists、if exists、primary key、autoincrement、not null和default。此外,还介绍了常见的数据库数据类型,包括integer、text和real。 ... [详细]
  • 本文讨论了在openwrt-17.01版本中,mt7628设备上初始化启动时eth0的mac地址总是随机生成的问题。每次随机生成的eth0的mac地址都会写到/sys/class/net/eth0/address目录下,而openwrt-17.01原版的SDK会根据随机生成的eth0的mac地址再生成eth0.1、eth0.2等,生成后的mac地址会保存在/etc/config/network下。 ... [详细]
  • 本文讨论了如何使用IF函数从基于有限输入列表的有限输出列表中获取输出,并提出了是否有更快/更有效的执行代码的方法。作者希望了解是否有办法缩短代码,并从自我开发的角度来看是否有更好的方法。提供的代码可以按原样工作,但作者想知道是否有更好的方法来执行这样的任务。 ... [详细]
  • JDK源码学习之HashTable(附带面试题)的学习笔记
    本文介绍了JDK源码学习之HashTable(附带面试题)的学习笔记,包括HashTable的定义、数据类型、与HashMap的关系和区别。文章提供了干货,并附带了其他相关主题的学习笔记。 ... [详细]
  • 模板引擎StringTemplate的使用方法和特点
    本文介绍了模板引擎StringTemplate的使用方法和特点,包括强制Model和View的分离、Lazy-Evaluation、Recursive enable等。同时,还介绍了StringTemplate语法中的属性和普通字符的使用方法,并提供了向模板填充属性的示例代码。 ... [详细]
  • 本文介绍了Swing组件的用法,重点讲解了图标接口的定义和创建方法。图标接口用来将图标与各种组件相关联,可以是简单的绘画或使用磁盘上的GIF格式图像。文章详细介绍了图标接口的属性和绘制方法,并给出了一个菱形图标的实现示例。该示例可以配置图标的尺寸、颜色和填充状态。 ... [详细]
  • 本文介绍了一个适用于PHP应用快速接入TRX和TRC20数字资产的开发包,该开发包支持使用自有Tron区块链节点的应用场景,也支持基于Tron官方公共API服务的轻量级部署场景。提供的功能包括生成地址、验证地址、查询余额、交易转账、查询最新区块和查询交易信息等。详细信息可参考tron-php的Github地址:https://github.com/Fenguoz/tron-php。 ... [详细]
  • GreenDAO快速入门
    前言之前在自己做项目的时候,用到了GreenDAO数据库,其实对于数据库辅助工具库从OrmLite,到litePal再到GreenDAO,总是在不停的切换,但是没有真正去了解他们的 ... [详细]
author-avatar
葉_沛峰
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有