Jetty启动与连接源码分析

Jetty示例代码

import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

/** * Created by zhuqiuhui on 2017/6/14. */
public class HelloWorld extends AbstractHandler {
    public void handle(String target,
                       Request baseRequest,
                       HttpServletRequest request,
                       HttpServletResponse response)
            throws IOException, ServletException
    {
        response.setContentType("text/html;charset=utf-8");
        response.setStatus(HttpServletResponse.SC_OK);
        baseRequest.setHandled(true);
        response.getWriter().println("<h1>Hello World</h1>");
    }

    public static void main(String[] args) throws Exception
    {
        Server server = new Server(8080);
        server.setHandler(new HelloWorld());

        server.start(); // 启动过程
        System.out.println("print");
        server.join();
    }
}

Jetty启动过程

Created with Raphaël 2.1.0 Server Server AbstractLifeCycle AbstractLifeCycle QueuedThreadPool QueuedThreadPool HandlerWrapper HandlerWrapper AbstractHandler AbstractHandler SelectChannelConnector SelectChannelConnector SelectorManager SelectorManager AbstractConnector AbstractConnector 调用父类中的start方法 调用Server的doStart方法(新建QueuedThreadPool) 调用QueuedThreadPool的dostart方法初始化线程池(开启8个线程,默认值为8) success 调用doStart方法 调用doStart方法 success success 调用doStart方法 调用doStart方法(_threadPool.dispatch(new Acceptor(i)) success 调用doStart方法 success 调用dipatch()方法,启动线程等待连接 success success success

其中SelectChannelConnector类中doStart方法:

protected void doStart() throws Exception {
        //设置selectSet的数量,也就是每一个监听器都一个selector 
        _manager.setSelectSets(getAcceptors());
        //设置最大空闲时间
        _manager.setMaxIdleTime(getMaxIdleTime());
        _manager.setLowResourcesConnections(getLowResourcesConnections());
        _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime());
        // 启动manager 
        _manager.start(); // 初始化
        super.doStart(); // 此处启动一个线程,循环接受连接
       // 放在QueuedThreadPool执行,并启动了一个线程
        _manager.dispatch(new Runnable() {
            public void run() {
                final ServerSocketChannel server=_acceptChannel;
                while (isRunning() && _acceptChannel==server && server.isOpen()) {
                    try {   // 等待连接
                        SocketChannel channel = server.accept();
                       // 分配了一个 socket ,并将其设置为 nonblocking
                        channel.configureBlocking(false);
                        Socket socket = channel.socket();
                        configure(socket);
                        _manager.register(channel);
                    } catch(IOException e) {
                        Log.ignore(e);
                    }
                } // while
            } // run
        });
}

public boolean dispatch(Runnable task) {
     return getThreadPool().dispatch(task);
}

SelectorManager 类的 register 方法

public void register(SocketChannel channel) {
        // The ++ increment here is not atomic, but it does not matter.
        // so long as the value changes sometimes, then connections will
        // be distributed over the available sets.

        int s=_set++; 
        s=s%_selectSets;
        SelectSet[] sets=_selectSet;
        if (sets!=null)
        {
            SelectSet set=sets[s];
            set.addChange(channel);
            set.wakeup(); //wakeup用于唤醒阻塞在select方法上的线程
        }
 }

QueuedThreadPool类

public boolean dispatch(Runnable job){
        if (isRunning())
        {
            final int jobQ = _jobs.size();
            final int idle = getIdleThreads();
            if(_jobs.offer(job))
            {
                // If we had no idle threads or the jobQ is greater than the idle threads
                if (idle==0 || jobQ>idle)
                {
                    int threads=_threadsStarted.get();
                    if (threads<_maxThreads)
                        startThread(threads); //启动
                }
                return true;
            }
        }
        return false;
}

从线程池中固定分配了一个线程专门用于等待新连接,没有请求来时,该线程是阻塞在 accept () 方法上。

AbstractorConnector类

