文章目录
ReentrantLock
ReentantLock
是java中重入锁的实现,一次只能有一个线程来持有锁,包含三个内部类,Sync
、NonFairSync
、FairSync
。
1、构造函数
无参构造,默认使用的是非公平性锁
public ReentrantLock() {
sync = new NonfairSync();
}
有参构造, Boolean类型的参数 true:表示公平性锁 false:非公平性锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
reentantlock是lock接口的实现类,即实现了Lock接口下所有的方法;
获取锁的方法lock、trylock、lockintertuptibly加锁方式以及释放锁方法。
2、公平性锁和非公平性锁
(1)公平性锁和非公平性锁示例
NonFairAndFairDemo类
import java.util.concurrent.locks.ReentrantLock;
public class NonFairAndFairDemo implements Runnable {
//静态变量(线程共享)
private static int num = 0;
//锁实例
private ReentrantLock rtl;
public NonFairAndFairDemo(ReentrantLock rtl) {
this.rtl = rtl;
}
@Override
public void run() {
while (true) {
//加锁
rtl.lock();
num++;
System.out.println(Thread.currentThread().getName() + ":" + num);
rtl.unlock();
}
}
}
测试公平锁
@Test
public void test01() {
ReentrantLock reentrantLock = new ReentrantLock(true);
Thread threadA = new Thread(new NonFairAndFairDemo(reentrantLock));
threadA.setName("A");
Thread threadB = new Thread(new NonFairAndFairDemo(reentrantLock));
threadB.setName("B");
threadA.start();
threadB.start();
}
测试非公平锁
@Test
public void test02() {
ReentrantLock reentrantLock = new ReentrantLock(false);
Thread threadA = new Thread(new NonFairAndFairDemo(reentrantLock));
threadA.setName("A");
Thread threadB = new Thread(new NonFairAndFairDemo(reentrantLock));
threadB.setName("B");
threadA.start();
threadB.start();
}
执行结果
非公平性锁的特点,是每个线程都连续执行多次之后在替换成其他线程执行。
(2)公平锁和非公平锁的实现
abstract static class Sync extends AbstractQueuedSynchronizer
公平性锁和非公平性锁的父类是
sync
,sync
类是AbstractQueuedSynchronizer
是其子类,AQS是一个同步器,提供同步功能。
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
//加锁操作,声明是抽象方法,nofairsync和fairsync中各自实现
abstract void lock();
//非公平获取,公平性锁和非公平性锁都需要这个方法
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//AQS获取state值
int c = getState();
if (c == 0) {
//锁空闲状态
//通过cas获取锁状态,修改state状态
if (compareAndSetState(0, acquires)) {
//标记当前线程为获取锁的线程
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
//锁非空闲,表明锁被占中,有一种情况,当前线程即为占用锁的线程
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//当前线程继续持有锁,仅对state进行加操作
setState(nextc);
return true;
}
return false;
}
//释放锁 sync中的tryRelease是公平性锁和非公平性锁的释放锁流程都是该方法
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
//只有持有锁的线程才能释放锁
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {//锁才会被释放
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
//判断当前线程是否持有锁
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
//获取锁的持有者线程
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
//加锁的次数
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
//是否上锁 true:表示加锁
final boolean isLocked() {
return getState() != 0;
}
}
该Sync中方法的封装是调用AQS中的方法实现的
公平性锁和非公平锁的如何实现?
公平性锁:FairLock
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//同步状态为空闲,需要等待队列中第一个等待着执行
//什么时候当前线程可以执行? 等待队列里没有线程等待或者是有线程等待且等待的第一个线程就是当前线程
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
//当前同步状态非空闲,被线程占用且是当前线程
int nextc = c + acquires;
if (nextc < 0)
//有符号的int类型。最高位为1表示负数
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
类AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
//当前同步状态非空闲,并且是其他线程持有锁 返回false
if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
公平性锁获取锁流程:
1、如果同步状态为空,就可以抢锁,能够获取锁的前提条件是当前等待队列为空,或者等待队列队头是当前线程,即当前线程才能够抢锁,通过CAS抢锁(state)抢锁成功记录当前线程信息到锁上。
2、如果同步状态不为空,即存在线程占用锁且占用线程是当前线程,当前线程可成功获取锁(state)。
非公平性锁
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
//执行Lock操作,尝试立即获取锁,失败就退回常规流程
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);//立即获取锁失败进入到acquire,首先调用tryAcquire
}
protected final boolean tryAcquire(int acquires) {
//同步状态为空闲或者不为空闲但是是当前线程持有锁,返回true表示抢锁成功
return nonfairTryAcquire(acquires);
}
}
类AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
//当前同步状态非空闲,并且是其他线程持有锁 返回false
if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
通过代码可知:很多方法,trylock,unlock都是在父类sync实现
非公平性锁抢锁流程:
1、直接通过CAS操作抢锁,如果不成功进入常规抢锁流程。
2、获取当前锁的状态(state是否为0),如果为0表示空闲,直接通过CAS抢锁,如果成功,记录线程信息到锁上。
3、如果锁不为空闲且是当前线程持有锁,则可直接获取锁(state+1)。
重入锁的实现:
ReentrantLock
都是将具体实现委托给内部类(Sync、NonFairSync、FairSync)。
ReentrantLock
的重入次数是使用AQS的state属性,state大于0表示锁被占用(值表示当前线程重入次数),等于0表示锁空闲,小于0则表示重入次数太多导致溢出了。
可重入锁需要一个重入计数的变量,初始值为0,当成功请求锁加1,释放锁时减1,当释放锁时计数为0则真正释放锁,重入锁必须持有对锁持有者的引用,用以判断是否可以重入。
(3)Condition
synchronized与wait、notify、notifyAll方法结合可以实现等待/通知模式。reentantlock同样可以实现等待、通知模式,需要借助于Condition对象,具有更好的灵活性。
newCondition方法
public Condition newCondition()
Condition中提供的方法如下:
awaitXXX和Object中的wait方法类似,使当前线程进入休眠等待,
signal和Object中的notify方法类似,唤醒一个处于休眠状态的线程
signalAll和Object中的signalAll方法类似,唤醒所有处于休眠状态的线程。
生产者和消费者
生产者
public class Producer extends Thread {
private LinkedList<Integer> cap;//共享仓库
private Random random = new Random();
private ReentrantLock rlk;
private Condition pToc;
private Condition cTop;
public Producer(LinkedList<Integer> cap, ReentrantLock rlk, Condition pToc, Condition cTop) {
this.cap = cap;
this.rlk = rlk;
this.pToc = pToc;
this.cTop = cTop;
}
@Override
public void run() {
while (true) {
rlk.lock();
try {
if (cap.size() == 3) {//缓冲区满 生产者进行阻塞
System.out.println("缓冲区满");
cTop.await();
}
//生产产品
int i = random.nextInt(1000);
System.out.println("生产者生产了" + i);
cap.add(i);
//通知消费者消费产品
pToc.signal();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
rlk.unlock();
}
}
}
消费者
public class Consumer extends Thread {
private LinkedList<Integer> cap;//共享仓库
private ReentrantLock rlk;
private Condition pToc;
private Condition cTop;
public Consumer(LinkedList<Integer> cap, ReentrantLock rlk, Condition pToc, Condition cTop) {
this.cap = cap;
this.rlk = rlk;
this.pToc = pToc;
this.cTop = cTop;
}
@Override
public void run() {
while (true) {
rlk.lock();
try {
if (cap.size() == 0) { //如果缓冲区为0,消费者阻塞
System.out.println("缓冲区为空");
pToc.await();
}
//消费者消费产品
Integer i = cap.remove();
System.out.println("消费者消费了" + i);
//通知生产者生产
cTop.signal();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
rlk.unlock();
}
}
}
测试类
public class Test {
public static void main(String[] args) {
LinkedList<Integer> cap = new LinkedList<>();
ReentrantLock reentrantLock = new ReentrantLock();
//生产者通知消费者
Condition pToc = reentrantLock.newCondition();
//消费者通知生产者
Condition cTop = reentrantLock.newCondition();
Producer producer = new Producer(cap,reentrantLock,pToc,cTop);
Consumer consumer = new Consumer(cap,reentrantLock,pToc,cTop);
producer.start();
consumer.start();
}
}
执行结果
● 在调用Condition中的await或者是signal这些方法中任何的方法时,必须持有锁(ReentantLock),如果没有持有此锁,则抛出IllegalMonitorStateException异常。
● 在调用await方法时,将释放掉锁,并在这些方法返回之前,重新先获取该锁,才能执行。
● 如果线程在等待中被中断,则等待将终止,并抛出InterruptedException,清除掉中断状态。
● 等待状态的线程按照FIFO顺序接收信号。
● 等待方法返回的线程重新获取锁的顺序与线程最初获取锁的顺序是相同的。
循环打印ABC
ABCThread类
public class ABCThread extends Thread {
private String name;
private ReentrantLock rtl;
private Condition waitc;//等待Condition
private Condition sigalc; //通知Condition
public ABCThread(String name,ReentrantLock rtl,Condition wc,Condition sc){
this.name = name;
this.rtl = rtl;
this.waitc = wc;
this.sigalc = sc;
}
@Override
public void run() {
int num =0;
while (true) {
rtl.lock();
//等待其他线程通知,
try {
waitc.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
//打印当前线程信息
System.out.println(name);
//通知下一个线程
sigalc.signal();
++num;
if (num >= 10) break;
rtl.unlock();
}
}
}
测试
@Test
public void test() {
ReentrantLock reentrantLock = new ReentrantLock();
//A通知B
Condition ab = reentrantLock.newCondition();
//B通知C
Condition bc = reentrantLock.newCondition();
//C通知A
Condition ca = reentrantLock.newCondition();
new ABCThread("A", reentrantLock, ca, ab).start();
new ABCThread("B", reentrantLock, ab, bc).start();
new ABCThread("C", reentrantLock, bc, ca).start();
//先发起通知A线程
reentrantLock.lock();
ca.signal();
reentrantLock.unlock();
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/95482.html