JUC源码简析AbstractQueuedSynchronizer

导读:本篇文章讲解 JUC源码简析AbstractQueuedSynchronizer,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

相关阅读

简介

AQS是用来构建锁或者其它同步组件(信号量、事件等)的基础框架类;
并发工具类的内部实现都依赖AQS,如:ReentrantLockSemaphoreCountDownLatch等。主要使用方式是用一个内部辅助类继承它作为实现同步原语、简化并发工具的内部实现,屏蔽同步状态管理、线程排队、等待和唤醒等底层操作。

核心思想

如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并将共享资源设置为锁定状态;如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到同步队列中
CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列,虚拟的双向队列即不存在队列实例,仅存在节点之间的关联关系。
AQS是将每一条请求共享资源的线程封装成一个CLH锁队列的一个节点(Node),来实现锁的分配,队列头节点不保存线程信息。

分类

AQS通过模板方法封装了底层操作实现,分为:

  • 共享式
  • 独占式

共享式子类需要实现的方法如下:

protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}

独占式子类需要实现的方法如下:

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}

源码简析

等待状态

  • CANCELLED:值为1,在同步队列中等待的线程因等待超时或者被中断,需要从等待队列中取消等待,节点进入该状态后将不会变化;
  • 0:非其他状态,即初始状态;
  • SIGNAL:值为-1,当前节点的线程如果释放了同步状态或者被取消,将会唤醒后继节点,使得后继节点得以运行;
  • CONDITION:值为-2,节点在Condition的等待队列中,当其他线程对Condition调用signal方法后,该节点会从等待队列中转移到同步队列中;
  • PROPAGATE:值为-3,表示下一次共享式同步状态获取将会无条件地被传播下去;

关键函数

enq

private Node enq(final Node node) {
    // 死循环保证执行成功,除非线程本身异常
    for (;;) {
        // 获取当前队列的尾节点
        Node t = tail;
        // 判断队列是否为空,队列不为空时尾节点一定有效
        if (t == null) {
            // 为空,则创建空节点并尝试设置为头节点(CLH队列的头节点是dummy header,不保存线程信息),再进行节点插入

            // 1. 创建空节点并尝试设置为头节点
            if (compareAndSetHead(new Node()))
                // 2. 头节点创建成功后,设置尾节点等同于头节点,此时队列中只有一个节点
                tail = head;
                // 3. 继续循环,此时队列不为空,可以进行节点插入
        } else {
            // 不为空,则尝试将当前节点当作尾节点插入队列

            // 1. 将当前节点的prev指向当前尾节点
            node.prev = t;
            // 2. CAS更新尾节点为当前节点
            if (compareAndSetTail(t, node)) {
                // 3. CAS更新成功,则设置旧尾节点的next为当前节点,即新尾节点
                t.next = node;
                // 返回旧尾节点
                return t;
            }
        }
    }
}

为什么可以通过if (t == null)判断队列是否为空呢?
通过后续操作可以知道,当创建队列头节点,或者插入新节点时,都会及时更新尾节点信息,保证尾节点有效;所以通过尾节点是否有效可以反推队列是否为空;
为什么不通过判断头节点是否有效来反推队列是否为空呢?
首先,可以通过头节点判断;那为什么不用呢,因为插入节点是采用尾插法,即插入到队列的尾部,那么使用尾节点会更易入编码实现;

场景分析

插入节点主要分两种情况:

  1. 队列为空;
  2. 队列不为空;

先分析第一种情况:队列为空,则先创建队列,再进行插入

  1. 创建空节点并尝试设置为头节点;
    1. 设置成功,则设置尾节点等同于头节点,及时更新尾节点信息,以便其它线程可见;
    2. 设置失败,则当前存在另一线程成功创建了头节点,那么相应的尾节点也有效了;
  2. 继续下一循环,此时尾节点一定有效(可能是当前线程设置的,也可能是另一线程设置的),即队列不为空,同第二种情况;

再分析第二种情况:队列不为空,则直接执行插入

  1. 将当前节点的prev指向当前尾节点;
  2. CAS更新尾节点为当前节点
    1. CAS更新成功,则设置旧尾节点的next为当前节点,即新尾节点,并返回旧尾节点;
    2. CAS更新失败,则当前存在另一线程成功插入节点,则继续下一循环,再次尝试插入;死循环会保证节点插入成功,除非当前线程异常;

addWaiter

private Node addWaiter(Node mode) {
    // 封装线程信息
    Node node = new Node(Thread.currentThread(), mode);
    // 尝试快速入队
    Node pred = tail;
    // 判断尾节点是否有效,尾节点有效才可以尝试快速入队
    if (pred != null) {
        // 将当前节点的prev指向当前尾节点
        node.prev = pred;
        // CAS更新尾节点为当前节点
        if (compareAndSetTail(pred, node)) {
            // CAS更新成功,则设置旧尾节点的next为当前节点,即新尾节点
            pred.next = node;
            // 返回旧尾节点
            return node;
        }
    }
    // 快速入队失败,则通过死循环保证入队
    enq(node);
    return node;
}

快速入队的操作和enq的队列不为空情况时的操作一样,尽量入队前获取锁,避免过多的队列操作;

