一文掌握CyclicBarrier实现原理

简介

CyclicBarrier(循环屏障)是一个同步工具类,通过CyclicBarrier可以实现一组线程等待至某个屏障点之后再全部同时执行(即:线程之间相互等待)。

CyclicBarrier通过一个计数器来实现,初始化时,计数器是参与同步处理的线程数量,当某个线程达到屏障时,调用await()方法将计数器的值-1,同时检查计数器的值是否为0:

  • 如果计数器的值为0,表示所有线程均已达到屏障,则唤醒条件队列中所有等待的线程继续执行后续处理。
  • 如果计数器的值不为0,则处于条件队列中的线程继续等待,直到最后一个线程到达屏障。

注意:CyclicBarrier可以设置回调处理对象和被重置,这是CyclicBarrier和CountDownLatch的主要区别。

应用场景

CyclicBarrier是一种非常实用的并发控制工具,它的主要应用场景:

  • CyclicBarrier无法阻塞主线程,不适合在需要同步返回的接口中使用;CountDownLatch可以阻塞主线程,适用于需要同步返回的接口。
  • CyclicBarrier适用于异步任务,尤其适合需要对各子线程的执行结果做聚合计算的场景。

实现原理

成员变量

CyclicBarrier的成员变量:

  • parties:屏障拦截的线程数量(即:参与者数量)。当调用CyclicBarrier构造方法创建实例时,parties的值被初始化。当一个线程调用await()方法,parties的值-1,当parties的值递减为0时,表示所有的线程都已经到达屏障。
  • barrierCommand:回调处理对象(可选)。当所有线程都到达屏障时,首先会执行回调处理对象(一个实现Runnable接口的线程)的run()方法。回调处理对象在调用CyclicBarrier构造方法创建实例时设置,如果未设置该对象,则所有线程到达屏障时不做任务处理。
  • generation:CyclicBarrier周期。当所有的线程都到达屏障时,会创建一个新的Generation对象重新开始新的CyclicBarrier周期。
  • count:计数器。初始值为参数parties设置的值。当一个线程到达屏障时,count值-1;当count值递减到0时,打开屏障并唤醒所有调用await()方法进入阻塞的线程。
  • lock:ReentrantLock对象(即:可重入锁)。用于保护屏障的状态,防止多个线程同时修改屏障的状态。
  • trip:条件队列。当一个线程到达屏障时,如果屏障未打开,则该线程进入条件队列中等待;如果屏障打开,则唤醒条件队列中所有等待的线程。

构造函数

CyclicBarrier的构造函数:

/**
 * 构造函数
 * 参数:
 *   parties-表示屏障拦截的线程数量
 */

public CyclicBarrier(int parties) {
    // 调用原始构造函数
    this(parties, null);
}
/**
 * 原始构造函数
 * 参数:
 *   parties-表示屏障拦截的线程数量
 *   barrierAction-回调处理对象(一个Runnable线程),当所有线程到达屏障后,执行回调处理
 */

