大家好,我是阿轩。
今天我们来剖析一下StampedLock
的源码。
说到StampedLock,可能有的小伙伴会比较陌生,他是在JDK8新增的一种锁
,可以说是ReentrantReadWriteLock
的加强版。
主要是用来解决ReentrantReadWriteLock
的写线程饥饿
问题,在读多写少的场景下,如果一直有线程来进行读操作
的话,那么写线程
会一直阻塞在那造成饥饿问题,在StampedLock中引入了一种乐观读
机制,即,读的时候不加锁,而是乐观的认为在这期间不会有写操作,如果真的有再申请读锁,这样,写线程过来的时候就可以直接拿到锁不用再等待了。
今天我们主要来剖析一下StampedLock
中核心的3种操作,悲观读
,乐观读
和写
的源码。版本基于JDK8。
话不多说,我们开始。
前言
鉴于有的小伙伴可能对StampedLock
不太了解,在正式开始源码剖析之前,我们简单说一下StampedLock的机制。
StampedLock没有继承AQS
,但是他的实现和AQS差不多,当多个线程产生锁竞争的时候,也是通过队列
来让线程进行排队等待的。
锁状态是通过一个long
型的变量state
来进行保存的。
private transient volatile long state;
低7位
存储读锁的数量
,第八位
表示写锁的标识
,其他位表示写锁的版本
。
由于64位太长了,我们暂且以16位简单表示一下。
-
0100 0000 1000 0000 锁被写线程占有,写锁版本0100 0000 -
0010 0000 0000 1000 锁被读线程占有,读线程数量是8个
详细的细节我们在看源码的时候在细说。
悲观读
悲观读锁
的获取JDK提供了4个方法

