相关阅读
- JUC源码简析 AbstractQueuedSynchronizer
- JUC源码简析 CountDownLatch
- JUC源码简析 CyclicBarrier
- JUC源码简析 ReentrantLock
- JUC源码简析 Semaphore
简介
Condition
是提供了和Java中Object
的wait
/notify
/notifyAll
方法类似功能的一个接口,通过这个接口,可以实现线程在某个特定的条件下等待/睡眠;
与Object
的wait
/notify
/notifyAll
操作需要获得对象监视器类似,一个Condition
实例需要和互斥锁绑定,在执行等待/唤醒动作时必须需要获得互斥锁,被唤醒的线程需要获取到互斥锁才能执行后续动作,否则继续等待;
相比较Object
的wait
/notify
/notifyAll
的操作而言,Condition
具有更丰富的功能,比如可以响应/不响应中断,可以设置超时时间或者等待到具体某个时间点;同一个互斥锁可以绑定多个Condition
实例,意味着在同一个互斥锁上竞争的线程可以在不同的条件下等待,可以根据条件来唤醒线程,这是Object
的wait
/notify
/notifyAll
的操作不具备的功能;
源码简析
外部类——AbstractQueuedSynchronizer
isOnSyncQueue
final boolean isOnSyncQueue(Node node) {
// 如果当前节点的等待状态为CONDITION,或者没有前继节点(同步队列一定会使用前继节点,则说明不在等待队列中
if (node.waitStatus == Node.CONDITION || node.prev == null)
// false表示当前节点不在等待队列中
return false;
// 如果当前节点有后继节点(条件队列中不使用后继节点),则说明在等待队列中;
if (node.next != null)
// true表示当前节点在等待队列中
return true;
// 存在节点的状态不是CONDITION,且前继节点存在,后继节点不存在的情况,即插入节点的时刻(插入节点时,先更新节点的前继节点,再CAS更新节点为新尾节点,若CAS操作失败,则节点还未插入等待队列),这时需要从队列尾部向前遍历,确保节点是否真得成功插入等待队列;
return findNodeFromTail(node);
}
findNodeFromTail
private boolean findNodeFromTail(Node node) {
Node t = tail;
// 死循环保证找到或者遍历完等待队列
for (;;) {
// 若匹配当前节点
if (t == node)
// 返回true,表示找到
return true;
// 若待匹配节点不存在,则表示遍历完等待队列也未匹配上
if (t == null)
// 返回false,表示未找到
return false;
// 继续匹配前继节点
t = t.prev;
}
}
transferForSignal
final boolean transferForSignal(Node node) {
// CAS尝试设置当前节点的等待状态为0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
// 设置失败,则表示当前节点的等待状态因节点异常而变化,无法转移异常节点
// false表示转移失败
return false;
// 当前节点插入等待队列
Node p = enq(node);
int ws = p.waitStatus;
// 如果当前节点的前继节点的状态异常,或者CAS更新前继节点的状态为SIGNAL(确保前继节点负责唤醒当前节点)
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 唤醒当前节点的线程,来重新同步
LockSupport.unpark(node.thread);
// true表示转移成功
return true;
}
transferAfterCancelledWait
final boolean transferAfterCancelledWait(Node node) {
// CAS尝试设置当前节点的等待状态为0
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
// 设置成功,则表示在被其它线程唤醒前发生中断
// 插入等待队列
enq(node);
// true表示后续要抛出InterruptedException
return true;
}
// 设置失败,则表示中断后,有其它线程调用转移当前节点到等待队列
// 等待其它线程转移成功
while (!isOnSyncQueue(node))
// 循环调用Thread.yield()直到本线程节点出现在等待队列中
Thread.yield();
// false表示后续需要重新中断,因为acquireQueue必须在节点成功入队后才可以调用
return false;
}
fullyRelease
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 获取当前持有的互斥锁资源
int savedState = getState();
// 释放持有的互斥锁资源
if (release(savedState)) {
// 释放成功
// 清空失败标识
failed = false;
return savedState;
} else {
// 释放失败,说明未持有互斥锁资源,抛出IllegalMonitorStateException异常
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
// 释放互斥锁资源失败,则设置当前节点状态异常
node.waitStatus = Node.CANCELLED;
}
}
关键方法
addConditionWaiter
private Node addConditionWaiter() {
Node t = lastWaiter;
// 未持有锁线程调用await会先加入条件队列,后续释放锁失败,会抛出IllegalMonitorStateException异常,这种情况下在finally代码中会将该节点的等待状态设置为CANCELLED
// 如果当前条件队列尾节点的等待状态不为CONDITION
if (t != null && t.waitStatus != Node.CONDITION) {
// 清除条件队列中无效节点
unlinkCancelledWaiters();
// 获取最新的尾节点
t = lastWaiter;
}
// 封装当前线程为等待节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
// 条件队列中不存在任何节点,则更新头节点为当前节点
firstWaiter = node;
else
// 条件队列中存在节点,则设置当前节点为尾节点的nextWaiter
t.nextWaiter = node;
// 更新尾节点为当前节点
lastWaiter = node;
return node;
}
doSignal
private void doSignal(Node first) {
do {
// 设置新头节点为当前节点的nextWaiter(踢出当前节点)
// 进一步判断新头节点是否存在
if ( (firstWaiter = first.nextWaiter) == null)
// 新头节点不存在,则说明条件队列已空,设置尾节点为null
lastWaiter = null;
// 清空当前节点信息
first.nextWaiter = null;
// 尝试转移当前节点
// 转移失败,且新头节点存在,则继续转移新头节点,直至
// 1. 转移成功
// 2. 新头节点不存在,即条件队列中无节点可转移
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
doSignalAll
private void doSignalAll(Node first) {
// 清空条件队列的头节点和尾节点信息,避免doSignal重复遍历条件队列
lastWaiter = firstWaiter = null;
do {
// 缓存当前节点的nextWaiter
Node next = first.nextWaiter;
first.nextWaiter = null;
// 转移当前节点
transferForSignal(first);
// 继续在下一循环转移当前节点的nextWaiter
first = next;
// 直至当前节点的nextWaiter不存在,即遍历完队列
} while (first != null);
}
unlinkCancelledWaiters
private void unlinkCancelledWaiters() {
// 从头节点开始遍历
Node t = firstWaiter;
Node trail = null;
// 只要条件队列中还存在节点,就循环执行
while (t != null) {
// 缓存当前节点的nextWaiter
Node next = t.nextWaiter;
// 如果当前节点的等待状态不是CONDITION,则表明节点无效
if (t.waitStatus != Node.CONDITION) {
// 清空当前节点的nextWaiter
t.nextWaiter = null;
if (trail == null)
// trail不存在,则表示从头节点开始遍历到现在都未发现有效节点
// 暂且认为当前节点的nextWaiter有效,更新头节点为当前节点的nextWaiter,踢出当前节点
firstWaiter = next;
else
// 存在有效节点,则设置有效节点的nextWaiter为当前节点的nextWaiter,踢出当前节点
trail.nextWaiter = next;
if (next == null)
// 当前节点的nextWaiter不存在,则表示遍历完队列,更新尾节点为最新的有效节点
lastWaiter = trail;
}
else
// 如果当前节点的等待状态是CONDITION,则表明节点有效
// 更新trail,即有效的节点的跟踪
trail = t;
// 继续在下一循环检查当前节点的nextWaiter
t = next;
}
}
关键流程
唤醒头节点——signal
源码
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
主要逻辑
- 本方法必须在持有互斥锁的情况下才可以本调用,如果执行方法的线程未持有互斥锁,
IllegalMonitorStateException
异常; - 条件队列中头节点存在,则唤醒头节点,即将头节点转移到同步队列中;不存在,则条件队列为空,无需转移操作;
- 设置新头节点为当前节点的nextWaiter(踢出当前节点);
- 进一步判断新头节点是否存在;
- 若新头节点不存在,则说明条件队列已空,设置尾节点为null
- 清空当前节点信息,断开和条件队列的联系;
- 调用
transferForSignal
,尝试转移当前节点;- CAS尝试设置当前节点的等待状态为0;
- 设置失败,则表示当前节点的等待状态因节点异常而变化,无法转移异常节点,返回false表示转移失败;
- 当前节点插入等待队列;
- 如果当前节点的前继节点的状态异常,或者CAS更新前继节点的状态为SIGNAL(确保前继节点负责唤醒当前节点),则唤醒当前节点的线程,来重新同步;
- 返回true表示转移成功;
- CAS尝试设置当前节点的等待状态为0;
- 若转移失败,且新头节点存在,则继续转移新头节点,直至:
- 转移成功;
- 新头节点不存在,即条件队列中无节点可转移;
唤醒所有节点——signalAll
源码
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
主要逻辑
- 本方法必须在持有互斥锁的情况下才可以本调用,如果执行方法的线程未持有互斥锁,
IllegalMonitorStateException
异常; - 条件队列中头节点存在,则唤醒头节点即其后所有节点;不存在,则条件队列为空,无需转移操作;
- 首先清空条件队列的头节点和尾节点信息,这样随后(其它线程可能触发)的
doSignal
就不会重复遍历条件队列了; - 进入循环;
- 缓存当前节点的nextWaiter;
- 转移当前节点;
- CAS尝试设置当前节点的等待状态为0;
- 设置失败,则表示当前节点的等待状态因节点异常而变化,无法转移异常节点,返回false表示转移失败;
- 当前节点插入等待队列;
- 如果当前节点的前继节点的状态异常,或者CAS更新前继节点的状态为SIGNAL(确保前继节点负责唤醒当前节点),则唤醒当前节点的线程,来重新同步;
- 返回true表示转移成功;
- CAS尝试设置当前节点的等待状态为0;
- 继续在下一循环转移当前节点的nextWaiter,直至当前节点的nextWaiter不存在,即遍历完队列;
- 首先清空条件队列的头节点和尾节点信息,这样随后(其它线程可能触发)的
等待——await
源码
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
// 释放当前线程持有的互斥锁资源
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
// 获取资源过程中发生过中断,且当前不需要抛出中断异常,则设置中断模式为REINTERRUPT
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
// 清除条件队列中所有无效节点
unlinkCancelledWaiters();
if (interruptMode != 0)
// 处理中断
reportInterruptAfterWait(interruptMode);
}
主要逻辑
- 如果当前线程发生过中断,则直接抛出中断异常;
- 执行
addConditionWaiter
,将当前线程封装成等待节点加入条件队列;- 如果当前条件队列尾节点的等待状态不为CONDITION,则表示尾节点异常;
- 清除条件队列中无效节点;
- 从头节点开始遍历;
- 只要条件队列中还存在节点,就循环执行;
- 缓存当前节点的nextWaiter;
- 如果当前节点的等待状态不是CONDITION,则表明节点无效,需要:
- 清空当前节点的nextWaiter;
- trail不存在,则表示从头节点开始遍历到现在都未发现有效节点,那么暂且认为当前节点的nextWaiter有效,更新头节点为当前节点的nextWaiter,踢出当前节点;
- trail存在,即存在有效节点,则设置有效节点的nextWaiter为当前节点的nextWaiter,踢出当前节点;
- 当前节点的nextWaiter不存在,则表示遍历完队列,更新尾节点为最新的有效节点;
- 如果当前节点的等待状态是CONDITION,则表明节点有效,则:
- 更新trail,即有效的节点的跟踪;
- 继续在下一循环检查当前节点的nextWaiter;
- 获取最新的尾节点;
- 清除条件队列中无效节点;
- 封装当前线程为等待节点;
- 条件队列中不存在任何节点,则更新头节点为当前节点;否则设置当前节点为尾节点的nextWaiter;
- 更新尾节点为当前节点;
- 如果当前条件队列尾节点的等待状态不为CONDITION,则表示尾节点异常;
- 执行
fullyRelease
释放当前线程持有的互斥锁资源;- 获取当前持有的互斥锁资源;
- 释放持有的互斥锁资源;
- 释放成功,则清空失败标识并返回释放的资源;
- 释放失败,说明未持有互斥锁资源,抛出IllegalMonitorStateException异常;
- finally代码块判断若释放互斥锁失败,则设置当前节点的等待状态为CANCELLED;
- 循环执行
isOnSyncQueue
判断节点是否还在条件队列中:- 如果当前节点的等待状态为CONDITION,或者没有前继节点(同步队列一定会使用前继节点,则说明不在等待队列中;
- 如果当前节点有后继节点(条件队列中不使用后继节点),则说明在等待队列中;
- 存在节点的状态不是CONDITION,且前继节点存在,后继节点不存在的情况,即插入节点的时刻(插入节点时,先更新节点的前继节点,再CAS更新节点为新尾节点,若CAS操作失败,则节点还未插入等待队列),这时需要从队列尾部向前遍历,确保节点是否真得成功插入等待队列;
- 死循环保证找到或者遍历完等待队列;
- 若匹配当前节点,则返回true,表示找到;
- 若待匹配节点不存在,则遍历完等待队列也未匹配上,返回false,表示未找到;
- 否则继续匹配前继节点;
- 如果不在同步队列中,即还在条件队列中,则挂起当前线程;
- 线程挂起后,当被其它线程唤醒(
signal
将当前线程节点转移到同步队列并唤醒当前线程,或者signalAll
唤醒所有节点,或者其它线程中断当前线程,当前线程需要自行尝试进入同步队列)后,执行checkInterruptWhileWaiting
判断本线程是否在睡眠期间发生过中断;发生过中断就直接break,否则继续判断是否在同步队列中,若还在则继续挂起;- Thread.interrupted()判断是否发生中断,未发生则直接返回0;
- 发生中断,则调用transferAfterCancelledWait尝试加入同步队列;
- CAS尝试设置当前节点的等待状态为0;
- 设置成功,则表示在被其它线程唤醒前发生中断;
- 插入等待队列;
- 返回true表示后续要抛出
InterruptedException
;
- 设置失败,则表示中断后,有其它线程调用转移当前节点到等待队列;
- 循环调用Thread.yield()直到本线程节点出现在等待队列中,再进行后续操作,因为acquireQueue必须在节点成功入队后才可以调用;
- 返回false表示后续需要重新中断;
- 设置成功,则表示在被其它线程唤醒前发生中断;
- CAS尝试设置当前节点的等待状态为0;
- 总结:如果线程发生中断后可以主动加入等待队列,说明中断发生前,该线程还没有被其它线程singal,还在条件队列中,属于正常等待中的状态,而中断导致该线程的正常行为被打断,被唤醒去竞争互斥锁,这种情况就返回THROW_IE,表示await方法返回后需要重新抛出中断,因为当前线程是因为中断而被唤醒(响应中断);否则,说明中断发生前,该线程已经被其它线程signal,线程已经在被转移到等待队列的过程中,只不过后续发生了中断(抢锁前和抢锁后一样),这种情况就返回REINTERRUPT,表示await方法返回后需要设置中断状态(不响应中断);
- 退出循环后,执行
acquireQueued
获取原先释放的互斥锁资源; - 获取资源过程中如果发生过中断,并且interruptMode不是THROW_IE,那就设置interruptMode为REINTERRUPT,表示需要设置中断状态;
- 如果当前节点的nextWaiter存在(signal方式唤醒时会清除nextWaiter,但是中断方式唤醒不会),则调用
unlinkCancelledWaiters
清除条件队列中所有无效节点; - 如果interruptedMode有效,则执行
reportInterruptAfterWait
处理发生过的中断;- 如果是THROW_IE,直接抛出中断;
- 如果是REUNTERRUPT,则执行
selfInterrupt()
设置中断状态;
用法
Demo
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length)
putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length)
takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/4775.html