线程池
1. 线程池简介
1.1 什么是线程池
线程池(Thread Pool)是一种基于池化思想管理线程的工具,广泛应用于多线程服务器场景,如 Tomcat、RPC 框架、消息中间件等。本文所描述的线程池特指 JDK 提供的 ThreadPoolExecutor 类及其子类实现。
在并发环境下,线程过多会带来额外的开销:包括线程创建销毁的系统开销、线程上下文切换的调度开销等,同时也会因 CPU 资源抢占降低系统整体性能。线程池的核心设计思路是维护多个常驻线程,由管理者统一分配可并发执行的任务:一方面避免了处理任务时频繁创建销毁线程的开销,另一方面避免了线程数量膨胀导致的过度调度问题,保证了 CPU 内核资源的充分利用。
1.2 线程池解决的核心问题
线程池解决的核心问题是并发场景下的资源管理问题。在高并发环境下,系统无法预知任意时刻有多少任务需要执行,需要投入多少资源,这种不确定性会带来一系列问题:
- 频繁申请/销毁线程和调度线程,会带来巨大的额外系统开销
- 缺少对资源无限申请的抑制手段,容易引发系统资源耗尽的风险
- 系统无法合理管理内部的资源分布,会降低系统的稳定性
为解决资源分配问题,线程池采用了"池化"(Pooling)思想。池化是一种通用的资源管理思想,核心是将资源统一管理,最大化资源利用效率的同时最小化风险,这种思想不仅应用在计算机领域,在金融、设备管理、人员管理等领域也有广泛应用。
在计算机领域,池化思想的典型应用除了线程池外,还有:
- 内存池 (Memory Pooling):预先申请内存,提升内存申请速度,减少内存碎片
- 连接池 (Connection Pooling):预先申请数据库/HTTP 连接,提升连接获取速度,降低频繁创建连接的开销
- 实例池 (Object Pooling):循环使用重量级对象,减少对象初始化和释放时的昂贵损耗
1.3 线程池的优势
使用线程池可以带来一系列好处:
- 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的性能损耗
- 提高响应速度:任务到达时,无需等待线程创建即可立即分配线程执行
- 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统稳定性。使用线程池可以对线程进行统一的分配、调优和监控
- 提供更强大的扩展功能:线程池具备良好的扩展性,允许开发人员自定义实现增加更多功能,例如延时定时线程池
ScheduledThreadPoolExecutor,就支持任务延时执行或周期性执行
2. 线程池的使用
2.1 创建线程池
ThreadPoolExecutor
可以通过 ThreadPoolExecutor 提供的构造方法创建线程池。我们重点来看参数最全的构造方法,其他构造方法最终都会调用该构造方法。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)线程池的核心参数:
corePoolSize:核心线程数:线程池初始化时默认没有线程,当有任务提交时才会开始创建线程执行任务。maximumPoolSize:最大线程数:当核心线程数已满且任务队列也已满时,如果当前工作线程数小于maximumPoolSize,则会创建非核心线程来执行新提交的任务。keepAliveTime:非核心线程的空闲时间超过keepAliveTime就会被自动终止回收;当corePoolSize = maximumPoolSize时,该参数无效,因为此时不存在非核心线程。unit:keepAliveTime的时间单位。workQueue:用于保存待执行任务的阻塞队列,主要分为无界队列、有界队列、同步移交队列等类型。当工作线程数达到corePoolSize时,新提交的任务会被放入队列中等待执行。ArrayBlockingQueue(有界队列):队列长度有限,当队列满了就需要创建非核心线程执行任务,如果最大线程数已达上限,则执行拒绝策略。LinkedBlockingQueue(无界队列):队列长度无限,当任务处理速度跟不上任务提交速度时,可能会导致内存占用过高甚至 OOM。SynchronousQueue(同步队列):队列不作为任务的缓冲,队列长度为 0。
threadFactory:创建线程的工厂接口,默认使用Executors.defaultThreadFactory()。也可以通过实现ThreadFactory接口自定义线程工厂,用于设置线程名称、优先级等属性。handler:线程池无法继续接收任务时(workQueue已满且maximumPoolSize也已达上限)的拒绝策略。AbortPolicy:默认拒绝策略,抛出RejectedExecutionException异常。CallerRunsPolicy:由提交任务的线程来执行该任务。DiscardOldestPolicy:丢弃队列中存在时间最久的任务,然后尝试重新提交当前任务。DiscardPolicy:直接丢弃任务,不抛出任何异常也不给出任何通知。- 也可以实现
RejectedExecutionHandler接口,自定义拒绝策略。
以下是一个使用
ThreadPoolExecutor创建线程池的示例:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolDemo1 {
public static void main(String[] args) {
// 线程池的核心线程数
int corePoolSize = 5;
// 线程池的最大线程数
int maximumPoolSize = 10;
// 线程池的任务队列
ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100);
// 线程池保持空闲的时间
long keepAliveTime = 60L;
// 时间单位
TimeUnit unit = TimeUnit.SECONDS;
// 线程池的拒绝策略
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
// 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
handler
);
// 提交任务到线程池
for (int i = 0; i < 15; i++) {
final int taskId = i;
executor.execute(() -> {
System.out.println("Task " + taskId + " is running by " + Thread.currentThread().getName());
});
}
// 关闭线程池
executor.shutdown();
try {
// 等待所有任务完成,超时时间为60秒
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
// 如果超时后任务仍未完成,则强制关闭线程池
executor.shutdownNow();
}
} catch (InterruptedException e) {
// 如果等待过程中被中断,也强制关闭线程池
executor.shutdownNow();
}
System.out.println("All tasks are done or interrupted.");
}
}Executors
Executors 类提供了创建各种类型线程池的静态方法,常见的预定义线程池包括:
newFixedThreadPool(int nThreads):固定大小的线程池,核心线程数和最大线程数都为nThreads,任务队列使用无界队列LinkedBlockingQueue,适用于任务量已知且相对稳定的场景。newSingleThreadExecutor():单线程线程池,核心线程数和最大线程数都为 1,任务队列使用无界队列LinkedBlockingQueue,保证所有任务按照提交顺序串行执行。newCachedThreadPool():缓存线程池,核心线程数为 0,最大线程数为Integer.MAX_VALUE,任务队列使用SynchronousQueue,线程空闲 60 秒后会被回收,适用于执行大量短生命周期任务的场景。newScheduledThreadPool(int corePoolSize):定时任务线程池,支持延时和周期性任务执行,最大线程数为Integer.MAX_VALUE,任务队列使用延迟队列DelayedWorkQueue。newSingleThreadScheduledExecutor():单线程的定时任务线程池,保证定时任务按照顺序串行执行。
但不推荐使用这种方式创建线程池,《阿里巴巴 Java 开发手册》中对此有明确规定,主要原因是:
FixedThreadPool和SingleThreadPool:允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。CachedThreadPool和ScheduledThreadPool:允许创建的线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
2.2 提交任务
线程池提交任务主要有两类方法:
无返回值的任务使用
public void execute(Runnable command)方法提交;有返回值的任务使用
submit方法:Future<?> submit(Runnable task):提交Runnable任务Future<T> submit(Runnable task, T result):提交Runnable任务并指定执行结果Future<T> submit(Callable<T> task):提交Callable任务
execute 方法示例
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolDemo2 {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2,
5,
60L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100)
);
// 提交无返回值的Runnable任务
for (int i = 0; i < 3; i++) {
final int taskId = i;
executor.execute(() -> {
System.out.println("任务" + taskId + "正在执行,执行线程:" + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务" + taskId + "执行完成");
});
}
executor.shutdown();
}
}submit 方法示例
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolDemo3 {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2,
5,
60L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100)
);
// 提交Callable任务,获取返回值
Future<Integer> future = executor.submit(() -> {
// 模拟计算任务
int sum = 0;
for (int i = 0; i < 100; i++) {
sum += i;
}
return sum;
});
// 获取任务执行结果
Integer result = future.get(); // 阻塞等待任务完成
System.out.println("1-99累加结果:" + result);
executor.shutdown();
}
}如何执行批量任务
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException:执行批量任务,返回所有任务的执行结果。public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit):执行批量任务,在指定时间内等待任务完成,返回已完成任务的执行结果,取消未完成的任务。public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException:执行批量任务,返回最先完成的任务的执行结果。public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit):执行批量任务,在指定时间内等待,返回最先完成的任务的执行结果,取消所有未完成的任务:
批量任务执行示例
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolDemo4 {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
3,
5,
60L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100)
);
// 创建批量任务
List<Callable<Integer>> tasks = new ArrayList<>();
for (int i = 0; i < 5; i++) {
final int taskId = i;
tasks.add(() -> {
Thread.sleep(1000);
return taskId * 10;
});
}
System.out.println("===== invokeAll执行所有批量任务 =====");
// 执行所有批量任务,等待全部完成
List<Future<Integer>> futures = executor.invokeAll(tasks);
for (int i = 0; i < futures.size(); i++) {
System.out.println("任务" + i + "结果:" + futures.get(i).get());
}
System.out.println("\n===== invokeAny获取最先完成的任务结果 =====");
// 获取最先完成的任务结果
Integer firstResult = executor.invokeAny(tasks);
System.out.println("最先完成的任务结果:" + firstResult);
executor.shutdown();
}
}如何执行定时、延时任务
具备执行定时、延时、周期性任务的线程池:ScheduledThreadPoolExecutor
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit):延时执行Runnable任务,只执行一次。public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit):延时执行Callable任务,只执行一次。
延时任务示例
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public class ThreadPoolDemo5 {
public static void main(String[] args) throws Exception {
ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(3);
System.out.println("当前时间:" + System.currentTimeMillis());
// 延时3秒后执行Runnable任务
scheduledExecutor.schedule(() -> {
System.out.println("延时3秒执行的任务,当前时间:" + System.currentTimeMillis());
}, 3, TimeUnit.SECONDS);
// 延时2秒后执行Callable任务,获取返回值
ScheduledFuture<String> scheduledFuture = scheduledExecutor.schedule(() -> {
return "延时2秒的Callable任务执行完成";
}, 2, TimeUnit.SECONDS);
System.out.println(scheduledFuture.get());
// 等待所有任务完成后关闭
Thread.sleep(4000);
scheduledExecutor.shutdown();
}
}如何执行周期、重复性任务
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit):延时一段时间后,周期性执行Runnable任务,周期为固定时间。固定频率执行,以上一次任务开始执行时间为基准,间隔period时间后执行下一次。如果任务执行时间超过period,则任务执行完成后立即执行下一次。public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit):延时一段时间后,周期性执行Runnable任务,周期为间隔时间。固定延迟执行,以上一次任务执行完成时间为基准,间隔delay时间后执行下一次。
周期性任务示例
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ThreadPoolDemo6 {
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2);
System.out.println("程序启动时间:" + System.currentTimeMillis());
// 固定频率执行:延时1秒后,每隔2秒执行一次
scheduledExecutor.scheduleAtFixedRate(() -> {
System.out.println("固定频率执行任务,当前时间:" + System.currentTimeMillis());
try {
// 模拟任务执行耗时1秒
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 1, 2, TimeUnit.SECONDS);
// 固定延迟执行:延时1秒后,每次任务执行完成后间隔3秒执行下一次
scheduledExecutor.scheduleWithFixedDelay(() -> {
System.out.println("固定延迟执行任务,当前时间:" + System.currentTimeMillis());
try {
// 模拟任务执行耗时1秒
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 1, 3, TimeUnit.SECONDS);
// 运行10秒后关闭线程池
Thread.sleep(10000);
scheduledExecutor.shutdown();
}
}2.3 关闭线程池
shutdownNow():立即关闭线程池,正在执行中的任务和队列中的任务都会被中断,同时返回队列中尚未执行的任务列表。shutdown():平缓关闭线程池,正在执行中的任务和队列中的任务都会执行完成,后续提交的新任务会被执行拒绝策略。isTerminated():当正在执行的任务和队列中的任务全部执行完成时返回true。
shutdown 和 shutdownNow 的区别
| 特性 | shutdown | shutdownNow |
|---|---|---|
| 立即关闭线程池 | 否 | 是 |
| 延时关闭线程池 | 是 | 否 |
| 不再接收新任务 | 是 | 是 |
| 继续执行完任务队列中的任务 | 是 | 否 |
| 返回任务队列中的任务 | 否 | 是 |
| 线程池状态 | SHUTDOWN | STOP |
2.4 线程池的参数设计分析
核心线程数 (corePoolSize)
核心线程数的设计通常需要根据任务的性质来确定:
- CPU 密集型:任务主要消耗 CPU 资源,推荐核心线程数为
CPU 核心数 + 1。这样即使发生页缺失等意外情况,备用线程也能保证 CPU 的利用率。 - I/O 密集型:任务涉及大量网络通信或磁盘操作,推荐核心线程数为
CPU 核心数 * 2。也可以使用更精确的计算公式:CPU 核心数 * (1 + 线程等待时间 / 线程计算时间)。
另外,也可以根据预期的每秒任务量来确定,例如一个线程执行一个任务需要 0.1 秒,那么 1 秒可以执行 10 个任务;如果系统 80% 的时间每秒都会产生 100 个任务,那么要想在 1 秒内处理完这 100 个任务,就需要 10 个线程,此时核心线程数可以设计为 10。
任务队列长度 (workQueue)
任务队列长度的设计需要综合考虑内存容量和系统响应速度,通常建议队列长度为:核心线程数 / 单个任务执行时间 * 2(这是一个经验参考值)。在上面的场景中,核心线程数为 10,单个任务执行时间为 0.1 秒,则队列长度可以设计为 200。
最大线程数 (maximumPoolSize)
最大线程数 = (最大峰值任务数 - 任务队列长度) * 单个任务执行时间。例如:系统每秒最大产生 1000 个任务,则最大线程数 = (1000 - 200) * 0.1 = 80 个。
最大空闲时间 (keepAliveTime)
这个参数的设计需要根据系统运行环境和硬件压力来设定,没有固定的参考值。
3. 线程池核心设计与实现
3.1 总体设计
Java 中的线程池核心实现类是 ThreadPoolExecutor,本章基于 JDK 1.8 的源码来分析 Java 线程池的核心设计与实现。我们首先来看一下 ThreadPoolExecutor 的 UML 类图,了解下 ThreadPoolExecutor 的继承关系。

ThreadPoolExecutor 实现的顶层接口是 Executor,顶层接口 Executor 提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供 Runnable 对象,将任务的运行逻辑提交到执行器(Executor)中,由 Executor 框架完成线程的调配和任务的执行部分。
ExecutorService 接口增加了一些能力:
- 扩充执行任务的能力,补充可以为一个或一批异步任务生成
Future的方法; - 提供了管控线程池的方法,比如停止线程池的运行。
AbstractExecutorService 则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。最下层的实现类 ThreadPoolExecutor 实现最复杂的运行部分,ThreadPoolExecutor 将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。
ThreadPoolExecutor 的运行机制如下图所示:

线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。线程池的运行主要分成两部分:任务管理、线程管理。
- 任务管理部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转:
- 直接申请线程执行该任务;
- 缓冲到队列中等待线程执行;
- 拒绝该任务。
- 线程管理部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。
接下来,我们会按照以下三个部分去详细讲解线程池运行机制:
- 线程池如何维护自身状态。
- 线程池如何管理任务。
- 线程池如何管理线程。
3.2 生命周期管理
线程池运行的状态,并不是用户显式设置的,而是伴随着线程池的运行,由内部来维护。线程池内部使用一个变量维护两个值:运行状态 (runState) 和 线程数量 (workerCount)。
在具体实现中,线程池将运行状态 (runState)、线程数量 (workerCount) 两个关键参数的维护放在了一起,如下代码所示:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));ctl 这个 AtomicInteger 类型,是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它同时包含两部分的信息:高 3 位保存 runState,低 29 位保存 workerCount,两个变量之间互不干扰。
用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。通过阅读线程池源代码也可以发现,经常出现要同时判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数,这里都使用的是位运算的方式,相比于基本运算,速度也会快很多。
关于内部封装的获取生命周期状态、获取线程池线程数量的计算方法如以下代码所示:
private static int runStateOf(int c) { return c & ~CAPACITY; } // 计算当前运行状态
private static int workerCountOf(int c) { return c & CAPACITY; } // 计算当前线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; } // 通过状态和线程数生成ctlThreadPoolExecutor 的运行状态有 5 种,分别为:
| 运行状态 | 状态描述 |
|---|---|
| RUNNING | 能接受新提交的任务,并且也能处理阻塞队列中的任务。 |
| SHUTDOWN | 关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。 |
| STOP | 不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。 |
| TIDYING | 所有的任务都终止了,workerCount(有效线程数)为0。 |
| TERMINATED | 在terminated()方法执行完后进入该状态。 |
其生命周期转换如下入所示:

