聊聊AbstractQueuedSynchronizer

导读:本篇文章讲解 聊聊AbstractQueuedSynchronizer,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,原文地址:Java

也许你感觉自己的努力总是徒劳无功,但不必怀疑,你每天都离顶点更进一步。今天的你离顶点还遥遥无期。但你通过今天的努力,积蓄了明天勇攀高峰的力量。加油!

什么是AbstractQueuedSynchronizer

AbstractQueuedSynchronizer是一个框架,一个基于FIFO队列用于实现阻塞锁和同步器的框架。

AbstractQueuedSynchronizer类设计为能支持大多数情况的同步器,其中通过一个原子性的value来表示锁的状态(state),对于实现者来说,只需要安全的去改变这个state(什么叫安全),然后定义state处于什么情况(什么值)下是获取(acquired),释放(released)。

基于此同步器需要在内部创建一个helper来继承于AbstractQueuedSynchronizer,然后在定义自己的一些同步方法,在同步方法中按照自己的逻辑去访问helper方法以达到同步目的。(范例)。

AbstractQueuedSynchronizer提供了两种模式,一种独占模式(exclusive),一种共享模式(shared),通过独占表示只有一个线程能占有,共享代表着允许多个线程同时持有。

实现同步器步骤

要实现AQS只需要按照自己的需要完成下面几个方法的实现就可以了

  1. tryAcquir() 尝试获取独占锁
  2. tryRelease() 尝试释放独占锁
  3. tryAcquireShared() 尝试获取共享锁
  4. tryReleaseShared() 尝试释放共享锁
  5. isHeldExclusively() 是否持有独占锁

如果我们只需要其中一种方式,那么只需要实现对应的方法就行了,比如:

我们需要完成一个共享模式的同步器,那么只需要完成tryAcquireShared()和tryReleaseShared()方法的实现,当然我们也可以两种方式都需要比如说ReentrantReadWriteLock可重入的读写锁。

对于获取锁的场景会有两种:

  1. 直接返回是否能获取到锁,也就是我们实现的tryAcquir() 或者 tryAcquireShared()
  2. 当获取不到锁的时候希望能阻塞,一直到获取到锁为止,也就是AbstractQueuedSynchronizer提供的acquire() 或者 acquireShared()

第一种场景我们是自己实现的,而比较复杂的第二种AbstractQueuedSynchronizer已经帮我们完成了

安全的去改变state

AbstractQueuedSynchronizer中定义了getState(),setState(),compareAndSetState()三个方法来操作state,基于此在自己实现上述的五个方法,可以参考范例 中的tryAcquir(),tryRelease()等方法的实现。

范例

class Mutex implements Lock, java.io.Serializable {
 
    // 继承AbstractQueuedSynchronizer实现sync同步器
    private static class Sync extends AbstractQueuedSynchronizer {
        // Reports whether in locked state
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
 
         // Acquires the lock if state is zero
        public boolean tryAcquire(int acquires) {
            assert acquires == 1; // Otherwise unused
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
 
        // Releases the lock by setting state to zero
        protected boolean tryRelease(int releases) {
            assert releases == 1; // Otherwise unused
            if (getState() == 0) throw new IllegalMonitorStateException();
                setExclusiveOwnerThread(null);
                setState(0);
            return true;
        }
    }
    // The sync object does all the hard work. We just forward to it.
    private final Sync sync = new Sync();
    /**
    * 
    * 定义自己的锁对象包含的功能,然后通过sync实现其功能
    *
    **/
    public void lock()                { sync.acquire(1); }
    public boolean tryLock()          { return sync.tryAcquire(1); }
    public void unlock()              { sync.release(1); }
    public Condition newCondition()   { return sync.newCondition(); }
    public boolean isLocked()         { return sync.isHeldExclusively(); }
    public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
    public void lockInterruptibly() throws InterruptedException {
      sync.acquireInterruptibly(1);
    }
    public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
      return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
  }

AbstractQueuedSynchronizer设计

通过上面描述我们简单的感受了下AbstractQueuedSynchronizer是做什么的,以及我们该怎么做,那么接下来我们就将详细看看AbstractQueuedSynchronizer中是如何设计整个框架的。

第一步:给框架进行定位

AbstractQueuedSynchronizer的定位是一个同步器框架,是一个基于FIFO队列用于实现阻塞锁和同步器的框架。

基于此,我们就需要准备一些东西:

