热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

TomcatNIO模型的实现

Tomcat对BIO和NIO两种模型都进行了实现,其中BIO的实现理解起来比较简单,而NIO的实现就比较复杂了,并且它跟常用的Reactor模型也略有不同,具体设计如下:

Tomcat 对 BIO 和 NIO 两种模型都进行了实现,其中 BIO 的实现理解起来比较简单,而 NIO 的实现就比较复杂了,并且它跟常用的 Reactor 模型也略有不同,具体设计如下:

Tomcat NIO 模型

可以看出多了一个 BlockPoller 的设计,这是因为在 Servlet 规范中 ServletInputStream 和 ServletOutputStream 是阻塞的,所以请求体和响应体的读取和发送需要阻塞处理。请求行读取SSL 握手使用非阻塞的 Poller 处理。一次连接基本的处理流程是:

  • Acceptor 接收 TCP 连接,并将其注册到 Poller 上
  • Poller 发现通道有就绪的 I/O 事件,将事件分配给线程池中的线程处理
  • 线程池线程首先在 Poller 上非阻塞完成请求行和 SSL 握手的处理,然后通过容器调用 Servlet,生成响应,最后如果需要读取请求体或者发送响应,那就会将通道注册到 BlockPoller 上模拟阻塞完成

接下来分析核心代码的实现,源码来自 Tomcat 6.0.53 版本,之所以使用这个版本是因为看起来简单直观没有太多的抽象,也不影响来理解核心的处理逻辑。首先看下连接处理的方法调用情况,可右键直接打开图片查看大图:

Tomcat NIO 方法调用

相关类或接口的功能如下:

  • Acceptor: 阻塞监听和接收通道连接
  • Poller: 事件多路复用器,通知 I/O 事件的发生并分配合适的处理器
  • PollerEvent: 是对通道、SelectionKey 的附加对象和通道关注事件的封装
  • SocketProcessor: 线程池调度单元,它处理 SSL 握手,调用 Handler 解析协议
  • Handler: 通道处理接口,用于适配多种协议,如 HTTP、AJP
  • NioEndpoint: 服务启停初始化的入口
  • NioSelectorPool: 提供一个阻塞读写使用的 Selector 池和一个单例 Selector
  • NioBlockingSelector: 提供阻塞读和写的方法
  • BlockPoller: 与 NioBlockingSelector 配合完成模拟阻塞
  • NioChannel: 封装 SocketChannel 和读写使用的 ByteBuffer
  • KeyAttachment: Key 的附加对象,它包含通道最后访问时间和用于模拟阻塞使用的读写闭锁

1. Acceptor 注册通道到 Poller 上

Acceptor 和 Poller 分属两个不同的线程,通常情况下 Poller 阻塞在 select() 方法的调用上,此方法会锁住内部的 publicKeys 集合,所以 Acceptor 接收到通道连接不能直接注册到 Poller 上,否则会造成死锁。Tomcat 使用生产者-消费者模式来进行并发协作,缓冲区使用的是 ConcurrentLinkedQueue 无界队列。

Acceptor 接收到连接的 SocketChannel 后,将其配置成非阻塞模式,封装成 NioChannel,最后调用 getPoller0().register(NioChannel) 加入到某个 Poller 的事件队列中。

public void register(final NioChannel socket) {
  socket.setPoller(this); // 关联此 Poller
  KeyAttachment key = keyCache.poll();
  final KeyAttachment ka = key!=null?key:new KeyAttachment();
  // 重置或者初始化 KeyAttachment 对象
  ka.reset(this,socket,getSocketProperties().getSoTimeout());
  PollerEvent r = eventCache.poll();
  // 声明此通道关注的事件
  ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
  // 将此通道和 SelectionKey 附件对象封装成 PollerEvent 对象
  if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
  else r.reset(socket,ka,OP_REGISTER);
  // 加入到 Poller 的 events 队列中
  addEvent(r);
}
public void addEvent(Runnable event) {
  events.offer(event); // 插入队列
  if ( wakeupCounter.incrementAndGet() == 0 )
    selector.wakeup(); // 唤醒 Selector
}

