并发编程-由浅到深

导读:本篇文章讲解 并发编程-由浅到深,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

一、

线程和进程的区别

一个进程里跑着多个线程—-可以搜索一下线程和进程的类比:火车和火车车厢

并行和串行

并行:一起同时干活
串行:一个等着前一个把活干完才能继续干

二、java线程

1、创建线程的方法
2、参看线程
3、线程API
4、线程的状态

1)创建线程

1、new Thread();
2、将线程和任务(线程中需要执行的任务)分开
使用Runnable 配合Thread

Runnable runnable = new Runnable() {
public void run(){
// 要执行的任务
}
};
// 创建线程对象
Thread t = new Thread( runnable );
// 启动线程
t.start();

原理:Thread和Runnable之间的关系

3、FutureTask 配合Thread
使用get方法获取执行结果,FutureTask能接收Callable类型的参数,用来处理返回的结果

// 创建任务对象
FutureTask<Integer> task3 = new FutureTask<>(
() -> { log.debug("hello");
return 100;
});

// 参数1 是任务对象; 参数2 是线程名字,推荐
new Thread(task3, "t3").start();

// 主线程阻塞,同步等待 task 执行完毕的结果
Integer result = task3.get(); 
log.debug("结果是:{}", result);

2)线程关键概念

1、上下文切换:
以下情况下cpu切换线程,导致上下文切换
-cpu时间片用完
-垃圾回收
-优先级高的线程获取运行
-线程自己调用了sleep、yield、wait、join、park、synchronized、lock 等方法

3)线程中的常见方法

方法名 static 功能说明 注意

start() |
| 启动一个新线程,在新的线程运行 run 方法中的代码 | start 方法只是让线程进入就绪,里面代码不一定立刻运行(CPU 的时间片还没分给它)。每个线程对象的start方法只能调用一次,如果调用了多次会出现IllegalThreadStateException |
|

run() |
|

新线程启动后会调用的方法 | 如果 在构造 Thread 对象时传递了 Runnable 参数,则线程启动后会调用 Runnable 中的 run 方法,否则默认不执行任何操作。但可以创建 Thread 的子类对象, 来覆盖默认行为 |
|

join() |
| 等待线程运行结束 |
|
|

join(long n) |
| 等待线程运行结束,最多等待 n 毫秒 |
|
|

getId() |
| 获取线程长整型的 id | id 唯一 |
| getName() |
| 获取线程名 |
|
| setName(String) |
| 修改线程名 |
|
| getPriority() |
| 获取线程优先级 |
|
|

setPriority(int) |
| 修改线程优先级 | java中规定线程优先级是1~10 的整数,较大的优先级能提高该线程被 CPU 调度的机率 |
|

getState() |
|

获取线程状态 | Java 中线程状态是用 6 个 enum 表示,分别为: NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED |
|

isInterrupted() |
| 判断是否被打断, | 不会清除 打断标记 |
|

isAlive() |
| 线程是否存活
(还没有运行完毕) |
|
|

interrupt() |
|

打断线程 | 如果被打断线程正在 sleep,wait,join 会导致被打断的线程抛出 InterruptedException,并清除 打断标
记 ;如果打断的正在运行的线程,则会设置 打断标记 ;park 的线程被打断,也会设置 打断标记 |
|

interrupted() |

static | 判断当前线程是否被打断 | 会清除 打断标记 |
|

currentThread() |

static | 获取当前正在执行的线程 |
|
|
|
|
| |

方法名 static 功能说明 注意

sleep(long n) |

static | 让当前执行的线程休眠n毫秒, 休眠时让出 cpu 的时间片给其它线程 |
|
|

yield() |

static | 提示线程调度器让出当前线程对CPU的使用 |

主要是为了测试和调试 |

sleep 与yield
sleep

  1. 调用sleep 会让当前线程从_Running _进入_Timed Waiting _状态(阻塞)
  2. 其它线程可以使用interrupt 方法打断正在睡眠的线程,这时sleep 方法会抛出InterruptedException
  3. 睡眠结束后的线程未必会立刻得到执行
  4. 建议用TimeUnit 的sleep 代替Thread 的sleep 来获得更好的可读性
    代码:
Thread thread = new Thread(() -> {

});
TimeUnit.SECONDS.sleep(1);
Thread.sleep(1);

Yield(让出,谦让)

  1. 调用yield 会让当前线程从_Running _进入_Runnable _就绪状态,然后调度执行其它线程
  2. 具体的实现依赖于操作系统的任务调度器,仅仅是让线程进入到就绪状态进行竞争

线程优先级
线程优先级会提示(hint)调度器优先调度该线程,但它仅仅是一个提示,调度器可以忽略它(和yield类似) 如果cpu 比较忙,那么优先级高的线程会获得更多的时间片,但cpu 闲时,优先级几乎没作用

Runnable task1 = () -> { int count = 0;
for (;;) {
System.out.println("---->1 " + count++);
}
};
Runnable task2 = () -> { int count = 0;
for (;;) {
// Thread.yield();
System.out.println("	---->2 " + count++);
}
};

Thread t1 = new Thread(task1, "t1");


join

  • 等待某一个线程运行结束后继续运行
  • join可以设置等待时间
static int r = 0;
public static void main(String[] args) throws InterruptedException { test1();
}
private static void test1() throws InterruptedException { log.debug("开始");
Thread t1 = new Thread(() -> {
log.debug(" 开 始 "); 
sleep(1);// 睡觉了
log.debug("结束"); 
r = 10;
});
t1.start();

t1.join();//主线程等待t1运行结束

log.debug("结果为:{}", r); log.debug("结束");
}



interrupt打断方法

  • 打断sleep,wait,join 的线程(打断阻塞状态下的线程)

park和unpark

  • 打断park 线程, 不会清空打断状态;当打断标记是true,两次调用park就会失效
  • 如果打断标记已经是true, 则park 会失效
private static void test3() throws InterruptedException { Thread t1 = new Thread(() -> {
log.debug("park...");

// 打断之后下面的方法不在运行
LockSupport.park(); 

log.debug("unpark...");
log.debug("打断状态:{}", Thread.currentThread().isInterrupted());
}, "t1");
t1.start();


sleep(0.5); t1.interrupt();
}

