热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

KafkaBroker源码解析二:API层设计

一、简介版本:1.1.1API层,是一个Facade模式,封装了Kafka所有功能对外提供服务,通过请求中的ApiKeys,进行请求分发,调用对应的API进行处理API层,创建了个

一、简介



  • 版本:1.1.1

  • API层,是一个Facade模式,封装了Kafka所有功能对外提供服务,通过请求中的ApiKeys,进行请求分发,调用对应的API进行处理

  • API层,创建了个线程用于进行逻辑处理、IO操作,所有的API行为均线程中完成


二、整体架构


2.1 核心逻辑


相对于网络层,API层的实现相当简单,只是对底层实现的封装,得益于此,从API层几乎就能了解到Kafka broker所能提供的全部功能




  1. 启动num.io.threads个RequestHandler

  2. 每个RequestHandler都从全局requestQueue中poll Request

  3. 根据Request的ApiKeys调用对应的handle*Api进行实际的业务逻辑处理


2.2 核心类、方法介绍

KafkaRequestHandler
|-- run()
KafkaApis
|-- handle() // 所有请求的入口,根据ApiKeys分发请求

三、核心流程分析


3.1 启动流程


启动流程前半部分和网络层一致,这里不再赘述


// KafkaServer.scala
def startup() {
// 初始化api层
apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
fetchManager, brokerTopicStats, clusterId, time, tokenManager)
// 初始化IO线程池
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
config.numIoThreads)
}
// KafkaRequestHandler.scala
class KafkaRequestHandlerPool(val brokerId: Int,
val requestChannel: RequestChannel,
val apis: KafkaApis,
time: Time,
numThreads: Int) extends Logging with KafkaMetricsGroup {
val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
for (i <- 0 until numThreads) {
createHandler(i)
}

def createHandler(id: Int): Unit = synchronized {
runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
KafkaThread.daemon("kafka-request-handler-" + id, runnables(id)).start()
}
}

实际就是实例化API层的入口,再启动num.io.threads个RequestHandler线程,用于实际处理请求,值得注意的是这里的线程池,其实不是真正意义上的线程池,线程数目是固定的,只有通过动态参数改变线程池大小时,才会重新调整线程数目

def resizeThreadPool(newSize: Int): Unit = synchronized {
val currentSize = threadPoolSize.get
info(s"Resizing request handler thread pool size from $currentSize to $newSize")
if (newSize > currentSize) {
// 补上差额的线程
for (i <- currentSize until newSize) {
createHandler(i)
}
} else if (newSize for (i <- 1 to (currentSize - newSize)) {
// 关闭部分线程直到达到目标值
runnables.remove(currentSize - i).stop()
}
}
threadPoolSize.set(newSize)
}

3.2 请求分发流程


RequestHandler线程启动后,每个线程都一直从网络层获取Request再交给KafkaApis进行请求分发


// KafkaRequestHandler.scala
def run() {
while (!stopped) {
// 从全局的阻塞队列里面拉取请求(网络层处理完并反序列化后转换成request)
val req = requestChannel.receiveRequest(300)
req match {
// 关闭命令
case RequestChannel.ShutdownRequest =>
shutdownComplete.countDown()
return
// 正常请求
case request: RequestChannel.Request =>
// 请求路由分发
apis.handle(request)
case null => // continue
}
}
shutdownComplete.countDown()
}
// RequestChannel.scala
private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
def receiveRequest(timeout: Long): RequestChannel.BaseRequest =
requestQueue.poll(timeout, TimeUnit.MILLISECONDS)

// KafkaApis.scala
def handle(request: RequestChannel.Request) {
// 只是根据request中的apiKey分发请求,实际处理逻辑由对应的handle*方法实现
request.header.apiKey match {
case ApiKeys.PRODUCE => handleProduceRequest(request)
case ApiKeys.FETCH => handleFetchRequest(request)
case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
case ApiKeys.METADATA => handleTopicMetadataRequest(request)
case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request)
case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
case ApiKeys.API_VERSIOnS=> handleApiVersionsRequest(request)
case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request)
case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request)
case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)
case ApiKeys.END_TXN => handleEndTxnRequest(request)
case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request)
case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request)
case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
case ApiKeys.CREATE_ACLS => handleCreateAcls(request)
case ApiKeys.DELETE_ACLS => handleDeleteAcls(request)
case ApiKeys.ALTER_COnFIGS=> handleAlterConfigsRequest(request)
case ApiKeys.DESCRIBE_COnFIGS=> handleDescribeConfigsRequest(request)
case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
case ApiKeys.CREATE_PARTITIOnS=> handleCreatePartitionsRequest(request)
case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request)
case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewTokenRequest(request)
case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)
case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)
}
}


  1. RequestHandler从全局阻塞队列获取网络层组装完的Request,并调用KafkaApi的handle方法

  2. handle方法类似web开发中的dispatch,根据ApiKeys调用对应的handle*执行实际业务逻辑

