概念
在讲LongAdder之前我们先了解一下AtomicLong。我们知道AtomicLong底层是利用的CAS原理实现并发性的,利用CAS方法去设置内部的value,它的实现逻辑是采用采用自旋的方式不断更新目标值,直到更新成功即乐观锁的实现思路。
那么在并发量比较低的情况下,线程冲突的概率比较小,自旋的次数不会很多,可以实现对应的业务场景。但是,高并发情况下,N个线程同时进行自旋操作,N-1个线程失败,导致CPU打满场景,此时AtomicLong的自旋会成为瓶颈。那么如何解决呢?
这时引入LongAdder来解决高并发环境下AtomicLong的自旋瓶颈问题。LongAdder的原理就是降低对value更新的并发数,也就是将对单一value的变更压力分散到多个value值上,降低单个value的热度。
成员变量
我们知道LongAdder的大致原理之后,再来详细的了解一下它的具体实现,搞起。首先看一下LongAdder类的结构图从类的结构图中我们能看到LongAdder是Striped64的子类,Striped64的设计核心思路就是通过内部的分散计算来避免竞争,其内部包含一个base和一个Cell[] cells数组,又叫hash表。没有竞争的情况下,要累加的数通过CAS累加到base上;如果有竞争的话,会将要累加的数累加到Cells数组中的某个cell元素里面。所以整个Striped64的值为sum=base+∑[0~n]cells。在Striped64中存在几个比较重要的变量,下面我们来说明一下:
// CPU核数
static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* 存放Cell的hash表,大小为2的幂
*/
transient volatile Cell[] cells;
/**
* 基础值,
* 1. 在没有竞争时会更新这个值;
* 2. 在cells初始化的过程中,cells处于不可用的状态,这时候也会尝试将通过cas操作值累加到base。
*/
transient volatile long base;
/*
* 创建或者扩容Cells数组时使用的自旋锁变量
*/
transient volatile int cellsBusy;
成员变量cells
cells数组是LongAdder高性能实现的非常重要的成员变量,AtomicInteger只有一个value,所有线程累加都要通过CAS竞争value这一个变量,高并发的情况下导致线程争抢严重。
而LongAdder则有两个值用于累加,一个是base,它的作用类似于AtomicInteger里面的value,在没有竞争的情况不会用到cells数组,它为null,这时使用base做累加,有了竞争后cells数组就会就会进行初始化,第一次初始化长度为2,以后每次扩容都是变为原来的两倍,直到cells数组的长度大于等于当前服务器CPU的核数为止就不在扩容,每个线程会通过线程对cells[threadLocalRandomProbe%cells.length]位置的Cell对象中的value做累加,这样相当于将线程绑定到了cells中的某个cell对象上。
cells是LongAdder的父类Striped64中的Cell数组类型的成员变量。每个Cell对象中都包含一个value值,并提供对这个value值的CAS操作。
@sun.misc.Contended static final class Cell {
//用来保存要累加的值
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
.......
}
成员变量cellsBusy
cellsBusy,它有两个值0 或1,它的作用是当要修改cells数组时加锁,防止多线程同时修改cells数组,0为无锁,1为加锁
加锁的状况有三种:分别是cells数组初始化的时候| cells数组扩容的时候|如果cells数组中某个元素为null,给这个位置创建新的Cell对象的时候
成员变量base
它的作用是在开始没有竞争的情况下,将累加值累加到base ,在cells初始化的过程中,cells不可用,这时会尝试将值累加到base上。
Add操作
我们首先来看一下LongAdder的add函数,其会多次尝试CAS操作将值进行累加,如果成功了就直接返回,失败则继续执行。
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
/**
* 如果一下两种条件则继续执行if内的语句
* 1. cells数组不为null(不存在争用的时候,cells数组一定为null,一旦对base的cas操作失败,才会初始化cells数组)
* 2. 如果cells数组为null,如果casBase执行成功,则直接返回,如果casBase方法执行失败(casBase失败,说明第一次争用冲突产生,需要对cells数组初始化)进入if内;
* casBase方法很简单,就是通过UNSAFE类的cas设置成员变量base的值为base+要累加的值
* casBase执行成功的前提是无竞争,这时候cells数组还没有用到为null,可见在无竞争的情况下是类似于AtomticInteger处理方式,使用cas做累加。
*/
if ((as = cells) != null || !casBase(b = base, b + x)) {
//uncontended判断cells数组中,当前线程要做cas累加操作的某个元素是否#不#存在争用,如果cas失败则存在争用;uncontended=false代表存在争用,uncontended=true代表不存在争用。
boolean uncontended = true;
/**
*1. as == null :cells数组未被初始化,成立则直接进入if执行cell初始化
*2. (m = as.length - 1) < 0:cells数组的长度为0
*条件1与2都代表cells数组没有被初始化成功,初始化成功的cells数组长度为2;
*3. (a = as[getProbe() & m]) == null :如果cells被初始化,且它的长度不为0,则通过getProbe方法获取当前线程Thread的threadLocalRandomProbe变量的值,初始为0,然后执行threadLocalRandomProbe&(cells.length-1 ),相当于m%cells.length;如果cells[threadLocalRandomProbe%cells.length]的位置为null,这说明这个位置从来没有线程做过累加,需要进入if继续执行,在这个位置创建一个新的Cell对象;
*4. !(uncontended = a.cas(v = a.value, v + x)):尝试对cells[threadLocalRandomProbe%cells.length]位置的Cell对象中的value值做累加操作,并返回操作结果,如果失败了则进入if,重新计算一个threadLocalRandomProbe;
如果进入if语句执行longAccumulate方法,有三种情况
1. 前两个条件代表cells没有初始化,
2. 第三个条件指当前线程hash到的cells数组中的位置还没有其它线程做过累加操作,
3. 第四个条件代表产生了冲突,uncontended=false
**/
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
当并发量较少时,cell数组尚未初始化,所以只调用casBase函数,对base变量进行CAS累加。我们来看一下casBase函数相关的源码吧。我们可以认为变量base就是第一个value值,也是基础value变量。先调用casBase函数来cas一下base变量,如果成功了,就不需要在进行下面比较复杂的算法。
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
当并发量逐渐提高时,casBase函数会失败。如果cells数组为null或为空,就直接调用longAccumulate方法。因为cells为null或在为空,说明cells未初始化,所以调用longAccumulate进行初始化。否则继续判断。如果cells中已经初始化,就继续进行后续判断。我们先来理解一下getProbe()&m 的这个操作吧,可以把这个操作当作一次计算hash值,然后将cells中这个位置的Cell对象赋值给变量a。如果变量a不为null,那么就调用该对象的cas方法去设置其value值。如果a为null,或在cas赋值发生冲突,那么调用longAccumulate方法。
LongAccumulate方法
我们都知道只有当对base的cas操作失败之后,LongAdder才引入Cell数组.所以在longAccumulate中就是对Cell数组进行操作,分别涉及了数组的初始化,扩容和设置某个位置的Cell对象等操作。在这段代码中,关于cellBusy的cas操作构成了一个SpinLock,这就是经典的SpinLock的编程技巧,大家可以学习一下。我们先来看一下longAccumulate的主体代码,首先是一个无限for循环,然后根据cells数组的状态来判断是要进行cells数组的初始化,还是进行对象添加或者扩容。三个参数第一个为要累加的值,第二个为null,第三个为wasUncontended表示调用方法之前的add方法是否未发生竞争;
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
//获取PROBE变量,探针变量,与当前运行的线程相关,不同线程不同
ThreadLocalRandom.current();
//初始化PROBE变量,和getProbe都使用Unsafe类提供的原子性操作。
h = getProbe();
wasUncontended = true;
}
boolean collide = false;
for (;;) { //cas经典无限循环,不断尝试
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
// cells不为null,并且数组size大于0,表示cells已经初始化了
// 初始化Cell对象并设置到数组中或者进行数组扩容
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
//cells数组未初始化,获得cellsBusy lock,进行cells数组的初始化
// cells数组初始化操作
}
//如果初始化数组失败了,那就再次尝试一下直接cas base变量,
// 如果成功了就直接返回,这是最后一个进行CAS操作的地方。
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
}
}
未进行初始化Cell数组代码如下所示,它首先调用casCellsBusy函数获取了cellsBusy锁,然后进行数组的初始化操作,最后将cellBusy’锁’释放掉。
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
//初始化cells数组,初始容量为2,并将x值通过hash&1,放到0个或第1个位置上
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
//解锁
cellsBusy = 0;
}
//如果init为true说明初始化成功,跳出循环
if (init)
break;
}
如果Cell数组已经初始化过了,那么就进行Cell数组的设置或者扩容。这部分代码有一系列的if else的判断,如果前一个条件不成立,才会进入下一条判断。首先,当Cell数组中对应位置的cell对象为null时,表明该位置的Cell对象需要进行初始化,所以使用casCellsBusy函数获取’锁’,然后初始化Cell对象,并且设置进cells数组,最后释放掉’锁’。当Cell数组中对应位置的cell对象不为null,则直接调用其cas操作进行累加。当上述操作都失败后,认为多个线程在对同一个位置的Cell对象进行操作,这个Cell对象是一个“热点”,所以Cell数组需要进行扩容,将热点分散。
if ((a = as[(n - 1) & h]) == null) { //通过与操作计算出来需要操作的Cell对象的坐标
if (cellsBusy == 0) { //volatile 变量,用来实现spinLock,来在初始化和resize cells数组时使用。
//当cellsBusy为0时,表示当前可以对cells数组进行操作。
Cell r = new Cell(x);//将x值直接赋值给Cell对象
if (cellsBusy == 0 && casCellsBusy()) {//如果这个时候cellsBusy还是0
//就cas将其设置为非0,如果成功了就是获得了spinLock的锁.可以对cells数组进行操作.
//如果失败了,就会再次执行一次循环
boolean created = false;
try {
Cell[] rs; int m, j;
//判断cells是否已经初始化,并且要操作的位置上没有cell对象.
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r; //将之前创建的值为x的cell对象赋值到cells数组的响应位置.
created = true;
}
} finally {
//经典的spinLock编程技巧,先获得锁,然后try finally将锁释放掉
//将cellBusy设置为0就是释放锁.
cellsBusy = 0;
}
if (created)
break; //如果创建成功了,就是使用x创建了新的cell对象,也就是新创建了一个分担热点的value
continue;
}
}
collide = false; //未发生碰撞
}
else if (!wasUncontended)//是否已经发生过一次cas操作失败
wasUncontended = true; //设置成true,以便第二次进入下一个else if 判断
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
//fn是操作类型,如果是空,就是相加,所以让a这个cell对象中的value值和x相加,然后在cas设置,如果成果
//就直接返回
break;
else if (n >= NCPU || cells != as)
//如果cells数组的大小大于系统的可获得处理器数量或在as不再和cells相等.
collide = false;
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
//再次获得cellsBusy这个spinLock,对数组进行resize
try {
if (cells == as) {//要再次检测as是否等于cells以免其他线程已经对cells进行了操作.
Cell[] rs = new Cell[n << 1]; //扩容一倍
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;//赋予cells一个新的数组对象
}
} finally {
cellsBusy = 0;
}
collide = false;
continue;
}
h = advanceProbe(h);//由于使用当前探针变量无法操作成功,所以重新设置一个,再次尝试
关于hash的生成
hash是LongAdder定位当前线程应该将值累加到cells数组哪个位置上的,所以hash的算法是非常重要的,下面就来看看它的实现。java的Thread类里面有一个成员变量
@sun.misc.Contended("tlr")
int threadLocalRandomProbe;
在LongAdder的父类Striped64里通过getProbe方法获取当前线程threadLocalRandomProbe的值
static final int getProbe() {
//PROBE是threadLocalRandomProbe变量在Thread类里面的偏移量,所以下面语句获取的就是threadLocalRandomProbe的值;
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
threadLocalRandomProbe的初始化 线程对LongAdder的累加操作,在没有进入longAccumulate方法前,threadLocalRandomProbe一直都是0,当发生争用后才会进入longAccumulate方法中,进入该方法第一件事就是判断threadLocalRandomProbe是否为0,如果为0,则将其设置为0x9e3779b9
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current();
h = getProbe();
//设置未竞争标记为true
wasUncontended = true;
}
重点在这行ThreadLocalRandom.current()
public static ThreadLocalRandom current() {
if (UNSAFE.getInt(Thread.currentThread(), PROBE) == 0)
localInit();
return instance;
}
在current方法中判断如果probe的值为0,则执行locaInit()方法,将当前线程的probe设置为非0的值,该方法实现如下:
static final void localInit() {
//private static final AtomicInteger probeGenerator =
new AtomicInteger();
//private static final int PROBE_INCREMENT = 0x9e3779b9;
int p = probeGenerator.addAndGet(PROBE_INCREMENT);
//prob不能为0
int probe = (p == 0) ? 1 : p; // skip 0
long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT));
//获取当前线程
Thread t = Thread.currentThread();
UNSAFE.putLong(t, SEED, seed);
//将probe的值更新为probeGenerator的值
UNSAFE.putInt(t, PROBE, probe);
}
probeGenerator 是static 类型的AtomicInteger类,每执行一次localInit()方法,都会将probeGenerator 累加一次0x9e3779b9这个值;,0x9e3779b9这个数字的得来是 2^32 除以一个常数,这个常数就是传说中的黄金比例 1.6180339887;然后将当前线程的threadLocalRandomProbe设置为probeGenerator 的值,如果probeGenerator 为0,则取1;
原文始发于微信公众号(阿福聊编程):LongAdder深度解析
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/105798.html