Poler 有个 events() 方法,用于遍历事件队列进行处理,events() 会在 select 调用超时或者被唤醒且没有通道发生 I/O 事件时被调用,代码如下:

public boolean events() {
  boolean result = false;
  Runnable r = null;
  // 遍历事件队列
  while ( (r = events.poll()) != null ) {
    result = true;// 有事件待处理
    try {
      r.run(); // 本质调用的是 PollerEvent.run()
      if ( r instanceof PollerEvent ) {
        // 重置并缓存 PollerEvent 对象
        ((PollerEvent)r).reset();
        eventCache.offer((PollerEvent)r); 
      }
    } catch ( Throwable x ) {
      log.error("",x);
    }
  }
  return result;
}

可以看出这里有个关键对象 PollerEvent,它内部有个 interestOps 属性,表示要处理的事件类型,它有三个可能的值分别是:

  • NioEndpoint.OP_REGISTER: 通道注册事件
  • SelectionKey.OP_READ: 通道重新声明在 Poller 上关注读事件
  • SelectionKey.OP_WRITE: 通道重新声明在 Poller 上关注写事件

OP_REGISTER 的处理就是将通道注册到 Selector 上的最终实现,代码如下:

if ( interestOps == OP_REGISTER ) {
  try {
    // 将 SocketChannel 注册到 Poller 的 Selector 上并指定关注的事件和附加对象
    socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);
  } catch (Exception x) {
    log.error("", x);
  }
}

至此已完成了通道注册,接下来看一下 PollerEvent 为什么还要处理 OP_READ 和 OP_WRITE 事件。

2. PollerEvent 对 OP_READ 和 OP_WRITE 的处理

PollerEvent(又或者说 Poller)要处理读写事件,就是因为程序需要一次非阻塞的读或写操作。一开始通道是在 Poller 上声明关注的事件,但是在发生 I/O 事件后,Poller 就会把此通道就绪的事件从它关注的事件中移除(原因见下文),所以如果需要非阻塞的读或写,只能再次在这个 Poller 上重新声明。

解析请求行是非阻塞的,解析过程中,由于 TCP 存在粘包/拆包的问题,可能导致数据读取不完整,需要再次从通道读取,此时就要在关联的 Poller 上重新关注读事件,核心代码:

// 拿到通道在 Poller 上对应的 SelectionKey
final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
try {
  boolean cancel = false;
  if (key != null) {
    ...
    // 将 interestOps 合并到 key 现有关注的事件集合中
    int ops = key.interestOps() | interestOps;
    // 更新 key 和 附加对象关注的操作
    att.interestOps(ops);
    key.interestOps(ops);
    att.setCometOps(ops);
  } else {
    cancel = true;
  }
}catch (CancelledKeyException ckx) {}

3. Poller 对 I/O 事件的处理

Poller 就是 Reactor,主要功能是将就绪的 SelectionKey 分配给处理器处理,此外它还检查通道是否超时。它在调用 select 方法时会根据条件确定是阻塞还是非阻塞,代码如下:

if ( !close ) {
  if (wakeupCounter.getAndSet(-1) > 0) {
    // wakeupCounter 大于0,意味着 Acceptor 接收了大量连接,产生大量 PollerEvent 急
    // 需 Poller 消费处理,此时进行一次非阻塞调用
    keyCount = selector.selectNow();// 非阻塞直接返回
  } else {
    // wakeupCounter 等于0,阻塞等待 IO 事件发生或被唤醒
    keyCount = selector.select(selectorTimeout);
  }
  wakeupCounter.set(0);
}

当有通道 I/O 事件就绪时,Poller 将会创建一个 SocketProcessor 提交线程池处理,具体代码不再贴出。在这个过程中有一个将当前就绪的事件从 SelectionKey 中移除的操作,这是为了后续能够在 BlockPoller 上阻塞读写时,防止多个线程的干扰,具体代码如下:

protected void unreg(SelectionKey sk, KeyAttachment attachment, int readyOps) {
  // 取反再与 - 表示从 sk.interestOps() 中清除 readyOps 所在的位
  reg(sk,attachment,sk.interestOps()& (~readyOps));
}
protected void reg(SelectionKey sk, KeyAttachment attachment, int intops) {
  sk.interestOps(intops);
  attachment.interestOps(intops);
  //attachment.setCometOps(intops);
}

检查超时的方法是 Poller.timedout(keyCount, hasEvents),它在 Poller 的每次循环上都被调用,但不是每次都处理超时,因为这会产生过多的负载,而超时可等待几秒钟再超时也没事。Poler 有一个名为 nextExpiration 的成员变量,它表示检查超时的最短时间间隔,在这个时间内,如果只是 select() 调用超时(表示负载不大)会执行处理超时。

4. SocketProcessor 的处理

SocketProcessor 处理 SSL 握手和调用 Handler 进行实际的 I/O 操作。Handler 的子类 Http11ConnectionHandler 会创建 一个 Http11NioProcessor 对象最终处理 Socket,这里不分析具体的协议处理,来看看几种处理结果:

public SocketState process(NioChannel socket) {
  Http11NioProcessor processor = null;
  try {
    processor = connections.remove(socket);
    ...
    SocketState state = processor.process(socket);
    if (state == SocketState.LONG) {
      // 在处理request和生成response之间,保持socket和此processor的关联
      connections.put(socket, processor);
      // 通常是收到了不完整的请求行,再次以 OP_READ 注册到 Poller 上
      socket.getPoller().add(socket);
    } else if (state == SocketState.OPEN) {
      // 长连接,Http 保活,回收 processor
      release(socket, processor);
      // 此时已处理一个完整的请求并响应,再次注册到 Poller 上,等待处理下个请求
      socket.getPoller().add(socket);
    } else if (state == SocketState.SENDFILE) {
      // 处理文件
      connections.put(socket, processor);
    } else {
      // 连接关闭,回收 processor
      release(socket, processor);
    }
    return state;
  } catch (...) {...}
  release(socket, processor);
  return SocketState.CLOSED;
}

5. 模拟阻塞的实现

模拟阻塞是通过 NioBlockingSelector 和 BlockPoller,以及 KeyAttachment 中的两个 CountDownLatch 读写闭锁合作完成。这里分析阻塞读,阻塞写的实现类似。一般的,在读取 POST 请求参数时会使用模拟阻塞完成,来看下 NioBlockingSelector.read() 方法的具体实现:

public int read(ByteBuffer buf, NioChannel socket, long readTimeout) throws IOException {
  // 拿到通道在 Poller 上注册的 key
  SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
  if ( key == null ) throw new IOException("Key no longer registered");
  KeyReference reference = new KeyReference();
  // key 的附加对象
  KeyAttachment att = (KeyAttachment) key.attachment();
  int read = 0; // 读取的字节数
  boolean timedout = false; // 是否超时
  int keycount = 1; //assume we can write 假设通道可读
  long time = System.currentTimeMillis(); //start the timeout timer
  try {
    while ( (!timedout) && read == 0) {
      if (keycount > 0) { //only read if we were registered for a read
        // 尝试读取一次,如果通道无数据可读则返回 0,若连接断开则返回 -1
        int cnt = socket.read(buf);
        if (cnt == -1) throw new EOFException();
        read += cnt;
        if (cnt > 0) break;
      }
      try {
        // 初始化读闭锁
        if ( att.getReadLatch()==null || att.getReadLatch().getCount()==0) att.startReadLatch(1);
        // 将此通道注册到 BlockPoller,关注读取事件
        poller.add(att,SelectionKey.OP_READ, reference);
        // 阻塞等待通道可读
        att.awaitReadLatch(readTimeout,TimeUnit.MILLISECONDS);
      }catch (InterruptedException ignore) {
        Thread.interrupted();
      }
      if ( att.getReadLatch()!=null && att.getReadLatch().getCount()> 0) {
        // 被打断了,但是没有接收到 blockPoller 的提醒
        keycount = 0;
        // 继续循环等待可读
      }else {
        //通道可读,重置读闭锁
        keycount = 1;
        att.resetReadLatch();
      }
      if (readTimeout > 0 && (keycount == 0)) // 如果超时了,则不再读取,抛异常
        timedout = (System.currentTimeMillis() - time) >= readTimeout;
    } //while
    if (timedout)
        throw new SocketTimeoutException();
  } finally {
    poller.remove(att,SelectionKey.OP_READ); // 移除注册
    if (timedout && reference.key!=null) {
        poller.cancelKey(reference.key); // 超时取消
    }
    reference.key = null;
  }
  return read;
}