4)线程的生命周期

在这里插入图片描述
网上这个例子完美:
举个通俗一点的例子来解释上面五种状态,比如上厕所:

你平时去商城上厕所,准备去上厕所就是新建状态(new),上厕所要排队,排队就是就绪状态(Runnable),有坑位了,轮到你了,拉屎就是运行状态(Running),你拉完屎发现没有手纸,要等待别人给你送纸过来,这个状态就是阻塞(Blocked),等你上完厕所出来,上厕所这件事情结束了就是死亡状态了。

注意:便秘也是阻塞状态,你便秘太久了,别人等不及了,把你赶走,这个就是挂起,还有一种情况,你便秘了,别人等不及了,跟你说你先出去酝酿一下,5分钟后再过来拉屎,这就是睡眠。

5)主线程和守护线程

默认情况,java进程必须等待所有的线程都执行结束,才会结束。但是守护线程,只要所有非守护线程运行结束,就算守护线程没有执行结束,守护线程也会强制结束。

  • 例如:垃圾回收线程
log.debug(" 开 始 运 行 ..."); Thread t1 = new Thread(() -> {
log.debug("开始运行...");
sleep(2);
log.debug("运行结束...");
}, "daemon");

// 设置该线程为守护线程
t1.setDaemon(true);
t1.start();

sleep(1);
log.debug("运行结束...");

小结:

本章的重点在于掌握线程创建
线程重要api,如start,run,sleep,join,interrupt 等
线程状态应用方面
异步调用:主线程执行期间,其它线程异步执行耗时操作提高效率:并行计算,缩短运算时间
同步等待:join
统筹规划:合理使用线程,得到最优效果
原理方面
线程运行流程:栈、栈帧、上下文切换、程序计数器
Thread 两种创建方式的源码
模式方面
终止模式之两阶段终止

三、共享模型-管程

总览:

  • 共享问题
  • synchronized
  • 线程安全分析
  • Monitor
  • wait/notify
  • 线程状态转换活跃性
  • Lock
  • 无锁-偏向锁-轻量级锁-重量级锁(自旋锁优化)

1)共享产生的问题

多线程环境下,对共享变量进行操作时,会产生问题
举例:
a(线程一)和b(线程二)在同一张白纸(同一个共享资源)画画,a画了一会儿就去厕所(cpu时 间片用完了),b开始来画了(上下文切换);这个时候a画的东西就被b破坏了—–共享问题

1.1临界区

  • 一个程序运行多个线程本身是没有问题的
  • 问题出在多个线程访问共享资源

多个线程读共享资源其实也没有问题
在多个线程对共享资源读写操作时发生指令交错,就会出现问题

  • 一段代码块内如果存在对共享资源的多线程读写操作,称这段代码块为临界区
// 共享资源
static int counter = 0;

static void increment()
// 临界区
{
counter++;
}

static void decrement()
// 临界区
{
counter--;
}

1.2竞态条件

多个线程在临界区内执行,由于代码的执行序列不同(字节码交错)而导致结果无法预测,称之为发生了竞态条件

1.3使用synchronized 解决

解决临界区竞态条件发生

  • 阻塞式的解决方案:synchronized,Lock
  • 非阻塞式的解决方案:原子变量【后面去讲】

1.4使用synchronized对象锁

// 语法
synchronized(对象) // 线程1, 线程2(blocked)
{
    临界区
}


//E.G.
static int counter = 0;
// 对象锁(共享对象)
static final Object room = new Object();

public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(() -> {
for (int i = 0; i < 5000; i++)
// 锁对象(给对象上锁)
 { 
        synchronized (room) {
            counter++;
        }
    }
}, "t1");

Thread t2 = new Thread(() -> {
for (int i = 0; i < 5000; i++) { synchronized (room) {
counter--;
}
}
}, "t2");

t1.start();
t2.start();
t1.join();
t2.join(); log.debug("{}",counter);
}

当线程a获取对象锁的时候,那么线程a必须把临界资源里带代码执行完,才会让别的线程来操作临界区里面的代码。
e.g.
当线程a获取锁,在临界区执行到一半时cup的时间片用完了,这时临界区的代码块依然被锁着,就算有别的线程想执行临界区中的代码,也不会给别的线程机会。只有当线程a完全执行完临界区中的代码,别的线程才有机会获取。


synchronized 实际是用对象锁保证了临界区内代码的原子性,临界区内的代码对外是不可分割的,不会被线程切换所打断。

  • synchronized的的入门使用
  • 将其加在方法上,或给对象加锁
class Test{
public synchronized void test() {

}
}
等价于
class Test{
public void test() { 
    synchronized(this) {

    }
   }
}

1.5变量的线程安全如何分析

成员变量和静态变量是否安全?(什么时候要考虑线程安全问题?)

  • 是否共享
  • 共享的是否存在该=改变
    • 仅读安全
    • 有读有写,代码为临界区需要考虑线程安全

1)
局部变量是线程安全的
但局部变量引用的对象则未必【被引用需要考虑别的代码是否对其进行修改】
如果该对象没有逃离方法的作用访问,它是线程安全的如果该对象逃离方法的作用范围,需要考虑线程安全(return)

原因:局部变量的话,每个线程都会开启一个独立的栈帧;除非有引用,心中有图jvm
并发编程-由浅到深

2)成员变量

class ThreadUnsafe {
// 共享资源
ArrayList<String> list = new ArrayList<>(); public void method1(int loopNumber) {
for (int i = 0; i < loopNumber; i++) {
// { 临界区, 会产生竞态条件,对共享资源进行读写
method2(); method3();

// } 临界区
}
}

private void method2() { 
    list.add("1");
}

private void method3() { 
    list.remove(0);
}
}

在这里插入图片描述

修改上面代码为安全,将成员变量修改为局部变量

class ThreadSafe {
public final void method1(int loopNumber) {
    // 修改为局部变量
    ArrayList<String> list = new ArrayList<>(); 
    for (int i = 0; i < loopNumber; i++) {
method2(list); method3(list);
}
}

private void method2(ArrayList<String> list) { list.add("1");
}

private void method3(ArrayList<String> list) { list.remove(0);
}
}

