JUC源码简析Condition

导读:本篇文章讲解 JUC源码简析Condition,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

相关阅读

简介

Condition是提供了和Java中Objectwait/notify/notifyAll方法类似功能的一个接口,通过这个接口,可以实现线程在某个特定的条件下等待/睡眠;
Objectwait/notify/notifyAll操作需要获得对象监视器类似,一个Condition实例需要和互斥锁绑定,在执行等待/唤醒动作时必须需要获得互斥锁,被唤醒的线程需要获取到互斥锁才能执行后续动作,否则继续等待;
相比较Objectwait/notify/notifyAll的操作而言,Condition具有更丰富的功能,比如可以响应/不响应中断,可以设置超时时间或者等待到具体某个时间点;同一个互斥锁可以绑定多个Condition实例,意味着在同一个互斥锁上竞争的线程可以在不同的条件下等待,可以根据条件来唤醒线程,这是Objectwait/notify/notifyAll的操作不具备的功能;

源码简析

外部类——AbstractQueuedSynchronizer

isOnSyncQueue

final boolean isOnSyncQueue(Node node) {
    // 如果当前节点的等待状态为CONDITION,或者没有前继节点(同步队列一定会使用前继节点,则说明不在等待队列中
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        // false表示当前节点不在等待队列中
        return false;
    // 如果当前节点有后继节点(条件队列中不使用后继节点),则说明在等待队列中;
    if (node.next != null)
        // true表示当前节点在等待队列中
        return true;

    // 存在节点的状态不是CONDITION,且前继节点存在,后继节点不存在的情况,即插入节点的时刻(插入节点时,先更新节点的前继节点,再CAS更新节点为新尾节点,若CAS操作失败,则节点还未插入等待队列),这时需要从队列尾部向前遍历,确保节点是否真得成功插入等待队列;
    return findNodeFromTail(node);
}

findNodeFromTail

private boolean findNodeFromTail(Node node) {
    Node t = tail;
    // 死循环保证找到或者遍历完等待队列
    for (;;) {
        // 若匹配当前节点
        if (t == node)
            // 返回true,表示找到
            return true;
        // 若待匹配节点不存在,则表示遍历完等待队列也未匹配上
        if (t == null)
            // 返回false,表示未找到
            return false;
        // 继续匹配前继节点
        t = t.prev;
    }
}

transferForSignal

final boolean transferForSignal(Node node) {
    // CAS尝试设置当前节点的等待状态为0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        // 设置失败,则表示当前节点的等待状态因节点异常而变化,无法转移异常节点
        // false表示转移失败
        return false;

    // 当前节点插入等待队列
    Node p = enq(node);
    int ws = p.waitStatus;
    // 如果当前节点的前继节点的状态异常,或者CAS更新前继节点的状态为SIGNAL(确保前继节点负责唤醒当前节点)
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        // 唤醒当前节点的线程,来重新同步
        LockSupport.unpark(node.thread);
    // true表示转移成功
    return true;
}

transferAfterCancelledWait

final boolean transferAfterCancelledWait(Node node) {
    // CAS尝试设置当前节点的等待状态为0
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        // 设置成功,则表示在被其它线程唤醒前发生中断

        // 插入等待队列
        enq(node);
        // true表示后续要抛出InterruptedException
        return true;
    }
    // 设置失败,则表示中断后,有其它线程调用转移当前节点到等待队列

    // 等待其它线程转移成功
    while (!isOnSyncQueue(node))
        // 循环调用Thread.yield()直到本线程节点出现在等待队列中
        Thread.yield();
    // false表示后续需要重新中断,因为acquireQueue必须在节点成功入队后才可以调用
    return false;
}

fullyRelease

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // 获取当前持有的互斥锁资源
        int savedState = getState();
        // 释放持有的互斥锁资源
        if (release(savedState)) {
            // 释放成功

            // 清空失败标识
            failed = false;
            return savedState;
        } else {
            // 释放失败,说明未持有互斥锁资源,抛出IllegalMonitorStateException异常
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            // 释放互斥锁资源失败,则设置当前节点状态异常
            node.waitStatus = Node.CANCELLED;
    }
}

