StampedLock那些不为人知的细节

大家好,我是阿轩。

今天我们来剖析一下StampedLock的源码。

说到StampedLock,可能有的小伙伴会比较陌生,他是在JDK8新增的一种,可以说是ReentrantReadWriteLock的加强版。

主要是用来解决ReentrantReadWriteLock写线程饥饿问题,在读多写少的场景下,如果一直有线程来进行读操作的话,那么写线程会一直阻塞在那造成饥饿问题,在StampedLock中引入了一种乐观读机制,即,读的时候不加锁,而是乐观的认为在这期间不会有写操作,如果真的有再申请读锁,这样,写线程过来的时候就可以直接拿到锁不用再等待了。

今天我们主要来剖析一下StampedLock中核心的3种操作,悲观读乐观读的源码。版本基于JDK8。

话不多说,我们开始。

前言

鉴于有的小伙伴可能对StampedLock不太了解,在正式开始源码剖析之前,我们简单说一下StampedLock的机制。

StampedLock没有继承AQS,但是他的实现和AQS差不多,当多个线程产生锁竞争的时候,也是通过队列来让线程进行排队等待的。

锁状态是通过一个long型的变量state来进行保存的。

private transient volatile long state;

低7位存储读锁的数量第八位表示写锁的标识,其他位表示写锁的版本

由于64位太长了,我们暂且以16位简单表示一下。

  • 0100 0000 1000 0000 锁被写线程占有,写锁版本0100 0000
  • 0010 0000 0000 1000 锁被读线程占有,读线程数量是8个

详细的细节我们在看源码的时候在细说。

悲观读

悲观读锁的获取JDK提供了4个方法

StampedLock那些不为人知的细节

常用的是readLock方法,先尝试获取锁,获取不到加入队列等待。

tryReadLock方法就是尝试获取,抢到了就抢到了,没抢到就直接返回。

tryReadLock(long time, TimeUnit unit),readLockInterruptibly 2种方法和readLock方法很相似,主要区别在于他们两支持中断readLock不支持中断

我们主要看readLock方法

