大家好,我是阿轩。
今天我们来剖析一下 AQS 的源码。
说到 AQS,我们的第一反应就是 ReentrantLock,CountDownLatch,Semaphore 等等这些并发组件,他们都是基于 AQS 实现的。
前言
关于 AQS
的原理机制,网上写的文章很多,不是我们今天的重点,本篇文章主要是对源码一些细节
的剖析。
在开始剖析源码之前,我们先简单回顾一下 AQS 的整体结构和原理
。

AQS 使用了模板方法设计模式
,将通用的方法自己实现了,而定制化的方法交给了不同继承者自己实现,如果我们自己基于 AQS
写一个并发组件
的话,只需要实现最上层的 API 层
即可。
简单概括一下 AQS 的原理
,他通过一个 state
变量控制锁的获取和释放
,抢锁失败的线程将自己包装成一个 Node 节点
,排队进入一个双向 FIFO 队列
,也叫 CLH 同步队列
,然后挂起等待唤醒继续抢锁。获取到锁的线程也可以暂时释放锁进入一个 FIFO 的等待队列
,也就是等待队列
,然后等待唤醒,再次进入同步队列
。

同步队列和等待队列的区别是,一个是双向
的,一个是单向
的。
原理部分我们回顾完了,下面我们开始进入到源码的剖析。今天我们主要剖析加锁,解锁,条件等待和唤醒部分的源码,版本基于 JDK1.8。
话不多说,我们开始。Go!
acquire
为了整个流程的完整性,我们选取一个 AQS 的实现类 ReentrantLock
,从最开始入口说起。
public void lock() {
sync.lock();
}
首先ReentrantLock调用lock
方法开始加锁,调用的是ReentrantLock的一个内部类Sync
的lock方法,这个类继承了AQS,lock方法是一个抽象方法
,交由子类实现
abstract void lock();
他有2个子类,NonfairSync
和FairSync
,我们先看NonfairSync的实现
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
上来首先CAS
尝试设置state
,也就是尝试抢锁
,如果失败调用acquire
方法。我们再看下FairSync的实现
final void lock() {
acquire(1);
}
从这里我们可以看出,非公平和公平的区别
,非公平就是上来不管三七二十一,先抢再说
,公平就是老老实实调用acquire方法。
acquire方法是AQS方法。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这里我们看出模板模式
的写法,tryAcquire
方法各个子类根据实现功能的不同会有不同的实现,所以这个方法交给子类自己去实现,而addWaiter
,acquireQueued
方法是通用
的,AQS就自己实现了。
进入tryAcquire
方法,我们主要看非公平
的实现方式,跟着源码最后会来到nonfairTryAcquire
方法
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
这个方法很简单,首先获取state
变量,如果是0,表示没有线程获取锁
,CAS尝试抢锁
,抢到返回reture。如果state不为0,判断持有锁的线程是不是自己,如果是,对state进行相加,从这里我们就可以看出,ReentrantLock锁是可重入
的。如果上述2种情况都不是,那么抢锁失败,返回false。
上面我们说过,抢锁失败会进入同步队列,所以他会调用addWaiter
方法进入队列
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
首先封装一个Node
节点,然后判断尾节点
是否为空。
这里如果让我们来写的话,估计直接就是一个enq
方法,不会有这个if
判断,我们看下enq方法就会发现其实就是多了一个初始化
的动作,else的内容和这里的if一模一样。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
你说这样写有什么好处呢?
这就是大佬对于细节
的把控能力,因为初始化只会进行一次
,后面的线程过来只是简单的排队等待,如果直接用enq
方法的话,那么enq方法里的tail == null
这个判断每次都要判断一次,就显得冗余
了,虽然性能上的提升微乎其微,但是逼格就拉下来了,不符合大佬的风格。