3.3 任务执行机制
任务调度
任务调度是线程池的主要入口,当用户提交了一个任务,接下来这个任务将如何执行都是由这个阶段决定的。了解这部分就相当于了解了线程池的核心运行机制。
首先,所有任务的调度都是由 execute 方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:
- 首先检测线程池运行状态,如果不是 RUNNING,则直接拒绝,线程池要保证在 RUNNING 的状态下执行任务。
- 如果
workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。 - 如果
workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。 - 如果
workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。 - 如果
workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
其执行流程如下图所示:

任务缓冲
任务缓冲模块是线程池能够管理任务的核心部分。线程池的本质是对任务和线程的管理,而做到这一点最关键的思想就是将任务和线程两者解耦,不让两者直接关联,才可以做后续的分配工作。
线程池中是以生产者消费者模式,通过一个阻塞队列来实现的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。
阻塞队列 (BlockingQueue) 是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空;当队列满时,存储元素的线程会等待队列可用。
阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器中拿元素。
下图中展示了线程 1 往阻塞队列中添加元素,而线程 2 从阻塞队列中移除元素:

使用不同的队列可以实现不一样的任务存取策略,常见阻塞队列的特点我们在上一章已经介绍过。
任务申请
由上文的任务分配部分可知,任务的执行有两种可能:
- 任务直接由新创建的线程执行(仅出现在线程初始创建的时候)
- 线程从任务队列中获取任务然后执行,执行完任务的空闲线程会再次去从队列中申请任务再去执行(绝大多数情况)
线程需要从任务缓存模块中不断地取任务执行,帮助线程从阻塞队列中获取任务,实现线程管理模块和任务管理模块之间的通信。这部分策略由 getTask 方法实现,其执行流程如下图所示:

getTask 这部分进行了多次判断,为的是控制线程的数量,使其符合线程池的状态。如果线程池现在不应该持有那么多线程,则会返回 null 值。工作线程 Worker 会不断接收新任务去执行,而当工作线程 Worker 接收不到任务的时候,就会开始被回收。
任务拒绝
任务拒绝模块是线程池的保护部分,线程池有一个最大的容量,当线程池的任务缓存队列已满,并且线程池中的线程数目达到 maximumPoolSize 时,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池。
拒绝策略是一个接口,其设计如下:
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}用户可以通过实现这个接口去定制拒绝策略,也可以选择 JDK 提供的四种已有拒绝策略,其特点我们在使用中已经介绍过。
3.4 Worker 线程管理
Worker 线程
线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内的工作线程 Worker。我们来看一下它的部分代码:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;// Worker持有的线程
Runnable firstTask;// 初始化的任务,可以为null
}Worker 这个工作线程,实现了 Runnable 接口,并持有一个线程 thread,一个初始化的任务 firstTask。
thread是在调用构造方法时通过ThreadFactory来创建的线程,可以用来执行任务;firstTask用它来保存传入的第一个任务,这个任务可以有也可以为null:- 如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况;
- 如果这个值是
null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建。
Worker 执行任务的模型如下图所示:

