热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Kafka是如何管理消费位点的?

Kafka是一个高度可扩展的分布式消息系统,在实时事件流和流式处理为中心的架构越来越风靡的今天,它扮演了这个架构中核心存储的角色。从某种角度说,Kafka可以看成实时版的Hadoo

Kafka 是一个高度可扩展的分布式消息系统,在实时事件流和流式处理为中心的架构越来越风靡的今天,它扮演了这个架构中核心存储的角色。从某种角度说,Kafka 可以看成实时版的 Hadoop 系统。Hadoop 可以存储和定期处理大量的数据文件,而 Kafka 可以存储和持续处理大型的数据流。

Hadoop 和文件系统提供文件流的读取位点( offset ),并支持通过 seek 方法将文件流移动到特定位置;Kafka 对应的提供了主题下每个分区的消费位点( offset ),并允许消费者设置分区的读取位置。本文首先介绍 Kafka 消费者消费消息的方式,随后回答 Kafka 如何管理消费位点这一元数据的问题。后面一个主题包括 Kafka 如何提交以及设置消费位点的实现,这是 Kafka 为应用系统提供可靠性保障的重要组成部分


Kafka 消费者的消费模式

Kafka 的数据由主题和分区划分。应用程序使用 KafkaConsumer 向 Kafka 订阅主题,并从订阅的主题上接受消息,订阅主题的模板代码如下所示。

consumer.subscribe(Collections.singletonList("customTopic"));

可以看到,我们为每个消费者指定了它所消费的主题。

在分布式系统的语境下,当生产者通过水平扩展提高了整体主题写入消息的速度时,单个消费者很快就跟不上消息生产的速度。直观地,我们想要通过同样地水平扩展手段,使用多个消费者来分摊消息消费的压力。

Kafka 利用消费组的概念来支持消费者的水平扩展。消费者从属于消费组,消费组的消费者订阅同一个主题,每个消费者接受主题的一部分分区的消息。消费者通过创建时的 group.id 指定它所从属的消费组。

消费者加入消费组或离开消费组会引起消费组所消费的主题的分区在组内消费者之间的再均衡( rebalance )。消息的再均衡在流式处理的范畴里是一个复杂的话题,本文不讨论其细节,假设每个消费者都稳定地消费主题的若干个分区。

在 Kafka 与某些系统的整合里,消费者消费的分区是由外部系统所指定和协调的,Kafka 为了支持这样的场景提供了主动为消费者分配分区的接口。

consumer.assign(Collections.singletonList(new TopicPartition("customTopic", 1)));

当消费者指定了自己所要消费的主题和分区后,应用程序通过消息轮询来与 Kafka 集群交互并请求数据进行消费。Kafka 在轮询中进行很多操作,包括消费组协调、分区再均衡和获取数据。在这里我们主要关心获取数据进行消费的情况,模板代码如下所示。

try {while (true) {for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMills(1000L))) {// do something with record...}}
} finally {consumer.close();
}

可以看到,应用程序通常在一个无限循环里通过轮询来消费 Kafka 里的数据。当消费者缓冲区有数据或 poll 最长的阻塞时间到达时,将返回本次轮询取得的消息集合。

返回的消息集合的元素包括消息所属主题的信息、所在分区的信息、所在分区的消费位点以及消息数据即其键值对。通常,我们遍历消息集合来处理轮询取得的消息。

最后,在 finally 块中我们调用了消费者的 close 方法,从而显式地关闭消费者,并关闭网络连接。这个操作同时会触发一次消费组的再均衡,从而避免必须等待消费组协调者在该消费者心跳超时后才发现其离开消费者并触发再均衡。


Kafka 如何提交消费位点

消费者每次调用 poll 方法总是返回由生产者写入 Kafka 但是还没有被消费者消费的消息,那么 Kafka 是怎么定位哪些消息还没被消费者消费的呢?

答案就是消费位点。

Kafka 通过消费位点来追踪消息在分区里的消费进度,而不需要强制对每个消息都进行确认。我们把更新分区消费位点的操作叫做提交( commit )。

Kafka 追踪消费位点的方式充分利用了 Kafka 自身的能力,通过向 Kafka 内部名为 __consumer_offsets 的主题发送包装了消费位点信息的消息来保存消费位点。消费者正常运行时,还会在内存中为每个分配的分区记录一个获取数据的数据位点。

