LongAdder那些不为人知的细节

大家好,我是阿轩。

今天我们来剖析一下LongAdder的源码。

说到LongAdder,可能有些小伙伴会比较陌生,毕竟日常工作中用的真的很少。

但是,不能说用的很少就不用去学习研究他了,我们看源码,一方面是为了更加深刻的了解他,避免实际工作中使用到他时踩坑,更重要的是学习他的设计思想和理念

前言

前面我们学习了AQSCLH队列机制。

学习了ConcurentHashMap分治思想

学习了StampedLock通过巧妙的设计,如何让一个long型变量即表示出了读写锁的状态,又表示出了读写锁的数量

没看过或者忘记的小伙伴可以去回顾一下,ConcurrentHashMap那些不为人知的细节AQS那些不为人知的细节StampedLock那些不为人知的细节

今天我们将要学习的LongAdder,主要是用于在高并发时,对某个数字型变量进行统计的场景。

比如,博客平台的某篇文章,他的点赞数,在不使用任何第三方中间件只使用JDK的条件下,你会怎么实现呢?

通常的做法是定义一个int或者long型变量,每次在修改他的时候加一个,不管是synchronized锁或者AQS锁都可以。

或者是使用一个原子类,比如AtomicInteger

但是这2种方案,虽然计算的结果是准确的,但是在高并发的场景下,显然性能是不能满足要求的。

这个时候LongAdder就闪亮登场了。

其实LongAdder的设计思想之前在讲ConcurrentHashMap的时候也提到过,2个字,分治

下面我们就一起来看看,他的庐山真面目。

LongAdder类概览

首先我们看下LongAdder类的整体结构

LongAdder那些不为人知的细节

比较简单,一个构造函数。

add方法,也是最核心的方法,待会细讲。

increment方法和decrement方法都是调用的add方法

LongAdder那些不为人知的细节

sum方法

public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

就是简单的对数组进行求和

reset方法,将数组初始化

public void reset() {
    Cell[] as = cells; Cell a;
    base = 0L;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                a.value = 0L;
        }
    }
}

sumThenReset方法,先求和再初始化。

剩下的longValue之类的4个方法都是调用的sum方法

LongAdder那些不为人知的细节

看到这里,小伙伴们可能会有疑问,他的具体实现写哪了,没看到呀?

LongAdder那些不为人知的细节

LongAdder继承了一个叫Striped64的类,具体实现都在Striped64类里,待会我们会看到。

下面我们就看下最核心的方法,add方法

add

public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}

add方法里引用到了2个变量,cells和base,这2个变量是Striped64类里的。

/**
 * Table of cells. When non-null, size is a power of 2.
 */
transient volatile Cell[] cells;

/**
 * Base value, used mainly when there is no contention, but also as
 * a fallback during table initialization races. Updated via CAS.
 */
transient volatile long base;

一个是Cell类型的数组,一个是long型变量。

@sun.misc.Contended static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

Cell是一个内部类,里面只有一个long型变量和一个cas方法,其实,说白了,就是对一个long型变量做了层包装

这2个变量就是实现LongAdder机制的基石。他将我们传统的通过对一个变量进行累加来统计数据拆分成了对多个变量进行累加,最后将所有变量的值求和获得最终的统计数据。

base就是基础变量,当没有并发或者并发不高时,只通过对base累加就可以了,当高并发时,对base变量的累加就会出现竞争,导致累加效率低下,这个时候,就会把并发的流量分散cell数组的不同元素上,对cell数组的不同元素进行累加,大大提高效率,最后将base变量和数组所有元素值进行求和就是我们要的最终统计数据。

下面我们详细的看下add方法。

首先判断cell数组是否为空,为空就表示还没有初始化,说明并发压力不大,这个时候尝试casbase变量进行累加,如果成功就直接结束add方法了,失败说明开始有并发压力了,需要初始化cell数组了。

接着是4个或运算相连的条件判断,前2个条件判断cell数组是否初始化,第三个条件判断该请求线程定位到的数组下标位是否为空,第四个条件尝试cas对线程定位到的数组下标位cell元素进行累加。

这里看起来是4个简单的条件判断,实则已经发生了2次逻辑短路

首先判断cell数组是否为空,不为空才会执行第三个条件,判断该线程定位到的数组下标位元素是否为空。如果也不为空,会尝试cas累加cell元素值。

表面上看起来是一个if判断,实则是3个if判断。

巧妙的使用或运算可以大大缩减代码量,学到了吗

LongAdder那些不为人知的细节

这里注意一下,cas失败才会进入方法体,同时将uncontended变量设置为false,后面会用到。

接着进入真正的实现方法longAccumulate

longAccumulate

这个方法有点长,我们分2部分看

int h;
if ((h = getProbe()) == 0) {
    ThreadLocalRandom.current(); // force initialization
    h = getProbe();
    wasUncontended = true;
}

第一部分,这里是获取线程的一个类似于hash的值,看过HashMap源码的小伙伴应该知道,HashMap的底层数据结构数组加链表,那怎么定位到key落在哪个索引位上呢,就是通过key的hash值来定位,这里,同理,怎样定位到线程去累加哪个cell元素值呢?

在线程Thread类里有这么一个属性threadLocalRandomProbe

/** Probe hash value; nonzero if threadLocalRandomSeed initialized */
@sun.misc.Contended("tlr")
int threadLocalRandomProbe;

看注释也能看出来,这个值相当于线程的一个hash值,通过getProbe方法获取