线程池需要管理线程的生命周期,需要在线程长时间不运行的时候进行回收。线程池使用一张 HashSet 去持有线程的引用,这样可以通过添加引用、移除引用这样的操作来控制线程的生命周期。这个时候重要的就是如何判断线程是否在运行。
Worker 是通过继承 AQS,使用 AQS 来实现独占锁这个功能。没有使用可重入锁 ReentrantLock,而是使用 AQS,为的就是实现不可重入的特性去反应线程现在的执行状态:
lock方法一旦获取了独占锁,表示当前线程正在执行任务中,此时不应该中断线程。- 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。
- 线程池在执行
shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收。
在线程回收过程中就使用到了这种特性,回收过程如下图所示:

Worker 线程增加
增加线程是通过线程池中的 addWorker 方法,该方法的功能就是增加一个线程,该方法不考虑线程池是在哪个阶段增加的该线程,这个分配线程的策略是在上个步骤完成的,该步骤仅仅完成增加线程,并使它运行,最后返回是否成功这个结果。
addWorker 方法有两个参数:firstTask、core:
firstTask参数用于指定新增的线程执行的第一个任务,该参数可以为空;core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize
其执行流程如下图所示:

Worker 线程回收
线程池中线程的销毁依赖 JVM 自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被 JVM 回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可。
Worker 被创建出来后,就会不断地进行轮询,然后获取任务去执行,核心线程可以无限等待获取任务,非核心线程要限时获取任务。当 Worker 无法获取到任务,也就是获取的任务为空时,循环会结束,Worker 会主动消除自身在线程池内的引用。
try {
while (task != null || (task = getTask()) != null) {
// 执行任务
}
} finally {
processWorkerExit(w, completedAbruptly);// 获取不到任务时,主动回收自己
}线程回收的工作是在 processWorkerExit 方法完成的。

事实上,在这个方法中,将线程引用移出线程池就已经结束了线程销毁的部分。但由于引起线程销毁的可能性有很多,线程池还要判断是什么引发了这次销毁,是否要改变线程池的现阶段状态,是否要根据新状态,重新分配线程。
Worker 线程执行任务
在 Worker 类中的 run 方法调用了 runWorker 方法来执行任务,runWorker 方法的执行过程如下:
while循环不断地通过getTask()方法获取任务。getTask()方法从阻塞队列中取任务。- 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态。
- 执行任务。
- 如果
getTask结果为null则跳出循环,执行processWorkerExit()方法,销毁线程。
执行流程如下图所示:

参考资料,部分内容转载整合来自以下资料