并发工具类CountDownLatch与CyclicBarrier
Java 并发包提供了多种同步工具类,用于协调多个线程的执行。本文重点讲解 CountDownLatch、CyclicBarrier 和 Semaphore。
CountDownLatch:倒计时门闩
核心思想
一个或多个线程等待其他线程完成一组操作。
public class CountDownLatchExample { public static void main(String[] args) throws InterruptedException { int workerCount = 3; CountDownLatch latch = new CountDownLatch(workerCount); for (int i = 0; i < workerCount; i++) { final int id = i; new Thread(() -> { System.out.println("Worker " + id + " 开始工作"); try { Thread.sleep((id + 1) * 1000); } catch (InterruptedException e) {} System.out.println("Worker " + id + " 完成工作"); latch.countDown(); }).start(); } System.out.println("等待所有工作线程完成..."); latch.await(); System.out.println("所有工作线程已完成!"); } }
|
典型应用场景
1. 主线程等待子线程完成
public class DataLoader { public void load() throws InterruptedException { CountDownLatch latch = new CountDownLatch(3); new Thread(() -> { loadUser(); latch.countDown(); }).start(); new Thread(() -> { loadOrder(); latch.countDown(); }).start(); new Thread(() -> { loadProduct(); latch.countDown(); }).start(); latch.await(); System.out.println("所有数据加载完成,开始处理"); } }
|
2. 多服务并行调用
public class AggregateService { public AggregateResult fetch() throws InterruptedException { CountDownLatch latch = new CountDownLatch(3); AtomicReference<User> userRef = new AtomicReference<>(); AtomicReference<Order> orderRef = new AtomicReference<>(); executor.execute(() -> { userRef.set(userService.fetch()); latch.countDown(); }); executor.execute(() -> { orderRef.set(orderService.fetch()); latch.countDown(); }); executor.execute(() -> { inventoryRef.set(inventoryService.fetch()); latch.countDown(); }); latch.await(5, TimeUnit.SECONDS); return new AggregateResult(userRef.get(), orderRef.get(), ...); } }
|
实现原理
public class CountDownLatch { private static final class Sync extends AbstractQueuedSynchronizer { Sync(int count) { setState(count); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c - 1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } }
|
关键:基于 AQS 的共享模式,state 减到 0 时唤醒所有等待线程。
CyclicBarrier:循环栅栏
核心思想
一组线程互相等待,到达屏障后被阻塞,直到最后一个线程到达,然后一起放行。
public class CyclicBarrierExample { public static void main(String[] args) { int count = 3; CyclicBarrier barrier = new CyclicBarrier(count, () -> { System.out.println("所有线程已到达屏障,放行!"); }); for (int i = 0; i < count; i++) { final int id = i; new Thread(() -> { try { System.out.println("Thread " + id + " 到达屏障"); barrier.await(); System.out.println("Thread " + id + " 继续执行"); } catch (Exception e) {} }).start(); } } }
|
与 CountDownLatch 的区别
| 特性 |
CountDownLatch |
CyclicBarrier |
| 计数方向 |
递减到0 |
递增到指定值 |
| 重用性 |
一次性 |
可循环使用 |
| 等待方 |
一个或多个线程等待 |
所有线程互相等待 |
| 动作 |
无回调 |
可指定回调 |
| 异常 |
无 |
可 broken |
循环使用
public class Race { private final CyclicBarrier barrier = new CyclicBarrier(3); public void round() { for (int i = 0; i < 3; i++) { new Thread(() -> { try { prepare(); barrier.await(); race(); barrier.await(); rest(); } catch (Exception e) {} }).start(); } } }
|
broken 处理
try { barrier.await(); } catch (BrokenBarrierException e) { System.out.println("屏障被打破"); }
|
Semaphore:信号量
核心思想
控制同时访问某资源的线程数量。
public class SemaphoreExample { public static void main(String[] args) { Semaphore semaphore = new Semaphore(3); for (int i = 0; i < 10; i++) { new Thread(() -> { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + " 获取许可"); Thread.sleep(2000); } catch (InterruptedException e) { } finally { semaphore.release(); System.out.println(Thread.currentThread().getName() + " 释放许可"); } }).start(); } } }
|
应用场景
1. 限流
public class RateLimiter { private final Semaphore semaphore; public RateLimiter(int qps) { this.semaphore = new Semaphore(qps); ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); executor.scheduleAtFixedRate(() -> { semaphore.release(qps - semaphore.availablePermits()); }, 1, 1, TimeUnit.SECONDS); } public void acquire() throws InterruptedException { semaphore.acquire(); } }
|
2. 资源池
public class ConnectionPool { private final Semaphore semaphore; private final BlockingQueue<Connection> pool; public ConnectionPool(int size) { semaphore = new Semaphore(size); pool = new ArrayBlockingQueue<>(size); for (int i = 0; i < size; i++) { pool.add(createConnection()); } } public Connection borrow() throws InterruptedException { semaphore.acquire(); return pool.take(); } public void release(Connection conn) { pool.offer(conn); semaphore.release(); } }
|
三者的选择
| 场景 |
工具类 |
| 主线程等待多个子线程完成 |
CountDownLatch |
| 多线程互相等待(分阶段执行) |
CyclicBarrier |
| 限制并发访问数量 |
Semaphore |
总结
CountDownLatch、CyclicBarrier 和 Semaphore 都是基于 AQS 实现的同步工具:
- CountDownLatch:倒计时,用于等待事件完成
- CyclicBarrier:循环栅栏,用于多线程阶段同步
- Semaphore:信号量,用于控制并发数量
理解它们的实现原理和适用场景,可以优雅地解决多线程协作问题。