阅读本文前,需要储备的知识点如下,点击链接直接跳转。
由于AQS源码分析篇幅较长,为避免阅读疲劳,特采用系列的形式分成了三篇,建议按顺序阅读。
-
AQS源码分析系列:(一)AQS基础知识 -
AQS源码分析系列:(二)AQS核心:加锁、释放锁、超时中断流程 -
AQS源码分析系列:(三)AQS锁的自定义和实现
基础知识熟悉之后,本篇我们来看一下AQS的核心:加锁、释放锁、超时、中断的逻辑和流程。
独占锁加锁流程
以ReentrantLock公平锁方式不带超时不可中断获取锁为例。 整体流程如下,先了解整体流程有助于我们理解,会涉及到子流程,流程图单独给出。主要获取锁代码如下,这也是调用获取锁的入口,逻辑看代码注释:
public final void acquire(int arg) {
/*
(1)tryAcquire方法由子类实现尝试获取锁的逻辑,
返回true就不走后面的判断,表示获取到了锁,返回false表示未获取到锁,走后续入队等待流程
(2)addWaiter方法是将当前线程封装成Node对象返回,里面也有关于入队的操作
(3)acquireQueued方法主要是先再尝试获取一次锁,
获取到了就返回是否被中断标识,获取不到则需要确认线程是否需要阻塞以及阻塞操作,
最终返回释放被中断标识
(4)selfInterrupt是将当前线程中断,因为LockSupport.park阻塞线程时是不会响应中断的,
但是通过Thread.interrupted()这个方法可以获取到当前线程是否被中断标识
*/
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这里tryAcquire(arg)尝试获取锁的方法由AQS子类实现,其余三个方法(acquireQueued、addWaiter、selfInterrupt)都是AQS来实现的,这也是个模板方法设计模式。 tryAcquire(arg)流程,尝试获取锁的具体实现逻辑。代码如下:
protected final boolean tryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取AQS的同步状态值state
int c = getState();
// state是0则表示没有线程持有锁,可以尝试去获取锁
if (c == 0) {
/*
(1)hasQueuedPredecessors方法判断队列里当前线程的Node之前是否还有其他Node,
返回true说明有其他线程也在等待,尝试获取锁失败,返回false说明前面没有线程等待,
可以继续执行逻辑,这里先判断了state=0没有直接cas操作而是再判断队列里是否有等待的线程,
充分体现了公平性
(2)如果compareAndSetState(0, acquires)也设置成功,则说明加锁成功,
将exclusiveOwnerThread设置成当前线程,返回true表示获取锁成功
*/
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
/*
这个else if逻辑主要就是可重入的判断和处理,
如果持有锁的线程是当前线程则state= state + acquires
*/
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
addWaiter(Node.EXCLUSIVE)流程,将线程包装成Node节点的逻辑,有入队排队的逻辑,返回包装的Node节点。代码如下:
private Node addWaiter(Node mode) {
// 将当前节点封装成Node对象
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
/*
(1)队列不为空的情况下,先尝试将node插入到队尾,
compareAndSetTail返回成功则说明node变成队列成功,直接返回,否则需要走入队流程
(2)主要是将当前node的prev指向原tail,原tail节点的next指向当前node上,
这样就完成了node的入队
*/
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 尝试直接插入队尾失败了就走入队逻辑
enq(node);
// 返回当前线程封装成的Node对象
return node;
}
private Node enq(final Node node) {
// 入队使用的for无限循环,是一个自旋的过程,直到成功
for (;;) {
Node t = tail;
/*
如果队尾tail为空,则说明队列还未初始化,先初始化head节点,然后tail也指向head,
完成初始化队列,虽然只有一个节点,但head和tail都有了指向
*/
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
/*
如果队尾tail不为空,则采用cas方式将当前node插入队尾,
成功则返回,否则一直自旋尝试直到成功
*/
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
线程阻塞逻辑,acquireQueued(final Node node, int arg)具体实现流程代码如下:
final boolean acquireQueued(final Node node, int arg) {
/*
failed变量表示获取锁是否失败,初始化为true表示失败,只有在获取到锁时failed为false,
为true时表示获取锁过程中异常,finally块里的判断是否需要取消当前这个线程获取锁的相关逻辑,
包括队列的调整以及后继Node里线程的唤醒
*/
boolean failed = true;
try {
/*
interrupted变量表示当前线程是否被中断的标识,true:线程被中断,false:线程未被中断,
这个方法整体返回的就是这个值,用来确定后续是否要调用selfInterrupt()方法中断当前线程
*/
boolean interrupted = false;
// for无限循环,自旋处理
for (;;) {
// 取当前节点的前一个节点
final Node p = node.predecessor();
// 如果前一个节点是head并且tryAcquire尝试获取到锁了,则将当前线程设置成head
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
/*
这里就是线程阻塞等待的核心了,尝试获取锁失败时,判断是否需要阻塞,
需要阻塞的话就调用LockSupport.park方法阻塞当前线程
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
/*
在不可中断模式下,failed的值始终会是false,因为虽然被中断了,
但是当前线程还是获取到锁了,走正常的后续处理逻辑,finally这里的逻辑就不会走了
*/
if (failed)
cancelAcquire(node);
}
}
尝试获取锁失败时是否需要阻塞当前线程判断流程,shouldParkAfterFailedAcquire(Node pred, Node node)逻辑
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
/*
当前线程的前一个节点的waitStatus状态是Node.SIGNAL,
则说明前一个线程如果获取到锁并且执行完成后释放了锁需要唤醒后续节点,
从另一个角度来说当前线程自然要阻塞等待了
*/
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
/*
当前线程的前一个节点的waitStatus状态是Node.CANCELLED时,说明前驱节点已经取消获取锁了
需要从当前节点一直向前查找知道节点没有被取消,
然后把找到的第一个没有被取消的节点的next指向当前节点,这样就把当前节点前取消状态的都删掉
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
前一个节点的waitStatus状态还是0,或者是共享锁的传播状态PROPAGATE时,
则会把前一个节点的waitStatus状态改成Node.SIGNAL
所以是后一个节点排队时把前一个节点waitStatus改成Node.SIGNAL,
表示前一个节点执行完释放锁了要走唤醒后续节点的逻辑,
依次类推,队列里只有最后一个Node节点的waitStatus是0,因为它没有后续节点,
也不需要执行唤醒操作,其余在没有被中断状态下应该都是Node.SIGNAL
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
/*
阻塞当前线程调的就是LockSupport.park,原理之前文章有讲过,这就是线程阻塞等待的核心实现了
线程被LockSupport.park了不会响应中断,
如果线程被中断了需要用Thread.interrupted()获取当前线程的中断标识
*/
LockSupport.park(this);
return Thread.interrupted();
}
独占锁释放锁流程
以ReentrantLock释放锁为例,释放锁不区分公平锁还是非公平锁,释放的逻辑是一样的,整体流程如下。release(int arg)这是AQS里定义的模板方法,主要释放锁代码如下,这也是调用释放锁的入口,逻辑看代码注释:
public final boolean release(int arg) {
// 尝试释放锁,由子类实现具体逻辑
if (tryRelease(arg)) {
Node h = head;
// 头节点不为null,并且waitStatus!=0,说明要唤醒后续节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
// 返回锁是否空闲标识,其实就是tryRelease(arg)的返回结果
return false;
}
tryRelease(int releases)是尝试释放锁的逻辑,AQS定义的方法,默认是抛异常,子类根据具体场景实现逻辑。以下是ReentrantLock的内部类Sync的具体实现,返回true表示现在锁空闲了,返回false表示锁现在还被占用。
protected final boolean tryRelease(int releases) {
// 计算释放releases后,新的state值
int c = getState() - releases;
// 如果当前释放锁的线程不是持有锁的线程直接抛异常,只有持有锁的线程才能释放锁
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
/*
如果释放releases后,新的state是0,那么说明锁就空闲了,将free标识赋值为true,
然后将exclusiveOwnerThread赋值为null
*/
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 设置state新值,只有持有锁的线程才可操作,无需cas
setState(c);
return free;
}
unparkSuccessor(Node node) 这个方法就是关键的唤醒后续等待队列里的线程关键方法。通过调用LockSupport.unpark方法将阻塞的线程唤醒继续执行。
private void unparkSuccessor(Node node) {
// node是当前释放锁的线程,它的waitStatus如果<0就把他置成0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
如果node的next节点是null或者取消了,则从队尾往前查找,一直找到node节点,
获得第一个未被取消的节点
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 找到第一个未被取消的节点,并唤醒线程,使其继续执行
if (s != null)
LockSupport.unpark(s.thread);
}
这里有一个比较关键的地方,如果node的next节点是null或者取消状态,则从队尾往前查找,一直找到node节点,为什么会从后往前遍历? 这里考虑了并发的场景,从后往前不会导致node丢失,具体我们可以从addWaiter方法看。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
这里的第6、7、8行就是关键了,先设置prev节点,这样就保证了所有的节点都有前驱节点,第7、8这两行没有保证原子操作,如果cas成功了,但是刚好cpu时间片切换,第8行未执行,那么pred的next就是空了,所以从前往后可能会漏节点,从后往前是完整的队列,举个栗子:
-
假如释放锁的线程是tail尾节点,刚好unparkSuccessor时,执行到node.next为空的判断之前,cpu时间片切换了。 -
有个线程调用了addWaiter方法,把新node的prev指向了tail,cas设置尾节点也成功了,就在这儿cpu又切换了,那么原tail节点的next还没有设置。 -
cpu再切回到unparkSuccessor的node.next为空判断时,这时候他的next是null(因为next指针还没有指向新node节点),实际上后面还有一个node节点,这样就会漏掉节点数据了。 如果从后往前的话,每一个node的前驱肯定是有值的,但是高并发情况下不能保证每一个node的后继节点也能及时连接上。所以从后往前就确保了能遍历到每一个节点。 也就是从等待队列里阻塞的方法恢复执行,返回线程是否中断标识,然后再继续尝试获取锁。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
到这里,基本上已经把独占锁的获取锁和释放锁的流程和逻辑都讲完了,AQS基本已经把大部分的核心功能帮我们写好了,我们只用去写或利用他已有的方法,实现我们自己的逻辑即可,就比如以上讲到的独占锁的获取和释放,其实我们自己仅仅具体实现了tryAcquire(int acquires)、tryRelease(int releases)这两个方法,花了大篇幅讲的都是AQS的流程和逻辑,由此,真正的感受到了AQS的巧妙设计。
超时&中断处理
理解了上面的独占锁的加锁流程,对于超时和中断处理的理解就很容易了,这两种其实都有线程中断抛出异常逻辑,另外将带超时时间获取锁和可响应中断获取锁这两种方式关于获取结果交给开发人员自行处理,既体现了设计的灵活性也可让开发人员根据具体业务场景具体处理,还是以ReentrantLock来讲解。
超时
关于超时,就是在指定的时间内未获取到锁就返回获取失败,在指定的时间内获取到了锁返回成功,有两种,一个是尝试获取,例如:tryLock(),不管有没有获取到立即返回,相当于超时是0,另一种是指定超时时间,如果指定时间未获取到锁就返回false,例如:tryLock(long timeout, TimeUnit unit),下面详细讲解下。
-
tryLock()
public boolean tryLock() {
// 入口方法,是以非公平方式尝试获取锁,返回true:获取成功,false:获取失败
return sync.nonfairTryAcquire(1);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// state值是0时,表示暂时锁空闲,尝试cas赋值,也可以理解成尝试加锁
if (c == 0) {
// cas成功,则说明加锁成功,设置当前线程为持有锁的线程,返回true:获取成功
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 当前线程如果是持有锁的线程,可重入,判断并设置state=state+acquires,返回true:获取成功
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 尝试没有获取到锁,当前线程也不是持有锁的线程,直接返回false:获取失败
return false;
}
tryLock()的实现逻辑还是挺简单了,不带超时相关设置,相当于超时时间是0,要么立即成功,要么立即失败,不涉及复杂的入队、阻塞、唤醒、取消相关逻辑。单纯的看state=0说明空闲cas成功则立即获取锁,或者持有锁的线程是当前线程,这样就可重入,获取锁成功,其他情况均尝试获取锁失败,直接返回。
-
tryLock(long timeout, TimeUnit unit)
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
/*
主入口方法,带超时时间尝试获取锁,获取到返回true,未获取到返回false,
注意还有可能抛出被中断异常InterruptedException
*/
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 判断如果线程被中断,则抛异常
if (Thread.interrupted())
throw new InterruptedException();
//还是先尝试获取锁,获取成功则返回true,获取失败执行后面的doAcquireNanos方法,带超时等待
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
/**
* 这个方法就是带超时等待获取锁的核心实现,
* 大体流程上跟acquireQueued(final Node node, int arg)这个方法差不多
* 逻辑里调用了相同的方法的就不再详细阐述了,只说不同的核心关键逻辑
*/
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
// 先入队列
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
// 如果头节点是head并且尝试获取锁成功则返回true
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
// 方法执行到这里已经超时了,直接返回false
if (nanosTimeout <= 0L)
return false;
/*
以下的逻辑是关键实现超时返回的逻辑
先判断是否需要阻塞,再判断超时时间是否大于1000纳秒即0.001 毫秒,
这个时间可以说非常短了,但对于高速CPU来说还是需要一定的时间,
如果这两个条件都成功,则阻塞,否则自旋
阻塞调用的是LockSupport.parkNanos(this, nanosTimeout);精确到纳秒级的阻塞,
并且第一个参数是this,表明了这个线程具体阻塞在哪个对象上,通过jstat可查看到
*/
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 判断如果线程被中断,则抛异常
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
/*
这里可能会走,虽然LockSupport.parkNanos不响应中断,
但是最后的逻辑判断了当前线程是否中断的标识,如果中断了则会抛InterruptedException异常,
那么failed变量的值还是true,需要走取消的逻辑,将当前线程的Node从队列去掉相关逻辑处理
*/
if (failed)
cancelAcquire(node);
}
}
中断
上文已经说过了,如果线程进入等待队列并且阻塞了,那么它是不会响应中断的,虽然阻塞队列不响应中断,但是被唤醒后,线程的中断标识是可以获取到的,所以可以通过该标识来处理是否需要主动抛异常中断处理。
需要注意中断并不是实时感知的,虽然被中断了如果没有被唤醒,还是需要继续等待,直到被唤醒后,获取中断标识来做处理。
我们还是以ReentrantLock为例,lockInterruptibly()这个就是可以响应中断的方法。
public void lockInterruptibly() throws InterruptedException {
// sync这个对象继承了AbstractQueuedSynchronizer,这里直接调用的是AQS的方法了。
sync.acquireInterruptibly(1);
}
public final void acquireInterruptibly(int arg)
throws InterruptedException {
// 先判断下如果线程已经被中断了,直接抛出InterruptedException异常
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
// 尝试获取锁没有成功时,才进入可响应中断获取锁的方法里
doAcquireInterruptibly(arg);
}
/**
* 这个方法就是获取锁时可响应中断核心实现,
* 大体流程上跟tryLock(long timeout, TimeUnit unit)这个方法差不多
* 逻辑里调用了相同的方法的就不再详细阐述了,只说不同的核心关键逻辑
*/
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
/*
主要的处理就在这里了,判断需要阻塞并且阻塞被唤醒后,
如果中断标识为true则抛出InterruptedException异常
*/
throw new InterruptedException();
}
} finally {
/*
这里可能会走,如果线程被中断了,抛出InterruptedException异常后,failed变量还是true
需要走取消的逻辑,将当前线程的Node从队列去掉相关逻辑处理
*/
if (failed)
cancelAcquire(node);
}
}
欢迎关注公众号,欢迎分享、点赞、在看
原文始发于微信公众号(小新成长之路):AQS源码分析系列:(二)AQS加锁、释放锁、超时中断流程
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/238553.html