@Override
    protected void doStart() throws Exception
    {
        if (_server == null)
            throw new IllegalStateException("No server");

        // open listener port
        open();

        super.doStart();

        if (_threadPool == null)
            _threadPool = _server.getThreadPool();
        if (_threadPool != _server.getThreadPool() && (_threadPool instanceof LifeCycle))
            ((LifeCycle)_threadPool).start();

        // Start selector thread
        synchronized (this)
        {
            _acceptorThread = new Thread[getAcceptors()];

            for (int i = 0; i < _acceptorThread.length; i++)
            {
                if (!_threadPool.dispatch(new Acceptor(i))) // 新建Acceptor
                {
                    Log.warn("insufficient maxThreads configured for {}",this);
                    break;
                }
            }
        }

        Log.info("Started {}",this);
    }


private class Acceptor implements Runnable {
        int _acceptor = 0;

        Acceptor(int id)
        {
            _acceptor = id;
        }

        /* ------------------------------------------------------------ */
        public void run()
        {
            Thread current = Thread.currentThread();
            String name;
            synchronized (AbstractConnector.this)
            {
                if (_acceptorThread == null)
                    return;

                _acceptorThread[_acceptor] = current;
                name = _acceptorThread[_acceptor].getName();
                current.setName(name + " - Acceptor" + _acceptor + " " + AbstractConnector.this);
            }
            int old_priority = current.getPriority();

            try
            {
                current.setPriority(old_priority - _acceptorPriorityOffset);
                while (isRunning() && getConnection() != null)
                {
                    try
                    {
                        accept(_acceptor); // 启动时调用,即SelectChannelConnector.accept方法-->SelectorManager.doSelect方法-->SelectSet.doSelect方法中。此时没有连接,while一直处于循环中
                    }
                    catch (EofException e)
                    {
                        Log.ignore(e);
                    }
                    catch (IOException e)
                    {
                        Log.ignore(e);
                    }
                    catch (InterruptedException x)
                    {
                        // Connector has been stopped
                        Log.ignore(x);
                    }
                    catch (ThreadDeath e)
                    {
                        throw e;
                    }
                    catch (Throwable e)
                    {
                        Log.warn(e);
                    }
                }
            }
            finally
            {
                current.setPriority(old_priority);
                current.setName(name);
                try
                {
                    if (_acceptor == 0)
                        close();
                }
                catch (IOException e)
                {
                    Log.warn(e);
                }

                synchronized (AbstractConnector.this)
                {
                    if (_acceptorThread != null)
                        _acceptorThread[_acceptor] = null;
                }
            }
        }
    }

Jetty接受HTTP连接过程

当有新的连接时,SelectChannelConnector中的线程开始进行,并把当前SocketChannel加入到SelectSet中,同时SelectSet中的doSelect方法(SelectorManager类)接收连接,处理流程在该方法中。

SelectChannelConnector类中doStart方法:

protected void doStart() throws Exception {
        //设置selectSet的数量,也就是每一个监听器都一个selector 
        _manager.setSelectSets(getAcceptors());
        //设置最大空闲时间
        _manager.setMaxIdleTime(getMaxIdleTime());
        _manager.setLowResourcesConnections(getLowResourcesConnections());
        _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime());
        // 启动manager 
        _manager.start(); // 初始化
        super.doStart();
       // 放在QueuedThreadPool执行,并启动了一个线程
        _manager.dispatch(new Runnable() {
            public void run() {
                final ServerSocketChannel server=_acceptChannel;
                while (isRunning() && _acceptChannel==server && server.isOpen()) {
                    try {   // 等待连接
                        SocketChannel channel = server.accept();
                       // 分配了一个 socket ,并将其设置为 nonblocking
                        channel.configureBlocking(false);
                        Socket socket = channel.socket();
                        configure(socket);
                        _manager.register(channel);
                    } catch(IOException e) {
                        Log.ignore(e);
                    }
                } // while
            } // run
        });
}

SelectorManager 类的 register 方法

public void register(SocketChannel channel) {
        // The ++ increment here is not atomic, but it does not matter.
        // so long as the value changes sometimes, then connections will
        // be distributed over the available sets.

        int s=_set++; 
        s=s%_selectSets;
        SelectSet[] sets=_selectSet;
        if (sets!=null)
        {
            SelectSet set=sets[s];
            set.addChange(channel);
            set.wakeup(); //wakeup用于唤醒阻塞在select方法上的线程-----唤醒
        }
 }

