LongAdder深度解析

概念

在讲LongAdder之前我们先了解一下AtomicLong。我们知道AtomicLong底层是利用的CAS原理实现并发性的,利用CAS方法去设置内部的value,它的实现逻辑是采用采用自旋的方式不断更新目标值,直到更新成功即乐观锁的实现思路。

那么在并发量比较低的情况下,线程冲突的概率比较小,自旋的次数不会很多,可以实现对应的业务场景。但是,高并发情况下,N个线程同时进行自旋操作,N-1个线程失败,导致CPU打满场景,此时AtomicLong的自旋会成为瓶颈。那么如何解决呢? 

这时引入LongAdder来解决高并发环境下AtomicLong的自旋瓶颈问题。LongAdder的原理就是降低对value更新的并发数,也就是将对单一value的变更压力分散到多个value值上,降低单个value的热度。

成员变量

我们知道LongAdder的大致原理之后,再来详细的了解一下它的具体实现,搞起。首先看一下LongAdder类的结构图LongAdder深度解析从类的结构图中我们能看到LongAdder是Striped64的子类,Striped64的设计核心思路就是通过内部的分散计算来避免竞争,其内部包含一个base和一个Cell[] cells数组,又叫hash表。没有竞争的情况下,要累加的数通过CAS累加到base上;如果有竞争的话,会将要累加的数累加到Cells数组中的某个cell元素里面。所以整个Striped64的值为sum=base+∑[0~n]cells。在Striped64中存在几个比较重要的变量,下面我们来说明一下:

// CPU核数
static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
 * 存放Cell的hash表,大小为2的幂
 */
 
transient volatile Cell[] cells;
/** 
 * 基础值,
 * 1. 在没有竞争时会更新这个值;
 * 2. 在cells初始化的过程中,cells处于不可用的状态,这时候也会尝试将通过cas操作值累加到base。 
 */

transient volatile long base;
/*
* 创建或者扩容Cells数组时使用的自旋锁变量
*/
  
transient volatile int cellsBusy;

成员变量cells

cells数组是LongAdder高性能实现的非常重要的成员变量,AtomicInteger只有一个value,所有线程累加都要通过CAS竞争value这一个变量,高并发的情况下导致线程争抢严重。 

而LongAdder则有两个值用于累加,一个是base,它的作用类似于AtomicInteger里面的value,在没有竞争的情况不会用到cells数组,它为null,这时使用base做累加,有了竞争后cells数组就会就会进行初始化,第一次初始化长度为2,以后每次扩容都是变为原来的两倍,直到cells数组的长度大于等于当前服务器CPU的核数为止就不在扩容,每个线程会通过线程对cells[threadLocalRandomProbe%cells.length]位置的Cell对象中的value做累加,这样相当于将线程绑定到了cells中的某个cell对象上。

cells是LongAdder的父类Striped64中的Cell数组类型的成员变量。每个Cell对象中都包含一个value值,并提供对这个value值的CAS操作。

@sun.misc.Contended static final class Cell {
        //用来保存要累加的值 
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }
   .......
}

成员变量cellsBusy

cellsBusy,它有两个值0 或1,它的作用是当要修改cells数组时加锁,防止多线程同时修改cells数组,0为无锁,1为加锁 

加锁的状况有三种:分别是cells数组初始化的时候| cells数组扩容的时候|如果cells数组中某个元素为null,给这个位置创建新的Cell对象的时候

成员变量base

它的作用是在开始没有竞争的情况下,将累加值累加到base ,在cells初始化的过程中,cells不可用,这时会尝试将值累加到base上。

Add操作

我们首先来看一下LongAdder的add函数,其会多次尝试CAS操作将值进行累加,如果成功了就直接返回,失败则继续执行。

