文章目录
什么是AbstractQueuedSynchronizer
AbstractQueuedSynchronizer是一个框架,一个基于FIFO队列用于实现阻塞锁和同步器的框架。
AbstractQueuedSynchronizer类设计为能支持大多数情况的同步器,其中通过一个原子性的value来表示锁的状态(state),对于实现者来说,只需要安全的去改变这个state(什么叫安全),然后定义state处于什么情况(什么值)下是获取(acquired),释放(released)。
基于此同步器需要在内部创建一个helper来继承于AbstractQueuedSynchronizer,然后在定义自己的一些同步方法,在同步方法中按照自己的逻辑去访问helper方法以达到同步目的。(范例)。
AbstractQueuedSynchronizer提供了两种模式,一种独占模式(exclusive),一种共享模式(shared),通过独占表示只有一个线程能占有,共享代表着允许多个线程同时持有。
实现同步器步骤
要实现AQS只需要按照自己的需要完成下面几个方法的实现就可以了:
- tryAcquir() 尝试获取独占锁
- tryRelease() 尝试释放独占锁
- tryAcquireShared() 尝试获取共享锁
- tryReleaseShared() 尝试释放共享锁
- isHeldExclusively() 是否持有独占锁
如果我们只需要其中一种方式,那么只需要实现对应的方法就行了,比如:
我们需要完成一个共享模式的同步器,那么只需要完成tryAcquireShared()和tryReleaseShared()方法的实现,当然我们也可以两种方式都需要比如说ReentrantReadWriteLock可重入的读写锁。
对于获取锁的场景会有两种:
- 直接返回是否能获取到锁,也就是我们实现的tryAcquir() 或者 tryAcquireShared()
- 当获取不到锁的时候希望能阻塞,一直到获取到锁为止,也就是AbstractQueuedSynchronizer提供的acquire() 或者 acquireShared()
第一种场景我们是自己实现的,而比较复杂的第二种AbstractQueuedSynchronizer已经帮我们完成了
安全的去改变state
AbstractQueuedSynchronizer中定义了getState(),setState(),compareAndSetState()三个方法来操作state,基于此在自己实现上述的五个方法,可以参考范例 中的tryAcquir(),tryRelease()等方法的实现。
范例
class Mutex implements Lock, java.io.Serializable {
// 继承AbstractQueuedSynchronizer实现sync同步器
private static class Sync extends AbstractQueuedSynchronizer {
// Reports whether in locked state
protected boolean isHeldExclusively() {
return getState() == 1;
}
// Acquires the lock if state is zero
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// Releases the lock by setting state to zero
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
}
// The sync object does all the hard work. We just forward to it.
private final Sync sync = new Sync();
/**
*
* 定义自己的锁对象包含的功能,然后通过sync实现其功能
*
**/
public void lock() { sync.acquire(1); }
public boolean tryLock() { return sync.tryAcquire(1); }
public void unlock() { sync.release(1); }
public Condition newCondition() { return sync.newCondition(); }
public boolean isLocked() { return sync.isHeldExclusively(); }
public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
AbstractQueuedSynchronizer设计
通过上面描述我们简单的感受了下AbstractQueuedSynchronizer是做什么的,以及我们该怎么做,那么接下来我们就将详细看看AbstractQueuedSynchronizer中是如何设计整个框架的。
第一步:给框架进行定位
AbstractQueuedSynchronizer的定位是一个同步器框架,是一个基于FIFO队列用于实现阻塞锁和同步器的框架。
基于此,我们就需要准备一些东西:
- 锁的资源or同步的资源
- 一个FIFO队列的实现
对于第一点,既然是锁既然是同步器,那么我们就必须要有实际的物体来锁定,而不能是虚无的,它可以是对象,可以是任意一个实实在在的东西。AbstractQueuedSynchronizer选择int类型来做为资源标识,也就是上述的state成员变量。
对于第二点,AbstractQueuedSynchronizer选择CLH队列的变种,一个节点持有前后对象的双向链表来作为阻塞队列。
根据上面的结构和资源,一个简单的流程就有了:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-z7yzQitg-1649401519743)(https://note.youdao.com/yws/res/10256/WEBRESOURCEc35f366090e1f3d6e164e18aa8fb3ead)]
第二步:抽象需自定义的API
对于争夺来说,不同的场景会存在不一样的需求,对于ReentrantLock可重入锁同一个线程允许多次进入,但是在范例中实现的锁就只能进入一次。
因此,需要将争夺相关的操作抽象出来,让实现方去自定义,同理争夺进行了自定义,那么释放必然就存在则差别,同样需要进行自定义。
而锁存在则两种模式,一种独占一种共享,对于框架来说,是没办法直接明白本次争夺是独占还是共享,那么只能进行机械式的拆分为两个方法来表示这层含义。
也就诞生出了这五种需要实现方关注的方法:
- tryAcquir() 尝试获取独占锁
- tryRelease() 尝试释放独占锁
- tryAcquireShared() 尝试获取共享锁
- tryReleaseShared() 尝试释放共享锁
- isHeldExclusively() 是否持有独占锁
第三步:CLH变种队列的设计与管理
现在我们明确了资源,队列结构,争夺实现。接下来就需要完成队列的管理。
- 队列的结构?
- 队列成员属性有哪些?
- 节点成员有几种状态?
- 队列操作
队列结构
基于CLH变种队列,在AbstractQueuedSynchronizer中将持有队列的头对象与尾对象来帮助完成快速进入队列和出队列。
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
对于头部head,只能通过setHead()方法进行修改,对于尾部tail只能通过enq()方法添加到队列中。
队列成员属性
通过队列结构我们知道队列中的成员类型都为Node类型。
static final class Node {
//共享节点的标识
static final Node SHARED = new Node();
//独占节点的标识
static final Node EXCLUSIVE = null;
//前节点
volatile Node prev;
//后节点
volatile Node next;
//当前节点的线程
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
}
基于CLH变种的双向链表,必然存在前节点和后节点,对应的成员就是prev和next。
同时需要持有当前节点的线程来进行阻塞操作和唤醒操作,对应的成员就是thread。
通过nextWaiter成员来判断该节点的类型是共享还是独占
节点成员有几种状态
节点对应的操作通过waitStatus成员来进行表示
static final class Node {
//共享节点的标识
static final Node SHARED = new Node();
//独占节点的标识
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
/**
* 表示节点当前的状态,状态对应着后续该节点能进行的操作
* SIGNAL:表示当前节点的后续节点已经或者将要阻塞,那么当前节点进行
* release或则cancel的时候需要将后续节点进行唤醒
* CANCELLED:表示当前节点已经被取消,并且该状态不可逆,只能等待被
* 移出队列
* CONDITION:条件队列的节点使用,在等待队列中不存在该类型节点
* PROPAGATE:为了修复共享锁多个线程同访问的bug而引入的中间状态
* 0:初始化默认值
**/
volatile int waitStatus;
//前节点
volatile Node prev;
//后节点
volatile Node next;
//当前节点的线程
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
}
节点状态机
共享模式队列操作
通过上面对队列结构的分析和节点node的描述,接下来就该看下队列的操作中是如何使用到队列结构与节点的。
共享模式-总体思路
原则:
- SIGNAL的节点需要对后续节点进行唤醒(PROPAGATE也会,单场景单一,只在替换哨兵节点的时候)
- CANCELLED的节点需要移除队列中
- 执行完毕的节点需要重置为0
- PROPAGATE的节点需要被转换成SIGNAL节点(涉及到共享锁的一个bug)
由第一个节点的waitStatus状态(SIGNAL)来判断是否进行后继节点的唤醒(出队),唤醒时机在释放(releaseShared)是进行。
对于CANCELLED节点,在获取(acquiredShared),释放(releaseShared)进行踢出队列。
第一种简单情形:
node3进入队列前:
node3进入队列后:
第二种包含CANCELLED节点
node3进入队列前:
node3进入队列后:
共享模式-出入队
public final void acquireShared(int arg) { //访问自定义的争夺方法,但没获取到 if (tryAcquireShared(arg) < 0) //进行入队操作 doAcquireShared(arg); }
private void doAcquireShared(int arg) { //1.进行入队操作 final Node node = addWaiter(Node.SHARED); boolean failed = true; //2.告知前驱节点 try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { //2.1 前驱节点是头节点,则直接轮到当前节点,尝试获取资源 int r = tryAcquireShared(arg); if (r >= 0) { //获取成功,则将自己设置为head节点,然后进行传播 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } //2.2 前驱节点不是头部节点,那么告诉前驱节点记得唤醒当前节点 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { //如果出现异常情况,则取消当前节点。 if (failed) cancelAcquire(node); } }
- 当队列初始化之后,头结点和尾节点不会为null,始终会保持存在头结点(哨兵节点),在共享模式下,当头结点获得之后,将让后继节点继续进行获取,并重置为head以达到共享的目的
addWaiter()
private Node addWaiter(Node mode) { //1.创建入队的节点 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure //2.尝试快速加入队列尾部 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //3.失败后则一直尝试入队(初始化链表or冲突过高才会需要进行自旋入队) enq(node); return node; } //不停尝试入队 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
setHeadAndPropagate()
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); /** * 对于是否唤醒后续节点的条件 * 1.如果通过tryAcquireShare()获取的值大于0,代表有多余资源空出来,进行唤醒后继节点 * 2.如果节点为SIGNAL,本就代表需要唤醒后续节点 * 3.如果节点为PROPAGATE,代表虽然我propagate不满足,可是在判断逻辑之后,有其它线程释放了资源,需要唤醒查看。 * 2,3 合并成waitStatus<0 * 引入PROPAGATE,可查看后面详细解释。 **/ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) //对共享节点进行释放 doReleaseShared(); } }
1.要么将后继节点唤醒,把自身状态重置为0
2.要么把自身状态修改为PROPAGATE,解决共享锁同时访问的并发问题
private void doReleaseShared() {
for (;;) {
Node h = head;
//1.队列中存在多个节点
if (h != null && h != tail) {
int ws = h.waitStatus;
//对后续节点进行唤醒
if (ws == Node.SIGNAL) {
//将标识位重置为0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//对后继节点进行唤醒
unparkSuccessor(h);
}
//头部节点为0则将状态设置为PROPAGATE
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//2.如果只有head节点,代表队列空了
if (h == head) // loop if head changed
break;
}
}
PROPAGATE状态产生的原因
起因是由于共享锁,同时进行释放导致有资源的情况下线程需要持续等待。
bug地址
在没有PROPAGATE的情况下
假设:我们有2个资源,同时呢有2个线程已经持有了,有2个线程等待获取。
node1释放,state+1。
node3获取,state-1。
这时候node3通过doAcquireShared获取资源为0(刚好够自己用,不需要通知后续节点),但是没有执行完将自己设置为头部。
当node3准备将自己设置为头部之前,node2进行释放:
尴尬的情况出现了,资源state为1,由于前面获取的资源为0,那么node3将自己设置为head之后不会对后续的节点进行唤醒(因为它认为资源是空,唤醒没有意义,但是在它变为head之前已经有其它持有资源的线程释放了资源)。
场景描述清楚了,那么解决方案就是新增一个PROPAGATE状态,将0变更为PROPAGATE状态表示在进行头部节点的替换过程中存在资源的释放,同时在setHeadAndPropagate()判断是否唤醒后续节点的条件加上,当head处于PROPAGATE状态时。
共享模式队列操作
跟共享差别不大,获取锁时不在有扩散行为(PROPAGATE),不再做分析。
ConditionObject条件队列
后续在聊
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/121857.html