Condition的使用与原理

ReentrantLock中,还提供了Condition条件控制类,他的功能与wait/notify一样,都是基于某个条件进行等待与唤醒。之所以在JUC中又重复造轮子,是因为原来 Java 提供的wait()/notify()是基于synchronized实现的,这对于使用ReentrantLock来实现同步锁的场景显然是不适用的,所以为了提供配套的功能,针对ReentrantLock实现了Condition条件控制类。

Condition的基本应用

Condition也提供了两种方法:

  • await(): 让线程等待,并释放锁;
  • signal()/signalAll(): 唤醒因为await()方法进入等待的线程。

下面看一下示例:

public class ConditionDemo {
    private static ReentrantLock lock = new ReentrantLock();
    private static Condition condition = lock.newCondition();
    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            lock.lock();
            try {
                condition.await();
                System.out.println("this is t1 thread!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
        });
        Thread t2 = new Thread(() -> {
           try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            lock.lock();
            System.out.println("this is t2 thread!");
            condition.signal();
            lock.unlock();
        });
        t1.start();
        t2.start();
    }
}

运行结果:

this is t2 thread!
this is t1 thread!

在示例中,通过ReentrantLock创建一个新的Condition,然后在main方法中分别创建并启动两个线程,t1线程会调用condition.await()方法进入阻塞状态,t2线程会调用condition.signal()方法唤醒处于阻塞状态的t1线程,所以运行结果总会先打印t2线程的信息。

在使用Condition.await()之前,同样需要争抢到锁资源,所以需要先使用ReentrantLock.lock()方法,这里使用lock.newCondition()创建一个新的condition,这是与wait()/notify()的区别,Condition可以针对不同的场景设置多个不同的Condition,当调用condition.signal()时,不需要唤醒所有线程,只需要唤醒特定Condition的线程即可,这样可以减少线程的无效竞争。

Condition源码分析

先从lock.newCondition()方法开始,发现调用的是sync.newCondition()方法,可以发现,最后返回的是一个ConditionObject对象。

final ConditionObject newCondition() {
  return new ConditionObject();
}

先看下Condition的类关系图,可以看到,ConditionObject实现了Condition接口,并且它是AQS的内部类,这是因为Condition需要依赖同步队列。

Condition的使用与原理


在了解源码之前,我们先从Condition的设计入手,在Condition中,除了AQS的同步队列之外,还设计了另外一个Condition队列,当线程A调用了await()方法后,会将线程A包装成一个节点添加到Condition等待队列中,如果线程B调用了signal()方法唤醒线程A,此时等待队列中的线程A节点会被移到AQS同步队列中等待线程B执行完unlock()方法才能重新竞争锁。这一点在设计上是和原生wait/notify机制一样的。

下面根据这个设计一起去了解源码实现。

await()方法

接下来从await()方法入手,了解线程是如何被挂起并释放锁的。

public final void await() throws InterruptedException {
  if (Thread.interrupted())
    throw new InterruptedException();
  // 创建一个新的节点,状态为condiation,添加到等待队列中(Condition队列。)
  Node node = addConditionWaiter();
  // 释放锁
  int savedState = fullyRelease(node);
  int interruptMode = 0;
  // 判断当前节点是否在同步队列中,如果没有在同步队列上,说明节点还未被唤醒;第一次进来为false,因为前面已经释放了锁资源
  while (!isOnSyncQueue(node)) {
    // 挂起当前线程
    LockSupport.park(this);
    // 检查线程是否被中断,如果返回的是0.说明是正常唤醒,继续循环 
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
      break;
  }
  // 调用accquireQueued()方法自旋争抢锁,争抢到锁时,如果返回true,则说明线程在等待时已经被中断过了;返回false,正常争抢到锁
  // interruptMode != THROW_IE -> 表示这个线程没有成功将node入队,但signal执行了enq()方法让其入队了
  if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;
  // 如果condition队列不为空,,则继续清理队列上的无效节点
  if (node.nextWaiter != null// clean up if cancelled
    unlinkCancelledWaiters();
  // 如果当前线程节点状态不是正常的0,则需要抛出异常或再次中断
  //THROW_IE --> 抛出InterruptedException异常
  //REINTERRUTPT --> 调用线程的interrupt()方法中断线程
  if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode);
}

结合Condition的设计,await()方法主要做了以下几件事情。

  • 先把当前线程包装成一个node节点添加到Condition等待队列中;
  • 通过fullyRelease()方法释放当前的锁;
  • 在循环中不断通过isOnSyncQueue()方法判断节点是否在AQS上,如果不在则通过LockSupport.part()方法挂起当前线程进行阻塞,如果在AQS上,说明线程被signal()唤醒,可以再次去争抢锁资源。