  1. 锁的资源or同步的资源
  2. 一个FIFO队列的实现

对于第一点,既然是锁既然是同步器,那么我们就必须要有实际的物体来锁定,而不能是虚无的,它可以是对象,可以是任意一个实实在在的东西。AbstractQueuedSynchronizer选择int类型来做为资源标识,也就是上述的state成员变量。

对于第二点,AbstractQueuedSynchronizer选择CLH队列的变种,一个节点持有前后对象的双向链表来作为阻塞队列。

根据上面的结构和资源,一个简单的流程就有了:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-z7yzQitg-1649401519743)(https://note.youdao.com/yws/res/10256/WEBRESOURCEc35f366090e1f3d6e164e18aa8fb3ead)]

第二步:抽象需自定义的API

对于争夺来说,不同的场景会存在不一样的需求,对于ReentrantLock可重入锁同一个线程允许多次进入,但是在范例中实现的锁就只能进入一次。

因此,需要将争夺相关的操作抽象出来,让实现方去自定义,同理争夺进行了自定义,那么释放必然就存在则差别,同样需要进行自定义。

而锁存在则两种模式,一种独占一种共享,对于框架来说,是没办法直接明白本次争夺是独占还是共享,那么只能进行机械式的拆分为两个方法来表示这层含义。

也就诞生出了这五种需要实现方关注的方法:

  1. tryAcquir() 尝试获取独占锁
  2. tryRelease() 尝试释放独占锁
  3. tryAcquireShared() 尝试获取共享锁
  4. tryReleaseShared() 尝试释放共享锁
  5. isHeldExclusively() 是否持有独占锁

第三步:CLH变种队列的设计与管理

现在我们明确了资源,队列结构,争夺实现。接下来就需要完成队列的管理。

  • 队列的结构?
  • 队列成员属性有哪些?
  • 节点成员有几种状态?
  • 队列操作

队列结构

基于CLH变种队列,在AbstractQueuedSynchronizer中将持有队列的头对象与尾对象来帮助完成快速进入队列和出队列。

    /**
     * Head of the wait queue, lazily initialized.  Except for
     * initialization, it is modified only via method setHead.  Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;

    /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;

对于头部head,只能通过setHead()方法进行修改,对于尾部tail只能通过enq()方法添加到队列中。

队列成员属性

通过队列结构我们知道队列中的成员类型都为Node类型。

static final class Node {
        //共享节点的标识
        static final Node SHARED = new Node(); 
        //独占节点的标识
        static final Node EXCLUSIVE = null;
        
        //前节点
        volatile Node prev;
        //后节点
        volatile Node next;
        //当前节点的线程
        volatile Thread thread;

        Node nextWaiter;

        final boolean isShared() {
            return nextWaiter == SHARED;
        }
    }

基于CLH变种的双向链表,必然存在前节点和后节点,对应的成员就是prev和next。
同时需要持有当前节点的线程来进行阻塞操作和唤醒操作,对应的成员就是thread。
通过nextWaiter成员来判断该节点的类型是共享还是独占

节点成员有几种状态

节点对应的操作通过waitStatus成员来进行表示

static final class Node {
        //共享节点的标识
        static final Node SHARED = new Node(); 
        //独占节点的标识
        static final Node EXCLUSIVE = null;

        static final int CANCELLED =  1;
        static final int SIGNAL    = -1; 
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;
        
        /**
        * 表示节点当前的状态,状态对应着后续该节点能进行的操作
        * SIGNAL:表示当前节点的后续节点已经或者将要阻塞,那么当前节点进行
        *         release或则cancel的时候需要将后续节点进行唤醒
        * CANCELLED:表示当前节点已经被取消,并且该状态不可逆,只能等待被
        *            移出队列
        * CONDITION:条件队列的节点使用,在等待队列中不存在该类型节点
        * PROPAGATE:为了修复共享锁多个线程同访问的bug而引入的中间状态
        * 0:初始化默认值
        **/
        volatile int waitStatus;
        
        //前节点
        volatile Node prev;
        //后节点
        volatile Node next;
        //当前节点的线程
        volatile Thread thread;
        
        Node nextWaiter;

        final boolean isShared() {
            return nextWaiter == SHARED;
        }
    }

节点状态机

创建节点时

节点进入队列后,将前驱节点变更为SIGNAL

doReleaseShare进行释放

doReleaseShared进行释放

0

初始化

SIGNAL

PROPAGATE

共享模式队列操作

通过上面对队列结构的分析和节点node的描述,接下来就该看下队列的操作中是如何使用到队列结构与节点的。

共享模式-总体思路

原则:

  1. SIGNAL的节点需要对后续节点进行唤醒(PROPAGATE也会,单场景单一,只在替换哨兵节点的时候)
  2. CANCELLED的节点需要移除队列中
  3. 执行完毕的节点需要重置为0
  4. PROPAGATE的节点需要被转换成SIGNAL节点(涉及到共享锁的一个bug)

由第一个节点的waitStatus状态(SIGNAL)来判断是否进行后继节点的唤醒(出队),唤醒时机在释放(releaseShared)是进行。
对于CANCELLED节点,在获取(acquiredShared),释放(releaseShared)进行踢出队列。

第一种简单情形:

node3进入队列前:

node1:SIGNAL

node2:0

node3进入队列后:

node1:SIGNAL

node2:SIGNAL

node3:0


第二种包含CANCELLED节点
node3进入队列前:

node1:SIGNAL

node2:CANCELLED

node3进入队列后:

node1:SIGNAL

node3:0


