码农翻身

Tomcat Source Code Series: The HTTP Request Processing Flow

- by MRyan, 2023-01-15



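This series targets Tomcat 8.5.x.

This article is part of the 精进Tomcat series. Other articles in the series: https://www.wormholestack.com/tag/Tomcat/

Source reading environment: https://gitee.com/M-Analysis/source_tomcat8 (key comments already added)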


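What does Tomcat do with a request once a client has sent it?

1. Tomcat's threading model

You may have noticed in the previous article, 《Tomcat源码篇之启动流程》 (the startup flow), that once Tomcat has started successfully it is running 6 core threads.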

uTools_1673270800355

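Of these, 1 is a user thread and 5 are daemon threads.

Daemon threads

A daemon thread is a thread that provides a general-purpose service in the background while the program runs; the garbage-collection thread is one example. Such threads are not an essential part of the program. User threads and daemon threads are almost identical, and the only difference concerns JVM exit: once every user thread has finished and only daemon threads remain, the JVM exits. With nothing left to serve, the daemon threads have no work to do and there is no reason to keep the program running. Conversely, as long as any non-daemon thread is still running, the program does not terminate.

A thread becomes a daemon thread when setDaemon(true) is called on its Thread object; the call has to happen before the thread is started.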
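As a small illustration of the paragraph above, here is a minimal sketch of creating a daemon thread. The class and method names (DaemonThreadDemo, newDaemon) are made up for this example and have nothing to do with Tomcat's own code.

```java
// Sketch: marking a thread as a daemon. All names here are illustrative only.
class DaemonThreadDemo {

    /** Creates (but does not start) a background thread flagged as a daemon. */
    static Thread newDaemon(Runnable task, String name) {
        Thread t = new Thread(task, name);
        // Must be called before start(); calling it on a started thread
        // throws IllegalThreadStateException.
        t.setDaemon(true);
        return t;
    }

    public static void main(String[] args) {
        Thread housekeeping = newDaemon(() -> {
            while (true) {
                try {
                    Thread.sleep(1000); // pretend to do periodic background work
                } catch (InterruptedException e) {
                    return;
                }
            }
        }, "housekeeping");
        housekeeping.start();
        System.out.println("daemon? " + housekeeping.isDaemon());
        // main (a user thread) returns here; the JVM exits even though
        // the housekeeping loop never finishes.
    }
}
```

If the setDaemon(true) call were removed, the same program would keep running forever, because the housekeeping thread would then count as a user thread.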


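In 《Tomcat源码篇之启动流程》 we briefly covered how these threads are created:

When the Engine starts, it launches the ContainerBackgroundProcessor daemon thread, which calls the backgroundProcess method of the child containers.

When the Connector starts, it launches the AsyncTimeout daemon thread, which detects requests that have timed out and forwards them to the worker thread pool for processing.

When the Endpoint starts, it creates the Work-exec worker thread pool, with 10 threads waiting for work by default; these threads do the actual request processing.

It also creates the Poller daemon thread, which processes the messages (or events) produced by the acceptor thread.

Finally, it creates the Acceptor daemon thread, which listens on the server socket (port 8080) and hands accepted sockets over to the Poller thread.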


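The Connector is one of Tomcat's two core components. Its main job is to accept client connections and process client requests, and most of the core threads listed above work on behalf of the Connector.

In 《Tomcat架构的秘密》 we took a brief look at the Connector's architecture and the parts it is made of, as shown in the figure below: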

image-20230109220401350


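Below, we first walk through the Connector source in detail to really understand what these core threads are for.

1.1 The Acceptor thread

The Acceptor is the thread that listens for and accepts connections; it is also the entry point for Tomcat's processing of a request.

// Non-essential code removed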
public class Acceptor<U> implements Runnable {

    private static final Log log = LogFactory.getLog(Acceptor.class);
    private static final StringManager sm = StringManager.getManager(Acceptor.class);

    private static final int INITIAL_ERROR_DELAY = 50;
    private static final int MAX_ERROR_DELAY = 1600;

    private final AbstractEndpoint<?, U> endpoint;

    public Acceptor(AbstractEndpoint<?, U> endpoint) {
        this.endpoint = endpoint;
    }


    /**
     * Accepts the connections arriving on the listening port
     */
    @SuppressWarnings("deprecation")
    @Override
    public void run() {

        int errorDelay = 0;
        long pauseStart = 0;

        try {
            // Loop until we receive a shutdown command
            while (!stopCalled) {

                // While the Endpoint is paused, the Acceptor spins (with short sleeps)
                while (endpoint.isPaused() && !stopCalled) {
                    if (state != AcceptorState.PAUSED) {
                        pauseStart = System.nanoTime();
                        // Entered pause state
                        state = AcceptorState.PAUSED;
                    }
                    if ((System.nanoTime() - pauseStart) > 1_000_000) {
                        // Paused for more than 1ms
                        try {
                            if ((System.nanoTime() - pauseStart) > 10_000_000) {
                                Thread.sleep(10);
                            } else {
                                Thread.sleep(1);
                            }
                        } catch (InterruptedException e) {
                            // Ignore
                        }
                    }
                }

                // If the Endpoint has been stopped, the Acceptor stops as well
                if (stopCalled) {
                    break;
                }
                state = AcceptorState.RUNNING;

                try {
                    // If we have reached max connections, wait until the count drops
                    endpoint.countUpOrAwaitConnection();

                    // Endpoint might have been paused while waiting for latch
                    // If that is the case, don't accept new connections
                    if (endpoint.isPaused()) {
                        continue;
                    }

                    U socket = null;
                    try {
                        // Accept the next incoming connection from the server
                        // socket
                        // i.e. accept the next connection on the listening port (8080)
                        socket = endpoint.serverSocketAccept();
                    } catch (Exception ioe) {
                        // We didn't get a socket
                        endpoint.countDownConnection();
                        if (endpoint.isRunning()) {
                            // Introduce delay if necessary
                            errorDelay = handleExceptionWithDelay(errorDelay);
                            // re-throw
                            throw ioe;
                        } else {
                            break;
                        }
                    }
                    // Successful accept, reset the error delay
                    errorDelay = 0;

                    // Configure the socket
                    if (!stopCalled && !endpoint.isPaused()) {
 
                        // Hands the socket to the Poller's queue as an event
                        if (!endpoint.setSocketOptions(socket)) {
                            // Setting up the socket failed, so close it
                            endpoint.closeSocket(socket);
                        }
                    } else {
                        endpoint.destroySocket(socket);
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    String msg = sm.getString("endpoint.accept.fail");
                    // APR specific.
                    // Could push this down but not sure it is worth the trouble.
                    if (t instanceof org.apache.tomcat.jni.Error) {
                        org.apache.tomcat.jni.Error e = (org.apache.tomcat.jni.Error) t;
                        if (e.getError() == 233) {
                            // Not an error on HP-UX so log as a warning
                            // so it can be filtered out on that platform
                            // See bug 50273
                            log.warn(msg, t);
                        } else {
                            log.error(msg, t);
                        }
                    } else {
                        log.error(msg, t);
                    }
                }
            }
        } finally {
            stopLatch.countDown();
        }
        state = AcceptorState.ENDED;
    }


}

When the Acceptor picks up an incoming connection, the loop body above runs. Its key lines are the following.

 endpoint.countUpOrAwaitConnection();

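This line checks the current connection count: if maxConnections has not been reached, the count is incremented; if it has been reached, the Acceptor waits until the count drops.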
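The "count up, or wait at the limit" behaviour can be sketched with a plain java.util.concurrent.Semaphore. This is only an illustration of the idea: ConnectionLimiter and its methods are hypothetical names, and the sketch is not how Tomcat itself implements countUpOrAwaitConnection().

```java
import java.util.concurrent.Semaphore;

// Sketch of "count up, or wait when the limit is reached".
// ConnectionLimiter is a hypothetical class, not Tomcat's implementation.
class ConnectionLimiter {
    private final Semaphore permits;
    private final int max;

    ConnectionLimiter(int maxConnections) {
        this.max = maxConnections;
        this.permits = new Semaphore(maxConnections);
    }

    /** Blocks the calling (acceptor) thread while the limit is reached. */
    void countUpOrAwait() throws InterruptedException {
        permits.acquire();
    }

    /** Non-blocking variant: false means the limit is currently reached. */
    boolean tryCountUp() {
        return permits.tryAcquire();
    }

    /** Called when a connection closes (or an accept fails) to free a slot. */
    void countDown() {
        permits.release();
    }

    /** Number of connections currently counted. */
    int current() {
        return max - permits.availablePermits();
    }
}
```

In the Acceptor loop shown earlier, the matching release is the endpoint.countDownConnection() call in the accept failure path; a real endpoint would also have to release the slot when a connection is closed.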


socket = endpoint.serverSocketAccept();

The serverSocket in this line is the ServerSocketChannel that was opened in NioEndpoint's bind method; the call accepts the next incoming connection on port 8080.


endpoint.setSocketOptions(socket)

Step into the Endpoint's setSocketOptions method:

@Override
    protected boolean setSocketOptions(SocketChannel socket) {
        NioSocketWrapper socketWrapper = null;
        try {
            // Allocate channel and wrapper
            NioChannel channel = null;
            if (nioChannels != null) {
                channel = nioChannels.pop();
            }
            if (channel == null) {
                SocketBufferHandler bufhandler = new SocketBufferHandler(
                    socketProperties.getAppReadBufSize(),
                    socketProperties.getAppWriteBufSize(),
                    socketProperties.getDirectBuffer());
                if (isSSLEnabled()) {
                    channel = new SecureNioChannel(bufhandler, this);
                } else {
                    channel = new NioChannel(bufhandler);
                }
            }
            NioSocketWrapper newWrapper = new NioSocketWrapper(channel, this);
            channel.reset(socket, newWrapper);
            // Cache the wrapper, keyed by its socket
            connections.put(socket, newWrapper);
            socketWrapper = newWrapper;

            // Set socket properties
            // Disable blocking, polling will be used
            socket.configureBlocking(false);
            socketProperties.setProperties(socket.socket());

            socketWrapper.setReadTimeout(getConnectionTimeout());
            socketWrapper.setWriteTimeout(getConnectionTimeout());
            socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
            // Register the socket with the Poller's queue
            poller.register(socketWrapper);
            return true;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            try {
                log.error(sm.getString("endpoint.socketOptionsError"), t);
            } catch (Throwable tt) {
                ExceptionUtils.handleThrowable(tt);
            }
            if (socketWrapper == null) {
                destroySocket(socket);
            }
        }
        // Tell to close the socket if needed
        return false;
    }

As you can see, the Socket is wrapped in a socketWrapper, and poller.register(socketWrapper); registers it with the Poller's queue.

Registration works as follows:


public void register(final NioSocketWrapper socketWrapper) {
    socketWrapper.interestOps(SelectionKey.OP_READ); // this is what OP_REGISTER turns into.
    PollerEvent pollerEvent = createPollerEvent(socketWrapper, OP_REGISTER);
    // Add the event to the events queue
    addEvent(pollerEvent);
}

// Poller event queue
private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();

private void addEvent(PollerEvent event) {
    events.offer(event);
    if (wakeupCounter.incrementAndGet() == 0) {
        selector.wakeup();
    }
}

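The Socket's wrapper, socketWrapper, is wrapped once more into a Poller event, and that event is added to the events queue (no data has been read at this point).

To sum up, the Acceptor thread mainly listens on the server socket and hands accepted sockets over to the Poller thread.

1.2 The Poller thread

The Poller thread is defined inside NioEndpoint: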
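The wakeupCounter check in addEvent() is easier to follow together with the Poller loop's getAndSet(-1) and set(0) calls. The sketch below isolates just that counter logic, as read from the excerpts in this article; WakeupGate and its method names are hypothetical, and the Selector calls themselves are left out.

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the counter handshake between addEvent() and the Poller loop.
// WakeupGate is a hypothetical helper that isolates only the counter logic.
class WakeupGate {
    private final AtomicLong wakeupCounter = new AtomicLong(0);

    /**
     * Producer side (addEvent): returns true when the consumer may be blocked
     * in select() and therefore needs selector.wakeup().
     */
    boolean eventAdded() {
        // The counter is -1 only between beforeSelect() and afterSelect().
        return wakeupCounter.incrementAndGet() == 0;
    }

    /**
     * Consumer side, just before selecting: true means events were added since
     * the last pass, so a non-blocking selectNow() should be used.
     */
    boolean beforeSelect() {
        return wakeupCounter.getAndSet(-1) > 0;
    }

    /** Consumer side, after the select call has returned. */
    void afterSelect() {
        wakeupCounter.set(0);
    }
}
```

With this scheme, only the first event added while the consumer is blocked triggers a wakeup; further events just increment the counter, which avoids repeated wakeup() calls.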

public class Poller implements Runnable {
    // Some code omitted

    private Selector selector;

    // Poller event queue
    private final SynchronizedQueue<PollerEvent> events =
        new SynchronizedQueue<>();

    public Poller() throws IOException {
        // Open a new Selector for each Poller
        this.selector = Selector.open();
    }
}

The Poller constructor opens a new Selector for each Poller. Its run method looks like this:

public class Poller implements Runnable {
/**
         * The background thread that adds sockets to the Poller, checks the
         * poller for triggered events and hands the associated socket off to an
         * appropriate processor as events occur.
         * <p>
         * This is the Poller thread's run method: event processing
         */
        @Override
        public void run() {
            // Loop until destroy() is called
            while (true) {

                boolean hasEvents = false;

                try {
                    if (!close) {
                        // events() drains the queue and registers new channels with this Poller's Selector
                        hasEvents = events();
                        if (wakeupCounter.getAndSet(-1) > 0) {
                            // If we are here, means we have other stuff to do
                            // Do a non blocking select
                            keyCount = selector.selectNow();
                        } else {
                            keyCount = selector.select(selectorTimeout);
                        }
                        wakeupCounter.set(0);
                    }
                    if (close) {
                        events();
                        timeout(0, false);
                        try {
                            selector.close();
                        } catch (IOException ioe) {
                            log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                        }
                        break;
                    }
                    // Either we timed out or we woke up, process events first
                    if (keyCount == 0) {
                        hasEvents = (hasEvents | events());
                    }
                } catch (Throwable x) {
                    ExceptionUtils.handleThrowable(x);
                    log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
                    continue;
                }

                // Get the selected keys (the ready events) from the selector
                Iterator<SelectionKey> iterator =
                    keyCount > 0 ? selector.selectedKeys().iterator() : null;
                // Walk through the collection of ready keys and dispatch
                // any active event.
                while (iterator != null && iterator.hasNext()) {
                    SelectionKey sk = iterator.next();
                    iterator.remove();
                    NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
                    // Attachment may be null if another thread has called
                    // cancelledKey()
                    if (socketWrapper != null) {
                        // This is where the event is actually processed
                        processKey(sk, socketWrapper);
                    }
                }

                // Process timeouts
                timeout(keyCount, hasEvents);
            }

            getStopLatch().countDown();
        }
}

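We analyzed Poller#register() above. The Poller maintains a synchronized events queue: the Poller events (socket registration events) created when the Acceptor accepts a connection are placed in this queue, and the events() method takes them off the queue again: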

hasEvents = events();
public boolean events() {
            boolean result = false;

            PollerEvent pe = null;
            // Drain the events currently in the queue
            for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++) {
                result = true;
                NioSocketWrapper socketWrapper = pe.getSocketWrapper();
                SocketChannel sc = socketWrapper.getSocket().getIOChannel();
                int interestOps = pe.getInterestOps();
                if (sc == null) {
                    log.warn(sm.getString("endpoint.nio.nullSocketChannel"));
                    socketWrapper.close();
                } else if (interestOps == OP_REGISTER) {
                    try {
                        sc.register(getSelector(), SelectionKey.OP_READ, socketWrapper);
                    } catch (Exception x) {
                        log.error(sm.getString("endpoint.nio.registerFail"), x);
                    }
                } else {
                    final SelectionKey key = sc.keyFor(getSelector());
                    if (key == null) {
                         socketWrapper.close();
                    } else {
                        final NioSocketWrapper attachment = (NioSocketWrapper) key.attachment();
                        if (attachment != null) {
                            try {
                                int ops = key.interestOps() | interestOps;
                                attachment.interestOps(ops);
                                key.interestOps(ops);
                            } catch (CancelledKeyException ckx) {
                                cancelledKey(key, socketWrapper);
                            }
                        } else {
                            cancelledKey(key, socketWrapper);
                        }
                    }
                }
                if (running && eventCache != null) {
                    pe.reset();
                    eventCache.push(pe);
                }
            }

            return result;
        }

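Once the queued events have been applied (new channels are now registered with the Selector), let's continue with what the Poller thread does next: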

                // Get the selected keys (the ready events) from the selector
                Iterator<SelectionKey> iterator =
                    keyCount > 0 ? selector.selectedKeys().iterator() : null;
                while (iterator != null && iterator.hasNext()) {
                    SelectionKey sk = iterator.next();
                    iterator.remove();
                    NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
                    if (socketWrapper != null) {
                        // This is where the event is actually processed
                        processKey(sk, socketWrapper);
                    }
                }

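The loop takes the ready SelectionKeys of the connected socket channels that are registered with the Poller's Selector and processes each of them.


Next comes processKey(), which handles read and write operations separately, depending on which operations the key is ready for: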
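The accept, register and select steps described so far can be tried out with plain java.nio on the loopback interface. The following is a minimal sketch under simplifying assumptions: one connection, one thread, and an ephemeral port instead of 8080. MiniPoller and acceptAndPollOnce are made-up names, and the string attachment merely stands in for the NioSocketWrapper that Tomcat attaches to the key.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Sketch only: a single-threaded walk through accept -> register -> select.
class MiniPoller {

    /** Accepts one loopback connection and returns "<attachment>:<data read>". */
    static String acceptAndPollOnce(String payload) {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0 = any free port
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel accepted = server.accept()) {    // "Acceptor" step (blocking)
                accepted.configureBlocking(false);
                // "events()" step: register for reads and attach per-connection state
                accepted.register(selector, SelectionKey.OP_READ, "conn-1");

                client.write(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)));

                // "Poller" step: wait for ready keys, then walk the selected-key set
                if (selector.select(5000) == 0) {
                    throw new IllegalStateException("no key became ready");
                }
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                SelectionKey key = it.next();
                it.remove(); // selected keys must be removed by hand
                ByteBuffer buf = ByteBuffer.allocate(64);
                int n = ((SocketChannel) key.channel()).read(buf);
                return key.attachment() + ":"
                    + new String(buf.array(), 0, n, StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling MiniPoller.acceptAndPollOnce("ping") is expected to return "conn-1:ping" for such a small payload. The attachment is the piece that lets the loop get from a ready key back to the connection's state, which is what sk.attachment() does in the Poller code.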

  protected void processKey(SelectionKey sk, NioSocketWrapper socketWrapper) {
            try {
                if (close) {
                    cancelledKey(sk, socketWrapper);
                } else if (sk.isValid()) {
                    if (sk.isReadable() || sk.isWritable()) {
                        if (socketWrapper.getSendfileData() != null) {
                            processSendfile(sk, socketWrapper, false);
                        } else {
                            unreg(sk, socketWrapper, sk.readyOps());
                            boolean closeSocket = false;
                            // Read goes before write
                            // Handle the read event, which leads to building the Request object
                            if (sk.isReadable()) {
                                if (socketWrapper.readOperation != null) {
                                    if (!socketWrapper.readOperation.process()) {
                                        closeSocket = true;
                                    }
                                } else if (socketWrapper.readBlocking) {
                                    synchronized (socketWrapper.readLock) {
                                        socketWrapper.readBlocking = false;
                                        socketWrapper.readLock.notify();
                                    }
                                } else if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) {
                                    closeSocket = true;
                                }
                            }
                            // Handle the write event: write the generated Response back to the client through the socket
                            if (!closeSocket && sk.isWritable()) {
                                if (socketWrapper.writeOperation != null) {
                                    if (!socketWrapper.writeOperation.process()) {
                                        closeSocket = true;
                                    }
                                } else if (socketWrapper.writeBlocking) {
                                    synchronized (socketWrapper.writeLock) {
                                        socketWrapper.writeBlocking = false;
                                        socketWrapper.writeLock.notify();
                                    }
                                } else if (!processSocket(socketWrapper, SocketEvent.OPEN_WRITE, true)) {
                                    closeSocket = true;
                                }
                            }
                            if (closeSocket) {
                                cancelledKey(sk, socketWrapper);
                            }
                        }
                    }
                } else {
                    // Invalid key
                    cancelledKey(sk, socketWrapper);
                }
            } catch (CancelledKeyException ckx) {
                cancelledKey(sk, socketWrapper);
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error(sm.getString("endpoint.nio.keyProcessingError"), t);
            }
        }

Now look at the processSocket() method:

public boolean processSocket(SocketWrapperBase<S> socketWrapper, SocketEvent event, boolean dispatch) {
        try {
            if (socketWrapper == null) {
                return false;
            }
            SocketProcessorBase<S> sc = null;
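            // To improve performance, the Processor object of each active connection is cached; in addition, when a
            // connection closes, its Processor is released to a recycle queue (processors for upgraded protocols are
            // not recycled) so that later connections can reset and reuse it, which reduces object construction.
            // If the cache holds one, take a processor from the cache
            if (processorCache != null) {
                // Take a processor from processorCache to handle the socket; the implementation is SocketProcessor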
                sc = processorCache.pop();
            }
            // If the cache is empty, create a SocketProcessor according to the negotiated protocol
            if (sc == null) {
                sc = createSocketProcessor(socketWrapper, event);
            } else {
                sc.reset(socketWrapper, event);
            }
            // Submit the processor to the worker thread pool for execution
            Executor executor = getExecutor();
            if (dispatch && executor != null) {
                executor.execute(sc);
            } else {
                sc.run();
            }
        } catch (RejectedExecutionException ree) {
            getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper), ree);
            return false;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
             getLog().error(sm.getString("endpoint.process.fail"), t);
            return false;
        }
        return true;
    }

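In effect it does just two things:

  1. Take a processor from processorCache to handle the socket; if the cache is empty, create one according to the negotiated protocol. The processor implementation is SocketProcessor.
  2. Submit the processor to the worker thread pool for execution.

The dispatch parameter says whether the work should be done on a worker thread; every call in processKey above passes true.

  • When dispatch is true and a worker thread pool exists, executor.execute(sc) is called and the connected socket is then handled by the worker thread pool;
  • otherwise the Poller thread goes on to handle the connected socket itself.

createSocketProcessor is an abstract method of AbstractEndpoint; NioEndpoint implements it: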
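The two steps, plus the dispatch switch, can be sketched in a few lines. This is a simplified illustration under stated assumptions: TaskDispatcher and ReusableTask are hypothetical names, an ArrayDeque stands in for processorCache, and a String payload stands in for the socket wrapper and event.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

// Sketch of "reuse a cached task object, then run it on the pool or inline".
class TaskDispatcher {

    /** A task that can be reset and reused, in the spirit of a cached SocketProcessor. */
    final class ReusableTask implements Runnable {
        private String payload;

        void reset(String payload) {
            this.payload = payload;
        }

        @Override
        public void run() {
            try {
                synchronized (handled) {
                    handled.add(payload);
                }
            } finally {
                payload = null;
                synchronized (cache) {
                    cache.push(this); // return to the cache for the next event
                }
            }
        }
    }

    private final Deque<ReusableTask> cache = new ArrayDeque<>();
    private final List<String> handled = new ArrayList<>();
    private final Executor executor; // may be null: then everything runs inline
    private int created;

    TaskDispatcher(Executor executor) {
        this.executor = executor;
    }

    boolean process(String payload, boolean dispatch) {
        ReusableTask task;
        synchronized (cache) {
            task = cache.poll();
        }
        if (task == null) {
            task = new ReusableTask(); // cache miss: build a new task
            created++;
        }
        task.reset(payload);
        try {
            if (dispatch && executor != null) {
                executor.execute(task); // hand off to the worker pool
            } else {
                task.run();             // run on the caller's (poller's) thread
            }
        } catch (RejectedExecutionException e) {
            return false;               // the pool refused the task
        }
        return true;
    }

    int created() {
        return created;
    }

    List<String> handled() {
        synchronized (handled) {
            return new ArrayList<>(handled);
        }
    }
}
```

Passing an executor that runs tasks immediately makes the reuse visible: the second call to process() finds the task that the first call pushed back into the cache, so only one task object is ever created.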

  @Override
    protected SocketProcessorBase<NioChannel> createSocketProcessor(
        SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
        return new SocketProcessor(socketWrapper, event);
    }

SocketProcessor extends SocketProcessorBase, and SocketProcessorBase implements Runnable.


public abstract class SocketProcessorBase<S> implements Runnable {

    protected SocketWrapperBase<S> socketWrapper;
    protected SocketEvent event;

    @Override
    public final void run() {
        synchronized (socketWrapper) {
             if (socketWrapper.isClosed()) {
                return;
            }
            doRun();
        }
    }


    protected abstract void doRun();
}

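When a SocketProcessorBase runs, its run() method calls the abstract method doRun(), which the subclass implements; here that is SocketProcessor's doRun method. In other words, a Worker thread ends up executing SocketProcessor.doRun().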

   protected class SocketProcessor extends SocketProcessorBase<NioChannel> {

        public SocketProcessor(SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
            super(socketWrapper, event);
        }

        @Override
        protected void doRun() {
          
            Poller poller = NioEndpoint.this.poller;
            if (poller == null) {
                socketWrapper.close();
                return;
            }

            try {
                // TLS (HTTPS) handshake
                int handshake = -1;
                try {
                    if (socketWrapper.getSocket().isHandshakeComplete()) {
                        handshake = 0;
                    } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
                        event == SocketEvent.ERROR) {    
                        handshake = -1;
                    } else {
                        handshake = socketWrapper.getSocket().handshake(event == SocketEvent.OPEN_READ, event == SocketEvent.OPEN_WRITE);
                        event = SocketEvent.OPEN_READ;
                    }
                } catch (IOException x) {
                    handshake = -1;
                    if (log.isDebugEnabled()) {
                        log.debug(sm.getString("endpoint.err.handshake"), x);
                    }
                } catch (CancelledKeyException ckx) {
                    handshake = -1;
                }
                if (handshake == 0) {
                    SocketState state = SocketState.OPEN;        
                    // Delegate the processing to the Handler; a null event is treated as an OPEN_READ event
                    if (event == null) {
                        state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
                    } else {
                        state = getHandler().process(socketWrapper, event);
                    }
                    if (state == SocketState.CLOSED) {
                        poller.cancelledKey(getSelectionKey(), socketWrapper);
                    }
                } else if (handshake == -1) {
                    getHandler().process(socketWrapper, SocketEvent.CONNECT_FAIL);
                    poller.cancelledKey(getSelectionKey(), socketWrapper);
                } else if (handshake == SelectionKey.OP_READ) {
                    socketWrapper.registerReadInterest();
                } else if (handshake == SelectionKey.OP_WRITE) {
                    socketWrapper.registerWriteInterest();
                }
            } catch (CancelledKeyException cx) {
                poller.cancelledKey(getSelectionKey(), socketWrapper);
            } catch (VirtualMachineError vme) {
                ExceptionUtils.handleThrowable(vme);
            } catch (Throwable t) {
                log.error(sm.getString("endpoint.processing.fail"), t);
                poller.cancelledKey(getSelectionKey(), socketWrapper);
            } finally {
                socketWrapper = null;
                event = null;
                //return to cache
                if (running && processorCache != null) {
                    processorCache.push(this);
                }
            }
        }
    }

The Handler's key method is process():

state = getHandler().process(socketWrapper, event);

It mainly calls the Processor.process() method.

 protected static class ConnectionHandler<S> implements AbstractEndpoint.Handler<S> {

        @Override
        public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
            // Some code omitted

            // Get the connected socket (the NIO channel) held by the wrapper
            S socket = wrapper.getSocket();

            Processor processor = (Processor) wrapper.takeCurrentProcessor();

            if (processor == null) {
                // Have the current protocol create a Processor
                processor = getProtocol().createProcessor();
                register(processor);
            }

            SocketState state = SocketState.CLOSED;
            // Key line: the processor handles the socket
            state = processor.process(wrapper, status);

            return state;
        }
 }

public abstract class AbstractHttp11Protocol<S> extends AbstractProtocol<S> {
    @Override
    protected Processor createProcessor() {
        Http11Processor processor = new Http11Processor(this, getEndpoint());
        processor.setAdapter(getAdapter());
        // With keep-alive, the maximum number of requests handled per socket
        processor.setMaxKeepAliveRequests(getMaxKeepAliveRequests());
        // Default timeout for HTTP file uploads (300 * 1000 ms)
        processor.setConnectionUploadTimeout(getConnectionUploadTimeout());
        processor.setDisableUploadTimeout(getDisableUploadTimeout());
        processor.setRestrictedUserAgents(getRestrictedUserAgents());
        // Maximum size of a POST body that will be saved (4 * 1024 bytes by default)
        processor.setMaxSavePostSize(getMaxSavePostSize());
        return processor;
    }
}

The code is long, so let's go straight to the key line:

  state = processor.process(wrapper, status);
public abstract class AbstractProcessorLight implements Processor {

    private Set<DispatchType> dispatches = new CopyOnWriteArraySet<>();


    @Override
    public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
        throws IOException {

        // Start from the CLOSED state; the branches below update it
        SocketState state = SocketState.CLOSED;
        Iterator<DispatchType> dispatches = null;
        do {
            if (dispatches != null) {
                DispatchType nextDispatch = dispatches.next();
                if (getLog().isDebugEnabled()) {
                    getLog().debug("Processing dispatch type: [" + nextDispatch + "]");
                }
                state = dispatch(nextDispatch.getSocketStatus());
                if (!dispatches.hasNext()) {
                    state = checkForPipelinedData(state, socketWrapper);
                }
            } else if (status == SocketEvent.DISCONNECT) {
                // Do nothing here, just wait for it to get recycled
            } else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
                state = dispatch(status);
                state = checkForPipelinedData(state, socketWrapper);
            } else if (status == SocketEvent.OPEN_WRITE) {
                // Extra write event likely after async, ignore
                state = SocketState.LONG;
            } else if (status == SocketEvent.OPEN_READ) {
                // Call the service() method
                state = service(socketWrapper);
            } else if (status == SocketEvent.CONNECT_FAIL) {
                logAccess(socketWrapper);
            } else {
                // Default to closing the socket if the SocketEvent passed in
                // is not consistent with the current state of the Processor
                state = SocketState.CLOSED;
            }
            
        } while (state == SocketState.ASYNC_END ||
            dispatches != null && state != SocketState.CLOSED);

        return state;
    }

For a read event, the service() method is called:

 public class Http11Processor extends AbstractProcessor {
  
        @Override
    public SocketState service(SocketWrapperBase<?> socketWrapper)
        throws IOException {
        // Some code omitted; see the source if you are interested

        // Key line: hand over to CoyoteAdapter for request mapping
        getAdapter().service(request, response);
    }
 }
     

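The code above really does just 3 things:

  1. Read the message, parse the request line, request headers and request body, and wrap them in a Request object
  2. Create the Response object
  3. Call Adapter.service(), passing in the Request and Response objects, so that the connector dispatches the request to the components that carry out the processing

1.3 Summary

Let's recap the request-acceptance flow in Tomcat that we have just analyzed: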

image-20230115001028034


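By this point Tomcat's threading model should be quite clear. In brief:

There is the Acceptor daemon thread, the Poller daemon thread, and the pool of Worker threads.

  1. Once Tomcat has started, the Acceptor thread keeps accepting connections on port 8080. For each one it calls endpoint.setSocketOptions(socket), which wraps the socket in a socketWrapper, creates a PollerEvent, and adds it to the Poller's event queue (a SynchronizedQueue).
  2. The Poller thread loops: events() drains the event queue with events.poll() and registers each new channel with the Poller's Selector; select() then waits for ready keys; each ready key is passed to processKey(), which calls processSocket(socketWrapper) to wrap the socket in a SocketProcessor and submit it to the Worker thread pool.
  3. The Worker thread pool runs the submitted tasks. Each task executes processor.process(), which calls service(), which in turn calls the connector's adapter, CoyoteAdapter.service(); from there the connector dispatches the request to the individual components for processing.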
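The three roles in the summary above can be imitated with standard java.util.concurrent pieces. This is a deliberately simplified sketch: a BlockingQueue replaces both the event queue and the Selector, the calling thread plays the Acceptor, and all names (ThreeStageSketch, handleAll) are invented for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch of the acceptor -> poller -> worker hand-off, without any real sockets.
class ThreeStageSketch {

    /** Pushes each "connection" through the three stages and returns the sorted results. */
    static List<String> handleAll(List<String> connections) {
        BlockingQueue<String> events = new LinkedBlockingQueue<>(); // stand-in for the events queue
        ExecutorService workers = Executors.newFixedThreadPool(2);  // stand-in for the worker pool
        List<String> results = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(connections.size());

        // "Poller": takes events off the queue and submits work to the pool
        Thread poller = new Thread(() -> {
            try {
                for (int i = 0; i < connections.size(); i++) {
                    String conn = events.take();
                    workers.execute(() -> {
                        results.add("handled " + conn); // "Worker": the actual processing
                        done.countDown();
                    });
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "poller");
        poller.setDaemon(true);
        poller.start();

        // "Acceptor": the calling thread only enqueues, it never processes
        for (String conn : connections) {
            events.add(conn);
        }

        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for workers");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            workers.shutdown();
        }
        List<String> sorted = new ArrayList<>(results);
        Collections.sort(sorted); // worker completion order is not deterministic
        return sorted;
    }
}
```

The point of the split is that the accepting thread and the polling thread never block on request processing, so slow requests only occupy worker threads.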

image-20230109220435952

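2. How a single HTTP request is processed

2.1 How the connector processes a request

The CoyoteAdapter component ties the Connector to the Engine container: it passes the generated Request object and the Response object into the Engine container and invokes its Pipeline.

As analyzed above, the Processor calls Adapter.service(), so the request passes through the CoyoteAdapter component, as the following code shows.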
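"Invoking the Pipeline" means calling the first valve of the Engine, which then passes the request down level by level. The sketch below shows only that chain-of-responsibility idea, assuming the usual Engine, Host, Context, Wrapper nesting; MiniValve is a hypothetical class and not Tomcat's Valve or Pipeline API.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of a valve chain: each valve does its own work, then invokes the next one.
class MiniValve {
    private final String name;
    private MiniValve next;

    MiniValve(String name) {
        this.name = name;
    }

    /** Links the next valve and returns it, so that calls can be chained. */
    MiniValve setNext(MiniValve next) {
        this.next = next;
        return next;
    }

    void invoke(List<String> trace) {
        trace.add(name);        // this level's own processing
        if (next != null) {
            next.invoke(trace); // hand over to the next level
        }
    }

    /** Builds engine -> host -> context -> wrapper and sends one "request" through. */
    static List<String> demo() {
        MiniValve engine = new MiniValve("engine");
        engine.setNext(new MiniValve("host"))
              .setNext(new MiniValve("context"))
              .setNext(new MiniValve("wrapper"));
        List<String> trace = new ArrayList<>();
        engine.invoke(trace);   // entry point: the first valve of the outermost container
        return trace;
    }
}
```

MiniValve.demo() returns the names in the order the valves ran: engine, host, context, wrapper.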

   @Override
    public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
        throws Exception {
        // From the Coyote request and response, obtain the connector's Request and Response objects (the HttpServletRequest / HttpServletResponse counterparts)
        Request request = (Request) req.getNote(ADAPTER_NOTES);
        Response response = (Response) res.getNote(ADAPTER_NOTES);

        if (request == null) {
            // Create objects
            request = connector.createRequest();
            request.setCoyoteRequest(req);
            response = connector.createResponse();
            response.setCoyoteResponse(res);

            // Link objects
            request.setResponse(response);
            response.setRequest(request);

            // Set as notes
            req.setNote(ADAPTER_NOTES, request);
            res.setNote(ADAPTER_NOTES, response);

            // Set query string encoding
            req.getParameters().setQueryStringCharset(connector.getURICharset());
        }

        // Add the X-Powered-By header if enabled
        if (connector.getXpoweredBy()) {
            response.addHeader("X-Powered-By", POWERED_BY);
        }

        boolean async = false;
        boolean postParseSuccess = false;

        req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());
        req.setRequestThread();

        try {
            // Parse and set Catalina and configuration specific
            // request parameters
            // Handles request mapping (resolving host, context and wrapper, parsing the parameters after the URI, the session ID)
            postParseSuccess = postParseRequest(req, request, res, response);
            if (postParseSuccess) {
                //check valves if we support async
                // Check whether the Valves support async
                request.setAsyncSupported(
                    connector.getService().getContainer().getPipeline().isAsyncSupported());
                // Calling the container
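                // Get the Engine's first Valve.
                // Besides doing its own work, the basic Valve of each Container level must also start the Valve chain
                // of the next-level Container. Request mapping has already stored its result in the request object,
                // so a Valve can read the next-level Container straight from the request.
                // This is where the request really enters the container: the valves of the Engine's pipeline are invoked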
                connector.getService().getContainer().getPipeline().getFirst().invoke(
                    request, response);
            }
            if (request.isAsync()) {
                async = true;
                ReadListener readListener = req.getReadListener();
                if (readListener != null && request.isFinished()) {
                    // Possible the all data may have been read during service()
                    // method so this needs to be checked here
                    ClassLoader oldCL = null;
                    try {
                        oldCL = request.getContext().bind(false, null);
                        if (req.sendAllDataReadEvent()) {
                            req.getReadListener().onAllDataRead();
                        }
                    } finally {
                        request.getContext().unbind(false, oldCL);
                    }
                }

                Throwable throwable =
                    (Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);

                if (!request.isAsyncCompleting() && throwable != null) {
                    request.getAsyncContextInternal().setErrorState(throwable, true);
                }
            } else {
                // Complete the request via request.finishRequest() and response.finishResponse() (which writes the OutputBuffer contents to the client)
                request.finishRequest();
                response.finishResponse();
            }

        } catch (IOException e) {
            // Ignore
        } finally {
            AtomicBoolean error = new AtomicBoolean(false);
            res.action(ActionCode.IS_ERROR, error);

            if (request.isAsyncCompleting() && error.get()) {
                // Connection will be forcibly closed which will prevent
                // completion happening at the usual point. Need to trigger
                // call to onComplete() here.
                res.action(ActionCode.ASYNC_POST_PROCESS, null);
                async = false;
            }

            req.getRequestProcessor().setWorkerThreadName(null);
            req.clearRequestThread();

            // Recycle the wrapper request and response
            if (!async) {
                updateWrapperErrorCount(request, response);
                request.recycle();
                response.recycle();
            }
        }
    }

The code above mainly does the following:

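  1. Maps the request (resolves host, context and wrapper, parses the parameters after the URI and the sessionId)
  2. Parses the request; this step also handles proxy server settings and sets the necessary headers
  3. Invokes the valve of the Engine container's pipeline, handing request processing over to the container
  4. Completes the request via request.finishRequest and response.finishResponse (which writes the OutputBuffer contents to the browser)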
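
The order of these steps, and the role of the async flag in the finally block, can be sketched as follows. This is only a minimal illustration: AdapterFlowSketch and its step names are hypothetical and merely mirror the calls in the listing above; it is not Tomcat code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified stand-in for the service() flow shown above.
class AdapterFlowSketch {

    /** Records the steps in the order they would run. */
    static List<String> service(boolean mappingSucceeds, boolean asyncStarted) {
        List<String> steps = new ArrayList<>();
        boolean async = false;
        try {
            steps.add("postParseRequest");          // map host/context/wrapper, parse session id
            if (mappingSucceeds) {
                steps.add("invokeEnginePipeline");  // hand the request to the container
            }
            if (mappingSucceeds && asyncStarted) {
                async = true;                       // the response is completed later, elsewhere
            } else {
                steps.add("finishRequest");
                steps.add("finishResponse");        // flush buffered output to the client
            }
        } finally {
            if (!async) {
                steps.add("recycle");               // request/response objects are reused
            }
        }
        return steps;
    }

    public static void main(String[] args) {
        System.out.println(service(true, false));
        System.out.println(service(true, true));
    }
}
```

In the synchronous case the worker thread finishes and recycles the objects itself; once async mode has started, both finishing and recycling are skipped at this point.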

2.2 How each container processes the request

As mentioned in 《Tomcat源码篇之启动流程》, the StandardEngine constructor adds the basic valve StandardEngineValve to its own Pipeline.

Engine request processing

So let's go straight to the invoke method of StandardEngineValve.

final class StandardEngineValve extends ValveBase {

    //------------------------------------------------------ Constructor
    public StandardEngineValve() {
        super(true);
    }


    // --------------------------------------------------------- Public Methods
    @Override
    public final void invoke(Request request, Response response)
        throws IOException, ServletException {

        // Get the Host from the request
        Host host = request.getHost();
        // If no Host was mapped, respond with 404
        if (host == null) {
            if (!response.isError()) {
                response.sendError(404);
            }
            return;
        }
        if (request.isAsyncSupported()) {
            request.setAsyncSupported(host.getPipeline().isAsyncSupported());
        }

        // Invoke the first Valve of the Host container's pipeline
        host.getPipeline().getFirst().invoke(request, response);
    }
}

This method reads the Host that request mapping has already selected, and then calls the invoke() method of the first Valve in that Host's pipeline.
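
To make "the first Valve in the pipeline" concrete, here is a minimal sketch of a pipeline in which ordinary valves are linked in front of a basic valve. MiniValve, NamedValve and MiniPipeline are hypothetical names used only for illustration; the real Pipeline and Valve classes do considerably more.

```java
// Hypothetical miniature of the pipeline/valve idea; not Tomcat's real classes.
class PipelineSketch {

    static abstract class MiniValve {
        MiniValve next;                       // the following valve in the chain
        abstract void invoke(StringBuilder trace);
    }

    static class NamedValve extends MiniValve {
        private final String name;

        NamedValve(String name) {
            this.name = name;
        }

        @Override
        void invoke(StringBuilder trace) {
            trace.append(name).append(' ');
            if (next != null) {
                next.invoke(trace);           // pass the request down the chain
            }
        }
    }

    static class MiniPipeline {
        private MiniValve first;              // head of the ordinary valves, may be null
        private final MiniValve basic;        // always the last valve

        MiniPipeline(MiniValve basic) {
            this.basic = basic;
        }

        /** Links a new valve directly in front of the basic valve. */
        void addValve(MiniValve valve) {
            valve.next = basic;
            if (first == null) {
                first = valve;
                return;
            }
            MiniValve current = first;
            while (current.next != basic) {
                current = current.next;
            }
            current.next = valve;
        }

        /** Falls back to the basic valve when no other valve was added. */
        MiniValve getFirst() {
            return first != null ? first : basic;
        }
    }

    static String run() {
        MiniPipeline pipeline = new MiniPipeline(new NamedValve("basic"));
        pipeline.addValve(new NamedValve("first"));
        pipeline.addValve(new NamedValve("second"));
        StringBuilder trace = new StringBuilder();
        pipeline.getFirst().invoke(trace);
        return trace.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Whatever valves are added, the basic valve runs last, which is why it is the natural place to hand the request to the next container.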


Host request processing

host.getPipeline().getFirst().invoke(request, response): as you can see, the Host container first gets its own pipeline and then the first valve in it.

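The StandardHost constructor adds the basic valve StandardHostValve to its own Pipeline. The Host pipeline can also hold other valves, such as ErrorReportValve, which sit in front of the basic valve; the basic valve StandardHostValve is always last in the pipeline.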

ErrorReportValve checks whether an exception occurred while the HTTP request was being processed; if so, it assembles an HTML page and writes it to the client.

public class ErrorReportValve extends ValveBase {
    public ErrorReportValve() {
        super(true);
    }


    // --------------------------------------------------------- Public Methods

    @Override
    public void invoke(Request request, Response response) throws IOException, ServletException {

        // Forward the request to the next Valve
        getNext().invoke(request, response);

        // Downstream processing has returned
        if (response.isCommitted()) {
            if (response.setErrorReported()) {
                // Error wasn't previously reported but we can't write an error
                // page because the response has already been committed.

                // See if IO is allowed
                AtomicBoolean ioAllowed = new AtomicBoolean(true);
                response.getCoyoteResponse().action(ActionCode.IS_IO_ALLOWED, ioAllowed);

                if (ioAllowed.get()) {
                    // I/O is currently still allowed. Flush any data that is
                    // still to be written to the client.
                    try {
                        response.flushBuffer();
                    } catch (Throwable t) {
                        ExceptionUtils.handleThrowable(t);
                    }
                    // Now close immediately to signal to the client that
                    // something went wrong
                    response.getCoyoteResponse().action(ActionCode.CLOSE_NOW,
                            request.getAttribute(RequestDispatcher.ERROR_EXCEPTION));
                }
            }
            return;
        }

        // Check whether an exception occurred while processing the request
        Throwable throwable = (Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);

        if (request.isAsync() && !request.isAsyncCompleting()) {
            return;
        }

        if (throwable != null && !response.isError()) {
            // Reset the data in the response
            response.reset();
            // 500 status code
            response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
        }

        response.setSuspended(false);

        try {
            // Assemble the exception stack trace into an HTML page and write it to the client
            report(request, response, throwable);
        } catch (Throwable tt) {
            ExceptionUtils.handleThrowable(tt);
        }
    }

}
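
The notable point in ErrorReportValve is the ordering: it delegates to the next valve first and only inspects the outcome afterwards. A minimal sketch of that "call next, then post-process" shape follows; MiniResponse and Handler are hypothetical types, and the error page is a placeholder rather than the real report() output.

```java
// Hypothetical sketch of a valve that post-processes the result of the valves after it.
class ErrorReportSketch {

    static class MiniResponse {
        int status = 200;
        String body = "";
        boolean committed;
        Throwable error;      // plays the role of the ERROR_EXCEPTION request attribute
    }

    interface Handler {
        void handle(MiniResponse response);
    }

    /** Delegates first, then turns an uncommitted failure into an error page. */
    static void invoke(Handler next, MiniResponse response) {
        next.handle(response);
        if (response.committed) {
            return;                               // too late to replace the body
        }
        if (response.error != null && response.status < 400) {
            response.body = "";                   // discard partial output
            response.status = 500;
        }
        if (response.status >= 400) {
            response.body = "<html><body><h1>HTTP Status " + response.status + "</h1></body></html>";
        }
    }

    static MiniResponse run(Handler handler) {
        MiniResponse response = new MiniResponse();
        invoke(handler, response);
        return response;
    }

    public static void main(String[] args) {
        MiniResponse failed = run(r -> r.error = new IllegalStateException("boom"));
        System.out.println(failed.status + " " + failed.body);
        MiniResponse ok = run(r -> r.body = "hello");
        System.out.println(ok.status + " " + ok.body);
    }
}
```

A successful response passes through untouched, while a failure that has not been committed yet is replaced by an error page.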

Now let's look at the implementation of StandardHostValve's invoke method.

final class StandardHostValve extends ValveBase {
  
    public StandardHostValve() {
        super(true);
    }

    @Override
    public final void invoke(Request request, Response response)
        throws IOException, ServletException {

        // Get the Context from the request
        Context context = request.getContext();
        if (context == null) {
            // Don't overwrite an existing error
            if (!response.isError()) {
                response.sendError(404);
            }
            return;
        }

        if (request.isAsyncSupported()) {
            request.setAsyncSupported(context.getPipeline().isAsyncSupported());
        }

        boolean asyncAtStart = request.isAsync();

        try {
            context.bind(Globals.IS_SECURITY_ENABLED, MY_CLASSLOADER);

            if (!asyncAtStart && !context.fireRequestInitEvent(request.getRequest())) {
                return;
            }

            try {
                if (!response.isErrorReportRequired()) {
                    // Invoke the first Valve of the Context container's pipeline
                    context.getPipeline().getFirst().invoke(request, response);
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                container.getLogger().error("Exception Processing " + request.getRequestURI(), t);
                if (!response.isErrorReportRequired()) {
                    request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, t);
                    throwable(request, response, t);
                }
            }

            response.setSuspended(false);

            Throwable t = (Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);

            if (!context.getState().isAvailable()) {
                return;
            }

            // Look for (and render if found) an application level error page
            if (response.isErrorReportRequired()) {
                // If an error has occurred that prevents further I/O, don't waste time
                // producing an error report that will never be read
                AtomicBoolean result = new AtomicBoolean(false);
                response.getCoyoteResponse().action(ActionCode.IS_IO_ALLOWED, result);
                if (result.get()) {
                    if (t != null) {
                        throwable(request, response, t);
                    } else {
                        status(request, response);
                    }
                }
            }

            if (!request.isAsync() && !asyncAtStart) {
                context.fireRequestDestroyEvent(request.getRequest());
            }
        } finally {
            // Access a session (if present) to update last accessed time, based
            // on a strict interpretation of the specification
            if (ACCESS_SESSION) {
                request.getSession(false);
            }

            context.unbind(Globals.IS_SECURITY_ENABLED, MY_CLASSLOADER);
        }
    }
  
}

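StandardHostValve first checks whether the Request carries a Context (it is set while CoyoteAdapter.postParseRequest runs). If there is no Context, it responds with a 404 status code; otherwise it invokes the valve of the Context container's pipeline.

context.getPipeline().getFirst().invoke(request, response): as you can see, the Context container first gets its own pipeline and then the first valve in it.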


Context request processing

Likewise, let's look at the invoke implementation of StandardContextValve.

final class StandardContextValve extends ValveBase {

    private static final StringManager sm = StringManager.getManager(StandardContextValve.class);

    public StandardContextValve() {
        super(true);
    }


    @Override
    public final void invoke(Request request, Response response)
        throws IOException, ServletException {

        // Disallow any direct access to resources under WEB-INF or META-INF
        MessageBytes requestPathMB = request.getRequestPathMB();
        if ((requestPathMB.startsWithIgnoreCase("/META-INF/", 0))
            || (requestPathMB.equalsIgnoreCase("/META-INF"))
            || (requestPathMB.startsWithIgnoreCase("/WEB-INF/", 0))
            || (requestPathMB.equalsIgnoreCase("/WEB-INF"))) {
            response.sendError(HttpServletResponse.SC_NOT_FOUND);
            return;
        }

        // Select the Wrapper to be used for this Request
        // Get the Wrapper from the request
        Wrapper wrapper = request.getWrapper();
        if (wrapper == null || wrapper.isUnavailable()) {
            response.sendError(HttpServletResponse.SC_NOT_FOUND);
            return;
        }

        // Acknowledge the request
        try {
            response.sendAcknowledgement(ContinueResponseTiming.IMMEDIATELY);
        } catch (IOException ioe) {
            container.getLogger().error(sm.getString(
                "standardContextValve.acknowledgeException"), ioe);
            request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, ioe);
            response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
            return;
        }

        if (request.isAsyncSupported()) {
            request.setAsyncSupported(wrapper.getPipeline().isAsyncSupported());
        }
        // Invoke the first Valve of the Wrapper container's pipeline
        wrapper.getPipeline().getFirst().invoke(request, response);
    }
}

In the same way, it invokes the valve of the Wrapper container's pipeline.
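
Seen together, the Engine, Host and Context basic valves follow one pattern: read the next container from the mapping result stored on the request, answer 404 if it is missing, otherwise delegate. A minimal sketch of that hand-off, assuming a hypothetical MiniRequest that already carries the mapping result (all names here are illustrative only):

```java
import java.util.Locale;

// Hypothetical miniature of the Engine -> Host -> Context -> Wrapper hand-off.
class ContainerChainSketch {

    /** Mapping result assumed to have been stored on the request beforehand. */
    static class MiniRequest {
        final String host;
        final String context;
        final String wrapper;
        final String path;

        MiniRequest(String host, String context, String wrapper, String path) {
            this.host = host;
            this.context = context;
            this.wrapper = wrapper;
            this.path = path;
        }
    }

    static int engineValve(MiniRequest request) {
        return request.host == null ? 404 : hostValve(request);
    }

    static int hostValve(MiniRequest request) {
        return request.context == null ? 404 : contextValve(request);
    }

    static int contextValve(MiniRequest request) {
        // Direct access to WEB-INF and META-INF is refused, as in the listing above.
        if (isProtected(request.path) || request.wrapper == null) {
            return 404;
        }
        return wrapperValve(request);
    }

    static int wrapperValve(MiniRequest request) {
        return 200;   // the filter chain and the servlet would run here
    }

    static boolean isProtected(String path) {
        String upper = path.toUpperCase(Locale.ENGLISH);
        return upper.equals("/WEB-INF") || upper.startsWith("/WEB-INF/")
            || upper.equals("/META-INF") || upper.startsWith("/META-INF/");
    }

    public static void main(String[] args) {
        System.out.println(engineValve(new MiniRequest("localhost", "/app", "hello", "/hello")));
        System.out.println(engineValve(new MiniRequest("localhost", "/app", "default", "/web-inf/web.xml")));
    }
}
```

None of the valves performs any lookup of its own; the routing decision was made once during request mapping.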


Wrapper request processing

The basic valve of Wrapper is StandardWrapperValve; let's look at its invoke implementation.

final class StandardWrapperValve extends ValveBase {
    @Override
    public final void invoke(Request request, Response response)
        throws IOException, ServletException {

        // Initialize local variables we may need
        boolean unavailable = false;
        Throwable throwable = null;
        // This should be a Request attribute...
        long t1 = System.currentTimeMillis();
        requestCount.incrementAndGet();
        StandardWrapper wrapper = (StandardWrapper) getContainer();
        Servlet servlet = null;
        Context context = (Context) wrapper.getParent();

        // Check for the application being marked unavailable
        if (!context.getState().isAvailable()) {
            response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
                sm.getString("standardContext.isUnavailable"));
            unavailable = true;
        }

        // Check for the servlet being marked unavailable
        if (!unavailable && wrapper.isUnavailable()) {
            container.getLogger().info(sm.getString("standardWrapper.isUnavailable",
                wrapper.getName()));
            long available = wrapper.getAvailable();
            if ((available > 0L) && (available < Long.MAX_VALUE)) {
                response.setDateHeader("Retry-After", available);
                response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
                    sm.getString("standardWrapper.isUnavailable",
                        wrapper.getName()));
            } else if (available == Long.MAX_VALUE) {
                response.sendError(HttpServletResponse.SC_NOT_FOUND,
                    sm.getString("standardWrapper.notFound",
                        wrapper.getName()));
            }
            unavailable = true;
        }

        // Allocate a servlet instance to process this request
        try {
            if (!unavailable) {
                // Call the Wrapper's allocate() method to obtain a Servlet instance
                servlet = wrapper.allocate();
            }
        } catch (UnavailableException e) {
            container.getLogger().error(
                sm.getString("standardWrapper.allocateException",
                    wrapper.getName()), e);
            long available = wrapper.getAvailable();
            if ((available > 0L) && (available < Long.MAX_VALUE)) {
                response.setDateHeader("Retry-After", available);
                response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
                    sm.getString("standardWrapper.isUnavailable",
                        wrapper.getName()));
            } else if (available == Long.MAX_VALUE) {
                response.sendError(HttpServletResponse.SC_NOT_FOUND,
                    sm.getString("standardWrapper.notFound",
                        wrapper.getName()));
            }
        } catch (ServletException e) {
            container.getLogger().error(sm.getString("standardWrapper.allocateException",
                wrapper.getName()), StandardWrapper.getRootCause(e));
            throwable = e;
            exception(request, response, e);
        } catch (Throwable e) {
            ExceptionUtils.handleThrowable(e);
            container.getLogger().error(sm.getString("standardWrapper.allocateException",
                wrapper.getName()), e);
            throwable = e;
            exception(request, response, e);
            servlet = null;
        }

        MessageBytes requestPathMB = request.getRequestPathMB();
        DispatcherType dispatcherType = DispatcherType.REQUEST;
        if (request.getDispatcherType() == DispatcherType.ASYNC) {
            dispatcherType = DispatcherType.ASYNC;
        }
        request.setAttribute(Globals.DISPATCHER_TYPE_ATTR, dispatcherType);
        request.setAttribute(Globals.DISPATCHER_REQUEST_PATH_ATTR,
            requestPathMB);
        // Create the filter chain for this request
        // Create the filter chain, which works much like a Pipeline
        ApplicationFilterChain filterChain =
            ApplicationFilterFactory.createFilterChain(request, wrapper, servlet);

        // Call the filter chain for this request
        // NOTE: This also calls the servlet's service() method
        Container container = this.container;
        try {
            if ((servlet != null) && (filterChain != null)) {
                // Swallow output if needed
                if (context.getSwallowOutput()) {
                    try {
                        SystemLogHandler.startCapture();
                        if (request.isAsyncDispatching()) {
                            request.getAsyncContextInternal().doInternalDispatch();
                        } else {
                            // Call doFilter on the filter chain; this eventually reaches the Servlet's service method
                            filterChain.doFilter(request.getRequest(),
                                response.getResponse());
                        }
                    } finally {
                        String log = SystemLogHandler.stopCapture();
                        if (log != null && log.length() > 0) {
                            context.getLogger().info(log);
                        }
                    }
                } else {
                    if (request.isAsyncDispatching()) {
                        request.getAsyncContextInternal().doInternalDispatch();
                    } else {
                        // Call doFilter on the filter chain; this eventually reaches the Servlet's service method
                        filterChain.doFilter
                            (request.getRequest(), response.getResponse());
                    }
                }

            }
        } catch (ClientAbortException | CloseNowException e) {
            if (container.getLogger().isDebugEnabled()) {
                container.getLogger().debug(sm.getString(
                    "standardWrapper.serviceException", wrapper.getName(),
                    context.getName()), e);
            }
            throwable = e;
            exception(request, response, e);
        } catch (IOException e) {
            container.getLogger().error(sm.getString(
                "standardWrapper.serviceException", wrapper.getName(),
                context.getName()), e);
            throwable = e;
            exception(request, response, e);
        } catch (UnavailableException e) {
            container.getLogger().error(sm.getString(
                "standardWrapper.serviceException", wrapper.getName(),
                context.getName()), e);
            //            throwable = e;
            //            exception(request, response, e);
            wrapper.unavailable(e);
            long available = wrapper.getAvailable();
            if ((available > 0L) && (available < Long.MAX_VALUE)) {
                response.setDateHeader("Retry-After", available);
                response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
                    sm.getString("standardWrapper.isUnavailable",
                        wrapper.getName()));
            } else if (available == Long.MAX_VALUE) {
                response.sendError(HttpServletResponse.SC_NOT_FOUND,
                    sm.getString("standardWrapper.notFound",
                        wrapper.getName()));
            }
            // Do not save exception in 'throwable', because we
            // do not want to do exception(request, response, e) processing
        } catch (ServletException e) {
            Throwable rootCause = StandardWrapper.getRootCause(e);
            if (!(rootCause instanceof ClientAbortException)) {
                container.getLogger().error(sm.getString(
                        "standardWrapper.serviceExceptionRoot",
                        wrapper.getName(), context.getName(), e.getMessage()),
                    rootCause);
            }
            throwable = e;
            exception(request, response, e);
        } catch (Throwable e) {
            ExceptionUtils.handleThrowable(e);
            container.getLogger().error(sm.getString(
                "standardWrapper.serviceException", wrapper.getName(),
                context.getName()), e);
            throwable = e;
            exception(request, response, e);
        } finally {
            // Release the filter chain (if any) for this request
            if (filterChain != null) {
                filterChain.release();
            }

            // Deallocate the allocated servlet instance
            try {
                if (servlet != null) {
                    // Release the Servlet and its related resources
                    wrapper.deallocate(servlet);
                }
            } catch (Throwable e) {
                ExceptionUtils.handleThrowable(e);
                container.getLogger().error(sm.getString("standardWrapper.deallocateException",
                    wrapper.getName()), e);
                if (throwable == null) {
                    throwable = e;
                    exception(request, response, e);
                }
            }

            // If this servlet has been marked permanently unavailable,
            // unload it and release this instance
            try {
                if ((servlet != null) &&
                    (wrapper.getAvailable() == Long.MAX_VALUE)) {
                    // If the servlet is marked permanently unavailable, unload it and release this instance
                    wrapper.unload();
                }
            } catch (Throwable e) {
                ExceptionUtils.handleThrowable(e);
                container.getLogger().error(sm.getString("standardWrapper.unloadException",
                    wrapper.getName()), e);
                if (throwable == null) {
                    exception(request, response, e);
                }
            }
            long t2 = System.currentTimeMillis();

            long time = t2 - t1;
            processingTime += time;
            if (time > maxTime) {
                maxTime = time;
            }
            if (time < minTime) {
                minTime = time;
            }
        }
    }
 
}

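As we know, a Wrapper is the container-level wrapper around a servlet, and it is the component that deals with the servlet directly.

The code above does the following:

  1. Allocates a Servlet instance: `servlet = wrapper.allocate();`
  2. Creates the filter chain: `ApplicationFilterChain filterChain = ApplicationFilterFactory.createFilterChain(request, wrapper, servlet);`
  3. Calls doFilter on the filter chain, which eventually calls the Servlet's service method: `filterChain.doFilter(request.getRequest(), response.getResponse());`
  4. Releases the filter chain and its resources, then releases the Servlet and its resources; the whole request chain ends here.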
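
Steps 2 to 4 can be illustrated with a small filter chain that walks its filters by index and calls the servlet once the filters are exhausted. This is a sketch under assumptions: MiniFilter, MiniServlet and MiniChain are hypothetical types, and the "release" entry merely stands in for filterChain.release() and wrapper.deallocate(servlet).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of an index-driven filter chain ending in a servlet call.
class FilterChainSketch {

    interface MiniFilter {
        void doFilter(List<String> trace, MiniChain chain);
    }

    interface MiniServlet {
        void service(List<String> trace);
    }

    static class MiniChain {
        private final MiniFilter[] filters;
        private final MiniServlet servlet;
        private int pos = 0;        // index of the next filter to run
        private final int n;        // total number of filters

        MiniChain(MiniServlet servlet, MiniFilter... filters) {
            this.servlet = servlet;
            this.filters = filters;
            this.n = filters.length;
        }

        void doFilter(List<String> trace) {
            if (pos < n) {
                MiniFilter filter = filters[pos++];
                filter.doFilter(trace, this);   // the filter decides whether to continue
                return;
            }
            servlet.service(trace);             // end of the chain
        }
    }

    static MiniFilter named(String name) {
        return (trace, chain) -> {
            trace.add(name + ":before");
            chain.doFilter(trace);
            trace.add(name + ":after");
        };
    }

    static List<String> run() {
        List<String> trace = new ArrayList<>();
        MiniChain chain = new MiniChain(t -> t.add("servlet.service"), named("f1"), named("f2"));
        try {
            chain.doFilter(trace);
        } finally {
            trace.add("release");               // cleanup runs even if a filter throws
        }
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because each filter calls back into the chain, code placed after that call runs on the way out, in reverse filter order.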


First, let's look in detail at how the Servlet instance is allocated.

    @Override
    public Servlet allocate() throws ServletException {

        // If we are currently unloading this servlet, throw an exception
        // A Servlet cannot be allocated while it is being unloaded
        if (unloading) {
            throw new ServletException(sm.getString("standardWrapper.unloading", getName()));
        }

        boolean newInstance = false;

        // If not SingleThreadedModel, return the same instance every time
        // If the Servlet does not implement SingleThreadModel, the same Servlet instance is returned every time
        if (!singleThreadModel) {
            // Load and initialize our instance if necessary
            // The instance is null or not yet initialized; synchronized keeps creation and initialization atomic under concurrency
            if (instance == null || !instanceInitialized) {
                synchronized (this) {
                    if (instance == null) {
                        try {
                            if (log.isDebugEnabled()) {
                                log.debug("Allocating non-STM instance");
                            }

                            // Note: We don't know if the Servlet implements
                            // SingleThreadModel until we have loaded it.
                            // Load the Servlet
                            instance = loadServlet();
                            newInstance = true;
                            if (!singleThreadModel) {
                                // For non-STM, increment here to prevent a race
                                // condition with unload. Bug 43683, test case
                                // #3
                                countAllocated.incrementAndGet();
                            }
                        } catch (ServletException e) {
                            throw e;
                        } catch (Throwable e) {
                            ExceptionUtils.handleThrowable(e);
                            throw new ServletException(sm.getString("standardWrapper.allocate"), e);
                        }
                    }
                    // Initialize the Servlet
                    if (!instanceInitialized) {
                        initServlet(instance);
                    }
                }
            }
            // In the non-STM case (the else branch below) the already created Servlet is returned directly, so only one Servlet is ever created
            if (singleThreadModel) {
                if (newInstance) {
                    // Have to do this outside of the sync above to prevent a
                    // possible deadlock
                    synchronized (instancePool) {
                        instancePool.push(instance);
                        nInstances++;
                    }
                }
            } else {
                if (log.isTraceEnabled()) {
                    log.trace("  Returning non-STM instance");
                }
                // For new instances, count will have been incremented at the
                // time of creation
                if (!newInstance) {
                    countAllocated.incrementAndGet();
                }
                return instance;
            }
        }

        // In SingleThreadModel mode, a pool of servlet objects is used so that several Servlet instances can be loaded
        synchronized (instancePool) {
            while (countAllocated.get() >= nInstances) {
                // Allocate a new instance if possible, or else wait
                if (nInstances < maxInstances) {
                    try {
                        instancePool.push(loadServlet());
                        nInstances++;
                    } catch (ServletException e) {
                        throw e;
                    } catch (Throwable e) {
                        ExceptionUtils.handleThrowable(e);
                        throw new ServletException(sm.getString("standardWrapper.allocate"), e);
                    }
                } else {
                    try {
                        instancePool.wait();
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }
            }
            if (log.isTraceEnabled()) {
                log.trace("  Returning allocated STM instance");
            }
            countAllocated.incrementAndGet();
            return instancePool.pop();
        }
    }

    public synchronized Servlet loadServlet() throws ServletException {

        // Nothing to do if we already have an instance or an instance pool
        if (!singleThreadModel && (instance != null)) {
            return instance;
        }

        PrintStream out = System.out;
        if (swallowOutput) {
            SystemLogHandler.startCapture();
        }

        Servlet servlet;
        try {
            long t1 = System.currentTimeMillis();
            // Complain if no servlet class has been specified
            if (servletClass == null) {
                unavailable(null);
                throw new ServletException
                    (sm.getString("standardWrapper.notClass", getName()));
            }

            // The key step: the Servlet instance is created through the instance manager, which loads the given class with a dedicated class loader
            InstanceManager instanceManager = ((StandardContext) getParent()).getInstanceManager();
            try {
                servlet = (Servlet) instanceManager.newInstance(servletClass);
            } catch (ClassCastException e) {
                unavailable(null);
                // Restore the context ClassLoader
                throw new ServletException
                    (sm.getString("standardWrapper.notServlet", servletClass), e);
            } catch (Throwable e) {
                e = ExceptionUtils.unwrapInvocationTargetException(e);
                ExceptionUtils.handleThrowable(e);
                unavailable(null);

                // Added extra log statement for Bugzilla 36630:
                // https://bz.apache.org/bugzilla/show_bug.cgi?id=36630
                if (log.isDebugEnabled()) {
                    log.debug(sm.getString("standardWrapper.instantiate", servletClass), e);
                }

                // Restore the context ClassLoader
                throw new ServletException
                    (sm.getString("standardWrapper.instantiate", servletClass), e);
            }

            if (multipartConfigElement == null) {
                MultipartConfig annotation =
                    servlet.getClass().getAnnotation(MultipartConfig.class);
                if (annotation != null) {
                    multipartConfigElement =
                        new MultipartConfigElement(annotation);
                }
            }

            // Special handling for ContainerServlet instances
            // Note: The InstanceManager checks if the application is permitted
            //       to load ContainerServlets
            if (servlet instanceof ContainerServlet) {
                ((ContainerServlet) servlet).setWrapper(this);
            }

            classLoadTime = (int) (System.currentTimeMillis() - t1);

            if (servlet instanceof SingleThreadModel) {
                if (instancePool == null) {
                    instancePool = new Stack<>();
                }
                singleThreadModel = true;
            }

            // Call the Servlet's init method
            initServlet(servlet);

            fireContainerEvent("load", this);

            loadTime = System.currentTimeMillis() - t1;
        } finally {
            if (swallowOutput) {
                String log = SystemLogHandler.stopCapture();
                if (log != null && log.length() > 0) {
                    if (getServletContext() != null) {
                        getServletContext().log(log);
                    } else {
                        out.println(log);
                    }
                }
            }
        }
        return servlet;

    }

    private synchronized void initServlet(Servlet servlet)
        throws ServletException {

        if (instanceInitialized && !singleThreadModel) {
            return;
        }

        // Call the initialization method of this servlet
        try {
            if (Globals.IS_SECURITY_ENABLED) {
                boolean success = false;
                try {
                    Object[] args = new Object[]{facade};
                    SecurityUtil.doAsPrivilege("init",
                        servlet,
                        classType,
                        args);
                    success = true;
                } finally {
                    if (!success) {
                        // destroy() will not be called, thus clear the reference now
                        SecurityUtil.remove(servlet);
                    }
                }
            } else {
                servlet.init(facade);
            }

            instanceInitialized = true;
        } catch (UnavailableException f) {
            unavailable(f);
            throw f;
        } catch (ServletException f) {
            // If the servlet wanted to be unavailable it would have
            // said so, so do not call unavailable(null).
            throw f;
        } catch (Throwable f) {
            ExceptionUtils.handleThrowable(f);
            getServletContext().log(sm.getString("standardWrapper.initException", getName()), f);
            // If the servlet wanted to be unavailable it would have
            // said so, so do not call unavailable(null).
            throw new ServletException
                (sm.getString("standardWrapper.initException", getName()), f);
        }
    }
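
The non-STM branch of allocate() above boils down to lazily creating one shared instance under a lock and counting how many callers currently hold it. A minimal sketch of that idea follows, assuming a hypothetical SingleInstanceAllocator class; the SingleThreadModel pool, init() handling and unloading are deliberately left out.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch: one lazily created, shared instance with an allocation counter.
class SingleInstanceAllocator<T> {

    private final Supplier<T> loader;
    private volatile T instance;                        // volatile so the unlocked check is safe
    private final AtomicInteger countAllocated = new AtomicInteger();
    private final AtomicInteger loads = new AtomicInteger();

    SingleInstanceAllocator(Supplier<T> loader) {
        this.loader = loader;
    }

    /** Returns the shared instance, creating it on first use. */
    T allocate() {
        if (instance == null) {
            synchronized (this) {
                if (instance == null) {                 // re-check while holding the lock
                    instance = loader.get();
                    loads.incrementAndGet();
                }
            }
        }
        countAllocated.incrementAndGet();
        return instance;
    }

    /** Signals that one caller is done with the instance. */
    void deallocate() {
        countAllocated.decrementAndGet();
    }

    int allocated() {
        return countAllocated.get();
    }

    int loads() {
        return loads.get();
    }

    public static void main(String[] args) {
        SingleInstanceAllocator<StringBuilder> allocator =
            new SingleInstanceAllocator<>(StringBuilder::new);
        StringBuilder a = allocator.allocate();
        StringBuilder b = allocator.allocate();
        System.out.println((a == b) + " loads=" + allocator.loads()
            + " allocated=" + allocator.allocated());
    }
}
```

Since every request shares the same instance in this mode, the servlet itself has to be thread-safe.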


Next, let's look at how the filter chain executes.

public final class ApplicationFilterChain implements FilterChain {
    @Override
    public void doFilter(ServletRequest request, ServletResponse response)
        throws IOException, ServletException {

        if (Globals.IS_SECURITY_ENABLED) {
            final ServletRequest req = request;
            final ServletResponse res = response;
            try {
                java.security.AccessController.doPrivileged(
                    new java.security.PrivilegedExceptionAction<Void>() {
                        @Override
                        public Void run()
                            throws ServletException, IOException {
                            // Run the Filter chain
                            internalDoFilter(req, res);
                            return null;
                        }
                    }
                );
            } catch (PrivilegedActionException pe) {
                Exception e = pe.getException();
                if (e instanceof ServletException) {
                    throw (ServletException) e;
                } else if (e instanceof IOException) {
                    throw (IOException) e;
                } else if (e instanceof RuntimeException) {
                    throw (RuntimeException) e;
                } else {
                    throw new ServletException(e.getMessage(), e);
                }
            }
        } else {
            // Run the filter chain
            internalDoFilter(request, response);
        }
     
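    }

    /**
     * internalDoFilter uses pos and n to invoke each filter in the chain in turn:
     * pos is the index of the current filter, and n is the total number of filters.
     * Once the filters have run, internalDoFilter ends by calling servlet.service().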
     *
     * @param request
     * @param response
     * @throws IOException
     * @throws ServletException
     */
    private void internalDoFilter(ServletRequest request,
                                  ServletResponse response)
        throws IOException, ServletException {

        // Call the next filter if there is one
        // While pos is less than n, there is still a filter to run
        if (pos < n) {
            // Fetch the filter config at pos, then advance pos (pos++)
            ApplicationFilterConfig filterConfig = filters[pos++];
            try {
                Filter filter = filterConfig.getFilter();

                if (request.isAsyncSupported() && "false".equalsIgnoreCase(
                    filterConfig.getFilterDef().getAsyncSupported())) {
                    request.setAttribute(Globals.ASYNC_SUPPORTED_ATTR, Boolean.FALSE);
                }
                if (Globals.IS_SECURITY_ENABLED) {
                    final ServletRequest req = request;
                    final ServletResponse res = response;
                    Principal principal =
                        ((HttpServletRequest) req).getUserPrincipal();

                    Object[] args = new Object[]{req, res, this};
                    SecurityUtil.doAsPrivilege("doFilter", filter, classType, args, principal);
                } else {
                    // Invoke the filter
                    filter.doFilter(request, response, this);
                }
            } catch (IOException | ServletException | RuntimeException e) {
                throw e;
            } catch (Throwable e) {
                e = ExceptionUtils.unwrapInvocationTargetException(e);
                ExceptionUtils.handleThrowable(e);
                throw new ServletException(sm.getString("filterChain.filter"), e);
            }
            return;
        }

        // When pos equals n, every filter has run and the code below executes
        // We fell off the end of the chain -- call the servlet instance
        try {
            if (ApplicationDispatcher.WRAP_SAME_OBJECT) {
                lastServicedRequest.set(request);
                lastServicedResponse.set(response);
            }

            if (request.isAsyncSupported() && !servletSupportsAsync) {
                request.setAttribute(Globals.ASYNC_SUPPORTED_ATTR,
                    Boolean.FALSE);
            }
            // Use potentially wrapped request from this point
            if ((request instanceof HttpServletRequest) &&
                (response instanceof HttpServletResponse) &&
                Globals.IS_SECURITY_ENABLED) {
                final ServletRequest req = request;
                final ServletResponse res = response;
                Principal principal =
                    ((HttpServletRequest) req).getUserPrincipal();
                Object[] args = new Object[]{req, res};
                SecurityUtil.doAsPrivilege("service",
                    servlet,
                    classTypeUsedInService,
                    args,
                    principal);
            } else {
                // Hand the request to the servlet's service() method
                servlet.service(request, response);
            }
        } catch (IOException | ServletException | RuntimeException e) {
            throw e;
        } catch (Throwable e) {
            e = ExceptionUtils.unwrapInvocationTargetException(e);
            ExceptionUtils.handleThrowable(e);
            throw new ServletException(sm.getString("filterChain.servlet"), e);
        } finally {
            if (ApplicationDispatcher.WRAP_SAME_OBJECT) {
                lastServicedRequest.set(null);
                lastServicedResponse.set(null);
            }
        }
    }
}
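
To make the pos / n mechanism easier to see, here is a stripped-down sketch of the same chain-of-responsibility idea. It is not Tomcat's implementation: SimpleFilter, SimpleChain and the trace list are hypothetical names made up for this example, and security handling, async support and exception translation are all left out.

```java
import java.util.ArrayList;
import java.util.List;

public class FilterChainSketch {

    /** Hypothetical stand-in for javax.servlet.Filter. */
    interface SimpleFilter {
        void doFilter(String request, List<String> trace, SimpleChain chain);
    }

    /** Hypothetical stand-in for ApplicationFilterChain. */
    static final class SimpleChain {
        private final SimpleFilter[] filters;
        private final int n;   // total number of filters
        private int pos = 0;   // index of the next filter to run

        SimpleChain(SimpleFilter... filters) {
            this.filters = filters;
            this.n = filters.length;
        }

        void doFilter(String request, List<String> trace) {
            if (pos < n) {
                // Advance pos before the call, so that a nested
                // chain.doFilter() picks up the following filter
                SimpleFilter filter = filters[pos++];
                filter.doFilter(request, trace, this);
                return;
            }
            // Fell off the end of the chain: the servlet would be called here
            trace.add("servlet:" + request);
        }
    }

    /** Builds a filter that records a line before and after delegating. */
    static SimpleFilter logging(String name) {
        return (request, trace, chain) -> {
            trace.add(name + ":before");
            chain.doFilter(request, trace);
            trace.add(name + ":after");
        };
    }

    /** Runs a two-filter chain and returns the recorded call order. */
    static List<String> run(String request) {
        List<String> trace = new ArrayList<>();
        new SimpleChain(logging("A"), logging("B")).doFilter(request, trace);
        return trace;
    }

    public static void main(String[] args) {
        // prints [A:before, B:before, servlet:/hello, B:after, A:after]
        System.out.println(run("/hello"));
    }
}
```

Each filter decides whether to call chain.doFilter(); a filter that does not call it stops the request before it ever reaches the servlet, and the "after" lines unwind in reverse order because the calls are nested.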
    

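In the end, servlet.service(request, response) is called, and the request is truly handed over to the Servlet. For a servlet that extends HttpServlet, this lands in the protected service method below, which dispatches on the HTTP method: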

protected void service(HttpServletRequest req, HttpServletResponse resp)
        throws ServletException, IOException {

        String method = req.getMethod();

        if (method.equals(METHOD_GET)) {
            long lastModified = getLastModified(req);
            if (lastModified == -1) {
                // The servlet does not report a last-modified time, so always run doGet()
                doGet(req, resp);
            } else {
                long ifModifiedSince;
                try {
                    ifModifiedSince = req.getDateHeader(HEADER_IFMODSINCE);
                } catch (IllegalArgumentException iae) {
                    ifModifiedSince = -1;
                }
                // Round lastModified down to whole seconds before comparing with the header value
                if (ifModifiedSince < (lastModified / 1000 * 1000)) {
                    maybeSetLastModified(resp, lastModified);
                    doGet(req, resp);
                } else {
                    resp.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
                }
            }

        } else if (method.equals(METHOD_HEAD)) {
            long lastModified = getLastModified(req);
            maybeSetLastModified(resp, lastModified);
            doHead(req, resp);

        } else if (method.equals(METHOD_POST)) {
            doPost(req, resp);

        } else if (method.equals(METHOD_PUT)) {
            doPut(req, resp);

        } else if (method.equals(METHOD_DELETE)) {
            doDelete(req, resp);

        } else if (method.equals(METHOD_OPTIONS)) {
            doOptions(req,resp);

        } else if (method.equals(METHOD_TRACE)) {
            doTrace(req,resp);

        } else {
            // No doXxx() handler matches the request method: reply with 501 Not Implemented
            String errMsg = lStrings.getString("http.method_not_implemented");
            Object[] errArgs = new Object[1];
            errArgs[0] = method;
            errMsg = MessageFormat.format(errMsg, errArgs);

            resp.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED, errMsg);
        }
    }
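
The GET branch above contains one piece of arithmetic that is easy to skim past: lastModified / 1000 * 1000 drops the milliseconds, because an HTTP date header only carries whole seconds. The sketch below isolates that comparison so it can be tried with concrete numbers. The class name, the shouldSendBody helper and the timestamps are made up for illustration and are not part of the servlet API.

```java
public class ConditionalGetSketch {

    /**
     * Mirrors the comparison in the GET branch: returns true when doGet()
     * should run, false when 304 Not Modified is enough.
     * Both arguments are milliseconds since the epoch; -1 means "not available".
     */
    static boolean shouldSendBody(long lastModified, long ifModifiedSince) {
        if (lastModified == -1) {
            // The servlet does not track a modification time
            return true;
        }
        // Integer division truncates the milliseconds, so both sides
        // are compared at one-second resolution
        return ifModifiedSince < (lastModified / 1000 * 1000);
    }

    public static void main(String[] args) {
        long lastModified = 1673740800789L;   // resource time, with milliseconds
        long sameSecond = 1673740800000L;     // header value within the same second
        long oneSecondOlder = 1673740799000L; // header value one second earlier

        System.out.println(shouldSendBody(lastModified, sameSecond));     // prints false
        System.out.println(shouldSendBody(lastModified, oneSecondOlder)); // prints true
        System.out.println(shouldSendBody(lastModified, -1L));            // prints true
    }
}
```

Without the rounding, the first case would compare 1673740800000 against 1673740800789 and resend the body even though the client's copy is from the same second. A header value of -1, which is what getDateHeader returns when the header is absent, is always smaller, so doGet() always runs in that case.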

This is where the request chain ends: one HTTP request has been fully processed.

3. Summary

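Starting from the point where the Connector accepts a request, this article walked through the Tomcat source code in depth, explaining what each part of Tomcat's thread model is for and the complete path an HTTP request takes through the Tomcat server.

The sequence diagram linked below shows how the components interact while the framework accepts a request, which should make the flow easier to follow.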

https://www.processon.com/view/link/63c2e873e8d4677705c87b56

Author: MRyan


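This article is licensed under the Creative Commons Attribution-ShareAlike 4.0 International License.
When reposting, please credit the source and include a link to the article. Article link: https://wormholestack.com/archives/649/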
2024 © MRyan