JUC 并发同步工具类
ReentrantLock(可重入的独占锁)
ReentrantLock 是一种可重入的独占锁,它允许同一个线程多次获取同一个锁而不会被阻塞。
它的功能类似于 synchronized,是一种互斥锁,可以保证线程安全。相对于 synchronized, ReentrantLock 具备如下特点:
- 可中断 - 可以设置超时时间
- 可以设置为公平锁 - 支持多个条件变量
- 与
synchronized一样,都支持可重入
ReentrantLock 的应用场景主要体现在多线程环境下对共享资源的独占式访问,以保证数据的一致性和安全性。具体应用场景如下:
- 解决多线程竞争资源的问题:例如多个线程同时对同一个数据库进行写操作,可以使用
ReentrantLock保证每次只有一个线程能够写入。 - 实现多线程任务的顺序执行:例如在一个线程执行完某个任务后,再让另一个线程执行任务。
- 实现多线程等待/通知机制:例如在某个线程执行完某个任务后,利用
Condition通知其他线程继续执行任务。
常用 API
构造器
ReentrantLock 支持公平锁和非公平锁两种模式:
| 锁类型 | 描述 |
|---|---|
public ReentrantLock(true) | 公平锁,线程在获取锁时,严格按照线程等待的先后顺序来获取锁(遵循 FIFO 队列)。 |
public ReentrantLock() | 非公平锁,线程在获取锁时,不按照等待的先后顺序获取锁,而是允许插队竞争(具有一定的随机性)。ReentrantLock 默认使用非公平锁。 |
比如买票的时候就有可能出现插队的场景,允许插队就是非公平锁。
常见 API
| 方法 | 描述 |
|---|---|
void lock() | 获取锁。调用该方法时当前线程会尝试获取锁,当成功获得锁后,该方法返回。 |
void lockInterruptibly() throws InterruptedException | 可中断地获取锁。和 lock() 方法的不同之处在于该方法会响应中断,即在尝试获取锁的过程中可以中断当前线程。 |
boolean tryLock() | 尝试非阻塞地获取锁。调用该方法后会立即返回。如果能够成功获取到锁则返回 true,否则返回 false。 |
boolean tryLock(long time, TimeUnit unit) throws InterruptedException | 超时获取锁。当前线程在以下三种情况下会被返回: 1. 当前线程在超时时间内获取了锁; 2. 当前线程在超时时间内被中断; 3. 超时时间结束,返回 false。 |
void unlock() | 释放锁。 |
Condition newCondition() | 获取等待通知组件。该组件和当前的锁绑定,当前线程只有获取了锁,才能调用该组件的 await() 方法,而调用后当前线程将释放锁。 |
基本语法
void lock()
private final Lock lock = new ReentrantLock();
// 获取锁
lock.lock();
try {
// 程序执行逻辑(临界区操作)
} finally {
// finally 语句块可以确保 lock 被正确释放
lock.unlock();
}boolean tryLock()
private final Lock lock = new ReentrantLock();
// 尝试获取锁,最多等待 100 毫秒
if (lock.tryLock(100, TimeUnit.MILLISECONDS)) {
try {
// 成功获取到锁,执行需要同步的代码块
// ... 执行一些操作 ...
} finally {
// 释放锁
lock.unlock();
}
} else {
// 超时后仍未获取到锁,执行备选逻辑
// ... 执行一些不需要同步的操作 ...
}在使用时要注意 4 个关键问题:
- 默认情况下
ReentrantLock为非公平锁; - 加锁次数和释放锁次数一定要保持一致,否则导致死锁或程序异常;
- 加锁操作一定要放在
try代码块之前,这样可以避免未加锁成功又释放锁的异常; - 释放锁操作一定要放在
finally代码块中,否则若业务代码抛出异常会导致锁无法释放,进而引起其他线程阻塞。
工作原理
当线程调用 lock() 方法时,会首先通过 CAS(Compare-And-Swap)机制尝试将 AQS 内部的同步状态 state 从 0 修改为 1(以独占锁为例)。如果修改成功,当前线程便成功获取到锁。
若获取失败,该线程会被封装成一个节点(Node),加入到 AQS 内部的阻塞队列(如图所示的 CLH 变体双向队列)的尾部(tail)。需要注意的是,在 AQS 的结构中,首部节点(head 节点)通常是一个虚拟节点(Dummy Node)或代表当前正持有锁的线程。实际上,真正处于排队等待第一顺位的线程,是 head.next 节点封装的线程。
在锁释放与竞争阶段,公平锁与非公平锁的机制有所不同:
- 公平锁:当
head节点代表的线程释放锁后,会严格按照 FIFO 原则,优先唤醒head.next节点对应的线程来获取锁。 - 非公平锁:当锁被释放的瞬间,允许全新到来的、尚未入队的线程直接通过 CAS 操作去竞争锁。这意味着新线程可能会“插队”,与刚刚被唤醒的
head.next线程共同争夺state资源。如果新线程竞争失败,才会乖乖加入队列尾部。

