热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Jetty启动与连接源码分析

Jetty示例代码importorg.eclipse.jetty.server.Request;importorg.eclipse.jetty.server.Server;i

Jetty示例代码

import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;/*** Created by zhuqiuhui on 2017/6/14.*/
public class HelloWorld extends AbstractHandler
{
public void handle(String target,Request baseRequest,HttpServletRequest request,HttpServletResponse response)throws IOException, ServletException{response.setContentType("text/html;charset=utf-8");response.setStatus(HttpServletResponse.SC_OK);baseRequest.setHandled(true);response.getWriter().println("

Hello World

");}public static void main(String[] args) throws Exception{Server server = new Server(8080);server.setHandler(new HelloWorld());server.start(); // 启动过程System.out.println("print");server.join();}
}

Jetty启动过程

Created with Raphaël 2.1.0

ServerServer

AbstractLifeCycleAbstractLifeCycle

QueuedThreadPoolQueuedThreadPool

HandlerWrapperHandlerWrapper

AbstractHandlerAbstractHandler

SelectChannelConnectorSelectChannelConnector

SelectorManagerSelectorManager

AbstractConnectorAbstractConnector

调用父类中的start方法

调用Server的doStart方法(新建QueuedThreadPool)

调用QueuedThreadPool的dostart方法初始化线程池(开启8个线程,默认值为8)

success

调用doStart方法

调用doStart方法

success

success

调用doStart方法

调用doStart方法(_threadPool.dispatch(new Acceptor(i))

success

调用doStart方法

success

调用dipatch()方法,启动线程等待连接

success

success

success


其中SelectChannelConnector类中doStart方法:

protected void doStart() throws Exception {//设置selectSet的数量,也就是每一个监听器都一个selector _manager.setSelectSets(getAcceptors());//设置最大空闲时间_manager.setMaxIdleTime(getMaxIdleTime());_manager.setLowResourcesConnections(getLowResourcesConnections());_manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime());// 启动manager _manager.start(); // 初始化super.doStart(); // 此处启动一个线程,循环接受连接// 放在QueuedThreadPool执行,并启动了一个线程_manager.dispatch(new Runnable() {public void run() {final ServerSocketChannel server=_acceptChannel;while (isRunning() && _acceptChannel==server && server.isOpen()) {try { // 等待连接SocketChannel channel = server.accept();// 分配了一个 socket ,并将其设置为 nonblockingchannel.configureBlocking(false);Socket socket = channel.socket();configure(socket);_manager.register(channel);} catch(IOException e) {Log.ignore(e);}} // while} // run});
}public boolean dispatch(Runnable task) {return getThreadPool().dispatch(task);
}

SelectorManager 类的 register 方法

public void register(SocketChannel channel) {// The ++ increment here is not atomic, but it does not matter.// so long as the value changes sometimes, then connections will// be distributed over the available sets.int s=_set++; s=s%_selectSets;SelectSet[] sets=_selectSet;if (sets!=null){SelectSet set=sets[s];set.addChange(channel);set.wakeup(); //wakeup用于唤醒阻塞在select方法上的线程}}

QueuedThreadPool类

public boolean dispatch(Runnable job){if (isRunning()){final int jobQ &#61; _jobs.size();final int idle &#61; getIdleThreads();if(_jobs.offer(job)){// If we had no idle threads or the jobQ is greater than the idle threadsif (idle&#61;&#61;0 || jobQ>idle){int threads&#61;_threadsStarted.get();if (threads<_maxThreads)startThread(threads); //启动}return true;}}return false;
}

从线程池中固定分配了一个线程专门用于等待新连接&#xff0c;没有请求来时&#xff0c;该线程是阻塞在 accept () 方法上。

AbstractorConnector类

&#64;Overrideprotected void doStart() throws Exception{if (_server &#61;&#61; null)throw new IllegalStateException("No server");// open listener portopen();super.doStart();if (_threadPool &#61;&#61; null)_threadPool &#61; _server.getThreadPool();if (_threadPool !&#61; _server.getThreadPool() && (_threadPool instanceof LifeCycle))((LifeCycle)_threadPool).start();// Start selector threadsynchronized (this){_acceptorThread &#61; new Thread[getAcceptors()];for (int i &#61; 0; i <_acceptorThread.length; i&#43;&#43;){if (!_threadPool.dispatch(new Acceptor(i))) // 新建Acceptor{Log.warn("insufficient maxThreads configured for {}",this);break;}}}Log.info("Started {}",this);}private class Acceptor implements Runnable{int _acceptor &#61; 0;Acceptor(int id){_acceptor &#61; id;}/* ------------------------------------------------------------ */public void run(){Thread current &#61; Thread.currentThread();String name;synchronized (AbstractConnector.this){if (_acceptorThread &#61;&#61; null)return;_acceptorThread[_acceptor] &#61; current;name &#61; _acceptorThread[_acceptor].getName();current.setName(name &#43; " - Acceptor" &#43; _acceptor &#43; " " &#43; AbstractConnector.this);}int old_priority &#61; current.getPriority();try{current.setPriority(old_priority - _acceptorPriorityOffset);while (isRunning() && getConnection() !&#61; null){try{accept(_acceptor); // 启动时调用&#xff0c;即SelectChannelConnector.accept方法-->SelectorManager.doSelect方法-->SelectSet.doSelect方法中。此时没有连接&#xff0c;while一直处于循环中}catch (EofException e){Log.ignore(e);}catch (IOException e){Log.ignore(e);}catch (InterruptedException x){// Connector has been stoppedLog.ignore(x);}catch (ThreadDeath e){throw e;}catch (Throwable e){Log.warn(e);}}}finally{current.setPriority(old_priority);current.setName(name);try{if (_acceptor &#61;&#61; 0)close();}catch (IOException e){Log.warn(e);}synchronized (AbstractConnector.this){if (_acceptorThread !&#61; null)_acceptorThread[_acceptor] &#61; null;}}}}

Jetty接受HTTP连接过程

当有新的连接时&#xff0c;SelectChannelConnector中的线程开始进行&#xff0c;并把当前SocketChannel加入到SelectSet中&#xff0c;同时SelectSet中的doSelect方法&#xff08;SelectorManager类&#xff09;接收连接&#xff0c;处理流程在该方法中。

SelectChannelConnector类中doStart方法&#xff1a;

protected void doStart() throws Exception {//设置selectSet的数量&#xff0c;也就是每一个监听器都一个selector _manager.setSelectSets(getAcceptors());//设置最大空闲时间_manager.setMaxIdleTime(getMaxIdleTime());_manager.setLowResourcesConnections(getLowResourcesConnections());_manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime());// 启动manager _manager.start(); // 初始化super.doStart();// 放在QueuedThreadPool执行&#xff0c;并启动了一个线程_manager.dispatch(new Runnable() {public void run() {final ServerSocketChannel server&#61;_acceptChannel;while (isRunning() && _acceptChannel&#61;&#61;server && server.isOpen()) {try { // 等待连接SocketChannel channel &#61; server.accept();// 分配了一个 socket &#xff0c;并将其设置为 nonblockingchannel.configureBlocking(false);Socket socket &#61; channel.socket();configure(socket);_manager.register(channel);} catch(IOException e) {Log.ignore(e);}} // while} // run});
}

SelectorManager 类的 register 方法

public void register(SocketChannel channel) {// The &#43;&#43; increment here is not atomic, but it does not matter.// so long as the value changes sometimes, then connections will// be distributed over the available sets.int s&#61;_set&#43;&#43;; s&#61;s%_selectSets;SelectSet[] sets&#61;_selectSet;if (sets!&#61;null){SelectSet set&#61;sets[s];set.addChange(channel);set.wakeup(); //wakeup用于唤醒阻塞在select方法上的线程-----唤醒}}

最重要的还是SelectSet中的doSelect方法&#xff08;SelectorManager类&#xff09;&#xff0c;如下&#xff1a;

public void doSelect() throws IOException{try{_selecting&#61;Thread.currentThread();final Selector selector&#61;_selector;// Make any key changes requiredObject change;int changes&#61;_changes.size();while (changes-->0 && (change&#61;_changes.poll())!&#61;null){try{if (change instanceof EndPoint){// Update the operations for a key.SelectChannelEndPoint endpoint &#61; (SelectChannelEndPoint)change;endpoint.doUpdateKey();}else if (change instanceof ChannelAndAttachment){// finish accepting/connecting this connectionfinal ChannelAndAttachment asc &#61; (ChannelAndAttachment)change;final SelectableChannel channel&#61;asc._channel;final Object att &#61; asc._attachment;SelectionKey key &#61; channel.register(selector,SelectionKey.OP_READ,att);SelectChannelEndPoint endpoint &#61; createEndPoint((SocketChannel)channel,key);key.attach(endpoint);endpoint.schedule();}else if (change instanceof SocketChannel){// Newly registered channel// 执行这里final SocketChannel channel&#61;(SocketChannel)change;SelectionKey key &#61; channel.register(selector,SelectionKey.OP_READ,null);SelectChannelEndPoint endpoint &#61; createEndPoint(channel,key);key.attach(endpoint);endpoint.schedule();}else if (change instanceof Closer){((Closer)change).close();}else if (change instanceof Runnable){dispatch((Runnable)change);}elsethrow new IllegalArgumentException(change.toString());}catch (Exception e){if (isRunning())Log.warn(e);elseLog.debug(e);}}// Do and instant select to see if any connections can be handled.// 如果有连接&#xff0c;获取已经准备好的连接int selected&#61;selector.selectNow();_selects&#43;&#43;;long now&#61;System.currentTimeMillis();// if no immediate things to doif (selected&#61;&#61;0){// If we are in pausing mode// 无连接时_pausing为falseif (_pausing){try{Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of busy loop}catch(InterruptedException e){Log.ignore(e);}now&#61;System.currentTimeMillis();}// workout how long to wait in select_timeout.setNow(now);long to_next_timeout&#61;_timeout.getTimeToNext();// 无连接&#xff0c;wait是400long wait &#61; _changes.size()&#61;&#61;0?__IDLE_TICK:0L; if (wait > 0 && to_next_timeout >&#61; 0 && wait > to_next_timeout)wait &#61; to_next_timeout;// If we should wait with a selectif (wait>0){long before&#61;now;selected&#61;selector.select(wait);_selects&#43;&#43;;now &#61; System.currentTimeMillis();_timeout.setNow(now);checkJvmBugs(before, now, wait, selected);}}// have we been destroyed while sleepingif (_selector&#61;&#61;null || !selector.isOpen())return;// Look for things to do 有连接时执行for (SelectionKey key: selector.selectedKeys()){ try{if (!key.isValid()){key.cancel();SelectChannelEndPoint endpoint &#61; (SelectChannelEndPoint)key.attachment();if (endpoint !&#61; null)endpoint.doUpdateKey();continue;}Object att &#61; key.attachment();if (att instanceof SelectChannelEndPoint){((SelectChannelEndPoint)att).schedule();}else{// Wrap readable registered channel in an endpointSocketChannel channel &#61; (SocketChannel)key.channel();SelectChannelEndPoint endpoint &#61; createEndPoint(channel,key);key.attach(endpoint);if (key.isReadable())endpoint.schedule(); }key &#61; null;}catch (CancelledKeyException e){Log.ignore(e);}catch (Exception e){if (isRunning())Log.warn(e);elseLog.ignore(e);if (key !&#61; null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())key.cancel();}}// Everything always handledselector.selectedKeys().clear();now&#61;System.currentTimeMillis();_timeout.setNow(now);Task task &#61; _timeout.expired();while (task!&#61;null){if (task instanceof Runnable)dispatch((Runnable)task);task &#61; _timeout.expired();}// Idle tickif (now-_idleTick>__IDLE_TICK){_idleTick&#61;now;final long idle_now&#61;((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))?(now&#43;_maxIdleTime-_lowResourcesMaxIdleTime):now;dispatch(new Runnable(){public void run(){for (SelectChannelEndPoint endp:_endPoints.keySet()){endp.checkIdleTimestamp(idle_now);}}});}}catch (CancelledKeyException e){Log.ignore(e);}finally{_selecting&#61;null;}}

处理连接过程&#xff08;公有内部类SelectSet的doSelect方法&#xff0c;针对于SelectChannelConnector连接&#xff09;时序图如下&#xff1a;

Created with Raphaël 2.1.0SelectorManager(doSelect方法)SelectorManager(doSelect方法)

SelectChannelConnectorSelectChannelConnector

SelectChannelEndPointSelectChannelEndPoint

newEndPoint方法&#xff0c;创建一个SelectChannelEndPoint(内部创建一个connection)

success

调用schedule方法执行handle方法&#xff0c;相当于将交给selectorManager来处理&#xff0c;其实就是交给线程池里面去运行&#xff0c;其实是执行httpconnection的handle的方法



Created with Raphaël 2.1.0ServerServer

QueuedThreadPoolQueuedThreadPool

SelectChannelEndPointSelectChannelEndPoint

HttpConnectionHttpConnection

HttpParserHttpParser

HandlerWrapperHandlerWrapper

HelloWorldHelloWorld

start()方法

run()方法

handle()方法

handle()方法

parseAvailable()方法

parseNext()方法

headerComplete()方法

handleRequest()方法

handle(HttpConnection connection)方法

handle方法

handle方法

success

success

success

success

success

success

success



&#xff08;1&#xff09;SocketConnector 来创建 ConnectorEndPoint 表示正在请求的连接&#xff0c;同时创建 HttpConnection 用来表示这个连接是一个 HTTP 协议的连接
&#xff08;2&#xff09;HttpConnection 会创建 HttpParse 类解析 HTTP 协议&#xff0c;并且会创建符合 HTTP 协议的 Request 和 Response 对象。接下去就是将这个线程交给队列线程池去执行了。

1.QueuedThreadPool对象的start()方法启动线程池&#xff0c;调用dispatch()方法分发任务。

2.Jetty的Handler组件用来处理接收到的请求。实现这个接口的类用来处理请求、过滤请求和生成响应内容。

3.selector.selectNow()&#xff1a;Selects a set of keys whose corresponding channels are ready for I/O operations.

4.对于客户端的每次连接&#xff0c;Connector都会创建相应的EndPoint来表示该连接&#xff0c;一般在创建EndPoint的同时会同时创建Connection&#xff0c;这里EndPoint用于和Socket打交道&#xff0c;而Connection用于在从Socket中读取到数据后的处理逻辑以及生成响应数据的处理逻辑。

总结

​ Acceptor 线程有很多个 ( 全部来自于线程池&#xff0c;并且固定分配出来&#xff0c;基于 jetty.xml 配置中的Acceptors 配置数量 ) &#xff0c;每个线程都维护了一个 SelectSet, 每个 SelectSet 又对应了一个 Selector, 这些线程会检测当前是否有任务来&#xff0c;例如检测 changes 队列中是否有任务&#xff0c;有并且是新连接&#xff0c;那么就迅速建立一个endpoint 点负责管理这个 socket &#xff0c;并注册 read 事件&#xff0c;后续该 selector 就会负责该连接的 read 事件监听。 对于连接很多的情况&#xff0c;这里分很多个 Selector 来分别监听&#xff0c;提高了效率。

​ 当数据发送过来时&#xff0c; Selector 检测到 read 事件&#xff0c;会立马调用 endpoint 的 schedule() 方法&#xff0c;该方法目的就是从线程池分配一个 worker 线程专门来处理这个 read 事件&#xff0c;而自己却立马返回继续监听&#xff0c;可见&#xff0c;这里也是一个高效的处理方式。

​ 业务线程分配成功后&#xff0c;负责请求的读取以及解析&#xff0c;如果请求是完整的&#xff0c;那么就开始调用 Server 的 handle方法 (server 本身就是一个 handler) &#xff0c;开始 handler 的处理&#xff0c;最后调用到 SerlvetHandler &#xff0c;最终交给Servlet 、 Filter &#xff0c;开始了我们的自己应用。
这里写图片描述


推荐阅读
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • 关键词:Golang, Cookie, 跟踪位置, net/http/cookiejar, package main, golang.org/x/net/publicsuffix, io/ioutil, log, net/http, net/http/cookiejar ... [详细]
  • [大整数乘法] java代码实现
    本文介绍了使用java代码实现大整数乘法的过程,同时也涉及到大整数加法和大整数减法的计算方法。通过分治算法来提高计算效率,并对算法的时间复杂度进行了研究。详细代码实现请参考文章链接。 ... [详细]
  • 本文介绍了使用Java实现大数乘法的分治算法,包括输入数据的处理、普通大数乘法的结果和Karatsuba大数乘法的结果。通过改变long类型可以适应不同范围的大数乘法计算。 ... [详细]
  • 开发笔记:加密&json&StringIO模块&BytesIO模块
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了加密&json&StringIO模块&BytesIO模块相关的知识,希望对你有一定的参考价值。一、加密加密 ... [详细]
  • 本文主要解析了Open judge C16H问题中涉及到的Magical Balls的快速幂和逆元算法,并给出了问题的解析和解决方法。详细介绍了问题的背景和规则,并给出了相应的算法解析和实现步骤。通过本文的解析,读者可以更好地理解和解决Open judge C16H问题中的Magical Balls部分。 ... [详细]
  • JavaSE笔试题-接口、抽象类、多态等问题解答
    本文解答了JavaSE笔试题中关于接口、抽象类、多态等问题。包括Math类的取整数方法、接口是否可继承、抽象类是否可实现接口、抽象类是否可继承具体类、抽象类中是否可以有静态main方法等问题。同时介绍了面向对象的特征,以及Java中实现多态的机制。 ... [详细]
  • 本文介绍了一种划分和计数油田地块的方法。根据给定的条件,通过遍历和DFS算法,将符合条件的地块标记为不符合条件的地块,并进行计数。同时,还介绍了如何判断点是否在给定范围内的方法。 ... [详细]
  • Python正则表达式学习记录及常用方法
    本文记录了学习Python正则表达式的过程,介绍了re模块的常用方法re.search,并解释了rawstring的作用。正则表达式是一种方便检查字符串匹配模式的工具,通过本文的学习可以掌握Python中使用正则表达式的基本方法。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • 本文详细介绍了Java中vector的使用方法和相关知识,包括vector类的功能、构造方法和使用注意事项。通过使用vector类,可以方便地实现动态数组的功能,并且可以随意插入不同类型的对象,进行查找、插入和删除操作。这篇文章对于需要频繁进行查找、插入和删除操作的情况下,使用vector类是一个很好的选择。 ... [详细]
  • Linux环境变量函数getenv、putenv、setenv和unsetenv详解
    本文详细解释了Linux中的环境变量函数getenv、putenv、setenv和unsetenv的用法和功能。通过使用这些函数,可以获取、设置和删除环境变量的值。同时给出了相应的函数原型、参数说明和返回值。通过示例代码演示了如何使用getenv函数获取环境变量的值,并打印出来。 ... [详细]
  • 本文介绍了PE文件结构中的导出表的解析方法,包括获取区段头表、遍历查找所在的区段等步骤。通过该方法可以准确地解析PE文件中的导出表信息。 ... [详细]
author-avatar
飞天兔g_653
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有