shouldParkAfterFailedAcquire

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 获取prev节点的等待状态
    int ws = pred.waitStatus;
    // 如果等待状态是SIGNAL,则说明prev节点会负责唤醒本节点,那么本节点可以挂起
    if (ws == Node.SIGNAL)
        // 返回true,表示可以安全挂起
        return true;
    // 如果等待状态是CANCELLED,则说明prev节点线程已经异常,无法负责唤醒本节点,且需要从队列踢出
    if (ws > 0) {
        do {
            // 找到prev节点的前一个节点并作为本节点的prev节点,这样就可以踢出原prev节点
            node.prev = pred = pred.prev;
            // 继续判断新prev节点的等待状态是否是CANCELLED,如果是,则继续找到前一个节点并踢出新prev节点
        } while (pred.waitStatus > 0);
        // 新prev节点的等待状态不是CANCELLED
        // 设置新prev节点的next为本节点,完成队列的更新
        pred.next = node;
    } else {
        // 进入该分支,则说明prev节点的等待状态是0或者PROPAGATE

        // CAS设置prev节点的等待状态为SIGNAL,即表示会负责唤醒next节点
        // 不考虑CAS是否成功,因为后续还会进入该方法
        // 如果CAS设置成功(且再次进入该方法前没有线程——prev自身线程或者其它线程——修改prev节点的等待状态),那么下次就会进入SIGNAL分支
        // 如果CAS设置失败(且再次进入该方法前没有线程——prev自身线程或者其它线程——修改prev节点的等待状态),那么下次还会再次CAS设置,直到设置成功
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    // 返回false,表示还不可以安全挂起
    return false;
}

根据prev节点的等待状态可以分为三种情况:

  1. 正好就是SIGNAL,表示负责唤醒本节点线程,那么直接返回true,表示本节点可以安全挂起;
  2. CANCELLED,表示prev节点线程已经异常,那么需要将更新本节点的prev节点为新有效节点,且将旧prev节点从队列中踢出;
  3. 0或者PROPAGATE,表示需要有SIGNAL操作,那么尝试更新prev节点为SIGNAL状态,但不关心更新结果,因为acquireQueued会死循环调用shouldParkAfterFailedAcquire,如果更新失败,则说明有其它线程同时更新prev节点,竞争的影响会在下次进入shouldParkAfterFailedAcquire时可见;

unparkSuccessor

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    // 如果本节点的等待状态值小于0,大概率是SIGNAL
    if (ws < 0)
        // CAS设置等待状态为0,完成唤醒任务后需要清除唤醒标识
        // 但不关心成功与否,因为本节点的等待状态在完成唤醒后继节点后已经不重要了
        compareAndSetWaitStatus(node, ws, 0);

    // 寻找后继节点,一般都是本节点的next节点
    Node s = node.next;
    // 如果本节点的next节点不存在,或者等待状态为CANCELLED(自身异常退出锁竞争)
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 从后向前遍历队列,寻找最靠近本节点的有效节点,即为后继节点
        // 因为插入节点采用尾插法,踢出无效节点也是从后向前遍历,因此从后向前遍历队列一直是可靠的,但从前向后遍历无法保证可靠
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                // 等待状态不为CANCELLED,认为有效
                s = t;
    }
    if (s != null)
        // 存在有效后继节点,则缓存该节点线程
        LockSupport.unpark(s.thread);
}

本方法就是负责唤醒后继节点线程的实现,其中核心的逻辑就是从后向前遍历队列寻找当前节点的有效后继节点,为什么是从后向前遍历队列呢?

场景分析

主要和插入节点、踢出节点、取消节点实现有关,接下来就分析下这两动作的实现;
插入节点
插入节点动作发生在新线程尝试获取锁失败的情况,这时线程会被包装成新节点插入到队列中,插入方式为尾插法

// 1. 将当前节点的prev指向当前尾节点
node.prev = t;
// 2. CAS更新尾节点为当前节点
if (compareAndSetTail(t, node)) {
    // 3. CAS更新成功,则设置旧尾节点的next为当前节点,即新尾节点
    t.next = node;
    // 返回旧尾节点
    return t;
}

由第一步和第二步可知,当新节点成功变成尾节点后,新尾节点的prev一定会指向旧尾节点,但旧尾节点的next还未指向新尾节点,这个动作是第三步完成的,那么在成功执行完第二步和还未执行第三步之间的这段空档期,整个队列只支持从后向前遍历,如果这时候旧尾节点恰好释放锁并且需要负责唤醒后继节点,但因为其next为null,如果从前向后遍历,则无法找到新尾节点,自然无法执行唤醒操作;
题外话:其实这也没啥影响,因为新尾节点在加入队列后会进入acquireQueued方法,还会再次尝试获取锁(这里不得不佩服下),这时候因为没有线程持有锁,新尾节点自然获取锁成功,成功后不会挂起,自然不需要被唤醒;

踢出节点
当节点线程因为自身异常原因,退出锁竞争,那么对应节点的等待状态会设置为CANCELLED,即该节点无效,可以被踢出;踢出动作会发生下如下时机:

  1. shouldParkAfterFailedAcquire,线程在挂起之前,会踢出队列中本节点前无效节点;
  2. cancelAcquire,线程在等待获取锁时异常,会主动取消获取锁,此时会踢出队列中本节点前无效节点(也可能包括自身对应节点);

PS:踢出的节点都是本节点和本节点前第一个有效节点之间的无效节点(可能包含本节点)

踢出节点逻辑如下:

do {
    // 1. 从后向前遍历队列
    node.prev = pred = pred.prev;
    // 2. 判断前继节点是否有效
} while (pred.waitStatus > 0);
// 3. 更新前继节点next指向本节点
pred.next = node;

shouldParkAfterFailedAcquire一般是新节点——即尾节点——执行的,也就是先从后向前遍历整个队列,直到找到有效的前继节点,再将前继节点的next更新为本节点,完成踢出前继节点和本节点之间的无效节点;但是再第三步执行之前,前继节点节点和本节点之间的无效节点可以通过从前向后遍历得到;
题外话:其实从前向后遍历也不影响功能,只是会重复过滤无效节点,从后向前遍历,则unparkSuccessor可以使用已有的过滤结果;

取消节点
当节点线程因为自身异常原因,退出锁竞争时,会finally执行cancelAcquire,该方法的最后会执行node.next = node,如果当从前向后遍历队列时,node还未被踢出队列,那么就会发生死循环;