常用的是readLock
方法,先尝试获取锁,获取不到加入队列等待。
tryReadLock
方法就是尝试获取,抢到了就抢到了,没抢到就直接返回。
tryReadLock(long time, TimeUnit unit),readLockInterruptibly
2种方法和readLock方法很相似,主要区别在于他们两支持中断
,readLock不支持中断
。
我们主要看readLock
方法
public long readLock() {
long s = state, next; // bypass acquireRead on common uncontended case
return ((whead == wtail && (s & ABITS) < RFULL &&
U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
next : acquireRead(false, 0L));
}
StampedLock的读写锁状态
变化涉及到一些位运算
,为了方便后面的理解,我们把类中的一些状态常量先介绍下,同样,为了方便起见,就用16位简略表示下
private static final int LG_READERS = 7;
private static final long RUNIT = 1L;
private static final long WBIT = 1L << LG_READERS;
private static final long RBITS = WBIT - 1L;
private static final long RFULL = RBITS - 1L;
private static final long ABITS = RBITS | WBIT;
private static final long SBITS = ~RBITS;
private static final long ORIGIN = WBIT << 1;
用16位表示就是
-
WBIT 0000 0000 1000 0000 -
RBITS 0000 0000 0111 1111 -
RFULL 0000 0000 0111 1110 -
ABITS 0000 0000 1111 1111 -
SBITS 1111 1111 1000 0000 -
ORIGIN 0000 0001 0000 0000
下面我们看下readLock
方法的逻辑。
return
中是一个三目运算符
,第一个条件whead == wtail
表示,等待队列要么没有节点
都是null,要么头节点和尾节点相等
。
这个三目运算符的表达式由3个条件通过&
运算构成,为什么这个条件是第一个条件
呢?
因为,如果此时whead不等于wtail
,那么就表示队列中有线程在等待锁
。
要么是读线程在等待写锁
,这种情况下肯定没办法获取锁。
要么就是写线程在等待读锁
,前面我们说过读写锁的饥饿问题
,这里同样,为了避免出现这种问题,如果此时锁被读线程
占有,但是队列中已经有写线程
在等待了,此时再有读线程
过来,不会拿到锁,而是到队列中去排队
。仔细想想,如果在读多写少
的场景下,一直有读线程
过来操作,那写线程要等到猴年马月才能拿到锁。
所以,虽然whead == wtail
,不一定表示锁没有被占用,但是whead != wtail
,一定表示,无法获取锁。
这个条件虽然不是充分条件
,但是却是必要条件
,而且相比较于第二个条件还要进行位运算
操作,他只是比较一下2个对象的内存地址
是否一样,速度上
肯定更快,而且大多数情况下都可以过滤掉
,不用再判断后面的表达式了,所以才放在了第一位
。
第二个条件(s & ABITS) < RFULL
,结合前面转换的二进制
来看,要想结果小于RFULL
,那么第八位
就不能为1,也就是写锁没有被占有
,此时才会来到第三个条件,通过CAS
将读锁状态+1
。
如果成功直接返回state,失败进入acquireRead
方法,准备进行入队
操作,注意,这里的中断参数位
值是false,就是不支持中断
。
acquireRead
acquireRead
方法很长,里面分为2个大的for
循环,每个循环里又含有一堆的 if 判断。
我们先看第一个循环里的第一个大的if
判断
if ((h = whead) == (p = wtail)) {
for (long m, s, ns;;) {
if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
return ns;
else if (m >= WBIT) {
if (spins > 0) {
if (LockSupport.nextSecondarySeed() >= 0)
--spins;
}
else {
if (spins == 0) {
WNode nh = whead, np = wtail;
if ((nh == h && np == p) || (h = nh) != (p = np))
break;
}
spins = SPINS;
}
}
}
}
首先再进行一次头节点
和尾节点
的判断,看看此时具不具备抢锁
的条件,如果具备,进入循环准备抢锁
。
这里提一下,头尾节点相等
有4种情况
-
无锁状态下 -
锁被占有,但是没有节点等待 -
锁被占有,第一个节点初始化的时候,有短暂的时间头尾节点相等 -
锁被占有,等待队列只有一个节点,该节点持有锁
第一种情况显然是可以去获取锁
的。
第二种情况,读锁状态
下可以去获取锁。
第三种情况,如果锁被写线程
占有,无法获取。如果被读线程
占有,那么此时就是写线程在初始化队列
,也是无法获取的。
第四种情况,持有锁是读线程
情况下,可以获取。
进入循环后是一个三目运算
,对状态进行位运算
操作,判断是否小于RFULL
,小于表示没有写锁
或者读锁数量还没有溢出
,因为state低7位
只能表示0-127
,所以,当数量溢出的时候会用readerOverflow
变量进行额外的保存。然后CAS
尝试获取锁,获取成功将state加1
,也就是读锁数量加1。
这里注意下,不小于RFULL
有2种情况
-
有写锁 -
无写锁,state等于RFULL,或者等于RFULL+1
所以,当条件判断为false
时,首先判断运算结果是否小于WBIT
,也就是有没有写锁
,没有的话就是读锁数量溢出
了,调用tryIncReaderOverflow
准备加到readerOverflow
变量上
private long tryIncReaderOverflow(long s) {
// assert (s & ABITS) >= RFULL;
if ((s & ABITS) == RFULL) {
if (U.compareAndSwapLong(this, STATE, s, s | RBITS)) {
++readerOverflow;
state = s;
return s;
}
}
else if ((LockSupport.nextSecondarySeed() &
OVERFLOW_YIELD_RATE) == 0)
Thread.yield();
return 0L;
}
这个方法不多说,有2个地方需要注意下。
第一个,if 里对state
进行了CAS
操作,但是紧接着又把他还原
了,这是为什么呢?
其实也不难理解,毕竟state
是共享变量
嘛,对他的操作存在竞争
,所以,这里的CAS就是对他加个锁
。
那可能又有小伙伴要问了,为什么不把readerOverflow
设置成原子类
,比如AtomicInteger
呢?
这样不是不用CAS
了吗?
你想想,AtomicInteger
底层还是用的CAS
,与其绕了一圈最后还是使用CAS
,不如这里直接使用好了,对不对

第二个,就是这个else if
,也就是说会存在(s & ABITS) != RFULL
的情况。
这种情况可以说,几乎不存在
,但是为什么作者要这样写呢?
前面我们说过,RFULL
的二进制是0000 0000 0111 1110
,想想,为什么低7位不全部是1
呢?
我们设想下,如果低7位全部为1
会怎么样?
也就是RFULL
为0000 0000 0111 1111
。
如果某一时刻,(s = state) & ABITS
等于RFULL-1
,然后进行CAS加1
,那么,下一个线程(s = state) & ABITS
计算的结果就等于RFULL
,如果此时出现了某种不可预知的意外情况,又进行了一次CAS加1
,那么会发生什么情况?
state低位
全部进1,第八位
变成了1,此时变成写锁
状态了!!
所以作者把RFULL
设为0000 0000 0111 1110
,就是为了防止出现这种情况,如果真的,真的,真的出现这种情况了,也就是state低7位
全部为1了,走到else if
里了,那么少侠你先休息
会吧,等其他读锁释放了,把数量降下来回到正轨再说。
这种就叫防御性编程
,给自己留一线,虽说理论上不可能出现这种情况,但是程序的世界谁又知道呢。
不得不服,Doug老爷子,牛逼

回到acquireRead
方法里,如果if
不满足,走到else if
里,判断锁是否被写线程占有了,如果是,设置自旋次数,开始自旋
吧
private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0;
等到自旋
完了,spins == 0
,来到这么一个判断
WNode nh = whead, np = wtail;
if ((nh == h && np == p) || (h = nh) != (p = np))
break;
咋一看,如果if
不满足,岂不是要从头到尾再自旋
一轮?
其实,细看一下,你就会发现,不管怎么样,if
都会通过。
前半部分判断,此时的头节点
和尾节点
与之前的是否都相同,其实也就是头节点
和尾节点
还等不等,如果不等,来到后半部分,将h
和p
重新赋值
,因为此时头节点和尾节点不相等,所以,后半部分条件必然成立
。
所以,与其说这里是个条件判断
,不如说就是一个变相赋值
的操作,
这里如果换成我们来写,一行代码至少要写成3,4行,甚至更多,这就是和大佬的差距啊
第一层if
看完了,下面看第二层if
,因为最后一个判断比较长,我们先看前面的几个判断
if (p == null) { // initialize queue
WNode hd = new WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = hd;
}
else if (node == null)
node = new WNode(RMODE, p);
else if (h == p || p.mode != RMODE) {
if (node.prev != p)
node.prev = p;
else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
p.next = node;
break;
}
}
else if (!U.compareAndSwapObject(p, WCOWAIT,
node.cowait = p.cowait, node))
node.cowait = null;
第一个判断,如果头节点
和尾节点
都是空
的话,开始初始化
队列,构造一个写模式
的节点,然后把头节点和尾节点指针指向他。
设置完之后,进行下一次
的循环
,此时头节点和尾节点虽然不为空了,但是他们暂时
还是相等的,所以会再一次看看有没有抢锁的机会,但是不会再自旋
了,就看一眼,没机会就来到第二个判断
。
构造一个读模式
的节点,也就是自己,前指针
指向尾节点,再一次循环,瞧瞧有没有机会,没有来到第三个判断。
头节点等于尾节点,或者尾节点不是读模式
。
这里提前介绍下,节点排队的机制
,写节点
是直接排到队列结尾的
但是读节点
,如果尾节点
是写节点
的话,会继续排到结尾
; 如果是读节点
,那么会挂到读节点的cowait链
上

