大家好,我是阿轩。
今天我们来剖析一下LongAdder的源码。
说到LongAdder
,可能有些小伙伴会比较陌生,毕竟日常工作中用的真的很少。
但是,不能说用的很少就不用去学习研究他了,我们看源码,一方面是为了更加深刻的了解他,避免实际工作中使用到他时踩坑,更重要的是学习他的设计思想和理念
。
前言
前面我们学习了AQS
的CLH
队列机制。
学习了ConcurentHashMap
的分治思想
。
学习了StampedLock
通过巧妙的设计,如何让一个long型变量即表示出了读写锁的状态
,又表示出了读写锁的数量
。
没看过或者忘记的小伙伴可以去回顾一下,ConcurrentHashMap那些不为人知的细节,AQS那些不为人知的细节,StampedLock那些不为人知的细节。
今天我们将要学习的LongAdder
,主要是用于在高并发时,对某个数字型变量进行统计
的场景。
比如,博客平台的某篇文章,他的点赞数,在不使用任何第三方中间件只使用JDK的条件下,你会怎么实现呢?
通常的做法是定义一个int
或者long
型变量,每次在修改他的时候加一个锁
,不管是synchronized锁
或者AQS锁
都可以。
或者是使用一个原子类
,比如AtomicInteger
。
但是这2种方案,虽然计算的结果是准确的,但是在高并发的场景下,显然性能
是不能满足要求的。
这个时候LongAdder
就闪亮登场了。
其实LongAdder的设计思想
之前在讲ConcurrentHashMap的时候也提到过,2个字,分治
。
下面我们就一起来看看,他的庐山真面目。
LongAdder类概览
首先我们看下LongAdder
类的整体结构

比较简单,一个构造函数。
add
方法,也是最核心的方法,待会细讲。
increment
方法和decrement
方法都是调用的add
方法

sum方法
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
就是简单的对数组进行求和
。
reset
方法,将数组初始化
public void reset() {
Cell[] as = cells; Cell a;
base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
a.value = 0L;
}
}
}
sumThenReset
方法,先求和再初始化。
剩下的longValue
之类的4个方法都是调用的sum
方法

看到这里,小伙伴们可能会有疑问,他的具体实现写哪了,没看到呀?

LongAdder继承了一个叫Striped64
的类,具体实现都在Striped64
类里,待会我们会看到。
下面我们就看下最核心的方法,add
方法
add
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
add方法里引用到了2个变量,cells和base
,这2个变量是Striped64
类里的。
/**
* Table of cells. When non-null, size is a power of 2.
*/
transient volatile Cell[] cells;
/**
* Base value, used mainly when there is no contention, but also as
* a fallback during table initialization races. Updated via CAS.
*/
transient volatile long base;
一个是Cell
类型的数组,一个是long
型变量。
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
Cell
是一个内部类
,里面只有一个long
型变量和一个cas
方法,其实,说白了,就是对一个long型变量做了层包装
。
这2个变量就是实现LongAdder机制
的基石。他将我们传统的通过对一个变量
进行累加来统计数据拆分
成了对多个变量
进行累加,最后将所有变量的值求和
获得最终的统计数据。
base
就是基础变量,当没有并发或者并发不高时,只通过对base
累加就可以了,当高并发时,对base变量的累加就会出现竞争
,导致累加效率低下,这个时候,就会把并发的流量分散
到cell数组
的不同元素上,对cell数组
的不同元素进行累加,大大提高效率,最后将base
变量和数组所有元素值进行求和
就是我们要的最终统计数据。
下面我们详细的看下add
方法。
首先判断cell数组
是否为空,为空就表示还没有初始化
,说明并发压力不大,这个时候尝试cas
对base
变量进行累加,如果成功就直接结束add
方法了,失败说明开始有并发压力了,需要初始化
cell数组了。
接着是4个或运算
相连的条件判断,前2个条件判断cell数组
是否初始化,第三个条件判断该请求线程定位到的数组下标位是否为空,第四个条件尝试cas
对线程定位到的数组下标位cell元素进行累加。
这里看起来是4个简单的条件判断,实则已经发生了2次逻辑短路
。
首先判断cell数组
是否为空,不为空才会执行第三个条件,判断该线程定位到的数组下标位元素是否为空。如果也不为空,会尝试cas
累加cell
元素值。
表面上看起来是一个if判断,实则是3个if判断。
巧妙的使用或运算
可以大大缩减代码量,学到了吗

