码农在囧途
任何一次的失败都不无意义,几十万年形成的钟乳石没有一滴水是白落下的,漫长历史长河中的任何一次革新都是无数次的失败与改进,只有无数次的摧残和打磨才会铸就一个人的坚韧品格如越王勾践剑历经千年仍然锋利又或如子美诗篇至今让人荡气回肠!
前言
在开发过程中我们常常遇到需要对多个任务进行汇总,比如报表,或者大屏显示,需要将所有接口的数据都获取到后再进行汇总,如果使用同步的方式,那么会比较耗时,体验不好,所以我们使用多线程,但是使用多线程只能异步的执行,有些接口响应比较快,有些比较慢,而返回结果之间又有依赖,这样就无法汇总了,所以我们引入了CountDownLatch
,它能让所有子线程全部执行完毕后主线程才会往下执行,如果子线程没有执行完毕 ,那么主线程将无法继续向下执行。
我们需要对三个接口的返回结果进行求和。
模拟三个接口
public static Integer getOne(){
return 1;
}
public static Integer getTwo(){
return 2;
}
public static Integer getThree(){
return 3;
}
我们创建一个线程池和CountDownLatch,CountDownLatch构造函数参数我们传3,表示计数器为3
static ExecutorService executorService = Executors.newCachedThreadPool();
/**
* CountDownLatch(3) , 构造函数参数为3,
*/
static volatile CountDownLatch countDownLatch = new CountDownLatch(3);
main函数,我们将三个任务加入线程池中,并且调用了countDownLatch.countDown(),调用此方法后计数器减1,因为一开始我们设置的 计数器为3,而三个线程执行后,每个-1,此时计算器变为0,这时候主线程的await才会返回,主线程才会向下执行,如果我们将计算器设置为 10,三个线程-3,此时计算器为7,那么await将会一直阻塞,主线程则无法向下执行,所以一定要让计算器为0后才会向下执行,
public static void main(String[] args) throws InterruptedException, ExecutionException {
Future<Integer> futureOne = executorService.submit(() -> {
System.out.println(Thread.currentThread().getName()+" is over");
Thread.sleep(2000);
countDownLatch.countDown();
return getOne();
});
Future<Integer> futureTwo = executorService.submit(() -> {
System.out.println(Thread.currentThread().getName()+" is over");
Thread.sleep(2000);
//计数器-1
countDownLatch.countDown();
return getTwo();
});
Future<Integer> futureThree = executorService.submit(() -> {
System.out.println(Thread.currentThread().getName()+" is over");
Thread.sleep(2000);
countDownLatch.countDown();
System.out.println("count3 "+countDownLatch.getCount());
return getThree();
});
//阻塞,等到计数器为0蔡往下执行
countDownLatch.await();
System.out.println("count "+countDownLatch.getCount());
System.out.println("child thread over , main thread start");
Integer value1 = futureOne.get();
Integer value2 = futureTwo.get();
Integer value3 = futureThree.get();
int total = value1 + value2 + value3;
System.out.println("total "+total);
}
原理
构造函数对AQS的成员变量state赋值
CountDownLatch
是基于AQS
来实现的,关于AQS的介绍,前面我们已经说了,有兴趣可以去了解,那么在AQS中我们说了最关键的一个变量state
,在CountDownLatch中state就是一个计数器,它的所用就是计数,下面我们来深度解剖一下。
首先从CountDownLatch的构造函数函数开始,构造函数首先判断count,count就是我们初始化计数器的指,这个值会赋给AQS的state。
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
public class CountDownLatch {
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
}
}
进入AbstractQueuedSynchronizer内部,我们发现最终会将count赋给state,所以,CountDownLatch的构造函数不能为空。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
private volatile int state;
protected final void setState(int newState) {
state = newState;
}
}
使用await()等待线程执行任务
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
tryAcquireShared(arg)
的作用就是判断AQS状态值state
,如果state
= 0 , 返回1,否则返回-1,如果返回1,则证明线程对资源的操作已经结束,如果为-1,则证明对资源的操作 尚未结束,此时就会调用doAcquireSharedInterruptibly(arg)
方法。
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
调用doAcquireSharedInterruptibly(arg)
会通过addWaiter(Node.SHARED)
将当前线程封装成SHARED
类型的Node,然后插入AQS队列的尾部,然后调用 parkAndCheckInterrupt()
使用LockSupport.park(this)
将当前线程挂起。
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
使用countDown()唤醒线程,对计数器进行减操作
使用countDown()
后会进入AQS内部的releaseShared()
方法中,AQS会尝试释放共享资源。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
在tryReleaseShared(arg)
中如果state
等于0,那么计数器就已经被减完了,代表资源已经被释放了,所以直接返回,如果大于0,则递减state,并使用CAS
修改state的值,然后继续执行doReleaseShared()
。
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
doReleaseShared()会调用unparkSuccessor(h)
来唤醒线程(LockSupport.unpark(s.thread)),因为传递过去的参数h = head , 所以可知唤醒的线程是队头节点的线程。
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
我们粗略地分析了源码,其实最关键的就是对state的操作,每个线程在使用countDown()的时候都会对state执行递减操作,直到state为0,代表共享资源 操作完毕,任务执行完毕。
今天的分享就到这里,感谢你的观看,我们下期见
原文始发于微信公众号(刘牌):浅谈Java并发编程CountDownLatch
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/150761.html