大家好,我是阿轩。
每当我们谈到并发包的时候,脑海中的第一印象就是ConcurentHashMap类,的确,这个类的很多并发设计思想非常值得我们借鉴,阿轩也趁着周末想仔细研究一下,但是每当遇到问题想求助浏览器的时候,发现很多文章都是 ctrl+c ctrl+v,很多细节压根没提,无奈,只能静下心来,自己研究了这篇文章主要是对核心方法 put get 中的细节进行一个详细的剖析,版本是 JDK1.8.0_241,至于 7 和 8 的区别,网上已经有很多文章了,本篇就不再继续啰嗦了,话不多说,我们开始
put
put 源码比较长,我们先看上半部分

第一个 if 判断集合是否为空,为空进行初始化

这里介绍一下 sizeCtl 这个变量
private transient volatile int sizeCtl;
他是被 volatile
修饰的,具有线程可见性,当初始化和扩容时是负数,其他时候是正数
刚进入循环的时候,sizeCtl
是正数,所以会走到 ②,这里会使用 CAS 进行无锁赋值,CAS 失败的线程下一个循环来到 ①,让出 CPU 时间片。
CAS 成功的线程会进行数组的初始化,当我们实例化 ConcurrentHashMap 的时候,如果传入了初始容量大小,那么会对传入的这个参数进行一系列的位运算,最终得到一个 2 的幂次方的值,如果没有传入初始容量,那么 siteCtl 就等于默认值 0,所以如果传入了初始容量,n 就等于计算后的值,没传就等于默认值 DEFAULT_CAPACITY
private static final int DEFAULT_CAPACITY = 16;
所以 ConcurrentHashMap 的默认初始化容量是 16,和 HashMap 一样,初始化数组之后会将 sizeCtl 设置为(容量 x0.75) 大小,作为下一次扩容的阈值。其他自旋等待的线程发现 table!=null
后跳出循环
回到 putVal 方法,此时来到判断 ②,通过 tabAt
方法求出目标索引位的值,(n - 1) & hash
等同于hash%n
,也就是对容量大小进行取余,计算出目标所在的索引位 i
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
这里通过 Unsafe 的 getObjectVolatile
方法进行取值,getObjectVolatile 具有 valotile 的语义,可以获取 i 索引处的最新值,如果值为 null,新建一个 Node 对象,然后通过 CAS 进行赋值
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
如果 CAS 赋值成功,跳出循环,赋值失败,进入下一次循环,来到 ③,判断目标节点的 hash 值是否等于 MOVED
,也就是-1,
static final int MOVED = -1;
如果等于则帮助扩容,扩容后面再细讲,我们来看最后一个判断
如果目标索引位的值既不为 null,也不在扩容,那么就直接加锁
开始判断节点的 key 是否相等。
这里可能有读者会想,前面赋值的时候使用的是 CAS
,那么这里可不可以也使用 CAS 呢
阿轩仔细思考了一下,其实这里是没办法用 CAS
的

原因有 3 点
-
如果此时有大量的线程进行 put 操作,而这些操作的 key 都是相等的,如果此时节点的值为 null,那么通过 CAS 操作后,节点的值就不为 null 了,下一次循环就不会再进入这个 if 判断,但是如果该节点是有值的,那么只有一个线程可以通过 CAS 操作成功,其他线程下一次循环的时候还是会进入这个 if 判断再次尝试进行 CAS 操作,并发很高的时候会造成很多
无用的自旋操作
,而且 JDK8 的synchronized
经过优化之后性能已经很好了,所以不如直接进行 synchronized 加锁操作,反而来的更快 -
CAS 操作是需要知道目标对象在内存中的地址的,上面通过 CAS 对数组目标索引位进行赋值的操作
tabAt(tab, i = (n - 1) & hash)
,因为 table 这个对象类里面是有变量保存的,而数组又是连续的内存地址
,所以通过索引是可以得知相应的内存地址的,而对节点的 val 进行 CAS 操作是需要知道节点的val 变量内存地址
的,而这个地方没办法得知,所以也就无法进行 CAS 操作
transient volatile Node<K,V>[] table;
-
当链表中所有的 key 都不符合要求时,会创建一个新的节点挂在最后的位置上,而之前最后一个节点的下一个节点位置是不知道的,所以也没法进行 CAS 操作
我们继续往下看,加锁
之后立马进行了一次 if 判断,这个判断很容易被忽略,但是又是必不可少
的