可以看到API层只是对执行线程、根据ApiKeys进行请求路由的封装。实际逻辑由KafkaApis类中的handle*方法实现,而其中最核心的方法为PRODUCE与FETCH请求,分别对应消息的生产与消费,下面的文章将专门针对这两种API进行源码分析


同类文章



  • Kafka Broker源码解析一:网络层设计

  • Kafka Broker源码解析三:PRODUCE请求 TODO

  • Kafka Broker源码解析四:FETCH请求求 TODO


推荐阅读
  • 本文介绍了SPOJ2829题目的解法及优化方法。题目要求找出满足一定条件的数列,并对结果取模。文章详细解释了解题思路和算法实现,并提出了使用FMT优化的方法。最后,对于第三个限制条件,作者给出了处理方法。文章最后给出了代码实现。 ... [详细]
  • 深入理解Kafka服务端请求队列中请求的处理
    本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。 ... [详细]
  • QuestionThereareatotalofncoursesyouhavetotake,labeledfrom0ton-1.Somecoursesmayhaveprerequi ... [详细]
  • 本文介绍了lua语言中闭包的特性及其在模式匹配、日期处理、编译和模块化等方面的应用。lua中的闭包是严格遵循词法定界的第一类值,函数可以作为变量自由传递,也可以作为参数传递给其他函数。这些特性使得lua语言具有极大的灵活性,为程序开发带来了便利。 ... [详细]
  • HDU 2372 El Dorado(DP)的最长上升子序列长度求解方法
    本文介绍了解决HDU 2372 El Dorado问题的一种动态规划方法,通过循环k的方式求解最长上升子序列的长度。具体实现过程包括初始化dp数组、读取数列、计算最长上升子序列长度等步骤。 ... [详细]
  • 本文介绍了在SpringBoot中集成thymeleaf前端模版的配置步骤,包括在application.properties配置文件中添加thymeleaf的配置信息,引入thymeleaf的jar包,以及创建PageController并添加index方法。 ... [详细]
  • 知识图谱——机器大脑中的知识库
    本文介绍了知识图谱在机器大脑中的应用,以及搜索引擎在知识图谱方面的发展。以谷歌知识图谱为例,说明了知识图谱的智能化特点。通过搜索引擎用户可以获取更加智能化的答案,如搜索关键词"Marie Curie",会得到居里夫人的详细信息以及与之相关的历史人物。知识图谱的出现引起了搜索引擎行业的变革,不仅美国的微软必应,中国的百度、搜狗等搜索引擎公司也纷纷推出了自己的知识图谱。 ... [详细]
  • 本文详细介绍了Linux中进程控制块PCBtask_struct结构体的结构和作用,包括进程状态、进程号、待处理信号、进程地址空间、调度标志、锁深度、基本时间片、调度策略以及内存管理信息等方面的内容。阅读本文可以更加深入地了解Linux进程管理的原理和机制。 ... [详细]
  • 后台获取视图对应的字符串
    1.帮助类后台获取视图对应的字符串publicclassViewHelper{将View输出为字符串(注:不会执行对应的ac ... [详细]
  • 《数据结构》学习笔记3——串匹配算法性能评估
    本文主要讨论串匹配算法的性能评估,包括模式匹配、字符种类数量、算法复杂度等内容。通过借助C++中的头文件和库,可以实现对串的匹配操作。其中蛮力算法的复杂度为O(m*n),通过随机取出长度为m的子串作为模式P,在文本T中进行匹配,统计平均复杂度。对于成功和失败的匹配分别进行测试,分析其平均复杂度。详情请参考相关学习资源。 ... [详细]
  • 本文介绍了通过ABAP开发往外网发邮件的需求,并提供了配置和代码整理的资料。其中包括了配置SAP邮件服务器的步骤和ABAP写发送邮件代码的过程。通过RZ10配置参数和icm/server_port_1的设定,可以实现向Sap User和外部邮件发送邮件的功能。希望对需要的开发人员有帮助。摘要长度:184字。 ... [详细]
  • 高质量SQL书写的30条建议
    本文提供了30条关于优化SQL的建议,包括避免使用select *,使用具体字段,以及使用limit 1等。这些建议是基于实际开发经验总结出来的,旨在帮助读者优化SQL查询。 ... [详细]
  • CentOS 7部署KVM虚拟化环境之一架构介绍
    本文介绍了CentOS 7部署KVM虚拟化环境的架构,详细解释了虚拟化技术的概念和原理,包括全虚拟化和半虚拟化。同时介绍了虚拟机的概念和虚拟化软件的作用。 ... [详细]
  • CentOS 6.5安装VMware Tools及共享文件夹显示问题解决方法
    本文介绍了在CentOS 6.5上安装VMware Tools及解决共享文件夹显示问题的方法。包括清空CD/DVD使用的ISO镜像文件、创建挂载目录、改变光驱设备的读写权限等步骤。最后给出了拷贝解压VMware Tools的操作。 ... [详细]
  • 什么是大数据lambda架构
    一、什么是Lambda架构Lambda架构由Storm的作者[NathanMarz]提出,根据维基百科的定义,Lambda架构的设计是为了在处理大规模数 ... [详细]
author-avatar
杀手也热血_949
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有