综上所述,使用从后向前遍历队列的理由有两个:

  1. 保证队列中所有节点可见,包括正在插入但还未插入完全的节点;
  2. 使用已有的过滤结果,无需再次重复过滤无效节点;
  3. 防止可能的死循环;

cancelAcquire

private void cancelAcquire(Node node) {
    // 如果节点不存在则直接忽略
    if (node == null)
        return;

    // 清空线程信息
    node.thread = null;
    
    // 跳过无效的前继节点,即踢出无效节点
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        // 从后向前遍历队列,直到找到有效的前继节点
        node.prev = pred = pred.prev;

    // predNext明显是要取消摘除的节点(要么是当前节点,要么是当前节点前的无效节点)
    Node predNext = pred.next;

    // 设置本节点的等待状态为CANCELLED,这样其它节点就可以踢出本节点
    node.waitStatus = Node.CANCELLED;

    // 如果本节点就是尾节点,则表示前继节点pred后全是无效节点(包括本节点),需要全部踢出
    // CAS设置前继节点为新尾节点
    // CAS设置成功,则需要清空新尾节点的next以完成摘链
    // CAS设置失败,则说明有新节点插入队列,新节点在shouldParkAfterFailedAcquire中会踢出无效节点
    if (node == tail && compareAndSetTail(node, pred)) {
        // CAS设置成功,则前继节点变成新的尾节点,尝试更新新尾节点的next为空,进行摘链
        // 如果更新失败,则说明有新节点插入队列,新节点会该尾节点的next指向自身,完成摘链;
        // 否则更新成功,完成摘链
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        // 如果前继节点pred是头节点,头节点可能此时恰好释放锁,错过唤醒后继节点时机,那么就需要负责唤醒后继节点
        // 如果前继节点不存在线程信息,那么pred要么成为新头节点,要么以为自身异常进入cancelAcquire
        //     如果成为新头节点,头节点可能此时恰好释放锁,错过唤醒后继节点时机,那么就需要负责唤醒后继节点
        //     如果因为自身异常进入cancelAcquire,这种情况下会处理好唤醒的可传递,但尝试唤醒会更好
        // 如果前继节点pred的等待状态是SIGNAL或者等待状态值小于0且CAS成功更新为SIGNAL,那么就不需要负责唤醒后继节点
        // 否则就需要负责唤醒后继节点
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            // 满足不需要唤醒后继节点的条件

            // 尝试踢出本节点
            Node next = node.next;            
            if (next != null && next.waitStatus <= 0)
                // 如果本节点存在有效后继节点,则CAS设置前继节点的next为后继节点,即踢出本节点
                // CAS设置成功,则完成踢出本节点
                // CAS设置失败,则表明其它线程踢出predNext节点,自然也踢出同为无效节点的本节点
                compareAndSetNext(pred, predNext, next);
        } else {
            // 唤醒后继节点
            unparkSuccessor(node);
        }
        // 如果node节点还存在队列中,那么此时从后向前遍历会出现死循环
        // 本节点不是尾节点,且next节点也是无效节点时,此时不会踢出本节点
        node.next = node;
    }
}

本方法就是线程异常后主动取消节点时调用的,完成踢出本节点前的无效节点和是否唤醒后继节点;
在判断是否唤醒后继节点时,判断要求较多,必须满足以下全部条件:

  1. 前继节点pred不是头节点;
  2. 前继节点pred的等待状态是SIGNAL或者等待状态值小于0且CAS成功更新为SIGNAL;
  3. 前继节点pred还未成为新头节点或者产生异常;

如果是头节点,头节点可能此时恰好释放锁,取消的节点还未来得及设置头节点的等待状态为SIGNAL,头节点就完成release动作,错过了唤醒后继节点,那么确保万无一失,就需要负责唤醒后继节点;
如果等待状态是SIGNAL,那么这个节点会负责唤醒后继节点,就不需要主动去唤醒后继节点了;
如果等待状态不是SIGNAL,但可以更新为SIGNAL,那么这个节点也会负责唤醒后继节点,就不需要主动去唤醒后继节点了;
否则,这个节点不会负责唤醒后继节点,需要主动去唤醒后继节点;

acquireQueued

final boolean acquireQueued(final Node node, int arg) {
    // 设置失败标识,只有获取到锁后正常退出才认为没有失败
    boolean failed = true;
    try {
        // 中断标识默认为false,当发生中断时设置
        boolean interrupted = false;
        // 死循环保证获取锁或者挂起成功
        for (;;) {
            // 获取本节点的前继节点
            final Node p = node.predecessor();
            // 1. 如果前继节点是头节点才允许尝试获取锁
            if (p == head && tryAcquire(arg)) {
                // 获取锁成功

                // 4. 设置自身为头节点
                setHead(node);
                // 清空旧头节点信息,以帮助GC
                p.next = null;
                // 清空失败标识
                failed = false;
                // 返回中断标识
                return interrupted;
            }
            // 暂时无法获取锁,则需要挂起,避免资源浪费

            // 2. 判断是否可以挂起
            // 3. 可以的话就挂起当前线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 当被唤醒之前发生过中断,则设置中断标识
                // parkAndCheckInterrupt会调用Thread.interrupted()判断是否发生过中断,该动作会清除线程的中断状态
                interrupted = true;
        }
    } finally {
        if (failed)
            // 因为异常,无法继续获取锁,需要取消节点
            cancelAcquire(node);
    }
}