比如在上一个 if 判断执行后,加锁之前,这个 f 节点刚好被扩容线程操作过,那么此时的 tabAt(tab, i) != f,细节,细节!
接下来就是判断是链表
还是红黑树
,红黑树只是个算法工具的应用,本文不做说明
我们接着看链表,首先会判断头节点是否符合要求,符合要求就把值赋给oldVal
,onlyIfAbsent
这个变量的意思是,是否仅当不存在的时候赋值,如果为 true,那么就是 val 为 null 就赋值,不为 null,不赋值,只返回原值,这里是 false,所以会覆盖原值。
如果头节点不符合,接着往下遍历,如果整个链表遍历完都不符合,那么在链表的结尾插入构造的新的 Node
节点,在锁结构体执行完会进行一个链表转红黑树
的判断,链表转红黑树阈值是 8
,红黑树转链表阈值是 6
,之所以不是同一个值就是不想 2 种结构频繁的互相转换
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
最后如果存在符合条件的 key,就返回原值,不存在就跳出循环,执行 putVal
方法的最后一个逻辑 addCount
方法,这个方法只有当整个集合中不存在符合条件的 key 时才会执行到,否则就返回原值,提前 return 了,下面我们来看 addCount 方法。
addCount

addCount
主要分为 2 大块,第一个 if 是计算集合的容量
,第二个 if 是判断是否需要扩容
,在看之前,读者不妨思考一下这个问题,在高并发的情况下,怎样高效的计算集合的容量
呢?
Doug 老爷子在这里给出了答案,通过分治
的思想高效计算容量,怎样实现的呢
private transient volatile long baseCount;
private transient volatile CounterCell[] counterCells;
Doug 老爷子设置了一个 long 型变量 baseCount
,一个 CounterCell
类型的数组,CounterCell 实际上内部只有一个 long 型变量。
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
读者有没有注意到,这个类使用了一个@Contended
注解,这个注解是不是很陌生呀,他的作用是防止伪共享
,这个展开来又是一篇文章了,有兴趣的读者可以去研究一下
所以,这里计算容量的总体思想就是首先尝试对 baseCount
进行+1,如果失败了,随机获取 CounterCell
数组中的一个下标位,对他尝试进行+1,这样就将对容量的并发操作分开来了,大大提高了并发性能,理想情况下最多支持 CPU 个数
的线程同时对容量进行操作,学到了没

理解了这点,我们在看代码就轻松多了
第一次进入这个方法,counterCells
默认是为 null
的,所以会通过 CAS 尝试对 baseCount
进行+1 操作,如果成功跳出 if 进行下一个判断,失败则说明存在并发竞争
,此时需要对 CounterCell 数组进行操作,进入 if 内部,首先又会进行一个 if 判断,总共 4 个条件,前 2 个条件是判断 CounterCell 数组是否为空,第三个条件是根据线程的 probe
值计算出数组的下标,然后判断这个下标是否为 null,简单说下 ThreadLocalRandom.getProbe()
这个方法
static final int getProbe() {
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}