所以这里,条件头节点等于尾节点,因为不管是读模式
还是写模式
,初始化的第一个节点都是写节点
,所以此时是要挂到主链
上的,尾节点不是读模式
也是一个意思。
看完整个acquireRead
方法,你就会发现,挂到主链
上的节点会进入第二个大for循环
,挂到cowait链
上的节点只在第一个大for循环
里,因为此时满足这个if条件的节点是要进入主链
的,所以才会break
跳出第一个大for循环
。
继续来到第四个if
条件,走到这里,就说明节点是要挂到cowait链
的,CAS尝试将尾节点的cowait指针
指向自己。
这里注意一点,cowait
的插入是头插法
,就是每次新节点的插入是从头部
插入的,为什么不是尾插法
呢?
你想,如果用尾插法
,那是不是还得保存一下cowait链
的尾部节点位置
,要不然下一个节点过来,哪知道尾部在哪,对不对
而使用头插法
,就不需要知道尾部
在哪了,每次过来一个新节点
,我把自己的cowait指针
指向新节点,把老的cowait节点
挂到新节点的cowait
上,这样就可以省去保存尾部指针的内存消耗
了,反正,读模式
的节点唤醒的时候都是cowait链
一起唤醒的,谁先谁后无所谓
所以,你说JDK的源码为什么那么省内存
,就是这样的一个个小细节堆起来的。
下面来到最后一个else
分支,里面又是一个for
循环
for (;;) {
WNode pp, c; Thread w;
if ((h = whead) != null && (c = h.cowait) != null &&
U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null) // help release
U.unpark(w);
if (h == (pp = p.prev) || h == p || pp == null) {
long m, s, ns;
do {
if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s,
ns = s + RUNIT) :
(m < WBIT &&
(ns = tryIncReaderOverflow(s)) != 0L))
return ns;
} while (m < WBIT);
}
if (whead == h && p.prev == pp) {
long time;
if (pp == null || h == p || p.status > 0) {
node = null; // throw away
break;
}
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, p, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if ((h != pp || (state & ABITS) == WBIT) &&
whead == h && p.prev == pp)
U.park(false, time);
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, p, true);
}
}
一点点看,第一个if
判断,头节点
不为空并且cowait链
也不为空,那说明这个主节点是读模式
的节点,并且此时正在被唤醒
。
为什么这么说呢?
读模式
好理解,因为只有读模式的节点才有cowait链
。
前面我们讲过,不管是读线程
初始化队列还是写线程
初始化,第一个节点
都是写模式的虚节点
,读节点要想来到头部
,一定是前面拿到锁的线程释放锁
了,并且把他唤醒
了,他拿到锁之后把头部指针
指向了自己。
所以,前面2个条件
可以确定,此时,整个读模式节点的cowait链
正在被唤醒
。后面2个条件就是把头节点的cowait指针
指向下一个节点,然后唤醒
当前的cowait节点,其实从注释也能看出来,此处就是帮助唤醒
。
因为你想,在读多写少
的场景下,一个主节点的cowait链
上挂很多的读节点是很正常的,所以这里就是帮助唤醒
,提高一下效率。
第二个if
判断,尾节点的前一个节点
是头节点,或者头节点等于
尾节点,或者尾节点的前一个节点
为空。
前2个条件都好理解,最后一个条件是什么鬼?
尾节点的前一个节点
不等于头节点,尾节点又不等于
头节点,这种情况下,尾节点的前一个节点
等于空??
要知道,不管是中断取消
节点还是线程释放锁唤醒
后面的节点,他们都是不会把自己的节点设置为null
的。
而且,新的尾节点
插入的时候都是先设置自己的前置节点
再把尾部指针
指向自己的,不可能出现前置节点为null
的情况,那这里的pp == null
又是什么鬼?