其实在初始化这里还隐藏了一个细节
,我们刚刚说到非公平锁的tryAcquire
方法,我们看下公平锁的tryAcquire方法
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
对比下可以发现,就是在c==0
的判断里多了一个条件,多了hasQueuedPredecessors
这个方法。
既然是公平锁,那肯定是不能上来就抢锁的,所以要判断下队列中是否有节点在等待
,如果有,就别抢了,乖乖排队吧,看下hasQueuedPredecessors
这个方法
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
这个方法是判断队列中是否有节点在排队
,第一个条件,h != t
,我们都知道,AQS同步队列里第一个节点是虚节点
,只有个status
属性,真正等待
的是第二个节点,所以如果有节点等待,那么头节点是肯定不等于尾节点的。
后面2个条件是或
连接的,只要有一个成立,就表示有节点在等待,第一个,(s = h.next) == null
,看到这个条件,可能有人会疑惑,如果头节点的next节点为空,那么表示有节点在等待
?next
为空,那队列中不就只有一个头节点吗,而只有一个节点的情况下,head
应该是等于tail
的啊,why?

原因就在刚刚说的初始化
那里,我们把那段代码拿过来再仔细看一下
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
}
这里先CAS
设置了头节点
,然后在设置尾节点
,这2个操作不是原子性
的,也就是说会出现,头节点
设置完之后,尾节点
还是null
的情况,尽管这种状态存在的时间非常短
,但是还是存在的,那么就会出现h != t && (s = h.next) == null
这种情况,虽然他还在初始化
,但是说明已经有节点在等待
了。
第二个条件,当前等待线程不是自己,这个都理解。
acquireQueued
好了,接着往下看,添加节点
之后,接下来就是队列中的节点尝试获取锁
了,进入acquireQueued
方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这个方法是获取锁的核心方法
,我们一点点细看。
首先,上来就是一个死循环
,先判断当前节点的前一个节点是否是头节点
,如果是就尝试去抢锁,可能获取锁的线程此时刚好释放锁,如果正好抢到就不用后面挂起
了,tryAcquire
方法前面说过了,这里就不再赘述了。
如果抢到了,就会把头节点
设置成自己,把之前的头节点的next设置成null方便垃圾回收
,failed
变量设置成false,表示没有失败,这个变量是用来中断之后取消节点的,后面会说到。最后返回中断状态
,然后就会进入自己写的业务代码,开始执行抢到锁之后的逻辑。
如果没抢到,先进入shouldParkAfterFailedAcquire
方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
这个方法的目的就是把前一个节点的waitStatus
设置为-1
,说到waitStatus,就简单回顾下这个变量值的范围
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
总共有5个值,1表示取消
,-1表示唤醒
,-2表示等待队列
,-3表示共享状态
,还有一个默认值0
。
第一个判断ws == Node.SIGNAL
,这个正合我意,直接返回。
第二个判断ws > 0
,表示前一个节点取消
了,那么就循环向前,直到找到第一个未取消的节点,返回false,然后在外层再循环一次
再次进入这个方法,走到else
分支,CAS
尝试把前一个节点状态设置成-1
,设置成功之后,又在外层循环一次进来,这次终于第一个条件成立
,返回true。
现在已经把前一个节点的status设置为-1
了,意思就是告诉前一个节点,如果你结束了记得把我唤醒
,接下来,我要开始装逼了,额不,开始沉睡了。
进入parkAndCheckInterrupt
方法
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
二话不说,直接挂起
。
在这里,发生了一件非常有趣
的事,我们稍后会说到。
先回到外层,大家注意到没有,这里有一个finally
代码块,用来处理取消节点的。
阿轩当时心想,这个cancelAcquire
方法怎样才能进去呢?
似乎只有一种情况,就是在线程还没有获取锁的时候抛出异常
,然后阿轩就开始找异常,整个方法里,就2处会抛异常,一处是predecessor
方法里
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
如果前一个节点是null会抛空指针
,阿轩想了半天,也没想出什么情况下,prev
会出现null
的情况。瞟了一眼注释
,发现作者也说明了,这种情况基本不存在
,是可以省略
的,可能是为了谨慎起见吧。
Returns previous node, or throws NullPointerException if null. Use when predecessor cannot be null. The null check could be elided, but is present to help the VM.
注意这个词,elided
,省略
另一处在tryAcquire
方法里
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
超过整型最大值
的时候会出现,但是这和中断也没有什么关系呀。
于是阿轩就很纳闷了,线程中断唤醒之后怎样才能进入cancelAcquire
方法取消获取锁呢,如果进不去,那么唤醒之后,在外层循环走一遍,然后又会被挂起,那我这中断
有啥意义呢,中断不和没中断一样吗,没区别
呀。
于是只能求助www.baidu.com了,和阿轩想的一样,这种问题基本百度不出什么东西,无奈,只能求助全球最大的同性交友社区stackoverflow
了。
阿轩搜了半天,没有发现和自己相似的问题,只能亲自出马了,于是,阿轩就提出了人生中在stackoverflow的第一个问题