public CyclicBarrier(int parties, Runnable barrierAction) {
    // 如果parties的值小于等于0,则抛出IllegalArgumentException异常
    if (parties <= 0throw new IllegalArgumentException();
    // 屏障拦截的线程数量
    this.parties = parties;
    // 初始化计数器的值为屏障拦截的线程数
    this.count = parties;
    // 回调处理对象
    this.barrierCommand = barrierAction;
}

主要方法

CyclicBarrier的主要方法:

// 线程执行完业务处理后,调用此方法进入阻塞,等待其他线程处理完成
public int await() throws InterruptedException, BrokenBarrierException {...}
/** 
 * 线程执行完业务处理后,调用此方法进入阻塞,等待其他线程处理完成或者等待超时
 * 参数:
 *   timeout-线程等待超时时间,如果线程等待时间超过timeout设定的值,则抛出TimeoutException异常
 *   unit-超时时间单位
 */

public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
           BrokenBarrierException,
           TimeoutException 
{...}
// 等待线程数量
public int getNumberWaiting() {...}
// 获取屏障拦截的线程数量
public int getParties() {...}
/**
 * 查询屏障是否被打破
 * 打破屏障可能的条件:
 *   1)超时或者屏障被重置(reset)
 *   2)某个屏障操作抛出异常
 */

public boolean isBroken() {...}
// 重置屏障的初始值,如果调用了该函数,则等待中的线程将会抛出BrokenBarrierException异常
public void reset() {...}

Generation内部类

Generation是CyclicBarrier的一个静态内部类,表示CyclicBarrier的一个周期。

Generation中只有一个broken变量,该变量用来追踪当前周期(Generation)是否被打破。如果一个线程在等待其它线程到达屏障时被中断,或者等待超时,则CyclicBarrier在当前周期内被打破(broken为true)。后续任何到达屏障的线程或者当前仍在等待的线程都将收到BrokenBarrierException异常。

当所有线程到达屏障时,会重新创建一个新的Generation,并将其赋值给CyclicBarrier的generation变量。这是CyclicBarrier能够循环使用的关键。

await方法

CyclicBarrier中的线程业务处理完成后,通过调用CyclicBarrier#await方法进入阻塞,等待其他线程业务处理完成。

CyclicBarrier#await方法处理流程,如图所示:

一文掌握CyclicBarrier实现原理
CyclicBarrier#await方法处理流程

CyclicBarrier#await方法源码解析:

// java.util.concurrent.CyclicBarrier#await()
// 线程执行完业务处理后,调用此方法进入阻塞
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        // 调用dowait()方法阻塞当前线程
        return dowait(false0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

// java.util.concurrent.CyclicBarrier#dowait
// 阻塞当前线程
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException 
{
    // 获取ReentrantLock对象
    final ReentrantLock lock = this.lock;
    // 获取独占锁
    lock.lock();
    try {
        // Generation是CyclicBarrier的一个静态内部类,代表CyclicBarrier的一个周期
        final Generation g = generation;
        // 如果屏障被打破,则抛出BrokenBarrierException异常
        if (g.broken)
            throw new BrokenBarrierException();
        // 如果线程被中断,则抛出InterruptedException异常
        if (Thread.interrupted()) {
            /**
             * 打破屏障:
             *   1)标识屏障被打破
             *   2)重置计数器
             *   3)唤醒所有被阻塞的线程
             */

            breakBarrier();
            // 抛出InterruptedException异常
            throw new InterruptedException();
        }
        // 屏障未被打破且线程未被中断,则计数器的值-1
        int index = --count;
        // 判断计数器的值是否为0
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                // 获取回调处理对象
                final Runnable command = barrierCommand;
                // 回调处理对象不为空,则执行回调处理
                if (command != null)
                    // 执行回调处理
                    command.run();
                ranAction = true;
                /** 
                 * 开启下一个CyclicBarrier周期:
                 *   1)唤醒条件队列中所有等待的线程
                 *   2)重置计数器
                 *   3)重新创建Generation(即:开启下一个周期)
                 */

                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    // 打破屏障
                    breakBarrier();
            }
        }
        
        // loop until tripped, broken, interrupted, or timed out
        // 计数器的值不为0,则通过自旋的方式进行判断线程是否等待超时、屏障是否被打破、线程是否被中断等
        for (;;) {
            try {
                // 未设置超时时间,则按条件阻塞当前线程
                if (!timed)
                    // 按条件阻塞当前线程
                    trip.await();
                // 线程设置了超时时间且等待未超时,则当前线程阻塞指定时长  
                else if (nanos > 0L)
                    // 当前线程阻塞指定时长  
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {  // 线程中断
                // 屏障未被打破
                if (g == generation && ! g.broken) {
                    // 打破屏障
                    breakBarrier();
                    // 抛出InterruptedException异常
                    throw ie;
                } else {
                    // 重置中断标识,因为抛出InterruptedException异常后,中断标识会被清除
                    Thread.currentThread().interrupt();
                }
            }
            // 打破屏障,则抛出BrokenBarrierException异常
            if (g.broken)
                throw new BrokenBarrierException();
            // CyclicBarrier周期不匹配,则返回计数器的值
            if (g != generation)
                return index;
            // 线程设置了超时时间且超时等待时长小于0,则抛出TimeoutException异常
            if (timed && nanos <= 0L) {
                // 打破屏障
                breakBarrier();
                // 抛出TimeoutException异常
                throw new TimeoutException();
            }
        }
    } finally {
        // 释放独占锁
        lock.unlock();
    }
}

CyclicBarrier与CountDownLatch的区别

CyclicBarrier与CountDownLatch的主要区别:

  • 1)CountDownLatch是一个或多个线程,等待另外N个线程完成之后才能执行;CyclicBarrier是N个线程之间相互等待,任何一个线程完成之前,所有的线程都必须等待。
  • 2)CountDownLatch不可重用;CyclicBarrier可以重复使用。
  • 3)CountDownLatch基于AQS实现;CyclicBarrier基于ReentrantLock和Condition实现。
  • 4)CountDownLatch不能设置回调处理对象;CyclicBarrier可以设置回调处理对象。