关键方法

addConditionWaiter

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 未持有锁线程调用await会先加入条件队列,后续释放锁失败,会抛出IllegalMonitorStateException异常,这种情况下在finally代码中会将该节点的等待状态设置为CANCELLED
    // 如果当前条件队列尾节点的等待状态不为CONDITION
    if (t != null && t.waitStatus != Node.CONDITION) {
        // 清除条件队列中无效节点
        unlinkCancelledWaiters();
        // 获取最新的尾节点
        t = lastWaiter;
    }
    // 封装当前线程为等待节点
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        // 条件队列中不存在任何节点,则更新头节点为当前节点
        firstWaiter = node;
    else
        // 条件队列中存在节点,则设置当前节点为尾节点的nextWaiter
        t.nextWaiter = node;
    // 更新尾节点为当前节点
    lastWaiter = node;
    return node;
}

doSignal

private void doSignal(Node first) {
    do {
        // 设置新头节点为当前节点的nextWaiter(踢出当前节点)
        // 进一步判断新头节点是否存在
        if ( (firstWaiter = first.nextWaiter) == null)
            // 新头节点不存在,则说明条件队列已空,设置尾节点为null
            lastWaiter = null;
        // 清空当前节点信息
        first.nextWaiter = null;
    // 尝试转移当前节点
    //     转移失败,且新头节点存在,则继续转移新头节点,直至
    //         1. 转移成功
    //         2. 新头节点不存在,即条件队列中无节点可转移
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

doSignalAll

private void doSignalAll(Node first) {
    // 清空条件队列的头节点和尾节点信息,避免doSignal重复遍历条件队列
    lastWaiter = firstWaiter = null;
    do {
        // 缓存当前节点的nextWaiter
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        // 转移当前节点
        transferForSignal(first);
        // 继续在下一循环转移当前节点的nextWaiter
        first = next;
    // 直至当前节点的nextWaiter不存在,即遍历完队列
    } while (first != null);
}

unlinkCancelledWaiters

private void unlinkCancelledWaiters() {
    // 从头节点开始遍历
    Node t = firstWaiter;
    Node trail = null;
    // 只要条件队列中还存在节点,就循环执行
    while (t != null) {
        // 缓存当前节点的nextWaiter
        Node next = t.nextWaiter;
        // 如果当前节点的等待状态不是CONDITION,则表明节点无效
        if (t.waitStatus != Node.CONDITION) {
            // 清空当前节点的nextWaiter
            t.nextWaiter = null;
            if (trail == null)
                // trail不存在,则表示从头节点开始遍历到现在都未发现有效节点

                // 暂且认为当前节点的nextWaiter有效,更新头节点为当前节点的nextWaiter,踢出当前节点
                firstWaiter = next;
            else
                // 存在有效节点,则设置有效节点的nextWaiter为当前节点的nextWaiter,踢出当前节点
                trail.nextWaiter = next;
            if (next == null)
                // 当前节点的nextWaiter不存在,则表示遍历完队列,更新尾节点为最新的有效节点
                lastWaiter = trail;
        }
        else
            // 如果当前节点的等待状态是CONDITION,则表明节点有效

            // 更新trail,即有效的节点的跟踪
            trail = t;
        // 继续在下一循环检查当前节点的nextWaiter
        t = next;
    }
}

关键流程

唤醒头节点——signal

源码

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

主要逻辑

  1. 本方法必须在持有互斥锁的情况下才可以本调用,如果执行方法的线程未持有互斥锁,IllegalMonitorStateException异常;
  2. 条件队列中头节点存在,则唤醒头节点,即将头节点转移到同步队列中;不存在,则条件队列为空,无需转移操作;
    1. 设置新头节点为当前节点的nextWaiter(踢出当前节点);
    2. 进一步判断新头节点是否存在;
      1. 若新头节点不存在,则说明条件队列已空,设置尾节点为null
    3. 清空当前节点信息,断开和条件队列的联系;
    4. 调用transferForSignal,尝试转移当前节点;
      1. CAS尝试设置当前节点的等待状态为0;
        1. 设置失败,则表示当前节点的等待状态因节点异常而变化,无法转移异常节点,返回false表示转移失败;
      2. 当前节点插入等待队列;
      3. 如果当前节点的前继节点的状态异常,或者CAS更新前继节点的状态为SIGNAL(确保前继节点负责唤醒当前节点),则唤醒当前节点的线程,来重新同步;
      4. 返回true表示转移成功;
    5. 若转移失败,且新头节点存在,则继续转移新头节点,直至:
      1. 转移成功;
      2. 新头节点不存在,即条件队列中无节点可转移;

唤醒所有节点——signalAll

源码

public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

主要逻辑

  1. 本方法必须在持有互斥锁的情况下才可以本调用,如果执行方法的线程未持有互斥锁,IllegalMonitorStateException异常;
  2. 条件队列中头节点存在,则唤醒头节点即其后所有节点;不存在,则条件队列为空,无需转移操作;
    1. 首先清空条件队列的头节点和尾节点信息,这样随后(其它线程可能触发)的doSignal就不会重复遍历条件队列了;
    2. 进入循环;
      1. 缓存当前节点的nextWaiter;
      2. 转移当前节点;
        1. CAS尝试设置当前节点的等待状态为0;
          1. 设置失败,则表示当前节点的等待状态因节点异常而变化,无法转移异常节点,返回false表示转移失败;
        2. 当前节点插入等待队列;
        3. 如果当前节点的前继节点的状态异常,或者CAS更新前继节点的状态为SIGNAL(确保前继节点负责唤醒当前节点),则唤醒当前节点的线程,来重新同步;
        4. 返回true表示转移成功;
      3. 继续在下一循环转移当前节点的nextWaiter,直至当前节点的nextWaiter不存在,即遍历完队列;

等待——await

源码

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    // 释放当前线程持有的互斥锁资源
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        // 获取资源过程中发生过中断,且当前不需要抛出中断异常,则设置中断模式为REINTERRUPT
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        // 清除条件队列中所有无效节点
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        // 处理中断
        reportInterruptAfterWait(interruptMode);
}

主要逻辑

  1. 如果当前线程发生过中断,则直接抛出中断异常;
  2. 执行addConditionWaiter,将当前线程封装成等待节点加入条件队列;
    1. 如果当前条件队列尾节点的等待状态不为CONDITION,则表示尾节点异常;
      1. 清除条件队列中无效节点;
        1. 从头节点开始遍历;
        2. 只要条件队列中还存在节点,就循环执行;
          1. 缓存当前节点的nextWaiter;
          2. 如果当前节点的等待状态不是CONDITION,则表明节点无效,需要:
            1. 清空当前节点的nextWaiter;
            2. trail不存在,则表示从头节点开始遍历到现在都未发现有效节点,那么暂且认为当前节点的nextWaiter有效,更新头节点为当前节点的nextWaiter,踢出当前节点;
            3. trail存在,即存在有效节点,则设置有效节点的nextWaiter为当前节点的nextWaiter,踢出当前节点;
            4. 当前节点的nextWaiter不存在,则表示遍历完队列,更新尾节点为最新的有效节点;
          3. 如果当前节点的等待状态是CONDITION,则表明节点有效,则:
            1. 更新trail,即有效的节点的跟踪;
          4. 继续在下一循环检查当前节点的nextWaiter;
      2. 获取最新的尾节点;
    2. 封装当前线程为等待节点;
    3. 条件队列中不存在任何节点,则更新头节点为当前节点;否则设置当前节点为尾节点的nextWaiter;
    4. 更新尾节点为当前节点;
  3. 执行fullyRelease释放当前线程持有的互斥锁资源;
    1. 获取当前持有的互斥锁资源;
    2. 释放持有的互斥锁资源;
      1. 释放成功,则清空失败标识并返回释放的资源;
      2. 释放失败,说明未持有互斥锁资源,抛出IllegalMonitorStateException异常;
    3. finally代码块判断若释放互斥锁失败,则设置当前节点的等待状态为CANCELLED;
  4. 循环执行isOnSyncQueue判断节点是否还在条件队列中:
    1. 如果当前节点的等待状态为CONDITION,或者没有前继节点(同步队列一定会使用前继节点,则说明不在等待队列中;
    2. 如果当前节点有后继节点(条件队列中不使用后继节点),则说明在等待队列中;
    3. 存在节点的状态不是CONDITION,且前继节点存在,后继节点不存在的情况,即插入节点的时刻(插入节点时,先更新节点的前继节点,再CAS更新节点为新尾节点,若CAS操作失败,则节点还未插入等待队列),这时需要从队列尾部向前遍历,确保节点是否真得成功插入等待队列;
      1. 死循环保证找到或者遍历完等待队列;
      2. 若匹配当前节点,则返回true,表示找到;
      3. 若待匹配节点不存在,则遍历完等待队列也未匹配上,返回false,表示未找到;
      4. 否则继续匹配前继节点;
  5. 如果不在同步队列中,即还在条件队列中,则挂起当前线程;
  6. 线程挂起后,当被其它线程唤醒(signal将当前线程节点转移到同步队列并唤醒当前线程,或者signalAll唤醒所有节点,或者其它线程中断当前线程,当前线程需要自行尝试进入同步队列)后,执行checkInterruptWhileWaiting判断本线程是否在睡眠期间发生过中断;发生过中断就直接break,否则继续判断是否在同步队列中,若还在则继续挂起;
    1. Thread.interrupted()判断是否发生中断,未发生则直接返回0;
    2. 发生中断,则调用transferAfterCancelledWait尝试加入同步队列;
      1. CAS尝试设置当前节点的等待状态为0;
        1. 设置成功,则表示在被其它线程唤醒前发生中断;
          1. 插入等待队列;
          2. 返回true表示后续要抛出InterruptedException
        2. 设置失败,则表示中断后,有其它线程调用转移当前节点到等待队列;
          1. 循环调用Thread.yield()直到本线程节点出现在等待队列中,再进行后续操作,因为acquireQueue必须在节点成功入队后才可以调用;
          2. 返回false表示后续需要重新中断;
    3. 总结:如果线程发生中断后可以主动加入等待队列,说明中断发生前,该线程还没有被其它线程singal,还在条件队列中,属于正常等待中的状态,而中断导致该线程的正常行为被打断,被唤醒去竞争互斥锁,这种情况就返回THROW_IE,表示await方法返回后需要重新抛出中断,因为当前线程是因为中断而被唤醒(响应中断);否则,说明中断发生前,该线程已经被其它线程signal,线程已经在被转移到等待队列的过程中,只不过后续发生了中断(抢锁前和抢锁后一样),这种情况就返回REINTERRUPT,表示await方法返回后需要设置中断状态(不响应中断);
  7. 退出循环后,执行acquireQueued获取原先释放的互斥锁资源;
  8. 获取资源过程中如果发生过中断,并且interruptMode不是THROW_IE,那就设置interruptMode为REINTERRUPT,表示需要设置中断状态;
  9. 如果当前节点的nextWaiter存在(signal方式唤醒时会清除nextWaiter,但是中断方式唤醒不会),则调用unlinkCancelledWaiters清除条件队列中所有无效节点;
  10. 如果interruptedMode有效,则执行reportInterruptAfterWait处理发生过的中断;
    1. 如果是THROW_IE,直接抛出中断;
    2. 如果是REUNTERRUPT,则执行selfInterrupt()设置中断状态;

用法

Demo

class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    final Object[] items = new Object[100];
    int putptr, takeptr, count;


    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();
            items[putptr] = x;
            if (++putptr == items.length)
                putptr = 0;
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await();
            Object x = items[takeptr];
            if (++takeptr == items.length)
                takeptr = 0;
            --count;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/4775.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!