BlockPoller 实现逻辑与 Poller 大致相同,不同的地方在于对就绪 key 的处理,核心代码如下:

Iterator iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;
while (run && iterator != null && iterator.hasNext()) {
  SelectionKey sk = iterator.next();
  KeyAttachment attachment = (KeyAttachment)sk.attachment();
  try {
    attachment.access();
    iterator.remove();
    // 移除已就绪的事件
    sk.interestOps(sk.interestOps() & (~sk.readyOps()));
    // 可读或可写时减少对应闭锁的值,此时阻塞在 NioBlockingSelector.read() 上的线程继续执行读取
    if ( sk.isReadable() ) {
        countDown(attachment.getReadLatch());
    }
    if (sk.isWritable()) {
        countDown(attachment.getWriteLatch());
    }
  }catch (CancelledKeyException ckx) {
    if (sk!=null) sk.cancel();
    countDown(attachment.getReadLatch());
    countDown(attachment.getWriteLatch());
  }
}//while

6. 小结

至此,本文对连接的接收、分发以及模拟阻塞的核心代码实现进行了分析,为了更好的理解内部流程,尽可能的使用简洁的代码仿写了这部分功能。

源码地址:https://github.com/tonwu/rxtomcat 位于 rxtomcat-net 模块

7. Tomcat 8.5 版本变化

7.1 替换缓存数据结构

Tomcat 对 PollerEvent、NioChannel 和 Processor 对象进行了缓存,目的是减少 GC 提高系统性能,这是一种用空间换时间,被称为对象池的优化手段。从版本 8.* 开始,缓存数据结构从 ConcurrentLinkedQueue 换成了自定义的同步栈 SynchronizedStack。SynchronizedStack 的 javadoc 明确说明:

当需要创建一个无需缩小的可重用对象池时,这是 ConcurrentLinkedQueue 无 GC 的主要替代方案。目的是尽可能快地以最少的垃圾提供最少的所需功能。

在这个特殊的情况下,ConcurrentLinkedQueue 有很多功能是不需要的,所以就实现了一个有重点的类,可以专注完成一件事,来提升性能。但它不是 ConcurrentLinkedQueue 的替代品。

7.2 LimitLatch

Acceptor 在接收连接前添加了一个 LimitLatch(类似信号量)来控制总连接数。分析下如果不加有什么现象,在极端情况下,线程池没有空闲线程并且它内部的队列已满,当有通道发生可读或可写事件时,Poller 会关闭此通道,此时系统负载已达到最高,如果 Acceptor 还在继续接收连接并请求注册,而不加限制,那么就会一直重复 PollerEvent 入队出队和 Poller 单纯关闭通道的操作,浪费系统资源。