在本方法中,有一个关键的地方就是线程在挂起之前会再次尝试获取锁,避免线程挂起后立马再被唤醒,带来过多的线程上下文切换;但是有一个先决条件,即本节点的前继节点必须是头节点,这是为什么呢?
通过setHead(node);可知,队列的头节点几乎都是当前持有锁的线程的对应节点(队列刚创建时头节点只是个空节点,和任何线程无关),该线程很有可能释放锁,然后唤醒其后继节点;如果本节点的前继节点是头节点,也就是说头节点的后继节点一定是本节点(本节点一定有效),释放锁时会唤醒本节点;那么此时尝试去获取锁,获取失败没啥影响,获取成功却可以避免挂起再唤醒的消耗,还是很有意义的;
如果本节点的前继节点不是头节点,那么头节点的后继节点不一定是本节点(本节点一定有效,但头节点和本节点之间的节点不一定都无效),此时获取锁成功会导致本节点成为新头节点,旧头节点和本节点之间的节点就会丢失,这无法接受;

线程从acquireQueued方法正常退出的步骤分以下情况:

  1. 执行1 -> 4;即第一次循环就获取到锁;
  2. 执行1 -> 2 -> 1 -> 4;即第一次循环未获取到共享锁,确保前继节点唤醒自身后,第二次循环获取到锁;
  3. 执行(1 -> 2)+ -> 3 -> 1 -> 4;即经过挂起和唤醒后获取到锁;
  4. 执行((1 -> 2)+ -> 3)+ -> 1 -> 4;即经过两次挂起(特殊情况,详见场景分析)和唤醒后获取到锁;

(1 -> 2)+表示第一、二步可能执行多次,原因见shouldParkAfterFailedAcquire分析;
((1 -> 2)+ -> 3)+表示第三步可能执行一次或者两次,原因见接下来的场景分析

场景分析

下面看一个场景,假设队列中仅存在头节点(线程还持有锁),此时有个新节点刚加入队列,随后进入acquireQueued方法,请分析新节点正常退出本方法,会存在哪些可能:
因为新节点的前继节点就是头节点(等待状态为0),所以可以尝试获取锁

  1. 1 -> 4;新节点在执行第一步尝试获取锁时,头节点线程正好释放锁,此时新节点没有挂起就直接获取锁成功,成为新头节点后退出方法;而旧头节点释放锁后,因为等待状态为0,不会执行唤醒后继节点的动作;
  2. 1 -> 2 -> 1 -> 2 -> 3 -> 1 -> 4;新节点第一次执行第一步尝试获取锁失败后,会执行第二步,将前继节点即头节点的等待状态设置为SIGNAL,然后第二次执行第一步尝试获取锁失败后,会进行挂起操作,但在挂起之前,前继节点释放了锁并完成唤醒动作,则新节点线程执行挂起操作时会立即返回,进行下一轮循环,尝试获取锁,成功后,成为新头节点后退出方法;
  3. 1 -> 2 -> 1 -> 2 -> 3 -> 1 -> 4;新节点第一次执行第一步尝试获取锁失败后,会执行第二步,将前继节点即头节点的等待状态设置为SIGNAL,然后第二次执行第一步尝试获取锁失败后,会进行挂起操作,挂起之后,前继节点释放了锁并唤醒新节点线程,新节点线程进行下一轮循环,尝试获取锁,成功后,成为新头节点退出方法;和上一种的区别为:新节点线程是先被挂起再被唤醒;
  4. 1 -> 2 -> 1 -> 4;新节点第一次执行第一步尝试获取锁失败后,会执行第二步,将前继节点即头节点的等待状态设置为SIGNAL,然后再执行第一步尝试获取锁,此时头节点线程正好释放锁但还未执行唤醒后继节点动作,新节点没有挂起就直接获取锁成功,成为新头节点然后退出方法;而旧头节点释放锁后,因为等待状态为SIGNAL,会执行唤醒后继节点,即新节点的动作,那么新节点线程会拥有一次可以抵消LockSupport.park特权,详见LockSupport.unpark代码:
/**
 * 使给定线程的许可证可用(如果它尚不可用)。
 * 如果线程在park上被阻塞,那么它将解除阻塞。
 * 否则,它对park的下一次调用保证不会阻塞。
 * 如果给定线程尚未启动,则不保证此操作有任何效果。
 */
public static void unpark(Thread thread) {
    if (thread != null)
        UNSAFE.unpark(thread);
}

这会带来什么影响呢?

  1. 如果线程后续不再使用LockSupport.park,那么就没啥影响;
  2. 如果线程后续还会使用LockSupport.park
    1. 线程再次竞争锁,并且进入parkAndCheckInterrupt,需要挂起,特权会抵消这次挂起,让线程再次执行一次循环,然后再挂起;只是多了一次循环,没啥影响;1 -> 2 -> 1 -> 2 -> 3 -> 1 -> 2 -> 3 -> 1 -> 4
    2. 线程在用户代码中执行了LockSupport.park(可能性较低),那就会出现线程没挂住的意外情况;

doReleaseShared

private void doReleaseShared() {
    // 死循环保证方法正确退出
    for (;;) {
        Node h = head;
        // 如果头节点存在,且不为尾节点,表明队列中存在中间节点
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 头节点的等待状态为SIGNAL,表示负责唤醒后继节点
            if (ws == Node.SIGNAL) {
                // CAS设置成功才能唤醒后继节点,防止重复唤醒
                // CAS设置等待状态为0
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    // 因为releaseShared和setHeadAndPropagate可能同时调用doReleaseShared,存在竞争,其中一个执行成功,另一个则会执行失败
                    // 继续下一个循环,再次检查新头节点的等待状态
                    continue;
                // CAS设置成功,则唤醒头节点的后继节点
                unparkSuccessor(h);
            }
            // 如果等待状态为0,则CAS设置等待状态为PROPAGATE,将唤醒动作传播下去,虽然没有执行唤醒动作
            else if (ws == 0 &&
                    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                // 因为releaseShared和setHeadAndPropagate可能同时调用doReleaseShared,存在竞争,其中一个执行成功,另一个则会执行失败
                // 或者存在新节点未拿到锁在挂起前修改了头节点的等待状态为SIGNAL,让头节点负责唤醒自身
                // 继续下一个循环,再次检查新头节点的等待状态
                continue;
        }
        if (h == head)
            // 头节点没有变化则可以退出

            // 此时,头节点的状态要么是PROPAGATE,表示唤醒动作会被传播下去
            // 要么是0,表示已执行唤醒动作
            // 不会是CANCELLED,因为是在释放锁,而不是在等待获取锁
            break;
    }
}

