ReentrantLock 介绍
ReentrantLock类使用AQS同步状态来保存锁重复持有的次数。当锁被一个线程获取时,ReentrantLock也会记录下当前获得锁的线程标识,以便检查是否是重复获取,以及当错误的线程试图进行解锁操作时检测是否存在非法状态异常。
ReentrantLock获取锁定有四种方式:
- lock(), 如果获取了锁立即返回,如果别的线程持有锁,当前线程则一直处于休眠状态,直到获取锁
- tryLock(), 如果获取了锁立即返回true,如果别的线程正持有锁,立即返回false
- tryLock(long timeout,TimeUnit unit), 如果获取了锁定立即返回true,如果别的线程正持有锁,会等待参数给定的时间,在等待的过程中,如果获取了锁定,就返回true,如果等待超时,返回false;
- lockInterruptibly:如果获取了锁定立即返回,如果没有获取锁定,当前线程处于休眠状态,直到获取锁定,或者当前线程被别的线程中断
ReentrantLock 使用示例
ReentrantLock lock = new ReentrantLock();
try{
lock.lock();
// 其它逻辑
} finally{
lock.unlock();
}
可以看到主要有三步骤,实例化、加锁、解锁,接下来我i我们会通过这个简单的示例进行源码分析
ReentrantLock 源码解析
new ReentrantLock() 实例化
我们先来看下ReentrantLock 的构造函数,代码如下:
// 重入锁默认采用非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
// true:公平锁 false:非公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
有两个构造,一个是默认的构造函数,默认的是非公平锁;一个是带参数的构造,如果入参为true,则是公平锁,如果入参是false,则是非公平锁
所有的Sync继承关系如图所示,底层都是采用AQS来进行实现的
公平锁与非公平锁的概念
- 如果是非公平的情况:
- 线程A去抢占锁,发现
state=0
,则成功抢锁 - 线程B再去抢锁,也会先尝试抢锁操作(无论是否有线程在排队),如果刚好抢锁成功则直接运行,否则乖乖的进入队列等待
- 线程A去抢占锁,发现
- 如果是公平锁的情况:
- 线程A去抢占锁,发现
state=0
,则成功抢锁 - 如果线程B再去抢锁,会先判断队列中是否有线程在等待,如果有则直接加入队列等待
- 线程A去抢占锁,发现
lock.lock() 加锁
加锁过程可以被分成四步:
- 在ReentrantLock调用Sync的lock方法并根据公平或非公平的lock逻辑执行
- 执行tryAcquire方法去尝试获取锁
- 执行addWaiter方法构建一个独占模式节点Node并维护好该节点的前后指针Node
- 判断当获取锁失败的时候,是否应该挂起线程,如果是则挂起线程
第一步:lock方法
在ReentrantLock调用Sync的lock方法并根据公平或非公平的lock逻辑执行
public class ReentrantLock implements Lock, java.io.Serializable {
...
public void lock() {
// 分为公平锁的lock()实现和非公平锁的lock()实现
sync.lock();
}
// 公平锁
static final class FairSync extends Sync {
final void lock() {
acquire(1);
}
}
// 非公平锁
static final class NonfairSync extends Sync {
final void lock() {
// 【重点】使用CAS将AQS.state置为1,表示已抢占该锁,否则失败进入else
if (compareAndSetState(0, 1)) {
// 将AbstractOwnableSynchronizer.exclusiveOwnerThread置为当前线程
setExclusiveOwnerThread(Thread.currentThread());
} else {
// 走公平锁的流程
acquire(1);
}
}
}
...
}
在ReentrantLock类中主要还在在内部静态类FairSync
和NonfairSync
,他们的主要区别在于非公平锁会每次都去尝试compareAndSetState(0, 1)
,传入的expert为0,如果设置成功,说明state没有被其它线程抢锁,否则就是失败执行AbstractQueuedSynchronizer类中的acquire
方法
public final void acquire(int arg) {
/**
* tryAcquire(1): 判断当前线程是否成功的抢占锁
* addWaiter(Node.EXCLUSIVE): 构建一个独占模式节点Node,并维护好该节点的前后指针Node
* acquireQueued(addWaiter(Node.EXCLUSIVE), 1):
*/
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
// 设置当前线程的中断标识
selfInterrupt();
}
}
第二步:acquire方法
在第一步中,我们发现最终都是调用了acquire方法来执行,其中调用了三个方法,这一步我们主要来看下tryAcquire
方法
整体的tryAcquire逻辑流程如下:
公平锁的实现代码:
/**
* 进行抢锁操作,返回是否成功
* 抢占成功条件:
* case1> 没人抢占锁,线程A执行抢占锁操作,执行成功。
* case2> 有人已经抢占了这个锁,但是抢占这个锁的线程就是线程A自己,那么对自己重入加锁,执行成功。
*
* true:抢占到了锁 false:没有抢到锁
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
/** 如果c == 0,说明可以抢占锁 */
if (c == 0) {
/** 如果线程不需要排队 并且 抢占锁成功(即:如果state=0,则将该值修改为1,CAS操作成功)*/
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
// 设置抢到锁的线程为current
setExclusiveOwnerThread(current);
return true;
}
}
/** 如果c != 0,判断,是否是重入操作(即:锁本来就是被自己抢占的,支持多次抢占。) */
else if (current == getExclusiveOwnerThread()) {
// 相当于state+1
int nextc = c + acquires;
if (nextc < 0) {
throw new Error("Maximum lock count exceeded");
}
setState(nextc);
return true;
}
return false;
}
- 如果state==0,则在线程不需要排队的情况下使用CAS去设置state=1,设置成功后更新
exclusiveOwnerThread
对象 - 如果state != 0,会判断是否是已获取锁的线程再来获取锁(重入),是的话则将state++
- 如果state != 0且不是重入的线程,则尝试抢锁失败,返回false
非公平锁的实现代码:
/**
* 进行抢锁操作,是否抢到非公平锁
*
* 处理内容:
* 1>如果抢到锁,返回true
* 1.1>如果当前线程第一次抢到锁:
* AQS.status由0变为1
* AQS.exclusiveOwnerThread=Thread.currentThread()
* 返回true
* 1.2>如果当前线程再次抢到锁(重入加锁):
* AQS.status++
* 返回true
* 2>如果没抢到锁,返回false
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
/**
* 获得当前独享线程,如果就是当前线程,那么执行重入操作
* 执行tryLock()时:
* 如果第二次进入,则nextc = 0 + 1 = 1
* 如果第三次进入,则nextc = 1 + 1 = 2
* 如果第四次进入,则nextc = 2 + 1 = 3
*/
int nextc = c + acquires;
// overflow 溢出
if (nextc < 0) {
throw new Error("Maximum lock count exceeded");
}
setState(nextc);
return true;
}
// nf-eg—2:线程B 返回false
return false;
}
所有的逻辑都与公平锁类似,关键点在于非公平锁不会去判断队列中是否有线程在排队(if (compareAndSetState(0, acquires))
),只要state==0,就直接去尝试更新state。其余步骤全部一致
第三步:addWaiter方法
/**
* 为代表当前线程并指定模式为mode的节点创建/进入队列。
*/
private Node addWaiter(Node mode) {
/** 首先,创建一个节点Node */
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred; /**[原尾节点pred] <-- prev.[新节点node]*/
if (compareAndSetTail(pred, node)) {
pred.next = node; /**[原尾节点pred].next --> [新节点node]*/
return node;
}
}
//
enq(node);
return node;
}
/**
* 如果是空队列
* 第一步:则初始化一个空内容node作为第一个节点;
* 第二步:然后将入参node加到队列末尾
* 如果不是空队列
* 将入参node加到队列末尾
*
* 返回:node节点的前置节点
*/
private Node enq(final Node node) {
// 自旋
for (; ; ) {
Node t = tail;
/** 如果是空队列,则初始化一个空内容node作为第一个节点 */
if (t == null) {
if (compareAndSetHead(new Node())) { /** 初始化一个空内容的节点,作为头节点 */
tail = head;
}
}
/** 不是空队列 */
else {
node.prev = t; /** [原尾部node] <-- prev.[入参node] */
if (compareAndSetTail(t, node)) { /** 将入参的node节点,作为尾节点 */
t.next = node; /** [原尾部node].next --> [入参node] */
return t;
}
}
}
}
- 如果tail != null,则将当前线程的节点加入队列的队尾中
- 如果tail == null,则调用enq来初始化一个空内容的node节点,然后将tail指向头节点,并将node加入到队尾
第四步:acquireQueued方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 会循环一定次数,结果为:获得锁或挂起线程
for (; ; ) {
/** 获得入参节点node的前置节点 */
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { /** tryAcquire(arg) 进行抢锁操作,返回是否成功*/
setHead(node); // 更新头节点为入参node
p.next = null; // help GC
failed = false;
return interrupted;
}
// shouldParkAfterFailedAcquire 判断当获取锁失败的时候,是否应该挂起该线程
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { /** 阻塞 */
interrupted = true;
}
}
} finally {
if (failed) {
cancelAcquire(node);
}
}
}
/**
* 判断当获取锁失败的时候,是否应该挂起该线程
*
* 当获取锁失败的时候,针对入参node的prev node做检查和更新AQS.status
* true:表示线程应该被阻塞
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
/** 获得pred节点(node的前置节点)的waitStatus*/
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) {
return true;
}
/** 如果waitStatus是CANCELLED */
if (ws > 0) {
do {
/**
* 1> 将pred节点(node的前置节点),赋值为它自己的前置节点。
* 2> 将node的前置节点执行全新的pred节点。
**/
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 将前置节点的AQS.waitStatus设置为-1(Node.SIGNAL)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
/** 阻塞挂起当前线程,不会释放当前线程占有的锁资源;直到被另一个线程调用LockSupport.unpark(this)方法唤醒。不需要捕获异常 */
LockSupport.park(this);
/** 判断线程是否被中断。该方法调用后会将中断标示位清除,即重新设置为false */
return Thread.interrupted();
}
- 如果node的前置节点等于head,则执行tryAcquire方法尝试抢锁操作,抢锁成功后会更新head为node,并将原来的head的next置空,帮助GC
- 如果node的前置节点不等于head或抢锁失败,则会去判断是否应该挂起该线程(
shouldParkAfterFailedAcquire
)- 如果前置节点的waitStatus为SIGNAL,则挂起该线程
- 如果waitStatus为CANCELLED,则循环移除前置节点,直到前置节点不为CANCELLED
- 如果前置节点不为SIGNAL且不为CANCELLED,则将前置节点的AQS.waitStatus设置为-1(Node.SIGNAL)
- 通过
shouldParkAfterFailedAcquire
方法来判断了是否挂起线程或是继续自旋,当shouldParkAfterFailedAcquire返回true,则挂起线程(LockSupport.park(this)
)
至此,lock.lock()加锁流程全部执行完毕
lock.unlock() 解锁
有加锁,自然就有解锁,解锁的过程相对于加锁会简单多
首先我们来看下ReentrantLock类中的unlock方法:
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
/** 判断是否可以进行释放锁操作 */
if (tryRelease(arg)) {
Node h = head;
// 头节点为null且waitStatus不等于0
if (h != null && h.waitStatus != 0) {
// 唤醒节点的后续节点
unparkSuccessor(h);
}
return true;
}
return false;
}
abstract static class Sync extends AbstractQueuedSynchronizer {
/**
* 判断是否可以进行释放锁操作
*/
// releases=1
protected final boolean tryRelease(int releases) {
// c=1-1=0
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread()) {
throw new IllegalMonitorStateException();
}
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
// 唤醒节点的后续节点
private void unparkSuccessor(Node node) {
// nf-eg—1-线程A: node.waitStatus==SIGNAL(-1)
int ws = node.waitStatus;
/** ws为非CANCELLED(1)状态 */
if (ws < 0) {
compareAndSetWaitStatus(node, ws, 0);
}
/** 获得head节点的后续节点 */
Node s = node.next;
/**
* node是tail节点 或者 node的tail节点waitStatus>0,即:CANCELLED(1)
*/
if (s == null || s.waitStatus > 0) {
s = null; // 通知GC可以对s进行回收
/**
* 从尾节点开始向头节点遍历,遍历到整个队列最前排的waitStatus<=0的节点,赋值给s,用于后续的unpark操作。
*/
for (Node t = tail; t != null && t != node; t = t.prev) {
if (t.waitStatus <= 0) {
s = t;
}
}
}
if (s != null) {
LockSupport.unpark(s.thread); /** 针对head节点的后续节点,执行unpark唤醒操作,促使其再次执行抢锁操作*/
}
}
unlock过程并没有区分公平锁或非公平锁,都是调用通用的release方法,基本流程如下:
- 调用
tryRelease
方法尝试释放资源,里面主要是将state-1(int c = getState() - releases;
),将c更新到state中则释放成功 - 如果在上一步锁释放成功且有后续节点,则调用
unparkSuccessor
方法唤醒下一个节点- 如果waitStatus不是CANCELLED(1)状态,都会将其更新为0
- 如果下一个节点是null或waitStatus是CANCELLED(1),则将s设置为null,并从tail节点开始向前遍历,找到最前面一个waitStatus != CANCELLED 的节点
- 如果上一步找到了一个符合条件的后置节点,则将其唤醒
LockSupport.unpark(s.thread);
,促使其再次执行抢锁操作
至此,所有的lock.unlock()解锁操作全部完成
思考
Synchronized与ReentrantLock的区别是什么?
区别点 | Synchronized | ReentrantLock |
---|---|---|
使用方式 | 关键字 | 实现类 |
实现方式 | JVM实现控制 | AQS实现控制 |
是否自动 | yes | no |
锁的获取 | 如果资源被锁,会一直等待 | 如果资源被锁,可以有多种处理方式 |
锁的释放 | 被锁的代码执行完or发生异常 | finally中手动编程释放 |
锁的状态 | 无法判断 | 可以判断,isLocked() |
锁的类型 | 可重入,不可中断,非公平锁 | 可重入,可中断(lockInterruptibly),公平锁or非公平锁 |
总结
在ReentrantLock中,主要也是基于AQS的同步器来进行实现,包括公平锁、非公平锁;加锁、解锁;锁的重入特性等
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/17838.html