ConcurrentHashMap为什么适合并发

ConcurrentHashMap为什么适合并发

ConcurrentHashMap 为什么在并发场景下更合适,背后涉及哪些实现细节。从 Java 7 到 Java 8,它的实现发生了很大变化。本文结合实际代码讲清楚这些变化和设计思想。

Map 实现对比

实现 线程安全 性能 null支持 迭代器
HashMap Key/Value 快速失败
Hashtable 是(synchronized) 不支持 快速失败
Collections.synchronizedMap 是(包装器) 支持 快速失败
ConcurrentHashMap 不支持 弱一致性

JDK 7:分段锁(Segment)

#

数据结构

ConcurrentHashMap
├── Segment[0] -> HashEntry[0] -> Node -> Node
├── Segment[1] -> HashEntry[1] -> Node
├── Segment[2] -> HashEntry[2] -> Node -> Node -> Node
└── ...
// Segment 继承 ReentrantLock
static final class Segment<K,V> extends ReentrantLock {
transient volatile HashEntry<K,V>[] table;
transient int count;
}

#

核心参数

  • DEFAULT_CONCURRENCY_LEVEL = 16:默认 16 个 Segment
  • 每个 Segment 是一个独立的哈希表
  • 操作只锁定相关 Segment,其他 Segment 不受影响

#

put 流程

public V put(K key, V value) {
Segment<K,V> s;
if (value == null) throw new NullPointerException();

int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask; // 定位Segment

if ((s = (Segment<K,V>)UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null)
s = ensureSegment(j);

return s.put(key, hash, value, false); // 在Segment内加锁put
}

#

get 流程(无锁)

public V get(Object key) {
HashEntry<K,V>[] tab;
V v = null;
// 使用UNSAFE.getObjectVolatile读取,保证可见性
if ((tab = table) != null && (n = tab.length) > 0) {
HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile(
tab, ((long)(((n - 1) & hash)) << TSHIFT) + TBASE);
while (e != null) {
if (e.hash == hash && key.equals(e.key))
return e.value;
e = e.next;
}
}
return null;
}

JDK 8:CAS + synchronized

#

数据结构

Node[] table
├── null
├── Node(链表头)
├── TreeBin(红黑树根)
└── ForwardingNode(扩容时使用)
// 基本节点
transient volatile Node<K,V>[] table;

static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val; // volatile保证可见性
volatile Node<K,V> next;
}

#

put 流程

final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;

for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;

// 1. table未初始化,初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();

// 2. 目标槽位为空,CAS插入
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break;
}

// 3. 发现ForwardingNode,帮助扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);

// 4. 槽位有值,synchronized锁定头节点
else {
V oldVal = null;
synchronized (f) { // 只锁定单个槽位!
if (tabAt(tab, i) == f) {
if (fh >= 0) { // 链表
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key || (ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
else if (f instanceof TreeBin) { // 红黑树
// ...
}
}
}

if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i); // 链表转红黑树
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}

JDK 8 优化点

  1. 锁粒度更细:只锁定单个槽位(头节点),而非整个 Segment
  2. 无锁 get:volatile 保证可见性
  3. CAS 优化:槽位为空时直接 CAS

#

size 计算

public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n);
}

// 使用CounterCell分散热点
@sun.misc.Contended
static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}

@Contended:避免伪共享(False Sharing),提升并发性能。

关键设计亮点

#

1. volatile + CAS 无锁读

// 使用Unsafe直接内存访问
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

#

2. synchronized 只锁定头节点

相比 JDK 7 的 Segment 锁,粒度更细,并发度更高。

#

3. 链表转红黑树

static final int TREEIFY_THRESHOLD = 8;   // 链表长度>=8转树
static final int UNTREEIFY_THRESHOLD = 6; // 树节点<=6转链表

#

4. 扩容优化

  • 支持多线程协助扩容(helpTransfer
  • 渐进式扩容,不阻塞读操作

使用注意事项

#

1. 不支持 null

map.put("key", null);   // NullPointerException
map.put(null, "value"); // NullPointerException

原因:无法区分”key 不存在”和”key 对应的 value 为 null”。

#

2. size() 是估计值

// 弱一致性,可能读到旧值
int size = map.size();

#

3. 复合操作非原子

// 错误!不是原子操作
if (!map.containsKey(key)) {
map.put(key, value);
}

// 正确:使用putIfAbsent
map.putIfAbsent(key, value);

// 或computeIfAbsent
map.computeIfAbsent(key, k -> createValue());

#

4. 遍历是弱一致性

for (Map.Entry<String, String> entry : map.entrySet()) {
// 可能读到其他线程正在插入的数据
// 也可能读不到其他线程已插入的数据
}

最佳实践

// 1. 使用computeIfAbsent(原子且高效)
map.computeIfAbsent(key, k -> new ArrayList<>()).add(value);

// 2. 使用merge更新值
map.merge(key, 1, Integer::sum);

// 3. 批量操作使用并行流
ConcurrentHashMap<String, Integer> result = list.parallelStream()
.collect(Collectors.toConcurrentMap(
Function.identity(),
s -> 1,
Integer::sum
));

总结

版本 锁机制 并发度 特点
JDK 7 Segment(16个) 16 分段锁
JDK 8 synchronized(槽位级) 槽位数 更细粒度

ConcurrentHashMap 通过精巧的设计,在保证线程安全的同时实现了极高的并发性能,是并发场景下 Map 的首选实现。

核心要点

  1. Java 7 使用分段锁,Java 8 使用 CAS + synchronized

  2. 锁粒度从段级别降到了节点级别

  3. 使用红黑树优化链表的查询性能

  4. 支持原子操作,如 putIfAbsent、computeIfAbsent

总结

ConcurrentHashMap 是并发编程中常用的数据结构,理解它的实现细节有助于更好地使用它。在实际项目中,选择合适的并发集合可以显著提升性能。


   转载规则


《ConcurrentHashMap为什么适合并发》 小乐 采用 知识共享署名 4.0 国际许可协议 进行许可。
  目录