public void add(long x) {

        Cell[] as; long b, v; int m; Cell a;
        /**
         * 如果一下两种条件则继续执行if内的语句
         * 1. cells数组不为null(不存在争用的时候,cells数组一定为null,一旦对base的cas操作失败,才会初始化cells数组)
         * 2. 如果cells数组为null,如果casBase执行成功,则直接返回,如果casBase方法执行失败(casBase失败,说明第一次争用冲突产生,需要对cells数组初始化)进入if内;
         * casBase方法很简单,就是通过UNSAFE类的cas设置成员变量base的值为base+要累加的值
         * casBase执行成功的前提是无竞争,这时候cells数组还没有用到为null,可见在无竞争的情况下是类似于AtomticInteger处理方式,使用cas做累加。
         */

        if ((as = cells) != null || !casBase(b = base, b + x)) {
            //uncontended判断cells数组中,当前线程要做cas累加操作的某个元素是否#不#存在争用,如果cas失败则存在争用;uncontended=false代表存在争用,uncontended=true代表不存在争用。

            boolean uncontended = true;
            /**
            *1. as == null :cells数组未被初始化,成立则直接进入if执行cell初始化
            *2. (m = as.length - 1) < 0:cells数组的长度为0
            *条件1与2都代表cells数组没有被初始化成功,初始化成功的cells数组长度为2;
            *3. (a = as[getProbe() & m]) == null :如果cells被初始化,且它的长度不为0,则通过getProbe方法获取当前线程Thread的threadLocalRandomProbe变量的值,初始为0,然后执行threadLocalRandomProbe&(cells.length-1 ),相当于m%cells.length;如果cells[threadLocalRandomProbe%cells.length]的位置为null,这说明这个位置从来没有线程做过累加,需要进入if继续执行,在这个位置创建一个新的Cell对象;
            *4. !(uncontended = a.cas(v = a.value, v + x)):尝试对cells[threadLocalRandomProbe%cells.length]位置的Cell对象中的value值做累加操作,并返回操作结果,如果失败了则进入if,重新计算一个threadLocalRandomProbe;

            如果进入if语句执行longAccumulate方法,有三种情况
            1. 前两个条件代表cells没有初始化,
            2. 第三个条件指当前线程hash到的cells数组中的位置还没有其它线程做过累加操作,
            3. 第四个条件代表产生了冲突,uncontended=false
            **/

            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }

当并发量较少时,cell数组尚未初始化,所以只调用casBase函数,对base变量进行CAS累加。我们来看一下casBase函数相关的源码吧。我们可以认为变量base就是第一个value值,也是基础value变量。先调用casBase函数来cas一下base变量,如果成功了,就不需要在进行下面比较复杂的算法。

final boolean casBase(long cmp, long val) {
    return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}

当并发量逐渐提高时,casBase函数会失败。如果cells数组为null或为空,就直接调用longAccumulate方法。因为cells为null或在为空,说明cells未初始化,所以调用longAccumulate进行初始化。否则继续判断。如果cells中已经初始化,就继续进行后续判断。我们先来理解一下getProbe()&m 的这个操作吧,可以把这个操作当作一次计算hash值,然后将cells中这个位置的Cell对象赋值给变量a。如果变量a不为null,那么就调用该对象的cas方法去设置其value值。如果a为null,或在cas赋值发生冲突,那么调用longAccumulate方法。

LongAccumulate方法

我们都知道只有当对base的cas操作失败之后,LongAdder才引入Cell数组.所以在longAccumulate中就是对Cell数组进行操作,分别涉及了数组的初始化,扩容和设置某个位置的Cell对象等操作。在这段代码中,关于cellBusy的cas操作构成了一个SpinLock,这就是经典的SpinLock的编程技巧,大家可以学习一下。我们先来看一下longAccumulate的主体代码,首先是一个无限for循环,然后根据cells数组的状态来判断是要进行cells数组的初始化,还是进行对象添加或者扩容。三个参数第一个为要累加的值,第二个为null,第三个为wasUncontended表示调用方法之前的add方法是否未发生竞争;

final void longAccumulate(long x, LongBinaryOperator fn,
                             boolean wasUncontended)
 
{
       int h;
       if ((h = getProbe()) == 0) { 
           //获取PROBE变量,探针变量,与当前运行的线程相关,不同线程不同
           ThreadLocalRandom.current(); 
       //初始化PROBE变量,和getProbe都使用Unsafe类提供的原子性操作。
           h = getProbe();
           wasUncontended = true;
       }
       boolean collide = false;
       for (;;) { //cas经典无限循环,不断尝试
           Cell[] as; Cell a; int n; long v;
           if ((as = cells) != null && (n = as.length) > 0) { 
           // cells不为null,并且数组size大于0,表示cells已经初始化了
           // 初始化Cell对象并设置到数组中或者进行数组扩容
           }
           else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
           //cells数组未初始化,获得cellsBusy lock,进行cells数组的初始化
           // cells数组初始化操作
           }
          //如果初始化数组失败了,那就再次尝试一下直接cas base变量,
          // 如果成功了就直接返回,这是最后一个进行CAS操作的地方。
           else if (casBase(v = base, ((fn == null) ? v + x :
                                       fn.applyAsLong(v, x))))
               break;
       }
   }