推荐阅读
  • 如何自行分析定位SAP BSP错误
    The“BSPtag”Imentionedintheblogtitlemeansforexamplethetagchtmlb:configCelleratorbelowwhichi ... [详细]
  • 生成式对抗网络模型综述摘要生成式对抗网络模型(GAN)是基于深度学习的一种强大的生成模型,可以应用于计算机视觉、自然语言处理、半监督学习等重要领域。生成式对抗网络 ... [详细]
  • 在Android开发中,使用Picasso库可以实现对网络图片的等比例缩放。本文介绍了使用Picasso库进行图片缩放的方法,并提供了具体的代码实现。通过获取图片的宽高,计算目标宽度和高度,并创建新图实现等比例缩放。 ... [详细]
  • 云原生边缘计算之KubeEdge简介及功能特点
    本文介绍了云原生边缘计算中的KubeEdge系统,该系统是一个开源系统,用于将容器化应用程序编排功能扩展到Edge的主机。它基于Kubernetes构建,并为网络应用程序提供基础架构支持。同时,KubeEdge具有离线模式、基于Kubernetes的节点、群集、应用程序和设备管理、资源优化等特点。此外,KubeEdge还支持跨平台工作,在私有、公共和混合云中都可以运行。同时,KubeEdge还提供数据管理和数据分析管道引擎的支持。最后,本文还介绍了KubeEdge系统生成证书的方法。 ... [详细]
  • CSS3选择器的使用方法详解,提高Web开发效率和精准度
    本文详细介绍了CSS3新增的选择器方法,包括属性选择器的使用。通过CSS3选择器,可以提高Web开发的效率和精准度,使得查找元素更加方便和快捷。同时,本文还对属性选择器的各种用法进行了详细解释,并给出了相应的代码示例。通过学习本文,读者可以更好地掌握CSS3选择器的使用方法,提升自己的Web开发能力。 ... [详细]
  • eclipse学习(第三章:ssh中的Hibernate)——11.Hibernate的缓存(2级缓存,get和load)
    本文介绍了eclipse学习中的第三章内容,主要讲解了ssh中的Hibernate的缓存,包括2级缓存和get方法、load方法的区别。文章还涉及了项目实践和相关知识点的讲解。 ... [详细]
  • sklearn数据集库中的常用数据集类型介绍
    本文介绍了sklearn数据集库中常用的数据集类型,包括玩具数据集和样本生成器。其中详细介绍了波士顿房价数据集,包含了波士顿506处房屋的13种不同特征以及房屋价格,适用于回归任务。 ... [详细]
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • 本文详细介绍了GetModuleFileName函数的用法,该函数可以用于获取当前模块所在的路径,方便进行文件操作和读取配置信息。文章通过示例代码和详细的解释,帮助读者理解和使用该函数。同时,还提供了相关的API函数声明和说明。 ... [详细]
  • 本文详细介绍了SQL日志收缩的方法,包括截断日志和删除不需要的旧日志记录。通过备份日志和使用DBCC SHRINKFILE命令可以实现日志的收缩。同时,还介绍了截断日志的原理和注意事项,包括不能截断事务日志的活动部分和MinLSN的确定方法。通过本文的方法,可以有效减小逻辑日志的大小,提高数据库的性能。 ... [详细]
  • GetWindowLong函数
    今天在看一个代码里头写了GetWindowLong(hwnd,0),我当时就有点费解,靠,上网搜索函数原型说明,死活找不到第 ... [详细]
  • 本文介绍了数据库的存储结构及其重要性,强调了关系数据库范例中将逻辑存储与物理存储分开的必要性。通过逻辑结构和物理结构的分离,可以实现对物理存储的重新组织和数据库的迁移,而应用程序不会察觉到任何更改。文章还展示了Oracle数据库的逻辑结构和物理结构,并介绍了表空间的概念和作用。 ... [详细]
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • 本文详细介绍了在ASP.NET中获取插入记录的ID的几种方法,包括使用SCOPE_IDENTITY()和IDENT_CURRENT()函数,以及通过ExecuteReader方法执行SQL语句获取ID的步骤。同时,还提供了使用这些方法的示例代码和注意事项。对于需要获取表中最后一个插入操作所产生的ID或马上使用刚插入的新记录ID的开发者来说,本文提供了一些有用的技巧和建议。 ... [详细]
author-avatar
乖乖雯莉_775
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有