在这里插入图片描述

2)Monitor

2.1java对象头

  • 普通对象头
|--------------------------------------------------------------|
|	Object Header (64 bits)	|
|------------------------------------|-------------------------|
|	Mark Word (32 bits)	|	Klass Word (32 bits) |
|------------------------------------|-------------------------|

  • 数组对象头
|---------------------------------------------------------------------------------|
|	Object Header (96 bits)	|
|--------------------------------|-----------------------|------------------------|
|	Mark Word(32bits)	|	Klass Word(32bits) |  array length(32bits) |
|--------------------------------|-----------------------|------------------------|

  • 对锁的定义主要在Mark word中=》markword的结构
// 释义:以无所为例,
//hashcode:对象的hashcode
// age:对象的分代年龄,用于gc使用,
// biased_lock:是否偏向锁
// 最后:是否加锁,01,00,10代表下面的不同的锁
|-------------------------------------------------------|--------------------|
|	Mark Word (32 bits)	|	State	|
|-------------------------------------------------------|--------------------|
| hashcode:25	| age:4 | biased_lock:0 | 01	|	Normal	|
|-------------------------------------------------------|--------------------|
| thread:23 | epoch:2 | age:4 | biased_lock:1 | 01	|	Biased	|
|-------------------------------------------------------|--------------------|
|	ptr_to_lock_record:30	| 00	| Lightweight Locked |
|-------------------------------------------------------|--------------------|
|	ptr_to_heavyweight_monitor:30 | 10	| Heavyweight Locked |
|-------------------------------------------------------|--------------------|
|	| 11	|	Marked for GC	|
|-------------------------------------------------------|--------------------|

2.1Monitor原理

在这里插入图片描述

1、每个对象都会关联一个Monitor对象
2、当使用synchronized将对象进行上锁,对象中的markword会指向monitor对象
3、解释上图的流程:

  • 刚开始的时候owner是null
  • 当线程1进来拿到锁时,将owner设置为线程1
  • 当线程1持有锁时,当别的线程再想尝试获得锁的时候就会进入到entryList
  • 当线程获得锁,但是不满足线程的执行条件的时候,将改线程方法waitSet中(这个后面具体分析)

2.2synchronized原理

总:

1)轻量级锁

synchronized的锁升级由浅到深
代码例子

static final Object obj = new Object(); 
public static void method1() {
synchronized( obj ) {
// 同步块 A method2();
}
}
public static void method2() { 
    synchronized( obj ) {
// 同 步 块 B
}
}

解释:

  • 当线程a获得锁,会在栈帧中添加一条锁记录

在这里插入图片描述

  • 第一步:将锁记录中的锁记录地址和对象头中的hashcode尝试进行cas(后面也会详细讲)。简单讲一下cas:拿新值和内存中的老值,比较,如果老值没变就交换成功,反之失败
  • 第二步: 将锁记录中的对象引用指向对象的地址

在这里插入图片描述

  • 注:如果cas成功就会交换,并将锁记录后的锁状态改为00(这里其实只需要知道修改了锁状态轻量级锁即可,没必要死记00这些),轻量级锁(对照上文对象头结构)
  • cas失败有两种情况
  • 1,别的线程已经持有该锁,存在锁竞争那么进行锁膨胀
  • 2、自已已经拿到了锁,自己执行了synchronized锁重入,在栈帧中添加一条锁记录,用作计数即可,解锁的时候直接去除,锁记录处是null值

在这里插入图片描述

  • 解锁:就是讲hashcode和锁记录再进行一个Cas操作
    • 成功就解锁成功
    • 失败进入重量级锁的解锁过程
2)锁膨胀,重量级锁
  • 锁膨胀的条件:当有线程已经占有锁时,那么这时又有线程想要占有该锁时(存在锁竞争),那么会认为轻量级锁不够用了,就将轻量级锁升级为重量级锁

在这里插入图片描述

  • 图解:就是线程a刚来的时候cas成功,那么线程b再cas一定是失败的,那么判断锁被占有,进入锁升级申请一个monitor对象(轻量级锁是没有monitor对象的,轻量级只是进行了cas操作,避免monitor对象对性能产生损耗,所以才会被称之为轻量级锁)

重量级锁图解:
在这里插入图片描述

  • 申请monitor对象之后,没拿到锁的线程会进入到monitor对象中的EntryList中进行阻塞
3)自旋优化
  • 因为重量级锁的性能会有损耗,所以再轻量级锁到重量级锁升级的过程中会先进行自旋。自旋成功就不用升级为重量级锁了,避免不必要的开销

注意:

  • 自旋锁的自适应:若在进行自旋的时候如果获得了锁,那么下次会让他多自旋几次,反之少自旋或不自旋
  • 默认自旋锁是开启的
4)偏向锁
  • 因为在轻量级锁的时候,当线程进行锁重入的时候,仍然会进行cas那么还是会有一定的性能损耗。此时应运而生的就是偏向锁。
  • 偏向锁就是:当线程第一次进来的时候进行cas操作,之后只要发现markwork中的线程id属于自己,那么就不进行cas,以后只要不竞争,这个对象就属于这个线程。

注(这里没必要特意去记忆,只要知道markwork中的代表的意义即可):
如果开启了偏向锁(默认开启),那么对象创建后,markword 值为0x05 即最后3 位为101,这时它的thread、epoch、age 都为0
偏向锁是默认是延迟的,不会在程序启动时立即生效,如果想避免延迟,可以加VM 参数-
XX:BiasedLockingStartupDelay=0 来禁用延迟
如果没有开启偏向锁,那么对象创建后,markword 值为0x01 即最后3 位为001,这时它的hashcode、age 都为0,第一次用到hashcode 时才会赋值

  • 偏向锁撤销与失效
  • 1、调用对象的hashcode方法–失效【原因:这里我不理解,稍微去查一下】
  • 2、其他线程有使用–说明不能偏向,一定撤销
  • 3、调用wait/notify
  • 4、批量重偏向:当一开始偏向线程a,但是实际上经常线程b获取,这种情况会偏向撤销,当撤销的阈值达到20次后,那么jvm会认为是不是偏向错了,就会偏向给线程b
  • 5、批量撤销:当撤销偏向锁的次数到达40次后,jvm会认为自己偏向错了以后就不再偏向了【注意和4区别】