未进行初始化Cell数组代码如下所示,它首先调用casCellsBusy函数获取了cellsBusy锁,然后进行数组的初始化操作,最后将cellBusy’锁’释放掉。

else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try {                           // Initialize table
    //初始化cells数组,初始容量为2,并将x值通过hash&1,放到0个或第1个位置上
    if (cells == as) {
        Cell[] rs = new Cell[2];
        rs[h & 1] = new Cell(x);
        cells = rs;
        init = true;
    }
finally {
    //解锁
    cellsBusy = 0;
}
//如果init为true说明初始化成功,跳出循环
if (init)
    break;
}

如果Cell数组已经初始化过了,那么就进行Cell数组的设置或者扩容。这部分代码有一系列的if else的判断,如果前一个条件不成立,才会进入下一条判断。首先,当Cell数组中对应位置的cell对象为null时,表明该位置的Cell对象需要进行初始化,所以使用casCellsBusy函数获取’锁’,然后初始化Cell对象,并且设置进cells数组,最后释放掉’锁’。当Cell数组中对应位置的cell对象不为null,则直接调用其cas操作进行累加。当上述操作都失败后,认为多个线程在对同一个位置的Cell对象进行操作,这个Cell对象是一个“热点”,所以Cell数组需要进行扩容,将热点分散。

if ((a = as[(n - 1) & h]) == null) { //通过与操作计算出来需要操作的Cell对象的坐标
    if (cellsBusy == 0) { //volatile 变量,用来实现spinLock,来在初始化和resize cells数组时使用。
    //当cellsBusy为0时,表示当前可以对cells数组进行操作。 
        Cell r = new Cell(x);//将x值直接赋值给Cell对象
        if (cellsBusy == 0 && casCellsBusy()) {//如果这个时候cellsBusy还是0
        //就cas将其设置为非0,如果成功了就是获得了spinLock的锁.可以对cells数组进行操作.
        //如果失败了,就会再次执行一次循环
            boolean created = false;
            try {
                Cell[] rs; int m, j;
                //判断cells是否已经初始化,并且要操作的位置上没有cell对象.
                if ((rs = cells) != null &&
                    (m = rs.length) > 0 &&
                    rs[j = (m - 1) & h] == null) {
                    rs[j] = r; //将之前创建的值为x的cell对象赋值到cells数组的响应位置.
                    created = true;
                }
            } finally {
                //经典的spinLock编程技巧,先获得锁,然后try finally将锁释放掉
                //将cellBusy设置为0就是释放锁.
                cellsBusy = 0;
            }
            if (created)
                break//如果创建成功了,就是使用x创建了新的cell对象,也就是新创建了一个分担热点的value
            continue
        }
    }
    collide = false//未发生碰撞
}
else if (!wasUncontended)//是否已经发生过一次cas操作失败
    wasUncontended = true//设置成true,以便第二次进入下一个else if 判断
