ConcurrentHashMap
是在 JDK 1.5 时,J.U.C 引入的一个同步集合工具类,顾名思义,这是一个线程安全的 HashMap。不同版本的 ConcurrentHashMap,内部实现机制千差万别,本节所有的讨论基于 JDK 1.8。
基本结构
ConcurrentHashMap内部维护了一个Node
类型的数组,也就是table:
1 | transient volatile Node[] table; |
数组的每一个位置table[i]
代表了一个桶,当插入键值对时,会根据键的hash值映射到不同的桶位置,table 一共可以包含 4 种不同类型的桶:Node、TreeBin、ForwardingNode、ReservationNode。上图中,不同的桶用不同颜色表示。可以看到,有的桶链接着链表,有的桶链接着树。
TreeBin *所链接的是一颗红黑树,红黑树的结点用 *TreeNode 表示,所以 ConcurrentHashMap 中实际上一共有五种不同类型的 Node 结点。
之所以用 TreeBin 而不是直接用 TreeNode,是因为红黑树的操作比较复杂,包括构建、左旋、右旋、删除,平衡等操作,用一个代理结点 TreeBin 来包含这些复杂操作,其实是一种“职责分离”的思想。另外 TreeBin 中也包含了一些加/解锁的操作。
节点定义
Node 节点
Node 结点的定义非常简单,也是其它四种类型结点的父类。
默认链接到
table[i]
——桶上的结点就是 Node 结点。
当出现 hash 冲突时,Node 结点会首先以链表的形式链接到 table 上,当结点数量超过一定数目时,链表会转化为红黑树。因为链表查找的平均时间复杂度为O(n)
,而红黑树是一种平衡二叉树,其平均时间复杂度为O(logn)
。
TreeNode 结点
TreeNode就是红黑树的结点,TreeNode 不会直接链接到table[i]
——桶上面,而是由TreeBin 链接,TreeBin 会指向红黑树的根结点。
TreeBin 结点
TreeBin 相当于 TreeNode 的代理结点。TreeBin 会直接链接到table[i]
——桶上面,该结点提供了一系列红黑树相关的操作,以及加锁、解锁操作。
ForwardingNode 结点
ForwardingNode 结点仅仅在扩容时才会使用
ReservationNode 结点
保留结点,ConcurrentHashMap 中的一些特殊方法会专门用到该类结点。
常量定义
1 | /** |
2 | * 最大容量. |
3 | */ |
4 | private static final int MAXIMUM_CAPACITY = 1 << 30; |
5 | |
6 | /** |
7 | * 默认初始容量 |
8 | */ |
9 | private static final int DEFAULT_CAPACITY = 16; |
10 | |
11 | /** |
12 | * The largest possible (non-power of two) array size. |
13 | * Needed by toArray and related methods. |
14 | */ |
15 | static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; |
16 | |
17 | /** |
18 | * 负载因子,为了兼容JDK1.8以前的版本而保留。 |
19 | * JDK1.8中的ConcurrentHashMap的负载因子恒定为0.75 |
20 | */ |
21 | private static final float LOAD_FACTOR = 0.75f; |
22 | |
23 | /** |
24 | * 链表转树的阈值,即链接结点数大于8时, 链表转换为树. |
25 | */ |
26 | static final int TREEIFY_THRESHOLD = 8; |
27 | |
28 | /** |
29 | * 树转链表的阈值,即树结点树小于6时,树转换为链表. |
30 | */ |
31 | static final int UNTREEIFY_THRESHOLD = 6; |
32 | |
33 | /** |
34 | * 在链表转变成树之前,还会有一次判断: |
35 | * 即只有键值对数量大于MIN_TREEIFY_CAPACITY,才会发生转换。 |
36 | * 这是为了避免在Table建立初期,多个键值对恰好被放入了同一个链表中而导致不必要的转化。 |
37 | */ |
38 | static final int MIN_TREEIFY_CAPACITY = 64; |
39 | |
40 | /** |
41 | * 在树转变成链表之前,还会有一次判断: |
42 | * 即只有键值对数量小于MIN_TRANSFER_STRIDE,才会发生转换. |
43 | */ |
44 | private static final int MIN_TRANSFER_STRIDE = 16; |
45 | |
46 | /** |
47 | * 用于在扩容时生成唯一的随机数. |
48 | */ |
49 | private static int RESIZE_STAMP_BITS = 16; |
50 | |
51 | /** |
52 | * 可同时进行扩容操作的最大线程数. |
53 | */ |
54 | private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; |
55 | |
56 | /** |
57 | * The bit shift for recording size stamp in sizeCtl. |
58 | */ |
59 | private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; |
60 | |
61 | static final int MOVED = -1; // 标识ForwardingNode结点(在扩容时才会出现,不存储实际数据) |
62 | static final int TREEBIN = -2; // 标识红黑树的根结点 |
63 | static final int RESERVED = -3; // 标识ReservationNode结点() |
64 | static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash |
65 | |
66 | /** |
67 | * CPU核心数,扩容时使用 |
68 | */ |
69 | static final int NCPU = Runtime.getRuntime().availableProcessors(); |
字段定义
1 | /** |
2 | * Node数组,标识整个Map,首次插入元素时创建,大小总是2的幂次. |
3 | */ |
4 | transient volatile Node<K, V>[] table; |
5 | |
6 | /** |
7 | * 扩容后的新Node数组,只有在扩容时才非空. |
8 | */ |
9 | private transient volatile Node<K, V>[] nextTable; |
10 | |
11 | /** |
12 | * 控制table的初始化和扩容. |
13 | * 0 : 初始默认值 |
14 | * -1 : 有线程正在进行table的初始化 |
15 | * >0 : table初始化时使用的容量,或初始化/扩容完成后的threshold |
16 | * -(1 + nThreads) : 记录正在执行扩容任务的线程数 |
17 | */ |
18 | private transient volatile int sizeCtl; |
19 | |
20 | /** |
21 | * 扩容时需要用到的一个下标变量. |
22 | */ |
23 | private transient volatile int transferIndex; |
24 | |
25 | /** |
26 | * 计数基值,当没有线程竞争时,计数将加到该变量上。类似于LongAdder的base变量 |
27 | */ |
28 | private transient volatile long baseCount; |
29 | |
30 | /** |
31 | * 计数数组,出现并发冲突时使用。类似于LongAdder的cells数组 |
32 | */ |
33 | private transient volatile CounterCell[] counterCells; |
34 | |
35 | /** |
36 | * 自旋标识位,用于CounterCell[]扩容时使用。类似于LongAdder的cellsBusy变量 |
37 | */ |
38 | private transient volatile int cellsBusy; |
39 | |
40 | |
41 | // 视图相关字段 |
42 | private transient KeySetView<K, V> keySet; |
43 | private transient ValuesView<K, V> values; |
44 | private transient EntrySetView<K, V> entrySet; |
get 操作
get 方法的逻辑很简单,首先根据 key 的 hash 值计算映射到 table 的哪个桶——table[i]
。
- 如果
table[i]
的 key 和待查找 key 相同,那直接返回; - 如果
table[i]
对应的结点是特殊结点(hash值小于0),则通过find
方法查找; - 如果
table[i]
对应的结点是普通链表结点,则按链表方式查找。
第二种情况下,table[i]
是不同的节点,处理方式也会不同:
Node 结点的查找
当槽table[i]
被普通 Node 结点占用,说明是链表链接的形式,直接从链表头开始查找。
TreeBin 结点的查找
TreeBin 的查找比较特殊,我们知道当槽table[i]
被 TreeBin 结点占用时,说明链接的是一棵红黑树。由于红黑树的插入、删除会涉及整个结构的调整,所以通常存在读写并发操作的时候,是需要加锁的。
ConcurrentHashMap 采用了一种类似读写锁的方式:当线程持有写锁(修改红黑树)时,如果读线程需要查找,不会像传统的读写锁那样阻塞等待,而是转而以链表的形式进行查找(TreeBin本身时Node类型的子类,所有拥有 Node 的所有字段)
ForwardingNode 结点的查找
ForwardingNode 是一种临时结点,在扩容进行中才会出现,所以查找也在扩容的table上进行。
ReservationNode 结点的查找
ReservationNode 是保留结点,不保存实际数据,所以直接返回null
。
put 操作
put 方法内部调用了putVal
这个私有方法
putVal
的逻辑还是很清晰的,首先根据key
计算hash
值,然后通过hash
值与table
容量进行运算,计算得到key
所映射的索引——也就是对应到table中桶的位置。
这里需要注意的是计算索引的方式:i = (n - 1) & hash
n - 1 == table.length - 1
,table.length
的大小必须为2的幂次的原因就在这里。
读者可以自己计算下,当table.length
为2的幂次时,(table.length-1)
的二进制形式的特点是除最高位外全部是 1,配合这种索引计算方式可以实现 key 在 table 中的均匀分布,减少 hash 冲突——出现 hash 冲突时,结点就需要以链表或红黑树的形式链接到table[i]
,这样无论是插入还是查找都需要额外的时间。
putVal
方法一共处理四种情况:
1、首次初始化 table 懒加载
ConcurrentHashMap在构造的时候并不会初始化 table 数组,首次初始化就在这里通过initTable
方法完成。首次加载的时候,会通过sizeCtl
变量来控制,只有变量修改成功后才开始初始化,避免了多线程同时初始化的问题。
2、table[i] 对应的桶为空
最简单的情况,直接 CAS 操作占用桶table[i]
即可。
3、发现 ForwardingNode 结点,说明此时 table 正在扩容,则尝试协助进行数据迁移
ForwardingNode 结点是 ConcurrentHashMap 中的五类结点之一,相当于一个占位结点,表示当前 table 正在进行扩容,当前线程可以尝试协助数据迁移。
4、出现 hash 冲突,也就是table[i]
桶中已经有了结点
ConcurrentHashMap 就是在最后一种情况这里加锁的。
当两个不同key
映射到同一个table[i]
桶中时,就会出现这种情况:
- 当
table[i]
的结点类型为 Node——链表结点时,就会将新结点以“尾插法”的形式插入链表的尾部。 - 当
table[i]
的结点类型为 TreeBin——红黑树代理结点时,就会将新结点通过红黑树的插入方式插入。
再然后,涉及将链表转换为红黑树 —— treeifyBin
,但实际情况并非立即就会转换,当 table 的容量小于 64 时,出于性能考虑,只是对 table 数组扩容1倍——tryPresize
。
addCount 操作
在putVal
的最后一步,就是调用addCount
方法,把计数器增加,但是并发环境下也不是一个简简单单的操作,我们看下这个方法都做了什么。
在看这个方法之前,必须了解ConcurrentHashMap
进行数量统计的基本方法,它引入了两个变量baseCount
、counterCells
。baseCount
负责简单计数,直接 CAS 操作,用于在没有出现竞争的情况下统计,操作失败的再用counterCells
进行补充,补充方式有点复杂,这里先不展开,改天值得单独开文解读,总的来说思路是跟 LongAdder 类似。最终计数结果是baseCount
、counterCells
两个变量取和。
- 判断计数盒子属性是否是空,如果是空,就尝试修改 baseCount 变量,对该变量进行加 X。
- 如果计数盒子不是空,或者修改 baseCount 变量失败了,则放弃对 baseCount 进行操作。
- 如果计数盒子是
null
或者计数盒子的 length 是 0,或者随机取一个位置取于数组长度是null
,那么就对刚刚的元素进行 CAS 赋值。 - 如果赋值失败,或者满足上面的条件,则调用
fullAddCount
方法重新死循环插入。 fullAddCount
的逻辑简单来说是分配一个数组counterCells
,长度从 2 开始, 最多是大于等于 cpu 数量的 2 的整数次幂(比如 10 个 cpu 就是 16)。然后对当前线程 hash 取模找到对应位置,进行 cas 累加。由于 cpu 数量就这么多,不可能所有位置都 cas 失败,基本上是空间换时间的思想。- 这里如果操作 baseCount 失败了(或者计数盒子不是
null
),且对计数盒子赋值成功,那么就检查 check 变量,如果该变量小于等于 1。直接结束。否则,计算一下 count 变量。 - 如果 check 大于等于 0 ,说明需要对是否扩容进行检查。
- 如果 map 的 size 大于
sizeCtl
(扩容阈值),且 table 的长度没到最大值,那么就进行扩容。 - 根据 length 得到一个标识符,然后,判断
sizeCtl
状态,如果小于 0 ,说明其他线程在初始化或者在扩容。 - 如果正在扩容,那么就校验一下数据是否变化了。如果检验数据不通过,break。
- 如果校验数据通过了,那么将
sizeCtl
加一,表示多了一个线程帮助扩容。然后进行扩容。 - 如果没有在扩容,但是需要扩容。那么就将
sizeCtl
更新,赋值为标识符左移 16 位 —— 一个负数。然后加 2。 表示已经有一个线程开始扩容了,然后进行扩容。然后再次更新 count,看看是否还需要扩容。
transfer 扩容操作
扩容同样是非常重要的步骤,也是很容易发生线程安全问题的部分,让我们看看ConcurrentHashMap
怎么做的:
- 通过计算 CPU 核心数和 Map 数组的长度得到每个线程(CPU)要帮助处理多少个桶,并且这里每个线程处理都是平均的。默认每个线程处理 16 个桶。因此,如果长度是 16 的时候,扩容的时候只会有一个线程扩容。
- 初始化临时变量
nextTable
。将其在原有基础上扩容两倍。 - 死循环开始转移。多线程并发转移就是在这个死循环中,根据一个
finishing
变量来判断,该变量为 true 表示扩容结束,否则继续扩容。- 进入一个 while 循环,分配数组中一个桶的区间给线程,默认是 16。
- 出 while 循环,进 if 判断,判断扩容是否结束,如果扩容结束,清空临死变量,更新 table 变量,更新扩容阈值。如果没完成,但已经无法领取区间(没了),该线程退出该方法,并将 sizeCtl 减一,表示扩容的线程少一个了。如果减完这个数以后,sizeCtl 回归了初始状态,表示没有线程再扩容了,该方法所有的线程扩容结束了。(这里主要是判断扩容任务是否结束,如果结束了就让线程退出该方法,并更新相关变量)。然后检查所有的桶,防止遗漏。
- 如果没有完成任务,且 i 对应的槽位是空,尝试 CAS 插入占位符,让
putVal
方法的线程感知。 - 如果
i
对应的槽位不是空,且有了占位符,那么该线程跳过这个槽位,处理下一个槽位。 - 如果以上都是不是,说明这个槽位有一个实际的值。开始同步处理这个。
- 到这里,都还没有对桶内数据进行转移,只是计算了下标和处理区间,然后一些完成状态判断。同时,如果对应下标内没有数据或已经被占位了,就跳过了。
- 处理每个桶的行为都是同步的,此处加了
synchronized
关键字。防止putVal
的时候向链表插入数据。- 如果这个桶是链表,那么就将这个链表根据
length
取于拆成两份,取于结果是 0 的放在新表的低位,取于结果是 1 放在新表的高位。 - 如果这个桶是红黑数,那么也拆成 2 份,方式和链表的方式一样,然后,判断拆分过的树的节点数量,如果数量小于等于 6,改造成链表。反之,继续使用红黑树结构。
- 到这里,就完成了一个桶从旧表转移到新表的过程。
- 如果这个桶是链表,那么就将这个链表根据
多线程无锁扩容的关键就是通过 CAS 设置 sizeCtl 与 transferIndex 变量,协调多个线程对 table 数组中的 node 进行迁移。
总结
由上面种种可以看到,ConcurrentHashMap
对比HashMap
和HashTable
成功处理线程安全的关键有几个:
- 关键操作加锁。这个说起来简单,但是优化起来很复杂,添加修改删除元素、扩容,
HashTable
也是这么做的。 - 善用锁升级,从无锁到偏向锁,再到自旋锁和重量级锁,一步步加码,保证低并发的时候最大效率。
- 空间换时间,在面对超大并发,自旋锁大量重复消耗的时候,让多个线程同步进行修改,增加效率。
- 多余线程参与扩容、计数等工作,能不闲着绝不闲着。
ConcurrentHashMap 六千多行代码,五十三个内部类,都是精华代码,Doug Lea 大神果然厉害。
参考文章:
简书博主莫那一鲁道对ConcurrentHashMap
进行了非常细致的总结,本文学习和抄袭了大量文字,更详细的内容建议直接去看原文:ConcurrentHashMap 源码阅读小结