static final int getProbe() {
    return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
private static final long PROBE;
static {
    try {
        Class<?> tk = Thread.class;
        PROBE = UNSAFE.objectFieldOffset
            (tk.getDeclaredField("threadLocalRandomProbe"));
    } catch (Exception e) {
        throw new Error(e);
    }
}

threadLocalRandomProbe变量初始的时候是0,需要调用ThreadLocalRandom.current方法初始化

public static ThreadLocalRandom current() {
    if (UNSAFE.getInt(Thread.currentThread(), PROBE) == 0)
        localInit();
    return instance;
}

最终调用localInit方法

static final void localInit() {
    int p = probeGenerator.addAndGet(PROBE_INCREMENT);
    int probe = (p == 0) ? 1 : p; // skip 0
    long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT));
    Thread t = Thread.currentThread();
    UNSAFE.putLong(t, SEED, seed);
    UNSAFE.putInt(t, PROBE, probe);
}

可以看到probe的值是由probeGenerator这个变量产生的

private static final AtomicInteger probeGenerator = new AtomicInteger();

这是一个静态原子整数类

private static final int PROBE_INCREMENT = 0x9e3779b9;

PROBE_INCREMENT是一个常数,所以,每次线程调用初始化方法时,都会在之前的基础上再加一下PROBE_INCREMENT,从而使线程的probe值一直在变化。

接下来就会进入一个for死循环,里面分3大块

  • if ((as = cells) != null && (n = as.length) > 0)
  • else if (cellsBusy == 0 && cells == as && casCellsBusy())
  • else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))

第一块,cell数组不为空,第二块,为空,尝试初始化数组,这里会存在并发竞争,所以通过cellsBusy变量来进行加锁

transient volatile int cellsBusy;

加锁的方法是casCellsBusy

final boolean casCellsBusy() {
    return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}

哪个线程cas设置成功了,哪个线程就相当于抢到了锁,然后开始初始化cell数组。

最后一块,cell数组为空,线程又没有抢到锁进行初始化,那该干什么呢,总不能干等着吧,所以,会再尝试casbase变量进行累加,万一成功了呢,是不是!

如果还是失败,那只能进入下一次循环了。

第二块和第三块很简单,我们重点看第一块。

LongAdder那些不为人知的细节

①如果线程定位到的数组元素为空,那么就要进行初始化,而初始化是存在并发竞争的,所以需要加锁,也就是通过对cellsBusy变量进行cas设值,成功了才能开始初始化

看到这里,有的小伙伴可能会想,如果某一时刻,2个线程定位到的cell数组索引位不同,但是都是空,那么此时这2个线程进行初始化其实是互不干扰的,但是却要竞争同一个锁,只能有一个线程初始化成功。

可不可以给cell数组的每个索引位都设置一个锁呢,哪个索引位要初始化了就抢对应的锁

可以当然也是可以的,但是一同带来的问题就是cell数组有多大,就要创建多少个cellsBusy变量,而且数组会一直的动态扩容cellsBusy变量的创建也是个问题,而且,初始化只会进行一次,这点性能的损失是微乎其微的。

记住,没有十全十美的方案,任何一种方案都是一种权衡

这里定义了一个collide变量,直译过来就是碰撞的意思,也就是有没有发生hash碰撞,如果定位到的数组元素为null,就说明没有发生hash碰撞,需要将collide变量重置为false

②,wasUncontended这个变量是从方法参数传进来的,还记得在外层add方法里,如果cas对数组元素设置值失败的话会把wasUncontended设置为false,这里,如果外层已经cas失败过一次了,那么就不会再进入到 ③ 尝试cas了。

③,这里就是对数组元素进行cas设值,成功就退出循环结束整个方法了。

④,NCPU表示当前运行应用的机器的cpu核数

static final int NCPU = Runtime.getRuntime().availableProcessors();

理论上,数组大小达到cpu核数的时候效率最高,因为最多只能同时运行cpu个数的线程,所以,就算数组大小超过cpu核数,效率也不会得到提高。当数组大小第一次大于等于NCPU时,后面就不会在进行扩容了。

如果没超过NCPU,就判断第二个条件,数组是否发生了变化,也就是有没有发生扩容,如果发生了,将碰撞变量collide重置为false

⑤,走到这里,说明肯定已经发生了碰撞,将collide改为true。

⑥,这里就是扩容了,没什么好说的,扩容一倍,将老数组的元素复制到新数组上。

最后要注意一下,不管进入的是哪个if方法体,执行完后都会执行advanceProbe方法

static final int advanceProbe(int probe) {
    probe ^= probe << 13;   // xorshift
    probe ^= probe >>> 17;
    probe ^= probe << 5;
    UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
    return probe;
}

这个方法其实就是将线程的hash值变换一下,让他定位到数组的其他索引位上。

到此,LongAdder的主要核心源码就看完了,画个图总结一下

LongAdder那些不为人知的细节

总结

看完了LongAdder源码的设计思想,下次我们再遇到类似的高并发场景下需要对某个变量进行操作的时候,就可以借鉴一下这里的分治思想

其实,有时候仔细想想,真的是代码来源于生活而又高于生活。

就比如早上地铁高峰期的时候,地铁就相当于我们实际运行的应用,如果让所有的客流都一下子涌入,那地铁就爆了,也就是我们经常看见的服务器宕机了。

所以,现实生活中,我们会在地铁站的入口用栅栏围成一条通道,让乘客逐个的进入而不是一下子全部涌入,这不就是我们程序中用到的队列吗,削峰解耦是不是。

好了,我是阿轩,欢迎小伙伴关注阿轩的公众号,里面有阿轩的个人微信号,欢迎添加,一起交流,一起进步。


原文始发于微信公众号(程序员阿轩):LongAdder那些不为人知的细节

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/36990.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!