因此,如果消费者一直正常运行,持久化在 __consumer_offsets 主题的消费位点元数据用处不大,因为消费者会自己追踪消费位点。但是在有的消费者发生崩溃重启或者主题分区发生再均衡时,重启的分区需要恢复丢失的内存中的消费位点信息,或者再均衡后的消费者接手新的分区的情形下,消费者就需要读取分区最后一次提交的消费位点,以从该消费位点继续往下消费数据。

在这种情形下,如果提交的消费位点小于应用程序消费者实际曾经处理过的最后一个消息的消费位点,那么这两点之间的消息就会被重复处理。反之,如果提交的消费位点大于应用程序消费者实际曾经处理过的最后一个消息的消费位点,那么这两点之间的消息就会被跳过,不被处理。

因此,为了提高应用程序处理消息的可靠性,Kafka 提供了若干种提交消费位点的方式,以支持应用程序根据自身逻辑提交尽可能准确的消费位点。


自动提交

简单的 Kafka 消费应用程序可以采用自动提交的手段让消费者自动提交消费位点。只要在创建消费者的时候将 enable.auto.commit 配置设置为 true 值,那么消费者就会在 poll 方法里在拉取新的消息之前自动提交当前的消费位点。决定自动提交周期的是 auto.commit.interval.ms 配置,默认是 5 秒,即每过 5 秒,在下一次 poll 时自动提交。

自动提交虽然方便,但是一切自动的行为,使用者都需要小心的确认其行为并了解它在极端情况下的表现。

一个典型的自动提交的边界场景是分区再均衡场景。假设我们采用默认 5 秒的自动提交时间间隔,在本分区最近一次提交后 3 秒发生了故障,再均衡之后,新的消费者从本分区的消费位点开始读取并处理消息。由于消费位点是故障前 3 秒前自动提交的,在这 3 秒之间读取的消息及其影响的消费位点没有被提交,因此这些数据将被重复处理。

可以通过缩短自动提交的间隔来减小重复数据的时间窗口,但是重复数据在理论上是不可避免的。此外,频繁的提交将带来额外的调度开销和通信开销。

另一个值得注意的是,自动提交的配置下,每一次 poll 调用都会提交上一次 poll 移动到的消费位点,在调用消费者的 close 方法时也会触发自动提交。通常来说,自动提交的消费位点总是不大于消费者实际处理的消息。但是,如果在轮询时拉取到一批消息,并在处理完所有消息之前抛出异常,就有可能导致自动提交时按照这批消息处理过的假设进行提交,从而导致部分消息被跳过,不被处理的情形。


主动提交

对于定制 Kafka 消费逻辑的应用,或者说整合 Kafka 到更大的流式处理系统的场景,主动提交当前的消费位点是一个必须的功能。应用程序或者复合系统通过控制消费位点的提交时间来消除消息丢失的可能性,并在发生再均衡时减少重复消息的数量。

首先,这要求把前面提到的 enable.auto.commit 配置设置为 false 值。随后,通过调用消费者的提交消费位点的接口来进行主动提交。Kafka 提供的接口包括以下几种。

void commitSync();
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
void commitAsync();
void commitAsync(OffsetCommitCallback callback);
void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);

可以看到,这些接口一方面分为同步和异步两类,另一方面分为是否带有 offsets 参数两类。

显然,同步的提交会阻塞应用程序的运行,默认情况下这个阻塞的超时时间是由 default.api.timeout.ms 配置决定的,默认为 1 分钟。在早期的版本中,这个时间是无限的,即应用程序会永远阻塞并无限重试直到抛出不可恢复的异常或成功提交。

异步的提交则会将提交动作放在异步线程完成,并支持传入可选的 OffsetCommitCallback 来定制提交动作成功后的回调逻辑。异步的提交的异步性在于消费者向 Kafka 集群发送提交消费位点的请求后,不阻塞等待集群的返回,而是注册一个集群返回时的回调来响应成功或者失败的事件。

由于发送请求是同步的,而且 Kafka 底层采用 TCP 进行通信,因此我们可以认为异步的消费位点提交也是顺序发生在 Kafka 集群的。然而,如果在提交失败的情形下消费者通过 Callback 尝试重新提交,就必须注意重试可能导致提交一个更早的消费位点从而在再平衡场景下导致不必要的重复消费的情况。

