背景
-
性能比对
-
源码分析
-
核心属性
-
`Celll`类
-
add
-
参考
背景
在看本篇文章,希望你已经对多线程和原子类有了一定的了解。说说我们在多线程对一些变量做自增同时要保证线程安全,我们最容易使用高性能的原子类来处理,因为他们是采用cas保证线程安全的,一般情况下比重量级加锁性能更佳,所以能用原子类的地方我们尽力不使用加锁。但是有没有比原子类更快线程安全的累加计数器呢?答案是有的,那就jdk1.8新引入juc包中的Longadder
性能比对
我们先来基于Longadder
和AtomicLong
性能作一个比较。这里我们让5个线程分别去去循环自增1000、10000、100000、10000000次看看效率
首先是使用AtomicLong
private static final AtomicLong atomicLong = new AtomicLong(0);
public static void main(String[] args) throws Exception {
int count = 10;
int task = 10000000;
ExecutorService executorService = Executors.newFixedThreadPool(count);
CountDownLatch countDownLatch = new CountDownLatch(count * task);
System.out.println("执行任务开始");
long start = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
executorService.execute(() -> {
for (int j = 0; j < task; j++) {
atomicLong.incrementAndGet();
countDownLatch.countDown();
}
});
}
countDownLatch.await();
long end = System.currentTimeMillis();
long time = end - start;
System.out.println("AtomicLong耗时:" + time + ",最后结果值:" + atomicLong.get());
executorService.shutdown();
}
然后我们来看看基于Longadder
的累加
private static final LongAdder longAdder = new LongAdder();
public static void main(String[] args) throws Exception {
int count = 10;
int task = 10000000;
ExecutorService executorService = Executors.newFixedThreadPool(10);
CountDownLatch countDownLatch = new CountDownLatch(count * task);
System.out.println("执行任务开始");
long start = System.currentTimeMillis();
for (int i = 0; i < count; i++) {
executorService.execute(() -> {
for (int j = 0; j < task; j++) {
longAdder.add(1);
countDownLatch.countDown();
}
});
}
countDownLatch.await();
long end = System.currentTimeMillis();
long time = end - start;
System.out.println("longAdder耗时:" + time + ",最后结果值:" + longAdder.longValue());
executorService.shutdown();
}
这里需要注意的是要让每个线程去执行固定的任务数量,而不是直接把所有任务放在丢入线程池直接执行,不然会发现可能存在的问题是,某些线程执行很快一直抢到任务,没有太大的线程竞争压力,导致效果测试不出来。因为在线程竞争越激烈的条件下,Longadder性能提升越明显
测试机器:
类型 | 线程数 * 每个线程的任务 | |
---|---|---|
10*1000 | 10*10000 | |
AtomicLong | ||
Longadder |
实验结果:
类型 | |||||
---|---|---|---|---|---|
线程数 * 每个线程的任务 | 10*1000 | 10*10000 | 10*100000 | 10*1000000 | 10*10000000 |
耗时(AtomicLong) | 15 | 94 | 280 | 2209 | 30571 |
耗时(Longadder) | 23 | 97 | 218 | 1656 | 19539 |
可以看到随着线程任务的增多,Longadder
性能越来越好。所以从这里可以看多,在任务量大,并发高的情况下Longadder
性能明显是更好的。
源码分析
这里我们来分析Longadder
的源码实现,因为在很多高性能队列、并发量,都有参考Longadder
的实现,比如ConcurrentHashMap
Disruptor
等。
所以在研究这些高级并发类前,我们先来研究研究Longadder
是如何解决并发冲突问题保证如此高的性能
首先需要知道的是AtomicLong
是使用内部变量value保存着实际的long值,然后所有的操作都是对这个long进行cas。所以并发越激烈,CAS的失败概率越高,从而进入恶性CAS空自旋影响性能。
要解决这个问题就要解决多个线程同时对一个long对象进行cas。所以Longadder
对此做了改进,L将value值分散到一个数组中,不同线程会命中到数组的不同槽(元素)中,各个线程只对自己槽中的那个值进行CAS操作。
这样冲突的概率就小很多,然后获取值的时候只需要加所有数组的值累加即可,这样就解决了AtomicLong
中恶意自旋的问题。Longadder
解决方式就是典型的==空间换时间== 需要注意的是在源码中Longadder
的重要属性都是继承于Striped64
其中 Striped64
的
核心属性
static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* Table of cells. When non-null, size is a power of 2.
* 存放Cell的hash表,大小为2的幂
*/
transient volatile Cell[] cells;
/**
* Base value, used mainly when there is no contention, but also as
* a fallback during table initialization races. Updated via CAS.
* 基础值,在没有竞争时会更新这个值,在有竞争cells没有初始化时,也会cas操作base
*/
transient volatile long base;
/**
* Spinlock (locked via CAS) used when resizing and/or creating Cells.
* 自旋锁,通过cas加锁,0 表示cells没有处于创建、扩容阶段
* 1 表示正在创建或者扩容cells,不能进行cells新元素设置操作
*/
transient volatile int cellsBusy;
然后下面看看
Celll
类
可以看到Striped64
中的Cell
使用了@jdk.internal.vm.annotation.Contended
注解解决伪共享问题(不懂可以看这里)
然后我们在分析一下LongAdder
几个核心方法
add
Cell[] cs; long b, v; int m; Cell c;
if ((cs = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
//表示 cells没有初始化
if (cs == null || (m = cs.length - 1) < 0 ||
(c = cs[getProbe() & m]) == null ||
!(uncontended = c.cas(v = c.value, v + x)))
longAccumulate(x, null, uncontended);
}
我们来一行一行分析 首先是
(cs = cells) != null
cells数组不为null,说明存在竞争;在不存在竞争的时候,cells数组一定为null,一旦对base的cas操作失败,才会初始化cells数组
!casBase(b = base, b + x)
如果cells数组为null,表示之前不存在竞争,并且此次casBase执行成功,表示基于base成员累加成功,add方法直接返回;如果casBase方法执行失败,说明产生了第一次多线程冲突,需要对cells数组初始化,此时即将进入内层if块。
然后我们看看下面的判断条件
cs == null || (m = cs.length - 1) < 0
这里表示cells没有初始化
(c = cs[getProbe() & m]) == null
表示前线程的hash值在cells数组映射位置的Cell对象为空,意思是还没有其他线程在同一个位置做过累加操作
(uncontended = c.cas(v = c.value, v + x))
指当前线程的哈希值在cells数组映射位置的Cell对象不为空,然后在该Cell对象上进行CAS操作,设置其值为v+x(x为该Cell需要累加的值),但是CAS操作失败,表示存在竞争。这里三个判断条件任意一个成立,就调用longAccumulate()
方法
longAccumulate()
是Striped64
中最重要的方法,它实现了不同的线程更新各自Cell中的值,实现逻辑类似于分段锁,具体源码如下:
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
// 存储线程的probe值
int h;
// 如果getProbe()方法返回0,说明随机数未初始化
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
// 是否可以扩容
boolean collide = false; // True if last slot nonempty
// 自旋,一直到操作成功
done: for (;;) {
// cs cells的引用
Cell[] cs;
// 当前线程中命中的 cell
Cell c;
// cells 数组长度
int n;
//期望值
long v;
// cells已经初始化过
if ((cs = cells) != null && (n = cs.length) > 0) {
//当前线程所在的Cell未初始化
if ((c = cs[(n - 1) & h]) == null) {
// 当前无其它线程在创建或扩容cells,也没有线程在创建Cell
if (cellsBusy == 0) { // Try to attach new Cell
// 新建一个Cell,值为当前需要增加的值
Cell r = new Cell(x); // Optimistically create
// 再次检测cellsBusy,并尝试更新它为1(cas保证线程安全) 相当于当前线程加锁
if (cellsBusy == 0 && casCellsBusy()) {
try { // Recheck under lock
Cell[] rs; int m, j;
// 重新获取cells,并找到当前线程hash到cells数组中的位置
// 这里一定要重新获取cells,因为as并不在锁定范围内
// 有可能已经扩容了,这里要重新获取
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
// 把上面新建的Cell放在cells的j位置处
rs[j] = r;
break done;
}
} finally {
cellsBusy = 0;
}
continue; // Slot is now non-empty
}
}
collide = false;
}
//当前线程所在的Cell不为空,且更新失败了
// 设置 wasUncontended 为true 表示已经进行过了自旋
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// 再次尝试CAS更新当前线程所在Cell的值,如果成功了就返回
else if (c.cas(v = c.value,
(fn == null) ? v + x : fn.applyAsLong(v, x)))
break;
// 如果cells数组的长度达到了CPU核心数,或者cells发生了扩容
else if (n >= NCPU || cells != cs)
collide = false; // At max size or stale
//设置扩容意向,但不是真的扩容
else if (!collide)
collide = true;
// 正真的扩容逻辑
else if (cellsBusy == 0 && casCellsBusy()) {
try {
//双重检查,是否已有其他线程扩容
if (cells == cs) // Expand table unless stale
// 新数组扩容为原数组的两倍,并把旧数组拷贝到新数组
cells = Arrays.copyOf(cs, n << 1);
} finally {
// 释放锁
cellsBusy = 0;
}
// 标记当前未出现冲突
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
// cells未初始化,加锁
else if (cellsBusy == 0 && cells == cs && casCellsBusy()) {
try { // Initialize table
// 是否被其他线程初始化了,类似双重检查的单利
if (cells == cs) {
//新建一个大小为2的Cell数组
Cell[] rs = new Cell[2];
//找到当前线程hash到数组中的位置并创建其对应的Cell
rs[h & 1] = new Cell(x);
//赋值给cells数组
cells = rs;
// 退出循环
break done;
}
} finally {
// 类似最终释放锁
cellsBusy = 0;
}
}
// Fall back on using base
// 如果有其它线程在初始化cells数组中,就尝试更新base
// 如果成功了就返回
else if (casBase(v = base,
(fn == null) ? v + x : fn.applyAsLong(v, x)))
break done;
}
}
这段代码的逻辑有点长,需要耐心慢慢看完。大量运用了类似双重简单的方式,下面对上面流程作了一个总结
-
如果cells数组未初始化,当前线程会尝试占有cellsBusy锁并创建cells数组 -
如果当前线程尝试创建cells数组时,发现有其它线程已经在创建了,就尝试更新base,如果成功就返回 -
如果cells数组已经初始化了,就通过线程的probe值找到当前线程所在cells数组的位置 -
如果当前线程所在的Cell未初始化,就占有占有cellsBusy锁并在相应的位置创建一个Cell -
尝试CAS更新当前线程所在的Cell,如果成功就返回,如果失败说明出现冲突,修改wasUncontended值自旋 -
当前线程更新Cell失败后并不是立即扩容,而是尝试更新probe值后再重试一次 -
如果在重试的时候还是更新失败,就扩容 -
扩容时当前线程占有cellsBusy锁,并把数组容量扩大到两倍,再迁移原cells数组中元素到新数组中
整体流程还是比较复杂的,推荐大家多看几遍源码,自己整理一份流程图供自己理解,我这里就先不整理了
至此Longadder
的核心源码就分析完了。Longadder
向我们演示了什么是空间换时间,后续的一些优秀的高性能队列比如Disruptor
也是参照了Longadder
的实现。
如有刊误,欢迎指出留言,期待一起进步
参考
博客book:《java高并发编程》
原文始发于微信公众号(小奏技术):只会原子类AtomicLong不懂Longadder?我怀疑你多线程计数没我快
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/30410.html