AQS 源码解析
基本介绍
AbstractQueuedSynchronizer
的核心思想是提供了一个同步队列,将未获取到锁的线程阻塞排队。
主要用于实现各种的同步机制,如 锁、同步队列等。
核心思想 是 使用双向队列管理等待获取同步资源的线程,内部含有state表示 同步资源的可用性,state = 0 ,表示可用,state > 0 ,表示资源被占用。具体的子类可以继承AQS 重写抽象方法,从而实现不同的同步机制。支持条件变量,可以通过条件变量来实现线程的等待和 唤醒操作。
CLH队列
基于链表的自旋锁队列,用于实现互斥锁。CLH代表”Craig, Landin, and Hagersten”。
CLH队列的特点是使用链表节点来表示每个线程的等待状态,并通过自旋的方式实现线程之间的协调。每个线程在等待锁时,会创建一个节点,并将其添加到队列的尾部。当线程需要获取锁时,它会自旋等待前一个线程释放锁。
CLH队列的基本思想是
使用一个虚拟的前驱节点(dummy node)来表示队列的头部,而每个线程的节点则作为队列中的一个元素
。每个节点包含一个boolean类型的locked字段,用于表示线程是否持有锁。当线程需要获取锁时,它会自旋等待前一个节点的locked字段变为false,表示前一个线程已经释放了锁。CLH队列的优点是避免了线程之间的竞争,因为每个线程只需要关注前一个线程的状态,而不需要与其他线程直接竞争锁。这种设计可以减少锁竞争带来的性能开销,并提高系统的并发性能。
需要注意的是,CLH队列是一种公平的锁队列,即线程获取锁的顺序与它们加入队列的顺序相同。这保证了线程按照先来先服务的原则获取锁,避免了线程饥饿现象。
CLH队列是一种基于链表的自旋锁队列,通过自旋等待前一个线程释放锁来实现线程之间的协调,从而提高并发性能和公平性。
Node节点
static final class Node {
/** 节点正在共享模式下等待的标记 */
static final Node SHARED = new Node();
/** 节点正在以独占模式等待的标记*/
static final Node EXCLUSIVE = null;
// waitStatus 值
/* 线程被取消了 */
static final int CANCELLED = 1;
/* 释放资源后需唤醒后继节点 */
static final int SIGNAL = -1;
/* 等待condition唤醒 */
static final int CONDITION = -2;
/* 工作于 共享锁 状态,需要向后传播,比如根据资源是否剩余,唤醒后继节点 */
static final int PROPAGATE = -3;
/**
* Status 字段,仅取以下值:
SIGNAL:该节点的后继节点被(或即将)被阻塞(通过 Park),因此当前节点在释放或取消时必须取消其后继节点的 Park 状态。为了避免竞争,获取方法必须首先指示它们需要信号,然后重试原子获取,然后在失败时阻塞。
CANCELLED:该节点因超时或中断而被取消。节点永远不会离开此状态。特别是,具有取消节点的线程永远不会再次阻塞。
CONDITION:该节点当前位于条件队列中。在传输之前,它不会用作同步队列节点,此时状态将设置为 0。
PROPAGATE:A releaseShared 应该传播到其他节点。
0:以上都不是 这些值按数字排列以简化使用。非负值意味着节点不需要发出信号。因此,大多数代码不需要检查特定值,只需检查符号即可。对于正常同步节点,该字段初始化为 0,对于条件节点,该字段初始化为 CONDITION。
*/
volatile int waitStatus;
/**
* 前驱节点
*/
volatile Node prev;
/**
* 后继节点
*/
volatile Node next;
/**
* 等待锁的线程
*/
volatile Thread thread;
/**
* 等待条件的下一个节点,ConditonObject中用到
*/
Node nextWaiter;
/**
* 如果节点在共享模式下等待,则返回 true。
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 返回前一个节点,如果为 null,则抛出 NullPointerException。当前驱不能为空时使用。
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
--- 构造器
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
waitStatus
的说明:
- CANCELLED =1 线程被取消了
- SIGNAL =-1 释放资源后需唤醒后继节点
- CONDITION = -2 等待condition唤醒
- PROPAGATE = -3 (共享锁)状态需要向后传播
- 0 初始状态,正常状态
AQS—DS
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
private static final long serialVersionUID = 7373984972572414691L;
protected AbstractQueuedSynchronizer() { }
/**
要排队到 CLH 锁中,您可以原子地将其拼接为新的尾部。要出队,只需设置头字段即可。
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
等待队列(也叫CLH队列,同步队列)的头节点 可以理解为 持有锁的节点
*/
private transient volatile Node head;
/**
* 等待队列(也叫CLH队列,同步队列)的尾节点,每个新的节点进来,都插入到最后,也就形成了一个链表
*/
private transient volatile Node tail;
/**
* 同步状态
* 代表当前锁的状态,0代表没有被占用,大于 0 代表有线程持有当前锁.这个值可以大于 1,是因为锁可以重入,每次重入都加上 1
*/
private volatile int state;
/**
* 返回同步状态的当前值
*/
protected final int getState() {
return state;
}
/**
*设置同步状态的当前值
*/
protected final void setState(int newState) {
state = newState;
}
/**
* 如果当前状态值等于预期值,则自动将同步状态设置为给定的更新值
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
...
}
AQS的加锁
独占锁
acquire
// 以独占模式获取,忽略中断。通过调用至少一次tryAcquire并在成功时返回来实现。
// 否则,线程将排队,可能会重复阻塞和解除阻塞,调用tryAcquire直到成功。该方法可用于实现Lock.lock方法。
// 该方法可用于实现Lock.lock方法。
public final void acquire(int arg) {
if (!tryAcquire(arg) // 尝试获取锁
&& acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 如果未获取到, 则加入队列
//设置线程中断标志
selfInterrupt();
}
尝试获取锁
tryAcquire
,但是要是获取锁失败,执行addWaiter
,将当前线程封装为 Node ,设置为尾节点,再执行acquireQueued
,将当前节点加到同步队列中
tryAcquire
// 尝试以独占模式获取。此方法应查询对象的状态是否 允许以独占模式 获取它,如果允许 则获取它
// 该方法始终由执行 acquire 的线程调用。
// 如果此方法报告失败,则 acquire 方法可能会将线程排队(如果尚未排队),直到收到其他线程的释放信号为止。
// 这可用于实现方法Lock.tryLock() 。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
本质是一个钩子方法,子类可以对其进行具体实现
addWaiter
添加等待者(waiter)的方法。它的作用是在一个链表中添加一个新的节点,表示当前线程在等待某个特定模式(mode)的资源。
//为当前线程 和 给定模式创建节点并将其排队。
private Node addWaiter(Node mode) {
// mode 是 独占模式 node 表示 该节点包含当前线程 和 指定的模式。
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 将当前节点设置为尾节点
Node pred = tail;
// 要是 tail 不是 null , 表示 存在尾部
if (pred != null) {
// 把 node 插入队尾,设置为 尾节点
node.prev = pred;
// 多线程下 可能有别的线程插入,重新判断检查 tail
if (compareAndSetTail(pred, node)) {
// 设置之前尾结点下一节点,指向当前尾结点, 返回尾结点
pred.next = node;
return node;
}
}
//如果尾结点是null, 或者 CAS 操作失败,进行自旋 enq() 加入尾结点
enq(node);
return node;
}
enq
// 将节点插入队列,必要时进行初始化
private Node enq(final Node node) {
// 自旋
for (;;) {
Node t = tail;
// tail == null 或者 可能刚好有其他线程插入,会导致之前的判断失效,所以重新判断tail是否为空
if (t == null) { // Must initialize
//尾节点是null,表示队列 中没有节点。则 CAS 初始化队列的头节点 head,头结点是空节点, tail = head
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 设置当前节点前一节点为 tail
node.prev = t;
// CAS 操作设置当前节点为 尾结点
if (compareAndSetTail(t, node)) {
// 设置之前尾结点指向当前节点
t.next = node;
return t;
}
}
}
}
- 如果尾结点不为空,则通过 CAS 尝试将当前节点设置为尾结点,返回当前节点。
- 如果 CAS 执行失败 或者 为节点为 空,先封装成一个 Node,则通过 enq(node) 进行自旋尝试加入队尾或者进行初始化。
上述的流程
acquireQueued
acquireQueued 何时结束:
- return 的时候才会结束: 获取到锁的时候return 才会结束
- 异常终止:
tryAcquire
异常报错; 重入锁的时候state
是 int ,所以可能有 整数溢出 的时候报错
思路:
- 通过
acquireQueued
将当前节点加入队尾,并设置阻塞、自旋——如果当前节点的前驱节点 是头结点(head 节点不排队,只记录状态,head 的后驱节点才是真正第一个排队的),则再次尝试 tryAcquire() 获取锁。- 自旋的跳出条件是当前节点是队列中第一个,并且获取锁。
- 如果一直自旋,则会消耗 CPU 资源,所以会用
shouldParkAfterFailedAcquire
判断是否需要将当前线程阻塞,如果是则通过parkAndCheckInterrupt
阻塞线程的运行(LockSupport.park() 是通过 native 方法 Unsafe.park() 实现的线程阻塞)
// 以独占不间断模式获取已在队列中的线程。由条件等待方法和获取方法使用。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// p 是前一个节点
final Node p = node.predecessor();
/* 返回前一个节点,如果为 null,则抛出 NullPointerException。当前驱不能为空时使用。
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
*/
// 如果 p 是头节点, 再尝试获取锁
if (p == head && tryAcquire(arg)) {
//获取成功,设置当前node节点为头节点
setHead(node);
/*
将队列头设置为节点,从而出队。
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
*/
p.next = null; // help GC
failed = false;
return interrupted;
}
// 检查是否需要线程阻塞等待,如果前一个不是待唤醒,则自旋
// 如果前一个待唤醒,则当前线程也阻塞等待
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire
如果前置节点是SIGNAL状态,就挂起,返回true。
如果前置节点状态为CANCELLED,就一直往前找,直到找到最近的一个处于正常等待状态的节点,并排在它后面,返回false;acquireQueed() 继续自旋尝试
//检查并更新 获取失败 的节点的状态。如果线程应该阻塞则返回 true
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 前驱节点的状态是SIGNAL,说明前驱节点释放资源后会通知自己,此时当前节点可以安全的park(),因此返回true
if (ws == Node.SIGNAL) // -1
return true;
//前驱节点的状态是CANCLLED,说明前置节点已经放弃获取资源了
//此时一直往前找,直到找到最近的一个处于 正常等待状态的节点 ,并把 node 排在这个等待节点的后面,返回false
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//前驱节点的状态是0或PROPGATE,则利用CAS将前置节点的状态置为SIGNAL,让它释放资源后通知自己
//如果前置节点刚释放资源,状态就不是SIGNAL了,这时就会失败
// 返回false
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt
// park 之后 检查是否中断
private final boolean parkAndCheckInterrupt() {
// // 阻塞线程
LockSupport.park(this);
return Thread.interrupted();
}
只有
node
的前驱节点 是head 节点
才能获取锁,就是说 同步队列中的节点获取锁的顺序是从前(头结点)开始的
cancelAcquire
去除失败的节点
// 取消正在进行的获取尝试
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null; // 代码将节点的线程引用置为null,表示该节点的线程已经被取消。
// 跳过已取消的前驱节点
Node pred = node.prev;
//使用一个循环,将前驱节点的waitStatus 是取消的 的节点都跳过,将node.prev指向最新的未取消的前驱节点
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 获取前驱节点的下一个节点predNext。
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
//将当前节点的waitStatus设为CANCELLED,表示当前节点已取消。
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
// 如果当前节点是尾节点,并且成功通过CAS操作将尾节点指向前驱节点pred
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null); // CAS操作将前驱节点的next指针设置为null,从而移除当前节点
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
//如果前驱节点不是头节点,并且前驱节点的waitStatus为SIGNAL或者小于等于0(表示需要唤醒后继节点),并且前驱节点的线程不为null
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)))
&& pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next); //则尝试将前驱节点的下一个节点predNext设为当前节点的下一个节点。
} else {
unparkSuccessor(node); // 唤醒后继节点
}
node.next = node; // help GC 帮助GC回收节点
}
}
unparkSuccessor
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
// 判断节点的等待状态(waitStatus)。
// 如果等待状态小于0,表示可能需要发出信号(signal),则尝试将等待状态清零,以便进行信号的预期清除。
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 获取后继节点 s。
// 通常情况下,后继节点就是当前节点的下一个节点。
Node s = node.next;
// 如果后继节点被取消或者为空,
if (s == null || s.waitStatus > 0) {
s = null;
// 则需要从尾部向前遍历链表,找到实际上未被取消的后继节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//如果找到了非空的后继节点(s),则调用LockSupport.unpark方法,唤醒该节点对应的线程
if (s != null)
LockSupport.unpark(s.thread);
}
selfInterrupt
/**
* 中断当前线程
*/
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
acquireInterruptibly
// 以独占模式获取,如果中断则中止。
// 通过首先检查中断状态,然后调用至少一次tryAcquire并在成功时返回来实现。
// 否则,线程将排队,可能会重复阻塞和解除阻塞,调用tryAcquire直到成功或线程被中断。
// 该方法可用于实现Lock.lockInterruptibly方法。
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
doAcquireInterruptibly
//以独占可中断模式获取。
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE); //将当前线程添加到等待队列中,并创建一个节点 node
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor(); //获取当前节点的前驱节点 p
//前驱节点 p 是头节点,并且当前线程成功获取锁(通过 tryAcquire() 方法),
if (p == head && tryAcquire(arg)) {
setHead(node); //则将当前节点设置为头节点,
p.next = null; // help GC //并将前驱节点的 next 指针设置为 null,帮助垃圾回收
failed = false;
return;
}
//如果无法获取锁,则判断是否应该在获取锁失败后进行阻塞(通过 shouldParkAfterFailedAcquire() 方法判断)。如果需要阻塞,并且在阻塞过程中被中断,则抛出 InterruptedException 异常。如果没有获取到锁并且没有被中断,则继续下一轮循环。
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
//最后,在 finally 块中,如果获取锁失败,则取消当前节点的获取操作。
if (failed)
cancelAcquire(node);
}
}
tryAcquireNanos
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
}
doAcquireNanos
// 独占模式下在规定时间内获取锁, 只工作于独占模式,自旋获取资源超时后则返回false;如果有必要挂起且未超时则挂起。
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
//计算截止时间
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
//获取锁成功后,出队
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return true;
}
//重新计算超时时间
nanosTimeout = deadline - System.nanoTime();
//超时则返回false
if (nanosTimeout <= 0L)
return false;
//否则判断是否需要被阻塞,阻塞规定时间
//static final long spinForTimeoutThreshold = 1000L; 旋转比使用定时停车更快的纳秒数。粗略估计足以通过非常短的超时来提高响应能力。
if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
共享锁
// 以共享模式获取,忽略中断。
// 通过首先调用至少一次tryAcquireShared并在成功时返回来实现。
// 否则 tryAcquireShared(arg) < 0 ,线程将排队,可能会重复阻塞和解除阻塞,调用tryAcquireShared直到成功。
public final void acquireShared(int arg) {
// 尝试直接去获取锁,如果成功,acquireShared(int)就结束了
// 否则,调用doAcquireShared(Node)将线程加入等待队列,直到获取到资源为止。
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
//尝试直接去获取资源
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
// 以共享不间断模式获取
// 实现上和acquire()方法差不多,就是多判断了是否还有剩余资源,唤醒后继节点。
private void doAcquireShared(int arg) {
//将线程加入等待队列,设置为共享模式
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
// 自旋尝试获取资源
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
//设置头节点,且如果还有剩余资源,唤醒后继节点获取资源
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//是否需要被挂起
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate
/*设置队列头,并检查后继者是否可能在共享模式下等待,如果是,则在设置传播 > 0 或 PROPAGATE 状态时进行传播。 */
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 它将当前的头节点保存在变量h中,以便后面的检查使用
setHead(node);
/*
*如果满足以下条件,请尝试向下一个排队节点发出信号:
传播由调用者指示,或者由先前操作记录(在 setHead 之前或之后作为 h.waitStatus)(注意:这使用 waitStatus 的符号检查,因为 PROPAGATE 状态可能会转换为 SIGNAL。 ) 并且下一个节点正在以共享模式等待,
或者我们不知道,因为它出现 null 这两个检查中的保守性可能会导致不必要的唤醒,但只有当有多个 racing acquiresreleases 时,所以现在大多数需要信号或者无论如何很快。
*/
// 如果传入的propagate参数大于0,或者变量h为null,或者变量h的waitStatus小于0,或者再次获取head节点时发现h为null或者h的waitStatus小于0
//那么就满足发出信号的条件。这些条件的目的是确保只有在需要时才会发出信号,以避免不必要的唤醒。
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
// // 如果满足发出信号的条件,获取当前节点的下一个节点s,
Node s = node.next;
// 并检查s是否为null或者共享模式
if (s == null || s.isShared())
// 如果s为null或者共享模式,那么调用doReleaseShared方法来释放共享锁。
doReleaseShared();
}
}
// 在共享模式下释放锁,确保在释放锁时,锁的释放能够传播给后继节点
private void doReleaseShared() {
/* 确保发布能够传播,即使还有其他正在进行的获取发布。如果需要信号,则以尝试取消头部后继者的通常方式进行。但如果没有,状态将设置为 PROPAGATE 以确保发布后继续传播。此外,我们必须循环,以防在执行此操作时添加新节点。另外,与 unparkSuccessor 的其他用途不同,我们需要知道 CAS 重置状态是否失败,如果失败则重新检查。*/
for (;;) {
Node h = head; // 会获取头节点h
// 在循环中,首先判断头节点h是否存在且不是尾节点
if (h != null && h != tail) {
// 如果满足条件,则继续判断头节点的等待状态ws
int ws = h.waitStatus;
// 如果等待状态ws为SIGNAL,
if (ws == Node.SIGNAL) {
// 则尝试将等待状态设置为0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // 如果设置失败,则继续循环
// 如果设置成功,则唤醒后继节点。
unparkSuccessor(h);
}
//如果等待状态ws为0,并且尝试将等待状态设置为PROPAGATE,如果设置失败,则继续循环。
else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果头节点h没有发生改变,则跳出循环。
if (h == head) // loop if head changed
break;
}
}
共享和独占的区别
// 以共享模式获取,如果中断则中止。
// 通过首先检查中断状态,然后调用至少一次tryAcquireShared并在成功时返回来实现。
// 否则,线程将排队,可能会重复阻塞和解除阻塞,调用tryAcquireShared直到成功或线程被中断。
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
//以共享可中断模式获取。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
AQS 解锁
独占锁
release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 获取 后续节点
return true;
}
return false;
}
tryRelease
//尝试设置状态以反映独占模式下的发布。 该方法始终由执行释放的线程调用
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
共享锁
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
//尝试设置状态以反映共享模式下的发布。 该方法始终由执行释放的线程调用。
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
分界线
上面的是同步队列,用来竞争锁的
下面的是条件队列,用来释放锁 并 等待的队列
ConditionObject
DS
// AbstractQueuedSynchronizer的条件实现作为Lock实现的基础。
// 该类是可序列化的,但所有字段都是瞬态的,因此反序列化条件没有等待者
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** 条件队列的第一个节点。 */
private transient Node firstWaiter;
/** 条件队列的最后一个节点 */
private transient Node lastWaiter;
//发生了中断,但在后续不抛出中断异常,而是“补上”这次中断
private static final int REINTERRUPT = 1;
//发生了中断,且在后续需要抛出中断异常
private static final int THROW_IE = -1;
public ConditionObject() { }
...
}
await
await():当前线程处于阻塞状态,直到调用signal()或中断才能被唤醒。
1)将当前线程封装成node且等待状态为CONDITION。
2)释放当前线程持有的所有资源,让下一个线程能获取资源。
3)加入到条件队列后,则阻塞当前线程,等待被唤醒。
4)while 循环中
: 如果是因signal被唤醒,则节点会从条件队列转移到等待队列;如果是因中断被唤醒,则记录中断状态,此时直接break。两种情况都会跳出循环。
5)若是因signal被唤醒, 尝试获取锁;或者清除其他取消状态的线程。并报告中断,后续处理中断异常。
/*
实现可中断条件等待。
1. 如果当前线程被中断,则抛出InterruptedException。
2. 保存getState返回的锁定状态。
3. 使用保存的状态作为参数调用release ,如果失败则抛出IllegalMonitorStateException。
4. 阻塞直到收到信号或中断。
5. 通过以保存的状态作为参数调用专门版本的acquire来重新获取。
6. 如果在步骤 4 中阻塞时被中断,则抛出 InterruptedException
*/
public final void await() throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException(); // 如果当前线程被中断,则抛出 InterruptedException。
Node node = addConditionWaiter(); // 创建一个新的Node对象,并将其添加到等待队列中
int savedState = fullyRelease(node); // 释放当前线程持有的锁,并保存当前状态。
int interruptMode = 0;
// 如果当前线程不在等待队列中,说明此时一定在条件队列里,将其阻塞。
while (!isOnSyncQueue(node)) { //检查当前线程是否已经被加入到同步队列中
// 如果没有加入同步队列,则调用LockSupport.park()方法使当前线程进入等待状态,直到被唤醒。
LockSupport.park(this);
// 检查当前线程是否被中断,如果是则记录中断模式,并跳出循环
/*
线程从条件队列被 唤醒后,线程要从条件队列移除,进入到同步等待队列
被 唤醒的两种情况;一是条件满足,收到singal信号,二是线程被取消(中断),
如果是被中断,需要根据不同模式,处理中断,
处理中断,也有两种方式:
1、设置中断位,人工处理;
2、直接抛出InterruptedException异常
1、先singal,再Interrupt:因为singal唤醒之后已经进入AQS(已经跳过了await流程,进入获取锁的流程了),此时是进入获取锁的流程,要等到获取锁成功,才会报错(这是acquire中断的语义)
2、先Interrupt再singal,直接抛出异常:因为await期间中断会报错
*/
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 检查是否成功获取了一个节点,并根据中断模式的值更新中断模式
// 尝试获取锁
//当前线程执行了signal方法会经过这个,也就是重新将当前线程加入同步队列中
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT; // 1
// 检查是否有其他等待线程被取消,如果有则清除它们
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 根据中断模式报告中断情况
if (interruptMode != 0)
// 如果是THROW_IE,说明signal之前发生中断
// 如果是REINTERRUPT,signal之后中断,
// 所以成功获取资源后会调用selfInterrupt()
reportInterruptAfterWait(interruptMode);
}
addConditionWaiter
t == null
t != null
/* 将新 waiter 添加到等待队列 */
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
// 判断队尾元素,如果非条件等待状态则清理出去
if (t != null && t.waitStatus != Node.CONDITION) { // CONDITION = -2
unlinkCancelledWaiters();
// 可能t之前引用的节点被删除了,所以要重新引用
t = lastWaiter;
}
// 这个节点就表示当前线程
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 说明条件按队列中没有元素
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
unlinkCancelledWaiters
// 从条件队列中取消已取消的等待节点的链接。仅在持有锁时调用。
// 当条件等待期间发生取消时,以及当发现 lastWaiter 已被取消时插入新的等待者时,将调用此函数。
// 需要此方法来避免在没有信号的情况下保留垃圾。
// 即使它可能需要完全遍历,但只有在没有信号的情况下发生超时或取消时,它才会发挥作用。
// 它遍历所有节点,而不是在特定目标处停止以取消指向垃圾节点的所有指针的链接,而无需在取消风暴期间进行多次重新遍历
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
// 记录在循环中上一个waitStatus有效的节点
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) { // -2
t.nextWaiter = null;
// 确保当前节点无效后删除引用
if (trail == null)
firstWaiter = next;
else
// 否则就直接加到队尾的后面
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
// 记录有效的节点
trail = t;
// 向后遍历
t = next;
}
}
fullyRelease
// 使用当前状态 调用释放;返回保存的状态。取消节点并在失败时引发异常
// 尽可能完全地释放资源,并在必要时抛出异常。
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState(); // 尝试获取当前状态
if (release(savedState)) { // 尝试释放节点
failed = false;
return savedState;
} else { // 如果释放失败
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
isOnSyncQueue
//如果节点(始终是最初放置在条件队列中的节点)现在正在等待重新获取同步队列,则返回 true。
final boolean isOnSyncQueue(Node node) {
// 检查node.waitStatus是否等于Node.CONDITION,如果是,则返回false。这表示节点是在条件队列上而不是同步队列上。
// 检查node.prev是否为null,如果是,则返回false。这表示节点是头节点,因为头节点的prev为null,头节点不在同步队列上。
if (node.waitStatus == Node.CONDITION || node.prev == null) // == -2
return false;
// 它检查node.next是否为null,如果不是,则返回true。这表示节点有后继节点,因为只有在同步队列上的节点才会有后继节点。
if (node.next != null) // If has successor, it must be on queue
return true;
/*
node.prev 可以为非 null,但尚未放入队列,因为将其放入队列的 CAS 可能会失败。
所以我们必须从尾部开始遍历以确保它确实成功了。
在调用此方法时,它始终位于尾部附近,除非 CAS 失败(这不太可能),否则它会在那里,因此我们几乎不会遍历太多。
*/
return findNodeFromTail(node);
}
findNodeFromTail
从队尾开始遍历同步队列,以确保节点是否在同步队列上。
// 如果通过从尾部向后搜索节点位于同步队列上,则返回 true。仅在 isOnSyncQueue 需要时调用。
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
checkInterruptWhileWaiting
// 检查中断,如果在调用signal()(发出信号)之前中断则返回 THROW_IE,如果在调用signal()(发出信号)之后则返回 REINTERRUPT,如果没有中断则返回 0。
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
transferAfterCancelledWait
// 线程是否因为中断从park中唤醒。
// 在取消等待状态后,将节点添加到同步队列中,并返回是否成功将节点添加到队列的结果。
final boolean transferAfterCancelledWait(Node node) {
// 尝试将节点的等待状态(waitStatus)从CONDITION设置为0(即表示等待状态结束)
// // 如果无法将等待状态设置为0,说明在设置等待状态时被其他线程先行唤醒,此时需要等待其他线程完成enq操作。
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
//如果成功将等待状态设置为0,则将该节点添加到同步队列中,并返回true。
enq(node);
// true表示中断先于signal发生
return true;
}
/*
*如果中断在 signal() 之后,那么在它完成 enq() 之前我们无法继续。在不完整的传输期间取消既罕见又短暂,因此只需旋转即可。
*/
// 在等待期间,方法会通过循环调用Thread.yield()方法来让出CPU的使用权,以便其他线程有机会执行。当节点成功进入同步队列后,返回false
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
reportInterruptAfterWait
// 抛出 InterruptedException、重新中断当前线程或不执行任何操作,具体取决于模式
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
awaitNanos
总的来说和
await
流程大同小异,就是多了些时间的判断,没到时间就继续挂起,到了就调用方法:transferAfterCancelledWait
自动唤醒
public final long awaitNanos(long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
// 采用自旋的方式检查是否已在等待队列当中
while (!isOnSyncQueue(node)) {
// 如果挂起超过一定的时间,则退出
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
// 继续挂起线程
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
// //采用自旋的方式竞争锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
transferAfterCancelledWait
final boolean transferAfterCancelledWait(Node node) {
// 使用compareAndSetWaitStatus方法将节点的等待状态设置为0(即未等待)
//如果compareAndSetWaitStatus方法设置失败,意味着当前节点已经被其他线程发出了信号(signal()),当前线程无法继续执行。
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
// 如果设置成功,表示当前线程可以继续执行,它会调用enq方法将节点添加到等待队列中,并返回true。
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
//进入循环,不断调用Thread.yield()方法,让出CPU的执行权给其他线程,直到节点成功被转移到同步队列中。最后返回false。
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
signal
//将等待时间最长的线程(如果存在)从该条件的等待队列移动到拥有锁的等待队列。
public final void signal() {
// 检查当前线程是否持有独占锁,如果没有持有则抛出IllegalMonitorStateException异常。
if (!isHeldExclusively()) throw new IllegalMonitorStateException();
// 获取等待队列中的第一个节点。
Node first = firstWaiter;
if (first != null)
// 如果第一个节点存在,则调用doSignal()方法,将第一个节点唤醒。
doSignal(first);
}
isHeldExclusively
// 如果仅针对当前(调用)线程保持同步,则返回true 。每次调用非等待AbstractQueuedSynchronizer.ConditionObject方法时都会调用此方法。
//判断当前线程是否为资源的持有者
//这也是必须在lock()与unlock()代码中间执行的原因
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
doSignal
// 删除并传输节点,直到达到非取消的 1 或 null。从信号中分离出来,部分是为了鼓励编译器内联没有服务员的情况。
// 处理等待队列中的节点,并进行信号传递。
private void doSignal(Node first) {
do {
//将传入的节点的下一个等待节点赋值给局部变量firstWaiter,并检查是否为null。如果为null,表示没有下一个等待节点,那么将lastWaiter置为null。
// 后续的等待条件为空,说明condition队列中只有一个节点
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
//然后,将传入的节点的下一个等待节点设置为null。
first.nextWaiter = null;
//循环处理信号传递。
//调用transferForSignal方法来尝试传递信号给传入的节点。
//如果传递失败,那么将firstWaiter赋值给first,并继续下一次循环。
//如果firstWaiter为null,表示没有下一个等待节点,循环结束。
transferForSignal()是真正唤醒头节点的地方
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
transferForSignal
// 将节点从条件队列转移到同步队列,并尝试唤醒等待线程以进行重新同步。
final boolean transferForSignal(Node node) {
/*
* 如果无法改变waitStatus,则该节点已被取消。
*/
// 通过compareAndSetWaitStatus方法尝试将节点的等待状态从CONDITION(表示在条件队列中等待)修改为0(表示不再等待)。如果修改等待状态失败,说明节点已被取消,返回false。
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*拼接到队列上并尝试设置前驱的 waitStatus 以指示线程(可能)正在等待。如果取消或尝试设置 waitStatus 失败,请唤醒以重新同步(在这种情况下,waitStatus 可能会出现短暂且无害的错误)
*/
//如果成功修改等待状态,则将节点添加到同步队列中(通过enq方法),并获取其前驱节点p。
Node p = enq(node);
// 获取前驱节点的等待状态ws。
int ws = p.waitStatus;
// 如果前驱节点的等待状态大于0,或者尝试将前驱节点的等待状态设置为SIGNAL失败,
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 则通过LockSupport.unpark方法唤醒节点对应的线程。
LockSupport.unpark(node.thread);
//返回true。
return true;
}
signalAll
// 将所有线程从条件队列移动到拥有 同步队列。
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
doSignalAll
// 删除并转移所有节点
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
总结:
AQS 中获取和释放资源
里面用到了大量的CAS操作和自旋
acquire
是获取独占
锁,不响应中断,只有获取成功锁才响应中断但是只是返回中断标志位
acquireInterruptibly
获取独占
锁 并 响应中断,要是在park()
过程中被中断
就会抛出异常
acquireShared
获取共享
锁,会唤醒后面还是共享锁的Node 节点
acquireSharedInterruptibly
获取共享
锁 并 响应中断,抛出错误
release
释放独占
锁
releaseShared
释放共享
锁,和独占锁的唤醒动作是一样的,不同的是会在共享锁的时候会进行安全判断,释放锁和获取锁的时候都会调用。
AQS里面维护了两个队列,一个是等待(同步)队列(CLH),还有一个是条件队列(condition)。
主要方法流程:
acquire()
首先会尝试获取资源,如果获取失败,将存储线程的节点
插入等待队列。插入后继续根据前置节点状态状态判断是否应该继续获取资源。如果前置节点是头结点,继续尝试获取资源;如果前置节点是SIGNAL
状态,就中断
当前线程;否则继续尝试获取资源。直到当前线程被阻塞或者获取到资源
,结束。release()
释放资源,需要唤醒后继节点。如果后继节点不为空
且不是取消
状态,则唤醒
这个后继节点;否则从尾部往前面找最近的未被取消的节点进行唤醒。await()
,线程会进入条件队列,等待被唤醒,唤醒后
以自旋方式获取资源或处理中断异常;signal()
,线程会插入到等待(同步)队列,唤醒被阻塞的线程。
参考文章
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/180226.html