我们生来,就是为了,在宇宙中,留下印记。 —— Steve Jobs 《史蒂夫·乔布斯》
本系列针对于 Tomcat 版本为 8.5X
文章已收录至精进Tomcat系列 系列其它文章 https://www.wormholestack.com/tag/Tomcat/
源码阅读环境:https://gitee.com/M-Analysis/source_tomcat8 已填充关键注释
JUC 原生线程池前置知识可以参考深入Java线程池
原生线程池提交任务流程
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 线程池状态和线程数的整数
int c = ctl.get();
// 如果当前线程数小于核心线程数,创建 Worker 线程并启动线程
if (workerCountOf(c) < corePoolSize) {
// 添加任务成功,那么就结束了 结果会包装到 FutureTask 中
if (addWorker(command, true))
return;
c = ctl.get();
}
// 要么当前线程数大于等于核心线程数,要么刚刚 addWorker 失败了 ,如果线程池处于 RUNNING 状态,把这个任务添加到任务队列 workQueue 中
if (isRunning(c) && workQueue.offer(command)) {
// 二次状态检查
int recheck = ctl.get();
// 如果线程池已不处于 RUNNING 状态,那么移除已经入队的这个任务,并且执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果线程池还是 RUNNING 的,并且线程数为 0,重新创建一个新的线程 这里目的担心任务提交到队列中了,但是线程都关闭了
else if (workerCountOf(recheck) == 0)
// 创建Worker,并启动里面的Thread,为什么传null,线程启动后会自动从阻塞队列拉任务执行
addWorker(null, false);
}
// 如果 workQueue 队列满了,那么进入到这个分支 以 maximumPoolSize 为界创建新的 worker线程并启动线程,如果失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
- 当前线程数小于核心线程数,则创建一个新的线程来执行任务
- 当前线程数大于等于核心线程数,且阻塞队列未满,则将任务添加到队列中
- 如果阻塞队列已满,当前线程数大于等于核心线程数,当前线程数小于最大线程数,则创建并启动一个线程来执行新提交的任务(这里可以继续看下面的分析)
- 若当前线程数大于等于最大线程数,且阻塞队列已满,此时会执行拒绝策略
如下图所示:
核心思想就是就是先让核心线程数的线程工作,多余的任务统统塞到阻塞队列,阻塞队列塞不下才再多创建线程来工作,这种情况下当大量请求提交时,大量的请求很有可能都会被阻塞在队列中,而线程还没有创建到最大线程数,导致用户请求处理很慢,用户体验很差,而且当我们的工作队列设置得很大时,最大线程数这个参数显得没有意义,因为队列很难满,或者到满的时候再去扩容线程池已经于事无补了。
如何解决这个问题呢?
我们有没有办法让线程池更激进一点呢,优先开启更多的线程,而把队列当成一个后备方案。
那 Tomcat 如何解决这个问题呢?
Tomcat 自己实现了 ThreadPoolExecutor 方法
重写了execute()方法
public void execute(Runnable command, long timeout, TimeUnit unit) {
submittedCount.incrementAndGet();
try {
executeInternal(command);
} catch (RejectedExecutionException rx) {
// 判断如果任务队列类型是 TaskQueue,则尝试一次将任务添加到任务队列中,如果添加失败,证明队列已满,然后再执行拒绝策略
if (getQueue() instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue) getQueue();
try {
if (!queue.force(command, timeout, unit)) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
}
} catch (InterruptedException x) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} else {
submittedCount.decrementAndGet();
throw rx;
}
}
以上代码其实就是当抛出异常拒绝策略了再给一次机会,尝试往阻塞队列里插任务,尽最大努力的去执行任务。
executeInternal 实现代码如下:
public void executeInternal(Runnable command) {
if (command == null)
throw new NullPointerException();
// 线程池状态和线程数的整数
int c = ctl.get();
// 如果当前线程数小于核心线程数,创建 Worker 线程并启动线程
if (workerCountOf(c) < corePoolSize) {
// 添加任务成功,那么就结束了 结果会包装到 FutureTask 中
if (addWorker(command, true))
return;
c = ctl.get();
}
// 要么当前线程数大于等于核心线程数,要么刚刚 addWorker 失败了 ,如果线程池处于 RUNNING 状态,把这个任务添加到任务队列 workQueue 中
if (isRunning(c) && workQueue.offer(command)) {
// 二次状态检查
int recheck = ctl.get();
// 如果线程池已不处于 RUNNING 状态,那么移除已经入队的这个任务,并且执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果线程池还是 RUNNING 的,并且线程数为 0,重新创建一个新的线程 这里目的担心任务提交到队列中了,但是线程都关闭了
else if (workerCountOf(recheck) == 0)
// 创建Worker,并启动里面的Thread,为什么传null,线程启动后会自动从阻塞队列拉任务执行
addWorker(null, false);
}
// workQueue.offer(command)返回false,以 maximumPoolSize 为界创建新的 worker线程并启动线程,如果失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
新增阻塞队列 TaskQueue 继承了 LinkedBlockingQueue,重写了offer()方法,
public class TaskQueue extends LinkedBlockingQueue<Runnable> {
@Override
public boolean offer(@NonNull Runnable runnable) {
if (executor == null) {
throw new RejectedExecutionException("The task queue does not have executor.");
}
int currentPoolThreadSize = executor.getPoolSize();
// 线程池中的线程数等于最大线程数的时候,就将任务放进队列等待工作线程处理
if (currentPoolThreadSize == executor.getMaximumPoolSize()) {
return super.offer(runnable);
}
// 如果当前未执行的任务数量小于等于当前线程数,说明还有剩余的worker线程,就将任务放进队列等待工作线程处理
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
return super.offer(runnable);
}
// 如果当前线程数大于核心线程,但小于最大线程数量,则直接返回false,外层会让线程池创建新的线程来执行任务
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false;
}
// currentPoolThreadSize >= max
return super.offer(runnable);
}
}
上述代码我们看到,每次向队列插入任务,判断如果当前线程数小于最大线程数则插入失败(currentPoolThreadSize < executor.getMaximumPoolSize()) 返回 false。
外层 executeInternal 方法则执行 addWorker(command, false),创建新的线程来执行任务。
如下图所示:
简单来说就是重写了execute()方法,当抛出拒绝策略了尝试一次往阻塞队列里插入任务,尽最大努力的去执行任务,新增阻塞队列继承了 LinkedBlockingQueue,重写了offer()方法,重写了offer()方法,每次向队列插入任务,判断如果当前线程数小于最大线程数则插入失败。进而让线程池创建新线程来处理任务。