最重要的还是SelectSet中的doSelect方法(SelectorManager类),如下:

public void doSelect() throws IOException
        {
            try
            {
                _selecting=Thread.currentThread();
                final Selector selector=_selector;

                // Make any key changes required
                Object change;
                int changes=_changes.size();
                while (changes-->0 && (change=_changes.poll())!=null)
                {
                    try
                    {
                        if (change instanceof EndPoint)
                        {
                            // Update the operations for a key.
                            SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;
                            endpoint.doUpdateKey();
                        }
                        else if (change instanceof ChannelAndAttachment)
                        {
                            // finish accepting/connecting this connection
                            final ChannelAndAttachment asc = (ChannelAndAttachment)change;
                            final SelectableChannel channel=asc._channel;
                            final Object att = asc._attachment;
                            SelectionKey key = channel.register(selector,SelectionKey.OP_READ,att);
                            SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
                            key.attach(endpoint);
                            endpoint.schedule();
                        }
                        else if (change instanceof SocketChannel)
                        {
                            // Newly registered channel
                            // 执行这里
                            final SocketChannel channel=(SocketChannel)change;
                            SelectionKey key = channel.register(selector,SelectionKey.OP_READ,null);
                            SelectChannelEndPoint endpoint = createEndPoint(channel,key);
                            key.attach(endpoint);
                            endpoint.schedule();
                        }
                        else if (change instanceof Closer)
                        {
                            ((Closer)change).close();
                        }
                        else if (change instanceof Runnable)
                        {
                            dispatch((Runnable)change);
                        }
                        else
                            throw new IllegalArgumentException(change.toString());
                    }
                    catch (Exception e)
                    {
                        if (isRunning())
                            Log.warn(e);
                        else
                            Log.debug(e);
                    }
                }


                // Do and instant select to see if any connections can be handled.
                // 如果有连接,获取已经准备好的连接
                int selected=selector.selectNow();
                _selects++;

                long now=System.currentTimeMillis();

                // if no immediate things to do
                if (selected==0)
                {
                    // If we are in pausing mode
                    // 无连接时_pausing为false
                    if (_pausing)
                    {
                        try
                        {
                            Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of busy loop
                        }
                        catch(InterruptedException e)
                        {
                            Log.ignore(e);
                        }
                        now=System.currentTimeMillis();
                    }

                    // workout how long to wait in select
                    _timeout.setNow(now);
                    long to_next_timeout=_timeout.getTimeToNext();

                    // 无连接,wait是400
                    long wait = _changes.size()==0?__IDLE_TICK:0L;  
                    if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout)
                        wait = to_next_timeout;

                    // If we should wait with a select
                    if (wait>0)
                    {
                        long before=now;
                        selected=selector.select(wait);
                        _selects++;
                        now = System.currentTimeMillis();
                        _timeout.setNow(now);
                        checkJvmBugs(before, now, wait, selected);
                    }
                }

                // have we been destroyed while sleeping
                if (_selector==null || !selector.isOpen())
                    return;

                // Look for things to do 有连接时执行
                for (SelectionKey key: selector.selectedKeys())
                {   
                    try
                    {
                        if (!key.isValid())
                        {
                            key.cancel();
                            SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
                            if (endpoint != null)
                                endpoint.doUpdateKey();
                            continue;
                        }

                        Object att = key.attachment();
                        if (att instanceof SelectChannelEndPoint)
                        {
                            ((SelectChannelEndPoint)att).schedule();
                        }
                        else
                        {
                            // Wrap readable registered channel in an endpoint
                            SocketChannel channel = (SocketChannel)key.channel();
                            SelectChannelEndPoint endpoint = createEndPoint(channel,key);
                            key.attach(endpoint);
                            if (key.isReadable())
                                endpoint.schedule();                           
                        }
                        key = null;
                    }
                    catch (CancelledKeyException e)
                    {
                        Log.ignore(e);
                    }
                    catch (Exception e)
                    {
                        if (isRunning())
                            Log.warn(e);
                        else
                            Log.ignore(e);

                        if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
                            key.cancel();
                    }
                }

                // Everything always handled
                selector.selectedKeys().clear();

                now=System.currentTimeMillis();
                _timeout.setNow(now);
                Task task = _timeout.expired();
                while (task!=null)
                {
                    if (task instanceof Runnable)
                        dispatch((Runnable)task);
                    task = _timeout.expired();
                }

                // Idle tick
                if (now-_idleTick>__IDLE_TICK)
                {
                    _idleTick=now;

                    final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))
                        ?(now+_maxIdleTime-_lowResourcesMaxIdleTime)
                        :now;

                    dispatch(new Runnable()
                    {
                        public void run()
                        {
                            for (SelectChannelEndPoint endp:_endPoints.keySet())
                            {
                                endp.checkIdleTimestamp(idle_now);
                            }
                        }
                    });
                }
            }
            catch (CancelledKeyException e)
            {
                Log.ignore(e);
            }
            finally
            {
                _selecting=null;
            }
        }