到现在我们所讨论的接口都是不带 offsets 参数的,在这种情况下,Kafka 会自动获取当前消费者内存所保存的消费位点来进行提交。对于某些定制化的消费位点管理逻辑,可以通过传入 offsets 参数来自定义需要提交的消费位点的内容。offsets 参数是一个主题及分区锁定具体分区的键和将要提交的该分区的消费位点及元数据的值组成的映射。


集群交互

上面介绍的提交消费位点的技术都是从消费者角度看那些接口参与实现这个功能的。对于想深入理解这一过程的同学,这里简要介绍一下相关的逻辑在源代码的位置及相应的逻辑线。

在消费者一侧的底层逻辑,上面自动提交和主动提交的逻辑最终都由 ConsumerCoordicator 类来执行,对应的接口定义如下。

class ConsumerCoordinator {void maybeAutoCommitOffsetsAsync(long now);boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer);void commitOffsetsAsync(Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback);
}

这些方法在底层都会首先查找当前消费者对应的消费组协调者,即 Kafka 集群中的某个服务器,随后向它发起 OffsetCommitRequest 请求,该请求包含了服务器向 __consumer_offsets 主题写入消费位点所需要的所有信息。

消费组协调者通过 SocketServer 组件接受到请求后,反序列化请求并交给 KafkaApis 组件处理请求。KafkaApis 是 Kafka 服务器所有业务逻辑的聚合类。它识别出 OffsetCommitRequest 后转发到 handleOffsetCommitRequest 方法进行处理。在一系列的参数检查之后,主流程的消息处理最终将由 GroupCoordinator#handleCommitOffsets 处理。


Kafka 如何设置消费位点

现在,我们知道了 Kafka 提交消费位点的方式,并且知道了持久化到 Kafka 集群的消费位点通常在消费者崩溃或者集群发生再均衡的时候被读取和使用。但是,同样是在某些定制场景下,Kafka 消费者的消费位点是由外部系统维护的。

在这种情况下,Kafka 也支持从特定的消费位点开始处理消息,对应的接口定义如下。

void seekToBeginning(Collection<TopicPartition> partitions);
void seekToEnd(Collection<TopicPartition> partitions);
void seek(TopicPartition partition, long offset);
void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);

其中 seekToBeginning 方法将分区的消费位点回拨到分区的起始位置开始读取消息,而 seekToEnd 方法将分区的消费位点跳到分区的末尾开始读取信息。显然,前者将会导致大量的重复消息处理,而后者将带来跳过某些消息不做处理的风险。

在本文的开头我们提到,Hadoop 和文件系统提供文件流的读取位点,并支持通过 seek 方法将文件流移动到特定位置。Kafka 同样支持 seek 方法来设置消息的消费位点,从新的消费位点开始消费数据。

从实现上说,设置消费位点是一个消费者的本地操作。它直接改动了订阅状态下主题分区状态的消费位点消息,从而在下一次 poll 方法调用时从新设置的消费位点开始向 Kafka 集群拉取消息进行消费。

这一功能的实际应用场景包括应用程序需要保证某种程度的数据可靠性的情形。

通常,数据的消费者在接收到 Kafka 消息之后会进行相应的处理并生成新的数据,典型场景下这一数据将被持久化到数据库中或者进入到下一阶段的流式处理系统里。如果数据保存在数据库里或进入其他系统之中,而消费位点提交到 Kafka 上,这样多个系统之间天然的异步性将使原子提交的操作成为不可能。

但是,如果在同一个事务里将数据和消费位点都写到数据库里,或者进入到流式处理系统里追踪起来,我们就可以保证这两者是原子地被提交或者失败。此时,消费位点保存在 Kafka 系统之外,因此需要上面的 seek 方法来主动设置消费位点以告诉消费者从什么位置开始读取并消费数据。

另外两个方法跟 auto.offset.reset 配置的 ealiest 和 latest 选项相对应,常作为主题分区消费位点不存在时采用的兜底设置消费位点的方案。


