并发工具类CountDownLatch与CyclicBarrier

并发工具类CountDownLatch与CyclicBarrier

Java 并发包提供了多种同步工具类,用于协调多个线程的执行。本文重点讲解 CountDownLatch、CyclicBarrier 和 Semaphore。

CountDownLatch:倒计时门闩

核心思想

一个或多个线程等待其他线程完成一组操作。

public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
int workerCount = 3;
CountDownLatch latch = new CountDownLatch(workerCount);

for (int i = 0; i < workerCount; i++) {
final int id = i;
new Thread(() -> {
System.out.println("Worker " + id + " 开始工作");
try {
Thread.sleep((id + 1) * 1000);
} catch (InterruptedException e) {}
System.out.println("Worker " + id + " 完成工作");
latch.countDown(); // 计数减1
}).start();
}

System.out.println("等待所有工作线程完成...");
latch.await(); // 阻塞,直到计数为0
System.out.println("所有工作线程已完成!");
}
}

典型应用场景

1. 主线程等待子线程完成

public class DataLoader {
public void load() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);

// 并行加载三类数据
new Thread(() -> { loadUser(); latch.countDown(); }).start();
new Thread(() -> { loadOrder(); latch.countDown(); }).start();
new Thread(() -> { loadProduct(); latch.countDown(); }).start();

latch.await(); // 等待全部加载完成
System.out.println("所有数据加载完成,开始处理");
}
}

2. 多服务并行调用

public class AggregateService {
public AggregateResult fetch() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
AtomicReference<User> userRef = new AtomicReference<>();
AtomicReference<Order> orderRef = new AtomicReference<>();

executor.execute(() -> {
userRef.set(userService.fetch());
latch.countDown();
});
executor.execute(() -> {
orderRef.set(orderService.fetch());
latch.countDown();
});
executor.execute(() -> {
inventoryRef.set(inventoryService.fetch());
latch.countDown();
});

latch.await(5, TimeUnit.SECONDS); // 最多等5秒
return new AggregateResult(userRef.get(), orderRef.get(), ...);
}
}

实现原理

public class CountDownLatch {
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count); // state = 计数器
}

protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1; // state为0才成功
}

protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0) return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
}

关键:基于 AQS 的共享模式,state 减到 0 时唤醒所有等待线程。

CyclicBarrier:循环栅栏

核心思想

一组线程互相等待,到达屏障后被阻塞,直到最后一个线程到达,然后一起放行。

public class CyclicBarrierExample {
public static void main(String[] args) {
int count = 3;
CyclicBarrier barrier = new CyclicBarrier(count, () -> {
System.out.println("所有线程已到达屏障,放行!");
});

for (int i = 0; i < count; i++) {
final int id = i;
new Thread(() -> {
try {
System.out.println("Thread " + id + " 到达屏障");
barrier.await(); // 等待其他线程
System.out.println("Thread " + id + " 继续执行");
} catch (Exception e) {}
}).start();
}
}
}

与 CountDownLatch 的区别

特性 CountDownLatch CyclicBarrier
计数方向 递减到0 递增到指定值
重用性 一次性 可循环使用
等待方 一个或多个线程等待 所有线程互相等待
动作 无回调 可指定回调
异常 可 broken

循环使用

public class Race {
private final CyclicBarrier barrier = new CyclicBarrier(3);

public void round() {
for (int i = 0; i < 3; i++) { // 3轮比赛
new Thread(() -> {
try {
prepare(); // 准备
barrier.await();
race(); // 起跑
barrier.await();
rest(); // 休息
} catch (Exception e) {}
}).start();
}
}
}

broken 处理

try {
barrier.await();
} catch (BrokenBarrierException e) {
// 其他线程中断或超时,屏障被打破
System.out.println("屏障被打破");
}

Semaphore:信号量

核心思想

控制同时访问某资源的线程数量。

public class SemaphoreExample {
public static void main(String[] args) {
// 同时允许3个线程访问
Semaphore semaphore = new Semaphore(3);

for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
semaphore.acquire(); // 获取许可
System.out.println(Thread.currentThread().getName() + " 获取许可");
Thread.sleep(2000);
} catch (InterruptedException e) {
} finally {
semaphore.release(); // 释放许可
System.out.println(Thread.currentThread().getName() + " 释放许可");
}
}).start();
}
}
}

应用场景

1. 限流

public class RateLimiter {
private final Semaphore semaphore;

public RateLimiter(int qps) {
this.semaphore = new Semaphore(qps);
// 定时释放
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.scheduleAtFixedRate(() -> {
semaphore.release(qps - semaphore.availablePermits());
}, 1, 1, TimeUnit.SECONDS);
}

public void acquire() throws InterruptedException {
semaphore.acquire();
}
}

2. 资源池

public class ConnectionPool {
private final Semaphore semaphore;
private final BlockingQueue<Connection> pool;

public ConnectionPool(int size) {
semaphore = new Semaphore(size);
pool = new ArrayBlockingQueue<>(size);
for (int i = 0; i < size; i++) {
pool.add(createConnection());
}
}

public Connection borrow() throws InterruptedException {
semaphore.acquire();
return pool.take();
}

public void release(Connection conn) {
pool.offer(conn);
semaphore.release();
}
}

三者的选择

场景 工具类
主线程等待多个子线程完成 CountDownLatch
多线程互相等待(分阶段执行) CyclicBarrier
限制并发访问数量 Semaphore

总结

CountDownLatch、CyclicBarrier 和 Semaphore 都是基于 AQS 实现的同步工具:

  • CountDownLatch:倒计时,用于等待事件完成
  • CyclicBarrier:循环栅栏,用于多线程阶段同步
  • Semaphore:信号量,用于控制并发数量

理解它们的实现原理和适用场景,可以优雅地解决多线程协作问题。


   转载规则


《并发工具类CountDownLatch与CyclicBarrier》 小乐 采用 知识共享署名 4.0 国际许可协议 进行许可。
  目录