简介
CyclicBarrier(循环屏障)是一个同步工具类,通过CyclicBarrier可以实现一组线程等待至某个屏障点之后再全部同时执行(即:线程之间相互等待)。
CyclicBarrier通过一个计数器来实现,初始化时,计数器是参与同步处理的线程数量,当某个线程达到屏障时,调用await()方法将计数器的值-1,同时检查计数器的值是否为0:
-
如果计数器的值为0,表示所有线程均已达到屏障,则唤醒条件队列中所有等待的线程继续执行后续处理。 -
如果计数器的值不为0,则处于条件队列中的线程继续等待,直到最后一个线程到达屏障。
注意:CyclicBarrier可以设置回调处理对象和被重置,这是CyclicBarrier和CountDownLatch的主要区别。
应用场景
CyclicBarrier是一种非常实用的并发控制工具,它的主要应用场景:
-
CyclicBarrier无法阻塞主线程,不适合在需要同步返回的接口中使用;CountDownLatch可以阻塞主线程,适用于需要同步返回的接口。 -
CyclicBarrier适用于异步任务,尤其适合需要对各子线程的执行结果做聚合计算的场景。
实现原理
成员变量
CyclicBarrier的成员变量:
-
parties:屏障拦截的线程数量(即:参与者数量)。当调用CyclicBarrier构造方法创建实例时,parties的值被初始化。当一个线程调用await()方法,parties的值-1,当parties的值递减为0时,表示所有的线程都已经到达屏障。 -
barrierCommand:回调处理对象(可选)。当所有线程都到达屏障时,首先会执行回调处理对象(一个实现Runnable接口的线程)的run()方法。回调处理对象在调用CyclicBarrier构造方法创建实例时设置,如果未设置该对象,则所有线程到达屏障时不做任务处理。 -
generation:CyclicBarrier周期。当所有的线程都到达屏障时,会创建一个新的Generation对象重新开始新的CyclicBarrier周期。 -
count:计数器。初始值为参数parties设置的值。当一个线程到达屏障时,count值-1;当count值递减到0时,打开屏障并唤醒所有调用await()方法进入阻塞的线程。 -
lock:ReentrantLock对象(即:可重入锁)。用于保护屏障的状态,防止多个线程同时修改屏障的状态。 -
trip:条件队列。当一个线程到达屏障时,如果屏障未打开,则该线程进入条件队列中等待;如果屏障打开,则唤醒条件队列中所有等待的线程。
构造函数
CyclicBarrier的构造函数:
/**
* 构造函数
* 参数:
* parties-表示屏障拦截的线程数量
*/
public CyclicBarrier(int parties) {
// 调用原始构造函数
this(parties, null);
}
/**
* 原始构造函数
* 参数:
* parties-表示屏障拦截的线程数量
* barrierAction-回调处理对象(一个Runnable线程),当所有线程到达屏障后,执行回调处理
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
// 如果parties的值小于等于0,则抛出IllegalArgumentException异常
if (parties <= 0) throw new IllegalArgumentException();
// 屏障拦截的线程数量
this.parties = parties;
// 初始化计数器的值为屏障拦截的线程数
this.count = parties;
// 回调处理对象
this.barrierCommand = barrierAction;
}
主要方法
CyclicBarrier的主要方法:
// 线程执行完业务处理后,调用此方法进入阻塞,等待其他线程处理完成
public int await() throws InterruptedException, BrokenBarrierException {...}
/**
* 线程执行完业务处理后,调用此方法进入阻塞,等待其他线程处理完成或者等待超时
* 参数:
* timeout-线程等待超时时间,如果线程等待时间超过timeout设定的值,则抛出TimeoutException异常
* unit-超时时间单位
*/
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {...}
// 等待线程数量
public int getNumberWaiting() {...}
// 获取屏障拦截的线程数量
public int getParties() {...}
/**
* 查询屏障是否被打破
* 打破屏障可能的条件:
* 1)超时或者屏障被重置(reset)
* 2)某个屏障操作抛出异常
*/
public boolean isBroken() {...}
// 重置屏障的初始值,如果调用了该函数,则等待中的线程将会抛出BrokenBarrierException异常
public void reset() {...}
Generation内部类
Generation是CyclicBarrier的一个静态内部类,表示CyclicBarrier的一个周期。
Generation中只有一个broken变量,该变量用来追踪当前周期(Generation)是否被打破。如果一个线程在等待其它线程到达屏障时被中断,或者等待超时,则CyclicBarrier在当前周期内被打破(broken为true)。后续任何到达屏障的线程或者当前仍在等待的线程都将收到BrokenBarrierException异常。
当所有线程到达屏障时,会重新创建一个新的Generation,并将其赋值给CyclicBarrier的generation变量。这是CyclicBarrier能够循环使用的关键。
await方法
CyclicBarrier中的线程业务处理完成后,通过调用CyclicBarrier#await方法进入阻塞,等待其他线程业务处理完成。
CyclicBarrier#await方法处理流程,如图所示:

CyclicBarrier#await方法源码解析:
// java.util.concurrent.CyclicBarrier#await()
// 线程执行完业务处理后,调用此方法进入阻塞
public int await() throws InterruptedException, BrokenBarrierException {
try {
// 调用dowait()方法阻塞当前线程
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
// java.util.concurrent.CyclicBarrier#dowait
// 阻塞当前线程
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 获取ReentrantLock对象
final ReentrantLock lock = this.lock;
// 获取独占锁
lock.lock();
try {
// Generation是CyclicBarrier的一个静态内部类,代表CyclicBarrier的一个周期
final Generation g = generation;
// 如果屏障被打破,则抛出BrokenBarrierException异常
if (g.broken)
throw new BrokenBarrierException();
// 如果线程被中断,则抛出InterruptedException异常
if (Thread.interrupted()) {
/**
* 打破屏障:
* 1)标识屏障被打破
* 2)重置计数器
* 3)唤醒所有被阻塞的线程
*/
breakBarrier();
// 抛出InterruptedException异常
throw new InterruptedException();
}
// 屏障未被打破且线程未被中断,则计数器的值-1
int index = --count;
// 判断计数器的值是否为0
if (index == 0) { // tripped
boolean ranAction = false;
try {
// 获取回调处理对象
final Runnable command = barrierCommand;
// 回调处理对象不为空,则执行回调处理
if (command != null)
// 执行回调处理
command.run();
ranAction = true;
/**
* 开启下一个CyclicBarrier周期:
* 1)唤醒条件队列中所有等待的线程
* 2)重置计数器
* 3)重新创建Generation(即:开启下一个周期)
*/
nextGeneration();
return 0;
} finally {
if (!ranAction)
// 打破屏障
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
// 计数器的值不为0,则通过自旋的方式进行判断线程是否等待超时、屏障是否被打破、线程是否被中断等
for (;;) {
try {
// 未设置超时时间,则按条件阻塞当前线程
if (!timed)
// 按条件阻塞当前线程
trip.await();
// 线程设置了超时时间且等待未超时,则当前线程阻塞指定时长
else if (nanos > 0L)
// 当前线程阻塞指定时长
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) { // 线程中断
// 屏障未被打破
if (g == generation && ! g.broken) {
// 打破屏障
breakBarrier();
// 抛出InterruptedException异常
throw ie;
} else {
// 重置中断标识,因为抛出InterruptedException异常后,中断标识会被清除
Thread.currentThread().interrupt();
}
}
// 打破屏障,则抛出BrokenBarrierException异常
if (g.broken)
throw new BrokenBarrierException();
// CyclicBarrier周期不匹配,则返回计数器的值
if (g != generation)
return index;
// 线程设置了超时时间且超时等待时长小于0,则抛出TimeoutException异常
if (timed && nanos <= 0L) {
// 打破屏障
breakBarrier();
// 抛出TimeoutException异常
throw new TimeoutException();
}
}
} finally {
// 释放独占锁
lock.unlock();
}
}
CyclicBarrier与CountDownLatch的区别
CyclicBarrier与CountDownLatch的主要区别:
-
1)CountDownLatch是一个或多个线程,等待另外N个线程完成之后才能执行;CyclicBarrier是N个线程之间相互等待,任何一个线程完成之前,所有的线程都必须等待。 -
2)CountDownLatch不可重用;CyclicBarrier可以重复使用。 -
3)CountDownLatch基于AQS实现;CyclicBarrier基于ReentrantLock和Condition实现。 -
4)CountDownLatch不能设置回调处理对象;CyclicBarrier可以设置回调处理对象。
使用示例
/**
* CyclicBarrier使用示例
*/
@Slf4j
public class CyclicBarrierTest {
@SneakyThrows
public static void main(String[] args) {
// 定义固定大小的线程池
ExecutorService pool = Executors.newFixedThreadPool(5);
log.info("5位同学相约去酒店聚餐!");
// 初始化CyclicBarrier(线程数为3),同时设置一个回调函数,当所有线程到达屏障时,调用回调函数进行处理
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
log.info("所有同学均到达酒店,开始吃饭!");
}
});
for (int i = 0; i < 5; i++) {
int number = i;
pool.execute(new Runnable() {
@SneakyThrows
@Override
public void run() {
log.info("同学-{}开始出发去酒店...", number);
Thread.sleep(number * 1000 + 1000);
log.info("同学-{}已经到达酒店", number);
cyclicBarrier.await();
}
});
}
Thread.sleep(3500);
log.info("目前到达酒店的人数:{}", cyclicBarrier.getNumberWaiting());
/**
* 重置CyclicBarrier
* 如果此时存在等待中的线程,则等待中的线程会抛出BrokenBarrierException异常
* 同时,所有线程执行完成后,不会执行回调处理
*/
//cyclicBarrier.reset();
}
}
执行结果
21:08:42.964 [main] INFO org.dromara.web.Test -- 5位同学相约去酒店聚餐!
21:08:42.974 [pool-1-thread-5] INFO org.dromara.web.Test -- 同学-4开始出发去酒店...
21:08:42.974 [pool-1-thread-4] INFO org.dromara.web.Test -- 同学-3开始出发去酒店...
21:08:42.974 [pool-1-thread-1] INFO org.dromara.web.Test -- 同学-0开始出发去酒店...
21:08:42.974 [pool-1-thread-2] INFO org.dromara.web.Test -- 同学-1开始出发去酒店...
21:08:42.974 [pool-1-thread-3] INFO org.dromara.web.Test -- 同学-2开始出发去酒店...
21:08:43.993 [pool-1-thread-1] INFO org.dromara.web.Test -- 同学-0已经到达酒店
21:08:44.991 [pool-1-thread-2] INFO org.dromara.web.Test -- 同学-1已经到达酒店
21:08:45.983 [pool-1-thread-3] INFO org.dromara.web.Test -- 同学-2已经到达酒店
21:08:46.484 [main] INFO org.dromara.web.Test -- 目前到达酒店的人数:3
21:08:46.988 [pool-1-thread-4] INFO org.dromara.web.Test -- 同学-3已经到达酒店
21:08:47.987 [pool-1-thread-5] INFO org.dromara.web.Test -- 同学-4已经到达酒店
21:08:47.987 [pool-1-thread-5] INFO org.dromara.web.Test -- 所有同学均到达酒店,开始吃饭!

原文始发于微信公众号(小小开发者):一文掌握CyclicBarrier实现原理
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/255198.html