热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

KafkaSocketServer源代码分析

KafkaSocketServer源代码分析标签:kafka本文将详细分析KafkaSocketServer的相关源码。总体设计KafkaSocketServer是基于JavaNIO来开发的

Kafka SocketServer源代码分析

标签: kafka


本文将详细分析Kafka SocketServer的相关源码。

总体设计

Kafka SocketServer是基于Java NIO来开发的,采用了Reactor的模式,其中包含了1个Acceptor负责接受客户端请求,N个Processor负责读写数据,M个Handler来处理业务逻辑。在Acceptor和Processor,Processor和Handler之间都有队列来缓冲请求。

kafka.network.Acceptor

这个类继承了AbstractServerThread,实现了Runnable接口,因此它是一个线程类。它的主要职责是监听客户端的连接请求,并建立和客户端的数据传输通道,然后为这个客户端指定一个Processor,它的工作就到此结束,这样它就可以去响应下一个客户端的连接请求了。

它的run方法的主要逻辑如下:
a. 首先在ServerSocketChannel上注册OP_ACCEPT事件:

serverChannel.register(selector, SelectionKey.OP_ACCEPT);

b. 然后开始等待客户端的连接请求:

val ready = selector.select(500)

c. 如果有连接进来,则将其分配给当前的processor,并且把当前processor指向下一个processor,也就是说它采用了Round Robin的方式来选择processor

  if(ready > 0) {
val keys = selector.selectedKeys()
val iter = keys.iterator()
while(iter.hasNext && isRunning) {
var key: SelectiOnKey= null
key = iter.next
iter.remove()
if(key.isAcceptable)
accept(key, processors(currentProcessor))

// round robin to the next processor thread
currentProcessor = (currentProcessor + 1) % processors.length
}
}

接下来看看Acceptor的accept方法的简化代码(省掉了异常处理)。先说点相关的知识,SelectionKey是表示一个Channel和Selector的注册关系。在Acceptor中的selector,只有监听客户端连接请求的ServerSocketChannel的OP_ACCEPT事件注册在上面。当selector的select方法返回时,则表示注册在它上面的Channel发生了对应的事件。在Acceptor中,这个事件就是OP_ACCEPT,表示这个ServerSocketChannel的OP_ACCEPT事件发生了。

因此,Acceptor的accept方法的处理逻辑为:首先通过SelectionKey来拿到对应的ServerSocketChannel,并调用其accept方法来建立和客户端的连接,然后拿到对应的SocketChannel并交给了processor。然后Acceptor的任务就完成了,开始去处理下一个客户端的连接请求。Processor的accept方法的逻辑将在下一节介绍。

  def accept(key: SelectionKey, processor: Processor) {
val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
val socketChannel = serverSocketChannel.accept()
socketChannel.configureBlocking(false)
socketChannel.socket().setTcpNoDelay(true)
socketChannel.socket().setSendBufferSize(sendBufferSize)

processor.accept(socketChannel)
}

kafka.network.Processor

Processor也是继承自AbstractServerThread并实现Runnable接口,所以也是一个线程类。它的主要职责是负责从客户端读取数据和将响应返回给客户端,它本身不处理具体的业务逻辑,也就是说它并不认识它从客户端读取回来的数据。每个Processor都有一个Selector,用来监听多个客户端,因此可以非阻塞地处理多个客户端的读写请求。

处理新建立的连接

从上一节中可以看到,Acceptor会把多个客户端的数据连接SocketChannel分配一个Processor,因此每个Processor内部都有一个队列来保存这些新来的数据连接:

  private val newCOnnections= new ConcurrentLinkedQueue[SocketChannel]()

Processor的accpet方法(Acceptor会调用它)的代码如下,它就把一个SocketChannel放到队列中,然后唤醒Processor的selector。

def accept(socketChannel: SocketChannel) {
newConnections.add(socketChannel)
wakeup()
}

需要注意的是:这个方法不是在Processor的线程里面执行的,而是在Acceptor线程里面执行的。

在run方法中,它首先调用方法configureNewConnections,如果有队列中有新的SocketChannel,则它首先将其OP_READ事情注册到该Processor的selector上面。

  private def configureNewConnections() {
while(newConnections.size() > 0) {
val channel = newConnections.poll()
channel.register(selector, SelectionKey.OP_READ)
}
}

读取客户端的数据

