阻塞队列BlockingQueue详解

阻塞队列BlockingQueue详解

BlockingQueue 是 Java 并发包中用于线程间数据交换的阻塞队列接口。本文系统讲解其 API 和七种实现类。

核心 API

BlockingQueue 提供四组插入/移除方法:

操作 抛出异常 返回特殊值 阻塞等待 超时退出
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek() 不可用 不可用
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

// 阻塞插入
queue.put("item");

// 阻塞取出
String item = queue.take();

七种实现类

1. ArrayBlockingQueue

BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100, true); // 公平锁

特点

  • 有界数组队列
  • 一把 ReentrantLock + 两个 Condition(notEmpty、notFull)
  • 插入和移除共用同一把锁
  • 必须指定容量

适用:生产者-消费者模式,需要限制内存的场景。

2. LinkedBlockingQueue

BlockingQueue<String> queue = new LinkedBlockingQueue<>();     // 无界(Integer.MAX_VALUE)
BlockingQueue<String> queue = new LinkedBlockingQueue<>(100); // 有界

特点

  • 链表实现
  • 两把锁:putLock 和 takeLock,读写分离
  • 无界版本有 OOM 风险

适用:吞吐量要求高的生产者-消费者场景。

3. SynchronousQueue

BlockingQueue<String> queue = new SynchronousQueue<>();        // 非公平
BlockingQueue<String> queue = new SynchronousQueue<>(true); // 公平

特点

  • 不存储元素
  • 每个插入操作必须等待一个移除操作
  • 吞吐量最高

适用:直接传递、线程池(Executors.newCachedThreadPool)。

// 线程间直接交换
new Thread(() -> {
try {
queue.put("data"); // 阻塞,直到有消费者
} catch (InterruptedException e) {}
}).start();

new Thread(() -> {
try {
String data = queue.take(); // 阻塞,直到有生产者
} catch (InterruptedException e) {}
}).start();

4. PriorityBlockingQueue

BlockingQueue<Task> queue = new PriorityBlockingQueue<>(100, 
Comparator.comparingInt(Task::getPriority));

特点

  • 支持优先级排序
  • 无界(自动扩容)
  • 基于最小堆实现

适用:任务调度、按优先级处理的场景。

5. DelayQueue

BlockingQueue<DelayedTask> queue = new DelayQueue<>();

public class DelayedTask implements Delayed {
private final long executeTime;

@Override
public long getDelay(TimeUnit unit) {
return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}

@Override
public int compareTo(Delayed other) {
return Long.compare(this.executeTime, ((DelayedTask) other).executeTime);
}
}

特点

  • 元素只有到期后才能取出
  • 基于 PriorityQueue

适用:定时任务调度、缓存过期。

6. LinkedTransferQueue

TransferQueue<String> queue = new LinkedTransferQueue<>();

// 传输:阻塞直到被消费
queue.transfer("data");

// 尝试传输:不阻塞
queue.tryTransfer("data");

// 超时传输
queue.tryTransfer("data", 1, TimeUnit.SECONDS);

特点

  • 结合了 SynchronousQueue 和 LinkedBlockingQueue
  • transfer 方法:有等待消费者则直接传递,否则入队
  • 无锁实现(CAS),性能极高

7. LinkedBlockingDeque

BlockingDeque<String> deque = new LinkedBlockingDeque<>();

deque.addFirst("first");
deque.addLast("last");

String first = deque.takeFirst();
String last = deque.takeLast();

特点

  • 双端阻塞队列
  • 支持 FIFO 和 LIFO

适用:工作窃取算法。

实现对比

队列 有界 锁机制 吞吐量 适用场景
ArrayBlockingQueue 单锁 有界缓冲
LinkedBlockingQueue 可选 双锁 生产者-消费者
SynchronousQueue 无存储 最高 直接传递
PriorityBlockingQueue 单锁 优先级调度
DelayQueue 单锁 延迟任务
LinkedTransferQueue 无锁 极高 混合场景
LinkedBlockingDeque 可选 双锁 双端操作

生产者-消费者实现

public class ProducerConsumer {
private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);

class Producer implements Runnable {
@Override
public void run() {
try {
while (!Thread.interrupted()) {
String item = produce();
queue.put(item); // 队列满时阻塞
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

class Consumer implements Runnable {
@Override
public void run() {
try {
while (!Thread.interrupted()) {
String item = queue.take(); // 队列空时阻塞
consume(item);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}

线程池中的应用

// CachedThreadPool:SynchronousQueue
new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());

// FixedThreadPool:LinkedBlockingQueue(无界!危险)
new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());

// 自定义:有界队列 + 拒绝策略
new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy());

最佳实践

  1. 优先使用有界队列:防止 OOM
  2. 选择合适的实现
    • 高并发直接传递:SynchronousQueue
    • 通用生产者-消费者:LinkedBlockingQueue(指定容量)
    • 优先级处理:PriorityBlockingQueue
    • 延迟任务:DelayQueue
  3. 正确处理 InterruptedException
  4. 关闭时清空队列:避免任务丢失
// 优雅关闭
public void shutdown() {
executor.shutdown();
List<Runnable> remaining = executor.shutdownNow();
// 处理剩余任务...
}

总结

BlockingQueue 是 Java 并发编程中最实用的工具之一,它:

  • 解耦了生产者和消费者
  • 自动处理线程等待和唤醒
  • 多种实现满足不同的性能需求

理解各实现类的特点,选择合适的队列,是设计高效并发系统的关键。


   转载规则


《阻塞队列BlockingQueue详解》 小乐 采用 知识共享署名 4.0 国际许可协议 进行许可。
  目录