ReentranLock
类图
ReentranLock实现了Lock接口、支持序列化,并且根据传入的参数来判断是使用公平锁还是非公平锁。底层是使用AQS阻塞队列来实现。
Sync类直接继承了AQS,它的子类NonfairSync和FairSync分别实现了获取锁的非公平与公平策略。
案例
此案例使用了ReentranLock,来保证线程安全。如果不上锁,number的值由于线程之间的竞争导致最终结果是不正确的。
public class ReentranlockDemo extends Thread{
private static int number = 0;
private static ReentrantLock l = new ReentrantLock(); // 上锁
public static void main(String[] args) throws InterruptedException {
Thread[] threads = new Thread[10];
for(int j = 0; j < 10; j++){
threads[j] = new ReentranlockDemo();
threads[j].start();
}
for(int i =0; i < 10; i++){
threads[i].join();
}
System.out.println(number);
}
@Override
public void run() {
l.lock(); // 上锁
for(int i =0; i < 1000; i++){
number++;
}
l.unlock(); // 解锁
}
}
先结合AQS中的底层实现以及公平锁、非公平锁来讲解ReentranLock的实现原理。
实现原理
AQS
AbstractQueuedSynchronizer称为抽象同步队列,简称AQS。它是实现同步器的基础组件,并发包中锁的底层就是使用了AQS实现的。
AQS是一个虚拟FIFO的双向队列(CLH队列),其内部通过节点head和tail记录队首和队尾元素,队列元素类型为Node(AQS将每条请求共享资源的线程封装成一个CLH锁队列的一个节点(Node)来实现锁的分配)。它的核心思想是:如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。
部分属性
-
state:AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。AQS使用CAS对该同步状态进行原子操作实现对其值的修改。它的状态通过getState()、setState()、compareAndSetState()等方法进行操作。 -
head:记录队首元素 -
tail:记录队尾元素
private transient volatile Node head; // 记录队首元素
private transient volatile Node tail; // 记录队尾元素
private volatile int state; //记录状态信息,通过CAS操作修改此值
Node对象
在这个同步队列中,一个节点表示一个线程,它保存着线程的引用thread、状态waitStatus、前驱节点prev、后继节点next。
-
shared用来标记该线程是获取共享资源时被阻塞挂起后放入AQS队列 -
exclusive用来标记该线程是获取独占资源时被挂起后放入AQS队列 -
waitStatus记录当前线程等待状态 -
cancelled(1):线程被取消了 -
signal(-1):线程需要被唤醒 -
condition(-2):线程在条件队列里面等待 -
propagate(-3):释放共享资源时需要通知其他节点 -
0:新节点入队时的默认状态 -
prev:记录当前节点的前驱节点 -
next:记录当前节点的后继节点 -
thread:thread变量是用来存放进入AQS队列里面的线程
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
猜想
-
当只有一个线程去调用lock方法时,这个线程一定会拿到锁。这个CLH队列也不会被初始化。

-
如果有两个线程去调用lock方法,只有一个线程会拿到锁,另外一个线程肯定要进入阻塞队列当中,并且还会调用一个阻塞的方法将自己阻塞,等待运行完的线程在释放锁的同时去唤醒队列中的某一个线程。

-
如果有三个线程去调用lock方法,同样只有一个线程会拿到锁,另外两个线程都会进入到阻塞队列中等待唤醒。

