AQS源码分析系列:(三)AQS锁的自定义和实现

阅读本文前,需要储备的知识点如下,点击链接直接跳转。

由于AQS源码分析篇幅较长,为避免阅读疲劳,特采用系列的形式分成了三篇,建议按顺序阅读。

  • AQS源码分析系列:(一)AQS基础知识
  • AQS源码分析系列:(二)AQS核心:加锁、释放锁、超时中断流程
  • AQS源码分析系列:(三)AQS锁的自定义和实现

讲完了基础知识和核心流程和逻辑之后,没阅读过的伙伴可以自行阅读,本篇我们来看一下AQS锁的自定义和实现。

AQS的使用

AQS是一个抽象队列同步框架,支持独占模式和共享模式,由于AQS是一个抽象类,仅仅需要子类去实现具体的获取锁释放锁方法,锁的获取和释放入口统一由AQS提供,如下所示。

独占模式

  • 获取锁入口

(1)不响应中断

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

(2)响应中断

public final void acquireInterruptibly(int arg)
        throws InterruptedException 
{
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

独占模式下,不管是否响应中断,获取锁时子类仅需要实现tryAcquire(arg)方法,尝试获取资源,成功则返回true,失败则返回false,其他都由AQS提供。

  • 释放锁入口
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

独占模式下,释放锁时子类仅需要实现tryRelease(arg)方法,尝试释放资源,成功则返回true,失败则返回false,其他都由AQS提供。

共享模式

  • 获取锁入口

(1) 不响应中断

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

(2) 响应中断

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException 
{
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

共享模式下,不管是否响应中断,获取锁时子类仅需要实现tryAcquireShared(arg)方法,尝试获取资源,返回值<0表示失败;=0表示成功,但没有剩余可用资源;>0表示成功,且有剩余资源,其他都由AQS提供。

  • 释放锁入口
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

共享模式下,释放锁时子类仅需要实现tryReleaseShared(arg)方法,尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false,其他都由AQS提供。

自定义锁的实现

使用AQS自定义锁时,子类可以实现Lock接口(因为Lock定义了获取锁和释放锁的方法,也可以不实现这个接口,自己定义方法),然后实现尝试获取锁和释放锁的方法即可。

需求

实现一个独占不响应中断不可重入的公平锁。

分析

独占锁需要实现tryAcquire(arg)、tryRelease(arg)这两个方法。不可重入,则要判断只要有线程占用锁,不管是不是当前线程都返回获取失败,公平锁说明尝试获取锁时要先看队列里是否有等待获取锁的Node。

实现

其实也就是ReentrantLock的另一个版本

  1. 定义一个实现需求的MyLock类。
  2. 定义MyLock类的加锁方法lock()和释放锁方法unLock()。
  3. 在MyLock类内部定义一个Sync类继承AbstractQueuedSynchronizer类,实现tryAcquire(int arg)和tryRelease(int arg)方法。
  4. MyLock类中定义一个Sync的变量,构造函数中实例化Sync类,在lock方法调用sync.acquire(1),在unlock方法中调用sync.release(1)

这样锁的定义和实现都完成了,代码如下。

public class MyLock {

    private Sync sync;

    public MyLock() {
        sync = new Sync();
    }

    private class Sync extends AbstractQueuedSynchronizer {

        @Override
        protected boolean tryAcquire(int arg) {
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, arg)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (getState() == 1) {
                free = true;
                setExclusiveOwnerThread(null);
                setState(0);
            }
            return free;
        }
    }

    public final void lock() {
        sync.acquire(1);
    }

    public void unLock() {
        sync.release(1);
    }
}

测试

  • 多个线程获取锁
class Test {
    public static void main(String[] args) {
        MyLock myLock = new MyLock();
        List<Thread> list = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            list.add(new Thread(() -> {
                System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "将要加锁");
                myLock.lock();
                System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "加锁成功");
                try {
                    System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "执行业务逻辑");
                    Thread.sleep(new Random().nextInt(10));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "解锁成功");
                    myLock.unLock();
                }
            }, "t" + i));
        }
        list.forEach(Thread::start);
    }
}

结果输出:

2023-06-08T11:35:27.822:t0将要加锁
2023-06-08T11:35:27.822:t4将要加锁
2023-06-08T11:35:27.822:t3将要加锁
2023-06-08T11:35:27.822:t1将要加锁
2023-06-08T11:35:27.822:t2将要加锁
2023-06-08T11:35:27.823:t0加锁成功
2023-06-08T11:35:27.823:t0执行业务逻辑
2023-06-08T11:35:27.828:t0解锁成功
2023-06-08T11:35:27.828:t4加锁成功
2023-06-08T11:35:27.828:t4执行业务逻辑
2023-06-08T11:35:27.831:t4解锁成功
2023-06-08T11:35:27.831:t3加锁成功
2023-06-08T11:35:27.831:t3执行业务逻辑
2023-06-08T11:35:27.836:t3解锁成功
2023-06-08T11:35:27.836:t1加锁成功
2023-06-08T11:35:27.836:t1执行业务逻辑
2023-06-08T11:35:27.837:t1解锁成功
2023-06-08T11:35:27.837:t2加锁成功
2023-06-08T11:35:27.837:t2执行业务逻辑
2023-06-08T11:35:27.845:t2解锁成功
  • 线程是否可重入