/** Probe hash value; nonzero if threadLocalRandomSeed initialized */
@sun.misc.Contended("tlr")
int threadLocalRandomProbe;
这个方法的意思是获得线程的一个唯一身份码
,这里我们又看到刚刚提及的那个注解sun.misc.Contended
了,这个注解其实在 JDK 源码
里用的还是挺多的
最后一个条件是通过 CAS 对刚刚得到的数组下标位的 CounterCell 进行赋值,如果成功则跳出 if,失败进入 if 内部
这里,不知道读者有没有注意到一个细节,外层的 CAS
和里面的 CAS
是不会连续执行
的,作者在这里巧妙的运用了逻辑短路
如果 CounterCell 数组为 null,那么先尝试对 baseCount
进行操作,如果成功皆大欢喜,失败则说明存在并发竞争了,需要对 CounterCell
数组进行初始化了,里层的 if 第一个条件就是判断 CounterCell 数组是否为空,所以不会执行后面的 CAS
操作。如果 CounterCell 数组不为空,那么说明已经存在并发竞争了,这个时候在对 baseCount
进行 CAS 操作很大程度上会失败
,不如直接对 CounterCell 数组进行操作,成功率更高
,所以会直接进入 if 内部,不会执行后面 CAS
进入内部 if 判断时,因为此时 CounterCell 数组不为 null,所以前 2 个条件跳过
,第三个条件如果下标位为 null 则进入 fullAddCount
方法进行这个下标位的初始化,不为 null 会尝试对这个下标位进行 CAS 赋值操作,成功跳出 if,失败也进入 fullAddCount
方法。
fullAddCount 方法后面会讲,这里暂表不提,不过这里阿轩有一个问题一直没有想明白,就是第一个 if 里有 2 个 return
,为什么这里要进行 return 呢?如果有读者知道,还请在评论区解答一下,非常感谢!
继续看第二个 if 判断 ②,while
循环里的 s
是上面计算得到的集合容量,sizeCtl
是扩容阈值,所以这里就是判断集合是否需要扩容了
我们先看下resizeStamp
这个方法
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
前半部分是将 n 转为二进制
,计算从左到右有多少个 0
,举个例子,n=16

从左往右数总共有 27 个 0,那么 Integer.numberOfLeadingZeros(n)
计算后的值就是 27,所以 Integer.numberOfLeadingZeros(n)
的值介于 1-32 之间,转换成二进制
-
最大值:00000000 00000000 00000000 00100000 -
最小值:00000000 00000000 00000000 00000001
private static int RESIZE_STAMP_BITS = 16;
RESIZE_STAMP_BITS=16
,后半部分意思是将 1 左移 15 位
-
00000000 00000000 10000000 00000000
然后进行或运算
,相当于两者之和
,那么 resizeStamp 的最大值和最小值分别是
-
最大值:00000000 00000000 10000000 00100000 -
最小值:00000000 00000000 10000000 00000001
可以发现最大值或者最小值高 16 位全部为 0
,这个值记下,马上会用到
接着会进行 2 个 if 判断,第一次进入这个循环的时候 sc
等于扩容阈值,是大于 0 的,会进入第二个 if,条件是通过 CAS 对sizeCtl
进行赋值,成功则进入 transfer
方法开始扩容,失败进行下一次循环,我们看下赋的值是多少,(rs << RESIZE_STAMP_SHIFT) + 2
,rs
是刚刚根据容量计算得出的值,将他左移 16 位
,那么低 16 位会全部变为 0
,然后加 2,这里是什么意思呢
其实开始扩容之后 sizeCtl 保存的值,高 16 位
表示原集合的容量
,低 16 位
表示的是参与扩容的线程数
,读到这里,我们可能会想,直接用 sizeCtl
保存容量,再建一个变量保存线程数不就行了吗,干嘛费这么老大一个劲
其实这也是 JDK 的一大特性,对内存的极致使用
,如果你源码读多了,就会发现很多地方都是这样玩的,高 16 位保存一个值,低 16 位保存另一个值,又省内存,逼格又高