推荐阅读
  • 基于分布式锁的防止重复请求解决方案
    一、前言关于重复请求,指的是我们服务端接收到很短的时间内的多个相同内容的重复请求。而这样的重复请求如果是幂等的(每次请求的结果都相同,如查 ... [详细]
  • 阿,里,云,物,联网,net,core,客户端,czgl,aliiotclient, ... [详细]
  • 在重复造轮子的情况下用ProxyServlet反向代理来减少工作量
    像不少公司内部不同团队都会自己研发自己工具产品,当各个产品逐渐成熟,到达了一定的发展瓶颈,同时每个产品都有着自己的入口,用户 ... [详细]
  • 本文讨论了在手机移动端如何使用HTML5和JavaScript实现视频上传并压缩视频质量,或者降低手机摄像头拍摄质量的问题。作者指出HTML5和JavaScript无法直接压缩视频,只能通过将视频传送到服务器端由后端进行压缩。对于控制相机拍摄质量,只有使用JAVA编写Android客户端才能实现压缩。此外,作者还解释了在交作业时使用zip格式压缩包导致CSS文件和图片音乐丢失的原因,并提供了解决方法。最后,作者还介绍了一个用于处理图片的类,可以实现图片剪裁处理和生成缩略图的功能。 ... [详细]
  • 本文介绍了解决Netty拆包粘包问题的一种方法——使用特殊结束符。在通讯过程中,客户端和服务器协商定义一个特殊的分隔符号,只要没有发送分隔符号,就代表一条数据没有结束。文章还提供了服务端的示例代码。 ... [详细]
  • 本文介绍了在开发Android新闻App时,搭建本地服务器的步骤。通过使用XAMPP软件,可以一键式搭建起开发环境,包括Apache、MySQL、PHP、PERL。在本地服务器上新建数据库和表,并设置相应的属性。最后,给出了创建new表的SQL语句。这个教程适合初学者参考。 ... [详细]
  • 本文介绍了在rhel5.5操作系统下搭建网关+LAMP+postfix+dhcp的步骤和配置方法。通过配置dhcp自动分配ip、实现外网访问公司网站、内网收发邮件、内网上网以及SNAT转换等功能。详细介绍了安装dhcp和配置相关文件的步骤,并提供了相关的命令和配置示例。 ... [详细]
  • 本文介绍了一个Java猜拳小游戏的代码,通过使用Scanner类获取用户输入的拳的数字,并随机生成计算机的拳,然后判断胜负。该游戏可以选择剪刀、石头、布三种拳,通过比较两者的拳来决定胜负。 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • C语言注释工具及快捷键,删除C语言注释工具的实现思路
    本文介绍了C语言中注释的两种方式以及注释的作用,提供了删除C语言注释的工具实现思路,并分享了C语言中注释的快捷键操作方法。 ... [详细]
  • Go Cobra命令行工具入门教程
    本文介绍了Go语言实现的命令行工具Cobra的基本概念、安装方法和入门实践。Cobra被广泛应用于各种项目中,如Kubernetes、Hugo和Github CLI等。通过使用Cobra,我们可以快速创建命令行工具,适用于写测试脚本和各种服务的Admin CLI。文章还通过一个简单的demo演示了Cobra的使用方法。 ... [详细]
  • 本文介绍了如何使用C#制作Java+Mysql+Tomcat环境安装程序,实现一键式安装。通过将JDK、Mysql、Tomcat三者制作成一个安装包,解决了客户在安装软件时的复杂配置和繁琐问题,便于管理软件版本和系统集成。具体步骤包括配置JDK环境变量和安装Mysql服务,其中使用了MySQL Server 5.5社区版和my.ini文件。安装方法为通过命令行将目录转到mysql的bin目录下,执行mysqld --install MySQL5命令。 ... [详细]
  • 深入解析Linux下的I/O多路转接epoll技术
    本文深入解析了Linux下的I/O多路转接epoll技术,介绍了select和poll函数的问题,以及epoll函数的设计和优点。同时讲解了epoll函数的使用方法,包括epoll_create和epoll_ctl两个系统调用。 ... [详细]
  • 本文介绍了如何使用n3-charts绘制以日期为x轴的数据,并提供了相应的代码示例。通过设置x轴的类型为日期,可以实现对日期数据的正确显示和处理。同时,还介绍了如何设置y轴的类型和其他相关参数。通过本文的学习,读者可以掌握使用n3-charts绘制日期数据的方法。 ... [详细]
  • Python中的PyInputPlus模块原文:https ... [详细]
author-avatar
he2134
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有