方法正常退出的情况如下:

  1. 队列为空,或者只存在head/tail节点,没有节点需要唤醒;
  2. 头节点的状态为PROPAGATE;
  3. 头节点的状态由原先的SIGNAL变为0,并完成唤醒后继节点动作;
  4. 头节点的状态由原来的0变为PROPAGATE;

setHeadAndPropagate

private void setHeadAndPropagate(Node node, int propagate) {
    // 记录旧头节点
    Node h = head;
    // 设置新头节点
    setHead(node);

    // 如果还存在可获取的资源,则可以唤醒后继节点去获取共享锁
    // 如果旧头节点的等待状态值小于0(SIGNAL or PROPAGATE(PROPAGATE可以转换为SIGNAL)),则需要唤醒后继节点
    // 如果新头节点的等待状态值小于0(SIGNAL or PROPAGATE(PROPAGATE可以转换为SIGNAL)),则需要唤醒后继节点
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // 如果next节点不存在,则无法确定是否真的不存在共享节点,因为有可能此时存在新节点(应当只需要考虑共享节点,但因为无法得知节点模式,所以都考虑)刚插入队列,还未来得及更新前继节点的next值,这种情况下需要传播唤醒动作,即使唤醒动作可能是多余的
        // 如果next节点是共享节点,则传播唤醒动作
        // 如果next节点是独占节点,则没必要传播,因为这是共享节点获取到共享锁时的传播动作
        // ReentrantReadWriteLock的实现会同时存在独占节点和共享节点
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

考虑到共享锁的并发获取和释放场景,本方法放宽了传播唤醒动作的条件,当同时满足以下两个条件时就需要传播:

  1. 存在可获取的资源,或者新或旧头节点不存在,或者新或旧节点的等待状态值小于0(SIGNAL or PROPAGATE(PROPAGATE可以转换为SIGNAL));
  2. 新头节点的next节点不存在或者是共享节点;

场景分析

假设共享锁的等待队列状态如下:
head(null, -1) <-> node(t3, 0)/tail
此时线程t1和t2持有共享锁,t1和t2先后释放共享锁,分析t3和t4获取共享锁的过程;

  1. t1释放共享锁,进入releaseShared,此时head等待状态为-1, 随后修改了head的等待状态为0,并调用unparkSuccessor,此时队列状态如下:
    head(null, 0) <-> node(t3, 0)/tail
  2. t3被t1唤醒后,执行tryAcquireShared获取到了t1释放的共享锁,返回值为0,但还未设置自身为head节点,此时队列状态如下:
    head(null, 0) <-> node(t3, 0)/tail
  3. t4获取锁失败后加入队列,还未设置前继节点的等待状态为-1,此时队列状态如下:
    head(null, 0) <-> node(t3, 0) <-> node(t4, 0)/tail
  4. t2释放共享锁,进入releaseShared,此时head等待状态为0,随后修改了head的等待状态为-3,此时队列状态如下:
    head(null, -3) <-> node(t3, 0) <-> node(t4, 0)/tail
  5. t4设置前继节点的等待状态为-1并挂起,此时队列状态如下:
    head(null, -3) <-> node(t3, -1) <-> node(t4, 0)/tail
  6. t3调用setHeadAndPropagate设置自身为head节点,此时队列状态如下:
    head(null, -1) <-> node(t4, 0)/tail
  7. 满足传播唤醒动作条件,t3执行唤醒动作,t4被唤醒,此时队列状态如下:
    head(null, 0) <-> node(t4, 0)/tail
  8. t4被唤醒,获取到t2释放的共享锁,调用setHeadAndPropagate设置自身为head节点,此时队列状态如下:
    head(null, 0)/tail
  9. 满足唤醒传播条件,但由于队列只存在head/tail节点,所以无操作,队列状态如下:
    head(null, 0)/tail
  10. 后续t3/t4释放共享锁,但由于队列只存在head/tail节点,所以无操作,队列状态如下:
    head(null, 0)/tail

doAcquireShared

private void doAcquireShared(int arg) {
    // 将当前线程封装成共享式节点插入队列
    final Node node = addWaiter(Node.SHARED);
    // 设置失败标识,只有获取到共享锁后正常退出才认为没有失败
    boolean failed = true;
    try {
        // 中断标识默认为false,当发生中断时设置
        boolean interrupted = false;
        // 死循环保证获取共享锁或者挂起成功
        for (;;) {
            // 获取本节点的前继节点
            final Node p = node.predecessor();
            // 1. 如果前继节点是头节点才允许尝试获取共享锁
            if (p == head) {
                // 2. 尝试获取共享锁
                int r = tryAcquireShared(arg);
                // 判断是否获取共享锁,返回值不小于0则表示获取成功,返回值表示还剩下的资源
                if (r >= 0) {
                    // 5. 设置自身为头节点并传播唤醒动作
                    setHeadAndPropagate(node, r);
                    // 清空旧头节点信息,以帮助GC
                    p.next = null;
                    if (interrupted)
                        // 发生过中断,则恢复线程中断状态
                        selfInterrupt();
                    // 清空失败标识
                    failed = false;
                    return;
                }
            }
            // 3. 判断是否可以挂起
            // 4. 可以的话就挂起当前线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 当被唤醒之前发生过中断,则设置中断标识
                // parkAndCheckInterrupt会调用Thread.interrupted()判断是否发生过中断,该动作会清除线程的中断状态
                interrupted = true;
        }
    } finally {
        if (failed)
            // 因为异常,无法继续获取锁,需要取消节点
            cancelAcquire(node);
    }
}

在本方法中,有一个关键的地方就是线程在挂起之前会再次尝试获取锁,避免线程挂起后立马再被唤醒,带来过多的线程上下文切换;但是有一个先决条件,即本节点的前继节点必须是头节点,这是为什么呢?
通过setHeadAndPropagate(node, r);可知,队列的头节点几乎都是当前持有锁的线程的对应节点(队列刚创建时头节点只是个空节点,和任何线程无关),该线程很有可能释放锁,然后唤醒其后继节点;如果本节点的前继节点是头节点,也就是说头节点的后继节点一定是本节点(本节点一定有效),释放锁时会唤醒本节点;那么此时尝试去获取锁,获取失败没啥影响,获取成功却可以避免挂起再唤醒的消耗,还是很有意义的;
如果本节点的前继节点不是头节点,那么头节点的后继节点不一定是本节点(本节点一定有效,但头节点和本节点之间的节点不一定都无效),此时获取锁成功会导致本节点成为新头节点,旧头节点和本节点之间的节点就会丢失,这无法接受;

对应节点的前继节点是头节点的线程从doAcquireShared方法正常退出的步骤分以下情况:

  1. 执行1 -> 2 -> 5;即第一次循环就获取到共享锁;
  2. 执行1 -> 2 -> 3 -> 1 -> 2 -> 5;即第一次循环未获取到共享锁,确保前继节点唤醒自身后,第二次循环获取到共享锁;
  3. 执行(1 -> 2 -> 3)+ -> 4 -> 1 -> 2 -> 5;即经过挂起和唤醒后获取到共享锁;
  4. 执行((1 -> 2 -> 3)+ -> 4)+ -> 1 -> 2 -> 5;即经过两次挂起(特殊情况,详见场景分析)和唤醒后获取到共享锁;

(1 -> 2 -> 3)+表示第一、二、三步可能执行多次,原因见shouldParkAfterFailedAcquire分析;
((1 -> 2 -> 3)+ -> 4)+表示第四步可能执行一次或者两次,原因见接下来的场景分析

对应节点的前继节点不是头节点的线程从doAcquireShared方法正常挂起的步骤分以下情况:

  1. 执行(1 -> 3)+ -> 4;确保前继节点负责唤醒自身后,在下一循环安全挂起;
  2. 执行((1 -> 3)+ -> 4)+;即经过两次挂起才安全挂起(特殊情况,详见场景分析);

场景分析

首先看下新节点可以尝试获取共享锁的场景;
假设队列中仅存在头节点(线程还持有共享锁),此时有个新节点刚加入队列,随后进入acquireQueued方法,请分析新节点正常退出本方法,会存在哪些可能:
因为新节点的前继节点就是头节点(等待状态为0),所以可以尝试获取锁

  1. 1 -> 2 -> 5;新节点在执行第二步尝试获取锁时,头节点线程正好释放锁,此时新节点没有挂起就直接获取锁成功,成为新头节点后退出方法;而旧头节点释放锁后,因为等待状态为0,不会执行唤醒后继节点的动作;
  2. 1 -> 2 -> 3 -> 1 -> 2 -> 3 -> 4 -> 1 -> 2 -> 5;新节点第一次执行第二步尝试获取锁失败后,会执行第三步,将前继节点即头节点的等待状态设置为SIGNAL,然后第二次执行第二步尝试获取锁失败后,会进行挂起操作,但在挂起之前,前继节点释放了锁并完成唤醒动作,则新节点线程执行挂起操作时会立即返回,进行下一轮循环,尝试获取锁,成功后,成为新头节点后退出方法;
  3. 1 -> 2 -> 3 -> 1 -> 2 -> 3 -> 4 -> 1 -> 2 -> 5;新节点第一次执行第二步尝试获取锁失败后,会执行第三步,将前继节点即头节点的等待状态设置为SIGNAL,然后第二次执行第二步尝试获取锁失败后,会进行挂起操作,挂起之后,前继节点释放了锁并唤醒新节点线程,新节点线程进行下一轮循环,尝试获取锁,成功后,成为新头节点退出方法;和上一种的区别为:新节点线程是先被挂起再被唤醒;
  4. 1 -> 2 -> 3 -> 1 -> 2 -> 5;新节点第一次执行第二步尝试获取锁失败后,会执行第三步,将前继节点即头节点的等待状态设置为SIGNAL,然后再执行第二步尝试获取锁,此时头节点线程正好释放锁但还未执行唤醒后继节点动作,新节点没有挂起就直接获取锁成功,成为新头节点然后退出方法;而旧头节点释放锁后,因为等待状态为SIGNAL,会执行唤醒后继节点,即新节点的动作,那么新节点线程会拥有一次可以抵消LockSupport.park特权,详见LockSupport.unpark代码:
/**
 * 使给定线程的许可证可用(如果它尚不可用)。
 * 如果线程在park上被阻塞,那么它将解除阻塞。
 * 否则,它对park的下一次调用保证不会阻塞。
 * 如果给定线程尚未启动,则不保证此操作有任何效果。
 */
public static void unpark(Thread thread) {
    if (thread != null)
        UNSAFE.unpark(thread);
}

这会带来什么影响呢?

  1. 如果线程后续不再使用LockSupport.park,那么就没啥影响;
  2. 如果线程后续还会使用LockSupport.park
    1. 线程再次竞争锁,并且进入parkAndCheckInterrupt,需要挂起,特权会抵消这次挂起,让线程再次执行一次循环,然后再挂起;只是多了一次循环,没啥影响;1 -> 2 -> 3 -> 1 -> 2 -> 3 -> 4 -> 1 -> 2 -> 3 -> 4 -> 1 -> 2 -> 5
    2. 线程在用户代码中执行了LockSupport.park(可能性较低),那就会出现线程没挂住的意外情况;

再看下新节点不可以尝试获取共享锁的场景;
假设队列中存在多个节点,此时有个新节点刚加入队列,随后进入acquireQueued方法,请分析新节点正常退出本方法,会存在哪些可能:
因为新节点的前继节点不是头节点,所以不可以尝试获取锁

  1. 1 -> 3 -> 1 -> 3 -> 4;新节点执行第三步,将前继节点即头节点的等待状态设置为SIGNAL,然后在下一循环会进行挂起操作;
  2. 1 -> 3 -> 1 -> 3 -> 4 -> 1 -> 3 -> 4;带有特权的新节点执行第三步,将前继节点即头节点的等待状态设置为SIGNAL,然后在下一循环会进行第一次挂起操作;但此次挂起会因特权而直接返回,从而在下一循环中执行第二次挂起操作;

关键流程

获取独占锁——acquire

源码

/**
 * 尝试获取独占锁
 * 算法细节,由子类实现
 */
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
    
/**
 * 获取独占锁,延迟处理中断
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 发生过中断,则恢复线程中断状态
        selfInterrupt();
}

/**
 * 获取独占锁,支持响应中断
 */
public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

/**
 * 获取独占锁,支持响应中断和超时限制
 */
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

主要逻辑

acquire为例,分析获取独占锁的主要流程;

  1. tryAcquire尝试获取独占锁,获取成功,直接返回;尽量入队前拿到锁,避免过多队列操作;
  2. 尝试获取独占锁失败,则需要在等待队列中获取锁,执行addWaiter将当前线程包装成独占式节点,加入等待队列;
    1. 封装线程信息为新节点;
    2. 尝试快速入队:当尾节点不为null时(即队列存在节点),先设置新节点的prev是尾(确保任意时刻从后向前遍历是可靠的),然后CAS更新尾节点为本节点,成功就设置原尾节点的next为本节点并返回本节点,失败则说明存在其它线程竞争设置尾节点;
    3. 快速入队失败,则通过enq加入队列,enq中会死循环保证CAS入队成功,除非线程自身异常;
      1. 死循环保证节点插入成功;
      2. 如果队列为空(tail == null),为空则创建头节点(dummy header,没有真实存放线程信息,且真正发生线程竞争排队情况下才会初始化,避免过多的队列操作造成性能浪费),并设置tail = head,然后进行下一循环;
      3. 如果队列不为空,则尝试CAS尝试将本节点加入队尾,成功则退出,失败则进行下一循环;
  3. 加入队列后,执行acquireQueued,在队列中获取锁;
    1. 设置失败标识为true,中断标识为false;
    2. 死循环保证节点获取到锁后退出,除非线程自身异常;
    3. 获取当前节点的prev,如果prev是头节点,就尝试获取锁(尽量挂起前拿到锁,避免过多线程上下文切换),若成功,则
      1. 设置当前节点为头节点;
      2. 清除失败标识;
      3. 返回中断标识;
    4. 如果prev不是头节点,或者尝试获取锁失败,则执行shouldParkAfterFailedAcquire,确保前继节点负责唤醒自身;
      1. 如果前继节点的等待状态正好是SIGNAL,则说明前继节点会唤醒自己,本节点可以安全挂起,返回true;
      2. 如果前继节点的等待状态为CANCELLED,则一直向前遍历新前继节点,直到新前继节点的等待状态值不大于0,然后设置新前继节点的next为本节点(此过程会踢出队列中的无效节点);
      3. 否则,尝试CAS更新前继节点的等待状态为SIGNAL(想让前继节点唤醒自己,这样就可以安全挂起);
        1. 如果CAS更新成功,则下一次进入本方法就可以在第一步直接返回true;
        2. 如果CAS更新失败(前继节点的线程异常退出了,执行cancelQueue会修改状态),则下一次进入本方法会使用新前继节点;
      4. 除了从第1步的退出点,其它退出点都需要返回false;
    5. 如果当前节点可以安全挂起,则执行parkAndCheckInterrupt
      1. 挂起节点线程;
      2. 节点线程被唤醒后,调用Thread.interrupted()返回是否发生中断;
    6. 如果节点线程被唤醒前发生中断,则设置中断标识;
    7. acquireQueued方法正常或异常退出都会经过finally代码块处理,如果失败标识有效,则表示因线程异常而退出方法,需要调用cancelAcquire取消节点;
      1. 向前遍历本节点的有效前继节点(此过程会完成踢出队列中的本节点前的无效节点的一半工作);
      2. 获取新前继节点的next节点(不一定是本节点,中间可能存在无效节点);
      3. 设置本节点的等待状态是CANCELLED,这样其它节点就可以踢出本节点;
      4. 如果本节点是尾节点,则表示新前继节点后全是无效节点(包括本节点),需要全部踢出,所以尝试CAS设置新前继节点为尾节点来完成摘链;
        1. 设置成功,则需要清空新尾节点的next以完成摘链,尝试CAS设置新尾节点的next为null;
          1. 设置成功,则完成摘链
          2. 设置失败,则说明有新节点插入队列,新节点会将尾节点的next指向自身,这也帮助完成摘链;
        2. 设置失败,因为新的线程节点添加到队尾,那么本节点以及其它无效节点会被新节点在shouldParkAfterFailedAcquire中踢出,此时不需要进一步处理;
      5. 如果前继节点pred是头节点,头节点可能此时恰好释放锁,错过唤醒后继节点时机,那么就需要负责唤醒后继节点;
      6. 或者前继节点pred成为新头节点,头节点可能此时恰好释放锁,错过唤醒后继节点时机,那么就需要负责唤醒后继节点;
      7. 如果前继节点pred的等待状态是SIGNAL或者等待状态值小于0且CAS成功更新为SIGNAL,那么就不需要负责唤醒后继节点;
        1. 此时尝试CAS设置前继节点pred的next为当前节点的next;
          1. 设置成功,完成摘链;
          2. 设置失败,则说明有其它节点踢出了无效节点,帮助完成摘链;
      8. 否则就需要负责唤醒后继节点;
        1. 如果本节点等待状态值小于0(大概率是SIGNAL),则CAS设置等待状态为0,完成唤醒任务后需要清除唤醒标识,但不关心成功与否,因为本节点的等待状态在完成唤醒后继节点后已经不重要了;
        2. 如果本节点的next节点不存在,或者等待状态为CANCELLED(自身异常退出锁竞争),则
          1. 从后向前遍历队列,寻找最靠近本节点的有效节点,即为后继节点;
        3. 存在有效后继节点,则唤醒该节点线程;
      9. 设置本节点的next为自身,方便isOnSyncQueue方法判断一个原先属于条件队列的节点是否转移到了同步队列;
  4. 线程获取锁成功后,会根据返回的中断标识进行重新设置线程中断状态(中断延迟处理);

释放独占锁—— release

源码

/**
 * 尝试释放独占锁
 * 算法细节,由子类实现
 */
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

/**
 * 释放独占锁
 */
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

主要逻辑

  1. tryRelease尝试释放锁,失败直接返回false;
  2. 释放锁成功后,则考虑唤醒后继节点,当同时满足以下条件才会执行唤醒动作;
    1. 头节点存在,头节点不存在,则意味着未发生锁竞争,没有线程在等待需要被唤醒;
    2. 头节点的等待状态不为0,需要负责唤醒后继节点;
  3. 不满足唤醒条件,则直接返回true;否则执行唤醒动作unparkSuccessor
    1. 如果本节点等待状态值小于0(大概率是SIGNAL),则CAS设置等待状态为0,完成唤醒任务后需要清除唤醒标识,但不关心成功与否,因为本节点的等待状态在完成唤醒后继节点后已经不重要了;
    2. 如果本节点的next节点不存在,或者等待状态为CANCELLED(自身异常退出锁竞争),则
      1. 从后向前遍历队列,寻找最靠近本节点的有效节点,即为后继节点;
    3. 存在有效后继节点,则唤醒该节点线程;
  4. 返回true;

获取共享锁——acuqireShared

源码

/**
 * 尝试获取共享锁
 * 算法细节,由子类实现
 */
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

/**
 * 获取共享锁,延迟处理中断
 */
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

/**
 * 获取共享锁,支持响应中断
 */
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

/**
 * 获取共享锁,支持响应中断和超时限制
 */
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||
        doAcquireSharedNanos(arg, nanosTimeout);
}