使用示例

/**
 * CyclicBarrier使用示例
 */

@Slf4j
public class CyclicBarrierTest {
    
    
    @SneakyThrows
    public static void main(String[] args) {
        // 定义固定大小的线程池
        ExecutorService pool = Executors.newFixedThreadPool(5);
        log.info("5位同学相约去酒店聚餐!");
        // 初始化CyclicBarrier(线程数为3),同时设置一个回调函数,当所有线程到达屏障时,调用回调函数进行处理
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5new Runnable() {
            @Override
            public void run() {
                log.info("所有同学均到达酒店,开始吃饭!");
            }
        });
        for (int i = 0; i < 5; i++) {
            int number = i;
            pool.execute(new Runnable() {
                @SneakyThrows
                @Override
                public void run() {
                    log.info("同学-{}开始出发去酒店...", number);
                    Thread.sleep(number * 1000 + 1000);
                    log.info("同学-{}已经到达酒店", number);
                    cyclicBarrier.await();
                }
            });
        }
        Thread.sleep(3500);
        log.info("目前到达酒店的人数:{}", cyclicBarrier.getNumberWaiting());
        /**
         * 重置CyclicBarrier
         * 如果此时存在等待中的线程,则等待中的线程会抛出BrokenBarrierException异常
         * 同时,所有线程执行完成后,不会执行回调处理
         */

        //cyclicBarrier.reset();
    }
}

执行结果

21:08:42.964 [main] INFO org.dromara.web.Test -- 5位同学相约去酒店聚餐!
21:08:42.974 [pool-1-thread-5] INFO org.dromara.web.Test -- 同学-4开始出发去酒店...
21:08:42.974 [pool-1-thread-4] INFO org.dromara.web.Test -- 同学-3开始出发去酒店...
21:08:42.974 [pool-1-thread-1] INFO org.dromara.web.Test -- 同学-0开始出发去酒店...
21:08:42.974 [pool-1-thread-2] INFO org.dromara.web.Test -- 同学-1开始出发去酒店...
21:08:42.974 [pool-1-thread-3] INFO org.dromara.web.Test -- 同学-2开始出发去酒店...
21:08:43.993 [pool-1-thread-1] INFO org.dromara.web.Test -- 同学-0已经到达酒店
21:08:44.991 [pool-1-thread-2] INFO org.dromara.web.Test -- 同学-1已经到达酒店
21:08:45.983 [pool-1-thread-3] INFO org.dromara.web.Test -- 同学-2已经到达酒店
21:08:46.484 [main] INFO org.dromara.web.Test -- 目前到达酒店的人数:3
21:08:46.988 [pool-1-thread-4] INFO org.dromara.web.Test -- 同学-3已经到达酒店
21:08:47.987 [pool-1-thread-5] INFO org.dromara.web.Test -- 同学-4已经到达酒店
21:08:47.987 [pool-1-thread-5] INFO org.dromara.web.Test -- 所有同学均到达酒店,开始吃饭!
一文掌握CyclicBarrier实现原理


原文始发于微信公众号(小小开发者):一文掌握CyclicBarrier实现原理

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/255198.html

(0)
李, 若俞的头像李, 若俞

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!