public long readLock() {
    long s = state, next;  // bypass acquireRead on common uncontended case
    return ((whead == wtail && (s & ABITS) < RFULL &&
             U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
            next : acquireRead(false, 0L));
}

StampedLock的读写锁状态变化涉及到一些位运算,为了方便后面的理解,我们把类中的一些状态常量先介绍下,同样,为了方便起见,就用16位简略表示下

private static final int LG_READERS = 7;

private static final long RUNIT = 1L;
private static final long WBIT  = 1L << LG_READERS;
private static final long RBITS = WBIT - 1L;
private static final long RFULL = RBITS - 1L;
private static final long ABITS = RBITS | WBIT;
private static final long SBITS = ~RBITS;

private static final long ORIGIN = WBIT << 1;

用16位表示就是

  • WBIT   0000 0000 1000 0000
  • RBITS  0000 0000 0111 1111
  • RFULL  0000 0000 0111 1110
  • ABITS  0000 0000 1111 1111
  • SBITS  1111 1111 1000 0000
  • ORIGIN 0000 0001 0000 0000

下面我们看下readLock方法的逻辑。

return中是一个三目运算符,第一个条件whead == wtail表示,等待队列要么没有节点都是null,要么头节点和尾节点相等

这个三目运算符的表达式由3个条件通过&运算构成,为什么这个条件是第一个条件呢?

因为,如果此时whead不等于wtail,那么就表示队列中有线程在等待锁

要么是读线程在等待写锁,这种情况下肯定没办法获取锁。

要么就是写线程在等待读锁,前面我们说过读写锁的饥饿问题,这里同样,为了避免出现这种问题,如果此时锁被读线程占有,但是队列中已经有写线程在等待了,此时再有读线程过来,不会拿到锁,而是到队列中去排队。仔细想想,如果在读多写少的场景下,一直有读线程过来操作,那写线程要等到猴年马月才能拿到锁。

所以,虽然whead == wtail,不一定表示锁没有被占用,但是whead != wtail,一定表示,无法获取锁。

这个条件虽然不是充分条件,但是却是必要条件,而且相比较于第二个条件还要进行位运算操作,他只是比较一下2个对象的内存地址是否一样,速度上肯定更快,而且大多数情况下都可以过滤掉,不用再判断后面的表达式了,所以才放在了第一位

第二个条件(s & ABITS) < RFULL,结合前面转换的二进制来看,要想结果小于RFULL,那么第八位就不能为1,也就是写锁没有被占有,此时才会来到第三个条件,通过CAS将读锁状态+1

如果成功直接返回state,失败进入acquireRead方法,准备进行入队操作,注意,这里的中断参数位值是false,就是不支持中断

acquireRead

acquireRead方法很长,里面分为2个大的for循环,每个循环里又含有一堆的 if 判断。

我们先看第一个循环里的第一个大的if判断

if ((h = whead) == (p = wtail)) {
    for (long m, s, ns;;) {
        if ((m = (s = state) & ABITS) < RFULL ?
            U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
            (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
            return ns;
        else if (m >= WBIT) {
            if (spins > 0) {
                if (LockSupport.nextSecondarySeed() >= 0)
                    --spins;
            }
            else {
                if (spins == 0) {
                    WNode nh = whead, np = wtail;
                    if ((nh == h && np == p) || (h = nh) != (p = np))
                        break;
                }
                spins = SPINS;
            }
        }
    }
}

首先再进行一次头节点尾节点的判断,看看此时具不具备抢锁的条件,如果具备,进入循环准备抢锁

这里提一下,头尾节点相等有4种情况

  • 无锁状态下
  • 锁被占有,但是没有节点等待
  • 锁被占有,第一个节点初始化的时候,有短暂的时间头尾节点相等
  • 锁被占有,等待队列只有一个节点,该节点持有锁

第一种情况显然是可以去获取锁的。

第二种情况,读锁状态下可以去获取锁。

第三种情况,如果锁被写线程占有,无法获取。如果被读线程占有,那么此时就是写线程在初始化队列,也是无法获取的。

第四种情况,持有锁是读线程情况下,可以获取。

进入循环后是一个三目运算,对状态进行位运算操作,判断是否小于RFULL,小于表示没有写锁或者读锁数量还没有溢出,因为state低7位只能表示0-127,所以,当数量溢出的时候会用readerOverflow变量进行额外的保存。然后CAS尝试获取锁,获取成功将state加1,也就是读锁数量加1。

这里注意下,不小于RFULL有2种情况

  • 有写锁
  • 无写锁,state等于RFULL,或者等于RFULL+1

所以,当条件判断为false时,首先判断运算结果是否小于WBIT,也就是有没有写锁,没有的话就是读锁数量溢出了,调用tryIncReaderOverflow准备加到readerOverflow变量上

private long tryIncReaderOverflow(long s) {
    // assert (s & ABITS) >= RFULL;
    if ((s & ABITS) == RFULL) {
        if (U.compareAndSwapLong(this, STATE, s, s | RBITS)) {
            ++readerOverflow;
            state = s;
            return s;
        }
    }
    else if ((LockSupport.nextSecondarySeed() &
              OVERFLOW_YIELD_RATE) == 0)
        Thread.yield();
    return 0L;
}

这个方法不多说,有2个地方需要注意下。

第一个,if 里对state进行了CAS操作,但是紧接着又把他还原了,这是为什么呢?

其实也不难理解,毕竟state共享变量嘛,对他的操作存在竞争,所以,这里的CAS就是对他加个

那可能又有小伙伴要问了,为什么不把readerOverflow设置成原子类,比如AtomicInteger呢?

这样不是不用CAS了吗?

你想想,AtomicInteger底层还是用的CAS,与其绕了一圈最后还是使用CAS,不如这里直接使用好了,对不对

StampedLock那些不为人知的细节

第二个,就是这个else if,也就是说会存在(s & ABITS) != RFULL的情况。

这种情况可以说,几乎不存在,但是为什么作者要这样写呢?

前面我们说过,RFULL的二进制是0000 0000 0111 1110,想想,为什么低7位不全部是1呢?

我们设想下,如果低7位全部为1会怎么样?

也就是RFULL0000 0000 0111 1111

如果某一时刻,(s = state) & ABITS等于RFULL-1,然后进行CAS加1,那么,下一个线程(s = state) & ABITS计算的结果就等于RFULL,如果此时出现了某种不可预知的意外情况,又进行了一次CAS加1,那么会发生什么情况?

state低位全部进1,第八位变成了1,此时变成写锁状态了!!

所以作者把RFULL设为0000 0000 0111 1110,就是为了防止出现这种情况,如果真的,真的,真的出现这种情况了,也就是state低7位全部为1了,走到else if里了,那么少侠你先休息会吧,等其他读锁释放了,把数量降下来回到正轨再说。

这种就叫防御性编程,给自己留一线,虽说理论上不可能出现这种情况,但是程序的世界谁又知道呢。

不得不服,Doug老爷子,牛逼

StampedLock那些不为人知的细节

回到acquireRead方法里,如果if不满足,走到else if里,判断锁是否被写线程占有了,如果是,设置自旋次数,开始自旋

private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0;

等到自旋完了,spins == 0,来到这么一个判断

WNode nh = whead, np = wtail;
if ((nh == h && np == p) || (h = nh) != (p = np))
    break;

咋一看,如果if不满足,岂不是要从头到尾再自旋一轮?

其实,细看一下,你就会发现,不管怎么样,if都会通过。

前半部分判断,此时的头节点尾节点与之前的是否都相同,其实也就是头节点尾节点还等不等,如果不等,来到后半部分,将hp重新赋值,因为此时头节点和尾节点不相等,所以,后半部分条件必然成立

所以,与其说这里是个条件判断,不如说就是一个变相赋值的操作,

这里如果换成我们来写,一行代码至少要写成3,4行,甚至更多,这就是和大佬的差距啊StampedLock那些不为人知的细节StampedLock那些不为人知的细节

第一层if看完了,下面看第二层if,因为最后一个判断比较长,我们先看前面的几个判断

if (p == null) { // initialize queue
    WNode hd = new WNode(WMODE, null);
    if (U.compareAndSwapObject(this, WHEAD, null, hd))
        wtail = hd;
}
else if (node == null)
    node = new WNode(RMODE, p);
else if (h == p || p.mode != RMODE) {
    if (node.prev != p)
        node.prev = p;
    else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
        p.next = node;
        break;
    }
}
else if (!U.compareAndSwapObject(p, WCOWAIT,
                                 node.cowait = p.cowait, node))
    node.cowait = null;

第一个判断,如果头节点尾节点都是的话,开始初始化队列,构造一个写模式的节点,然后把头节点和尾节点指针指向他。

设置完之后,进行下一次循环,此时头节点和尾节点虽然不为空了,但是他们暂时还是相等的,所以会再一次看看有没有抢锁的机会,但是不会再自旋了,就看一眼,没机会就来到第二个判断

构造一个读模式的节点,也就是自己,前指针指向尾节点,再一次循环,瞧瞧有没有机会,没有来到第三个判断。

头节点等于尾节点,或者尾节点不是读模式

这里提前介绍下,节点排队的机制写节点是直接排到队列结尾的

但是读节点,如果尾节点写节点的话,会继续排到结尾; 如果是读节点,那么会挂到读节点的cowait链

StampedLock那些不为人知的细节

所以这里,条件头节点等于尾节点,因为不管是读模式还是写模式,初始化的第一个节点都是写节点,所以此时是要挂到主链上的,尾节点不是读模式也是一个意思。

看完整个acquireRead方法,你就会发现,挂到主链上的节点会进入第二个大for循环,挂到cowait链上的节点只在第一个大for循环里,因为此时满足这个if条件的节点是要进入主链的,所以才会break跳出第一个大for循环

继续来到第四个if条件,走到这里,就说明节点是要挂到cowait链的,CAS尝试将尾节点的cowait指针指向自己。

这里注意一点,cowait的插入是头插法,就是每次新节点的插入是从头部插入的,为什么不是尾插法呢?

你想,如果用尾插法,那是不是还得保存一下cowait链尾部节点位置,要不然下一个节点过来,哪知道尾部在哪,对不对

而使用头插法,就不需要知道尾部在哪了,每次过来一个新节点,我把自己的cowait指针指向新节点,把老的cowait节点挂到新节点的cowait上,这样就可以省去保存尾部指针的内存消耗了,反正,读模式的节点唤醒的时候都是cowait链一起唤醒的,谁先谁后无所谓

所以,你说JDK的源码为什么那么省内存,就是这样的一个个小细节堆起来的。

下面来到最后一个else分支,里面又是一个for循环

for (;;) {
    WNode pp, c; Thread w;
    if ((h = whead) != null && (c = h.cowait) != null &&
        U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
        (w = c.thread) != null) // help release
        U.unpark(w);
    if (h == (pp = p.prev) || h == p || pp == null) {
        long m, s, ns;
        do {
            if ((m = (s = state) & ABITS) < RFULL ?
                U.compareAndSwapLong(this, STATE, s,
                                     ns = s + RUNIT) :
                (m < WBIT &&
                 (ns = tryIncReaderOverflow(s)) != 0L))
                return ns;
        } while (m < WBIT);
    }
    if (whead == h && p.prev == pp) {
        long time;
        if (pp == null || h == p || p.status > 0) {
            node = null; // throw away
            break;
        }
        if (deadline == 0L)
            time = 0L;
        else if ((time = deadline - System.nanoTime()) <= 0L)
            return cancelWaiter(node, p, false);
        Thread wt = Thread.currentThread();
        U.putObject(wt, PARKBLOCKER, this);
        node.thread = wt;
        if ((h != pp || (state & ABITS) == WBIT) &&
            whead == h && p.prev == pp)
            U.park(false, time);
        node.thread = null;
        U.putObject(wt, PARKBLOCKER, null);
        if (interruptible && Thread.interrupted())
            return cancelWaiter(node, p, true);
    }
}

一点点看,第一个if判断,头节点不为空并且cowait链也不为空,那说明这个主节点是读模式的节点,并且此时正在被唤醒

为什么这么说呢?

读模式好理解,因为只有读模式的节点才有cowait链

前面我们讲过,不管是读线程初始化队列还是写线程初始化,第一个节点都是写模式的虚节点,读节点要想来到头部,一定是前面拿到锁的线程释放锁了,并且把他唤醒了,他拿到锁之后把头部指针指向了自己。

所以,前面2个条件可以确定,此时,整个读模式节点的cowait链正在被唤醒。后面2个条件就是把头节点的cowait指针指向下一个节点,然后唤醒当前的cowait节点,其实从注释也能看出来,此处就是帮助唤醒

因为你想,在读多写少的场景下,一个主节点的cowait链上挂很多的读节点是很正常的,所以这里就是帮助唤醒,提高一下效率。

第二个if判断,尾节点的前一个节点是头节点,或者头节点等于尾节点,或者尾节点的前一个节点为空。

前2个条件都好理解,最后一个条件是什么鬼?

尾节点的前一个节点不等于头节点,尾节点又不等于头节点,这种情况下,尾节点的前一个节点等于空??

要知道,不管是中断取消节点还是线程释放锁唤醒后面的节点,他们都是不会把自己的节点设置为null的。

而且,新的尾节点插入的时候都是先设置自己的前置节点再把尾部指针指向自己的,不可能出现前置节点为null的情况,那这里的pp == null又是什么鬼?

StampedLock那些不为人知的细节

阿轩思来想去,只想到一种可能,就是当前2个条件判断的时候,队列中存在2个以上的节点,所以头节点不等于尾节点,尾节点的前置节点也不是头节点,但是,当执行到第三个条件的时候,前面的线程全部执行完了,释放锁了,此时自己来到了第一个节点,而之前的前置节点因为路径不可达,此时又恰巧发生了GC被回收了,导致pp == null

那有小伙伴可能要问了,如此短的时间内发生这么多事,不可能吧?

按常理来说,这种情况确实极难发生,但也并非完全不可能。

设想下,如果CPU执行了前面2个条件后,当前线程分配的时间片用完了,CPU切出去了,此时前面的线程拿到锁几乎没做什么事立即又释放了锁,如果此时队列中只有3个节点,那么很快,自己就会成为第一个节点,而因为堆大小设置的很小,youngGC频繁发生,此时凑巧发生了GC,那么前置节点就被回收了,最终pp == null了。

这3个条件任意一个满足就说明,此时有机会获取锁,如果此时确实没有写锁,那么就会开始循环获取锁了,while这段代码和前面的几乎没什么区别,就不再赘述了。

来到最后一个if判断,whead == h && p.prev == pp,这个条件成立说明什么?

说明头节点还是那个头节点,尾节点的前置节点还是那个节点,你大爷还是你大爷,但是你却不是之前的那个你了StampedLock那些不为人知的细节StampedLock那些不为人知的细节

进来之后进行这么一个判断pp == null || h == p || p.status > 0,有没有一点眼熟的感觉。

是的,就是上面说的那个if里的后2个条件加上一个尾节点取消,如果满足,就把自己设置为null,也就是把当前节点丢弃跳出循环,重新去创建新节点,最后一个条件可以理解,但是前2个条件又是为什么呢?

有的小伙伴可能要现学现卖了,阿轩,阿轩,如果出现了你之前说的那种情况,有可能pp == null吗?

可以告诉你,如果是之前那种情况,都执行不到这里,滑稽—

why?

之前那种情况出现的话,头节点肯定是已经换过好几轮了,if里的条件whead == h肯定是不成立的。

所以,真出现pp == null || h == p || p.status > 0这种情况的话,一定是发生了什么意想不到的意外,我上面刚执行过pp == null || h == p不成立,咋到你这就成立了呢?

所以,果断丢弃掉,换个新的。

继续往下执行,就是判断有没有超时,没有超时就把当前线程设置到节点上,然后挂起等待被唤醒了,没什么好说的。

下面来看第二个大for循环,比较长,先看第一层 if 判断

if ((h = whead) == p) {
    if (spins < 0)
        spins = HEAD_SPINS;
    else if (spins < MAX_HEAD_SPINS)
        spins <<= 1;
    for (int k = spins;;) { // spin at head
        long m, s, ns;
        if ((m = (s = state) & ABITS) < RFULL ?
            U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
            (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
            WNode c; Thread w;
            whead = node;
            node.prev = null;
            while ((c = node.cowait) != null) {
                if (U.compareAndSwapObject(node, WCOWAIT,
                                           c, c.cowait) &&
                    (w = c.thread) != null)
                    U.unpark(w);
            }
            return ns;
        }
        else if (m >= WBIT &&
                 LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
            break;
    }
}
else if (h != null) {
    WNode c; Thread w;
    while ((c = h.cowait) != null) {
        if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
            (w = c.thread) != null)
            U.unpark(w);
    }
}

首先判断头节点是否等于自己的前一个节点,也就是自己初始化队列之后,有没有节点进来。

没有的话,就再自旋一下看看能不能拿到锁,如果拿到了,把头节点指向自己,唤醒cowait链上的所有节点。

有的话,判断头节点是否为空,不为空并且cowait也不为空的话就循环把cowait链上节点全部唤醒。这又是什么场景呢?

当新的读线程进入队列排队的时候或者自己被中断唤醒了,发现此时头节点正是读模式节点并且正在唤醒整个cowait链上的节点,那就帮一把手,毕竟助人为乐嘛。

第一层 if 判断就结束了,来到第二层 if。

if (whead == h) {
    if ((np = node.prev) != p) {
        if (np != null)
            (p = np).next = node;   // stale
    }
    else if ((ps = p.status) == 0)
        U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
    else if (ps == CANCELLED) {
        if ((pp = p.prev) != null) {
            node.prev = pp;
            pp.next = node;
        }
    }
    else {
        long time;
        if (deadline == 0L)
            time = 0L;
        else if ((time = deadline - System.nanoTime()) <= 0L)
            return cancelWaiter(node, node, false);
        Thread wt = Thread.currentThread();
        U.putObject(wt, PARKBLOCKER, this);
        node.thread = wt;
        if (p.status < 0 &&
            (p != h || (state & ABITS) == WBIT) &&
            whead == h && node.prev == p)
            U.park(false, time);
        node.thread = null;
        U.putObject(wt, PARKBLOCKER, null);
        if (interruptible && Thread.interrupted())
            return cancelWaiter(node, node, true);
    }
}

如果头节点没变,先判断自己的前置节点有没有变化,如果变化了,重新设置自己的前置节点

接着把前置节点的状态设为-1,和我们之前说过的AQS是不是很像。

如果取消了,把节点前后指针调整一下。

最后的else分支和刚刚说过的几乎一样,就不说了。

简单总结一下读锁的获取:

  • state变量的低7位用来表示读锁的个数,每加一个读锁,state加1,溢出则用readerOverflow变量表示。

  • 主链上的节点进入第二个大for循环运行,cowait上的节点在第一个大for循环运行。

  • 新节点排队的时候,如果尾节点是写节点,那么挂在主链上,如果是读节点,就挂在读节点的cowait链上。

写锁

看完了读锁再来看写锁就很轻松了,写锁的逻辑和读锁非常相似,同样的,有4个方法

StampedLock那些不为人知的细节

他们之间的区别和读锁一样,就不介绍了。我们主要看writeLock方法

public long writeLock() {
    long s, next;  // bypass acquireWrite in fully unlocked case only
    return ((((s = state) & ABITS) == 0L &&
             U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
            next : acquireWrite(false, 0L));
}

首先通过位运算判断是否有写锁,没有的话就CASstate加上128,就是表示锁状态的第8位bit位。失败进入acquireWrite方法。

acquireWrite方法和acquireRead方法非常相似,同样的分为了2个大的for循环,但是作用不一样。先看第一个大for循环。

private long acquireWrite(boolean interruptible, long deadline) {
    WNode node = null, p;
    for (int spins = -1;;) { // spin while enqueuing
        long m, s, ns;
        if ((m = (s = state) & ABITS) == 0L) {
            if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
                return ns;
        }
        else if (spins < 0)
            spins = (m == WBIT && wtail == whead) ? SPINS : 0;
        else if (spins > 0) {
            if (LockSupport.nextSecondarySeed() >= 0)
                --spins;
        }
        else if ((p = wtail) == null) { // initialize queue
            WNode hd = new WNode(WMODE, null);
            if (U.compareAndSwapObject(this, WHEAD, null, hd))
                wtail = hd;
        }
        else if (node == null)
            node = new WNode(WMODE, p);
        else if (node.prev != p)
            node.prev = p;
        else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
            p.next = node;
            break;
        }
    }

进来同样的,首先看看有没有机会获取锁,没有的话判断此时是写锁还是读锁

如果是写锁,设置一个自旋次数64,开始自旋

如果是读锁,直接设置为0,开始初始化队列

按照常理来说,写锁相对读锁进行的很慢,应该直接进入队列等待才对。而读锁进行的很快,应该自旋才对。但是为什么这里是反着来的呢?

其实结合原理,仔细分析一下,就会发现,如果此时是读锁写线程开始自旋的话,那么后面过来的读线程,因为此时队列还没有初始化头节点等于尾节点等于空,满足whead == wtail && (s & ABITS) < RFULL这个条件,就会直接去获取锁了,那写线程就会一直获取不到锁,所以,想要尽快获取到锁,唯一的方法就是赶紧初始化队列,让后面过来的读线程乖乖的排队,而不是傻乎乎的在那自旋

后面 if 条件就是初始化队列了,然后把尾节点指针指向自己就跳出第一层for循环了。

来到第二层for循环,里面又分为2层 if 判断,先看第一层 if 。

WNode h, np, pp; int ps;
if ((h = whead) == p) {
    if (spins < 0)
        spins = HEAD_SPINS;
    else if (spins < MAX_HEAD_SPINS)
        spins <<= 1;
    for (int k = spins;;) { // spin at head
        long s, ns;
        if (((s = state) & ABITS) == 0L) {
            if (U.compareAndSwapLong(this, STATE, s,
                                     ns = s + WBIT)) {
                whead = node;
                node.prev = null;
                return ns;
            }
        }
        else if (LockSupport.nextSecondarySeed() >= 0 &&
                 --k <= 0)
            break;
    }
}
else if (h != null) { // help release stale waiters
    WNode c; Thread w;
    while ((c = h.cowait) != null) {
        if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
            (w = c.thread) != null)
            U.unpark(w);
    }
}

同样的,先判断一下,从初始化队列之后到现在有没有进入新的节点。

没有的话设置自旋次数1024次,开始自旋尝试获取锁。

有的话判断头节点是否是读模式节点,帮助唤醒cowait链。这里和之前读锁源码一样,当写线程进入队列时或者被中断唤醒时,发现此时头节点是读模式节点且正在唤醒cowait链,才会触发这个 if 条件。

接着来到第二层 if 判断

if (whead == h) {
    if ((np = node.prev) != p) {
        if (np != null)
            (p = np).next = node;   // stale
    }
    else if ((ps = p.status) == 0)
        U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
    else if (ps == CANCELLED) {
        if ((pp = p.prev) != null) {
            node.prev = pp;
            pp.next = node;
        }
    }
    else {
        long time; // 0 argument to park means no timeout
        if (deadline == 0L)
            time = 0L;
        else if ((time = deadline - System.nanoTime()) <= 0L)
            return cancelWaiter(node, node, false);
        Thread wt = Thread.currentThread();
        U.putObject(wt, PARKBLOCKER, this);
        node.thread = wt;
        if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
            whead == h && node.prev == p)
            U.park(false, time);  // emulate LockSupport.park
        node.thread = null;
        U.putObject(wt, PARKBLOCKER, null);
        if (interruptible && Thread.interrupted())
            return cancelWaiter(node, node, true);
    }
}

先判断一下自己的前置节点有没有发生变化,如果变化了重新设置下。把尾节点的状态设为-1。如果前置节点被取消了,把前后节点的指向重新调整一下。

注意了,这3个条件满足任意一个,都会结束第二层 if 判断,重新进入循环,再循环2048次,然后再次来到第二层 if 判断。

进入最后一个 else 分支,和读锁获取一样,判断是否超时,然后设置节点线程,将自己挂起

但是注意了,这个地方是存在一个bug的,不知道小伙伴们有没有发现?

如果获取锁的线程被中断了,他会被从挂起状态唤醒,执行到下面的 if 条件interruptible && Thread.interrupted(),但是由于writeLock方法和readLock方法的interruptible参数传的都是false,所以第一个条件就不通过,然后重新进入循环开始自旋获取锁,如果此时并不具备获取锁的条件,那么会再一次执行到U.park(false, time)挂起方法这里,我们之前在说AQS源码的时候,带着小伙伴看了park和unpark虚拟机层面的源码实现,忘记的小伙伴可以去回顾一下,AQS那些不为人知的细节,所以,因为这里的中断状态并没有被清除,所以park方法并不会把线程挂起,线程会继续往下执行,又一次进入循环进行自旋,结果就是CPU会在很短的时间内迅速飙高

修复的方法也很简单,就是将判断的条件if (interruptible && Thread.interrupted())改下顺序,变成if (Thread.interrupted() && interruptible),这样第一个条件就会把线程的中断状态清除掉,下次自旋之后就可以挂起了。

这个bug在JDK9已经修复了,感兴趣的小伙伴可以参阅,Lack of save / restore interrupt mechanism undermines the StampedLock

乐观读锁

看下乐观读锁

public long tryOptimisticRead() {
    long s;
    return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}

代码非常简单,和WBIT进行与运算判断结果是否等于0,回顾一下WBIT的二进制表示

  • WBIT 0000 0000 1000 000

所以,等于0就表示没有写锁,然后再和SBITS进行与运算,回顾一下SBITS二进制表示

  • SBITS 1111 1111 10000 0000

也就是将高位写状态全部保存下来。

思考一下,既然StampedLock锁不支持可重入,为什么不直接对写锁进行加1减1这样的操作呢?

原因就在于,乐观锁都有一个普遍的问题,就是ABA问题

当线程尝试获取乐观锁时,发现没有写锁,然后开始将数据复制一份到线程本地栈,复制完之后判断一下此时锁有没有被写线程占有,如果没有,就开始对读数据进行操作了。

这里就会存在一个问题,就是一开始没有写锁的,但是在线程复制数据的这段时间内,写线程过来拿到了锁,然后操作完之后又释放了锁,然后线程再来检查的时候发现是没有写锁的,但是此时数据实际上已经被修改了,那么就会导致结果不准。

所以,这里通过将高位写状态保存下来,如果中间过程有写线程获取过锁,那么高位的二进制就会发生变化,说白了就是添加一个版本号,原子类中的AtomicReference类也是基于版本号的原理避免ABA问题的。

释放锁

写锁释放

public void unlockWrite(long stamp) {
    WNode h;
    if (state != stamp || (stamp & WBIT) == 0L)
        throw new IllegalMonitorStateException();
    state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
    if ((h = whead) != null && h.status != 0)
        release(h);
}

读锁释放

public void unlockRead(long stamp) {
    long s, m; WNode h;
    for (;;) {
        if (((s = state) & SBITS) != (stamp & SBITS) ||
            (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
            throw new IllegalMonitorStateException();
        if (m < RFULL) {
            if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
                if (m == RUNIT && (h = whead) != null && h.status != 0)
                    release(h);
                break;
            }
        }
        else if (tryDecReaderOverflow(s) != 0L)
            break;
    }
}

首先判断一下是否发生了异常情况,然后对state进行操作,写锁会把state继续加128,如果达到了long型最大值就初始化为256,。读锁就是简单的把state减1.接着调用release开始唤醒节点。

private void release(WNode h) {
    if (h != null) {
        WNode q; Thread w;
        U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
        if ((q = h.next) == null || q.status == CANCELLED) {
            for (WNode t = wtail; t != null && t != h; t = t.prev)
                if (t.status <= 0)
                    q = t;
        }
        if (q != null && (w = q.thread) != null)
            U.unpark(w);
    }
}

首先将头节点的状态设为0,然后判断第二个节点是否为空或者是否被取消了,满足任意一个条件,就从尾节点向前遍历,找出第一个节点状态小于等于0的节点。

为什么从后往前遍历呢?

之前我们在看AQS源码的时候也是这样,主要是因为新节点入队的时候都是先设置的前置节点,再设置的后置节点

最后如果第二个节点存在,调用unpark唤醒。

还有一个通用释放方法

public void unlock(long stamp) {
    long a = stamp & ABITS, m, s; WNode h;
    while (((s = state) & SBITS) == (stamp & SBITS)) {
        if ((m = s & ABITS) == 0L)
            break;
        else if (m == WBIT) {
            if (a != m)
                break;
            state = (s += WBIT) == 0L ? ORIGIN : s;
            if ((h = whead) != null && h.status != 0)
                release(h);
            return;
        }
        else if (a == 0L || a >= WBIT)
            break;
        else if (m < RFULL) {
            if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
                if (m == RUNIT && (h = whead) != null && h.status != 0)
                    release(h);
                return;
            }
        }
        else if (tryDecReaderOverflow(s) != 0L)
            return;
    }
    throw new IllegalMonitorStateException();
}

将之前获取锁拿到的state状态戳stamp当前状态分别和SBITS与运算,看看是否相等,回忆一下SBITS的二进制表示。

  • SBITS 1111 1111 1000 0000

第七位之后全部为1,为什么?

如果是获取的写锁,那么低7位肯定都是0,只要保存高位的状态即可。

如果是获取的读锁,因为读锁是多个线程共享的,低7位的二进制表示会一直发生变化,而读锁的释放又不需要判断低7位有没有发生变化,只需要判断写锁状态有没有发生变化即可。

所以,如果通过while条件,只能说明state高位写锁状态没有发生变化,即从第七位开始往后的二进制位没有发生变化。

进入循环首先判断和ABITS与运算结果是否为0

  • ABITS 0000 0000 1111 1111

因为读锁是低7位,写锁是第八位,所以,只需要看低8位就可以知道当前状态有没有写锁或者读锁,为0表示没有写锁也没有读锁,什么锁都没有,肯定无法释放锁,跳出循环抛异常。

如果存在锁再判断是否等于WBIT

  • WBIT 0000 0000 1000 0000

m == WBIT表示当前是写锁状态,写锁状态的话那么当初获取锁拿到的state状态戳必须和现在相等才能释放锁,不相等表示之前的锁和现在的锁不是同一个锁,不能释放,跳出循环抛异常。如果相等就进行之前所说的释放写锁逻辑。

如果前2个 if 都不满足,那么说明当前是读锁状态

a == 0L表示之前获取的是乐观读,乐观读是不用释放锁的,跳出循环抛异常。

a >= WBIT表示之前获取的是写锁,而现在却是读锁状态,也是有问题的,抛异常。

如果前3个 if 都不满足,那么就说明之前获取的锁和现在的锁是一致的,都是读锁,下面就看锁的数量有没有溢出了。

最后我们来看下取消获取锁的逻辑

cancelWaiter

取消方法也是分为了2大块,一个大的if判断,一个while循环,先看 if 判断的上半部分

Thread w;
node.status = CANCELLED;
// unsplice cancelled nodes from group
for (WNode p = group, q; (q = p.cowait) != null;) {
    if (q.status == CANCELLED) {
        U.compareAndSwapObject(p, WCOWAIT, q, q.cowait);
        p = group; // restart
    }
    else
        p = q;
}

首先将自身状态设为取消1,如果是读模式节点,顺便看下cowait链上有没有取消的节点,有的话就把取消的节点删除掉。

进入下面一个 if 判断

if (group == node) {
    for (WNode r = group.cowait; r != null; r = r.cowait) {
        if ((w = r.thread) != null)
            U.unpark(w);       // wake up uncancelled co-waiters
    }
    for (WNode pred = node.prev; pred != null; ) { // unsplice
        WNode succ, pp;        // find valid successor
        while ((succ = node.next) == null ||
               succ.status == CANCELLED) {
            WNode q = null;    // find successor the slow way
            for (WNode t = wtail; t != null && t != node; t = t.prev)
                if (t.status != CANCELLED)
                    q = t;     // dont link if succ cancelled
            if (succ == q ||   // ensure accurate successor
                U.compareAndSwapObject(node, WNEXT,
                                       succ, succ = q)) {
                if (succ == null && node == wtail)
                    U.compareAndSwapObject(this, WTAIL, node, pred);
                break;
            }
        }
        if (pred.next == node) // unsplice pred link
            U.compareAndSwapObject(pred, WNEXT, node, succ);
        if (succ != null && (w = succ.thread) != null) {
            succ.thread = null;
            U.unpark(w);       // wake up succ to observe new pred
        }
        if (pred.status != CANCELLED || (pp = pred.prev) == null)
            break;
        node.prev = pp;        // repeat if new pred wrong/cancelled
        U.compareAndSwapObject(pp, WNEXT, pred, succ);
        pred = pp;
    }
}

第一个for循环,如果取消的节点是读模式节点并且是挂在主链上的,那么把cowait链上的节点全部唤醒。正所谓,树倒猢狲散,老大都被干掉了,做小弟的也只能另谋出路了。

第二个for循环主要做的就是将前后节点的指针调整一下。先进入一个while循环,如果当前节点的后置节点是或者被取消了,那么就从尾节点从后往前寻找最接近node节点非取消状态的节点,这里会出现几种情况

  1. succ==null,q没找到也等于null,也就是node就是尾节点
  2. succ==null,但是q找到了,这种情况是因为新节点入队的时候,是先设置新节点的前置指针,在设置尾节点指针,最后才设置前置节点的后置指针,所以是会出现node的后置节点为空,但是他又不是尾节点的情况的
  3. succ取消了,q没找到,也就是node是尾节点,或者node后面的节点全部都取消了,q==null
  4. succ取消了,q找到了,存在未取消的节点

下面来看下基于这4种情况的判断

if (succ == q ||   // ensure accurate successor
    U.compareAndSwapObject(node, WNEXT,
                           succ, succ = q)) {
    if (succ == null && node == wtail)
        U.compareAndSwapObject(this, WTAIL, node, pred);
    break;
}

succ == q,这个条件只有1才能满足,进入之后succ == null && node == wtail也满足,将尾节点指针指向前一个节点,跳出循环。

剩余的3种情况会执行到U.compareAndSwapObject(node, WNEXT, succ, succ = q)条件,不管q为空或者非空,都将node后置指针指向他。

这里2这种情况比较特殊一点,稍微回顾一下前面讲新节点入队的源码

else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
    p.next = node;
    break;
}

如果p.next = nodeCAS前面执行,那么执行CAS的时候因为WNEXT位置已经不是null了,那么CAS会执行失败,重新进入while条件(succ = node.next) == null || succ.status == CANCELLED,但是此时succ已经不等于null了,所以还是会跳出循环,殊途同归

如果是CAS先执行,那么会通过判断进入 if 体内。

但是不管2能不能进入 if 体内,这3种情况都不满足succ == null && node == wtail条件,2是因为node不是尾节点3和4是因为succ非空,所以跳出循环。

接着将pred节点的后置指针指向succ,如果succ不为空并且绑定了线程就将他唤醒,因为执行到这里,只是设置了前置节点后置指针succ前置指针一直没有设置,所以需要将他唤醒让他去设置一下。

最后判断一下pred节点是否被取消,或者pred前置节点是否为空,如果没取消,或者前置节点为空也就是pred就是头节点,那么跳出循环。

否则,就是pred节点取消了,并且pred前置节点不为空,那么将前后节点的指针重新调整一下,再次进入循环。

第一个 if 块执行完后来到最后的while循环

WNode h; 
while ((h = whead) != null) {
    long s; WNode q; 
    if ((q = h.next) == null || q.status == CANCELLED) {
        for (WNode t = wtail; t != null && t != h; t = t.prev)
            if (t.status <= 0)
                q = t;
    }
    if (h == whead) {
        if (q != null && h.status == 0 &&
            ((s = state) & ABITS) != WBIT && 
            (s == 0L || q.mode == RMODE))
            release(h);
        break;
    }
}

首先判断头节点的下一个节点是否为空或者被取消了,满足任一条件就从尾节点向前遍历,寻找最接近头节点非取消状态的节点。

第二个 if 判断就非常有意思了。

先看前2个条件,q != null && h.status == 0,头节点之后存在非取消状态的节点,并且头节点的status为0,这2个条件什么情况下会满足呢?

2个场景可以满足。

  1. 队列初始化的时候,不管是写节点还是读节点都会执行这一句U.compareAndSwapInt(p, WSTATUS, 0, WAITING),在这句代码执行之前,是可以满足前2个条件的。

  2. 线程释放锁的时候,会执行release方法中的U.compareAndSwapInt(h, WSTATUS, WAITING, 0)代码,此时也是可以满足前2个条件的。

来到第三个条件((s = state) & ABITS) != WBIT,表示此时锁状态不是写锁,那么就是读锁或者没有锁

结合1情况,就是读锁

结合2情况,就是没有锁

最后一个条件,s == 0L || q.mode == RMODE,s == 0L我们先暂且不看,只看q.mode == RMODE这个条件。

那么1这种情况,q写节点就不满足了。

2这种情况,因为获取锁的线程已经在调用release方法了,这里就算通过了条件判断,也是调用release方法,重复调用了,没有什么意义。

所以,1和2这2种情况不管满足不满足 if 判断,都没有什么实际的意义,那作者写这段代码又是为了什么呢?

阿轩觉得,作者写这段代码可能是考虑这么一种场景

StampedLock那些不为人知的细节

q节点此时排在第三位,处于挂起状态,此时的锁是读锁,在某一时刻,有一个节点中断取消了,执行到了while方法这里,在同时,1和2节点也发生了中断取消,刚执行完node.status = CANCELLED,此时,q节点已经具备获取锁的条件了,那么就会提前把他唤醒

其实,就算这里不执行release方法唤醒,在我们刚刚说过的上面 if 方法里,1节点取消时也会唤醒q

if (succ != null && (w = succ.thread) != null) {
    succ.thread = null;
    U.unpark(w);       // wake up succ to observe new pred
}

这里只是提前唤醒了,提高一下效率而已。

但是这种场景,因为h.status == 0这个条件的存在,实际上是无法出现的,所以,阿轩觉得,这个条件是不是删掉会比较好一点,为此,阿轩特地在stackoverflow上提了一个问题,Confusion about the cancelWaiter method in the StampedLock class,想听听大佬们的意见,可能StampedLock这个类平时使用的很少吧,目前还没有大佬给出意见。

在提一下刚刚说的另一个问题,s == 0L这个条件,也就是锁状态state为0。

StampedLock类只有一个构造方法

public StampedLock() {
    state = ORIGIN;
}

在构造方法里将state设为了ORIGIN

private static final long ORIGIN = WBIT << 1;

也就是256,在写锁不断的获取和释放中,每次都会加128,在state超过long型最大值溢出时,会被重新初始化为ORIGIN

state = (stamp += WBIT) == 0L ? ORIGIN : stamp;

也就是state永远无法等于0,那这里的s == 0L条件又是干嘛的呢?

阿轩思考了许久,也没想出什么场景下会出现s == 0L,说来惭愧,好像每篇文章阿轩都会留一个问题,暂且叫做阿轩未解之谜吧,StampedLock那些不为人知的细节StampedLock那些不为人知的细节,等到日后哪天阿轩水平提高了,搞明白了,到时在另开文章说明吧。

总结

关于StampedLock类,阿轩觉得最精华的地方在于只用一个long型变量就把读写锁的状态完美的体现了出来。

低7位表示读锁的数量,溢出使用额外的变量保存。

第八位表示写锁标识,其余位用来表示写锁的版本号,规避ABA问题。每次释放锁时,state加128,即将第八位变成了0,表示无写锁状态,又会使高位进1,更新版本号,着实精妙啊。

好了,到此StampedLock类的源码剖析就全部结束了,非常感谢能够耐心看到这里的小伙伴,如果你读StampedLock也有着自己的一些独特见解,欢迎添加阿轩的个人微信号,一起讨论讨论。


原文始发于微信公众号(程序员阿轩):StampedLock那些不为人知的细节

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/37000.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!