Java JUC ReentrantReadWriteLock解析

介绍

ReentrantReadWriteLock 和 ReentrantLock 的区别是,ReentrantLock 是独占锁,同一时间只能有一个线程获取锁,但在实际中更多的是读多写少的情况,显然 ReentrantLock 满足不了该情况,而 ReentrantReadWriteLock 采用了读写分离的策略,可以允许多个线程同时进行读取

Java JUC ReentrantReadWriteLock解析
类图

从该类图可以看到,在读写锁内部维护了一个ReadLock和一个WriteLock,它们依赖Sync实现具体功能,而 Sync 继承自 AQS,并且也提供了公平非公平的实现。

下面我们只介绍非公平的实现

我们知道在 AQS 中维护了一个 state 状态,而在 ReentrantReadWriteLock 中则需要维护读状态和写状态,那么一个 state 如何标识写和读两种状态呢?

ReentrantReadWriteLock 巧妙地使用 state 的高 16 位表示读状态,也就是获取到读锁的次数;使用低 16 位表示获取到写锁的线程的可重入次数

static final int SHARED_SHIFT   = 16;
//共享锁(读锁)状态单位值65536
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
//共享锁线程最大个数65535
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
//排它锁(写锁)掩码,二进制,15个1
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

//返回读锁线程数量
static int sharedCount(int c)    return c >>> SHARED_SHIFT; }
//返回写锁可重入可数
static int exclusiveCount(int c) return c & EXCLUSIVE_MASK; }

在 ReentrantReadWriteLock 类图中,firstReader用来记录第一个获取到读锁的线程,firstReaderHoldCount则是记录第一个获取到读锁的线程可重入次数,cachedHoldCounter用来记录最后一个获取读锁的线程可重入次数。

readHolds是 ThreadLocal 变量,用来存放除第一个获取读线程外的其他线程获取读锁的可重入次数。

ThreadLocalHoldCounter 继承了 ThreadLocal,所以 initialValue 方法返回一个 HoldCounter 对象。

static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter
{
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}
static final class HoldCounter {
    int count = 0;
    // Use id, not reference, to avoid garbage retention
    final long tid = getThreadId(Thread.currentThread());
}

写锁的获取与释放

📢:在 ReentrantReadWriteLock 中写锁是使用WriteLock类实现

void lock()

写锁为独占锁,在某一时刻只能有一个线程获取该锁,如果当前没有线程获取到读锁和写锁,则当前线程可以获取到写锁然后返回。如果当前已经有线程获取到读锁和写锁,则当前请求写锁的线程会被阻塞挂起,写锁是可重入锁,如果当前线程已经获取了该锁,则再次获取时仅仅将可重入次数加 1 即可。

public void lock() {
    sync.acquire(1);
}
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}

可以看到在 lock 内部调用了 AQS 的 acquire 方法,其中 tryAcquire 则是在 ReentrantReadWriteLock 内部的 Sync 类重写。如下:

protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
       //(1) c != 0说明读锁或写锁已经被某线程获取
            if (c != 0) {
                //(2) w == 0 说明已经有线程获取到该锁,w != 0并且当前线程并不是拥有者则返回false
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                //(3) 说明当前线程获取到了写锁,判断可重入次数
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                //(4)可重入次数加一
                setState(c + acquires);
                return true;
            }
            //(5)第一个写线程获取写锁
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
}

在代码(1)中,如果当前 AQS 状态值不为 0 则说明已经有线程获取到了读锁或者写锁。

在代码(2)中,如果 w==0 则说明状态值的低 16 位为 0,而 AQS 状态值不为 0,暗示已经有线程获取到了读锁,所以返回 false。而如果 w!=0 则说明当前已经有线程获取到了写锁,那么看看是不是自己获得的,不是返回 false。

在代码(3)中,说明当前线程已经获取到了该锁,判断该线程可重入次数是否大于最大值,是则抛出异常,否则执行代码(4)进行增加可重入次数,然后返回 true。

如果 AQS 的状态值等于 0 则说明目前没有线程获取到读锁和写锁,所以执行代码(5)。其中,对于 writerShouldBlock 方法,非公平锁的实现为如下:

final boolean writerShouldBlock() {
      return false// writers can always barge
}

可以看到该方法返回 false,说明需要进行 CAS 尝试获取写锁,获取成功则设置当前锁的持有人为当前线程,然后返回 true,否则返回 false。

公平锁实现为如下:

final boolean writerShouldBlock() {
      return hasQueuedPredecessors();
}
public final boolean hasQueuedPredecessors() {
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
}

这里还是使用 hasQueuedPredecessors 来判断当前节点是否有前驱节点等,在《ReentrantLock 解析》文章已经讲过了,这里就不再多讲了。

void lockInterruptibly()

类似于 lock 方法,它的不同之处在于,它会对中断进行响应,也就是当其他线程调用了该线程的 interrupt 方法中断了当前线程时,当前线程会抛出异常 InterruptedException 异常。

public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}

boolean tryLock()

尝试获取写锁,如果当前没有其他线程持有写锁或者读锁,则当前线程获取写锁然后返回 true;如果已经有线程获取写锁或读锁则返回 false,并且当前线程不会阻塞;如果当前线程已经持有了写锁则 state 值加 1 然后返回 true。实现与 tryAcquire 方法类似,不多讲。

public boolean tryLock( ) {
    return sync.tryWriteLock();
}
final boolean tryWriteLock() {
    Thread current = Thread.currentThread();
    int c = getState();
    if (c != 0) {
        int w = exclusiveCount(c);
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
    }
    if (!compareAndSetState(c, c + 1))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

boolean tryLock(long timeout,TimeUnit unit)

与 tryAcquire 方法不同之处在于,多了超时时间参数,如果尝试获取写锁失败后会把当前线程挂起指定时间,等待时间到后激活该线程重试获取,如果还是没获取到写锁则返回 false。另外,该方法会对中断进行响应,也就是当其他线程调用了该线程的 interrupt 方法中断了当前线程时,当前线程会抛出 InterruptedException 异常。

public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException 
{
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

void unlock()

尝试释放锁,如果当前线程持有该锁,调用后则会 AQS state 状态值减 1,如果减 1 后为 0 则释放该锁,否则仅仅减 1。如果当前线程没有持有该锁而调用该方法则抛出异常。

public void unlock() {
    sync.release(1);
}
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
protected final boolean tryRelease(int releases) {
    //判断调用线程是否是写锁拥有者
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
   //获取可重入值,这里没有考虑高16位,因为获取写锁时状态值肯定为0
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    //如果为0则释放锁,否则只是更新状态值
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}

读锁的获取与释放

📢:在 ReentrantReadWriteLock 中读锁是使用ReadLock类实现

void lock()

获取读锁,如果当前线程没有持有该锁,则当前线程获取该锁,AQS 状态值 state 的高 16 位进行加 1,然后返回。否则如果其他线程持有写锁,则当前线程被阻塞。

public void lock() {
    sync.acquireShared(1);
}
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

在该段代码中,读锁的 lock 方法调用了 AQS 的 acquireShared 方法,随后内部调用了 ReentrantReadWriteLock 中的 Sync 类重写的 tryAcquireShared 方法。

protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    //(1)获取当前状态值
    int c = getState();
    //(2)判断是否被写锁占用
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    //(3)获取读锁计数
    int r = sharedCount(c);
    //(4)尝试获取锁,多个读线程同时只有一个会成功,不成功的进入fullTryAcquireShared方法重试
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        //(5)第一个线程获取锁,设置 firstReader 等于当前线程
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        //(6)如果当前线程是第一个获取锁的线程则进行第一个线程可重入次数加1
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            //(7)记录最后一个获取读锁的线程或其他线程读锁的可重入数
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
   //(8)类似tryAcquireShared,但是是自旋获取
    return fullTryAcquireShared(current);
}

该方法首先获当前 AQS 状态值,然后代码(2)查看是否有其他线程获取到了写锁,如果是则返回-1。然后放入 AQS 阻塞队列。

如果当前要获取读锁的线程已经持有了写锁,则也可以获取读锁。但是需要注意,当一个线程先获取了写锁,然后获取了读锁处理事情完毕后,要记得把读锁和写锁都释放掉,不能只释放写锁。

如果执行到代码(3)首先获取读锁的个数,到这里说明目前没有线程获取到写锁,但是可能已经有线程获取到了读锁,然后执行代码(4),其中非公平锁的 readerShouldBlock 实现如下:

final boolean readerShouldBlock() {
      return apparentlyFirstQueuedIsExclusive();
}
final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    return //头节点不允许为空,也就是阻塞队列存在
            (h = head) != null &&
            //头节点下一个节点必须存在
            (s = h.next)  != null &&
            //下一个节点不能共享,也就是写锁
            !s.isShared()         &&
            //下一个节点的线程对象不允许为空
        s.thread != null;
}

