Lock 常用方法
首先要说明的就是Lock,通过查看Lock的源码可知,Lock是一个接口:
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
首先lock()方法是平常使用得最多的一个方法,就是用来获取锁。如果锁已被其他线程获取,则进行等待。
如果采用Lock,必须主动去释放锁,并且在发生异常时,不会自动释放锁。因此一般来说,使用Lock必须在try{}catch{}块中进行,并且将释放锁的操作放在finally块中进行,以保证锁一定被被释放,防止死锁的发生。通常使用Lock来进行同步的话,是以下面这种形式去使用的:
Lock lock = ...;
lock.lock();
try{
//处理任务
}catch(Exception ex){
}finally{
lock.unlock(); //释放锁
}
tryLock()方法是有返回值的,它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false,也就说这个方法无论如何都会立即返回。在拿不到锁时不会一直在那等待。
tryLock(long time, TimeUnit unit)方法和tryLock()方法是类似的,只不过区别在于这个方法在拿不到锁时会等待一定的时间,在时间期限之内如果还拿不到锁,就返回false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回true。
所以,一般情况下通过tryLock来获取锁时是这样使用的:
Lock lock = ...;
if(lock.tryLock()) {
try{
//处理任务
}catch(Exception ex){
}finally{
lock.unlock(); //释放锁
}
}else {
//如果不能获取锁,则直接做其他事情
}
ReentrantLock 可重入锁
实现原理
满足线程的互斥特性
意味着同一个时刻,只允许一个线程进入到加锁的代码中。 -> 多线程环境下,线程的顺序访问
将阻塞线程放入AQS队列中
Lock类图
ReentrantLock继承了AQS(AbstractQueuedSynchronizer)类,并且定义了同步器对象Sync。AQS类是实现锁的基础对象,在该类中定义了线程的同步状态及一个FIFO的阻塞队列。
首先,来看ReentrantLock构造方法。
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
通过上述构造方法构建使用的同步器,默认是采用非公平锁的方式,可以通过构造传参进行选择。公平锁严格遵守FIFO模型,最先进入阻塞队列的线程在锁释放后最先执行。而非公平锁在锁释放后,任意线程均可抢占锁。
public void lock() {
sync.lock();
}
非公平锁
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() { //不管当前AQS队列中是否有排队的情况,先去插队
if (compareAndSetState(0, 1)) //返回false表示抢占锁失败
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
在NonfairSync.lock()方法中,会调用Unsafe
类的CAS方法修改线程状态,若能修改成功,则表示当前线程获得锁,会记录下当前线程信息;若CAS调用返回false,则调用AQS里面的acquire()方法。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
addWaiter(Node.EXCLUSIVE) -> 添加一个互斥锁的节点
acquireQueued() -> 自旋锁和阻塞的操作
这里的tryAcquire()会调用ReentrantLock#NonfairSync中的tryAcquire方法,源码如下:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { //hasQueuedPredecessors
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
在这个过程中,会再次判断线程的状态,若线程是无锁状态,则再次通过CAS获取锁。若当前线程再次获取锁,则直接增加获取锁的次数,这里是实现可重入锁的核心方法。
若还是获取不到锁,回到AQS中的acquire方法,会将当前线程组装一个Node节点,然后添加到阻塞队列中:
private Node addWaiter(Node mode) {
//把当前线程封装成一个Node节点。
//后续唤醒线程的时候,需要 得到被唤醒的线程.
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//假设不存在竞争的情况
if (pred != null) {
node.prev = pred;
//自旋
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
//自旋
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//初始化一个head节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
通过AQS类中定义的compareAndSetHead和compareAndSetTail方法,将获取锁失败的线程构建成Node,并添加至一个双向链表中,然后在添加至阻塞队列时,将当前线程阻塞:
//node表示当前来抢占锁的线程,有可能是ThreadB、 ThreadC。。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { //自旋
//begin ->尝试去获得锁(如果是非公平锁)
final Node p = node.predecessor();
//如果返回true,则不需要等待,直接返 回。
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//end
//否则,让线程去阻塞(park)
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//ThreadB、 ThreadC、ThreadD、ThreadE -> 都会阻塞在下面这个代码的位置.
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//被唤醒. (interrupt()->)
return Thread.interrupted();//中断状态(是否因为中断被唤醒的.)
}
AQS类中的tryRelease()方法也是一个抽象方法,具体实现由子类进行实现,源码如下:
abstract static class Sync extends AbstractQueuedSynchronizer {
...
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
// 删除独占锁标识
setExclusiveOwnerThread(null);
}
// 将state状态复原
setState(c);
return free;
}
}
state记录了获取锁的次数,如果是重入锁,需要等当前线程将锁完全释放后,才会将独占锁的线程标识清空。
在AQS的release方法中,若当前线程释放锁,且阻塞队列中有节点时,则会唤醒处于阻塞状态的下一个线程:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head; //得到当前AQS队列中的head节点。
if (h != null && h.waitStatus != 0)//head节点不为空
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)//表示可以唤醒状态
compareAndSetWaitStatus(node, ws, 0); //恢复成0
// 获取下一个节点,若下一个节点为空或者为取消状态,则从尾部往前找到当前节点后第一个未取消的节点
Node s = node.next;
if (s == null || s.waitStatus > 0) { //说明ThreadB这个线程可能已经被销毁,或 者出现异常...
s = null;
//从tail -> head进行遍历.
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0) //查找到小于等于0的节点
s = t;
}
// 释放锁
if (s != null)
LockSupport.unpark(s.thread); //释放在Node中的被阻塞的线程。ThreadB、 ThreadC。
}
这里借助LockSupport.unpark唤醒处于双向链表中的下一个节点,直到所有的锁释放,程序执行结束。
公平锁
而公平锁在获取锁时会检查阻塞队列中是否有值,若有值,则会将当前线程加入阻塞队列:
final void lock() {
acquire(1); //抢占1把锁.
}
public final void acquire(int arg) { //AQS里面的方法
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();//获得当前线程
int c = getState();
if (c == 0) {//表示无锁状态
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) { //CAS(#Lock) -> 原子操作| 实现互斥 的判断
setExclusiveOwnerThread(current); //把获得锁的线程保存到 exclusiveOwnerThread中
return true;
}
}
//如果当前获得锁的线程和当前抢占锁的线程是同一个,表示重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;//增加重入次数.
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);//保存state
return true;
}
return false;
}
其他的操作与非公平锁一致。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/16830.html