Jetty示例代码
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;/*** Created by zhuqiuhui on 2017/6/14.*/
public class HelloWorld extends AbstractHandler
{public void handle(String target,Request baseRequest,HttpServletRequest request,HttpServletResponse response)throws IOException, ServletException{response.setContentType("text/html;charset=utf-8");response.setStatus(HttpServletResponse.SC_OK);baseRequest.setHandled(true);response.getWriter().println("Hello World
");}public static void main(String[] args) throws Exception{Server server = new Server(8080);server.setHandler(new HelloWorld());server.start(); // 启动过程System.out.println("print");server.join();}
}
Jetty启动过程
其中SelectChannelConnector类中doStart方法:
/**
 * Copies this connector's settings onto the selector manager, starts the
 * manager and the parent connector, then dispatches a task that loops
 * accepting connections and registering them with the manager.
 *
 * @throws Exception if the manager or the parent connector fails to start
 */
protected void doStart() throws Exception
{
    // Number of select sets = number of acceptors (article note: one selector per acceptor).
    _manager.setSelectSets(getAcceptors());
    // Maximum idle time and low-resources limits.
    _manager.setMaxIdleTime(getMaxIdleTime());
    _manager.setLowResourcesConnections(getLowResourcesConnections());
    _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime());
    // Start the manager.
    _manager.start();
    // Parent initialisation.
    super.doStart();

    // Dispatch the accept loop as a task.
    // NOTE(review): the article says this runs on the QueuedThreadPool and starts
    // one thread; presumably via the dispatch(Runnable) shown below - confirm.
    _manager.dispatch(new Runnable()
    {
        public void run()
        {
            // Snapshot the channel so the loop ends if _acceptChannel is replaced.
            final ServerSocketChannel server = _acceptChannel;
            while (isRunning() && _acceptChannel == server && server.isOpen())
            {
                try
                {
                    // Wait for a connection.
                    SocketChannel channel = server.accept();
                    // Put the accepted channel into non-blocking mode.
                    channel.configureBlocking(false);
                    Socket socket = channel.socket();
                    configure(socket);
                    _manager.register(channel);
                }
                catch (IOException e)
                {
                    Log.ignore(e);
                }
            }
        }
    });
}

/**
 * Forwards a task to the thread pool returned by {@code getThreadPool()}.
 *
 * @param task the task to run
 * @return whatever the thread pool's {@code dispatch} returns
 */
public boolean dispatch(Runnable task)
{
    return getThreadPool().dispatch(task);
}
SelectorManager 类的 register 方法
public void register(SocketChannel channel) {// The ++ increment here is not atomic, but it does not matter.// so long as the value changes sometimes, then connections will// be distributed over the available sets.int s=_set++; s=s%_selectSets;SelectSet[] sets=_selectSet;if (sets!=null){SelectSet set=sets[s];set.addChange(channel);set.wakeup(); //wakeup用于唤醒阻塞在select方法上的线程}}
QueuedThreadPool类
public boolean dispatch(Runnable job){if (isRunning()){final int jobQ &#61; _jobs.size();final int idle &#61; getIdleThreads();if(_jobs.offer(job)){// If we had no idle threads or the jobQ is greater than the idle threadsif (idle&#61;&#61;0 || jobQ>idle){int threads&#61;_threadsStarted.get();if (threads<_maxThreads)startThread(threads); //启动}return true;}}return false;
}
从线程池中固定分配了一个线程专门用于等待新连接,没有请求到来时,该线程阻塞在 accept() 方法上。
AbstractConnector类
&#64;Overrideprotected void doStart() throws Exception{if (_server &#61;&#61; null)throw new IllegalStateException("No server");// open listener portopen();super.doStart();if (_threadPool &#61;&#61; null)_threadPool &#61; _server.getThreadPool();if (_threadPool !&#61; _server.getThreadPool() && (_threadPool instanceof LifeCycle))((LifeCycle)_threadPool).start();// Start selector threadsynchronized (this){_acceptorThread &#61; new Thread[getAcceptors()];for (int i &#61; 0; i <_acceptorThread.length; i&#43;&#43;){if (!_threadPool.dispatch(new Acceptor(i))) // 新建Acceptor{Log.warn("insufficient maxThreads configured for {}",this);break;}}}Log.info("Started {}",this);}private class Acceptor implements Runnable{int _acceptor &#61; 0;Acceptor(int id){_acceptor &#61; id;}/* ------------------------------------------------------------ */public void run(){Thread current &#61; Thread.currentThread();String name;synchronized (AbstractConnector.this){if (_acceptorThread &#61;&#61; null)return;_acceptorThread[_acceptor] &#61; current;name &#61; _acceptorThread[_acceptor].getName();current.setName(name &#43; " - Acceptor" &#43; _acceptor &#43; " " &#43; AbstractConnector.this);}int old_priority &#61; current.getPriority();try{current.setPriority(old_priority - _acceptorPriorityOffset);while (isRunning() && getConnection() !&#61; null){try{accept(_acceptor); // 启动时调用&#xff0c;即SelectChannelConnector.accept方法-->SelectorManager.doSelect方法-->SelectSet.doSelect方法中。此时没有连接&#xff0c;while一直处于循环中}catch (EofException e){Log.ignore(e);}catch (IOException e){Log.ignore(e);}catch (InterruptedException x){// Connector has been stoppedLog.ignore(x);}catch (ThreadDeath e){throw e;}catch (Throwable e){Log.warn(e);}}}finally{current.setPriority(old_priority);current.setName(name);try{if (_acceptor &#61;&#61; 0)close();}catch (IOException e){Log.warn(e);}synchronized (AbstractConnector.this){if (_acceptorThread !&#61; null)_acceptorThread[_acceptor] &#61; null;}}}}
Jetty接受HTTP连接过程
当有新的连接时,SelectChannelConnector中的线程开始运行,并把当前SocketChannel加入到SelectSet中,同时SelectSet中的doSelect方法(SelectorManager类)接收连接,处理流程在该方法中。
SelectChannelConnector类中doStart方法:
/**
 * Copies this connector's settings onto the selector manager, starts the
 * manager and the parent connector, then dispatches a task that loops
 * accepting connections and registering them with the manager.
 *
 * @throws Exception if the manager or the parent connector fails to start
 */
protected void doStart() throws Exception
{
    // Number of select sets = number of acceptors (article note: one selector per acceptor).
    _manager.setSelectSets(getAcceptors());
    // Maximum idle time and low-resources limits.
    _manager.setMaxIdleTime(getMaxIdleTime());
    _manager.setLowResourcesConnections(getLowResourcesConnections());
    _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime());
    // Start the manager.
    _manager.start();
    // Parent initialisation.
    super.doStart();

    // Article note: runs on the QueuedThreadPool and starts one thread.
    _manager.dispatch(new Runnable()
    {
        public void run()
        {
            // Snapshot the channel so the loop ends if _acceptChannel is replaced.
            final ServerSocketChannel server = _acceptChannel;
            while (isRunning() && _acceptChannel == server && server.isOpen())
            {
                try
                {
                    // Wait for a connection.
                    SocketChannel channel = server.accept();
                    // Put the accepted channel into non-blocking mode.
                    channel.configureBlocking(false);
                    Socket socket = channel.socket();
                    configure(socket);
                    _manager.register(channel);
                }
                catch (IOException e)
                {
                    Log.ignore(e);
                }
            }
        }
    });
}
SelectorManager 类的 register 方法
public void register(SocketChannel channel) {// The &#43;&#43; increment here is not atomic, but it does not matter.// so long as the value changes sometimes, then connections will// be distributed over the available sets.int s&#61;_set&#43;&#43;; s&#61;s%_selectSets;SelectSet[] sets&#61;_selectSet;if (sets!&#61;null){SelectSet set&#61;sets[s];set.addChange(channel);set.wakeup(); //wakeup用于唤醒阻塞在select方法上的线程-----唤醒}}
最重要的还是SelectSet中的doSelect方法(SelectorManager类),如下:
public void doSelect() throws IOException{try{_selecting&#61;Thread.currentThread();final Selector selector&#61;_selector;// Make any key changes requiredObject change;int changes&#61;_changes.size();while (changes-->0 && (change&#61;_changes.poll())!&#61;null){try{if (change instanceof EndPoint){// Update the operations for a key.SelectChannelEndPoint endpoint &#61; (SelectChannelEndPoint)change;endpoint.doUpdateKey();}else if (change instanceof ChannelAndAttachment){// finish accepting/connecting this connectionfinal ChannelAndAttachment asc &#61; (ChannelAndAttachment)change;final SelectableChannel channel&#61;asc._channel;final Object att &#61; asc._attachment;SelectionKey key &#61; channel.register(selector,SelectionKey.OP_READ,att);SelectChannelEndPoint endpoint &#61; createEndPoint((SocketChannel)channel,key);key.attach(endpoint);endpoint.schedule();}else if (change instanceof SocketChannel){// Newly registered channel// 执行这里final SocketChannel channel&#61;(SocketChannel)change;SelectionKey key &#61; channel.register(selector,SelectionKey.OP_READ,null);SelectChannelEndPoint endpoint &#61; createEndPoint(channel,key);key.attach(endpoint);endpoint.schedule();}else if (change instanceof Closer){((Closer)change).close();}else if (change instanceof Runnable){dispatch((Runnable)change);}elsethrow new IllegalArgumentException(change.toString());}catch (Exception e){if (isRunning())Log.warn(e);elseLog.debug(e);}}// Do and instant select to see if any connections can be handled.// 如果有连接&#xff0c;获取已经准备好的连接int selected&#61;selector.selectNow();_selects&#43;&#43;;long now&#61;System.currentTimeMillis();// if no immediate things to doif (selected&#61;&#61;0){// If we are in pausing mode// 无连接时_pausing为falseif (_pausing){try{Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of busy loop}catch(InterruptedException e){Log.ignore(e);}now&#61;System.currentTimeMillis();}// workout how long to wait in select_timeout.setNow(now);long 
to_next_timeout&#61;_timeout.getTimeToNext();// 无连接&#xff0c;wait是400long wait &#61; _changes.size()&#61;&#61;0?__IDLE_TICK:0L; if (wait > 0 && to_next_timeout >&#61; 0 && wait > to_next_timeout)wait &#61; to_next_timeout;// If we should wait with a selectif (wait>0){long before&#61;now;selected&#61;selector.select(wait);_selects&#43;&#43;;now &#61; System.currentTimeMillis();_timeout.setNow(now);checkJvmBugs(before, now, wait, selected);}}// have we been destroyed while sleepingif (_selector&#61;&#61;null || !selector.isOpen())return;// Look for things to do 有连接时执行for (SelectionKey key: selector.selectedKeys()){ try{if (!key.isValid()){key.cancel();SelectChannelEndPoint endpoint &#61; (SelectChannelEndPoint)key.attachment();if (endpoint !&#61; null)endpoint.doUpdateKey();continue;}Object att &#61; key.attachment();if (att instanceof SelectChannelEndPoint){((SelectChannelEndPoint)att).schedule();}else{// Wrap readable registered channel in an endpointSocketChannel channel &#61; (SocketChannel)key.channel();SelectChannelEndPoint endpoint &#61; createEndPoint(channel,key);key.attach(endpoint);if (key.isReadable())endpoint.schedule(); }key &#61; null;}catch (CancelledKeyException e){Log.ignore(e);}catch (Exception e){if (isRunning())Log.warn(e);elseLog.ignore(e);if (key !&#61; null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())key.cancel();}}// Everything always handledselector.selectedKeys().clear();now&#61;System.currentTimeMillis();_timeout.setNow(now);Task task &#61; _timeout.expired();while (task!&#61;null){if (task instanceof Runnable)dispatch((Runnable)task);task &#61; _timeout.expired();}// Idle tickif (now-_idleTick>__IDLE_TICK){_idleTick&#61;now;final long idle_now&#61;((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))?(now&#43;_maxIdleTime-_lowResourcesMaxIdleTime):now;dispatch(new Runnable(){public void run(){for (SelectChannelEndPoint 
endp:_endPoints.keySet()){endp.checkIdleTimestamp(idle_now);}}});}}catch (CancelledKeyException e){Log.ignore(e);}finally{_selecting&#61;null;}}
处理连接过程(公有内部类SelectSet的doSelect方法,针对于SelectChannelConnector连接)时序图如下:
(1)SocketConnector 创建 ConnectorEndPoint 表示正在请求的连接,同时创建 HttpConnection 用来表示这个连接是一个 HTTP 协议的连接。
(2)HttpConnection 会创建 HttpParser 类解析 HTTP 协议,并且会创建符合 HTTP 协议的 Request 和 Response 对象。接下去就是将这个线程交给队列线程池去执行了。
1.QueuedThreadPool对象的start()方法启动线程池,调用dispatch()方法分发任务。
2.Jetty的Handler组件用来处理接收到的请求。实现这个接口的类用来处理请求、过滤请求和生成响应内容。
3.selector.selectNow():Selects a set of keys whose corresponding channels are ready for I/O operations.
4.对于客户端的每次连接,Connector都会创建相应的EndPoint来表示该连接,一般在创建EndPoint的同时也会创建Connection,这里EndPoint用于和Socket打交道,而Connection用于在从Socket中读取到数据后的处理逻辑以及生成响应数据的处理逻辑。
总结
Acceptor 线程有很多个(全部来自于线程池,并且固定分配出来,基于 jetty.xml 配置中的 Acceptors 配置数量),每个线程都维护了一个 SelectSet,每个 SelectSet 又对应了一个 Selector,这些线程会检测当前是否有任务来,例如检测 changes 队列中是否有任务,有并且是新连接,那么就迅速建立一个 endpoint 点负责管理这个 socket,并注册 read 事件,后续该 selector 就会负责该连接的 read 事件监听。对于连接很多的情况,这里分很多个 Selector 来分别监听,提高了效率。
当数据发送过来时,Selector 检测到 read 事件,会立马调用 endpoint 的 schedule() 方法,该方法目的就是从线程池分配一个 worker 线程专门来处理这个 read 事件,而自己却立马返回继续监听,可见,这里也是一个高效的处理方式。
业务线程分配成功后,负责请求的读取以及解析,如果请求是完整的,那么就开始调用 Server 的 handle 方法(Server 本身就是一个 handler),开始 handler 的处理,最后调用到 ServletHandler,最终交给 Servlet、Filter,开始执行我们自己的应用。