共享模式-出入队

    public final void acquireShared(int arg) {        //访问自定义的争夺方法,但没获取到        if (tryAcquireShared(arg) < 0)            //进行入队操作            doAcquireShared(arg);    }
    private void doAcquireShared(int arg) {        //1.进行入队操作        final Node node = addWaiter(Node.SHARED);        boolean failed = true;        //2.告知前驱节点        try {            boolean interrupted = false;            for (;;) {                final Node p = node.predecessor();                                if (p == head) {                    //2.1 前驱节点是头节点,则直接轮到当前节点,尝试获取资源                    int r = tryAcquireShared(arg);                                        if (r >= 0) {                        //获取成功,则将自己设置为head节点,然后进行传播                        setHeadAndPropagate(node, r);                        p.next = null; // help GC                        if (interrupted)                            selfInterrupt();                        failed = false;                        return;                    }                }                //2.2 前驱节点不是头部节点,那么告诉前驱节点记得唤醒当前节点                if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    interrupted = true;            }        } finally {            //如果出现异常情况,则取消当前节点。            if (failed)                cancelAcquire(node);        }    }
  1. 当队列初始化之后,头结点和尾节点不会为null,始终会保持存在头结点(哨兵节点),在共享模式下,当头结点获得之后,将让后继节点继续进行获取,并重置为head以达到共享的目的
addWaiter()
    private Node addWaiter(Node mode) {        //1.创建入队的节点        Node node = new Node(Thread.currentThread(), mode);        // Try the fast path of enq; backup to full enq on failure        //2.尝试快速加入队列尾部        Node pred = tail;        if (pred != null) {            node.prev = pred;            if (compareAndSetTail(pred, node)) {                pred.next = node;                return node;            }        }        //3.失败后则一直尝试入队(初始化链表or冲突过高才会需要进行自旋入队)        enq(node);        return node;    }        //不停尝试入队    private Node enq(final Node node) {        for (;;) {            Node t = tail;            if (t == null) { // Must initialize                if (compareAndSetHead(new Node()))                    tail = head;            } else {                node.prev = t;                if (compareAndSetTail(t, node)) {                    t.next = node;                    return t;                }            }        }    }    
setHeadAndPropagate()
    private void setHeadAndPropagate(Node node, int propagate) {        Node h = head; // Record old head for check below        setHead(node);        /**        * 对于是否唤醒后续节点的条件        * 1.如果通过tryAcquireShare()获取的值大于0,代表有多余资源空出来,进行唤醒后继节点        * 2.如果节点为SIGNAL,本就代表需要唤醒后续节点        * 3.如果节点为PROPAGATE,代表虽然我propagate不满足,可是在判断逻辑之后,有其它线程释放了资源,需要唤醒查看。        * 2,3 合并成waitStatus<0        * 引入PROPAGATE,可查看后面详细解释。        **/        if (propagate > 0 || h == null || h.waitStatus < 0 ||            (h = head) == null || h.waitStatus < 0) {            Node s = node.next;            if (s == null || s.isShared())                //对共享节点进行释放                doReleaseShared();        }    }
doReleaseShared()

1.要么将后继节点唤醒,把自身状态重置为0
2.要么把自身状态修改为PROPAGATE,解决共享锁同时访问的并发问题


    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            //1.队列中存在多个节点
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                //对后续节点进行唤醒
                if (ws == Node.SIGNAL) {
                    //将标识位重置为0
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    //对后继节点进行唤醒
                    unparkSuccessor(h);
                }
                //头部节点为0则将状态设置为PROPAGATE
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            //2.如果只有head节点,代表队列空了
            if (h == head)                   // loop if head changed
                break;
        }
    }

PROPAGATE状态产生的原因

起因是由于共享锁,同时进行释放导致有资源的情况下线程需要持续等待。
bug地址

在没有PROPAGATE的情况下
假设:我们有2个资源,同时呢有2个线程已经持有了,有2个线程等待获取。

聊聊AbstractQueuedSynchronizer

当node1进行释放时,唤醒node3:
在这里插入图片描述

node1释放,state+1。
node3获取,state-1。
这时候node3通过doAcquireShared获取资源为0(刚好够自己用,不需要通知后续节点),但是没有执行完将自己设置为头部。


当node3准备将自己设置为头部之前,node2进行释放:
在这里插入图片描述
尴尬的情况出现了,资源state为1,由于前面获取的资源为0,那么node3将自己设置为head之后不会对后续的节点进行唤醒(因为它认为资源是空,唤醒没有意义,但是在它变为head之前已经有其它持有资源的线程释放了资源)。


场景描述清楚了,那么解决方案就是新增一个PROPAGATE状态,将0变更为PROPAGATE状态表示在进行头部节点的替换过程中存在资源的释放,同时在setHeadAndPropagate()判断是否唤醒后续节点的条件加上,当head处于PROPAGATE状态时。

共享模式队列操作

跟共享差别不大,获取锁时不在有扩散行为(PROPAGATE),不再做分析。

ConditionObject条件队列

后续在聊

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/121857.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!