2.3wait/notify

在这里插入图片描述

1)wait/notify原理
  • wait当线程抢到锁时,但是他有某些条件不满足,所以他不能继续执行临界区中的代码,那么总不能让他一直占着锁吧,可以让他先区waitSet中去等待,让别的线程先干活,等他需要的条件满足了,再唤醒他起来干活就行了。——由此可见wait会释放锁,不然别的线程没办法拿到锁干活啊。【你要是老板的话,你也会这么安排活给手下去干,极度压榨】
  • wait和sleep之间的区别
    • 1、sleep是线程的方法,wait是object的方法
    • 2、sleep不需要和synchronized配合使用,wait必须要和它配合
    • 3、sleep不释放锁,wait释放锁
  • API
    • obj.wait() 让进入object 监视器的线程到waitSet 等待
    • obj.notify() 在object 上正在waitSet 等待的线程中挑一个唤醒
    • obj.notifyAll() 让object 上正在waitSet 等待的线程全部唤醒
    • 必须获取对象锁才能进行下面的操作
final static Object obj = new Object(); public static void main(String[] args) {
new Thread(() -> { synchronized (obj) {
log.debug("执行	");
try {
obj.wait(); // 让线程在obj上一直等待下去
} catch (InterruptedException e) { e.printStackTrace();
}
log.debug("其它代码	");
}
}).start();

new Thread(() -> { synchronized (obj) {
log.debug("执行	");
try {
obj.wait(); // 让线程在obj上一直等待下去
} catch (InterruptedException e) { e.printStackTrace();
}
log.debug("其它代码	");
}
}).start();

// 主线程两秒后执行
sleep(2);
log.debug("唤醒 obj 上其它线程"); synchronized (obj) {
obj.notify(); // 唤醒obj上一个线程
// obj.notifyAll(); // 唤醒obj上所有等待线程
}
}

2)正确使用notify和wait
// 正确格式
synchronized(lock) { 
    while(条件不成立) {
    // 调用等待到waitSet中
    lock.wait();
}
// 干 活
}

//另一个线程
synchronized(lock) {
    lock.notifyAll();
}

2.4park和unpark

1)使用

LockSupport类中的方法,这里需要先park后unpark

// 暂停当前线程
LockSupport.park();

// 恢复某个线程的运行
LockSupport.unpark(暂停线程对象)

2)park和unpark、wait和notify对比
  • wait,notify 和notifyAll 必须配合Object Monitor 一起使用,而 park,unpark 不必
  • park & unpark(准确) 是以线程为单位来【阻塞】和【唤醒】线程,而notify 只能随机唤醒一个等待线程,notifyAll 是唤醒所有等待线程,就不那么【精确】
  • park & unpark 可以先unpark(先unpark的话,再调用park会失效),而wait & notify 不能先notify
3)原理

在这里插入图片描述

  • mutex:代表锁,线程需要获取它才能进行
  • counter:代表是否能获得锁,理解为吃的(counter1是有吃的有线程才能进行干活,counter0没有吃的)
  • cond:阻塞区

2.5多把锁

  • 如果锁的粒度太大,会造成并发变低,那么减小锁的粒度是一个很好的方法。
  • 比如:有一个大房间,大房间里面又有小房间,那么现在每个人要到不同的房间进行不同的活动,那我们是给大房间间上锁好呢?还是给里面的小房间上锁好呢?—可想而知小房间吧!

给大房间上锁

class BigRoom {

public void sleep() { 
    synchronized (this) {
log.debug("sleeping 2 小时");
Sleeper.sleep(2);
}
}

public void study() { synchronized (this) {
log.debug("study 1 小时");
Sleeper.sleep(1);
}
}

}

给小房间上锁

class BigRoom {
// 分别上锁,细粒度锁
// 定义两个对象锁
private final Object studyRoom = new Object(); 
private final Object bedRoom = new Object();


public void sleep() {

synchronized (bedRoom) { log.debug("sleeping 2 小时"); Sleeper.sleep(2);
}
}

public void study() { synchronized (studyRoom) {
log.debug("study 1 小时");
Sleeper.sleep(1);
}
}

}

  • 多把锁的优点和缺点:
  • 优点:并发变高
  • 缺点:可能会造成死锁(锁的活跃性,下节介绍)

2.6锁的活跃性

1)死锁

a占有一把锁A,b同时占有一把锁B;这时候a要等B锁去做一些事情(但是B锁被b占有),同时b也在等A锁去做一些事情(但是A锁被a占有)。那么这样等来等去,谁也拿不到,必然死锁
—-有兴趣的可以去了解一下哲学家就餐问题

Object A = new Object(); 
Object B = new Object(); 
Thread t1 = new Thread(() -> {
synchronized (A) { log.debug("lock A"); sleep(1); synchronized (B) {
log.debug("lock B"); log.debug("操作...");
}
}
}, "t1");

Thread t2 = new Thread(() -> { 
    synchronized (B) {
log.debug("lock B"); sleep(0.5); synchronized (A) {

log.debug("lock A");

log.debug("操作...");
}
}
}, "t2");
t1.start();
t2.start();

2)活锁

两个线程互相改变对方结束的条件,导致两个线程都不能结束

public class TestLiveLock {

static volatile int count = 10;

static final Object lock = new Object();

public static void main(String[] args) { new Thread(() -> {
// 期望减到 0 退出循环
while (count > 0) { sleep(0.2);
count--;
log.debug("count: {}", count);
}
}, "t1").start(); new Thread(() -> {
// 期望超过 20 退出循环
while (count < 20) { sleep(0.2); count++;
log.debug("count: {}", count);
}
}, "t2").start();
}
}

3)饥饿

一个线程的优先级太低,导致该线程始终得不到cpu的调度

2.7ReentrantLock

  • ReentrantLock和synchronized对比

在这里插入图片描述