所以 CAS 失败
的线程再一次循环,因为此时的 sizeCtl
已经是负数了,所以会进入第一个 if,这里又是一波位运算
的骚操作,我们一个个看,(sc >>> RESIZE_STAMP_SHIFT) != rs
,将之前计算得到的 sizeCtl 值右移 16 位
,因为线程数的加减只是在低 16 位
进行,所以右移 16 位之后得到的值就是原集合的容量
这里又有一个小细节
,大家有没有发现这里使用的是>>>
,之前赋值的时候使用的是<<
,怎么多了一个箭头呢,>>>表示的是无符号右移
,即右移之前如果是负数,右移之后高位补 0,会变成正数,相对应的是>>
,右移之前是负数,右移之后还是负数,所以第一个条件就是判断容量是否发生了变化
第二个和第三个条件实际上是 JDK8 的一个 bug
,JDK12
中修复,详情参考oracle 官网,正确的条件应该是
-
sc == (rs << RESIZE_STAMP_SHIFT) + 1 || sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS
第一个是判断扩容的线程数是否为 0,读者可能会有疑问,一开始不是+2
吗,为什么这里是+1
,因为一开始第一个扩容的线程已经把自己算进去了,所以初始值应该是+1,然后自己又+1,所以就是+2 了,第二个是判断扩容线程是否达到最大线程数 MAX_RESIZERS
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
最后 2 个条件是判断扩容之后的数组是否为 null,以及扩容的下标位是否到 0 了,也就是是否扩容结束了
(nt = nextTable) == null || transferIndex <= 0
整个 if 的 5 个条件都是判断扩容是否结束了,这里其实还隐藏了个细节
,后面讲到 transfer 方法的时候会讲到
如果此时没有扩容完毕,那么通过 CAS 对 sizeCtl
进行赋值+1 操作,成功则进入 transfer
方法协助扩容,在循环的最后会进行一个集合容量的计算操作
s = sumCount();
判断是否需要再一次进行扩容,可能有读者会有疑问,会不会在扩容的过程中又触发一次扩容,这个在后面 transfer 方法讲完你就会知道了,这里简单提一下,是不会的,因为完成扩容的线程进行下一次循环执行到这个 if 判断的时候,transfer 必然等于 0,不会再次进入 transfer 方法,只有最后一个出来的线程有可能再次触发扩容操作
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0)
addCount 到这里基本说完了,我们接下来将一开始提到的 fullAddCount
方法
fullAddCount
这个方法有点长,我们分 4 部分来说明,第一部分
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
刚进来就会判断 ThreadLocalRandom.getProbe()
是否等于 0,前文说过,这个值是线程的唯一身份码
,如果为 0 就进行初始化
static final void localInit() {
int p = probeGenerator.addAndGet(PROBE_INCREMENT);
int probe = (p == 0) ? 1 : p; // skip 0
long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT));
Thread t = Thread.currentThread();
UNSAFE.putLong(t, SEED, seed);
UNSAFE.putInt(t, PROBE, probe);
}
从源码中可以看到,初始化之后的值是不可能为 0
的,如果为 0 会被改成 1,为什么不能为 0 呢,这个稍后会提到
接下来会进入一个死循环
,循环中主要分 3 大块,简写一下
-
if ((as = counterCells) != null && (n = as.length) > 0) -
else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) -
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
先判断 counterCells 数组是否为空
,为空进入第二个 if,尝试初始化
else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try {
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
这块代码比较简单,初始化一个容量为 2 的数组,然后新建一个 CounterCell
对象赋值给目标索引位,初始化完毕退出循环
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break;
如果 CounterCell 数组为空,初始化又尝试失败了,这个时候会再一次尝试对 baseCount
进行 CAS,万一成功了呢
接下来,我们重点看下第一个 if,里面有好几处细节

里面分成了 6 个判断,① 判断目标索引位是否为 null,为 null 尝试 CAS 将 cellsBusy
设为 1
private transient volatile int cellsBusy;
这个变量同样是被 volatile
修饰的,用来控制对 CounterCell
数组的操作,看到这里,读者可能又有疑问了,哈哈,疑问好多
为什么这里连续 2 次判断cellsBusy == 0

这里只能说 Doug 老爷子的”毛病”又犯了,注意我这里加了引号
哈
怎么说呢
因为这里会进行一个 CAS
操作,CAS 是什么,也是一种锁
,既然涉及到锁,那么就会存在锁区间
,我们都知道,锁区间越小越好,这样才会将性能的损耗
降到最低,Doug 老爷子当然也是知道的,而这里会有一个新建对象
的操作,新建对象会发生什么呢
-
检查常量池是否有类的符号引用 -
是否已被加载解析初始化 -
分配内存 -
初始化零值 -
设置对象头信息 -
引用赋值
而分配内存
的时候,其实也是会存在并发竞争
的,所以 JVM 采用 2 种机制
-
CAS+失败重试 -
TLAB 线程本地分配缓冲区
现在是不是觉得一个 new 看起来很简单,里面道道还挺多的,所以老爷子将新建对象的操作抽离
了出来,目的就是为了减少锁区间
,反正就算后面的 CAS 失败了,重新再来一轮就是了,反正 java 有 GC 机制
(手动滑稽),而且可以看到锁区间里的操作非常简单,就是一个赋值操作,执行非常快,佩服,佩服

② 这个判断又是一个细节,wasUncontended
这个变量从头到尾只在这个判断里用到,大家有没有想过,这个变量是干什么的,可以去掉
吗
这个变量命名直译过来是是否无竞争
,什么意思,竞争什么呢
我们不妨倒推
一下,满足什么样的条件会执行到这里,从上面的 if 可以得知,执行到这里要满足 2 个条件
-
(as = counterCells) != null && (n = as.length) > 0,数组不为空 -
(a = as[(n – 1) & h]) == null,目标索引位不为空
而 fullAddCount
方法只有一个进来的入口,就是之前说的 addCount
,我们在回看一下进入这个方法的条件
if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
根据上面得出的 2 个条件,在满足这 2 个条件的情况下想进入 fullAddCount
方法只有一种情况,就是!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
,也就是 CAS 设置失败
的情况下
回到 fullAddCount
方法里,如果!wasUncontended
这个条件去掉
的话,那么又会进行一次一模一样的 CAS 操作
,而这 2 个 CAS 操作间没有任何耗时的操作,那么很大概率
,这次的 CAS 又会失败
,而 CAS 是通过操作底层操作系统
来实现的,也是有性能损耗
的,所以这里进行了一次拦截
,不让他再继续执行 CAS,而是通过底部的 advanceProbe
方法,将 hash 值更改一下
,路由到其他的下标位上在重新尝试
所以,这下你知道为什么ConcurrentHashMap
性能这么高了吧
③,通过 CAS 进行赋值,成功就退出,没什么说的
③④,细节
又来了(手动落泪),刚刚我们分析过,执行到上一个 if
的时候,已经 CAS 失败一次
了,加上上一个 if
本身,就是已经 CAS 失败 2 次
了,那么,这个时候该考虑什么?
当然是扩容
啦,对不对,这里有一个变量collide
,翻译过来是碰撞
的意思,前半部分判断counterCells != as
,是否已经扩容
了,如果 true
,表示已经扩容了,那么 collide
设为默认值 false
,我刚扩过容,肯定不存在碰撞
,是吧。如果没扩容,判断容量是否大于 CPU 个数
,如果小于,说明还有扩容的空间
,因为理论上,最多支持 CPU 个数的线程
对数组进行同时操作,所以会进行下一个 if 判断,如果已经大于等于 CPU,那么已经没有扩容的空间
了,那么就要把 collide 设为 false,阻止
进入到后面的扩容操作
④ 这个判断又是干嘛用的吗,看起来和 ② 很像呀,其实作用也是差不多的,就是再给你最后一次机会
,重新 hash
然后看看能不能成功,如果还不能成功,没办法,竞争太激烈了,只能扩容
了()
最后来到 ⑤,也就是扩容,这里倒没什么好讲的,新建一个容量是之前 2 倍
的数组,然后将原数组复制
过来,最后将 collide 重新置为 false
在第一个大的 if 判断里,里面的每个小 if 除非执行到 continue
或者 break
,否则都会执行到advanceProbe
这个方法
static final int advanceProbe(int probe) {
probe ^= probe << 13; // xorshift
probe ^= probe >>> 17;
probe ^= probe << 5;
UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
return probe;
}
其实就是通过位运算
将 hash
值改变一下,重新路由
到另一个下标位上去,避开冲突
的下标位,在方法的开头说过为什么等于 0 需要初始化重新赋值
,因为如果是 0 的话,经过位运算操作还是0
,就很尴尬了
好累呀,喝口水,咕咕咕—
transfer
这个方法也很长,同样,也有几个细节
,细节好多(),一点点看,
第一部分
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE;
这里是确定每个线程负责的扩容区间
,很好理解,如果 CPU 核数大于 1,计算出一个值,等于 1,那么设置区间也没意义,全都是你的
。最后和最小区间
判断一下大小
private static final int MIN_TRANSFER_STRIDE = 16;
然后来到初始化操作
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
不知道大家注意到没有,这里竟然没有加锁
,难道是 Doug 老爷子忘记了?
显然不是,那这里难道不用加锁吗,如果有 2 个线程
同时执行初始化
操作,那么 nextTable
指向的对象地址就会发生变化,从而产生错误
,一开始阿轩读到这里也是很疑惑
的

心想,这里又是什么骚操作呀?后来把前后源码翻了一下,终于,哦——
来一起看看怎么回事
通过搜索得知,transfer 总共有 5 个地方
用到

第二个方法 helpTransfer
是 putVal 里的,如果节点的 hash 值等于-1,那么协助扩容
,很明显,这会已经初始化
过了
第三个方法 tryPresize
,主要作用也是进行扩容
的,和 transfer
的区别是,这个方法是当链表转红黑树的时候发现集合的容量
小于 64
就会走到这个方法进行扩容
,里面逻辑和 addCount
几乎差不多,所以我们直接看 addCount
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
大家回忆一下,之前说过只有第一个
通过 CAS 成功
的线程才会进入第二个 if
,其他的线程进入第一个 if
,大家注意看,里面有一个很不起眼的判断条件是(nt = nextTable) == null
,也就是说,在其他线程走到这个判断的时候,如果第一个进入的线程此时还没有完成初始化
,那么会直接 break
,跳出循环,不参与扩容
。
所以除了第一个进去 transfer
的线程外,其他线程进入 transfer 一定是已经完成初始化了
,所以,在 transfer 方法里面初始化才不需要加锁

接着往下看,设置了 2 个临时变量 i=0,bound=0
,然后进行死循环
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
进入内部的 while
循环,第一次进入循环的时候会走到第三个 if
,确定线程负责的扩容区间
,后面再进入循环的时候会走到第一个 if,当自己负责的扩容区间全部扩容完毕
后,如果此时所有区间都已经分配完了,会进入第二个 if
,否则再次进入第三个 if
,继续分配扩容区间
,这里第一个判断中的 finishing
也是个细节,后面会讲
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
省略。。。
}
}
}
接下来有 4 个 if
判断,第一个 if
,当扩容完毕
时 i 才会小于 0,第二个 if
,如果目标索引位的值为 null
,通过 CAS 设置为 ForwardingNode
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
}
ForwardingNode 是对 Node
的一个简单封装,hash 值是-1
,表示正在扩容
,nextTable
指向扩容之后的数组对象
第三个 if
,如果发现节点 hash 值为-1
,那么继续,这里,大家想过没有,什么时候会执行到这个分支呢,从上面的 while
循环可以发现,每个线程都负责有自己的扩容区间
,是不会走到其他线程的区间的,那这个 if 判断是干嘛的呢?读者可以先思考一下,后面会提到