该方法作用是,如果阻塞队列是空的,那么可以获取;如果阻塞队列不是空的,分两种情况。一:如果第一个节点是写节点,那么你不能获取读锁,阻塞排队。二,如果第一个节点是读节点,那么可以获取。

随后在代码(4)进行判断当前线程当前获取读锁的线程是否达到了最大值。最后执行 CAS 操作将 AQS 状态值的高 16 位值增加 1。

代码(5)(6)记录第一个获取读锁的线程并统计该线程获取读锁的可重入数。

代码(7)使用 cachedHoldCounter 记录最后一个获取到读锁的线程和该线程获取读锁的可重入数,readHolds 记录了当前线程获取读锁的可重入数。

如果 readerShouldBlock 方法返回 true,则说明有线程正在获取写锁,所以执行代码(8),fullTryAcquireShared 的代码与 tryAcquireShared 类似,它们的不同之处在于,前者通过循环自旋获取。

final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            for (;;) {
                //获取状态值
                int c = getState();
                //如果存在写锁
                if (exclusiveCount(c) != 0) {
                    //持有者不是自己 返回 -1
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                //如果写锁空闲,且可以获取读锁
                } else if (readerShouldBlock()) {
                    // 第一个读线程是当前线程
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                     // 如果不是当前线程
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                // 获取可重入次数
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                //如果读锁线程大于最大值 抛出异常
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // CAS 设置读锁,高位加1
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    // sharedCount(c) == 0 说明读锁空闲 进行设置
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                     // 如果不空闲,并且第一个线程为当前线程则进行更新相加
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        //如果不是当前线程
                        if (rh == null)
                            rh = cachedHoldCounter;
                         // 如果最后一个读计数器所属线程不是当前线程
                        if (rh == null || rh.tid != getThreadId(current))
                            //创建一个cachedHoldCounter
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        //计数器自增
                        rh.count++;
                        // 更新缓存计数器
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
}

void lockInterruptibly()

类似于 lock 方法,不同之处在于,该方法会对中断进行响应,也就是当其他线程调用了该线程的 interrupt 方法中断了当前线程时,当前线程会抛出 InterruptedException 异常。

boolean tryLock()

尝试获取读锁,如果当前没有其他线程持有写锁,则当前线程获取读锁会成功,然后返回 true。如果当前已经有其他线程持有写锁则该方法直接返回 false,但当前线程并不会被阻塞。如果当前线程已经持有了该读锁则简单增加 AQS 的状态值高 16 位后直接返回 true。其代码类似 tryLock 的代码,这里不再讲述。

boolean tryLock(long timeout,TimeUnit unit)

与 tryLock 的不同之处在于,多了超时时间参数,如果尝试获取读锁失败则会把当前线程挂起指定时间,待超时时间到后当前线程被激活,如果此时还没有获取到读锁则返回 false。另外,该方法对中断响应,也就是当其他线程调用了该线程的 interrupt 方法中断了当前线程时,当前线程会抛出 InterruptedException 异常。

void unlock()

读锁的释放是委托给 Sync 来做,releaseShared 方法如下。

public void unlock() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    //如果当前线程是第一个线程
    if (firstReader == current) {
        // 如果等于1 说明没有重复获取,则设置为null
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
          // 否则减1
            firstReaderHoldCount--;
    } else {//如果不是当前线程
        HoldCounter rh = cachedHoldCounter;
        // 如果缓存是 null 或者缓存所属线程不是当前线程,则当前线程不是最后一个读锁
        if (rh == null || rh.tid != getThreadId(current))
          // 获取当前线程的计数器
            rh = readHolds.get();
        int count = rh.count;
        // 如果计数器小于等于一,就直接删除计数器
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        // 对计数器减一
        --rh.count;
    }
    // 死循环
    for (;;) {
        int c = getState();
        // 减去一个读锁。对高16位减1
        int nextc = c - SHARED_UNIT;
        // 修改成功,如果是 0,表示读锁和写锁都空闲,则可以唤醒后面的等待线程
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

如上代码所示,在死循环中,首先获取当前 AQS 状态值并保存到变量 c,然后 c 减去一个读计数单位后使用 CAS 操作更新 AQS 状态值,如果更新成果则查看当前 AQS 状态值是否为 0,为 0 说明没有读线程占用读锁,则返回 true。

随后调用 doReleaseShared 方法释放一个由于获取写锁而被阻塞的线程,如果当前 AQS 状态值不为 0,则说明还有其他读线程,所以返回 false。

锁的升降级

升降级是指读锁升级为写锁,写锁降级为读锁。在 ReentrantReadWriteLock 读写锁中,只支持写锁降级为读锁,而不支持读锁升级为写锁。

代码示例:

public class LockTest {

    private static ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false);
    private static ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
    private static ReentrantReadWriteLock.ReadLock readLock = lock.readLock();

    public static void main(String[] args) {
        new Thread(() -> write()).start();
        new Thread(() -> read()).start();
    }

    private static void read() {
        readLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "开始学习《Thinking in Java》");
            writeLock.lock();
            System.out.println(Thread.currentThread().getName() + "获得到了写锁");
        } finally {
            writeLock.unlock();
            System.out.println(Thread.currentThread().getName() + "太难了!我不学了!");
            readLock.unlock();
        }
    }

    private static void write() {
        writeLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "开始印刷《Thinking in Java》");
            readLock.lock();
            System.out.println(Thread.currentThread().getName() + "在写锁中获取到了读锁");
        } finally {
            readLock.unlock();
            System.out.println(Thread.currentThread().getName() + "印刷完成");
            writeLock.unlock();
        }
    }
}