阿轩思来想去,只想到一种可能
,就是当前2个条件
判断的时候,队列中存在2个以上
的节点,所以头节点不等于尾节点,尾节点的前置节点也不是头节点,但是,当执行到第三个条件
的时候,前面的线程全部执行完了,释放锁
了,此时自己来到了第一个节点
,而之前的前置节点因为路径不可达,此时又恰巧发生了GC
被回收了,导致pp == null
,
那有小伙伴可能要问了,如此短的时间内发生这么多事,不可能吧?
按常理来说,这种情况确实极难发生
,但也并非完全不可能。
设想下,如果CPU
执行了前面2个
条件后,当前线程分配的时间片
用完了,CPU
切出去了,此时前面的线程拿到锁几乎没做什么事立即又释放
了锁,如果此时队列中只有3个节点,那么很快,自己就会成为第一个
节点,而因为堆大小
设置的很小,youngGC
频繁发生,此时凑巧发生了GC
,那么前置节点
就被回收
了,最终pp == null
了。
这3个条件任意一个满足就说明,此时有机会获取锁
,如果此时确实没有写锁
,那么就会开始循环获取锁
了,while
这段代码和前面的几乎没什么区别,就不再赘述了。
来到最后一个if判断,whead == h && p.prev == pp
,这个条件成立说明什么?
说明头节点还是那个头节点,尾节点的前置节点还是那个节点,你大爷还是你大爷,但是你却不是之前的那个你了
进来之后进行这么一个判断pp == null || h == p || p.status > 0
,有没有一点眼熟的感觉。
是的,就是上面说的那个if
里的后2个条件
加上一个尾节点取消
,如果满足,就把自己设置为null
,也就是把当前节点丢弃
,跳出循环
,重新去创建新节点
,最后一个条件可以理解,但是前2个条件又是为什么呢?
有的小伙伴可能要现学现卖了,阿轩,阿轩,如果出现了你之前说的那种情况,有可能pp == null
吗?
可以告诉你,如果是之前那种情况,都执行不到这里,滑稽—
why?
之前那种情况出现的话,头节点肯定是已经换过好几轮
了,if
里的条件whead == h
肯定是不成立
的。
所以,真出现pp == null || h == p || p.status > 0
这种情况的话,一定是发生了什么意想不到的意外
,我上面刚执行过pp == null || h == p
不成立,咋到你这就成立了呢?
所以,果断丢弃
掉,换个新的。
继续往下执行,就是判断有没有超时,没有超时
就把当前线程设置到节点上,然后挂起等待被唤醒
了,没什么好说的。
下面来看第二个大for
循环,比较长,先看第一层 if 判断
if ((h = whead) == p) {
if (spins < 0)
spins = HEAD_SPINS;
else if (spins < MAX_HEAD_SPINS)
spins <<= 1;
for (int k = spins;;) { // spin at head
long m, s, ns;
if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
WNode c; Thread w;
whead = node;
node.prev = null;
while ((c = node.cowait) != null) {
if (U.compareAndSwapObject(node, WCOWAIT,
c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
return ns;
}
else if (m >= WBIT &&
LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
break;
}
}
else if (h != null) {
WNode c; Thread w;
while ((c = h.cowait) != null) {
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
}
首先判断头节点
是否等于自己的前一个节点
,也就是自己初始化
队列之后,有没有节点进来。
没有的话,就再自旋
一下看看能不能拿到锁,如果拿到了,把头节点指向自己,唤醒cowait链
上的所有节点。
有的话,判断头节点
是否为空,不为空并且cowait
也不为空的话就循环把cowait链
上节点全部唤醒
。这又是什么场景呢?
当新的读线程进入队列排队
的时候或者自己被中断唤醒
了,发现此时头节点
正是读模式
节点并且正在唤醒整个cowait链
上的节点,那就帮一把手,毕竟助人为乐嘛。
第一层 if 判断就结束了,来到第二层 if。
if (whead == h) {
if ((np = node.prev) != p) {
if (np != null)
(p = np).next = node; // stale
}
else if ((ps = p.status) == 0)
U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
else if (ps == CANCELLED) {
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
}
else {
long time;
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, node, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if (p.status < 0 &&
(p != h || (state & ABITS) == WBIT) &&
whead == h && node.prev == p)
U.park(false, time);
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, node, true);
}
}
如果头节点
没变,先判断自己的前置节点
有没有变化,如果变化了,重新设置自己的前置节点
。
接着把前置节点
的状态设为-1
,和我们之前说过的AQS
是不是很像。
如果取消了,把节点前后指针调整一下。
最后的else
分支和刚刚说过的几乎一样,就不说了。
简单总结一下读锁的获取:
-
state变量的低7位用来表示读锁的个数,每加一个读锁,state加1,溢出则用readerOverflow变量表示。
-
主链上的节点进入第二个大for循环运行,cowait上的节点在第一个大for循环运行。
-
新节点排队的时候,如果尾节点是写节点,那么挂在主链上,如果是读节点,就挂在读节点的cowait链上。
写锁
看完了读锁再来看写锁
就很轻松了,写锁的逻辑和读锁非常相似
,同样的,有4个方法

他们之间的区别和读锁一样,就不介绍了。我们主要看writeLock
方法
public long writeLock() {
long s, next; // bypass acquireWrite in fully unlocked case only
return ((((s = state) & ABITS) == 0L &&
U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
next : acquireWrite(false, 0L));
}
首先通过位运算
判断是否有写锁,没有的话就CAS
把state
加上128
,就是表示锁状态的第8位bit位
。失败进入acquireWrite
方法。
acquireWrite
方法和acquireRead
方法非常相似,同样的分为了2个
大的for
循环,但是作用不一样。先看第一个大for循环。
private long acquireWrite(boolean interruptible, long deadline) {
WNode node = null, p;
for (int spins = -1;;) { // spin while enqueuing
long m, s, ns;
if ((m = (s = state) & ABITS) == 0L) {
if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
return ns;
}
else if (spins < 0)
spins = (m == WBIT && wtail == whead) ? SPINS : 0;
else if (spins > 0) {
if (LockSupport.nextSecondarySeed() >= 0)
--spins;
}
else if ((p = wtail) == null) { // initialize queue
WNode hd = new WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = hd;
}
else if (node == null)
node = new WNode(WMODE, p);
else if (node.prev != p)
node.prev = p;
else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
p.next = node;
break;
}
}
进来同样的,首先看看有没有机会获取锁
,没有的话判断此时是写锁
还是读锁
。
如果是写锁,设置一个自旋次数64
,开始自旋
。
如果是读锁,直接设置为0
,开始初始化队列
。
按照常理来说,写锁
相对读锁
进行的很慢,应该直接进入队列等待
才对。而读锁
进行的很快,应该自旋
才对。但是为什么这里是反着来的呢?
其实结合原理,仔细分析一下,就会发现,如果此时是读锁
,写线程
开始自旋的话,那么后面过来的读线程,因为此时队列
还没有初始化
,头节点等于尾节点等于空
,满足whead == wtail && (s & ABITS) < RFULL
这个条件,就会直接去获取锁了,那写线程
就会一直获取不到锁
,所以,想要尽快获取到锁,唯一的方法
就是赶紧初始化队列
,让后面过来的读线程
乖乖的排队
,而不是傻乎乎的在那自旋
。
后面 if 条件就是初始化队列
了,然后把尾节点指针
指向自己就跳出第一层for循环
了。
来到第二层for循环
,里面又分为2层 if 判断,先看第一层 if 。
WNode h, np, pp; int ps;
if ((h = whead) == p) {
if (spins < 0)
spins = HEAD_SPINS;
else if (spins < MAX_HEAD_SPINS)
spins <<= 1;
for (int k = spins;;) { // spin at head
long s, ns;
if (((s = state) & ABITS) == 0L) {
if (U.compareAndSwapLong(this, STATE, s,
ns = s + WBIT)) {
whead = node;
node.prev = null;
return ns;
}
}
else if (LockSupport.nextSecondarySeed() >= 0 &&
--k <= 0)
break;
}
}
else if (h != null) { // help release stale waiters
WNode c; Thread w;
while ((c = h.cowait) != null) {
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
}
同样的,先判断一下,从初始化队列
之后到现在有没有进入新的节点。
没有的话设置自旋次数1024
次,开始自旋
尝试获取锁。
有的话判断头节点是否是读模式节点
,帮助唤醒cowait链
。这里和之前读锁源码一样,当写线程
进入队列时或者被中断唤醒
时,发现此时头节点是读模式节点且正在唤醒cowait链
,才会触发这个 if 条件。
接着来到第二层 if 判断
if (whead == h) {
if ((np = node.prev) != p) {
if (np != null)
(p = np).next = node; // stale
}
else if ((ps = p.status) == 0)
U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
else if (ps == CANCELLED) {
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
}
else {
long time; // 0 argument to park means no timeout
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, node, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
whead == h && node.prev == p)
U.park(false, time); // emulate LockSupport.park
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, node, true);
}
}
先判断一下自己的前置节点
有没有发生变化,如果变化了重新设置下。把尾节点的状态设为-1
。如果前置节点被取消了,把前后节点的指向重新调整一下。
注意了,这3个条件
满足任意一个,都会结束第二层 if 判断
,重新进入循环
,再循环2048次
,然后再次来到第二层 if 判断。
进入最后一个 else 分支,和读锁获取一样,判断是否超时
,然后设置节点线程,将自己挂起
。
但是注意了,这个地方是存在一个bug
的,不知道小伙伴们有没有发现?
如果获取锁的线程被中断
了,他会被从挂起状态唤醒
,执行到下面的 if 条件interruptible && Thread.interrupted()
,但是由于writeLock
方法和readLock
方法的interruptible参数
传的都是false
,所以第一个条件就不通过
,然后重新进入循环开始自旋获取锁
,如果此时并不具备获取锁的条件,那么会再一次执行到U.park(false, time)
挂起方法这里,我们之前在说AQS源码
的时候,带着小伙伴看了park和unpark
在虚拟机
层面的源码实现,忘记的小伙伴可以去回顾一下,AQS那些不为人知的细节,所以,因为这里的中断状态
并没有被清除
,所以park
方法并不会把线程挂起,线程会继续往下执行,又一次进入循环进行自旋
,结果就是CPU
会在很短的时间内迅速飙高
。
修复
的方法也很简单,就是将判断的条件if (interruptible && Thread.interrupted())
改下顺序,变成if (Thread.interrupted() && interruptible)
,这样第一个条件就会把线程的中断状态清除
掉,下次自旋之后就可以挂起
了。
这个bug在JDK9
已经修复了,感兴趣的小伙伴可以参阅,Lack of save / restore interrupt mechanism undermines the StampedLock
乐观读锁
看下乐观读锁
public long tryOptimisticRead() {
long s;
return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}
代码非常简单,和WBIT
进行与运算
判断结果是否等于0,回顾一下WBIT
的二进制表示
-
WBIT 0000 0000 1000 000
所以,等于0
就表示没有写锁,然后再和SBITS
进行与运算
,回顾一下SBITS
二进制表示
-
SBITS 1111 1111 10000 0000
也就是将高位写状态
全部保存下来。
思考一下,既然StampedLock
锁不支持可重入
,为什么不直接对写锁进行加1减1
这样的操作呢?
原因就在于,乐观锁都有一个普遍的问题,就是ABA问题
。
当线程尝试获取乐观锁
时,发现没有写锁,然后开始将数据复制
一份到线程本地栈
,复制完之后判断一下此时锁有没有被写线程
占有,如果没有,就开始对读数据进行操作了。
这里就会存在一个问题,就是一开始
是没有写锁
的,但是在线程复制数据
的这段时间内,写线程
过来拿到了锁,然后操作完之后又释放
了锁,然后线程再来检查的时候发现是没有写锁
的,但是此时数据实际上已经被修改
了,那么就会导致结果不准。
所以,这里通过将高位写状态
保存下来,如果中间过程有写线程获取过锁,那么高位的二进制
就会发生变化
,说白了就是添加一个版本号
,原子类中的AtomicReference
类也是基于版本号的原理避免ABA问题
的。
释放锁
写锁释放
public void unlockWrite(long stamp) {
WNode h;
if (state != stamp || (stamp & WBIT) == 0L)
throw new IllegalMonitorStateException();
state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
if ((h = whead) != null && h.status != 0)
release(h);
}
读锁释放
public void unlockRead(long stamp) {
long s, m; WNode h;
for (;;) {
if (((s = state) & SBITS) != (stamp & SBITS) ||
(stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
throw new IllegalMonitorStateException();
if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
if (m == RUNIT && (h = whead) != null && h.status != 0)
release(h);
break;
}
}
else if (tryDecReaderOverflow(s) != 0L)
break;
}
}
首先判断一下是否发生了异常情况,然后对state
进行操作,写锁会把state继续加128
,如果达到了long
型最大值就初始化为256
,。读锁就是简单的把state减1
.接着调用release
开始唤醒
节点。
private void release(WNode h) {
if (h != null) {
WNode q; Thread w;
U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
if ((q = h.next) == null || q.status == CANCELLED) {
for (WNode t = wtail; t != null && t != h; t = t.prev)
if (t.status <= 0)
q = t;
}
if (q != null && (w = q.thread) != null)
U.unpark(w);
}
}
首先将头节点
的状态设为0
,然后判断第二个节点是否为空或者是否被取消了,满足任意一个条件,就从尾节点
向前遍历,找出第一个
节点状态小于等于0
的节点。
为什么从后往前
遍历呢?
之前我们在看AQS源码
的时候也是这样,主要是因为新节点入队
的时候都是先设置的前置节点
,再设置的后置节点
。
最后如果第二个节点存在,调用unpark
唤醒。
还有一个通用释放
方法
public void unlock(long stamp) {
long a = stamp & ABITS, m, s; WNode h;
while (((s = state) & SBITS) == (stamp & SBITS)) {
if ((m = s & ABITS) == 0L)
break;
else if (m == WBIT) {
if (a != m)
break;
state = (s += WBIT) == 0L ? ORIGIN : s;
if ((h = whead) != null && h.status != 0)
release(h);
return;
}
else if (a == 0L || a >= WBIT)
break;
else if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
if (m == RUNIT && (h = whead) != null && h.status != 0)
release(h);
return;
}
}
else if (tryDecReaderOverflow(s) != 0L)
return;
}
throw new IllegalMonitorStateException();
}
将之前获取锁拿到的state
状态戳stamp
和当前状态
分别和SBITS
做与运算
,看看是否相等,回忆一下SBITS
的二进制表示。
-
SBITS 1111 1111 1000 0000
第七位
之后全部为1,为什么?
如果是获取的写锁
,那么低7位
肯定都是0,只要保存高位
的状态即可。
如果是获取的读锁
,因为读锁是多个线程共享
的,低7位
的二进制表示会一直发生变化
,而读锁的释放又不需要判断低7位
有没有发生变化,只需要判断写锁状态
有没有发生变化即可。
所以,如果通过while
条件,只能说明state
的高位写锁状态
没有发生变化,即从第七位
开始往后的二进制位没有发生变化。
进入循环首先判断和ABITS
的与运算
结果是否为0
-
ABITS 0000 0000 1111 1111
因为读锁是低7位
,写锁是第八位
,所以,只需要看低8位
就可以知道当前状态有没有写锁
或者读锁
,为0
表示没有写锁也没有读锁,什么锁都没有,肯定无法释放锁
,跳出循环抛异常。
如果存在锁再判断是否等于WBIT
-
WBIT 0000 0000 1000 0000
m == WBIT
表示当前是写锁
状态,写锁状态的话那么当初获取锁拿到的state状态戳
必须和现在相等才能释放锁,不相等表示之前的锁和现在的锁不是同一个锁
,不能释放,跳出循环抛异常。如果相等就进行之前所说的释放
写锁逻辑。
如果前2个 if 都不满足,那么说明当前是读锁状态
。
a == 0L
表示之前获取的是乐观读
,乐观读是不用释放锁的,跳出循环抛异常。
a >= WBIT
表示之前获取的是写锁
,而现在却是读锁
状态,也是有问题的,抛异常。
如果前3个 if 都不满足,那么就说明之前获取的锁和现在的锁是一致的,都是读锁
,下面就看锁的数量有没有溢出
了。
最后我们来看下取消获取锁
的逻辑
cancelWaiter
取消方法也是分为了2大块
,一个大的if
判断,一个while
循环,先看 if 判断的上半部分
Thread w;
node.status = CANCELLED;
// unsplice cancelled nodes from group
for (WNode p = group, q; (q = p.cowait) != null;) {
if (q.status == CANCELLED) {
U.compareAndSwapObject(p, WCOWAIT, q, q.cowait);
p = group; // restart
}
else
p = q;
}
首先将自身状态设为取消1
,如果是读模式
节点,顺便看下cowait链
上有没有取消的节点,有的话就把取消的节点
删除掉。
进入下面一个 if 判断
if (group == node) {
for (WNode r = group.cowait; r != null; r = r.cowait) {
if ((w = r.thread) != null)
U.unpark(w); // wake up uncancelled co-waiters
}
for (WNode pred = node.prev; pred != null; ) { // unsplice
WNode succ, pp; // find valid successor
while ((succ = node.next) == null ||
succ.status == CANCELLED) {
WNode q = null; // find successor the slow way
for (WNode t = wtail; t != null && t != node; t = t.prev)
if (t.status != CANCELLED)
q = t; // dont link if succ cancelled
if (succ == q || // ensure accurate successor
U.compareAndSwapObject(node, WNEXT,
succ, succ = q)) {
if (succ == null && node == wtail)
U.compareAndSwapObject(this, WTAIL, node, pred);
break;
}
}
if (pred.next == node) // unsplice pred link
U.compareAndSwapObject(pred, WNEXT, node, succ);
if (succ != null && (w = succ.thread) != null) {
succ.thread = null;
U.unpark(w); // wake up succ to observe new pred
}
if (pred.status != CANCELLED || (pp = pred.prev) == null)
break;
node.prev = pp; // repeat if new pred wrong/cancelled
U.compareAndSwapObject(pp, WNEXT, pred, succ);
pred = pp;
}
}
第一个for
循环,如果取消的节点是读模式
节点并且是挂在主链
上的,那么把cowait链
上的节点全部唤醒
。正所谓,树倒猢狲散,老大都被干掉了,做小弟的也只能另谋出路了。
第二个for
循环主要做的就是将前后节点
的指针调整一下。先进入一个while
循环,如果当前节点的后置节点是空
或者被取消
了,那么就从尾节点从后往前
寻找最接近node节点
的非取消状态
的节点,这里会出现几种情况
-
succ==null,q没找到也等于null,也就是node就是尾节点 -
succ==null,但是q找到了,这种情况是因为新节点入队的时候,是先设置新节点的前置指针,在设置尾节点指针,最后才设置前置节点的后置指针,所以是会出现node的后置节点为空,但是他又不是尾节点的情况的 -
succ取消了,q没找到,也就是node是尾节点,或者node后面的节点全部都取消了,q==null -
succ取消了,q找到了,存在未取消的节点
下面来看下基于这4种情况的判断
if (succ == q || // ensure accurate successor
U.compareAndSwapObject(node, WNEXT,
succ, succ = q)) {
if (succ == null && node == wtail)
U.compareAndSwapObject(this, WTAIL, node, pred);
break;
}
succ == q
,这个条件只有1
才能满足,进入之后succ == null && node == wtail
也满足,将尾节点指针指向前一个节点,跳出循环。
剩余的3种情况会执行到U.compareAndSwapObject(node, WNEXT, succ, succ = q)
条件,不管q为空或者非空,都将node
的后置指针
指向他。
这里2
这种情况比较特殊一点,稍微回顾一下前面讲新节点入队
的源码
else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
p.next = node;
break;
}
如果p.next = node
在CAS
前面执行,那么执行CAS
的时候因为WNEXT
位置已经不是null
了,那么CAS
会执行失败
,重新进入while
条件(succ = node.next) == null || succ.status == CANCELLED
,但是此时succ
已经不等于null
了,所以还是会跳出循环,殊途同归
。
如果是CAS
先执行,那么会通过判断进入 if 体内。
但是不管2
能不能进入 if 体内,这3种情况都不满足succ == null && node == wtail
条件,2
是因为node不是尾节点
,3和4
是因为succ
非空,所以跳出循环。
接着将pred节点
的后置指针指向succ
,如果succ
不为空并且绑定了线程就将他唤醒
,因为执行到这里,只是设置了前置节点
的后置指针
,succ
的前置指针
一直没有设置,所以需要将他唤醒让他去设置一下。
最后判断一下pred
节点是否被取消,或者pred
的前置节点
是否为空,如果没取消,或者前置节点为空也就是pred
就是头节点
,那么跳出循环。
否则,就是pred
节点取消了,并且pred前置节点
不为空,那么将前后节点
的指针重新调整一下,再次进入循环。
第一个 if 块执行完后来到最后的while
循环
WNode h;
while ((h = whead) != null) {
long s; WNode q;
if ((q = h.next) == null || q.status == CANCELLED) {
for (WNode t = wtail; t != null && t != h; t = t.prev)
if (t.status <= 0)
q = t;
}
if (h == whead) {
if (q != null && h.status == 0 &&
((s = state) & ABITS) != WBIT &&
(s == 0L || q.mode == RMODE))
release(h);
break;
}
}
首先判断头节点
的下一个节点是否为空或者被取消了,满足任一条件就从尾节点
向前遍历,寻找最接近头节点
的非取消状态
的节点。
第二个 if 判断就非常有意思了。
先看前2个条件,q != null && h.status == 0
,头节点之后存在非取消状态的节点,并且头节点的status
为0,这2个条件什么情况下会满足呢?
有2个场景
可以满足。
-
队列初始化的时候,不管是写节点还是读节点都会执行这一句U.compareAndSwapInt(p, WSTATUS, 0, WAITING),在这句代码执行之前,是可以满足前2个条件的。
-
线程释放锁的时候,会执行release方法中的U.compareAndSwapInt(h, WSTATUS, WAITING, 0)代码,此时也是可以满足前2个条件的。
来到第三个条件((s = state) & ABITS) != WBIT
,表示此时锁状态不是写锁,那么就是读锁
或者没有锁
。
结合1情况,就是读锁
。
结合2情况,就是没有锁
。
最后一个条件,s == 0L || q.mode == RMODE,s == 0L
我们先暂且不看,只看q.mode == RMODE
这个条件。
那么1这种情况,q
是写节点
就不满足了。
2这种情况,因为获取锁的线程已经在调用release
方法了,这里就算通过了条件判断,也是调用release
方法,重复调用了,没有什么意义。
所以,1和2这2种情况不管满足不满足 if 判断,都没有什么实际的意义,那作者写这段代码又是为了什么呢?
阿轩觉得,作者写这段代码可能是考虑这么一种场景