在Processor的run方法中,它也是调用selector的select方法来监听客户端的数据请求,简化的代码如下:

  val ready = selector.select();
if(ready > 0) {
val keys = selector.selectedKeys()
val iter = keys.iterator()
while(iter.hasNext && isRunning) {
var key: SelectiOnKey= null
key = iter.next
iter.remove()
if(key.isReadable)
read(key)
}
}

从上面的逻辑中可以看到,当一个客户端数据传输过来,read方法会被调用,下面是read方法的简化代码。

def read(key: SelectionKey) {
val socketChannel = channelFor(key)
var receive = key.attachment.asInstanceOf[Receive]
if(key.attachment == null) {
receive = new BoundedByteBufferReceive(maxRequestSize)
key.attach(receive)
}
val read = receive.readFrom(socketChannel)
if(read <0) {
close(key)
} else if(receive.complete) {
val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address)
requestChannel.sendRequest(req)
key.attach(null)
// explicitly reset interest ops to not READ, no need to wake up the selector just yet
key.interestOps(key.interestOps & (~SelectionKey.OP_READ))
} else {
// more reading to be done
key.interestOps(SelectionKey.OP_READ)
wakeup()
}
}

read方法的流程为:
a. 首先从SelectionKey中拿到对应的SocketChannel,并且取出attach在SelectionKey上的Receive对象,如果是第一次读取,Receive对象为null,则创建一个BoundedByteBufferReceive,由它来处理具体的读数据的逻辑。可以看到每个客户端都有一个Receive对象来读取数据。
b. 如果数据从客户端读取完毕(receive.complete),则将读取的数据封装成Request对象,并添加到requestChannel中去。如果没有读取完毕(可能是客户端还没有发送完或者网络延迟),那么就让selector继续监听这个通道的OP_READ事件。

因此,我们知道具体读取数据是在BoundedByteBufferReceive里面完成的,而读取完成后要交给RequestChannel,接下来我们来看这两部分的代码。

BoundedByteBufferReceive

BoundedByteBufferReceive中有2个ByteBuffer,分别是sizeBuffer和contentBuffer,其中sizeBuffer是固定的4个字节,表示这次发送来的数据总共有多大,随后再读取对应大小的数据放到contentBuffer中。

主要的处理逻辑都是在readFrom这个方法中,简化的代码如下:

  def readFrom(channel: ReadableByteChannel): Int = {
var read = 0
// have we read the request size yet?
if(sizeBuffer.remaining > 0)
read += Utils.read(channel, sizeBuffer)

// have we allocated the request buffer yet?
if(cOntentBuffer== null && !sizeBuffer.hasRemaining) {
sizeBuffer.rewind()
val size = sizeBuffer.getInt()
cOntentBuffer= byteBufferAllocate(size)
}

// if we have a buffer read some stuff into it
if(contentBuffer != null) {
read = Utils.read(channel, contentBuffer)
// did we get everything?
if(!contentBuffer.hasRemaining) {
contentBuffer.rewind()
complete = true
}
}
read
}

首先检查sizeBuffer是不是都读满了,没有的话就从对应的channel中读取数据放到sizeBuffer中,就是下面这句,它会从channel中读取最多等同于sizeBuffer中剩下空间数量的数据。

Utils.read(channel, sizeBuffer)

当sizeBuffer读取完成了,就知道真正的数据有多少了,因此就是按照这个大小来分配contentBuffer了。紧接着就是从channel读取真正的数据放到contentBuffer中,当把contentBuffer读满以后就停止了并把complet标记为true。因此,可以看到客户端在发送数据的时候需要先发送这次要发送数据的大小,然后再发送对应的数据。

这样设计是因为java NIO在从channel中读取数据的时候只能指定读多少,而且数据也不是一次就能全部读取完成的,用这种方式来保证数据都读进来了。

到此为止,我们知道了Processor是如何读取数据的。简而言之,Processor通过selector来监听它负责的那些数据通道,当通道上有数据可读时,它就是把这个事情交给BoundedByteBufferReceive。BoundedByteBufferReceive先读一个int来确定数据量有多少,然后再读取真正的数据。那数据读取进来后又是如何被处理的呢?下一节来分析对应的代码。

kafka.network.RequestChannel

RequestChannel是Processor和Handler交换数据的地方。它包含了一个队列requestQueue用来存放Processor加入的Request,Handler会从里面取出Request来处理;它还为每个Processor开辟了一个respondQueue,用来存放Handler处理了Request后给客户端的Response。下面是一些源码:

