在ReentrantLock
中,还提供了Condition
条件控制类,他的功能与wait/notify
一样,都是基于某个条件进行等待与唤醒。之所以在JUC中又重复造轮子,是因为原来 Java 提供的wait()/notify()
是基于synchronized
实现的,这对于使用ReentrantLock
来实现同步锁的场景显然是不适用的,所以为了提供配套的功能,针对ReentrantLock
实现了Condition
条件控制类。
Condition的基本应用
Condition
也提供了两种方法:
-
await(): 让线程等待,并释放锁; -
signal()/signalAll(): 唤醒因为await()方法进入等待的线程。
下面看一下示例:
public class ConditionDemo {
private static ReentrantLock lock = new ReentrantLock();
private static Condition condition = lock.newCondition();
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
lock.lock();
try {
condition.await();
System.out.println("this is t1 thread!");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
});
Thread t2 = new Thread(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.lock();
System.out.println("this is t2 thread!");
condition.signal();
lock.unlock();
});
t1.start();
t2.start();
}
}
运行结果:
this is t2 thread!
this is t1 thread!
在示例中,通过ReentrantLock
创建一个新的Condition
,然后在main
方法中分别创建并启动两个线程,t1线程
会调用condition.await()
方法进入阻塞状态,t2线程
会调用condition.signal()
方法唤醒处于阻塞状态的t1线程
,所以运行结果总会先打印t2线程
的信息。
在使用Condition.await()
之前,同样需要争抢到锁资源,所以需要先使用ReentrantLock.lock()
方法,这里使用lock.newCondition()
创建一个新的condition
,这是与wait()/notify()
的区别,Condition
可以针对不同的场景设置多个不同的Condition
,当调用condition.signal()
时,不需要唤醒所有线程,只需要唤醒特定Condition
的线程即可,这样可以减少线程的无效竞争。
Condition源码分析
先从lock.newCondition()
方法开始,发现调用的是sync.newCondition()
方法,可以发现,最后返回的是一个ConditionObject
对象。
final ConditionObject newCondition() {
return new ConditionObject();
}
先看下Condition
的类关系图,可以看到,ConditionObject
实现了Condition
接口,并且它是AQS的内部类,这是因为Condition
需要依赖同步队列。
在了解源码之前,我们先从Condition
的设计入手,在Condition
中,除了AQS的同步队列之外,还设计了另外一个Condition队列,当线程A调用了await()
方法后,会将线程A包装成一个节点添加到Condition等待队列
中,如果线程B调用了signal()
方法唤醒线程A,此时等待队列中的线程A节点会被移到AQS同步队列中等待线程B执行完unlock()
方法才能重新竞争锁。这一点在设计上是和原生wait/notify
机制一样的。
下面根据这个设计一起去了解源码实现。
await()方法
接下来从await()
方法入手,了解线程是如何被挂起并释放锁的。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 创建一个新的节点,状态为condiation,添加到等待队列中(Condition队列。)
Node node = addConditionWaiter();
// 释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 判断当前节点是否在同步队列中,如果没有在同步队列上,说明节点还未被唤醒;第一次进来为false,因为前面已经释放了锁资源
while (!isOnSyncQueue(node)) {
// 挂起当前线程
LockSupport.park(this);
// 检查线程是否被中断,如果返回的是0.说明是正常唤醒,继续循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 调用accquireQueued()方法自旋争抢锁,争抢到锁时,如果返回true,则说明线程在等待时已经被中断过了;返回false,正常争抢到锁
// interruptMode != THROW_IE -> 表示这个线程没有成功将node入队,但signal执行了enq()方法让其入队了
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 如果condition队列不为空,,则继续清理队列上的无效节点
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 如果当前线程节点状态不是正常的0,则需要抛出异常或再次中断
//THROW_IE --> 抛出InterruptedException异常
//REINTERRUTPT --> 调用线程的interrupt()方法中断线程
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
结合Condition
的设计,await()
方法主要做了以下几件事情。
-
先把当前线程包装成一个 node
节点添加到Condition等待队列
中; -
通过 fullyRelease()
方法释放当前的锁; -
在循环中不断通过 isOnSyncQueue()
方法判断节点是否在AQS上,如果不在则通过LockSupport.part()
方法挂起当前线程进行阻塞,如果在AQS上,说明线程被signal()
唤醒,可以再次去争抢锁资源。
addConditionWaiter
前面说到,Condition
除了AQS的同步队列之外,还维护了一个等待队列,这个队列是单向链表,下面我们看看addConditionWaiter()
是如何将线程添加到此队列中的。
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果尾节点不为空,并且状态不是CONDITION(-2)的情况下,则清理掉队列中的无效节点。
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 将当前线程包装成一个Node节点,状态为CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 如果队列为空,则将当前节点作为头节点
if (t == null)
firstWaiter = node;
else
// 如果队列不为空,则将当前节点继续追加到尾节点后面
t.nextWaiter = node;
lastWaiter = node;
return node;
}
可以看到,该方法只是向一个单向链表追加一个新的节点操作,主要功能就是把当前线程添加到等待队列中。
fullyRelease
当线程添加到队列之后,便可以调用fullyRelease()
方法释放当前的锁了。
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
在该方法中,会通过getState()
方法得到当前线程的重入次数savedStated
,然后调用release()
方法,将state
设置为0,与ReentrantLock
的流程是一样的。
signal
从wait()
方法可以了解到,wait()
方法就是释放当前线程的锁资源,并将其添加到等待队列中,我们不难猜出,与之相对的signal()
方法,应该是将等待队列中的线程节点重新添加到AQS中,等待其被重新唤醒争抢锁资源。
public final void signal() {
// 校验当前正在执行的线程是否为争抢到锁的线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 拿到等待队列的头节点
Node first = firstWaiter;
if (first != null)
// 调用doSignal方法唤醒线程
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
在sinal()
方法中,会调用doSignal()
方法,从头遍历等待队列,逐一将线程节点转移到AQS同步队列中。
transferForSignal
final boolean transferForSignal(Node node) {
// 如果调用cas方法设置失败,则说明该线程节点是无效的,可能已经被中断了
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 将节点添加到同步队列中,拿到该节点的前置节点
Node p = enq(node);
int ws = p.waitStatus;
// 如果前置节点的状态为CANCELLED(1)状态 或者 尝试将前置节点状态设置为SIGNAL(SIGNAL表示后继线程需要停止阻塞了),如果设置失败了,则唤醒输入节点上的线程。
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
这里有一个点需要注意一下,在前置节点状态为CANCELLED或者将前置节点的状态设置为SIGNAL状态失败后,这里会唤醒当前线程。这里为什么要提前唤醒线程呢?
这里需要结合await()
方法的一段源码来看
while (!isOnSyncQueue(node)) {
// 挂起当前线程
LockSupport.park(this);
// 检查线程是否被中断,如果返回的是0.说明是正常唤醒,继续循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
当线程提前被唤醒之后,会执行checkInterruptWhileWaiting()
方法,然后isOnSyncQueue()
方法会返回true
跳出循环,最终争抢锁又会被acquireQueued()
方法阻塞,这意味着当前线程可以提前执行这些不需要涉及同步操作的代码。
总结
Condition
是基于AQS实现,在AQS同步队列上,又设计了一个等待队列来完成wait/notify
机制。
J.U.C包的大部分工具,都是基于AQS实现,只在功能场景代码上有稍许不同,Condition
亦是如此,所以此篇文章不再对相同的方法做过多分析,有兴趣的同学可参考之前的文章。
原文始发于微信公众号(DevUnion):Condition的使用与原理
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/35431.html