ReentrantLock 使用
独占锁:模拟抢票场景
思考:8 张票,10 个人抢,如果不加锁,会出现什么问题?
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
* 模拟抢票场景
*/
public class ReentrantLockDemo1 {
private final ReentrantLock lock = new ReentrantLock();//默认非公平
private static int tickets = 8; // 总票数
public void buyTicket() {
lock.lock(); // 获取锁
try {
if (tickets > 0) { // 还有票
try {
TimeUnit.MILLISECONDS.sleep(10); // 休眠10ms,模拟出并发效果
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "购买了第" + tickets-- + "张票");
} else {
System.out.println("票已经卖完了," + Thread.currentThread().getName() + "抢票失败");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock(); // 释放锁
}
}
public static void main(String[] args) {
ReentrantLockDemo1 ticketSystem = new ReentrantLockDemo1();
for (int i = 1; i <= 10; i++) {
Thread thread = new Thread(() -> {
ticketSystem.buyTicket(); // 抢票
}, "线程" + i);
// 启动线程
thread.start();
}
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("剩余票数:" + tickets);
}
}输出结果,如果不加锁会出现超卖。
线程1购买了第8张票
线程2购买了第7张票
线程3购买了第6张票
线程4购买了第5张票
线程5购买了第4张票
线程6购买了第3张票
线程7购买了第2张票
线程8购买了第1张票
票已经卖完了,线程9抢票失败
票已经卖完了,线程10抢票失败
剩余票数:0可重入锁
可重入锁又名递归锁,是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取该锁(前提锁对象得是同一个对象),而不会因为之前已经获取过还没释放而导致自我阻塞。
Java 中 ReentrantLock 和 synchronized 都是可重入锁,可重入锁的一个显著优点是可以一定程度避免死锁。在实际开发中,可重入锁常常应用于递归操作、调用同一个类中的其他同步方法、锁嵌套等场景中。
import java.util.concurrent.locks.ReentrantLock;
class ReentrantLockDemo2 {
private final ReentrantLock lock = new ReentrantLock(); // 创建 ReentrantLock 对象
public void recursiveCall(int num) {
lock.lock(); // 获取锁
try {
if (num == 0) {
return;
}
System.out.println("执行递归,num = " + num);
recursiveCall(num - 1);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock(); // 释放锁
}
}
public static void main(String[] args) throws InterruptedException {
ReentrantLockDemo2 demo2 = new ReentrantLockDemo2(); // 创建计数器对象
// 测试递归调用
demo2.recursiveCall(10);
}
}输出结果:
执行递归,num = 10
执行递归,num = 9
执行递归,num = 8
执行递归,num = 7
执行递归,num = 6
执行递归,num = 5
执行递归,num = 4
执行递归,num = 3
执行递归,num = 2
执行递归,num = 1Condition 结合使用
在 Java 中,Condition 是一个接口,它提供了线程之间的协调机制。可以将它看作是一个更加灵活、强大的 wait() 和 notify() 机制,通常与 Lock 接口(比如 ReentrantLock)配合使用。它的核心作用体现在两个方面:
- 等待/通知机制:它允许线程等待某个条件成立,或者通知其他线程某个条件已经满足。这与使用
Object的wait()和notify()方法相似,但Condition提供了更高的灵活性和更多的控制。 - 多条件协调:与每个
Object只有一个内置的等待/通知机制不同,一个Lock可以对应多个Condition对象。这意味着可以为不同的等待条件创建不同的Condition,从而实现对多个等待线程集合的精准独立控制。
核心方法:
| 方法 | 描述 |
|---|---|
void await() | 使当前线程等待,直到被其他线程通过 signal() 或 signalAll() 方法唤醒,或者线程被中断。该方法会在等待之前释放当前线程所持有的锁,在被唤醒后会再次尝试获取锁。 |
boolean await(long time, TimeUnit unit) | 使当前线程等待指定的时间,如果超时未被唤醒则返回。 |
void signal() | 唤醒等待在此 Condition 上的一个线程。如果有多个线程正在等待,则随机选择其中一个进行唤醒。 |
void signalAll() | 唤醒等待在此 Condition 上的所有线程。 |
java.util.concurrent 类库中提供 Condition 类来实现线程之间的协调。调用 Condition.await() 方法使线程等待,其他线程调用 Condition.signal() 或 Condition.signalAll() 方法唤醒等待的线程。
注意:调用
Condition的await()和signal()方法,都必须在lock保护之内(即先获取锁)。
案例:基于 ReentrantLock 和 Condition 实现一个简单队列
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockDemo3 {
public static void main(String[] args) {
// 创建队列
Queue queue = new Queue(5);
// 启动生产者线程
new Thread(new Producer(queue)).start();
// 启动消费者线程
new Thread(new Customer(queue)).start();
}
/**
* 队列封装类
*/
static class Queue {
private Object[] items;
int size = 0;
int takeIndex;
int putIndex;
private ReentrantLock lock;
public Condition notEmpty; // 消费者线程阻塞唤醒条件,队列为空阻塞,生产者生产完唤醒
public Condition notFull; // 生产者线程阻塞唤醒条件,队列满了阻塞,消费者消费完唤醒
public Queue(int capacity) {
this.items = new Object[capacity];
lock = new ReentrantLock();
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public void put(Object value) throws Exception {
// 加锁
lock.lock();
try {
while (size == items.length) {
// 队列满了让生产者等待
notFull.await();
}
items[putIndex] = value;
if (++putIndex == items.length) {
putIndex = 0;
}
size++;
notEmpty.signal(); // 生产完唤醒消费者
} finally {
System.out.println("producer生产:" + value);
// 解锁
lock.unlock();
}
}
public Object take() throws Exception {
lock.lock();
try {
// 队列空了就让消费者等待
while (size == 0) {
notEmpty.await();
}
Object value = items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) {
takeIndex = 0;
}
size--;
notFull.signal(); // 消费完唤醒生产者生产
return value;
} finally {
lock.unlock();
}
}
}
/**
* 生产者
*/
static class Producer implements Runnable {
private Queue queue;
public Producer(Queue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
// 隔1秒轮询生产一次
while (true) {
TimeUnit.SECONDS.sleep(1);
queue.put(new Random().nextInt(1000));
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 消费者
*/
static class Customer implements Runnable {
private Queue queue;
public Customer(Queue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
// 隔2秒轮询消费一次
while (true) {
TimeUnit.SECONDS.sleep(5);
System.out.println("consumer消费:" + queue.take());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}Semaphore(信号量)
Semaphore(信号量)是一种用于多线程编程的同步工具,主要用于控制同一时刻访问特定共享资源的线程数量,常用于并发限流场景。
Semaphore 的应用场景主要涉及到需要限制资源访问数量或控制并发访问的场景,例如数据库连接池、文件访问并发控制、网络请求限流等。在这些场景中,Semaphore 能够有效地协调线程对资源的访问,保证系统的稳定性和性能。
通常情况下,使用 Semaphore 的过程实际上是多个线程获取访问共享资源许可证(permit)的过程。Semaphore 维护了一个计数器,线程可以通过调用 acquire() 方法来获取许可证。当计数器为 0 时,调用 acquire() 的线程将被阻塞,直到有其他线程释放许可证;线程完成操作后通过调用 release() 方法来释放许可证,这会使 Semaphore 中的计数器增加,从而允许更多等待中的线程访问共享资源。

常用 API
构造器
| 构造方法 | 描述 |
|---|---|
public Semaphore(int permits) | 创建一个指定许可证数量(permits)的 Semaphore 实例,默认使用非公平的同步器。因此 new Semaphore(n) 等价于 new Semaphore(n, false)。 |
public Semaphore(int permits, boolean fair) | 创建一个指定许可证数量(permits)的 Semaphore 实例,并可通过 fair 参数指定使用公平同步器(true)或非公平同步器(false)。 |
acquire() 方法
该方法用于向 Semaphore 获取许可证,属于阻塞方法,获取不到就会一直等:
| 方法 | 描述 |
|---|---|
void acquire() throws InterruptedException | 向 Semaphore 获取一个许可证。如果没有可用的许可证,则当前线程会阻塞,直到有许可证可用或被其他线程中断。 |
void acquire(int permits) throws InterruptedException | 向 Semaphore 获取指定数量的许可证。如果没有足够的可用许可证,则当前线程会阻塞,直到有足够的许可证可用或被其他线程中断。 |
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreDemo1 {
public static void main(String[] args) throws InterruptedException {
// 定义permit=1的Semaphore
final Semaphore semaphore = new Semaphore(1, true);
// 主线程直接抢先申请成功
semaphore.acquire();
Thread t = new Thread(() -> {
try {
// 线程t会进入阻塞,等待当前有可用的permit
System.out.println("子线程等待获取permit");
semaphore.acquire();
System.out.println("子线程获取到permit");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放permit
semaphore.release();
}
});
t.start();
TimeUnit.SECONDS.sleep(5);
System.out.println("主线程释放permit");
// 主线程休眠5秒后释放permit,线程t才能获取到permit
semaphore.release();
}
}输出结果:
子线程等待获取permit
主线程释放permit
子线程获取到permittryAcquire() 方法
尝试向 Semaphore 获取许可证。如果此时许可证的数量少于申请的数量,线程会立即返回,返回 false 表示申请失败。
| 方法 | 描述 |
|---|---|
boolean tryAcquire() | 尝试获取 1 个许可证。成功返回 true,失败立即返回 false,不会阻塞。 |
boolean tryAcquire(int permits) | 尝试获取指定数量的许可证。成功返回 true,失败立即返回 false,不会阻塞。 |
boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException | 在指定超时时间内尝试获取 1 个许可证。若超时时间内获取到则返回 true,若超时后仍未获取到则返回 false。线程在等待过程中会被中断。 |
boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException | 在指定超时时间内尝试获取指定数量的许可证。若超时时间内获取到则返回 true,若超时后仍未获取到则返回 false。线程在等待过程中会被中断。 |
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreDemo2 {
public static void main(String[] args) throws InterruptedException {
final Semaphore semaphore = new Semaphore(1, true);
// 定义一个线程
new Thread(() -> {
// 获取许可证
boolean gotPermit = semaphore.tryAcquire();
// 如果获取成功就休眠5秒的时间
if (gotPermit) {
try {
System.out.println(Thread.currentThread() + " 获得 1 个令牌");
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放Semaphore的许可证
semaphore.release();
}
}
}).start();
// 短暂休眠1秒的时间,确保上面的线程能够启动,并且顺利获取许可证
TimeUnit.SECONDS.sleep(1);
// 主线程在3秒之内肯定是无法获取许可证的,那么主线程将在阻塞3秒之后返回获取许可证失败
if (semaphore.tryAcquire(3, TimeUnit.SECONDS)) {
System.out.println("获得 1 个令牌");
} else {
System.out.println("获得令牌失败");
}
}
}输出结果:
Thread[#21,Thread-0,5,main] 获得 1 个令牌
获得令牌失败正确使用 release()
当线程结束对资源的使用后,应该立即释放许可证,让其他线程有机会抢占。
| 方法 | 描述 |
|---|---|
void release() | 释放 1 个许可证,将可用许可证计数器加 1。 |
void release(int permits) | 释放指定数量的许可证,将可用许可证计数器增加相应数量。 |
陷阱注意:为了确保释放许可证,我们的第一反应是将其放到 try...finally... 语句块中。但如果不加以约束,由于异常或中断导致未成功 acquire 的线程也执行了 release,会导致系统的凭证数量多于初始值。
看下面的错误示例:
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreDemo3 {
public static void main(String[] args) throws InterruptedException {
// 定义只有一个许可证的 Semaphore,并开启公平锁模式
final Semaphore semaphore = new Semaphore(1, true);
// 创建线程 t1
Thread t1 = new Thread(() -> {
try {
// 获取 Semaphore 的许可证
semaphore.acquire();
System.out.println("线程 t1 已成功获取许可证。");
// 模拟业务处理,霸占许可证一个小时
TimeUnit.HOURS.sleep(1);
} catch (InterruptedException e) {
System.out.println("线程 t1 在休眠期间被中断。");
} finally {
// 在 finally 语句块中释放许可证,确保资源最终被归还
semaphore.release();
System.out.println("线程 t1 已释放许可证。");
}
});
// 启动线程 t1
t1.start();
// 为确保线程 t1 已经先行启动并占用许可证,在主线程中休眠 1 秒
TimeUnit.SECONDS.sleep(1);
// 创建线程 t2
Thread t2 = new Thread(() -> {
try {
System.out.println("线程 t2 尝试获取许可证(将被阻塞)...");
// 阻塞式地获取一个许可证
semaphore.acquire();
System.out.println("线程 t2 已成功获取许可证。");
} catch (InterruptedException e) {
// 当 t2 在 acquire() 阻塞期间被调用 interrupt() 时,会抛出此异常
System.out.println("线程 t2 在阻塞等待许可证时被中断。");
} finally {
// 同样在 finally 语句块中尝试释放许可证
// 注意:如果 acquire() 抛出异常,通常不需要 release(),
// 但为了严谨性,Semaphore 的 release() 可以在任何时候调用(增加许可证计数)
semaphore.release();
}
});
// 启动线程 t2
t2.start();
// 主线程休眠 2 秒,观察 t2 的阻塞状态
TimeUnit.SECONDS.sleep(2);
// 对处于阻塞状态的线程 t2 执行中断操作
System.out.println("主线程对线程 t2 执行中断操作...");
t2.interrupt();
// 提示:由于 t1 仍在占用许可证且未释放,下面的代码在运行时会一直阻塞
System.out.println("主线程尝试获取许可证(将持续阻塞,因为 t1 正在睡眠)...");
semaphore.acquire();
System.out.println("主线程已获取许可证。");
}
}输出结果:
线程 t1 已成功获取许可证。
线程 t2 尝试获取许可证(将被阻塞)...
主线程对线程 t2 执行中断操作...
主线程尝试获取许可证(将持续阻塞,因为 t1 正在睡眠)...
线程 t2 在阻塞等待许可证时被中断。
主线程已获取许可证。为什么会这样?如果线程 t2 被其他线程中断,它释放了原本不属于自己的许可证,导致在 Semaphore 内部的可用许可证计数器增多。
这是设计缺陷吗?并不是。官方文档说明:“不要求释放许可的线程必须是获取许可的线程”。正确用法应该是由开发人员自身的编程约束来保证。改进思路如下:
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreDemo4 {
public static void main(String[] args) throws InterruptedException {
// 定义只有一个许可证的 Semaphore,并开启公平锁模式
final Semaphore semaphore = new Semaphore(1, true);
// 创建线程 t1
Thread t1 = new Thread(() -> {
try {
semaphore.acquire();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
try {
// 获取 Semaphore 的许可证
System.out.println("线程 t1 已成功获取许可证。");
// 模拟业务处理,霸占许可证一个小时
TimeUnit.HOURS.sleep(1);
} catch (InterruptedException e) {
System.out.println("线程 t1 在休眠期间被中断。");
} finally {
// 在 finally 语句块中释放许可证,确保资源最终被归还
semaphore.release();
System.out.println("线程 t1 已释放许可证。");
}
});
// 启动线程 t1
t1.start();
// 为确保线程 t1 已经先行启动并占用许可证,在主线程中休眠 1 秒
TimeUnit.SECONDS.sleep(1);
// 创建线程 t2
Thread t2 = new Thread(() -> {
try {
System.out.println("线程 t2 尝试获取许可证(将被阻塞)...");
// 阻塞式地获取一个许可证
semaphore.acquire();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
try {
System.out.println("线程 t2 已成功获取许可证。");
} finally {
// 同样在 finally 语句块中尝试释放许可证
// 注意:如果 acquire() 抛出异常,通常不需要 release(),
// 但为了严谨性,Semaphore 的 release() 可以在任何时候调用(增加许可证计数)
semaphore.release();
}
});
// 启动线程 t2
t2.start();
// 主线程休眠 2 秒,观察 t2 的阻塞状态
TimeUnit.SECONDS.sleep(2);
// 对处于阻塞状态的线程 t2 执行中断操作
System.out.println("主线程对线程 t2 执行中断操作...");
t2.interrupt();
// 提示:由于 t1 仍在占用许可证且未释放,下面的代码在运行时会一直阻塞
System.out.println("主线程尝试获取许可证(将持续阻塞,因为 t1 正在睡眠)...");
semaphore.acquire();
System.out.println("主线程已获取许可证。");
}
}输出结果:
线程 t1 已成功获取许可证。
线程 t2 尝试获取许可证(将被阻塞)...
主线程对线程 t2 执行中断操作...
主线程尝试获取许可证(将持续阻塞,因为 t1 正在睡眠)...
Exception in thread "Thread-1" java.lang.RuntimeException: java.lang.InterruptedException
at com.juzicoding.juc.cls.semaphore.SemaphoreDemo4.lambda$main$1(SemaphoreDemo4.java:46)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.InterruptedException
at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1134)
at java.base/java.util.concurrent.Semaphore.acquire(Semaphore.java:318)
at com.juzicoding.juc.cls.semaphore.SemaphoreDemo4.lambda$main$1(SemaphoreDemo4.java:44)
... 1 moreCountDownLatch(闭锁)
CountDownLatch 是一个同步协助类,可以用于控制一个或多个线程等待多个任务完成后再执行。当某项工作需要由若干项子任务并行地完成,并且只有在所有的子任务都结束之后(正常结束或异常结束),当前主任务才能进入下一阶段时,CountDownLatch 将是非常好用的工具。
它内部维护了一个计数器,初始值为 N,代表需要等待的线程数目。当一个线程完成了需要等待的任务后,就会调用 countDown() 方法将计数器减 1;当计数器的值到达 0 时,因调用 await() 方法而在等待的线程就会被唤醒并继续执行。
应用场景主要有以下几种:
- 并行任务同步 | 协调多个并行任务的完成情况,确保所有任务都完成后再执行后续合并操作。
- 多任务汇总 | 统计多个线程的完成情况。
- 资源初始化 | 主线程等待所有依赖组件/资源的初始化子线程完成后,再对外提供服务。
CountDownLatch 的不足:CountDownLatch 是一次性的,计数器的值只能在构造方法中初始化一次,当递减为 0 后,无法再次重置和重复使用。
常用 API
构造器
public CountDownLatch(int count) :构造非常简单,需要给定一个不能小于 0 的 int 初始计数值。
常用方法
| 方法 | 描述 |
|---|---|
void await() throws InterruptedException | 调用该方法的线程会被挂起(阻塞),一直等待直到 count 值变为 0 才继续执行。 |
boolean await(long timeout, TimeUnit unit) throws InterruptedException | 调用该方法的线程会等待指定的超时时间。若在超时时间内 count 值变为 0,则返回 true;若超时后 count 仍未变为 0,则线程不再等待,直接继续执行并返回 false。 |
void countDown() | 将计数器 count 的值减 1,直至减到 0 为止。 |
long getCount() | 返回当前的计数器数值。 |
CountDownLatch 使用
并发任务经常存在前后依赖关系。比如数据详情页需要同时调用多个接口获取数据,并发请求获取到数据后,主线程需要进行结果合并。
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
public class CountDownLatchDemo1 {
public static void main(String[] args) throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(5);
for (int i = 0; i < 5; i++) {
final int index = i;
new Thread(() -> {
try {
Thread.sleep(1000 + ThreadLocalRandom.current().nextInt(2000));
System.out.println("任务" + index +"执行完成");
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
// 主线程在阻塞,当计数器为0,就唤醒主线程往下执行
countDownLatch.await();
System.out.println("主线程:在所有任务运行完成后,进行结果汇总");
}
}输出结果:
任务3执行完成
任务4执行完成
任务2执行完成
任务1执行完成
任务0执行完成
主线程:在所有任务运行完成后,进行结果汇总CyclicBarrier(回环栅栏/循环屏障)
CyclicBarrier 是一个同步工具,它可以实现让一组线程等待至某个状态(屏障点)之后,再全部同时执行。叫做“回环(Cyclic)”是因为当所有等待线程都被释放以后,这个屏障可以被重用。
它非常适合用于某个串行化任务被分拆成若干个并行执行的子任务,当所有的子任务都执行结束(都到达屏障点)之后,再继续进行下一阶段聚合的工作。

应用场景:
- 多线程任务切分与合并 | 将复杂的任务分配给多个线程执行,并在所有线程完成阶段工作后触发后续合并操作。
- 数据分段处理 | 协调多个线程间的数据处理,在一批数据全部处理完后,再统一处理下一批。
常用 API
构造器
| 构造方法 | 描述 |
|---|---|
public CyclicBarrier(int parties) | 创建一个 CyclicBarrier 实例,指定屏障拦截的线程数量(parties)。每个线程调用 await() 方法时,都会告诉 CyclicBarrier 自己已到达屏障,随后当前线程被阻塞。 |
public CyclicBarrier(int parties, Runnable barrierAction) | 创建一个 CyclicBarrier 实例,指定屏障拦截的线程数量(parties)。当所有线程都到达屏障时,优先执行 barrierAction 任务,然后才释放所有等待的线程。该构造方法适用于需要在聚合数据后统一处理的业务场景。 |
常用方法
| 方法 | 描述 |
|---|---|
public int await() throws InterruptedException, BrokenBarrierException | 线程调用该方法表示自己已经到达栅栏。如果尚未集齐 parties 数量的线程,则当前线程被阻塞。BrokenBarrierException 表示栅栏已经被破坏(例如某个等待的线程被中断或超时,引起所有等待线程集体抛出该异常)。 |
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException | 线程调用该方法表示自己已经到达栅栏,并等待指定的超时时间。如果在超时时间内集齐 parties 数量的线程,则正常继续;如果超时后仍未集齐,则抛出 TimeoutException,同时栅栏被破坏。 |
public void reset() | 重置栅栏,使其回归初始状态,以便循环利用。如果当前有线程正在等待,这些线程会收到 BrokenBarrierException 异常。 |
CyclicBarrier 使用
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
/**
* 演示 CyclicBarrier(循环屏障)的基本用法。
* 当指定数量的线程都到达屏障点时,屏障才会打开,并可选地执行一个汇总任务。
*/
public class CyclicBarrierDemo1 {
// 定义屏障,拦截 4 个线程,并在所有线程到达后执行 CollectThread 汇总任务
private static CyclicBarrier barrier = new CyclicBarrier(4, new CollectThread());
/**
* 存放子线程工作结果的容器,使用线程安全的 ConcurrentHashMap
*/
private static ConcurrentHashMap<String, String> resultMap = new ConcurrentHashMap<>();
public static void main(String[] args) {
// 启动 4 个子线程
for (int i = 0; i < 4; i++) {
Thread thread = new Thread(new SubThread(), "Thread_" + i);
thread.start();
}
}
/**
* 汇总任务:当所有子线程都到达屏障点时,由最后一个到达的线程执行此任务
*/
private static class CollectThread implements Runnable {
@Override
public void run() {
StringBuilder result = new StringBuilder();
for (Map.Entry<String, String> workResult : resultMap.entrySet()) {
result.append("[").append(workResult.getValue()).append("]");
}
System.out.println("--- 汇总任务执行结果 = " + result + " ---");
System.out.println("汇总线程(CollectThread)执行结束。");
}
}
/**
* 相互等待的子线程:模拟多阶段并行任务
*/
private static class SubThread implements Runnable {
@Override
public void run() {
String name = Thread.currentThread().getName();
// 将当前线程名存入结果 Map
resultMap.put(name, name);
try {
// 模拟第一阶段工作
TimeUnit.SECONDS.sleep(1);
System.out.println(name + " 已完成第一阶段工作,正在屏障点 1 等待...");
// 到达第一个屏障点,等待其他线程
barrier.await();
// 模拟第二阶段工作
TimeUnit.SECONDS.sleep(1);
System.out.println(name + " 已完成第二阶段工作,正在屏障点 2 等待...");
// 到达第二个屏障点(体现了 CyclicBarrier 的可重用性/循环特性)
barrier.await();
System.out.println(name + " 已通过所有屏障,任务结束。");
} catch (Exception e) {
e.printStackTrace();
}
}
}
}输出结果:
Thread_1 已完成第一阶段工作,正在屏障点 1 等待...
Thread_2 已完成第一阶段工作,正在屏障点 1 等待...
Thread_0 已完成第一阶段工作,正在屏障点 1 等待...
Thread_3 已完成第一阶段工作,正在屏障点 1 等待...
--- 汇总任务执行结果 = [Thread_0][Thread_1][Thread_2][Thread_3] ---
汇总线程(CollectThread)执行结束。
Thread_3 已完成第二阶段工作,正在屏障点 2 等待...
Thread_0 已完成第二阶段工作,正在屏障点 2 等待...
Thread_1 已完成第二阶段工作,正在屏障点 2 等待...
Thread_2 已完成第二阶段工作,正在屏障点 2 等待...
--- 汇总任务执行结果 = [Thread_0][Thread_1][Thread_2][Thread_3] ---
汇总线程(CollectThread)执行结束。
Thread_2 已通过所有屏障,任务结束。
Thread_3 已通过所有屏障,任务结束。
Thread_0 已通过所有屏障,任务结束。
Thread_1 已通过所有屏障,任务结束。CyclicBarrier 与 CountDownLatch 的区别
- 可重用性:
CountDownLatch是一次性的;而CyclicBarrier是可循环利用的(内部计数器可重置)。 - 等待对象不同:
CountDownLatch的await()通常是主线程在等待计数器归零;而CyclicBarrier的await()是各个子线程互相等待,大家都在等待其他线程到达 Barrier Point,集齐后大家一起往下走。
Exchanger(数据交换器)
Exchanger 是一个用于两个工作线程之间协作的工具类,专门用于两个线程间精确交换数据,例如在遗传算法中,两个独立演化的群落交换优秀基因。
当一个线程先执行了 exchange() 方法后,它会同步阻塞等待另一个线程也执行 exchange() 方法。当两个线程都到达了这个同步点时,两个线程就可以安全地交换它们的数据对象。

常用 API
| 方法 | 描述 |
|---|---|
public V exchange(V x) throws InterruptedException | 等待另一个线程到达此交换点,然后将给定的对象 x 传送给该线程,并接收该线程传递回来的对象作为返回值。如果没有其他线程等待交换,则当前线程会阻塞,直到有另一个线程到达交换点。 |
public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException | 在指定的超时时间内等待另一个线程到达此交换点,进行数据交换。如果在超时时间内完成交换,则返回对方线程传递的对象;如果超时后仍未完成交换,则抛出 TimeoutException 异常。 |
Exchanger 使用
模拟交易场景 (一手交钱一手交货),如果一方先到,要等另一方到了才能交易。
import java.util.concurrent.Exchanger;
public class ExchangerDemo {
public static void main(String[] args) {
// 创建一个 Exchanger,指定交换的数据类型为 String
Exchanger<String> exchanger = new Exchanger<>();
// 线程 1:买家
new Thread(() -> {
try {
String money = "【100万现金】";
System.out.println("买家:我带着 " + money + " 来到了交易地点,等待卖家...");
// 核心:调用 exchange 进行交换,会阻塞直到卖家也调用 exchange
String goods = exchanger.exchange(money);
System.out.println("买家:交易成功!我用 100万现金 换回了 " + goods);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Buyer-Thread").start();
// 线程 2:卖家
new Thread(() -> {
try {
// 模拟卖家晚到 2 秒钟
Thread.sleep(2000);
String goods = "【传世古董】";
System.out.println("卖家:不好意思我来晚了,我带着 " + goods + " 来了!开始交易...");
// 核心:调用 exchange 进行交换
String money = exchanger.exchange(goods);
System.out.println("卖家:交易成功!我用 传世古董 换回了 " + money);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Seller-Thread").start();
}
}Phaser(阶段协同器)
Phaser 是一个更加灵活强大的并发协同工具类,可以视为 CyclicBarrier 和 CountDownLatch 的动态进化版。
不仅能够管理多阶段的任务执行,最核心的区别在于:它可以自适应地调整并发线程数,动态地增加或减少参与线程的数量。所以 Phaser 特别适合在参与者数量不固定、或者任务分为极其多级的重复阶段时使用。

应用场景:
- 多线程任务动态分配 | 任务数不可控或随时有新任务加入的并发控制场景.
- 多级任务流程 | 实现复杂的多阶段任务流,在每一级任务完成后自动触发下一级任务。
- 阶段性任务/并行计算 | 模拟迭代的并行计算算法,协调多个处理线程,在每个阶段的数据迭代完成后共同推进到下一次迭代计算。
常用 API
构造方法
| 方法 | 描述 |
|---|---|
Phaser() | 创建一个 Phaser 实例,初始参与任务数为 0。 |
Phaser(int parties) | 创建一个 Phaser 实例,指定初始参与任务数。 |
Phaser(Phaser parent) | 创建一个 Phaser 实例,构建树形结构。当子节点没有参与者时,会自动解除父节点的注册。 |
增减参与任务数方法
| 方法 | 描述 |
|---|---|
int register() | 动态增加 1 个参与任务,返回当前阶段号。 |
int bulkRegister(int parties) | 动态增加多个参与任务,返回当前阶段号。 |
int arriveAndDeregister() | 到达屏障并注销退出(不再参与后面的阶段),使参与任务数减 1,返回当前阶段号。 |
到达与等待方法
| 方法 | 描述 |
|---|---|
int arrive() | 仅到达屏障(表示任务完成),不会阻塞,返回当前阶段号。 |
int arriveAndAwaitAdvance() | 到达屏障后阻塞,等待本阶段所有其他任务全部到达后再一起进入下一阶段,返回下一阶段号。 |
int awaitAdvance(int phase) | 在指定的阶段号等待。如果传入的阶段号与当前阶段号一致,则当前线程阻塞,直到所有参与者完成该阶段;否则立即返回。 |
protected boolean onAdvance(int phase, int registeredParties) | 触发回调方法。可以通过重写该方法在每个阶段到达时执行特定逻辑。返回 true 表示终止整个 Phaser,返回 false 则继续下一阶段。 |
Phaser 使用
阶段性任务:模拟公司团建,团队活动分为几步:在公司集合 -> 出发去公园 -> 餐厅聚餐。中间可能有员工半路回家,也可能到了餐厅有新员工加入。这体现了参与者数量是动态变化的。
package com.juzicoding.juc.cls.phaser;
import java.util.Random;
import java.util.concurrent.Phaser;
public class PhaserDemo1 {
public static void main(String[] args) {
final Phaser phaser = new Phaser() {
//重写该方法来增加阶段到达动作
@Override
protected boolean onAdvance(int phase, int registeredParties) {
// 参与者数量,去除主线程
int staffs = registeredParties - 1;
switch (phase) {
case 0:
System.out.println("大家都到公司了,出发去公园,人数:" + staffs);
break;
case 1:
System.out.println("大家都到公园门口了,出发去餐厅,人数:" + staffs);
break;
case 2:
System.out.println("大家都到餐厅了,开始用餐,人数:" + staffs);
break;
}
// 判断是否只剩下主线程(一个参与者),如果是,则返回true,代表终止
return registeredParties == 1;
}
};
// 注册主线程 ———— 让主线程全程参与
phaser.register();
final StaffTask staffTask = new StaffTask();
// 3个全程参与团建的员工
for (int i = 0; i < 3; i++) {
// 添加任务数
phaser.register();
new Thread(() -> {
try {
staffTask.step1Task();
//到达后等待其他任务到达
phaser.arriveAndAwaitAdvance();
staffTask.step2Task();
phaser.arriveAndAwaitAdvance();
staffTask.step3Task();
phaser.arriveAndAwaitAdvance();
staffTask.step4Task();
// 完成了,注销离开
phaser.arriveAndDeregister();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
// 两个不聚餐的员工加入
for (int i = 0; i < 2; i++) {
phaser.register();
new Thread(() -> {
try {
staffTask.step1Task();
phaser.arriveAndAwaitAdvance();
staffTask.step2Task();
System.out.println("员工【" + Thread.currentThread().getName() + "】回家了");
// 完成了,注销离开
phaser.arriveAndDeregister();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
while (!phaser.isTerminated()) {
int phase = phaser.arriveAndAwaitAdvance();
if (phase == 2) {
// 到了去餐厅的阶段,又新增4人,参加晚上的聚餐
for (int i = 0; i < 4; i++) {
phaser.register();
new Thread(() -> {
try {
staffTask.step3Task();
phaser.arriveAndAwaitAdvance();
staffTask.step4Task();
// 完成了,注销离开
phaser.arriveAndDeregister();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
}
}
static final Random random = new Random();
static class StaffTask {
public void step1Task() throws InterruptedException {
// 第一阶段:来公司集合
String staff = "员工【" + Thread.currentThread().getName() + "】";
System.out.println(staff + "从家出发了……");
Thread.sleep(random.nextInt(5000));
System.out.println(staff + "到达公司");
}
public void step2Task() throws InterruptedException {
// 第二阶段:出发去公园
String staff = "员工【" + Thread.currentThread().getName() + "】";
System.out.println(staff + "出发去公园玩");
Thread.sleep(random.nextInt(5000));
System.out.println(staff + "到达公园门口集合");
}
public void step3Task() throws InterruptedException {
// 第三阶段:去餐厅
String staff = "员工【" + Thread.currentThread().getName() + "】";
System.out.println(staff + "出发去餐厅");
Thread.sleep(random.nextInt(5000));
System.out.println(staff + "到达餐厅");
}
public void step4Task() throws InterruptedException {
// 第四阶段:就餐
String staff = "员工【" + Thread.currentThread().getName() + "】";
System.out.println(staff + "开始用餐");
Thread.sleep(random.nextInt(5000));
System.out.println(staff + "用餐结束,回家");
}
}
}输出结果:
员工【Thread-2】从家出发了……
员工【Thread-4】从家出发了……
员工【Thread-3】从家出发了……
员工【Thread-0】从家出发了……
员工【Thread-1】从家出发了……
员工【Thread-4】到达公司
员工【Thread-1】到达公司
员工【Thread-2】到达公司
员工【Thread-3】到达公司
员工【Thread-0】到达公司
大家都到公司了,出发去公园,人数:5
员工【Thread-0】出发去公园玩
员工【Thread-2】出发去公园玩
员工【Thread-4】出发去公园玩
员工【Thread-3】出发去公园玩
员工【Thread-1】出发去公园玩
员工【Thread-4】到达公园门口集合
员工【Thread-4】回家了
员工【Thread-0】到达公园门口集合
员工【Thread-1】到达公园门口集合
员工【Thread-3】到达公园门口集合
员工【Thread-3】回家了
员工【Thread-2】到达公园门口集合
大家都到公园门口了,出发去餐厅,人数:3
员工【Thread-1】出发去餐厅
员工【Thread-2】出发去餐厅
员工【Thread-0】出发去餐厅
员工【Thread-5】出发去餐厅
员工【Thread-8】出发去餐厅
员工【Thread-7】出发去餐厅
员工【Thread-6】出发去餐厅
员工【Thread-1】到达餐厅
员工【Thread-6】到达餐厅
员工【Thread-2】到达餐厅
员工【Thread-5】到达餐厅
员工【Thread-0】到达餐厅
员工【Thread-8】到达餐厅
员工【Thread-7】到达餐厅
大家都到餐厅了,开始用餐,人数:7
员工【Thread-5】开始用餐
员工【Thread-8】开始用餐
员工【Thread-7】开始用餐
员工【Thread-0】开始用餐
员工【Thread-1】开始用餐
员工【Thread-2】开始用餐
员工【Thread-6】开始用餐
员工【Thread-7】用餐结束,回家
员工【Thread-5】用餐结束,回家
员工【Thread-2】用餐结束,回家
员工【Thread-8】用餐结束,回家
员工【Thread-1】用餐结束,回家
员工【Thread-0】用餐结束,回家
员工【Thread-6】用餐结束,回家