热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

深入理解Kafka服务端请求队列中请求的处理

本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。
一、场景分析

    在《深入理解Kafka服务端之Processor线程是如何工作的》中,通过分析得知Processor线程最终将接收到的客户端请求封装成Request对象,放入了RequestChannel的requestQueue请求队列中。那么这些队列中的请求是如何被处理的呢?这篇进行详细分析。

二、图示说明

ddbf5d3b03b83311ca3356b7f66fdcdc.png

三、源码分析

    既然请求都被封装成Request对象放到了请求队列中,那么就肯定会有一个线程去获取这些请求对象,进行相应的处理。在之前Acceptor线程启动的过程中,我们从服务端程序入口(即Kafka.main()方法)开始,一直找到了KafkaServer.startup()方法,当时提到过,整个Kafka服务端的功能都在这个startup()方法中启动,那么继续从startup方法中查找处理请求队列的相应代码,结果如下:

//TODO 创建处理Request请求的线程池,这里的numIoThreads就是线程池的容量,由服务端参数num.io.threads决定,默认为8dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time, config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)前面分析Kafka网络架构采用了主从Reactor多线程模型时,提到真正的网络I/O操作会交给I/O线程池中的I/O线程完成,那么这里的KafkaRequestHandlerPool就是处理请求的I/O线程池。

    1. 查看的KafkaRequestHandlerPool主构造函数如下:

class KafkaRequestHandlerPool(val brokerId: Int, val requestChannel: RequestChannel, val apis: KafkaApis, time: Time, numThreads: Int, requestHandlerAvgIdleMetricName: String, logAndThreadNamePrefix : String) extends Logging with KafkaMetricsGroup { //线程池中线程数量  private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads) private val aggregateIdleMeter = newMeter(requestHandlerAvgIdleMetricName, "percent", TimeUnit.NANOSECONDS) this.logIdent = "[" + logAndThreadNamePrefix + " Kafka Request Handler on Broker " + brokerId + "], " //管理线程的数组 val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads) for (i 0 until numThreads) { //创建IO处理线程 createHandler(i) } ...这里重点看几个参数:

  • requestChannel:SocketServer 中的请求通道对象。由于KafkaRequestHandlerPool是线程池对象,那么内部线程处理的请求来源在哪儿?请求恰恰是保存在 RequestChannel 中的请求队列requestQueue中,因此,Kafka 在构造 KafkaRequestHandlerPool实例时,必须关联 SocketServer 组件中的 RequestChannel 实例,让 I/O 线程能够找到请求被保存的地方。

  • apis:KafkaApis对象。IO线程会将拿到的请求对象Request交给KafkaApis去执行真正的逻辑处理。

  • numThreads:线程池中线程的数量。初始化线程池时要创建多少个线程由这个参数决定。而这个参数又是由broker端的参数num.io.threads决定的,默认为8。

除了这几个参数,在构造KafkaRequestHandlerPool实例时,会根据线程数创建KafkaRequestHandler线程,并将这些线程放入线程池管理线程的数组runnables中。

    除了主构造函数,再来看几个KafkaRequestHandlerPool的方法:

    a. createHandler(id:Int):创建线程的方法

  • 创建KafkaRequestHandler线程对象

  • 将线程对象放入runnables数组