q节点
此时排在第三位
,处于挂起
状态,此时的锁是读锁
,在某一时刻,有一个节点中断取消
了,执行到了while
方法这里,在同时,1和2节点也发生了中断取消
,刚执行完node.status = CANCELLED
,此时,q
节点已经具备获取锁的条件了,那么就会提前把他唤醒
。
其实,就算这里不执行release
方法唤醒,在我们刚刚说过的上面 if 方法里,1节点取消时也会唤醒q
if (succ != null && (w = succ.thread) != null) {
succ.thread = null;
U.unpark(w); // wake up succ to observe new pred
}
这里只是提前唤醒
了,提高一下效率而已。
但是这种场景,因为h.status == 0
这个条件的存在,实际上是无法出现的,所以,阿轩觉得,这个条件是不是删掉
会比较好一点,为此,阿轩特地在stackoverflow上提了一个问题,Confusion about the cancelWaiter method in the StampedLock class,想听听大佬们的意见,可能StampedLock
这个类平时使用的很少吧,目前还没有大佬给出意见。
在提一下刚刚说的另一个问题,s == 0L
这个条件,也就是锁状态state
为0。
StampedLock
类只有一个构造方法
public StampedLock() {
state = ORIGIN;
}
在构造方法里将state
设为了ORIGIN
private static final long ORIGIN = WBIT << 1;
也就是256
,在写锁不断的获取和释放中,每次都会加128
,在state
超过long
型最大值溢出时,会被重新初始化为ORIGIN
state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
也就是state
永远无法等于0,那这里的s == 0L
条件又是干嘛的呢?
阿轩思考了许久,也没想出什么场景下会出现s == 0L
,说来惭愧,好像每篇文章阿轩都会留一个问题,暂且叫做阿轩未解之谜
吧,,等到日后哪天阿轩水平提高了,搞明白了,到时在另开文章说明吧。
总结
关于StampedLock类
,阿轩觉得最精华
的地方在于只用一个long
型变量就把读写锁的状态完美的体现了出来。
低7位
表示读锁的数量,溢出使用额外的变量保存。
第八位
表示写锁标识,其余位用来表示写锁的版本号
,规避ABA问题
。每次释放锁时,state加128
,即将第八位
变成了0,表示无写锁状态,又会使高位
进1,更新版本号
,着实精妙啊。
好了,到此StampedLock
类的源码剖析就全部结束了,非常感谢能够耐心看到这里的小伙伴,如果你读StampedLock也有着自己的一些独特见解,欢迎添加阿轩的个人微信号,一起讨论讨论。
原文始发于微信公众号(程序员阿轩):StampedLock那些不为人知的细节
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/37000.html