addConditionWaiter

前面说到,Condition除了AQS的同步队列之外,还维护了一个等待队列,这个队列是单向链表,下面我们看看addConditionWaiter()是如何将线程添加到此队列中的。

private Node addConditionWaiter() {
  Node t = lastWaiter;
  // 如果尾节点不为空,并且状态不是CONDITION(-2)的情况下,则清理掉队列中的无效节点。
  if (t != null && t.waitStatus != Node.CONDITION) {
    unlinkCancelledWaiters();
    t = lastWaiter;
  }
  // 将当前线程包装成一个Node节点,状态为CONDITION
  Node node = new Node(Thread.currentThread(), Node.CONDITION);
  // 如果队列为空,则将当前节点作为头节点
  if (t == null)
    firstWaiter = node;
  else
    // 如果队列不为空,则将当前节点继续追加到尾节点后面
    t.nextWaiter = node;
  lastWaiter = node;
  return node;
}

可以看到,该方法只是向一个单向链表追加一个新的节点操作,主要功能就是把当前线程添加到等待队列中。

fullyRelease

当线程添加到队列之后,便可以调用fullyRelease()方法释放当前的锁了。

 final int fullyRelease(Node node) {
   boolean failed = true;
   try {
     int savedState = getState();
     if (release(savedState)) {
       failed = false;
       return savedState;
     } else {
       throw new IllegalMonitorStateException();
     }
   } finally {
     if (failed)
       node.waitStatus = Node.CANCELLED;
   }
 }

在该方法中,会通过getState()方法得到当前线程的重入次数savedStated,然后调用release()方法,将state设置为0,与ReentrantLock的流程是一样的。

signal

wait()方法可以了解到,wait()方法就是释放当前线程的锁资源,并将其添加到等待队列中,我们不难猜出,与之相对的signal()方法,应该是将等待队列中的线程节点重新添加到AQS中,等待其被重新唤醒争抢锁资源。

public final void signal() {
  // 校验当前正在执行的线程是否为争抢到锁的线程
  if (!isHeldExclusively())
    throw new IllegalMonitorStateException();
  // 拿到等待队列的头节点
  Node first = firstWaiter;
  if (first != null)
    // 调用doSignal方法唤醒线程
    doSignal(first);
}

 private void doSignal(Node first) {
   do {
     if ( (firstWaiter = first.nextWaiter) == null)
       lastWaiter = null;
     first.nextWaiter = null;
   } while (!transferForSignal(first) &&
            (first = firstWaiter) != null);
 }

sinal()方法中,会调用doSignal()方法,从头遍历等待队列,逐一将线程节点转移到AQS同步队列中。

transferForSignal

final boolean transferForSignal(Node node) {
  // 如果调用cas方法设置失败,则说明该线程节点是无效的,可能已经被中断了
  if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    return false;
 // 将节点添加到同步队列中,拿到该节点的前置节点
  Node p = enq(node);
  int ws = p.waitStatus;
  // 如果前置节点的状态为CANCELLED(1)状态 或者 尝试将前置节点状态设置为SIGNAL(SIGNAL表示后继线程需要停止阻塞了),如果设置失败了,则唤醒输入节点上的线程。
  if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    LockSupport.unpark(node.thread);
  return true;
}

这里有一个点需要注意一下,在前置节点状态为CANCELLED或者将前置节点的状态设置为SIGNAL状态失败后,这里会唤醒当前线程。这里为什么要提前唤醒线程呢?

这里需要结合await()方法的一段源码来看

while (!isOnSyncQueue(node)) {
  // 挂起当前线程
  LockSupport.park(this);
  // 检查线程是否被中断,如果返回的是0.说明是正常唤醒,继续循环 
  if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

当线程提前被唤醒之后,会执行checkInterruptWhileWaiting()方法,然后isOnSyncQueue()方法会返回true跳出循环,最终争抢锁又会被acquireQueued()方法阻塞,这意味着当前线程可以提前执行这些不需要涉及同步操作的代码。

总结

Condition是基于AQS实现,在AQS同步队列上,又设计了一个等待队列来完成wait/notify机制。

J.U.C包的大部分工具,都是基于AQS实现,只在功能场景代码上有稍许不同,Condition亦是如此,所以此篇文章不再对相同的方法做过多分析,有兴趣的同学可参考之前的文章。


原文始发于微信公众号(DevUnion):Condition的使用与原理

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/35431.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!