初始化requestQueue和responseQueues的代码:

  private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
private val respOnseQueues= new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
for(i <- 0 until numProcessors)
responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()

sendRequest方法:Processor在读取完数据后,将数据封装成一个Request对象然后调用这个方法将Request添加到requestQueue中。如果requestQueue满的话,这个方法会阻塞在这里直到有Handler取走一个Request。

  def sendRequest(request: RequestChannel.Request) {
requestQueue.put(request)
}

receiveRequest方法:Handler从requestQueue中取出Request,如果队列为空,这个方法会阻塞在这里直到有Processor加入新的Request。

  def receiveRequest(): RequestChannel.Request =
requestQueue.take()

类似的sendResponse和receiveResponse就写在这里,唯一的区别就是添加和取出Response的时候要指定Processor的id因为每个Processor都有其对应的responseQueue。

返回数据给客户端

Processor不仅负责从客户端读取数据,还要将Handler的处理结果返回给客户端。在Processor的run方法(Processor是一个线程类),它会调用processNewResponses()来处理Handler的提供给客户端的Response。简化的代码如下:

  private def processNewResponses() {
var curr = requestChannel.receiveResponse(id)
while(curr != null) {
val key = curr.request.requestKey.asInstanceOf[SelectionKey]
curr.responseAction match {
case RequestChannel.SendAction => {
key.interestOps(SelectionKey.OP_WRITE)
key.attach(curr)
}
}
curr = requestChannel.receiveResponse(id)
}
}

它依次把requestChannel中responseQueue的Response取出来,然后将对应通道的OP_WRITE事件注册到selector上。这和上面的configureNewConnections很类似。

然后当selector的select方法返回时,检查是否有通道是WRITEABLE,如果有则调用Processor中的write方法。在write方法中,Processor又将具体写数据的任务交给了Response中的Send对象。这和读取数据的处理方式非常类似,就不细说了。

到此为止,我们分析了Processor是如何从客户端读取数据的,以及如何将Handler处理后的响应返回给客户端。下一节将简要分析一下Handler。

kafka.server.KafkaRequestHandler

Handler的职责是从requestChannel中的requestQueue取出Request,处理以后再将Response添加到requestChannel中的responseQueue中。 因为Handler是处理具体业务的,所以它可以有不同的实现,或者把具体的处理再外包出去。我们就简要看一下KafkaRequestHandler是如何做的。

KafkaRequestHandler实现了Runnable,因此是个线程类,除去错误处理的代码后,其run方法可以简化为如下代码,它把所有的处理逻辑都交给了KafkaApis:

  def run() {
while(true) {
var req : RequestChannel.Request = requestChannel.receiveRequest(300)
apis.handle(req)
}
}

因为KafkaApis是和具体业务相关,以后再分析相关的代码。

kafka.network.SocketServer

在分析完Acceptor、Processor和Handler之后,整个SocketServer就分析得差不多了。SocketServer这个类就无非是把前面几个类组合在一起。

首先构造出RequestChannel,

val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests)

然后,startup方法中先启动Processors,后启动Acceptor,

for(i <- 0 until numProcessorThreads) {
processors(i) = new Processor(...)
Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start()
}

// start accepting connections
this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize, quotas)

在shutdown方法中则是先停止Acceptor后停止Processor。

那什么时候启动Handler呢?这和SocketServer真没有什么关系,因为SocketServer是一个底层的通讯设施,以它为基础来构建上层应用的,因此上层应用会创建SocketServer和Handler从而让他们一起工作,那在Kafka里面,这个上层应用在哪里?答案就是kafka.server.KafkaServer的startUp方法中,相关的代码很直白就不写在这里了。

小结

本文详细分析了Kafka中SocketServer中的Acceptor和Processor的主要代码,以及它们是如何在一起构建上层应用的。Kafka采用是经典的Reactor模式,也就是1个Acceptor响应客户端的连接请求,N个Processor来读取数据,从Kafka的实践可见,这种模式可以构建出高性能的服务器。