如果节点不为空,会将节点的值转移
到新的数组上去,看过 HashMap 扩容源码
的话,这块理解起来就会很容易了,我们计算 key
在数组中下标位是通过 hash&(n-1)
来计算的,当数组扩容后,hash
是不变的 ,变得只是 n-1
,举个例子,n=8,扩容之后 n=16,转换成二进制
-
8-1:00000111 -
16-1: 00001111
也就是说,key
在扩容之后数组的位置取决于比 8 高一位的那个位置和 hash 计算的结果
,而那个位置是几呢,如果 n=8,那个位置就是 8
-
8:00001000
所以,源码里直接计算 int runBit = fh & n
,如果这个值为 0
,那么扩容后位置不变
,如果为 1
,那么计算后的值正好是原索引位+n
,所以,通过一个 for 循环
计算链表的最后一个节点的值是 0
还是 1
,这里有一个细节,如果最后几个节点计算结果一致
,那么取第一个节点
赋值给 lastRun
,什么意思,举个例子
-
1->2->3->4->5->6->null
如果 4,5,6,3 个节点计算的结果一致
,那么 lastRun=4
,然后根据 runBit
的值,给高位引用 hn
和低位引用 ln
赋值,hn
就表示位置发生变化
的节点,ln
变化位置不发生变化
的,然后再通过一个 for 循环
,将位置发生变化和不发生变化的分成 2 个链表
,最后赋值给新的数组下标位
当所有的数据都完成迁移
后,i 会变成-1
,来到这个 if
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
刚进入的时候,finishing
是 false
,进入第二个 if
,因为自己的扩容任务
结束了,所以要把 sizeCtl 减 1
,当然也是通过 CAS
的,如果失败了,重新进入循环再试一次
,里面那个 if 判断应该不陌生了吧,还记得我们一开始赋值
的时候吗,这个就是判断此时线程是否是最后一个执行扩容的线程
,如果不是,退出方法,没你事了,否则,finishing
设为 true
,i=n
,看到这里会不会觉得奇怪,i 怎么又等于 n
了呢,没错,从头到尾再检查一遍
,还记得一开始 while
里说的那个 finishing
条件吗,如果没有这个条件,那么最后一个线程只会检查自己边界 bound
到 n
的区间,但是有了这个条件,则会把所有区间
都检查一遍