1)基本用法
// 获取锁
static ReentrantLock reentrantLock = new ReentrantLock();
reentrantLock.lock();
try {
// 临界区
} finally {
// 释放锁
reentrantLock.unlock();
}

2)可打断
// 必须使用这个方法尝试获得锁
lock.lockInterruptibly();

3)锁超时
  • 可以很好的解决死锁问题{哲学家问题也可以解决},长时间不释放锁,强制让他释放
// 使用reentrantLock
ReentrantLock lock = new ReentrantLock(); 

Thread t1 = new Thread(() -> {
log.debug("启动...");
try {
if (!lock.tryLock(1, TimeUnit.SECONDS)) {
    log.debug("获取等待 1s 后失败,返回"); 
    return;
}
} catch (InterruptedException e) { e.printStackTrace();
}
try {
log.debug("获得了锁");
} finally {
lock.unlock();
}
}, "t1");

4)条件变量
  • synchronized的条件变量是waitSet,RerentrantLock的条件变量比synchronized更强。
  • synchronized只能有一个条件变量,RerentrantLock支持多个条件变量

使用

  • await 前需要获得锁
  • await 执行后,会释放锁,进入conditionObject 等待(条件变量中,类似synchronized的waitSet一样)
  • await 的线程被唤醒(或打断、或超时)取重新竞争lock 锁
  • 竞争lock 锁成功后,从await 后继续执行
  • signal唤醒
static ReentrantLock lock = new ReentrantLock();
// 第一个休息室,下面第二个
static Condition waitCigaretteQueue = lock.newCondition(); static Condition waitbreakfastQueue = lock.newCondition(); static volatile boolean hasCigrette = false;
static volatile boolean hasBreakfast = false;

public static void main(String[] args) { new Thread(() -> {
try {
lock.lock();

while (!hasCigrette) {

try {
waitCigaretteQueue.await();
} catch (InterruptedException e) { e.printStackTrace();
}
}
log.debug("等到了它的烟");
} finally {
lock.unlock();
}
}).start();

new Thread(() -> { try {
lock.lock();
while (!hasBreakfast) { try {
waitbreakfastQueue.await();
} catch (InterruptedException e) { e.printStackTrace();
}
}
log.debug("等到了它的早餐");
} finally {
lock.unlock();
}
}).start();

sleep(1); sendBreakfast(); sleep(1); sendCigarette();
}

private static void sendCigarette() { lock.lock();
try {
log.debug(" 送 烟 来 了 "); hasCigrette = true; waitCigaretteQueue.signal();
} finally {
lock.unlock();
}
}

private static void sendBreakfast() { lock.lock();
try {
log.debug(" 送 早 餐 来 了 "); hasBreakfast = true; waitbreakfastQueue.signal();
} finally {

lock.unlock();

}
}

2.8小结:

1)

对象头模型,对象头中的markword模型

2)

分析多线程访问共享资源时,哪些代码片段属于临界区使用synchronized 互斥解决临界区的线程安全问题
掌握synchronized 锁对象语法
掌握synchronzied 加载成员方法和静态方法语法掌握wait/notify 同步方法
使用lock 互斥解决临界区的线程安全问题
掌握lock 的使用细节:可打断、锁超时、公平锁、条件变量学会分析变量的线程安全性、掌握常见线程安全类的使用
了解线程活跃性问题:死锁、活锁、饥饿
应用方面
互斥:使用synchronized 或Lock 达到共享资源互斥效果
同步:使用wait/notify 或Lock 的条件变量来达到线程间通信效果
原理方面
monitor、synchronized 、wait/notify 原理
synchronized 进阶原理
park & unpark 原理

四、java内存模型–JMM(Java Memory Model)

  • JMM定义了主存(所有线程共享)、工作内存(每个线程私有)
  • JMM体现在以下方面
    • 原子性- 保证指令不会受到线程上下文切换的影响(简单来说就是代码不可分割)
    • 可见性- 保证指令不会受cpu 缓存的影响(线程a改了共享变量,线程b也必须知道)
    • 有序性- 保证指令不会受cpu 指令并行优化的影响(防止指令重排造成问题)

4.1原子性

4.2可见性

在这里插入图片描述

  • 1、为了减少线程对主存的访问,影响效率,所以讲主存中的数据缓存到自己的工作内存
  • 2、每个线程都会有一块属于自己的工作内存,互不干扰,那么在高并发的情况下可能造成一个问题,每个线程都是在自己的工作内存操作共享变量,没有同步到主存,导致数据产生问题
  • 解决方法
    • volatile(易变关键字)
    • 它可以用来修饰成员变量和静态成员变量,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取 它的值,线程操作volatile 变量都是直接操作主存

4.3有序性

JVM 会在不影响正确性的前提下,可以调整语句的执行顺序,思考下面一段代码
对应先执行还是后执行都不会影响执行结果的代码,jvm会对其进行优化【指令重排】
单线程下是没有问题的,但是在并发的情况下会有问题

static int i; static int j;

// 在某个线程内执行如下赋值操作
i = ...;
j = ...;

  • 解决方法
    • volatile 修饰的变量,可以禁用指令重排

由于读写屏障,注意volatile加的位置

@Outcome(id = {"1", "4"}, expect = Expect.ACCEPTABLE, desc = "ok") @Outcome(id = "0", expect = Expect.ACCEPTABLE_INTERESTING, desc = "!!!!") @State
public class ConcurrencyTest {

int num = 0;
// 加一个就行,加在下面那个(写屏障)
volatile boolean ready = false; 
@Actor
public void actor1(I_Result r) { 
    if(ready) {
r.r1 = num + num;
} else {
r.r1 = 1;
}
}

@Actor
public void actor2(I_Result r) { num = 2;
ready = true;
}

}

4.4volatile 原理

底层实现内存屏障
可见性:

  • 对volatile 变量的写指令后会加入写屏障
    • —写屏障(sfence)保证在该屏障之前的,对共享变量的改动,都同步到主存当中
  • 对volatile 变量的读指令前会加入读屏障
    • 而读屏障(lfence)保证在该屏障之后,对共享变量的读取,加载的是主存中最新数据

在这里插入图片描述