很快,过了11分钟,就有人回答了

第一句就是欢迎来到stackoverflow,因为stackoverflow对于新人是有标签
的,所以能看出来。可以看出,老外对于新人还是非常热情友好
的,阿轩悄悄去瞄了一眼这个人的简介

JungleeGames这家公司首席工程师

可能阿轩第一次提问,问题描述的不是很清楚,他说希望我能提供更多的信息,不知道我到底想问啥,这就很尴尬了,于是阿轩屁颠屁颠的又解释了一遍

大家可别以为阿轩的英语很好哈,都是先中文然后翻译成英文的,这里就可以看出会英文
多么的重要,翻译软件有时候翻译的其实并不是非常准确,哈哈,扯远了。
阿轩在提问的这段时间里也一直在反复思考原因,终于,不经意间瞄了一眼注释
,才豁然开朗,原来如此
Acquires in exclusive uninterruptible mode for thread already in queue. Used by condition wait methods as well as acquire.
先不看整段话的意思,注意到uninterruptible
这个词没有,不可中断的
。
这段注释翻译一下就是,对于已经在队列中的节点,以独占非中断模式
获取锁,意思就是不能中断
。所以有时候源码的注释很重要,要好好看一下。
那既然不能中断,那肯定有方法能中断。
于是阿轩找了一下,果然,对于独占模式
,AQS
提供了3种方法获取锁
-
acquireQueued -
doAcquireInterruptibly -
doAcquireNanos
这3个方法大体流程几乎一样,唯一的不同就在于挂起唤醒
之后的处理
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
第一个方法只是标记
一下中断状态,但是后面2个方法却是抛出中断异常
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
至此,阿轩的疑惑也终于解开
了,对于已经在队列中的节点,如果你想中断
他取消获取锁,那么你就调用后面2种方法,否则你就用第一个。
现在,我们来说下刚才提到的那个非常有趣
的事。
什么事呢?
阿轩一开始在读这块源码的时候,为了弄清楚线程唤醒
之后的流程,把ReentrantLock
和AbstractQueuedSynchronizer
的源码复制
了一份出来,在里面加了一个打印