这里注意一下,cas
失败才会进入方法体,同时将uncontended
变量设置为false
,后面会用到。
接着进入真正的实现方法longAccumulate
longAccumulate
这个方法有点长,我们分2部分看
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
第一部分,这里是获取线程的一个类似于hash
的值,看过HashMap
源码的小伙伴应该知道,HashMap
的底层数据结构是数组加链表
,那怎么定位到key
落在哪个索引位上呢,就是通过key的hash
值来定位,这里,同理,怎样定位到线程去累加哪个cell
元素值呢?
在线程Thread
类里有这么一个属性threadLocalRandomProbe
/** Probe hash value; nonzero if threadLocalRandomSeed initialized */
@sun.misc.Contended("tlr")
int threadLocalRandomProbe;
看注释也能看出来,这个值相当于线程的一个hash
值,通过getProbe
方法获取
static final int getProbe() {
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
private static final long PROBE;
static {
try {
Class<?> tk = Thread.class;
PROBE = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomProbe"));
} catch (Exception e) {
throw new Error(e);
}
}
threadLocalRandomProbe
变量初始的时候是0,需要调用ThreadLocalRandom.current
方法初始化
public static ThreadLocalRandom current() {
if (UNSAFE.getInt(Thread.currentThread(), PROBE) == 0)
localInit();
return instance;
}
最终调用localInit
方法
static final void localInit() {
int p = probeGenerator.addAndGet(PROBE_INCREMENT);
int probe = (p == 0) ? 1 : p; // skip 0
long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT));
Thread t = Thread.currentThread();
UNSAFE.putLong(t, SEED, seed);
UNSAFE.putInt(t, PROBE, probe);
}
可以看到probe
的值是由probeGenerator
这个变量产生的
private static final AtomicInteger probeGenerator = new AtomicInteger();
这是一个静态
的原子整数类
private static final int PROBE_INCREMENT = 0x9e3779b9;
PROBE_INCREMENT
是一个常数,所以,每次线程调用初始化方法时,都会在之前的基础上再加一下PROBE_INCREMENT
,从而使线程的probe
值一直在变化。
接下来就会进入一个for死循环
,里面分3大块
-
if ((as = cells) != null && (n = as.length) > 0) -
else if (cellsBusy == 0 && cells == as && casCellsBusy()) -
else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
第一块,cell
数组不为空,第二块,为空,尝试初始化
数组,这里会存在并发竞争,所以通过cellsBusy
变量来进行加锁
transient volatile int cellsBusy;
加锁的方法是casCellsBusy
final boolean casCellsBusy() {
return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}
哪个线程cas
设置成功了,哪个线程就相当于抢到了锁,然后开始初始化cell
数组。
最后一块,cell
数组为空,线程又没有抢到锁进行初始化,那该干什么呢,总不能干等着吧,所以,会再尝试cas
对base
变量进行累加,万一成功了呢,是不是!
如果还是失败,那只能进入下一次循环了。
第二块和第三块很简单,我们重点看第一块。

①如果线程定位到的数组元素为空,那么就要进行初始化
,而初始化是存在并发竞争
的,所以需要加锁
,也就是通过对cellsBusy
变量进行cas
设值,成功了才能开始初始化
。
看到这里,有的小伙伴可能会想,如果某一时刻,2个线程定位到的cell
数组索引位不同,但是都是空,那么此时这2个线程进行初始化
其实是互不干扰
的,但是却要竞争同一个锁
,只能有一个线程初始化成功。
可不可以给cell
数组的每个索引位都设置一个锁呢,哪个索引位要初始化了就抢对应的锁
?
可以当然也是可以的,但是一同带来的问题就是cell
数组有多大,就要创建多少个cellsBusy
变量,而且数组会一直的动态扩容
,cellsBusy
变量的创建也是个问题,而且,初始化只会进行一次,这点性能的损失是微乎其微
的。
记住,没有十全十美的方案,任何一种方案都是一种权衡
。
这里定义了一个collide
变量,直译过来就是碰撞
的意思,也就是有没有发生hash碰撞
,如果定位到的数组元素为null
,就说明没有发生hash碰撞
,需要将collide
变量重置为false
。
②,wasUncontended
这个变量是从方法参数传进来的,还记得在外层add
方法里,如果cas
对数组元素设置值失败的话会把wasUncontended
设置为false
,这里,如果外层已经cas
失败过一次了,那么就不会再进入到 ③ 尝试cas
了。
③,这里就是对数组元素进行cas
设值,成功就退出循环结束整个方法了。
④,NCPU
表示当前运行应用的机器的cpu核数
static final int NCPU = Runtime.getRuntime().availableProcessors();
理论上,数组大小达到cpu核数
的时候效率最高,因为最多只能同时运行cpu个数
的线程,所以,就算数组大小超过cpu核数,效率也不会得到提高。当数组大小第一次大于等于NCPU
时,后面就不会在进行扩容
了。
如果没超过NCPU
,就判断第二个条件,数组是否发生了变化,也就是有没有发生扩容
,如果发生了,将碰撞变量collide
重置为false
。
⑤,走到这里,说明肯定已经发生了碰撞,将collide
改为true。
⑥,这里就是扩容
了,没什么好说的,扩容一倍
,将老数组的元素复制到新数组上。
最后要注意一下,不管进入的是哪个if方法体,执行完后都会执行advanceProbe
方法
static final int advanceProbe(int probe) {
probe ^= probe << 13; // xorshift
probe ^= probe >>> 17;
probe ^= probe << 5;
UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
return probe;
}
这个方法其实就是将线程的hash
值变换一下,让他定位到数组的其他索引位上。
到此,LongAdder
的主要核心源码就看完了,画个图总结一下

总结
看完了LongAdder
源码的设计思想,下次我们再遇到类似的高并发场景下需要对某个变量进行操作的时候,就可以借鉴一下这里的分治思想
。
其实,有时候仔细想想,真的是代码来源于生活
而又高于生活。
就比如早上地铁高峰期的时候,地铁就相当于我们实际运行的应用,如果让所有的客流都一下子涌入,那地铁就爆了,也就是我们经常看见的服务器宕机了。
所以,现实生活中,我们会在地铁站的入口用栅栏围成一条通道,让乘客逐个的进入而不是一下子全部涌入,这不就是我们程序中用到的队列吗,削峰解耦是不是。
好了,我是阿轩,欢迎小伙伴关注阿轩的公众号,里面有阿轩的个人微信号,欢迎添加,一起交流,一起进步。
原文始发于微信公众号(程序员阿轩):LongAdder那些不为人知的细节
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/36990.html