有序性:

  • 写屏障会确保指令重排序时,不会将写屏障之前的代码排在写屏障之后
  • 读屏障会确保指令重排序时,不会将读屏障之后的代码排在读屏障之前

五、共享模型之无锁-(为了提高加锁造成的性能损耗)

  • CAS 与volatile
  • 原子整数
  • 原子引用
  • 原子累加器-后期学习添加
  • Unsafe-后期学习添加

5.1CAS (compareAndSet)

  • 1、三个基本操作数:内存地址,旧的预期值,新值
  • 更新一个变量的时候,只有当变量的预期值A和内存地址V当中的实际值相同时,才会将内存地址V对应的值修改为B。

5.2volatile

获取共享变量时,为了保证该变量的可见性,需要使用volatile 修饰。
它可以用来修饰成员变量和静态成员变量,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取 它的值,线程操作volatile 变量都是直接操作主存。即一个线程对volatile 变量的修改,对另一个线程可见。
注:volatile 仅仅保证了共享变量的可见性,让其它线程能够看到最新值,但不能解决指令交错问题(不能保证原子性)–和前文有所不一样
CAS 必须借助volatile 才能读取到共享变量的最新值来实现【比较并交换】的效果

5.3为什么无锁效率高(适用于线程数相对较少的情况下)

  • synchronized会让线程发生上下文切换,那么这样是消耗性能的。但是在无锁的情况下,线程一直是在运行着的没有上下文切换带来的损耗。

5.4CAS的特点

结合CAS 和volatile 可以实现无锁并发,适用于线程数少、多核CPU 的场景下,线程数不要多于cpu核心数

  • 乐观锁:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃亏点再重试呗。
  • CAS 体现的是无锁并发、无阻塞并发,请仔细体会这两句话的意思—简单点说就是为了防止上下文切换带来的开销

5.5原子整数

  • juc下的原子类
    • AtomicBoolean 、AtomicInteger
// 以 AtomicInteger 为例

AtomicInteger i = new AtomicInteger(0);

// 获取并自增(i = 0, 结果 i = 1, 返回 0),类似于 i++ System.out.println(i.getAndIncrement());

// 自增并获取(i = 1, 结果 i = 2, 返回 2),类似于 ++i System.out.println(i.incrementAndGet());

// 自减并获取(i = 2, 结果 i = 1, 返回 1),类似于 --i System.out.println(i.decrementAndGet());

// 获取并自减(i = 1, 结果 i = 0, 返回 1),类似于 i-- System.out.println(i.getAndDecrement());

// 获取并加值(i = 0, 结果 i = 5, 返回 0) System.out.println(i.getAndAdd(5));

// 加值并获取(i = 5, 结果 i = 0, 返回 0) System.out.println(i.addAndGet(-5));

// 获取并更新(i = 0, p 为 i 的当前值, 结果 i = -2, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.getAndUpdate(p -> p - 2));

// 更新并获取(i = -2, p 为 i 的当前值, 结果 i = 0, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.updateAndGet(p -> p + 2));

// 获取并计算(i = 0, p 为 i 的当前值, x 为参数1, 结果 i = 10, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
// getAndUpdate 如果在 lambda 中引用了外部的局部变量,要保证该局部变量是 final 的
// getAndAccumulate 可以通过 参数1 来引用外部的局部变量,但因为其不在 lambda 中因此不必是 final
System.out.println(i.getAndAccumulate(10, (p, x) -> p + x));

// 计算并获取(i = 10, p 为 i 的当前值, x 为参数1, 结果 i = 0, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));

5.6原子引用

  • AtomicReference
  • AtomicMarkableReference
  • AtomicStampedReference

5.7ABA问题

  • 线程1先进来,期望将A改成B(AB),那么开始进行CAS,首先拿出内存中的值和和旧值进行比较,一比相同可以改,但是在这个阶段线程1的cpu时间片到期了
  • 线程2进来,期望将A改成B(AB)也是先进行CAS,首先拿出内存中的值和和旧值进行比较,一比相同也可以改,那就改掉了,将A改成了B(BA)
  • 但是第一步线程1已经比较过了,也可以交换,那么线程1又将B改回了A(AB)

解决方案:
添加版本号,每次比较的时候都会记录一个版本号,当发生交换的时候先校验版本号,版本号一致才能修改

原子引用的使用:
AtomicStampedReference 可以给原子引用加上版本号,追踪原子引用整个的变化过程,如:
C ,通过AtomicStampedReference,我们可以知道,引用变量中途被更改了几次。
但是有时候,并不关心引用变量更改了几次,只是单纯的关心是否更改过,所以就有了
AtomicMarkableReference

具体还有例子可以参考这篇:
https://blog.csdn.net/qq_32998153/article/details/79529704?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522165413478716782391899635%2522%252C%2522scm%2522%253A%252220140713.130102334…%2522%257D&request_id=165413478716782391899635&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2alltop_positive~default-1-79529704-null-null.142v11control,157v12control&utm_term=cas&spm=1018.2226.3001.4187

六、** 共享模型之不可变**

  • 不可变类的使用
  • 不可变类设计
  • 无状态类设计

6.1可变类是线程不安全的

6.2解决一:通过加同步锁让可变类变成线程安全,那么可能会带来性能上的损耗

6.3解决二:不可变———–如果一个对象在不能够修改其内部状态(属性),那么它就是线程安全的,因为不存在并发修改啊!这样的对象在Java 中有很多,例如在Java 8 后,提供了一个新的日期格式化类:// 线程安全类

以大家都熟悉的String为例,说明不可变元素的设计

public final class String
implements java.io.Serializable, Comparable<String>, CharSequence {
/** The value is used for character storage. */
private final char value[]; // final修饰

/** Cache the hash code for the string */ 
private int hash; // Default to 0
 // 私有的,并且没有set方法

// ...

}

  • final使用
    • 属性用final 修饰保证了该属性是只读的,不能修改
    • 类用final 修饰保证了该类中的方法不能被覆盖,防止子类无意间破坏不可变性