主要逻辑

acquireShared为例,分析获取共享锁的主要流程;

  1. tryAcquireShared尝试获取共享锁,获取成功,直接返回;尽量入队前拿到锁,避免过多队列操作;tryAcquireShared的实现必须满足以下要求:
    1. 必须检查当前上下文是否支持获取共享锁,支持再进行获取
    2. 返回值小于0,表示获取共享锁失败,需要进入同步队列;
    3. 返回值等于0,表示获取共享锁成功,后续线程无法继续获取共享锁;
    4. 返回值大于0,表示获取共享锁成功,后继节点也可能获取共享锁成功;
  2. 尝试获取共享锁失败,则调用doAcquireShared进行获取共享锁;
    1. 执行addWaiter,将当前线程封装成共享式节点加入队列;
    2. 设置失败标识为true,中断标识为false;
    3. 死循环保证节点获取到锁后退出,除非线程自身异常;
    4. 获取当前节点的prev,如果prev是头节点且尝试获取共享锁成功(尽量挂起前拿到锁,避免过多线程上下文切换),则:
      1. 调用setHeadAndPropagate,设置本节点为头节点,并传播唤醒动作;
        1. 记录当前头节点;
        2. 设置本节点为新头节点;
        3. 如果还存在可获取的资源,则需要唤醒后继节点去获取共享锁;
        4. 如果旧头节点的等待状态值小于0(SIGNAL or PROPAGATE(PROPAGATE可以转换为SIGNAL)),则需要唤醒后继节点;
        5. 如果新头节点的等待状态值小于0(SIGNAL or PROPAGATE(PROPAGATE可以转换为SIGNAL)),则需要唤醒后继节点;
          1. 如果后继节点为不存在,则无法确定是否真的不存在共享节点,因为有可能此时存在新节点(应当只需要考虑共享节点,但因为无法得知节点模式,所以都考虑)刚插入队列,还未来得及更新前继节点的next值,这种情况下需要传播唤醒动作,即使唤醒动作可能是多余的;
          2. 如果后继节点是共享类型的,则传播唤醒动作
          3. 也就是说,除非明确指明后继节点是独占式,否则就需要执行doReleaseShared传播唤醒后继节点;
            1. 死循环保证方法正确退出;
            2. 如果头节点存在,且不为尾节点,表明队列中存在中间节点,则:
              1. 如果头节点的等待状态为SIGNAL,表示负责唤醒后继节点,则尝试CAS设置头节点的等待状态为初始值0(完成唤醒动任务后需要清除唤醒标识,防止重复unpark),
                1. 设置成功就调用unparkSuccessor唤醒后继节点;
                  1. 如果本节点等待状态值小于0(大概率是SIGNAL),则CAS设置等待状态为0,完成唤醒任务后需要清除唤醒标识,但不关心成功与否,因为本节点的等待状态在完成唤醒后继节点后已经不重要了;
                  2. 如果本节点的next节点不存在,或者等待状态为CANCELLED(自身异常退出锁竞争),则

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/4780.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!