热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Netty源码解析:EventLoop之Reactor模型

Reactor模型的核心思想:将关注的IO事件注册到多路复用器上,一旦有IO事件触发,将事件分发到事件处理器中,执行就绪I

 Reactor 模型的核心思想:

将关注的 I/O 事件注册到多路复用器上,一旦有 I/O 事件触发,将事件分发到事件处理器中,执行就绪 I/O 事件对应的处理函数中。模型中有三个重要的组件:

  • 多路复用器:由操作系统提供接口,Linux 提供的 I/O 复用接口有select、poll、epoll 。
  • 事件分离器:将多路复用器返回的就绪事件分发到事件处理器中。
  • 事件处理器:处理就绪事件处理函数。

单 Reactor 单线程模型

 

示例代码: 

/**
* 等待事件到来,分发事件处理
*/
class Reactor implements Runnable {
​private Reactor() throws Exception {SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);// attach Acceptor 处理新连接sk.attach(new Acceptor());}

​ @Overridepublic void run() {try {while (!Thread.interrupted()) {selector.select();Set selected = selector.selectedKeys();Iterator it = selected.iterator();while (it.hasNext()) {it.remove();//分发事件处理dispatch((SelectionKey) (it.next()));}}} catch (IOException ex) {//do something}}
​void dispatch(SelectionKey k) {// 若是连接事件获取是acceptor// 若是IO读写事件获取是handlerRunnable runnable = (Runnable) (k.attachment());if (runnable != null) {runnable.run();}}

}

Reactor 线程,负责多路分离套接字。

  • 有新连接到来触发 OP_ACCEPT 事件之后, 交由 Acceptor 进行处理。
  • 有 IO 读写事件之后,交给 Handler 处理。

Acceptor 主要任务是构造 Handler 。

  • 在获取到 Client 相关的 SocketChannel 之后,绑定到相应的 Handler 上。
  • 对应的 SocketChannel 有读写事件之后,基于 Reactor 分发,Handler 就可以处理了。

注意,所有的 IO 事件都绑定到 Selector 上,由 Reactor 统一分发

该模型适用于处理器链中业务处理组件能快速完成的场景。不过,这种单线程模型不能充分利用多核资源,所以实际使用的不多

 

单 Reactor 多线程模型

相对于第一种单线程的模式来说,在处理业务逻辑,也就是获取到 IO 的读写事件之后,交由线程池来处理,这样可以减小主 Reactor 的性能开销,从而更专注的做事件分发工作了,从而提升整个应用的吞吐。

/**
* 多线程处理读写业务逻辑
*/
class MultiThreadHandler implements Runnable {public static final int READING = 0, WRITING = 1;int state;final SocketChannel socket;final SelectionKey sk;
​//多线程处理业务逻辑ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

​public MultiThreadHandler(SocketChannel socket, Selector sl) throws Exception {this.state = READING;this.socket = socket;sk = socket.register(selector, SelectionKey.OP_READ);sk.attach(this);socket.configureBlocking(false);}
​@Overridepublic void run() {if (state == READING) {read();} else if (state == WRITING) {write();}}
​private void read() {//任务异步处理executorService.submit(() -> process());
​//下一步处理写事件sk.interestOps(SelectionKey.OP_WRITE);this.state = WRITING;}
​private void write() {//任务异步处理executorService.submit(() -> process());
​//下一步处理读事件sk.interestOps(SelectionKey.OP_READ);this.state = READING;}
​/*** task 业务处理*/public void process() {//do IO ,task,queue something}
}

  • 在 #read() 和 #write() 方法中,提交 executorService 线程池,进行处理。

 

多 Reactor 多线程模型

 

第三种模型比起第二种模型,是将 Reactor 分成两部分:

  1. mainReactor 负责监听 ServerSocketChannel ,用来处理客户端新连接的建立,并将建立的客户端的 SocketChannel 指定注册给 subReactor 。
  2. subReactor 维护自己的 Selector ,基于 mainReactor 建立的客户端的 SocketChannel 多路分离 IO 读写事件,读写网络数据。对于业务处理的功能,另外扔给 worker 线程池来完成。

MultiWorkThreadAcceptor 示例代码如下:

/**
* 多work 连接事件Acceptor,处理连接事件
*/
class MultiWorkThreadAcceptor implements Runnable {
​// cpu线程数相同多work线程int workCount = Runtime.getRuntime().availableProcessors();SubReactor[] workThreadHandlers = new SubReactor[workCount];volatile int nextHandler = 0;
​public MultiWorkThreadAcceptor() {this.init();}
​public void init() {nextHandler = 0;for (int i = 0; i ​@Overridepublic void run() {try {SocketChannel c = serverSocket.accept();if (c != null) {// 注册读写synchronized (c) {// 顺序获取SubReactor,然后注册channel SubReactor work = workThreadHandlers[nextHandler];work.registerChannel(c);nextHandler++;if (nextHandler >= workThreadHandlers.length) {nextHandler = 0;}}}} catch (Exception e) {}}
}

SubReactor 示例代码如下:

/**
* 多work线程处理读写业务逻辑
*/
class SubReactor implements Runnable {final Selector mySelector;
​//多线程处理业务逻辑int workCount =Runtime.getRuntime().availableProcessors();ExecutorService executorService = Executors.newFixedThreadPool(workCount);

​public SubReactor() throws Exception {// 每个SubReactor 一个selector this.mySelector = SelectorProvider.provider().openSelector();}
​/*** 注册chanel** @param sc* @throws Exception*/public void registerChannel(SocketChannel sc) throws Exception {sc.register(mySelector, SelectionKey.OP_READ | SelectionKey.OP_CONNECT);}
​@Overridepublic void run() {while (true) {try {//每个SubReactor 自己做事件分派处理读写事件selector.select();Set keys = selector.selectedKeys();Iterator iterator = keys.iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();if (key.isReadable()) {read();} else if (key.isWritable()) {write();}}
​} catch (Exception e) {
​}}}
​private void read() {//任务异步处理executorService.submit(() -> process());}
​private void write() {//任务异步处理executorService.submit(() -> process());}
​/*** task 业务处理*/public void process() {//do IO ,task,queue something}
}

 

从代码中,我们可以看到:

  1. mainReactor 主要用来处理网络 IO 连接建立操作,通常,mainReactor 只需要一个,因为它一个线程就可以处理。
  2. subReactor 主要和建立起来的客户端的 SocketChannel 做数据交互和事件业务处理操作。通常,subReactor 的个数和 CPU 个数相等,每个 subReactor 独占一个线程来处理。


此种模式中,每个模块的工作更加专一,耦合度更低,性能和稳定性也大大的提升,支持的可并发客户端数量可达到上百万级别。

关于此种模式的应用,目前有很多优秀的框架已经在应用,比如 Mina 和 Netty 等等。上述中去掉线程池的第三种形式的变种,也是 Netty NIO 的默认模式

 

 

 

 

 

 

 

 

 

 

 

 

 


推荐阅读
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • 阿,里,云,物,联网,net,core,客户端,czgl,aliiotclient, ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • JDK源码学习之HashTable(附带面试题)的学习笔记
    本文介绍了JDK源码学习之HashTable(附带面试题)的学习笔记,包括HashTable的定义、数据类型、与HashMap的关系和区别。文章提供了干货,并附带了其他相关主题的学习笔记。 ... [详细]
  • 如何自行分析定位SAP BSP错误
    The“BSPtag”Imentionedintheblogtitlemeansforexamplethetagchtmlb:configCelleratorbelowwhichi ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • JavaSE笔试题-接口、抽象类、多态等问题解答
    本文解答了JavaSE笔试题中关于接口、抽象类、多态等问题。包括Math类的取整数方法、接口是否可继承、抽象类是否可实现接口、抽象类是否可继承具体类、抽象类中是否可以有静态main方法等问题。同时介绍了面向对象的特征,以及Java中实现多态的机制。 ... [详细]
  • 本文介绍了Web学习历程记录中关于Tomcat的基本概念和配置。首先解释了Web静态Web资源和动态Web资源的概念,以及C/S架构和B/S架构的区别。然后介绍了常见的Web服务器,包括Weblogic、WebSphere和Tomcat。接着详细讲解了Tomcat的虚拟主机、web应用和虚拟路径映射的概念和配置过程。最后简要介绍了http协议的作用。本文内容详实,适合初学者了解Tomcat的基础知识。 ... [详细]
  • 前景:当UI一个查询条件为多项选择,或录入多个条件的时候,比如查询所有名称里面包含以下动态条件,需要模糊查询里面每一项时比如是这样一个数组条件:newstring[]{兴业银行, ... [详细]
  • 先看官方文档TheJavaTutorialshavebeenwrittenforJDK8.Examplesandpracticesdescribedinthispagedontta ... [详细]
  • 欢乐的票圈重构之旅——RecyclerView的头尾布局增加
    项目重构的Git地址:https:github.comrazerdpFriendCircletreemain-dev项目同步更新的文集:http:www.jianshu.comno ... [详细]
  • Android系统源码分析Zygote和SystemServer启动过程详解
    本文详细解析了Android系统源码中Zygote和SystemServer的启动过程。首先介绍了系统framework层启动的内容,帮助理解四大组件的启动和管理过程。接着介绍了AMS、PMS等系统服务的作用和调用方式。然后详细分析了Zygote的启动过程,解释了Zygote在Android启动过程中的决定作用。最后通过时序图展示了整个过程。 ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • 重入锁(ReentrantLock)学习及实现原理
    本文介绍了重入锁(ReentrantLock)的学习及实现原理。在学习synchronized的基础上,重入锁提供了更多的灵活性和功能。文章详细介绍了重入锁的特性、使用方法和实现原理,并提供了类图和测试代码供读者参考。重入锁支持重入和公平与非公平两种实现方式,通过对比和分析,读者可以更好地理解和应用重入锁。 ... [详细]
author-avatar
小猪朱一凡
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有