public String substring(int beginIndex) { if (beginIndex < 0) {
throw new StringIndexOutOfBoundsException(beginIndex);
}
int subLen = value.length - beginIndex; if (subLen < 0) {
throw new StringIndexOutOfBoundsException(subLen);
}
                                         // 这里是返回一个新的实例,进入构造方法
return (beginIndex == 0) ? this : new String(value, beginIndex, subLen);
}

// 上面的构造方法
public String(char value[], int offset, int count) { if (offset < 0) {
throw new StringIndexOutOfBoundsException(offset);
}
if (count <= 0) { if (count < 0) {
throw new StringIndexOutOfBoundsException(count);
}
if (offset <= value.length) { this.value = "".value; return;
}
}
if (offset > value.length - count) {
throw new StringIndexOutOfBoundsException(offset + count);
}
                                                    // copy一个新的实例
this.value = Arrays.copyOfRange(value, offset, offset+count);
}

  • 拷贝性保护
    • 可以看到方法中返回的是一个新的实例
    • 构造新字符串对象时,会生成新的char[] value,对内容进行复制 。这种通过创建副本对象来避免共享的手段称之为【保护性拷贝(defensive copy)】

七、** 共享模型之工具**

7.1线程池

优势:
1、降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗
2、提高响应速度:拿着就去使用就好,不用等你去造一个,浪费时间
3、提高线程的可管理性:由线程池统一管理

  • 1、七大参数
    • corePoolSize 核心线程数目(最多保留的线程数)
    • maximumPoolSize 最大线程数目
    • keepAliveTime 生存时间- 针对救急线程
    • unit 时间单位- 针对救急线程
    • workQueue 阻塞队列
    • threadFactory 线程工厂- 可以为线程创建时起个好名字
    • handler 拒绝策略,当到达最大线程的时候执行淘汰策略

参考博客:
https://blog.csdn.net/u013541140/article/details/95225769?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522165413912316781818796672%2522%252C%2522scm%2522%253A%252220140713.130102334…%2522%257D&request_id=165413912316781818796672&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2alltop_positive~default-1-95225769-null-null.142v11control,157v12control&utm_term=%E7%BA%BF%E7%A8%8B%E6%B1%A0&spm=1018.2226.3001.4187
博客注意点:
1、(任务队列)有界队列和无界队列的区别
注意有界队列和无界队列的区别:如果使用有界队列,当队列饱和时并超过最大线程数时就会执行拒绝策略;而如果使用无界队列,因为任务队列永远都可以添加任务,所以设置 maximumPoolSize 没有任何意义。

7.2Fork/Join—JDK1.7新的线程池

7.3JUC

引用:
https://juejin.cn/post/6844903997438951437

  • AQS原理[AbstractQueuedSynchronizer]是用来构建锁和同步器的框架(主要就是定义了标准的同步状态,FIFO同步队列)
  • 基于AQS构建的同步器
    • ReentrantLock
    • Semaphore
    • CountDownLatch
    • ReentrantReadWriteLock【通过两个内部类读锁和写锁分别实现了两套api来实现的】
    • SynchronusQueue
    • FutureTask
  • 以上类都是在类的内部定义了一个静态内部类去继承AQS

AQS核心
在这里插入图片描述

1、FIFO:同步队列,是一个双向列表。包含头和尾节点,头节点是为了后序的调度工作,每个节点的属性定义在AQS的Node节点中,使用node去实现FIFO队列

static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened.
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn't need to
         * signal. So, most code doesn't need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes.  It is modified using CAS
         * (or when possible, unconditional volatile writes).
         */
        volatile int waitStatus;

        /**
         * Link to predecessor node that current node/thread relies on
         * for checking waitStatus. Assigned during enqueuing, and nulled
         * out (for sake of GC) only upon dequeuing.  Also, upon
         * cancellation of a predecessor, we short-circuit while
         * finding a non-cancelled one, which will always exist
         * because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.
         */
        volatile Node prev;

        /**
         * Link to the successor node that the current node/thread
         * unparks upon release. Assigned during enqueuing, adjusted
         * when bypassing cancelled predecessors, and nulled out (for
         * sake of GC) when dequeued.  The enq operation does not
         * assign next field of a predecessor until after attachment,
         * so seeing a null next field does not necessarily mean that
         * node is at end of queue. However, if a next field appears
         * to be null, we can scan prev's from the tail to
         * double-check.  The next field of cancelled nodes is set to
         * point to the node itself instead of null, to make life
         * easier for isOnSyncQueue.
         */
        volatile Node next;

        /**
         * The thread that enqueued this node.  Initialized on
         * construction and nulled out after use.
         */
        volatile Thread thread;

        /**
         * Link to next node waiting on condition, or the special
         * value SHARED.  Because condition queues are accessed only
         * when holding in exclusive mode, we just need a simple
         * linked queue to hold nodes while they are waiting on
         * conditions. They are then transferred to the queue to
         * re-acquire. And because conditions can only be exclusive,
         * we save a field by using special value to indicate shared
         * mode.
         */
        Node nextWaiter;

        /**
         * Returns true if node is waiting in shared mode.
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * Returns previous node, or throws NullPointerException if null.
         * Use when predecessor cannot be null.  The null check could
         * be elided, but is present to help the VM.
         *
         * @return the predecessor of this node
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

在这里插入图片描述

2、state:判断资源是否被线程占有的状态
3、条件队列:非必须,单向队列,存在条件时才需要此队列

  • AQS设计思想,基于上文的核心进行
    • 基于Node构建FIFO队列
    • state表示同步状态
    • AQS资源共享方式:独占Exclusive(排它锁模式)和共享Share(共享锁模式)注:只能实现一个
  • 1、state状态
 protected final int getState() {
        return state;
    }

    /**
     * Sets the value of synchronization state.
     * This operation has memory semantics of a {@code volatile} write.
     * @param newState the new state value
     */
    protected final void setState(int newState) {
        state = newState;
    }

    /**
     * Atomically sets synchronization state to the given updated
     * value if the current state value equals the expected value.
     * This operation has memory semantics of a {@code volatile} read
     * and write.
     *
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. False return indicates that the actual
     *         value was not equal to the expected value.
     */
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

  • 2、Node常量中的含义