推荐阅读
  • 深入理解Kafka服务端请求队列中请求的处理
    本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。 ... [详细]
  • 本文介绍了解决Netty拆包粘包问题的一种方法——使用特殊结束符。在通讯过程中,客户端和服务器协商定义一个特殊的分隔符号,只要没有发送分隔符号,就代表一条数据没有结束。文章还提供了服务端的示例代码。 ... [详细]
  • 如何使用Java获取服务器硬件信息和磁盘负载率
    本文介绍了使用Java编程语言获取服务器硬件信息和磁盘负载率的方法。首先在远程服务器上搭建一个支持服务端语言的HTTP服务,并获取服务器的磁盘信息,并将结果输出。然后在本地使用JS编写一个AJAX脚本,远程请求服务端的程序,得到结果并展示给用户。其中还介绍了如何提取硬盘序列号的方法。 ... [详细]
  • 本文介绍了Python爬虫技术基础篇面向对象高级编程(中)中的多重继承概念。通过继承,子类可以扩展父类的功能。文章以动物类层次的设计为例,讨论了按照不同分类方式设计类层次的复杂性和多重继承的优势。最后给出了哺乳动物和鸟类的设计示例,以及能跑、能飞、宠物类和非宠物类的增加对类数量的影响。 ... [详细]
  • 本文介绍了Swing组件的用法,重点讲解了图标接口的定义和创建方法。图标接口用来将图标与各种组件相关联,可以是简单的绘画或使用磁盘上的GIF格式图像。文章详细介绍了图标接口的属性和绘制方法,并给出了一个菱形图标的实现示例。该示例可以配置图标的尺寸、颜色和填充状态。 ... [详细]
  • 本文讨论了在VMWARE5.1的虚拟服务器Windows Server 2008R2上安装oracle 10g客户端时出现的问题,并提供了解决方法。错误日志显示了异常访问违例,通过分析日志中的问题帧,找到了解决问题的线索。文章详细介绍了解决方法,帮助读者顺利安装oracle 10g客户端。 ... [详细]
  • 本文介绍了RxJava在Android开发中的广泛应用以及其在事件总线(Event Bus)实现中的使用方法。RxJava是一种基于观察者模式的异步java库,可以提高开发效率、降低维护成本。通过RxJava,开发者可以实现事件的异步处理和链式操作。对于已经具备RxJava基础的开发者来说,本文将详细介绍如何利用RxJava实现事件总线,并提供了使用建议。 ... [详细]
  • 用Vue实现的Demo商品管理效果图及实现代码
    本文介绍了一个使用Vue实现的Demo商品管理的效果图及实现代码。 ... [详细]
  • 2021最新总结网易/腾讯/CVTE/字节面经分享(附答案解析)
    本文分享作者在2021年面试网易、腾讯、CVTE和字节等大型互联网企业的经历和问题,包括稳定性设计、数据库优化、分布式锁的设计等内容。同时提供了大厂最新面试真题笔记,并附带答案解析。 ... [详细]
  • 讨伐Java多线程与高并发——MQ篇
    本文是学习Java多线程与高并发知识时做的笔记。这部分内容比较多,按照内容分为5个部分:多线程基础篇JUC篇同步容器和并发容器篇线程池篇MQ篇本篇 ... [详细]
  • ConsumerConfiguration在kafka0.9使用JavaConsumer替代了老版本的scalaConsumer。新版的配置如下:bootstrap. ... [详细]
  • druid接入kafka indexing service整个流程
    先介绍下我们的druid集群配置Overload1台Coordinator1台Middlemanager3台Broker3台Historical一共12台,其中cold6台,hot ... [详细]
  • 你知道Kafka和Redis的各自优缺点吗?一文带你优化选择,不走弯路 ... [详细]
  • Java String与StringBuffer的区别及其应用场景
    本文主要介绍了Java中String和StringBuffer的区别,String是不可变的,而StringBuffer是可变的。StringBuffer在进行字符串处理时不生成新的对象,内存使用上要优于String类。因此,在需要频繁对字符串进行修改的情况下,使用StringBuffer更加适合。同时,文章还介绍了String和StringBuffer的应用场景。 ... [详细]
  • 本文介绍了在处理不规则数据时如何使用Python自动提取文本中的时间日期,包括使用dateutil.parser模块统一日期字符串格式和使用datefinder模块提取日期。同时,还介绍了一段使用正则表达式的代码,可以支持中文日期和一些特殊的时间识别,例如'2012年12月12日'、'3小时前'、'在2012/12/13哈哈'等。 ... [详细]
author-avatar
mobiledu2502900597
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有