运行结果:

Thread-0开始印刷《Thinking in Java》
Thread-0在写锁中获取到了读锁
Thread-0印刷完成
Thread-1开始学习《Thinking in Java》

我们可以看到在写锁中成功获得到了读锁,而在读锁中被一直阻塞。说明不支持锁升级!

为什么 ReentrantReadWriteLock 不支持锁升级

主要是避免死锁,例如两个线程 A 和 B 都在读, A 升级要求 B 释放读锁,B 升级要求 A 释放读锁,互相等待形成死循环。如果能严格保证每次都只有一个线程升级那也是可以的。

在 tryAcquireShared 方法和 fullTryAcquireShared 中都有体现,例如下面的判断:

if (exclusiveCount(c) != 0) {
    if (getExclusiveOwnerThread() != current)
        return -1;

该代码意思是:当写锁被持有时,如果持有该锁的线程不是当前线程,就返回获取锁失败,反之就会继续获取读锁。称之为锁降级。

总结

  1. 读写锁特点:读锁是共享锁,写锁是排他锁,读锁和写锁不能同时存在
  2. 插队策略:为了防止线程饥饿,读锁不能插队
  3. 升级策略:只能降级,不能升级
  4. ReentrantReadWriteLock 适合于读多写少的场合,可以提高并发效率,而 ReentrantLock 适合普通场合

总结

Java JUC ReentrantReadWriteLock解析
总结

ReentrantReadWriteLock 巧妙地使用 AQS 的状态值的高 16 位表示获取到读锁的个数,低 16 位表示获取写锁的线程的可重入次数,并通过 CAS 对其进行操作实现了读写分离,非常适合在读多写少的场景下使用。

看😊

原文始发于微信公众号(Java菜鸟程序员):Java JUC ReentrantReadWriteLock解析

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/21016.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!