static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
    // waitStatus值为1时表示该线程节点已释放(超时、中断),已取消的节点不会再阻塞。
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
    // waitStatus为-1时表示该线程的后续线程需要阻塞,即只要前置节点释放锁,就会通知标识为 SIGNAL 状态的后续节点的线程
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
    // waitStatus为-2时,表示该线程在condition队列中阻塞(Condition有使用)
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
    // waitStatus为-3时,表示该线程以及后续线程进行无条件传播(CountDownLatch中有使用)共享模式下, PROPAGATE 状态的线程处于可运行状态
        static final int PROPAGATE = -3;

  • 3、FIFO队列
    • 当前驱节点时头节点时,被唤醒去进行同步状态的获取。当获取到时会将自己置为头节点,以便唤醒后面的值。
  • 4、条件队列
    • 除了同步队列之外,AQS中还存在Condition队列,这是一个单向队列。调用ConditionObject.await()方法,能够将当前线程封装成Node加入到Condition队列的末尾,然后将获取的同步状态释放(即修改同步状态的值,唤醒在同步队列中的线程)。

自定义同步器【以独占方式为例】

package com.xusj;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * @author xusj
 * <br>CreateDate 2022/5/31 17:55
 */
public class TestAqs {
    public static void main(String[] args) {
        MyLock myLock = new MyLock();
        new Thread(() -> {
            myLock.lock();
            try {
                // 业务
            } finally {
                //释放锁
                myLock.unlock();
            }
        }, "t1").start();


        new Thread(() -> {
            myLock.lock();
            try {
                // 业务
            } finally {
                //释放锁
                myLock.unlock();
            }
        }, "t2").start();
    }
}


/**
 * 1、定义自定义锁(不可重入)
 */
class MyLock implements Lock {
    /**
     * 2、同步器类-独占锁
     */
    class MySync extends AbstractQueuedSynchronizer {
        @Override // 尝试获得锁
        protected boolean tryAcquire(int arg) {
            // aqs中state默认是0;通过cas尝试将0改为1--原子

            if (compareAndSetState(0, 1)) {
                // 成功加锁,设置owner为当前线程
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            // 加锁失败
            return false;
        }

        @Override // 释放锁
        protected boolean tryRelease(int arg) {
            // 将state改为0;将owner改为null
            // 这两个方法的先后,和可见性,和指令重排有关--volatile
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        @Override // 是否持有独占锁
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        // 返回条件变量
        public Condition newCondition() {
            return new ConditionObject();
        }
    }

    // 初始化静态内部
    private MySync sync = new MySync();

    @Override // 加锁(不成功进入等待)
    public void lock() {
        // 3、调用同步器方法
        sync.acquire(1);

    }

    @Override // 可打断,加锁
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);

    }

    @Override // 尝试加锁(一次)
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override // 尝试加锁,带超时
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override // 解锁
    public void unlock() {
        sync.release(1);
    }

    @Override // 创建条件变量
    public Condition newCondition() {
        return sync.newCondition();
    }
}

独占下的AQS{ReentrantLock、CyclicBarrier正是基于此设计的}
  • 保证同步的过程

在这里插入图片描述

  • 1、state初始状态为0,首先调用acquire方法,当线程A进行lock时会调用tryAcquire()方法将state+1,代码如下:
// aqs中的方法
public final void acquire(int arg) {
        // state=0调用tryAcquire(arg)
        // 当不为零的时候acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法判断是否要park
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

  • 2、当state=1时,再有线程来分两种情况:
    • 2.1自己再尝试获取锁,当线程A再次来的时候会将state+1=2
    • 2.2别的线程进来,只有当state=0时才会尝试获取,调用上面的addWaiter(Node.EXCLUSIVE), arg方法,追一下源码(注意代码中的注释)
 private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            // 重点一:通过CAS将该线程加到队列尾部
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
     // 重点二,如果时空队列初始化队列,或者CAS失败时调用该方法,追enq源码
        enq(node);
        return node;
    }

追enq源码如下(代码中的注释):

// 通过自旋的方式获取 
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                // 上面的代码时CAS到尾部,这里是头部(作为初始化使用)
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                //不断将当前节点使用CAS尾插入队列中直到成功为止
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

  • 当线程已经到队列之后,通过调用acquireQueued方法,用于获取state的状态,代码如下:
final boolean acquireQueued(final Node node, int arg) {
       
    boolean failed = true;
        try {
            // 是否被打断 
            boolean interrupted = false;
            for (;;) {
                // 获取前驱节点
                final Node p = node.predecessor();
                // 只有当当前节点是前驱节点,才会去tryAcquire
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 如果不是进入等待,并检查是否可以被中断
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

  • release释放共享资源的锁,代码如下
public final boolean release(int arg) {
    // 重点
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                // 重点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

追一下tryRelease(主要以ReentrantReadWriteLock重写为例)

protected final boolean tryRelease(int releases) {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        //减掉相应量的资源(state-=arg)
        int nextc = getState() - releases;
        //是否完全释放资源
        boolean free = exclusiveCount(nextc) == 0;
        if (free)
            setExclusiveOwnerThread(null);
        setState(nextc);
        return free;
    }


追一下unparkSuccessor

private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            // CAS交换状态
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        // 唤醒下一个节点
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            // unpark唤醒
            LockSupport.unpark(s.thread);
    }

共享模式下的AQS(CountDownLatch、Semaphore)

例子:CountDownLatch,这个和join差不多,但是要了解一下区别
在这里插入图片描述

  • 1、主线程进来先调用await方法,进入阻塞
  • 2、其他线程开始做事
  • 3、等其他线程做完事情,主线程才会走下面的逻辑(等你原则)

共享锁的方法差不太多,自己可以追一下源码看一下,就是方法名改了一下(很多叫~Shared方法)。和独占锁的区别

  • 独占模式下的tryRelease()在完全释放掉资源(state=0)后,才会返回true去唤醒其他线程,这主要是基于独占下可重入的考量;而共享模式下的releaseShared()则没有这种要求,共享模式实质就是控制一定量的线程并发执行,那么拥有资源的线程在释放掉部分资源时就可以唤醒后继等待结点。

  • ReentrantLock原理

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/96255.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!