处理连接过程(公有内部类SelectSet的doSelect方法,针对于SelectChannelConnector连接)时序图如下:

Created with Raphaël 2.1.0 SelectorManager(doSelect方法) SelectorManager(doSelect方法) SelectChannelConnector SelectChannelConnector SelectChannelEndPoint SelectChannelEndPoint newEndPoint方法,创建一个SelectChannelEndPoint(内部创建一个connection) success 调用schedule方法执行handle方法,相当于将交给selectorManager来处理,其实就是交给线程池里面去运行,其实是执行httpconnection的handle的方法
Created with Raphaël 2.1.0 Server Server QueuedThreadPool QueuedThreadPool SelectChannelEndPoint SelectChannelEndPoint HttpConnection HttpConnection HttpParser HttpParser HandlerWrapper HandlerWrapper HelloWorld HelloWorld start()方法 run()方法 handle()方法 handle()方法 parseAvailable()方法 parseNext()方法 headerComplete()方法 handleRequest()方法 handle(HttpConnection connection)方法 handle方法 handle方法 success success success success success success success

(1)SocketConnector 来创建 ConnectorEndPoint 表示正在请求的连接,同时创建 HttpConnection 用来表示这个连接是一个 HTTP 协议的连接
(2)HttpConnection 会创建 HttpParse 类解析 HTTP 协议,并且会创建符合 HTTP 协议的 Request 和 Response 对象。接下去就是将这个线程交给队列线程池去执行了。

1.QueuedThreadPool对象的start()方法启动线程池,调用dispatch()方法分发任务。

2.Jetty的Handler组件用来处理接收到的请求。实现这个接口的类用来处理请求、过滤请求和生成响应内容。

3.selector.selectNow():Selects a set of keys whose corresponding channels are ready for I/O operations.

4.对于客户端的每次连接,Connector都会创建相应的EndPoint来表示该连接,一般在创建EndPoint的同时会同时创建Connection,这里EndPoint用于和Socket打交道,而Connection用于在从Socket中读取到数据后的处理逻辑以及生成响应数据的处理逻辑。

总结

​ Acceptor 线程有很多个 ( 全部来自于线程池,并且固定分配出来,基于 jetty.xml 配置中的Acceptors 配置数量 ) ,每个线程都维护了一个 SelectSet, 每个 SelectSet 又对应了一个 Selector, 这些线程会检测当前是否有任务来,例如检测 changes 队列中是否有任务,有并且是新连接,那么就迅速建立一个endpoint 点负责管理这个 socket ,并注册 read 事件,后续该 selector 就会负责该连接的 read 事件监听。 对于连接很多的情况,这里分很多个 Selector 来分别监听,提高了效率。

​ 当数据发送过来时, Selector 检测到 read 事件,会立马调用 endpoint 的 schedule() 方法,该方法目的就是从线程池分配一个 worker 线程专门来处理这个 read 事件,而自己却立马返回继续监听,可见,这里也是一个高效的处理方式。

​ 业务线程分配成功后,负责请求的读取以及解析,如果请求是完整的,那么就开始调用 Server 的 handle方法 (server 本身就是一个 handler) ,开始 handler 的处理,最后调用到 SerlvetHandler ,最终交给Servlet 、 Filter ,开始了我们的自己应用。
这里写图片描述

相关文章
相关标签/搜索