然后写了一个测试程序
public static void main(String[] args) {
MyReentrantLock lock = new MyReentrantLock();
Thread thread0 = new Thread(() -> {
lock.lock();
try {
Thread.sleep(3000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.unlock();
});
thread0.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Thread thread1 = new Thread(() -> {
lock.lock();
lock.unlock();
});
thread1.start();
Thread thread2 = new Thread(() -> {
lock.lock();
lock.unlock();
});
thread2.start();
Thread thread3 = new Thread(() -> {
lock.lock();
lock.unlock();
});
thread3.start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
thread2.interrupt();
// LockSupport.unpark(thread2);
}
什么意思呢?
就是先启动
thread0线程,抢到锁之后休眠
3000秒,然后再启动
3个线程,最后将thread2中断
。
正常运行的时候结果是这样的

但是如果我在这里加上一个断点
再运行

神奇
的事情就出现了

线程被唤醒之后打印了2次

阿轩第一时间是懵的,这什么情况,难道idea
在debug
模式下有bug
?
然后阿轩将测试程序里的中断改成unpark
方法,结果也是只打印一次,和正常运行一样的结果。
那么问题就出在中断
上面了。
这种时候也只能再次求助交友社区了,于是阿轩又提了一个问题

这次过了42分钟,有人回答阿轩了,他问我,我的MyReentrantLock
和MyAbstractQueuedSynchronizer
在什么地方,然后阿轩又解释了一遍,期间瞄了一下大佬的简介,发现是个Chinese,Chinese和Chinese用英文在这交流,感觉怪怪的,阿轩果断去问了下大佬的微信,想加微信私聊,不过大佬没理阿轩,有点尴尬。

紧接着,他说,你的问题我记下了,我会花时间去研究他然后回复
你。
又过了一个多小时,他回复了,他说他问了一个问题
,然后阿轩点过去看了下

看来,这位大佬也不知道问题出在了哪里
过了3个小时,有人回答了

这个人的回答中也没有说明具体的原因
,但是他提到了一个词,spurious wakeups
,什么意思,虚假唤醒
这个词相信大家或多或少都听过吧,感觉很高端
的样子
什么意思呢,举个通俗的例子:假如有3个线程A,B,C,A和B调用wait处于休眠
中,这个时候C调用signal想唤醒
B,但是因为signal是随机唤醒
的,所以会出现唤醒
了A,但是实际上A并没有达到被唤醒的条件
,但是却被唤醒了,这种情况就叫虚假唤醒
。
想到这,阿轩立马去翻看了下park
方法的注释
Some other thread invokes {@link #unpark unpark} with the current thread as the target; or
Some other thread {@linkplain Thread#interrupt interrupts} the current thread; or
The call spuriously (that is, for no reason) returns.
果然,在注释中,作者也说了,有3种情况
线程会被唤醒
,第一种,调用unpark
唤醒,第二种,中断
唤醒,第三种,就是虚假唤醒
了,作者还说明了下,for no reason
,没有任何理由的。
线索到了这里,只能从虚假唤醒
入手了。
然后阿轩又把stackoverflow上所有关于虚假唤醒
的问题都看了一遍。
结果发现越看越晕,,可能是阿轩的水平太菜了吧,实在看不懂大佬们在聊什么

最后,阿轩本着源码之下无秘密,只能去看JVM源码了
。。。

源码中总共有6个if
判断。
先简单介绍下,JVM
实现park
的机制,JVM通过counter
和mutex
2个变量实现park
。
counter表示许可
的意思,他只有2个值
,0和1
,unpark
会把他设置为1,park
会把他设置为0,多次重复
调用unpark,counter为1不变。这也是为什么wait和notify
顺序写错了会发生死锁
,但是park和unpark不会,因为就算先unpark,把counter设置为1,然后park调用的时候,发现counter为1,会把counter设置为0,然后直接返回,并不会挂起
。也就是图片中标①的地方。
mutex
表示锁,当synchronized
膨胀为重量级锁
的时候底层使用的就是这个mutex锁
,因为counter也存在并发性
的问题,所以需要加锁保护。
铺垫说到这里,我们看下具体的源码,先看2个参数,isAbsolute
和time
,这2个参数表示,如果设置了挂起时间
,那么是绝对时间
还是相对时间
,AQS调用的park
是这样的
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
传的是false
和0
。
我们看①判断,这个函数Atomic::xchg
什么意思,其实就是CAS
的意思,这里判断如果counter
是1,那么把他设置为0,然后返回。
②判断当前线程是否中断
,如果中断了也返回。注意一下第二个参数false
,这个参数表示是否清除中断标识
的意思,JDK
中源码是这样的
private native boolean isInterrupted(boolean ClearInterrupted);
意思就是如果调用这个方法,他会返回当前线程的中断状态
,然后根据参数是否将中断标识清除
掉。
如果一个线程中断了,那么第一次调用,会返回true,第二次调用就会返回false。
这里是不清除中断标识。
③和④是关于时间的,这里传的是0,不会走到。
然后来到ThreadBlockInVM tbivm(jt)
这个方法,这里是真正操作线程挂起
的地方。
⑤仍然是先判断是否中断
,然后尝试获取锁。
⑥再一次判断counter是否为1,为1设置为0,然后释放锁
。
看完了park方法,我们再看下interrupt
方法

interrupt
方法主要就做了2件事,先设置中断标识
为true,然后调用unpark
方法,接着看unpark

unpark也是主要就做了2件事,先将couter
设置为1,然后调用pthread_cond_signal
方法唤醒挂起的线程。
现在,我们将整个流程串起来跑一遍。
首先,线程在park方法里的ThreadBlockInVM tbivm(jt)
这里被挂起,然后中断被唤醒
,来到⑤判断是否中断,是,返回,接着调用AQS
源码中的Thread.interrupted()
,清除了中断标识
。
接着,从外层循环走一遍,再次来到park
方法,因为此时counter=1
,于是将counter
设置为0,返回,再从外层循环走一遍,再次来到park
方法,①不满足条件,②不满足条件,③④也不满足,再次走到ThreadBlockInVM tbivm(jt)
挂起,打印2次。
what?打印2次?

debug
状态下是正确的,正常运行是错误的??
阿轩已经快崩溃
了。。
站在窗户边吹了会冷风,冷静了下,灵光一闪
。
对呀,我的操作系统是windows
啊,我看linux
的干嘛啊

从这里我们可以看到,JVM
源码的底层实现是分操作系统
的
于是赶紧去看了下window的源码

可以看出,主要分为2个大判断
,而且windows
中没有counter
和mutex
变量,①主要是判断时间相关的,根据我们的传参false和0,会走到第二个分支。然后进入②的判断,先判断是否中断
,如果是就返回,否则挂起。
再看下unpark的

这里,说实话,没看的太懂,猜测应该就是简单的唤醒
。
恩,如果是按照windows
的源码的话,确实是只会打印一遍。
但是还是不能解释为什么debug
下会打印2遍。
最后,阿轩抱着所有情况都跑一遍的想法,把程序又放在了阿里云上一台centos8的机器上跑了一下,结果还是只打印一遍
。
至此,阿轩已经彻底懵逼了。。。
说来惭愧,一直到写这篇文章的时候,阿轩也没想清楚为什么会打印2遍,如果有读者朋友知道的话,欢迎添加阿轩的微信号,私聊告诉阿轩,不胜感激!
好了,上面讲中断
扯了很多,感觉这篇文章标题应该改成park那些你不知道的细节
,
cancelAcquire
我们接着看acquireQueued
里的最后一个方法,就是中断后调用的方法
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
大体流程很清晰,首先设置线程
为null,毕竟已经取消
了嘛,然后从当前节点开始往前遍历
,跳过
所有取消的节点,直到找到第一个未取消的节点
,然后将prev
指针指向他,注意,这里并没有将找到的前一个节点的next
指针指向自己。
然后设置节点状态为取消
接着判断当前节点是否是尾节点
,如果是,CAS
尝试将尾节点指向找到的那个节点,并将next
指向null
。
否则,判断找到的节点是否是头节点
,如果不是,尝试将他的waitStatus
设置为-1,并且判断他的线程不为空,如果都成立的话,判断自己的next节点
是否是取消节点
,不是就将找到的节点的next指向他。
这里思考几个问题。
如果自己的next节点是取消节点
呢,为什么不继续往后遍历
了呢?
其实原因也很简单,如果next节点是取消节点的话,那么他也会去看自己的next节点
是否是取消节点,这样一直递归下去
,一直到next节点是null
或者不是取消节点
为止。
再来思考一个问题
如果多个节点
同时中断取消
,会发生什么,会出现并发性
问题吗?

比如这种情况,为了简化,图中只画了prev
指针,省略了next
指针
现在,4和6节点发生中断取消,根据源码逻辑,他们会把prev
设置到3节点上,然后运行到状态判断的时候
(ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))
3节点也发生中断取消
了,并且在他们判断之前将waitStatus
设置成了1,那么4和6节点就会进入else
分支,执行unparkSuccessor
方法,而3节点成功通过判断
(pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null)
但是由于他的next节点已经取消
了
if (next != null && next.waitStatus <= 0)
判断失败,什么也不会做,将next指向自己
我们再看4和6节点进入unparkSuccessor
方法会做什么
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
因为节点已经取消
,会进入第二个if
,从后往前
,找到距离自己最近
的非取消节点
,也就是说,s节点的前一个节点要么是自己,要么是另一个取消节点,然后将这个节点唤醒
。
唤醒之后就会来到我们前面所说的acquireQueued
方法,循环一次之后会来到shouldParkAfterFailedAcquire
这个方法,回忆一下,因为他的前一个节点是取消节点
int ws = pred.waitStatus;
所以会走到第二个if
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
}
这里节点会从自身出发往前遍历
,找到第一个
未取消节点,将彼此的prev
和next
设置一下
前面我们提到同时唤醒了多个节点,那么在这里,多个节点,每个节点就会各自
负责一段距离,彼此互不干扰,重新构建一个新的链表,过滤掉取消的节点。是不是有点像之前说的ConcurrentHashMap
多线程协同扩容一样。
好了,回到我们刚刚所说的unparkSuccessor
方法
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
思考这么一个问题,为什么这里是从后往前
遍历,而不是从前往后
呢?
原因阿轩觉得有2点
,第一点,在前面的cancelAcquire
方法中
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
这里设置指针只设置了prev
,并没有设置next
,如果从前往后,那么,那些取消的节点会又遍历一遍
。
第二点,还记得,之前新增同步节点
的时候吗
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
他是先设置的prev
再设置的next
,所以,从前往后有可能少遍历
最新的节点。
所以,这里采用的是从后往前遍历,而不是从前往后遍历。
到这里,加锁的源码就剖析完了,我们下面看解锁
unlock
public void unlock() {
sync.release(1);
}
入口还是从ReentrantLock的unlock
看起,调用的是AQS的release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease
方法也交给了子类去实现
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
这里先判断一下解锁的线程是否是占用锁
的线程,不是抛异常。所以,如果一个线程在没有获取到锁的情况下调用unlock
,是会抛异常
的。
然后判断一下释放之后state
是否等于0,因为是可重入锁
,完全释放了返回true,否则false。
如果队列中有正在排队
的节点,那么调用unparkSuccessor
方法唤醒
他,这个方法前面刚说过,就不再赘述了。
可以看出,解锁还是非常简单的,下面我们看下等待队列的休眠和唤醒
await
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
进来首先判断是否中断。
然后加入等待队列,调用addConditionWaiter
方法
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
这里就是一个简单的入队操作,需要注意的有2个地方,一个是如果尾部节点
被取消
了,那么调用unlinkCancelledWaiters
方法
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
这个方法其实可以演变成一道算法题:给你一个头节点
,每个节点有一个waitStatus
属性,让你把链表中waitStatus
不等于-2
的节点去掉重新
组成一个新的链表。
这里就是纯链表的操作,就不细说了。
另一个要注意的地方是,等待队列的节点,只有nextWaiter
指针,没有prev和next
指针。
接着往下来到fullyRelease
方法
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
这里将state
重置为0,然后调用release
唤醒头节点的下一个节点,前面说过了。
接着来到await
方法中最关键的部分,一个while循环
,循环的判断条件是!isOnSyncQueue(node)
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}
这个方法的作用是判断当前节点是否仍在同步队列
中,有3个判断
。
第一个,如果节点waitStatus
等于-2
,这个肯定是已经不在了,prev等于null
,因为同步队列的节点除了头节点
,其他节点prev
肯定不等于null,而此时这个节点是刚刚构建
的等待队列的节点,那么prev
肯定是等于null
的
那么为什么还会有这2个看起来完全没有必要
的判断呢?其实,这2个判断是为后面的另一处服务
的,马上会说到。
第二个,next
不为null,如果next
不为null,那么肯定已经在同步队列中了。
最后一个,调用findNodeFromTail
方法
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
这个方法就是把同步队列从后往前遍历
一遍,看看你究竟在不在,简单粗暴。
如果确定已经不在同步队列了,那么说明已经到了等待队列,接着就会调用park
挂起线程。
这里思考一个问题,如果把while (!isOnSyncQueue(node))
换成if
,行不行?
还记得我们之前说的,一个线程挂起之后有几种方法可以唤醒
吗?
再回顾一下,有3种
-
unpark -
中断 -
虚假唤醒
所以,这里如果发生虚假唤醒
,使用的是if
的话,程序就会继续往下面运行了,那么就会发生一些意想不到
的问题。
所以,防止虚假唤醒
的方法就是使用while循环
,即使不小心把你唤醒了,但是判断条件不满足
,你还是会再一次陷入沉睡

排除掉虚假唤醒
,这里存在中断
和signal唤醒
,看下checkInterruptWhileWaiting
方法
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
如果是正常唤醒
,那么返回0。
有朋友可能要问了,返回0再一次进入循环不是又挂起了吗?
非也,signal
那里会把waitStatus
设置为0,待会会看到。
我们看下如果发生了中断会怎么样,进入transferAfterCancelledWait
方法
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
首先CAS
设置waitStatus
为0,然后进入同步队列,返回true。
这里,可能有读者朋友会产生疑问,什么情况下会设置失败
呢?
是的,设想一下这种情况,另一个线程将该线程先signal
唤醒了,紧接着,又有另一个线程将该线程中断
了,此时waitStatus
已经被设置为0了,那么CAS
再设置就会失败。
然后来到下面的while循环
,因为signal
会将节点转移到同步队列,所以这里会判断是否已经在同步队列
了,此时我们刚刚所说的那2个看起来无用的条件就发挥作用了,如果还没有转移
过去,就让出CPU时间片
稍等一会,等转移好了就返回false。
这里有2个变量
/** Mode meaning to reinterrupt on exit from wait */
private static final int REINTERRUPT = 1;
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE = -1;
1表示需要再次挂起
,-1表示需要抛出中断异常
。
如果是先signal
唤醒再中断,那么返回1,先中断再signal返回-1。
然后if条件不成立,break
跳出循环。
接着再次来到之前说过的acquireQueued
方法,这里需要注意的一点是,如果是正常唤醒
,但是在acquireQueued
方法因为中断
抢到锁返回了,那么需要将interruptMode
设置为1,让他再次挂起
。
接着判断node.nextWaiter != null
,如果是正常唤醒
的话,nextWaiter
会被置为null,但是如果是中断
的话,会把他之后的链带到同步队列,所以,这里需要将等待队列重新整理
一下。
最后来到reportInterruptAfterWait
方法,判断是要再次挂起
还是抛中断异常
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
await说完了,我们最后看下signal
方法
signal
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
如果不是持有锁的线程调用signal
,抛异常
。
如果等待队列有节点,唤醒
他
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
如果只有一个等待节点,将firstWaiter
和lastWaiter
置为null
,将第一个等待节点nextWaiter
置为null,解除链接
。
接着调用transferForSignal
方法开始转移节点
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
这里同样存在一个先唤醒还是先中断
的问题。
如果是先中断
了,那么CAS失败
,返回false,如果等待队列的第二个节点不为null的话,开始转移第二个节点
。
如果先唤醒
,那么调用enq
进入同步队列
,如果前一个节点是取消节点
,或者设置状态失败
,那么就唤醒他,会接着刚才说的await
方法的park
方法往下走。
所以,调用signal
方法不一定会唤醒线程
,如果一切正常,他会继续休眠
,直到前一个节点是头节点释放锁才会唤醒
他。
总结
本篇文章对AQS源码
的一些细节
地方进行了剖析,可以看出作者构思的精妙
,环环相扣,换成是我们来写的话,估计会有很多冗余
的代码,而且还不一定正确。
所以,平时没事的时候多看看源码
,学习一下大佬的编程技巧
,多思考思考,长此以往,代码水平肯定会提升
的。
好了,我是阿轩,目前也在不断的学习中,欢迎关注阿轩的公众号,也欢迎添加阿轩的个人微信号,一起交流,一起进步。
原文始发于微信公众号(程序员阿轩):AQS那些不为人知的细节
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/37028.html