  • 设置线程为守护线程并启动

def createHandler(id: Int): Unit = synchronized { //创建KafkaRequestHandler线程,并放入runnables数组 runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time) //设置线程为守护线程,并启动 KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()}

    b. resizeThreadPool(newSize: Int):根据给定值调整线程池的线程数量

  • 如果给定的值大于当前线程池中的线程数,则创建差值个新的线程,放入线程池并启动

  • 如果给定的值小于当前线程池中的线程数,则将多余的线程从线程池中移除,并关闭移除的线程

  • 更新线程池中线程的数量

def resizeThreadPool(newSize: Int): Unit = synchronized { val currentSize = threadPoolSize.get info(s"Resizing request handler thread pool size from $currentSize to $newSize") //如果给定的值大于当前线程池的容量,则创建差值个新的线程,放入线程池并启动 if (newSize > currentSize) { for (i createHandler(i) } //如果给定的值小于当前线程池容量,则将多余的线程从线程池中移除,并关闭 } else if (newSize

    2. 这里我们再看一下KafkaRequestHandler的主构造函数:

class KafkaRequestHandler(id: Int, //IO线程编号 brokerId: Int, val aggregateIdleMeter: Meter, val totalHandlerThreads: AtomicInteger,//线程池中线程数量 val requestChannel: RequestChannel,//请求处理的通道,里面包含了requestQueue apis: KafkaApis,//KafkaApis类,用于真正实现请求处理逻辑的类 time: Time) extends Runnable with Logging { this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], "  //定义一个CountDownLatch对象,便于管理线程的执行 private val shutdownComplete = new CountDownLatch(1) @volatile private var stopped = false ...参数apis、requestChannel和中KafkaRequestHandlerPool的一样:

  • id:这里的id是线程池中的线程序号
  • totalHandlerThreads:线程池中的线程总数

既然KafkaRequestHandler是一个线程类,那么它的工作逻辑就在run()方法中:

def run() { //只要线程未关闭,就不断地进行循环  while (!stopped) { val startSelectTime = time.nanoseconds //从requestQueue中获取一个Request对象 val req = requestChannel.receiveRequest(300) val endTime = time.nanoseconds //统计线程的空闲时间 val idleTime = endTime - startSelectTime //更新线程空闲百分比指标 aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get) //判断Request请求的类型 req match { //如果是关闭线程的请求 case RequestChannel.ShutdownRequest => debug(s"Kafka request handler $id on broker $brokerId received shut down command") //关闭线程 shutdownComplete.countDown() return //如果是普通请求 case request: RequestChannel.Request => try { request.requestDequeueTimeNanos = endTime trace(s"Kafka request handler $id on broker $brokerId handling request $request") //由KafkaApis.handle方法执行相应处理逻辑 apis.handle(request) } catch { //出现严重错误,立即关闭连接 case e: FatalExitError => shutdownComplete.countDown() Exit.exit(e.statusCode) //如果是普通异常,则记录日志 case e: Throwable => error("Exception when handling request", e) } finally { //释放Request占用的缓冲区资源 request.releaseBuffer() } case null => // 继续循环 } } shutdownComplete.countDown()}

整个run()方法的流程图如下:

db3d9f10aa7693383126cda0b16e279a.png

这里最重要的就是调用KafkaApis的handle方法处理请求:

//由KafkaApis.handle方法执行相应处理逻辑apis.handle(request)这个方法会根据不同的请求类型进行最终的逻辑处理,之后封装Response对象并返回给Processor线程的responseQueue队列。

    KafkaApis的处理逻辑较多,下一篇再进行分析。

总结:

请求队列中的Request对象的处理逻辑分以下几步:

  1. 服务端程序启动时,会创建KafkaRequestHandlerPool线程池对象
  2. 构建线程池对象时,根据num.io.threads参数创建KafkaRequestHandler线程,默认创建8个
  3. KafkaRequestHandler线程不断地从requestQueue请求队列中获取请求
  • 如果是普通请求,则调用KafkaApis的handle方法进行处理
  • 如果是关闭线程的请求,则关闭线程
更新线程空闲百分比指标参考资料:https://time.geekbang.org/column/article/233233



推荐阅读
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • 重入锁(ReentrantLock)学习及实现原理
    本文介绍了重入锁(ReentrantLock)的学习及实现原理。在学习synchronized的基础上,重入锁提供了更多的灵活性和功能。文章详细介绍了重入锁的特性、使用方法和实现原理,并提供了类图和测试代码供读者参考。重入锁支持重入和公平与非公平两种实现方式,通过对比和分析,读者可以更好地理解和应用重入锁。 ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • 本文介绍了使用Java实现大数乘法的分治算法,包括输入数据的处理、普通大数乘法的结果和Karatsuba大数乘法的结果。通过改变long类型可以适应不同范围的大数乘法计算。 ... [详细]
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • 本文讨论了使用差分约束系统求解House Man跳跃问题的思路与方法。给定一组不同高度,要求从最低点跳跃到最高点,每次跳跃的距离不超过D,并且不能改变给定的顺序。通过建立差分约束系统,将问题转化为图的建立和查询距离的问题。文章详细介绍了建立约束条件的方法,并使用SPFA算法判环并输出结果。同时还讨论了建边方向和跳跃顺序的关系。 ... [详细]
  • 本文介绍了如何在给定的有序字符序列中插入新字符,并保持序列的有序性。通过示例代码演示了插入过程,以及插入后的字符序列。 ... [详细]
  • 动态规划算法的基本步骤及最长递增子序列问题详解
    本文详细介绍了动态规划算法的基本步骤,包括划分阶段、选择状态、决策和状态转移方程,并以最长递增子序列问题为例进行了详细解析。动态规划算法的有效性依赖于问题本身所具有的最优子结构性质和子问题重叠性质。通过将子问题的解保存在一个表中,在以后尽可能多地利用这些子问题的解,从而提高算法的效率。 ... [详细]
  • 本文介绍了UVALive6575题目Odd and Even Zeroes的解法,使用了数位dp和找规律的方法。阶乘的定义和性质被介绍,并给出了一些例子。其中,部分阶乘的尾零个数为奇数,部分为偶数。 ... [详细]
  • 本文详细介绍了Java中vector的使用方法和相关知识,包括vector类的功能、构造方法和使用注意事项。通过使用vector类,可以方便地实现动态数组的功能,并且可以随意插入不同类型的对象,进行查找、插入和删除操作。这篇文章对于需要频繁进行查找、插入和删除操作的情况下,使用vector类是一个很好的选择。 ... [详细]
  • 猜字母游戏
    猜字母游戏猜字母游戏——设计数据结构猜字母游戏——设计程序结构猜字母游戏——实现字母生成方法猜字母游戏——实现字母检测方法猜字母游戏——实现主方法1猜字母游戏——设计数据结构1.1 ... [详细]
  • [大整数乘法] java代码实现
    本文介绍了使用java代码实现大整数乘法的过程,同时也涉及到大整数加法和大整数减法的计算方法。通过分治算法来提高计算效率,并对算法的时间复杂度进行了研究。详细代码实现请参考文章链接。 ... [详细]
  • 本文讨论了一个数列求和问题,该数列按照一定规律生成。通过观察数列的规律,我们可以得出求解该问题的算法。具体算法为计算前n项i*f[i]的和,其中f[i]表示数列中有i个数字。根据参考的思路,我们可以将算法的时间复杂度控制在O(n),即计算到5e5即可满足1e9的要求。 ... [详细]
  • 李逍遥寻找仙药的迷阵之旅
    本文讲述了少年李逍遥为了救治婶婶的病情,前往仙灵岛寻找仙药的故事。他需要穿越一个由M×N个方格组成的迷阵,有些方格内有怪物,有些方格是安全的。李逍遥需要避开有怪物的方格,并经过最少的方格,找到仙药。在寻找的过程中,他还会遇到神秘人物。本文提供了一个迷阵样例及李逍遥找到仙药的路线。 ... [详细]
  • 本文介绍了Codeforces Round #321 (Div. 2)比赛中的问题Kefa and Dishes,通过状压和spfa算法解决了这个问题。给定一个有向图,求在不超过m步的情况下,能获得的最大权值和。点不能重复走。文章详细介绍了问题的题意、解题思路和代码实现。 ... [详细]
author-avatar
houxue
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有