class Test {
    public static void main(String[] args) {
        MyLock myLock = new MyLock();
        new Thread(() -> {
            System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "将要加锁");
            myLock.lock();
            System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "加锁成功");
            try {
                myLock.lock();
                System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "再次加锁成功");
                System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "执行业务逻试");
                Thread.sleep(new Random().nextInt(10));
                myLock.unLock();
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "解锁成功");
                myLock.unLock();
            }
        },"t1").start();

        new Thread(() -> {
            System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "将要加锁");
            myLock.lock();
            System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "加锁成功");
            try {
                System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "执行业务逻试");
                Thread.sleep(new Random().nextInt(10));
                myLock.unLock();
                System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "解锁成功");
                myLock.lock();
                System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "再次加锁成功");
                myLock.unLock();
                System.out.println(LocalDateTime.now() + ":" + Thread.currentThread().getName() + "再次解锁成功");
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"t2").start();
    }
}

有两种可能的输出:

  1. t1先获取锁成功

这种情况输出如下,t1先加锁成功,t2等待,实现了多线程间的加锁互斥,另外t1加锁成功后有再次加锁,发现还是等待,这说明锁不可重入,功能实现,这两个线程都将一直等下去。

2023-06-08T11:47:57.016:t1将要加锁
2023-06-08T11:47:57.017:t1加锁成功
2023-06-08T11:47:57.016:t2将要加锁
  1. t2先获取锁成功

这种情况输出如下,t2先加锁成功,正常执行业务逻辑后释放锁,t2释放锁后线程可正常结束。t2释放了锁,则t1加锁成功,当t1想第二次再加锁时,发现需要等待,锁不可重入。

2023-06-08T11:49:28.492:t2将要加锁
2023-06-08T11:49:28.492:t1将要加锁
2023-06-08T11:49:28.493:t2加锁成功
2023-06-08T11:49:28.493:t2执行业务逻试
2023-06-08T11:49:28.501:t2解锁成功
2023-06-08T11:49:28.501:t1加锁成功

通过这两个例子,我们可以看出,这种独占锁、不可重入的情况下,lock()和unlock()方法必须配对使用,不能连续加锁和释放锁。

JUC包下AQS子类锁的实现

java.util.concurrent包下有几个基于AQS实现的锁,如下所示,有了以上知识基础,再理解这些锁是很容易的,了解详细可参考具体源码实现。

类型 描述
ReentrantLock 独享锁 可重入锁
ReentrantReadWriteLock 独享锁、共享锁兼备 ReadLock是共享锁,WriteLock是独享锁
CountDownLatch 共享锁 不可重复使用
Semaphore 共享锁 可重复使用
CyclicBarrier 共享锁 使用ReentrantLock实现的共享锁,可重复使用

总结

主要讲解了AQS的独占模式,提到了一些共享模式相关的知识,有了独享模式的基础,理解共享模式并不难,还有关于Condition相关的知识没有讲,所以关于共享模式和Condition相关的大家可以自行去阅读源码,后续有机会也会出相关的文章。 还有另外一个类AbstractQueuedLongSynchronizer,这个类是AbstractQueuedSynchronizer的一个变种,只是把state的类型从int变成long了,所有涉及跟这个state相关的操作参数和返回都改成long类型了,理论上使用这个类实现的锁可以超过Integer.MAX_VALUE的限制,最大的可获取锁的次数就变成Long.MAX_VALUE,这个在如多级锁和需要64位状态时会非常有用,目前在JDK里并没有发现使用的地方,而在HikariCP连接池com.zaxxer.hikari.util.QueuedSequenceSynchronizer这个类内部使用到了这个类,感兴趣的可自行阅读。 AQS的设计确实相当巧妙、逻辑非常严谨,在多线程下使用,已尽可能最大限度支持高并发操作,通过对源码的学习,我们了解了锁的设计,大部分的工作都由AQS完成(包括线程的包装排队、阻塞、唤醒、超时处理、中断处理等),剩下的小部分代码由开发者根据业务场景具体实现(尝试获取锁,释放锁),不得不佩服如此精美巧妙的设计和实现,Doug Lea,我永远的神!

欢迎关注公众号,欢迎分享、点赞、在看


原文始发于微信公众号(小新成长之路):AQS源码分析系列:(三)AQS锁的自定义和实现

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/238563.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!