目录
课程内容
JDK除了提供synchronized以外,还丰富了很多JUC工具类供我们在不同的场景中使用,主要的如:ReentrantLock,实现了可重入的独占锁;Semaphore信号量;CountDownLatch闭锁
*一、ReentrantLock(可重入的独占锁)
基本介绍
ReentrantLock是一种可重入的独占锁,它允许同一个线程多次获取同一个锁而不会被阻塞。
它的功能类似于synchronized是一种互斥锁,可以保证线程安全。
特点
相对于 synchronized, ReentrantLock具备如下特点:
- 可中断
- 可以设置超时时间
- 可以切换为公平/非公平锁(默认非公平)
- 可以尝试获取锁
- 支持多个条件变量
- 与synchronized一样,都支持可重入
应用场景
它的主要应用场景是在多线程环境下对共享资源进行独占式访问,以保证数据的一致性和安全性。从这点来看,他跟synchronized的使用场景区别不是很大
ReentrantLock具体应用场景如下:
- 解决多线程竞争资源的问题,例如多个线程同时对同一个数据库进行写操作,可以使用ReentrantLock保证每次只有一个线程能够写入。
- 实现多线程任务的顺序执行,例如在一个线程执行完某个任务后,再让另一个线程执行任务。
- 实现多线程等待/通知机制,例如在某个线程执行完某个任务后,通知其他线程继续执行任务。
与synchronized的区别
常用API
void lock(); // 获取锁,调用该方法当前线程会获取锁,当锁获得后,该方法返回
void unlock(); // 释放锁
void lockInterruptibly() throws InterruptedException; // 可中断的获取锁,和lock()方法不同之处在于该方法会响应中断,即在锁的获取中可以中断当前线程
boolean tryLock(); // 尝试非阻塞的获取锁,调用该方法后立即返回。如果能够获取到返回true,否则返回false
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 超时获取锁,当前线程在以下三种情况下会被返回:
当前线程在超时时间内获取了锁
当前线程在超时时间内被中断
超时时间结束,返回false
Condition newCondition(); // 获取等待通知组件,该组件和当前的锁绑定,当前线程只有获取了锁,才能调用该组件的await()方法,而调用后,当前线程将释放锁
基本语法
//加锁 阻塞
lock.lock();
try {
...
} finally {
// 解锁
lock.unlock();
}
//尝试加锁 非阻塞
if (lock.tryLock(1, TimeUnit.SECONDS)) {
try {
...
} finally {
lock.unlock();
}
}
在使用时要注意 4 个问题:
- 默认情况下 ReentrantLock 为非公平锁而非公平锁;
- 加锁次数和释放锁次数一定要保持一致,否则会导致线程阻塞或程序异常;
- 加锁操作一定要放在 try 代码之前,这样可以避免未加锁成功又释放锁的异常;
- 释放锁一定要放在 finally 中,否则会导致线程阻塞
使用示例
1.独占锁:模拟抢票场景
场景:抢票。总共10个人,抢8张票
思考:理论上,肯定有两个人抢不到票对吧,那我们看看不加锁的代码结果会是如何
示例1:无锁版
public class ReentrantLockTest {
// 票数
public static int tickets = 8;
// 总人数
public static final int PERSONS = 10;
public static void main(String[] args) {
for (int i = 0; i < PERSONS; i++) {
new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(tickets > 0) {
System.out.println("我是" + Thread.currentThread().getName() + ",我来抢第【" + tickets-- + "】张票");
} else {
System.out.println("我是" + Thread.currentThread().getName() + ",票卖完了我没抢到");
}
}).start();
}
// 系统输出内容:
// 我是Thread-1,我来抢第【8】张票
// 我是Thread-8,我来抢第【4】张票
// 我是Thread-2,我来抢第【8】张票
// 我是Thread-0,我来抢第【3】张票
// 我是Thread-7,我来抢第【6】张票
// 我是Thread-3,我来抢第【7】张票
// 我是Thread-9,我来抢第【8】张票
// 我是Thread-6,我来抢第【8】张票
// 我是Thread-5,我来抢第【5】张票
// 我是Thread-4,我来抢第【8】张票
}
}
大家看看最后的输出打印结果,第【8】张票同时卖给了4、6、9线程。这显然是不对的。我们这里加上ReentrantLock看看,结果会是怎样,代码示例如下:
public class ReentrantLockTest {
// 票数
public static int tickets = 8;
// 总人数
public static final int PERSONS = 10;
public static final Lock LOCK = new ReentrantLock();
public static void main(String[] args) {
for (int i = 0; i < PERSONS; i++) {
new Thread(() -> {
buyTicket();
}).start();
}
}
public static void buyTicket() {
// 获取锁
LOCK.lock();
try {
Thread.sleep(1000);
if(tickets > 0) {
System.out.println("我是" + Thread.currentThread().getName() + ",我来抢第【" + tickets-- + "】张票");
} else {
System.out.println("我是" + Thread.currentThread().getName() + ",票卖完了我没抢到");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放说
LOCK.unlock();
}
}
// 系统输出如下:
// 我是Thread-0,我来抢第【8】张票
// 我是Thread-1,我来抢第【7】张票
// 我是Thread-2,我来抢第【6】张票
// 我是Thread-3,我来抢第【5】张票
// 我是Thread-4,我来抢第【4】张票
// 我是Thread-5,我来抢第【3】张票
// 我是Thread-9,我来抢第【2】张票
// 我是Thread-7,我来抢第【1】张票
// 我是Thread-8,票卖完了我没抢到
// 我是Thread-6,票卖完了我没抢到
}
看最后的输出结果,销售正常了
2.公平锁和非公平锁
ReentantLock公平锁跟非公平锁的新建方式如下:
ReentrantLock lock = new ReentrantLock(); //参数默认false,不公平锁
ReentrantLock lock = new ReentrantLock(true); //公平锁
那什么是公平锁,什么是非公平锁?
- 公平锁:线程在获取锁时,按照等待的先后顺序获取锁;
- 非公平锁:线程在获取锁时,不按照等待的先后顺序获取锁,而是随机获取锁。ReentrantLock默认是非公平锁
总的来说,非公平锁,就是在请求获取锁的瞬间,有一次能马上获得锁的机会(插队),获得成功直接拿锁,获取不成功就排队等候。这里就不写示例了
3.可重入锁
可重入锁又名递归锁,是指同一个线程在获得锁之后,那么它可以再前一次锁了之后,重复(继续)获取这把锁(不可重入的话,就算是相同的线程来获取,也会被挡在外面)。Java中ReentrantLock和synchronized都是可重入锁,可重入锁的一个优点是可一定程度避免死锁。在实际开发中,可重入锁常常应用于递归操作、调用同一个类中的其他方法、锁嵌套等场景中。
public class ReentrantLockTest {
// 创建 ReentrantLock 对象
private final ReentrantLock lock = new ReentrantLock();
public void recursiveCall(int num) {
// 获取锁
lock.lock();
try {
if (num == 0) {
return;
}
System.out.println("执行递归,num = " + num);
// 自己调用自己
recursiveCall(num - 1);
} finally {
// 释放锁
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
// 创建计数器对象
ReentrantLockTest lockTest = new ReentrantLockTest();
// 测试递归调用
lockTest.recursiveCall(10);
}
}
4.结合Condition实现生产者消费者模式
java.util.concurrent类库中提供Condition类来实现线程之间的协调。调用Condition.await() 方法使线程等待,其他线程调用Condition.signal() 或 Condition.signalAll() 方法唤醒等待的线程。
注意:调用Condition的await()和signal()方法,都必须在lock之内(同Object的wait跟notify要在synchronized一样)
案例:基于ReentrantLock和Condition实现一个简单队列
- 场景:有一个库存容量为1的容器,提供了put()方法增加库存;take()方法减去库存。生产者先生产,库存满了之后,生产者停下,并且通知消费者来消费;消费者收到通知后,开始消费,消费完所有库存后,通知生产者接着生产
- 思路:一个Condition代表一种状态,库存有两种状态【满】、【空】
public class ReentrantLockTest {
private static StockManage stockManage = new StockManage(1);
public static void main(String[] args) {
// 启动【生产者】线程
Producer producer = new Producer();
producer.setName("生产者");
producer.start();
// 启动【消费者】线程
Consumer consumer = new Consumer();
consumer.setName("消费者");
consumer.start();
}
/**
* 消费者
*/
public static class Consumer extends Thread {
@Override
public void run() {
try {
// 隔2秒轮询消费一次
while (true) {
Thread.sleep(2000);
System.out.println("consumer消费:" + stockManage.take());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 生产者
*/
public static class Producer extends Thread {
@Override
public void run() {
try {
// 隔1秒轮询生产一次
while (true) {
Thread.sleep(1000);
stockManage.put(new Random().nextInt(1000));
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static class StockManage {
private Object[] items;
int size = 0;
int takeIndex;
int putIndex;
private ReentrantLock lock;
// 消费者线程阻塞唤醒条件,队列为空阻塞,生产者生产完唤醒
public Condition notEmpty;
// 生产者线程阻塞唤醒条件,队列满了阻塞,消费者消费完唤醒
public Condition notFull;
public StockManage(int capacity) {
this.items = new Object[capacity];
lock = new ReentrantLock();
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
/**
* 【生产者】增加库存
*/
public void put(Object value) throws Exception {
// 加锁
lock.lock();
try {
while (size == items.length) {
// 队列满了让生产者等待
notFull.await();
}
// 这里是一个典型环形数组思路
items[putIndex] = value;
if (++putIndex == items.length) {
putIndex = 0;
}
size++;
// 生产完,使用【notEmpty】条件唤醒消费者
notEmpty.signal();
} finally {
System.out.println("producer生产:" + value);
//解锁
lock.unlock();
}
}
/**
* 【消费者】拿库存
*/
public Object take() throws Exception {
lock.lock();
try {
while (size == 0) {
// 队列空了就让消费者等待
notEmpty.await();
}
Object value = items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) {
takeIndex = 0;
}
size--;
// 消费完,使用【notFull】条件来唤醒生产者继续生产
notFull.signal();
return value;
} finally {
lock.unlock();
}
}
}
// 系统输出如下:
// producer生产:190
// consumer消费:190
// producer生产:775
// consumer消费:775
// producer生产:188
// consumer消费:188
// 剩下的打印省略....
}
*二、Semaphore(信号量)
基本介绍
Semaphore(信号量)是一种用于多线程编程的同步工具,用于控制同时访问某个资源的线程数量。它通过维护若干个许可证来控制线程对资源的访问。当许可证数量大于零时,线程可以访问;反之,则拒绝,并阻塞线程让其等待。许可证的数量,就是最多可访问线程数。
典型应用
- 各种连接池
- 服务限流
常用API
构造方法:
permits 表示许可证的数量(资源数)
fair 表示公平性,如果这个设为 true 的话,下次执行的线程会是等待最久的线程
获取、释放资源:
acquire() 表示阻塞并获取许可
tryAcquire() 方法在没有许可的情况下会立即返回 false,要获取许可的线程不会阻塞
release() 表示释放许可
使用示例
- 场景:某快家庭幸福型企业为了给予员工幸福体验,决定公司餐厅不再提供餐具,转而让员工自带餐具,并且要求自己吃完饭后洗碗。由于地方有限,修的洗碗池只能同时容纳5人洗碗。但是公司有100+人吃完饭后在等待洗碗。
- 100+人竞争,但是可供共享的资源只有5个
代码1【限流】:(幸福家庭型企业的洗碗池)
public class SemaphoreTest {
// 【洗碗池】数量
private static Semaphore washPool = new Semaphore(5);
// 【员工】数量
private static final int EMP_COUNT = 108;
public static void main(String[] args) {
for (int i = 0; i < EMP_COUNT; i++) {
Thread thread = new Thread(() -> {
try {
washPool.acquire();
System.out.println("我是【" + Thread.currentThread().getName() + "】,我占到洗碗池了");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("我是【" + Thread.currentThread().getName() + "】,我洗完啦,溜了溜了");
washPool.release();
}
}, "员工_" + i);
thread.start();
}
}
// 系统输出如下:
// 我是【员工_2】,我占到洗碗池了
// 我是【员工_3】,我占到洗碗池了
// 我是【员工_0】,我占到洗碗池了
// 我是【员工_1】,我占到洗碗池了
// 我是【员工_4】,我占到洗碗池了
// 我是【员工_1】,我洗完啦,溜了溜了
// 我是【员工_4】,我洗完啦,溜了溜了
// 我是【员工_0】,我洗完啦,溜了溜了
// 我是【员工_3】,我洗完啦,溜了溜了
// 我是【员工_2】,我洗完啦,溜了溜了
// 省略剩下的打印.................
}
示例2【资源池】:(模拟数据库连接池的简单示例)
package org.tuling.juc.sema;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
* @author zhanghuitong
* @Date 2023/7/19 9:01
* @slogan 编码即学习
**/
public class SimulateThreadPoolTest {
public static void main(String[] args) {
// 初始化连接池,并且设置连接池数为:2
final ConnectPool connectPool = new ConnectPool(2);
ExecutorService executorService = Executors.newCachedThreadPool();
// 模拟5个线程来并发争抢连接资源
final int requireConnectCount = 5;
for (int i = 0; i < requireConnectCount; i++) {
final String name = "线程_" + i;
new Thread(()->{
Connect connect = null;
try {
System.out.println("线程" + Thread.currentThread().getName() + "申请获取数据库连接池");
connect = connectPool.openConnect();
System.out.println("线程" + Thread.currentThread().getName() + "成功拿到数据库连接" + connect);
// 这里大概会做一些数据库操作,接着释放
Thread.sleep(2000);
System.out.println("线程" + Thread.currentThread().getName() + "释放数据库连接" + connect);
} catch (Exception e) {
e.printStackTrace();
} finally {
connectPool.releaseConnect(connect);
}
}, name).start();
}
}
}
/**
* 连接池,管理连接的地方
*/
class ConnectPool {
/**
* 连接池大小,即最大可同时连接数量
*/
private int size;
private Connect[] connects;
/**
* 记录对应下标的Connect是否已被使用
*/
private boolean[] connectFlag;
/**
* 信号量对象
*/
private Semaphore semaphore;
/**
* size:初始化连接池大小
*/
public ConnectPool(int size) {
this.size = size;
semaphore = new Semaphore(size, true);
connects = new Connect[size];
connectFlag = new boolean[size];
initConnects();
}
private void initConnects() {
for (int i = 0; i < this.size; i++) {
connects[i] = new Connect(i);
}
}
/**
* 获取数据库连接
*/
public Connect openConnect() throws InterruptedException {
semaphore.acquire();
return doGetConnect();
}
/**
* 获取连接
*/
private synchronized Connect doGetConnect() {
for (int i = 0; i < connectFlag.length; i++) {
if (!connectFlag[i]) {
// 标记该连接已被使用
connectFlag[i] = true;
return connects[i];
}
}
return null;
}
/**
* 释放连接
*/
public synchronized void releaseConnect(Connect connect) {
if (connect == null) {
return;
}
for (int i = 0; i < this.size; i++) {
if (connect == connects[i]) {
connectFlag[i] = false;
semaphore.release();
}
}
}
}
/**
* 连接池中的连接对象
*/
class Connect {
/**
* 下面这两个变量只是用来生成id的
* 代表这个链接
*/
private int id;
public Connect(int id) {
this.id = id;
// 我们知道打开一个连接很耗费资源的,需要等待1秒钟
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("连接#" + id + "#已与数据库建立通道!");
}
@Override
public String toString() {
return "Connect{" +
"id=" + id +
'}';
}
}
// 系统输出如下:
// 连接#0#已与数据库建立通道!
// 连接#1#已与数据库建立通道!
// 线程线程_0申请获取数据库连接池
// 线程线程_4申请获取数据库连接池
// 线程线程_1申请获取数据库连接池
// 线程线程_4成功拿到数据库连接Connect{id=1}
// 线程线程_3申请获取数据库连接池
// 线程线程_0成功拿到数据库连接Connect{id=0}
// 线程线程_2申请获取数据库连接池
// 线程线程_4释放数据库连接Connect{id=1}
// 线程线程_0释放数据库连接Connect{id=0}
// 线程线程_1成功拿到数据库连接Connect{id=0}
// 线程线程_3成功拿到数据库连接Connect{id=1}
// 线程线程_3释放数据库连接Connect{id=1}
// 线程线程_1释放数据库连接Connect{id=0}
// 线程线程_2成功拿到数据库连接Connect{id=0}
// 线程线程_2释放数据库连接Connect{id=0}
使用总结
以下是一些使用Semaphore的常见场景:
- 限流:Semaphore可以用于限制对共享资源的并发访问数量,以控制系统的流量。
- 资源池:Semaphore可以用于实现资源池,以维护一组有限的共享资源。
*三、CountDownLatch(闭锁)
基本介绍
CountDownLatch(闭锁)是一个同步协助类,允许一个或多个线程等待,直到其他线程完成操作集。CountDownLatch内部维护了一个计数器,初始值为线程的数量。每有一个线程完成了任务,计数器的值减1,当计数器值为0时,代表所有线程已经完成了任务。
典型应用
LOL游戏载入中的时候,需要等到10个人都100%加载成功了,才可以进入游戏
常用API
构造方法:
count: 初始化要等待的线程数量,即:计数器数量
获取、释放资源:
await(); // 使当前线程进入同步队列进行等待,直到latch的值被减到0或者当前线程被中断,当前线程就会被唤醒
await(long timeout, TimeUnit unit); // 带超时时间的await()
countDown(); // 使latch的值减1,如果减到了0,则会唤醒所有等待在这个latch上的线程
getCount(); // 获得latch的数值
使用示例
示例1:(LOL/王者游戏加载,m个线程等待n个线程完成之后继续执行)
- 场景:所有玩家选择完英雄后,开始进入游戏加载页面。所有玩家加载完后,进入游戏,游戏聊天广播线程也开始载入
- 思路:英雄联盟加载游戏可以视为1个主线程,10个玩家为加载子线程。只有10个玩家都加载至100%后,主线程才能继续运行游戏,然后聊天广播线程也开始运行。所以
public class LOLLoadingTest {
/**
* LOL游戏总人数
*/
private static final int TOTAL_PLAYERS = 10;
/**
* 游戏主线程等待玩家加载线程
*/
private static CountDownLatch gameWaitAllPlayerLoaded = new CountDownLatch(TOTAL_PLAYERS);
public static void main(String[] args) throws InterruptedException {
System.out.println("所有玩家已经选择完毕,游戏载入中");
LOLChatBroadcast lolChatBroadcast = new LOLChatBroadcast();
lolChatBroadcast.start();
for (int i = 0; i < TOTAL_PLAYERS; i++) {
final String name = "玩家_" + (i + 1);
LOLPlayer lolPlayer = new LOLPlayer();
lolPlayer.setName(name);
lolPlayer.start();
}
gameWaitAllPlayerLoaded.await();
System.out.println("所有玩家都已载入成功");
System.out.println("欢迎来到英雄联盟!!!!!");
}
/**
* 游戏聊天广播线程
*/
public static class LOLChatBroadcast extends Thread {
@Override
public void run() {
System.out.println("我是聊天广播线程,等待游戏载入");
try {
gameWaitAllPlayerLoaded.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("游戏载入完毕,开始聊天广播");
System.out.println("请大家文明发言,建设文明游戏");
}
}
/**
* 游戏玩家加载线程
*/
public static class LOLPlayer extends Thread {
@Override
public void run() {
System.out.println("我是【" + Thread.currentThread().getName() + "】,已加载0%");
// 睡个几秒钟
try {
TimeUnit.SECONDS.sleep(new Random().nextInt(10));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("我是【" + Thread.currentThread().getName() + "】,已加载100%");
gameWaitAllPlayerLoaded.countDown();
}
}
// 系统输出:
// 所有玩家已经选择完毕,游戏载入中
// 我是聊天广播线程,等待游戏载入
// 我是【玩家_1】,已加载0%
// 我是【玩家_2】,已加载0%
// 我是【玩家_3】,已加载0%
// 我是【玩家_4】,已加载0%
// 我是【玩家_5】,已加载0%
// 我是【玩家_6】,已加载0%
// 我是【玩家_7】,已加载0%
// 我是【玩家_8】,已加载0%
// 我是【玩家_9】,已加载0%
// 我是【玩家_10】,已加载0%
// 我是【玩家_8】,已加载100%
// 我是【玩家_3】,已加载100%
// 我是【玩家_7】,已加载100%
// 我是【玩家_1】,已加载100%
// 我是【玩家_5】,已加载100%
// 我是【玩家_4】,已加载100%
// 我是【玩家_9】,已加载100%
// 我是【玩家_2】,已加载100%
// 我是【玩家_6】,已加载100%
// 我是【玩家_10】,已加载100%
// 游戏载入完毕,开始聊天广播
// 请大家文明发言,建设文明游戏
// 所有玩家都已载入成功
// 欢迎来到英雄联盟!!!!!
}
示例2:(百米赛跑)
- 场景:有8名选手参加百米赛跑,8名参赛选手,需要等待哨子声响,才可以起跑;裁判也只能等8名选手都到达终点之后才能宣布比赛结束
- 思路:8名运动员需要等1个哨声响起,才能起跑,这是第1个计数器(m个线程等待1个线程执行完才能继续执行);裁判需要等8名运动员跑完,才能结束比赛,宣布结果(1个线程等待m个线程执行完才能继续执行)
public class CountDownLatchTest {
/**
* 运动员等待裁判哨声响起,哨子只会响一次
*/
private static CountDownLatch athleteWaitWhistleBlows = new CountDownLatch(1);
/**
* 裁判等待8名远动员跑完的计数器
*/
private static CountDownLatch wait8AthletesArrival = new CountDownLatch(8);
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 8; i++) {
final String name = "运动员_" + i;
new Thread(() -> {
System.out.println("我是运动员【" + Thread.currentThread().getName() + "】,我已经做好准备了");
try {
athleteWaitWhistleBlows.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("运动员【" + Thread.currentThread().getName() + "】冲出去了!!!");
try {
int i1 = new Random().nextInt(1000);
Thread.sleep(1000 + i1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("运动员【" + Thread.currentThread().getName() + "】到达了终点!!!");
wait8AthletesArrival.countDown();
}, name).start();
}
Thread.sleep(1000);
System.out.println("运动员已经全部入场,裁判准备吹哨");
Thread.sleep(3000);
System.out.println("吹哨:哔哔哔");
athleteWaitWhistleBlows.countDown();
System.out.println("裁判在观察,等待所有运动远跑完");
wait8AthletesArrival.await();
System.out.println("比赛结束");
}
// 系统输出:
// 我是运动员【运动员_0】,我已经做好准备了
// 我是运动员【运动员_3】,我已经做好准备了
// 我是运动员【运动员_2】,我已经做好准备了
// 我是运动员【运动员_1】,我已经做好准备了
// 我是运动员【运动员_5】,我已经做好准备了
// 我是运动员【运动员_4】,我已经做好准备了
// 我是运动员【运动员_6】,我已经做好准备了
// 我是运动员【运动员_7】,我已经做好准备了
// 运动员已经全部入场,裁判准备吹哨
// 吹哨:哔哔哔
// 裁判在观察,等待所有运动远跑完
// 运动员【运动员_3】冲出去了!!!
// 运动员【运动员_2】冲出去了!!!
// 运动员【运动员_7】冲出去了!!!
// 运动员【运动员_0】冲出去了!!!
// 运动员【运动员_6】冲出去了!!!
// 运动员【运动员_4】冲出去了!!!
// 运动员【运动员_5】冲出去了!!!
// 运动员【运动员_1】冲出去了!!!
// 运动员【运动员_6】到达了终点!!!
// 运动员【运动员_4】到达了终点!!!
// 运动员【运动员_0】到达了终点!!!
// 运动员【运动员_7】到达了终点!!!
// 运动员【运动员_3】到达了终点!!!
// 运动员【运动员_5】到达了终点!!!
// 运动员【运动员_2】到达了终点!!!
// 运动员【运动员_1】到达了终点!!!
// 比赛结束
}
示例3:(多任务完成后合并汇总)
场景:很多时候,我们的并发任务,存在前后依赖关系;比如数据详情页需要同时调用多个接口获取数据,并发请求获取到数据后、需要进行结果合并;或者多个数据操作完成后,需要数据check。
public static void main(String[] args) throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(5);
for (int i = 0; i < 5; i++) {
final int index = i;
new Thread(() -> {
try {
Thread.sleep(1000 + ThreadLocalRandom.current().nextInt(2000));
System.out.println("任务" + index +"执行完成");
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
// 主线程在阻塞,当计数器为0,就唤醒主线程往下执行
countDownLatch.await();
System.out.println("主线程:在所有任务运行完成后,进行结果汇总");
}
// 系统输出:
// 任务4执行完成
// 任务2执行完成
// 任务1执行完成
// 任务0执行完成
// 任务3执行完成
// 主线程:在所有任务运行完成后,进行结果汇总
使用总结
以下是使用CountDownLatch的常见场景:
- 并行任务同步:CountDownLatch可以用于协调多个并行任务的完成情况,确保所有任务都完成后再继续执行下一步操作。
- 多任务汇总:CountDownLatch可以用于统计多个线程的完成情况,以确定所有线程都已完成工作。
- 资源初始化:CountDownLatch可以用于等待资源的初始化完成,以便在资源初始化完成后开始使用
四、CyclicBarrier(循环屏障)
基本介绍
CyclicBarrier(循环屏障),是 Java 并发库中的一个同步工具,它跟上面说到的CountDownLatch很相似,都可以使一批线程在等待之后执行。根据官方的释义,CountDownLatch: 一个或者多个线程,等待其他多个线程完成某件事情之后才能执行;CyclicBarrier : 多个线程互相等待,直到到达同一个同步点,再继续一起执行。另外,CyclicBarrier侧重于可以重复使用的栅栏,不像是CountDownLatch,是一次性的。
常用API
构造器:
public CyclicBarrier(int parties); // parties表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。
…
public CyclicBarrier(int parties, Runnable barrierAction); // 用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景(该线程的执行时机是在到达屏障之后再执行)
普通方法:
// 指定数量的线程全部调用await()方法时,这些线程不再阻塞
// BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时
public int await() throws InterruptedException, BrokenBarrierException;
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException;public void reset(); // 循环 通过reset()方法可以进行重置
使用示例
示例1:(模拟人满发车)
- 场景:这里有一辆车,司机很有个性,我人满才发车,人不满,我是不会动的
- 思路:人满才发车,发车的动作是一致的,但是人不一定是之前的人(重置发车这个操作)
public class CyclicBarrierDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(5);
// 人满之后的操作
Runnable startTheCarWhileCarIsFull = new Runnable() {
@Override
public void run() {
System.out.println("人齐了是吧,那我开车啦");
}
};
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, startTheCarWhileCarIsFull);
for (int i = 0; i < 10; i++) {
final int id = i + 1;
executorService.submit(new Runnable() {
@Override
public void run() {
try {
System.out.println(id + "号马上就到");
int sleepMills = ThreadLocalRandom.current().nextInt(2000);
Thread.sleep(sleepMills);
System.out.println(id + "号到了,上车");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
});
}
}
// 系统输出:
// 1号马上就到
// 5号马上就到
// 4号马上就到
// 3号马上就到
// 2号马上就到
// 4号到了,上车
// 1号到了,上车
// 5号到了,上车
// 2号到了,上车
// 3号到了,上车
// 人齐了,准备发车
// 6号马上就到
// 9号马上就到
// 7号马上就到
// 8号马上就到
// 10号马上就到
// 7号到了,上车
// 6号到了,上车
// 10号到了,上车
// 8号到了,上车
// 9号到了,上车
// 人齐了,准备发车
}
示例2:(多线程批量处理数据)
public class CyclicBarrierBatchProcessorDemo {
public static void main(String[] args) {
//生成数据
List<Integer> data = new ArrayList<>();
for (int i = 1; i <= 50; i++) {
data.add(i);
}
//指定数据处理大小
int batchSize = 5;
CyclicBarrierBatchProcessor processor = new CyclicBarrierBatchProcessor(data, batchSize);
//处理数据
processor.process(batchData -> {
for (Integer i : batchData) {
System.out.println(Thread.currentThread().getName() + "处理数据" + i);
}
});
}
}
class CyclicBarrierBatchProcessor {
private List<Integer> data;
private int batchSize;
private CyclicBarrier barrier;
private List<Thread> threads;
public CyclicBarrierBatchProcessor(List<Integer> data, int batchSize) {
this.data = data;
this.batchSize = batchSize;
this.barrier = new CyclicBarrier(batchSize);
this.threads = new ArrayList<>();
}
public void process(BatchTask task) {
// 对任务分批,获取线程数
int threadCount = (data.size() + batchSize - 1) / batchSize;
for (int i = 0; i < threadCount; i++) {
int start = i * batchSize;
int end = Math.min(start + batchSize, data.size());
//获取每个线程处理的任务数
List<Integer> batchData = data.subList(start, end);
Thread thread = new Thread(() -> {
task.process(batchData);
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
threads.add(thread);
thread.start();
}
}
public interface BatchTask {
void process(List<Integer> batchData);
}
// 系统输出:
// Thread-1处理数据6
// Thread-5处理数据26
// Thread-4处理数据21
// Thread-4处理数据22
// Thread-4处理数据23
// Thread-7处理数据36
// Thread-7处理数据37
// Thread-2处理数据11
// Thread-3处理数据16
// Thread-3处理数据17
// Thread-3处理数据18
// Thread-3处理数据19
// Thread-3处理数据20
// Thread-0处理数据1
// Thread-0处理数据2
// Thread-0处理数据3
// Thread-0处理数据4
// Thread-0处理数据5
// Thread-2处理数据12
// Thread-2处理数据13
// Thread-7处理数据38
// Thread-7处理数据39
// Thread-7处理数据40
// Thread-4处理数据24
// Thread-4处理数据25
// Thread-9处理数据46
// Thread-9处理数据47
// Thread-8处理数据41
// Thread-8处理数据42
// Thread-8处理数据43
// Thread-8处理数据44
// Thread-8处理数据45
// Thread-5处理数据27
// Thread-5处理数据28
// Thread-5处理数据29
// Thread-5处理数据30
// Thread-6处理数据31
// Thread-6处理数据32
// Thread-6处理数据33
// Thread-1处理数据7
// Thread-1处理数据8
// Thread-6处理数据34
// Thread-6处理数据35
// Thread-9处理数据48
// Thread-2处理数据14
// Thread-9处理数据49
// Thread-1处理数据9
// Thread-9处理数据50
// Thread-2处理数据15
// Thread-1处理数据10
}
使用场景
以下是一些常见的 CyclicBarrier 应用场景:
- 多线程任务:CyclicBarrier 可以用于将复杂的任务分配给多个线程执行,并在所有线程完成工作后触发后续操作。
- 数据处理:CyclicBarrier 可以用于协调多个线程间的数据处理,在所有线程处理完数据后触发后续操作。
五、Exchanger(数据交换器)
基本介绍
Exchanger是一个用于线程间协作的工具类,用于两个线程间交换数据。具体交换数据是通过exchange方法来实现的,如果一个线程先执行exchange方法,那么它会同步等待另一个线程也执行exchange方法,这个时候两个线程就都达到了同步点,两个线程就可以交换数据。
常用API
public V exchange(V x) throws InterruptedException; // 等待另一个线程到达此交换点(除非当前线程被中断),然后将给定的对象传送给该线程,并接收该线程的对象
…
public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException; // 等待另一个线程到达此交换点,或者当前线程被中断——抛出中断异常;又或者是等候超时——抛出超时异常,然后将给定的对象传送给该线程,并接收该线程的对象。
使用示例
示例1:(一手交钱一手交货)
- 场景:我有一台闲置的电脑,刚好有个人想买,于是我们约在了瑞幸咖啡店相约交易
- 思路:卖家拿上电脑,去约好的瑞幸咖啡店;买家也是。 这里就有一个交易地点:瑞幸咖啡店
public class ExchangerDemo {
/**
* 交易地点:瑞幸咖啡店
*/
private static Exchanger luckinCoffeeShop = new Exchanger();
static String goods = "电脑";
static String money = "¥3000";
public static void main(String[] args) throws InterruptedException {
System.out.println("准备交易,一手交钱一手交货...");
// 卖家
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("卖家到瑞幸咖啡店了,已经准备好货:" + goods);
try {
String money = (String) luckinCoffeeShop.exchange(goods);
System.out.println("卖家收到钱:" + money);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
Thread.sleep(3000);
// 买家
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("买家到瑞幸咖啡店了,已经准备好钱:" + money);
String goods = (String) luckinCoffeeShop.exchange(money);
System.out.println("买家收到货:" + goods);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
// 系统输出:
// 准备交易,一手交钱一手交货...
// 卖家到瑞幸咖啡店了,已经准备好货:电脑
// 买家到瑞幸咖啡店了,已经准备好钱:¥3000
// 买家收到货:电脑
// 卖家收到钱:¥3000
}
示例2:(模拟对账场景)
public class ExchangerDemo2 {
private static final Exchanger<String> exchanger = new Exchanger();
private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String A = "12379871924sfkhfksdhfks";
exchanger.exchange(A);
} catch (InterruptedException e) {
}
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String B = "32423423jknjkfsbfj";
String A = exchanger.exchange(B);
System.out.println("A和B数据是否一致:" + A.equals(B));
System.out.println("A= "+A);
System.out.println("B= "+B);
} catch (InterruptedException e) {
}
}
});
threadPool.shutdown();
}
// 系统输出:
// A和B数据是否一致:false
// A= 12379871924sfkhfksdhfks
// B= 32423423jknjkfsbfj
}
从上面两个例子可以看出,使用上并不困难,重点是要确认交换器必须是同一个。
使用总结
Exchanger 可以用于各种应用场景,具体取决于具体的 Exchanger 实现。常见的场景包括:
- 数据交换:在多线程环境中,两个线程可以通过 Exchanger 进行数据交换。
- 数据采集:在数据采集系统中,可以使用 Exchanger 在采集线程和处理线程间进行数据交换。
六、Phaser(阶段协同器)
基本介绍
Phaser(阶段协同器)是一个Java实现的并发工具类,用于协调多个线程的执行。Phaser可以被视为CyclicBarrier和CountDownLatch的进化版。
对Phaser阶段协同器的理解,Phaser适用于多个线程协作的任务,分为多个阶段,每个阶段都可以有任意个参与者,线程可以随时注册并参与某个阶段;当一个阶段中所有任务都成功完成后,Phaser的onAdvance()被调用,然后Phaser释放等待线程,自动进入下个阶段。如此循环,直到Phaser不再包含任何参与者。
它提供了一些方便的方法来管理多个阶段的执行,可以让程序员灵活地控制线程的执行顺序和阶段性的执行。,它能够自适应地调整并发线程数,可以动态地增加或减少参与线程的数量。所以Phaser特别适合使用在重复执行或者重用的情况。
上面的两张图其实表达的意思是一样的,那就是表现出了多屏障,并且线程数量在不同阶段的动态修改性。另外,相对于CyclicBarrier和CountDownLatch,它还具有如下特性:
特性一:CountDownLatch、CyclicBarrier只适用于固定数量的参与者,而Phaser适用于可变数目的屏障
特性二:Phaser可能是分层的,这允许你以树形结构来安排移相器Phaser以减少竞争。
特性三:Phaser使用独立的对象可以监视Phaser的当前状态,监视器可以查询注册到Phaser的参与者的数量,以及已经到达和还没有到达某个特定相数的参与者的数量。
常用API
构造方法:
Phaser(); // 参与任务数0
Phaser(int parties); // 指定初始参与任务数,即线程数
Phaser(Phaser parent); // 指定parent阶段器, 子对象作为一个整体加入parent对象, 当子对象中没有参与者时,会自动从parent对象解除注册
Phaser(Phaser parent,int parties); // 集合上面两个方法
增、减参与任务数方法:
int register(); // 增加一个任务数,返回当前阶段号。
int bulkRegister(int parties); // 增加指定任务个数,返回当前阶段号。
int arriveAndDeregister(); // 减少一个任务数,返回当前阶段号。
到达、等待方法:
int arrive(); // 到达(任务完成),返回当前阶段号。
int arriveAndAwaitAdvance(); // 到达后等待其他任务到达,返回到达阶段号。
int awaitAdvance(int phase); // 在指定阶段等待(必须是当前阶段才有效)
int awaitAdvanceInterruptibly(int phase); // 阶段到达触发动作
int awaitAdvanceInterruptiBly(int phase,long timeout,TimeUnit unit)
protected boolean onAdvance(int phase,int registeredParties); // 类似CyclicBarrier的触发命令,通过重写该方法来增加阶段到达动作,该方法返回true将终结Phaser对象。
注意,上面说的parties或者任务数,就是指参加的线程数
使用示例
示例1:(代替CountDownLatch,实现LOL加载游戏)
代码示例如下:
public class PhaserLOLTest {
/**
* LOL游戏总人数
*/
private static final int TOTAL_PLAYERS = 10;
/**
* 游戏主线程等待玩家加载线程
*/
private static Phaser gameWaitAllPlayerLoaded = new Phaser(TOTAL_PLAYERS);
public static void main(String[] args) throws InterruptedException {
System.out.println("所有玩家已经选择完毕,游戏载入中");
LOLChatBroadcast lolChatBroadcast = new LOLChatBroadcast();
lolChatBroadcast.start();
for (int i = 0; i < TOTAL_PLAYERS; i++) {
final String name = "玩家_" + (i + 1);
LOLPlayer lolPlayer = new LOLPlayer();
lolPlayer.setName(name);
lolPlayer.start();
}
gameWaitAllPlayerLoaded.awaitAdvance(gameWaitAllPlayerLoaded.getPhase());
System.out.println("所有玩家都已载入成功");
System.out.println("欢迎来到英雄联盟!!!!!");
}
/**
* 游戏聊天广播线程
*/
public static class LOLChatBroadcast extends Thread {
@Override
public void run() {
System.out.println("我是聊天广播线程,等待游戏载入");
gameWaitAllPlayerLoaded.awaitAdvance(gameWaitAllPlayerLoaded.getPhase());
System.out.println("游戏载入完毕,开始聊天广播");
System.out.println("请大家文明发言,建设文明游戏");
}
}
/**
* 游戏玩家加载线程
*/
public static class LOLPlayer extends Thread {
@Override
public void run() {
System.out.println("我是【" + Thread.currentThread().getName() + "】,已加载0%");
// 睡个几秒钟
try {
TimeUnit.SECONDS.sleep(new Random().nextInt(10));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("我是【" + Thread.currentThread().getName() + "】,已加载100%");
gameWaitAllPlayerLoaded.arrive();
}
}
}
示例2:(用Phaser模拟百米赛跑)
- 场景:有8名选手参加百米赛跑。本次赛跑分为3个阶段。阶段1运动员热完身后在起跑线等待裁判号令枪一响就冲出去;阶段2运动员开始跑步;阶段3运动员们等待裁判宣布结果退场
- 思路:因为有8名远动员参加,所以需要设置Phaser的初始化parties为8;然后有3个阶段,所以运动员会使用3次Phaser的等待方法
public class PhaserRunningRaceTest {
private static Phaser runningRacePhaser = new Phaser(8);
public static void main(String[] args) {
for (int i = 0; i < 8; i++) {
final String name = "运动员_" + i;
Athlet athlet = new Athlet();
athlet.setName(name);
athlet.start();
}
}
public static class Athlet extends Thread {
@Override
public void run() {
// 阶段1:等待裁判吹哨出发
System.out.println("【" + Thread.currentThread().getName() + "】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【" + runningRacePhaser.getPhase() + "】阶段");
runningRacePhaser.arriveAndAwaitAdvance();
// 阶段2:开炮
System.out.println("【" + Thread.currentThread().getName() + "】听到枪声响了,冲啊!!!!!这是第【" + runningRacePhaser.getPhase() + "】阶段");
int runningTime = new Random().nextInt(5);
try {
TimeUnit.SECONDS.sleep(runningTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("【" + Thread.currentThread().getName() + "】冲过了终点。这是第【" + runningRacePhaser.getPhase() + "】阶段");
runningRacePhaser.arriveAndAwaitAdvance();
// 阶段3:冲过终点,等待裁判宣布比赛结束,退场
System.out.println("裁判宣布比赛结束了,退场,溜了溜了。这是第【" + runningRacePhaser.getPhase() + "】阶段");
}
}
// 系统输出:
// 【运动员_0】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【0】阶段
// 【运动员_2】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【0】阶段
// 【运动员_1】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【0】阶段
// 【运动员_3】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【0】阶段
// 【运动员_4】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【0】阶段
// 【运动员_5】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【0】阶段
// 【运动员_6】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【0】阶段
// 【运动员_7】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【0】阶段
// 【运动员_3】听到枪声响了,冲啊!!!!!这是第【1】阶段
// 【运动员_2】听到枪声响了,冲啊!!!!!这是第【1】阶段
// 【运动员_7】听到枪声响了,冲啊!!!!!这是第【1】阶段
// 【运动员_6】听到枪声响了,冲啊!!!!!这是第【1】阶段
// 【运动员_5】听到枪声响了,冲啊!!!!!这是第【1】阶段
// 【运动员_0】听到枪声响了,冲啊!!!!!这是第【1】阶段
// 【运动员_1】听到枪声响了,冲啊!!!!!这是第【1】阶段
// 【运动员_4】听到枪声响了,冲啊!!!!!这是第【1】阶段
// 【运动员_6】冲过了终点。这是第【1】阶段
// 【运动员_2】冲过了终点。这是第【1】阶段
// 【运动员_0】冲过了终点。这是第【1】阶段
// 【运动员_1】冲过了终点。这是第【1】阶段
// 【运动员_5】冲过了终点。这是第【1】阶段
// 【运动员_3】冲过了终点。这是第【1】阶段
// 【运动员_4】冲过了终点。这是第【1】阶段
// 【运动员_7】冲过了终点。这是第【1】阶段
// 裁判宣布比赛结束了,退场,溜了溜了。这是第【2】阶段
// 裁判宣布比赛结束了,退场,溜了溜了。这是第【2】阶段
// 裁判宣布比赛结束了,退场,溜了溜了。这是第【2】阶段
// 裁判宣布比赛结束了,退场,溜了溜了。这是第【2】阶段
// 裁判宣布比赛结束了,退场,溜了溜了。这是第【2】阶段
// 裁判宣布比赛结束了,退场,溜了溜了。这是第【2】阶段
// 裁判宣布比赛结束了,退场,溜了溜了。这是第【2】阶段
// 裁判宣布比赛结束了,退场,溜了溜了。这是第【2】阶段
}
示例3:(模拟百米赛跑,但是运动员有几率受伤退场。动态修改任务数)
代码示例如下:
public class PhaserRunningRaceTest {
private static Phaser runningRacePhaser = new Phaser(8);
public static void main(String[] args) {
for (int i = 0; i < 8; i++) {
final String name = "运动员_" + i;
Athlet athlet = new Athlet();
athlet.setName(name);
athlet.start();
}
}
public static class Athlet extends Thread {
@Override
public void run() {
// 阶段1:等待裁判吹哨出发
System.out.println("【" + Thread.currentThread().getName() + "】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【" + runningRacePhaser.getPhase() + "】阶段");
runningRacePhaser.arriveAndAwaitAdvance();
// 阶段2:开炮
System.out.println("【" + Thread.currentThread().getName() + "】听到枪声响了,冲啊!!!!!这是第【" + runningRacePhaser.getPhase() + "】阶段");
int runningTime = new Random().nextInt(5);
try {
if (runningTime <= 2) {
TimeUnit.SECONDS.sleep(runningTime);
runningRacePhaser.arriveAndDeregister();
System.out.println("【" + Thread.currentThread().getName() + "】摔了个狗吃屎只能退场了!!!这是第【" + runningRacePhaser.getPhase() + "】阶段");
return;
} else {
TimeUnit.SECONDS.sleep(runningTime);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("【" + Thread.currentThread().getName() + "】冲过了终点。这是第【" + runningRacePhaser.getPhase() + "】阶段");
runningRacePhaser.arriveAndAwaitAdvance();
// 阶段3:冲过终点,等待裁判宣布比赛结束,退场
System.out.println("【" + Thread.currentThread().getName() + "】听到裁判宣布比赛结束了,退场,溜了溜了。这是第【" + runningRacePhaser.getPhase() + "】阶段");
}
}
// 系统输出:
// 【运动员_0】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【0】阶段
// 【运动员_4】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【0】阶段
// 【运动员_3】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【0】阶段
// 【运动员_2】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【0】阶段
// 【运动员_7】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【0】阶段
// 【运动员_1】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【0】阶段
// 【运动员_6】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【0】阶段
// 【运动员_5】已经站在起跑线上准备起跑了,只要枪声一响就开跑。这是第【0】阶段
// 【运动员_5】听到枪声响了,冲啊!!!!!这是第【1】阶段
// 【运动员_4】听到枪声响了,冲啊!!!!!这是第【1】阶段
// 【运动员_0】听到枪声响了,冲啊!!!!!这是第【1】阶段
// 【运动员_7】听到枪声响了,冲啊!!!!!这是第【1】阶段
// 【运动员_6】听到枪声响了,冲啊!!!!!这是第【1】阶段
// 【运动员_1】听到枪声响了,冲啊!!!!!这是第【1】阶段
// 【运动员_2】听到枪声响了,冲啊!!!!!这是第【1】阶段
// 【运动员_3】听到枪声响了,冲啊!!!!!这是第【1】阶段
// 【运动员_2】摔了个狗吃屎只能退场了!!!这是第【1】阶段
// 【运动员_6】摔了个狗吃屎只能退场了!!!这是第【1】阶段
// 【运动员_7】冲过了终点。这是第【1】阶段
// 【运动员_4】冲过了终点。这是第【1】阶段
// 【运动员_1】冲过了终点。这是第【1】阶段
// 【运动员_0】冲过了终点。这是第【1】阶段
// 【运动员_5】冲过了终点。这是第【1】阶段
// 【运动员_3】冲过了终点。这是第【1】阶段
// 【运动员_0】听到裁判宣布比赛结束了,退场,溜了溜了。这是第【2】阶段
// 【运动员_4】听到裁判宣布比赛结束了,退场,溜了溜了。这是第【2】阶段
// 【运动员_3】听到裁判宣布比赛结束了,退场,溜了溜了。这是第【2】阶段
// 【运动员_7】听到裁判宣布比赛结束了,退场,溜了溜了。这是第【2】阶段
// 【运动员_1】听到裁判宣布比赛结束了,退场,溜了溜了。这是第【2】阶段
// 【运动员_5】听到裁判宣布比赛结束了,退场,溜了溜了。这是第【2】阶段
}
示例4:(比较复杂的版本,公司团建。阶段性任务;动态修改参与任务数parities)
- 场景:某公司举行了一个小团建,分为4个阶段。阶段1来公司集合;阶段2出发去公园;阶段3去餐厅;阶段4吃饭。由于有人不爱运动,所以,本次活动全程参与的人只有3人;有2个员工今晚有约了,不吃饭,所以只参加前面两个阶段;另外有4个人,不喜欢运动,所以只参加后面的2个阶段
- 思路:每个阶段参与人数不一样,所以不初始化parites了,而是动态添加。注册3个任务,执行全部阶段的;注册2个,只参加前面2个阶段的;注册4个,只参加后面2个阶段的
package org.example.lock;
import java.util.Random;
import java.util.concurrent.Phaser;
public class PhaserDemo {
public static void main(String[] args) {
final Phaser phaser = new Phaser() {
//重写该方法来增加阶段到达动作
@Override
protected boolean onAdvance(int phase, int registeredParties) {
// 参与者数量,去除主线程
int staffs = registeredParties - 1;
switch (phase) {
case 0:
System.out.println("大家都到公司了,出发去公园,人数:" + staffs);
break;
case 1:
System.out.println("大家都到公园门口了,出发去餐厅,人数:" + staffs);
break;
case 2:
System.out.println("大家都到餐厅了,开始用餐,人数:" + staffs);
break;
}
// 判断是否只剩下主线程(一个参与者),如果是,则返回true,代表终止
return registeredParties == 1;
}
};
// 注册主线程 ———— 让主线程全程参与
phaser.register();
final StaffTask staffTask = new StaffTask();
// 3个全程参与团建的员工
for (int i = 0; i < 3; i++) {
// 添加任务数
phaser.register();
new Thread(() -> {
try {
staffTask.step1Task();
//到达后等待其他任务到达
phaser.arriveAndAwaitAdvance();
staffTask.step2Task();
phaser.arriveAndAwaitAdvance();
staffTask.step3Task();
phaser.arriveAndAwaitAdvance();
staffTask.step4Task();
// 完成了,注销离开
phaser.arriveAndDeregister();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
// 两个不聚餐的员工加入
for (int i = 0; i < 2; i++) {
phaser.register();
new Thread(() -> {
try {
staffTask.step1Task();
phaser.arriveAndAwaitAdvance();
staffTask.step2Task();
System.out.println("员工【" + Thread.currentThread().getName() + "】回家了");
// 完成了,注销离开
phaser.arriveAndDeregister();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
while (!phaser.isTerminated()) {
int phase = phaser.arriveAndAwaitAdvance();
if (phase == 2) {
// 到了去餐厅的阶段,又新增4人,参加晚上的聚餐
for (int i = 0; i < 4; i++) {
phaser.register();
new Thread(() -> {
try {
staffTask.step3Task();
phaser.arriveAndAwaitAdvance();
staffTask.step4Task();
// 完成了,注销离开
phaser.arriveAndDeregister();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
}
}
static final Random random = new Random();
static class StaffTask {
public void step1Task() throws InterruptedException {
// 第一阶段:来公司集合
String staff = "员工【" + Thread.currentThread().getName() + "】";
System.out.println(staff + "从家出发了……");
Thread.sleep(random.nextInt(5000));
System.out.println(staff + "到达公司");
}
public void step2Task() throws InterruptedException {
// 第二阶段:出发去公园
String staff = "员工【" + Thread.currentThread().getName() + "】";
System.out.println(staff + "出发去公园玩");
Thread.sleep(random.nextInt(5000));
System.out.println(staff + "到达公园门口集合");
}
public void step3Task() throws InterruptedException {
// 第三阶段:去餐厅
String staff = "员工【" + Thread.currentThread().getName() + "】";
System.out.println(staff + "出发去餐厅");
Thread.sleep(random.nextInt(5000));
System.out.println(staff + "到达餐厅");
}
public void step4Task() throws InterruptedException {
// 第四阶段:就餐
String staff = "员工【" + Thread.currentThread().getName() + "】";
System.out.println(staff + "开始用餐");
Thread.sleep(random.nextInt(5000));
System.out.println(staff + "用餐结束,回家");
}
}
}
使用总结
以下是一些常见的 Phaser 应用场景:
- 多线程任务分配:Phaser 可以用于将复杂的任务分配给多个线程执行,并协调线程间的合作。
- 多级任务流程:Phaser 可以用于实现多级任务流程,在每一级任务完成后触发下一级任务的开始。
- 模拟并行计算:Phaser 可以用于模拟并行计算,协调多个线程间的工作。
- 阶段性任务:Phaser 可以用于实现阶段性任务,在每一阶段任务完成后触发下一阶段任务的开始。
学习总结
- 学习了ReentrantLock及其特性,还有它与synchronized的区别。如:前者可以切换公平锁,可以设置超时时间,另外还提供尝试获取锁的机制
- 学习了Semaphore信号量,在服务限量跟池化技术上应用
- 学习了CountDownLatch闭锁,用来在并行任务同步或者多任务汇总的场景下
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/180533.html