ForkJoinPool
1. 什么是 Fork/Join
Fork/Join 是 Java 7 引入的并行计算框架,其核心思想是将分治算法与工作窃取相结合。
- Fork(分): 将大任务分解为小任务,并行执行
- Join(合): 合并小任务的结果,得到最终答案
Fork/Join 框架主要就是用来支持分治任务模型的,Fork 对应的是分治任务模型里的任务分解,Join 对应的是结果合并。它的核心思想是将一个大任务分成许多小任务,然后并行执行这些小任务,最终将它们的结果合并成一个大的结果。

适合使用 Fork/Join 的场景:
- 大规模数组求和:将数组分段并行计算,最后汇总
- 排序算法:归并排序、快速排序等可分治的算法
- 矩阵运算:矩阵乘法、转置等
- 图像处理:图像渲染、滤镜应用、像素操作
- 递归任务:树形结构的遍历、搜索
- 并行流操作:Java 8 Stream 的 parallel() 底层实现
不适合使用 Fork/Join 的场景:
- 文件读写:IO 密集型,阻塞等待浪费时间
- 网络请求:IO 密集型,阻塞等待浪费时间
- 数据库操作:IO 密集型,且任务不可分解
- 短小任务:任务分解和管理开销大于收益
2. 核心组件
Fork/Join 框架主要有三个核心组件:ForkJoinPool、ForkJoinTask,以及具体实现类 RecursiveAction 和 RecursiveTask。
2.1 ForkJoinPool
ForkJoinPool 是专门为 Fork/Join 任务设计的线程池,负责管理工作线程的执行和任务调度。
构造器
/**
* 构造 ForkJoinPool。
*
* @param parallelism 指定并行级别(parallelism level)。ForkJoinPool 将根据这个设定,决定工作线程的数量。
* 如果未设置的话,将使用 {@code Runtime.getRuntime().availableProcessors()} 来设置并行级别
* @param factory ForkJoinPool 在创建线程时,会通过 factory 来创建。
* 注意,这里需要实现的是 {@code ForkJoinWorkerThreadFactory},而不是 {@code ThreadFactory}。
* 如果不指定 factory,那么将由默认的 {@code DefaultForkJoinWorkerThreadFactory} 负责线程的创建工作
* @param handler 指定异常处理器,当任务在运行中出错时,将由设定的处理器处理
* @param asyncMode 设置队列的工作模式。当 asyncMode 为 {@code true} 时,将使用先进先出队列,
* 而为 {@code false} 时则使用后进先出的模式
*/
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode)// 获取处理器数量
int processors = Runtime.getRuntime().availableProcessors();
// 构建forkjoin线程池
ForkJoinPool forkJoinPool = new ForkJoinPool(processors);常用方法
| 方法 | 说明 |
|---|---|
void execute(ForkJoinTask<?> task) | 异步执行任务,无返回值 |
void execute(Runnable task) | 异步执行 Runnable 任务 |
T invoke(ForkJoinTask<T> task) | 阻塞等待执行并获取结果 |
ForkJoinTask<T> submit(ForkJoinTask<T> task) | 提交任务,返回 Future |
ForkJoinTask<T> submit(Callable<T> task) | 提交 Callable 任务 |
ForkJoinTask<T> submit(Runnable task, T result) | 提交 Runnable,带默认值 |
void shutdown() | 优雅关闭,不再接受新任务 |
boolean awaitTermination(long timeout, TimeUnit unit) | 等待线程池终止 |
2.2 ForkJoinTask
ForkJoinTask 是所有 Fork/Join 任务的抽象基类,实现了 Future 接口。ForkJoinTask 最核心的是 fork() 方法和 join() 方法,承载着主要的任务协调作用,一个用于任务提交,一个用于结果获取。
fork() 和 join()
fork():fork() 方法用于向当前任务所运行的线程池中提交任务。如果当前线程是 ForkJoinWorkerThread 类型,将会放入该线程的工作队列,否则放入 common 线程池的工作队列中。
join():join() 方法用于获取任务的执行结果。调用 join() 时,将阻塞当前线程直到对应的子任务完成运行并返回结果。
protected Integer compute() {
if (任务足够小) {
return 直接计算结果;
}
// 分解为子任务
Task t1 = new Task(subProblem1);
Task t2 = new Task(subProblem2);
// 异步提交子任务
t1.fork();
// 当前线程处理另一个子任务
Integer result2 = t2.compute();
// 等待第一个子任务结果
Integer result1 = t1.join();
// 合并结果
return result1 + result2;
}通常情况下我们不需要直接继承 ForkJoinTask 类,而只需要继承它的子类,Fork/Join 框架提供了以下三个子类:
| 子类 | 说明 |
|---|---|
RecursiveAction | 用于递归执行但不需要返回结果的任务 |
RecursiveTask | 用于递归执行需要返回结果的任务 |
CountedCompleter<T> | 在任务完成执行后会触发执行一个自定义的钩子函数 |
3. 工作原理
ForkJoinPool 内部有多个任务队列,当我们通过 ForkJoinPool 的 invoke() 或者 submit() 方法提交任务时,ForkJoinPool 根据一定的路由规则把任务提交到一个任务队列中,如果任务在执行过程中会创建出子任务,那么子任务会提交到工作线程对应的任务队列中。
如果工作线程对应的任务队列空了,是不是就没活儿干了呢?不是的,ForkJoinPool 支持一种叫做"任务窃取"的机制,如果工作线程空闲了,那它可以"窃取"其他工作任务队列里的任务。如此一来,所有的工作线程都不会闲下来了。
3.1 整体架构
ForkJoinPool 的核心设计包括三个关键组件:
┌─────────────────────────────────────────────────────────────┐
│ ForkJoinPool │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ WorkQueue[] 数组 │ │
│ │ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ │ │
│ │ │ Q0 │ │ Q1 │ │ Q2 │ │ Q3 │ │ Q4 │ │ Q5 │ │... │ │ │
│ │ │偶数│ │奇数│ │偶数│ │奇数│ │偶数│ │奇数│ │ │ │ │
│ │ │外部│ │工作│ │外部│ │工作│ │外部│ │工作│ │ │ │ │
│ │ │提交│ │线程│ │提交│ │线程│ │提交│ │线程│ │ │ │ │
│ │ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘- ForkJoinPool 内部维护一个 WorkQueue[] 数组
- 每个 WorkQueue 关联一个 ForkJoinWorkerThread 工作线程
- 外部提交的任务放在偶数索引的队列,工作线程私有任务放在奇数索引的队列
3.2 ForkJoinWorkerThread
ForkJoinWorkerThread 是 ForkJoinPool 中的一个专门用于执行任务的线程。当一个 ForkJoinWorkerThread 被创建时,它会自动注册一个 WorkQueue 到 ForkJoinPool 中。这个 WorkQueue 是该线程专门用于存储自己的任务的队列,只能出现在 WorkQueues[] 的奇数位。
ForkJoinWorkerThread 工作线程启动后就会扫描偷取任务执行,另外当其在 ForkJoinTask#join() 等待返回结果时如果被 ForkJoinPool 线程池发现其任务队列为空或者已经将当前任务执行完毕,也会通过工作窃取算法从其他任务队列中获取任务分配到其任务队列中并执行。
3.3 WorkQueue
WorkQueue 是一个双端队列,用于存储工作线程自己的任务。每个工作线程都会维护一个本地的 WorkQueue,并且优先执行本地队列中的任务。当本地队列中的任务执行完毕后,工作线程会尝试从其他线程的 WorkQueue 中窃取任务。
WorkQueue 任务队列其实也分为了两种类型,一种是外部提交进来的任务所占用的队列,其在任务队列数组中的数组下标为偶数;另一种是属于工作线程私有的任务队列,保存大任务 fork 分解出来的任务,其在任务队列数组中的数组下标为奇数。
3.4 工作窃取
ForkJoinPool 与 ThreadPoolExecutor 有个很大的不同之处在于,ForkJoinPool 引入了工作窃取设计,它是其性能保证的关键之一。工作窃取,就是允许空闲线程从繁忙线程的双端队列中窃取任务。默认情况下,工作线程从它自己的双端队列的头部获取任务。但是,当自己的任务为空时,线程会从其他繁忙线程双端队列的尾部中获取任务。这种方法,最大限度地减少了线程竞争任务的可能性。
通过工作窃取,Fork/Join 框架可以实现任务的自动负载均衡,以充分利用多核 CPU 的计算能力,同时也可以避免线程的饥饿和延迟问题。
3.5 与普通线程池的区别
| 特性 | ForkJoinPool | 普通线程池(如 ThreadPoolExecutor) |
|---|---|---|
| 任务队列 | 每个线程独立的双端队列 | 共享的阻塞队列 |
| 负载均衡 | 工作窃取,自动均衡 | 需要手动分配 |
| 任务执行 | 优先执行自己的任务 | 抢占式执行 |
| 适用场景 | CPU 密集型、可分解任务 | 任意类型的任务 |
| 线程数量 | 默认 = CPU 核心数 | 可任意配置 |
4. 基本使用
4.1 数组求和
数组求和是 Fork/Join 最经典的入门案例。
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class ForkJoinPoolDemo1 {
/**
* 数组求和任务
*/
static class SumTask extends RecursiveTask<Integer> {
// 阈值
private static final int THRESHOLD = 1000;
private final int[] array;
private final int start;
private final int end;
public SumTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int length = end - start;
// 小于阈值,直接计算
if (length <= THRESHOLD) {
int sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
}
// 大于阈值,拆分任务
int mid = start + length / 2;
SumTask leftTask = new SumTask(array, start, mid);
SumTask rightTask = new SumTask(array, mid, end);
// 异步执行左半部分
leftTask.fork();
// 同步执行右半部分
Integer rightResult = rightTask.compute();
// 等待左半部分结果
Integer leftResult = leftTask.join();
// 合并结果
return leftResult + rightResult;
}
}
public static void main(String[] args) {
int[] array = new int[1000000];
for (int i = 0; i < array.length; i++) {
array[i] = i + 1;
}
ForkJoinPool pool = ForkJoinPool.commonPool();
SumTask task = new SumTask(array, 0, array.length);
long start = System.currentTimeMillis();
int result = pool.invoke(task);
long end = System.currentTimeMillis();
System.out.println("结果: " + result);
System.out.println("耗时: " + (end - start) + "ms");
}
}输出:
结果: 1784293664
耗时: 3ms阈值设置的影响:
| 阈值设置 | 优点 | 缺点 |
|---|---|---|
| 阈值太小 | 任务数量多,并行度高 | 任务管理开销大 |
| 阈值太大 | 减少管理开销 | 并行度降低 |
| 经验值 | 100010000(简单操作)<br/>1001000(复杂操作) | - |
4.2 归并排序
归并排序天然适合 Fork/Join 框架,因为其分治特性非常明显。
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.stream.IntStream;
public class ForkJoinPoolDemo2 {
/**
* 归并排序任务
*/
public static class MergeSortTask extends RecursiveAction {
private static final int THRESHOLD = 1000;
private final int[] array;
private final int start;
private final int end;
public MergeSortTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected void compute() {
int length = end - start;
if (length <= THRESHOLD) {
// 小数组使用 Arrays.sort
Arrays.sort(array, start, end);
return;
}
int mid = start + length / 2;
MergeSortTask leftTask = new MergeSortTask(array, start, mid);
MergeSortTask rightTask = new MergeSortTask(array, mid, end);
// 并行执行两个子任务
leftTask.fork();
rightTask.fork();
// 等待子任务完成
leftTask.join();
rightTask.join();
// 合并两个有序数组
merge(start, mid, end);
}
private void merge(int start, int mid, int end) {
int[] temp = new int[end - start];
int i = start, j = mid, k = 0;
while (i < mid && j < end) {
if (array[i] <= array[j]) {
temp[k++] = array[i++];
} else {
temp[k++] = array[j++];
}
}
while (i < mid) {
temp[k++] = array[i++];
}
while (j < end) {
temp[k++] = array[j++];
}
System.arraycopy(temp, 0, array, start, temp.length);
}
}
public static void main(String[] args) {
int[] array = IntStream.range(0, 1000000).toArray();
ForkJoinPool pool = ForkJoinPool.commonPool();
MergeSortTask task = new MergeSortTask(array, 0, array.length);
long start = System.currentTimeMillis();
pool.invoke(task);
long end = System.currentTimeMillis();
System.out.println("排序耗时: " + (end - start) + "ms");
}
}输出:
排序耗时: 28ms4.3 斐波那契数列
斐波那契数列展示了 Fork/Join 的递归特性。
斐波那契数列指的是这样一个数列:1,1,2,3,5,8,13,21,34,55,89... 这个数列从第 3 项开始,每一项都等于前两项之和。
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class ForkJoinPoolDemo3 {
/**
* 斐波那契数列任务
*/
static class Fibonacci extends RecursiveTask<Integer> {
private final int n;
public Fibonacci(int n) {
this.n = n;
}
@Override
protected Integer compute() {
if (n <= 1) {
return n;
}
Fibonacci f1 = new Fibonacci(n - 1);
Fibonacci f2 = new Fibonacci(n - 2);
f1.fork();
int result = f2.compute() + f1.join();
return result;
}
}
public static void main(String[] args) {
ForkJoinPool pool = ForkJoinPool.commonPool();
long start = System.currentTimeMillis();
int result = pool.invoke(new Fibonacci(40));
long end = System.currentTimeMillis();
System.out.println("结果: " + result);
System.out.println("耗时: " + (end - start) + "ms");
}
}输出:
结果: 102334155
耗时: 573ms性能对比:
| n 值 | 递归调用次数 | Fork/Join 效果 |
|---|---|---|
| 10 | 177 | 一般 |
| 30 | 2,692,537 | 明显 |
| 40 | 331,160,281 | 显著(但有栈溢出风险) |
栈溢出问题
在上面的例子中,由于递归计算 Fibonacci 数列的任务数量呈指数级增长,当 n 较大时,就容易出现 StackOverflowError 错误。这个错误通常发生在递归过程中,由于递归过程中每次调用函数都会在栈中创建一个新的栈帧,当递归深度过大时,栈空间就会被耗尽,导致 StackOverflowError 错误。
可以使用使用迭代的方式计算 Fibonacci 数列,以避免递归过程中占用大量的栈空间。
4.4 处理递归任务的注意事项
对于一些递归深度较大的任务,使用 Fork/Join 框架可能会出现任务调度和内存消耗的问题。
当递归深度较大时,会产生大量的子任务,这些子任务可能被调度到不同的线程中执行,而线程的创建和销毁以及任务调度的开销都会占用大量的资源,从而导致性能下降。
此外,对于递归深度较大的任务,由于每个子任务所占用的栈空间较大,可能会导致内存消耗过大,从而引起内存溢出的问题。
因此,在使用 Fork/Join 框架处理递归任务时,需要根据实际情况来评估递归深度和任务粒度,以避免任务调度和内存消耗的问题。如果递归深度较大,可以尝试采用其他方法来优化算法,如使用迭代方式替代递归,或者限制递归深度来减少任务数量,以避免 Fork/Join 框架的缺点。
5. 并行流实现
Java 8 引入了 Stream API,用于对集合进行函数式编程风格的操作。ForkJoinPool 通常用于执行并行流操作中的并行计算部分,例如对流中的元素进行过滤、映射、聚合等操作。
5.1 基本用法
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
public class ForkJoinPoolDemo4 {
public static void main(String[] args) {
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
// 串行流
list.stream().forEach(System.out::println);
// 并行流(使用 ForkJoinPool.commonPool())
list.parallelStream().forEach(System.out::println);
// 指定池
ForkJoinPool customPool = new ForkJoinPool(4);
customPool.submit(() ->
list.parallelStream().forEach(System.out::println)
);
// 通知线程池不再接受新任务
customPool.shutdown();
try {
// 阻塞主线程,最多等1分钟,直到池中任务执行完毕
customPool.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}输出:
1
2
3
4
5
3
1
2
4
5
3
2
1
4
55.2 性能对比
import java.util.Arrays;
import java.util.stream.IntStream;
public class ForkJoinPoolDemo5 {
public static void main(String[] args) {
int[] array = IntStream.range(0, 10000000).toArray();
// 串行求和
long start = System.currentTimeMillis();
int sum = Arrays.stream(array).sum();
System.out.println("串行求和结果: " + sum + ",耗时: " + (System.currentTimeMillis() - start) + "ms");
// 并行求和
start = System.currentTimeMillis();
int parallelSum = Arrays.stream(array).parallel().sum();
System.out.println("并行求和结果: " + parallelSum + ",耗时: " + (System.currentTimeMillis() - start) + "ms");
}
}输出:
串行求和结果: -2014260032,耗时: 7ms
并行求和结果: -2014260032,耗时: 5ms并行流注意事项
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
// 错误:共享状态
List<Integer> results = new ArrayList<>();
list.parallelStream().forEach(results::add);
// 线程不安全
// 正确:使用收集器
List<Integer> results = list.parallelStream()
.collect(Collectors.toList());
// 错误:阻塞操作
list.parallelStream().forEach(item -> {
Thread.sleep(100); // IO 阻塞,浪费线程
});
// 正确:CPU 密集型操作
list.parallelStream().forEach(item -> {
computeIntensive(item);
});6. 阻塞任务处理
在 ForkJoinPool 中使用阻塞型任务时需要注意以下几点:
防止线程饥饿:当一个线程在执行一个阻塞型任务时,它将会一直等待任务完成,这时如果没有其他线程可以窃取任务,那么该线程将一直被阻塞,直到任务完成为止。为了避免这种情况,应该避免在 ForkJoinPool 中提交大量的阻塞型任务。
使用特定的线程池:为了最大程度地利用 ForkJoinPool 的性能,可以使用专门的线程池来处理阻塞型任务,这些线程不会被 ForkJoinPool 的窃取机制所影响。例如,可以使用 ThreadPoolExecutor 来创建一个线程池,然后将这个线程池作为 ForkJoinPool 的执行器,这样就可以使用 ThreadPoolExecutor 来处理阻塞型任务,而使用 ForkJoinPool 来处理非阻塞型任务。
不要阻塞工作线程:如果在 ForkJoinPool 中使用阻塞型任务,那么需要确保这些任务不会阻塞工作线程,否则会导致整个线程池的性能下降。为了避免这种情况,可以将阻塞型任务提交到一个专门的线程池中,或者使用 CompletableFuture 等异步编程工具来处理阻塞型任务。
下面是一个使用阻塞型任务的例子,这个例子展示了如何使用 CompletableFuture 来处理阻塞型任务:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
public class BlockingTaskDemo {
public static void main(String[] args) {
// 构建一个forkjoin线程池
ForkJoinPool pool = new ForkJoinPool();
// 创建一个异步任务,并将其提交到ForkJoinPool中执行
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
// 模拟一个耗时的任务
TimeUnit.SECONDS.sleep(5);
return "Hello, world!";
} catch (InterruptedException e) {
e.printStackTrace();
return null;
}
}, pool);
try {
// 等待任务完成,并获取结果
String result = future.get();
System.out.println(result);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
// 关闭ForkJoinPool,释放资源
pool.shutdown();
}
}
}7. 性能调优
线程池大小
// 默认:CPU 核心数
ForkJoinPool.commonPool();
// 自定义
int parallelism = Runtime.getRuntime().availableProcessors();
ForkJoinPool pool = new ForkJoinPool(parallelism);
// CPU 密集型:parallelism = CPU 核心数
// IO 密集型:parallelism = CPU 核心数 * 2阈值选择
阈值太小:任务分解过多,开销大;阈值太大:并行度不足。
// 经验法则
// - 简单操作:1000-10000
// - 复杂操作:100-1000
// - 根据 CPU 核心数调整
int threshold = array.length / (Runtime.getRuntime().availableProcessors() * 4);阈值设置经验法则:
| 操作类型 | 推荐阈值 | 示例 |
|---|---|---|
| 简单计算 | 1000~10000 | 数组求和、计数 |
| 中等复杂度 | 100~1000 | 排序、搜索 |
| 复杂计算 | 10~100 | 图算法、矩阵运算 |
性能监控
// 监控 ForkJoinPool
ForkJoinPool pool = ForkJoinPool.commonPool();
System.out.println("并行度:" + pool.getParallelism());
System.out.println("活跃线程:" + pool.getActiveThreadCount());
System.out.println("运行线程:" + pool.getRunningThreadCount());
System.out.println("队列大小:" + pool.getQueuedSubmissionCount());
System.out.println("窃取次数:" + pool.getStealCount());8. 总结
Fork/Join 是一种基于分治思想的模型,在并发处理计算型任务时有着显著的优势。其效率的提升主要得益于两个方面:
- 任务切分:将大的任务分割成更小粒度的小任务,让更多的线程参与执行;
- 任务窃取:通过任务窃取,充分地利用空闲线程,并减少竞争。
组件总结
| 组件 | 作用 | 特点 |
|---|---|---|
| ForkJoinPool | 线程池 | 工作窃取、自动负载均衡 |
| ForkJoinTask | 任务基类 | 支持 fork/join |
| RecursiveAction | 无返回值任务 | compute() 返回 void |
| RecursiveTask<V> | 有返回值任务 | compute() 返回 V |
| CountedCompleter | 完成回调任务 | 任务完成后触发钩子 |
| WorkQueue | 双端队列 | 本地 LIFO,窃取 FIFO |
和普通线程池之间的区别
工作窃取算法:ForkJoinPool 采用工作窃取算法来提高线程的利用率,而普通线程池则采用任务队列来管理任务。在工作窃取算法中,当一个线程完成自己的任务后,它可以从其它线程的队列中获取一个任务来执行,以此来提高线程的利用率。
任务的分解和合并:ForkJoinPool 可以将一个大任务分解为多个小任务,并行地执行这些小任务,最终将它们的结果合并起来得到最终结果。而普通线程池只能按照提交的任务顺序一个一个地执行任务。
工作线程的数量:ForkJoinPool 会根据当前系统的 CPU 核心数来自动设置工作线程的数量,以最大限度地发挥 CPU 的性能优势。而普通线程池需要手动设置线程池的大小,如果设置不合理,可能会导致线程过多或过少,从而影响程序的性能。
任务类型:ForkJoinPool 适用于执行大规模任务并行化,而普通线程池适用于执行一些短小的任务,如处理请求等。