当第二次进入 i<0
的时候,因为 finishing
为 true
,会把几个关键变量重新初始化然后退出
还记得前文问过一个问题吗,(fh = f.hash) == MOVED
这个条件什么时候会执行到
是的,最后一遍检查
的时候会执行到
其实最后一遍检查在阿轩看来是多余
的,只是老爷子比较谨慎
,所以又检查了一遍,然后阿轩去查阅了一波资料,发现还真有人就这个问题问过 Doug,地址http://cs.oswego.edu/pipermail/concurrency-interest/2020-July/017171.html,Doug是这样回复的
Yes, this is a valid point; thanks. The post-scan was needed in a previous version, and could be removed. It does not trigger often enough to matter though, so is for now another minor tweak that might be included next time CHM is updated.
话里的意思也说了,确实是没有必要的
,后面可能会做调整
get
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
看完了 put
方法的整个脉络再来看 get
方法就简单很多了,因为 Node
的 val
是通过 volatile
修饰的
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
}
所以直接对节点进行遍历
,符合就返回,没有就返回 null
最后再看一下计算集合容量的方法
sumCount
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
也很简单,就是把 baseCount
和 CounterCell
数组的值加一下
后记
通过对 ConcurrentHashMap
源码的学习,我们发现作者使用了很多技巧,这些技巧很值得我们学习和借鉴
,所以,平时没事的时候可以多翻翻源码,总会有所收货的。
我是阿轩,欢迎关注我的公众号,一起交流,一起进步
原文始发于微信公众号(程序员阿轩):ConcurrentHashMap那些不为人知的细节
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/37061.html