else if (a.cas(v = a.value, ((fn == null) ? v + x :
                            fn.applyAsLong(v, x))))
     //fn是操作类型,如果是空,就是相加,所以让a这个cell对象中的value值和x相加,然后在cas设置,如果成果
    //就直接返回
    break;
else if (n >= NCPU || cells != as)
  //如果cells数组的大小大于系统的可获得处理器数量或在as不再和cells相等.
    collide = false;
else if (!collide)
    collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
  //再次获得cellsBusy这个spinLock,对数组进行resize
    try {
        if (cells == as) {//要再次检测as是否等于cells以免其他线程已经对cells进行了操作.
            Cell[] rs = new Cell[n << 1]; //扩容一倍
            for (int i = 0; i < n; ++i)
                rs[i] = as[i];
            cells = rs;//赋予cells一个新的数组对象
        }
    } finally {
        cellsBusy = 0;
    }
    collide = false;
    continue;
}
h = advanceProbe(h);//由于使用当前探针变量无法操作成功,所以重新设置一个,再次尝试

关于hash的生成

hash是LongAdder定位当前线程应该将值累加到cells数组哪个位置上的,所以hash的算法是非常重要的,下面就来看看它的实现。java的Thread类里面有一个成员变量

@sun.misc.Contended("tlr")
    int threadLocalRandomProbe;

在LongAdder的父类Striped64里通过getProbe方法获取当前线程threadLocalRandomProbe的值

  static final int getProbe() {
        //PROBE是threadLocalRandomProbe变量在Thread类里面的偏移量,所以下面语句获取的就是threadLocalRandomProbe的值;
        return UNSAFE.getInt(Thread.currentThread(), PROBE);
    }

threadLocalRandomProbe的初始化 线程对LongAdder的累加操作,在没有进入longAccumulate方法前,threadLocalRandomProbe一直都是0,当发生争用后才会进入longAccumulate方法中,进入该方法第一件事就是判断threadLocalRandomProbe是否为0,如果为0,则将其设置为0x9e3779b9

int h;
if ((h = getProbe()) == 0) {
    ThreadLocalRandom.current(); 
    h = getProbe();
    //设置未竞争标记为true
    wasUncontended = true;
}

重点在这行ThreadLocalRandom.current()

  public static ThreadLocalRandom current() {
        if (UNSAFE.getInt(Thread.currentThread(), PROBE) == 0)
            localInit();
        return instance;
    }

在current方法中判断如果probe的值为0,则执行locaInit()方法,将当前线程的probe设置为非0的值,该方法实现如下:

 static final void localInit() {
        //private static final AtomicInteger probeGenerator =
        new AtomicInteger();
        //private static final int PROBE_INCREMENT = 0x9e3779b9;
        int p = probeGenerator.addAndGet(PROBE_INCREMENT);
        //prob不能为0
        int probe = (p == 0) ? 1 : p; // skip 0
        long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT));
        //获取当前线程
        Thread t = Thread.currentThread();
        UNSAFE.putLong(t, SEED, seed);
        //将probe的值更新为probeGenerator的值
        UNSAFE.putInt(t, PROBE, probe);
    }

probeGenerator 是static 类型的AtomicInteger类,每执行一次localInit()方法,都会将probeGenerator 累加一次0x9e3779b9这个值;,0x9e3779b9这个数字的得来是 2^32 除以一个常数,这个常数就是传说中的黄金比例 1.6180339887;然后将当前线程的threadLocalRandomProbe设置为probeGenerator 的值,如果probeGenerator 为0,则取1;

👉 如果本文对你有帮助的话,欢迎点赞|在看,非常感谢 

原文始发于微信公众号(阿福聊编程):LongAdder深度解析

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/105798.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!