疑问:如果两个线程其中一个线程没有拿到锁,那么这个线程会直接放入队列中吗?
大致流程
根据下图这个流程在去梳理代码就容易的多了。
猜想中的那个疑问在这个流程中已经体现出来了,并不会直接放入队列中,而是考虑自旋2次能否拿的到锁。因为放入队列中就意味着要进行调用park()方法进行打断。就会执行系统调用,调用底层操作系统的方法,用户态到内核态的切换,这样会消耗一部分性能。因此考虑自旋2次。
公平锁具体实现
公平锁:按照线程在队列中的排队顺序,先到者先拿到锁。
ReentranLock的类图介绍了FairSync继承了Sync这个内部类,所以此FairSync内部类中实现了lock这个抽象方法。
-
如果锁当前没有被其他线程占用并且当前线程之前没有获取过锁,则当前线程会获取到该锁,然后设置当前锁的拥有者为当前线程,并设置AQS的状态值为1,然后直接返回。 -
如果当前线程之前已经获取过该锁(可重入),则这次是指简单地把AQS的状态state状态值加1后返回。 -
如果该锁已经被其他线程持有,则调用该方法的线程在自旋2次后还没有获取到锁就会被放入AQS队列后调用park()方法阻塞挂起。
final void lock() {
acquire(1); // 获取锁
}
AQS—acquire方法
-
在这个tryAcquire()方法中尝试获取锁资源,如果成功拿到锁则返回true。 -
如果没有成功,则加入到等待队列中,也就是开始调用addWaiter()方法。 -
acquireQueued这个方法使当前线程在等待队列中休息,但如果轮到自己,进行unpark会去尝试获取资源,获取到资源后返回。如果整个等待的过程中被打断则返回true,否则返回false。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
此方法为了恢复用户行为,如果用户在外面调用了Thread.interrupt()方法。
因为这个方法在调用后让打断标识改为true,但是之后会自动清除打断标识,
重新标记为false。这个selfInterrupt()方法中恢复了用户的行为。
selfInterrupt();
}
ReentranLock—tryAcquire方法
-
当tryAcquire返回true,说明拿到了锁。 -
当tryAcqure返回false,说明没有拿到锁。此方法,获取当前线程,通过getState方法拿到当前锁的状态,如果为0,说明当前锁没有被其他线程占有。如果判断此时进入此方法的线程就是占用锁的线程,对state状态进行累加更新(可重入性)。如果锁被占用了,直接返回false。
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread(); 当前线程
int c = getState(); 获取状态
if (c == 0) { c==0 说明当前锁没有被其他线程占有
if (!hasQueuedPredecessors() &&
// 修改状态 acquires为1(使用CAS方式修改)
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { 可重入性
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
返回false说明当前线程没有拿到锁
return false;
}
AQS—hasQueuedPredecessors方法
用来判断线程需不需要排队,由于队列是FIFO的,所以需要判断队列中有没有相关线程的节点在排队。如果有则返回true表示线程需要排队,如果没有则返回false表示线程无需排队。
此方法表达的意思居多。就这么一句代码。太强了。
返回false
(1) h!=t返回false,经过短路与后直接返回false
第一种情况:当只有一个线程的时候,队列不会初始化。head和tail为null。直接返回false。不需要排队。
第二种情况:队列已经初始化了,头结点和尾结点都指向了一个元素,表示队列中只有一个节点(虚拟出来的空节点),这个节点是不参与排队的,它正在持有锁。但是在调用这个方法的时候,说明此时锁已经释放掉了。所以此时第二个节点根本不需要排队。只有当第三个线程进来的时候,才会需要排队。
(2)h!=t返回true,(s=h.next)==null返回false并且s.thread!=Thread.currentThread()返回false。
(s=h.next)==null返回false表示头节点是有后继节点的。s.thread!=Thread.currentThread()返回false,表示当前线程和后继节点的线程是相同的,那就说明已经轮到这个线程相关的节点去尝试获取同步状态,自然无需排队,直接返回false。
返回true
(1) h!=t返回true,(s=h.next)==null返回true
(s=h.next)==null返回true,说明头节点之后是没有后继节点的。有可能另一个线程已经执行到初始化队列的操作了,介于compareAndSetHead(new Node())与tail=head之间。如代码:
(2)h!=t返回true,(s=h.next)==null返回false,s.thread!=Thread.currentThread()返回true。
(s=h.next)==null返回false表示头节点是有后继节点的。s.thread!=Thread.currentThread()返回true,说明当前线程不是后继节点的线程。头节点后面的线程都还在排队,那当前线程肯定是要老老实实的去排队。
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
尾节点
Node t = tail; // Read fields in reverse initialization order
头节点
Node h = head;
Node s; 头节点的后继节点
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
AQS—addWaiter方法
private Node addWaiter(Node mode) {
创建一个独占模式的Node节点
Node node = new Node(Thread.currentThread(), mode);
这里pred = tail,如果pred为null,说明此时队列没有初始化,并且走到这里的肯定不是第一个线程,只有此时锁被占用,才会走这个addWaiter方法。
Node pred = tail; pred为尾部
if (pred != null) { 放入队列尾部
node.prev = pred;
if (compareAndSetTail(pred, node)) { 使用CAS方式放入tail中
pred.next = node;
return node;
}
}
enq(node); 出现tail=null时,才会走这个方法将队列进行初始化。
return node;
}
AQS—enq方法
此方法会将队列进行初始化,并且必要时将Node节点插入到队列中。
为什么要虚拟出一个空节点?
假如第一个线程拿着锁正在执行,另一个线程无法获取锁,需要排队。这就好比如生活中,在火车站取票,第一个人不是在排队,而是正在办理业务的人,而第二个人才是正在排队的人。所以这一个空节点可以说就是代表正在办理业务的人,也就表明有一个线程正在占用锁办理业务。
自旋
方法中有一个for循环,这个相当于while(true),说专业点就是自旋。当第二个线程进来时需要做的事:(1)初始化这个队列;(2)将Node节点放入到初始化的队列中。
private Node enq(final Node node) {
for (;;) { 自旋
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node())) 虚拟出一个空节点node,这个空节点其实就是表示有一个线程正在占用锁。
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) { 放入队列尾部
t.next = node;
return t;
}
}
}
}
AQS—acquireQueued方法
进入此方法,说明该线程获取锁资源失败,放入队列队尾,进入等待状态. (1)当tryAcquire返回false,此时才会走后面的与操作。也就是将当前线程加入到等待队列当中。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { 自旋
(1)final Node p = node.predecessor(); 获取node的前驱节点
如果p为虚拟的头结点,则尝试是否可以获取到锁
如果成功获取到锁,那么返回true,此时清空上一个头结点,并且让头结点的后驱节点变为null,这样就可以利用gcRoots GC掉,防止内存泄漏,此时会让当前线程的node置为null,当做虚拟节点,因为当线程在占用锁的时候,不能存在队列当中。
(2)if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
如果上面if语句不符合条件,那么会进入这个if语句,如果走到这里就是说明当前占用锁的线程并没有释放掉。
(3) if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
AQS—shouldParkAfterFailedAcquire方法
-
这个方法如果返回false,那么会再一次自旋,为了多自旋这一次,尝试是否能够拿到锁,避免park线程,也为了多等待此时占用锁的那个线程,万一还未执行到唤醒其他线程的语句呢。这里将当前Node的前驱节点的waitStatus改为了-1。 -
如果自旋之后还没有拿到锁,此时waitStatus为SIGNAL,返回true,开始执行后面的语句。 -
如果第一个线程执行完后,第二个线程正在执行,此时第三个线程进来了,那么他会修改第二个线程的node中的waitStatus状态为-1,修改完之后,也会在进行一次自旋操作,与第一个步骤一样。重复下去。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; 默认值为0
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do { 利用cas修改waitStatus为-1
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
AQS-cancelAcquire方法
acquireQueued()方法中,failed刚开始定义为true,当node获取到锁后会跳出死循环,此时failed会被更改为false,然后进入finally中,并不会执行cancelAcquire()方法。细细想来,只有在发生异常的情况下才会进入。
如在调用lockInterruptibly()方法通过抛出异常来执行cancelAcquire方法,lock方法过程中不会执行该代码。作者可能认为在for循环内部如果出现不可控的因素导致产生未知的异常,则会执行cancelAcqure()方法。
private void cancelAcquire(Node node) {
if (node == null)
return;
将当前node关联的线程置为null
node.thread = null;
找到一个有效的前继节点pred
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
将node的waitStatus置为cancelled
node.waitStatus = Node.CANCELLED;
如果node是tail,更新tail为pred,并将pred.next指向null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
如果node不是tail,也不是head的后继节点则将node的前继节点的waitStatus置为SIGNAL,并使pre的前继节点指向node的后继节点
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
如果node是head的后继节点,则直接唤醒node的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
AQS—doAcquireInterruptibly方法
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException(); 抛出异常
}
} finally {
if (failed) 只有failed为true的时候
cancelAcquire(node);
}
}
非公平锁实现
非公平锁就是说尝试获取锁的线程并不一定比后尝试获取锁的线程优先获取锁。
// 非公平锁尝试获取锁
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread(); //
int c = getState();
if (c == 0) {
这里与公平锁实现有些不同,这里没有hasQueuedPredecessors()方法
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
剩余的其他代码与公平锁一致,唯独少了hasQueuedPredecessors()这个方法。
释放锁
释放锁就是调用了unlock()方法。
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 唤醒线程
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c); 更新state为0,释放锁
return free;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); 修改回0
Node s = node.next; 当前节点的后继节点
if (s == null || s.waitStatus > 0) {
s = null;
unpark 的线程保留在后继节点中,通常只是下一个节点。但如果被取消或明显为空,则从尾部向后遍历以找到实际的未取消后继者。
for (Node t = tail; t != null && t != node; t = t.prev) 从尾部往前遍历
if (t.waitStatus <= 0) 找出Node节点中waitStatus<0的节点,准备唤醒
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); 唤醒
}
总结
AQS是JUC中很多同步组件的构建基础,内部实现主要是状态变量state和一个FIFO队列来完成,同步队列的头节点是当前获取到同步状态的节点,获取同步状态state失败的线程,会被构造成一个节点(以共享式或独占式)加入到队列中(采用自旋CAS来保证此操作的线程安全),随后线程会阻塞;释放时唤醒头节点的后继结点,使其加入对同步状态state的竞争。
在使用AQS构建符合我们需求的同步组件时,只需要重写tryAcquire()、tryAcquireShard()、tryRelease()、tryReleaseShared几个方法,来决定同步状态的释放和获取即可,至于背后复杂的线程排队,线程阻塞/唤醒,如何保证线程安全,都由AQS为我们完成了,这也是非常典型的模板方法的应用。
参考书籍:Java并发编程之美
原文始发于微信公众号(阿黄学编程):ReentranLock实现原理
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/35647.html