只会原子类AtomicLong不懂Longadder?我怀疑你多线程计数没我快

  • 背景

  • 性能比对

  • 源码分析

    • 核心属性

    • `Celll`类

    • add

  • 参考


背景

在看本篇文章,希望你已经对多线程和原子类有了一定的了解。说说我们在多线程对一些变量做自增同时要保证线程安全,我们最容易使用高性能的原子类来处理,因为他们是采用cas保证线程安全的,一般情况下比重量级加锁性能更佳,所以能用原子类的地方我们尽力不使用加锁。但是有没有比原子类更快线程安全的累加计数器呢?答案是有的,那就jdk1.8新引入juc包中的Longadder

性能比对

我们先来基于LongadderAtomicLong性能作一个比较。这里我们让5个线程分别去去循环自增1000、10000、100000、10000000次看看效率

首先是使用AtomicLong

private static final AtomicLong atomicLong = new AtomicLong(0);
    
    public static void main(String[] args) throws Exception {
        int count = 10;
        int task = 10000000;
        ExecutorService executorService = Executors.newFixedThreadPool(count);
        CountDownLatch countDownLatch = new CountDownLatch(count * task);
        System.out.println("执行任务开始");
        long start = System.currentTimeMillis();
        for (int i = 0; i < 10; i++) {
            executorService.execute(() -> {
                for (int j = 0; j < task; j++) {
                    atomicLong.incrementAndGet();
                    countDownLatch.countDown();
                }
            });

        }
        countDownLatch.await();
        long end = System.currentTimeMillis();
        long time = end - start;
        System.out.println("AtomicLong耗时:" + time + ",最后结果值:" + atomicLong.get());
        executorService.shutdown();
    }

然后我们来看看基于Longadder的累加

private static final LongAdder longAdder = new LongAdder();

    public static void main(String[] args) throws Exception {
        int count = 10;
        int task = 10000000;
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        CountDownLatch countDownLatch = new CountDownLatch(count * task);
        System.out.println("执行任务开始");
        long start = System.currentTimeMillis();
        for (int i = 0; i < count; i++) {
            executorService.execute(() -> {
                for (int j = 0; j < task; j++) {
                    longAdder.add(1);
                    countDownLatch.countDown();
                }
            });

        }
        countDownLatch.await();
        long end = System.currentTimeMillis();
        long time = end - start;
        System.out.println("longAdder耗时:" + time + ",最后结果值:" + longAdder.longValue());
        executorService.shutdown();
    }

这里需要注意的是要让每个线程去执行固定的任务数量,而不是直接把所有任务放在丢入线程池直接执行,不然会发现可能存在的问题是,某些线程执行很快一直抢到任务,没有太大的线程竞争压力,导致效果测试不出来。因为在线程竞争越激烈的条件下,Longadder性能提升越明显

测试机器:只会原子类AtomicLong不懂Longadder?我怀疑你多线程计数没我快

类型 线程数 * 每个线程的任务

10*1000 10*10000
AtomicLong

Longadder

实验结果:

类型




线程数 * 每个线程的任务 10*1000 10*10000 10*100000 10*1000000 10*10000000
耗时(AtomicLong) 15 94 280 2209 30571
耗时(Longadder) 23 97 218 1656 19539

可以看到随着线程任务的增多,Longadder性能越来越好。所以从这里可以看多,在任务量大,并发高的情况下Longadder性能明显是更好的。

源码分析

这里我们来分析Longadder的源码实现,因为在很多高性能队列、并发量,都有参考Longadder的实现,比如ConcurrentHashMap只会原子类AtomicLong不懂Longadder?我怀疑你多线程计数没我快Disruptor等。

所以在研究这些高级并发类前,我们先来研究研究Longadder是如何解决并发冲突问题保证如此高的性能

首先需要知道的是AtomicLong是使用内部变量value保存着实际的long值,然后所有的操作都是对这个long进行cas。所以并发越激烈,CAS的失败概率越高,从而进入恶性CAS空自旋影响性能。

要解决这个问题就要解决多个线程同时对一个long对象进行cas。所以Longadder对此做了改进,L将value值分散到一个数组中,不同线程会命中到数组的不同槽(元素)中,各个线程只对自己槽中的那个值进行CAS操作。只会原子类AtomicLong不懂Longadder?我怀疑你多线程计数没我快

这样冲突的概率就小很多,然后获取值的时候只需要加所有数组的值累加即可,这样就解决了AtomicLong中恶意自旋的问题。Longadder解决方式就是典型的==空间换时间== 需要注意的是在源码中Longadder的重要属性都是继承于Striped64

其中 Striped64

核心属性

static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * Table of cells. When non-null, size is a power of 2.
     * 存放Cell的hash表,大小为2的幂
     */

    transient volatile Cell[] cells;

    /**
     * Base value, used mainly when there is no contention, but also as
     * a fallback during table initialization races. Updated via CAS.
     * 基础值,在没有竞争时会更新这个值,在有竞争cells没有初始化时,也会cas操作base
     */

    transient volatile long base;

    /**
     * Spinlock (locked via CAS) used when resizing and/or creating Cells.
     * 自旋锁,通过cas加锁,0 表示cells没有处于创建、扩容阶段 
     *                   1 表示正在创建或者扩容cells,不能进行cells新元素设置操作
     */

    transient volatile int cellsBusy;

然后下面看看

Celll

只会原子类AtomicLong不懂Longadder?我怀疑你多线程计数没我快可以看到Striped64中的Cell使用了@jdk.internal.vm.annotation.Contended注解解决伪共享问题(不懂可以看这里)

然后我们在分析一下LongAdder几个核心方法

add

     Cell[] cs; long b, v; int m; Cell c;
        if ((cs = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            //表示 cells没有初始化
            if (cs == null || (m = cs.length - 1) < 0 ||
                (c = cs[getProbe() & m]) == null ||
                !(uncontended = c.cas(v = c.value, v + x)))
                longAccumulate(x, null, uncontended);
        }

我们来一行一行分析 首先是

(cs = cells) != null

cells数组不为null,说明存在竞争;在不存在竞争的时候,cells数组一定为null,一旦对base的cas操作失败,才会初始化cells数组

!casBase(b = base, b + x)

如果cells数组为null,表示之前不存在竞争,并且此次casBase执行成功,表示基于base成员累加成功,add方法直接返回;如果casBase方法执行失败,说明产生了第一次多线程冲突,需要对cells数组初始化,此时即将进入内层if块。


然后我们看看下面的判断条件

cs == null || (m = cs.length - 1) < 0

这里表示cells没有初始化

(c = cs[getProbe() & m]) == null

表示前线程的hash值在cells数组映射位置的Cell对象为空,意思是还没有其他线程在同一个位置做过累加操作

(uncontended = c.cas(v = c.value, v + x))

指当前线程的哈希值在cells数组映射位置的Cell对象不为空,然后在该Cell对象上进行CAS操作,设置其值为v+x(x为该Cell需要累加的值),但是CAS操作失败,表示存在竞争。这里三个判断条件任意一个成立,就调用longAccumulate()方法

longAccumulate()Striped64中最重要的方法,它实现了不同的线程更新各自Cell中的值,实现逻辑类似于分段锁,具体源码如下:

final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended)
 
{
        // 存储线程的probe值
        int h;
        // 如果getProbe()方法返回0,说明随机数未初始化
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            wasUncontended = true;
        }
        // 是否可以扩容
        boolean collide = false;                // True if last slot nonempty
        // 自旋,一直到操作成功
        done: for (;;) {
         // cs  cells的引用
            Cell[] cs;
            // 当前线程中命中的 cell
            Cell c;
            // cells 数组长度
            int n;
            //期望值 
            long v;
            // cells已经初始化过
            if ((cs = cells) != null && (n = cs.length) > 0) {
             //当前线程所在的Cell未初始化
                if ((c = cs[(n - 1) & h]) == null) {
                    // 当前无其它线程在创建或扩容cells,也没有线程在创建Cell
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        // 新建一个Cell,值为当前需要增加的值
                        Cell r = new Cell(x);   // Optimistically create
                     // 再次检测cellsBusy,并尝试更新它为1(cas保证线程安全) 相当于当前线程加锁
                        if (cellsBusy == 0 && casCellsBusy()) {
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                // 重新获取cells,并找到当前线程hash到cells数组中的位置
                            // 这里一定要重新获取cells,因为as并不在锁定范围内
                            // 有可能已经扩容了,这里要重新获取
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    // 把上面新建的Cell放在cells的j位置处
                                    rs[j] = r;
                                    break done;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                //当前线程所在的Cell不为空,且更新失败了
             // 设置 wasUncontended 为true 表示已经进行过了自旋
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                // 再次尝试CAS更新当前线程所在Cell的值,如果成功了就返回
                else if (c.cas(v = c.value,
                               (fn == null) ? v + x : fn.applyAsLong(v, x)))
                    break;
                // 如果cells数组的长度达到了CPU核心数,或者cells发生了扩容
                else if (n >= NCPU || cells != cs)
                    collide = false;            // At max size or stale
                //设置扩容意向,但不是真的扩容
                else if (!collide)
                    collide = true;
                // 正真的扩容逻辑
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                     //双重检查,是否已有其他线程扩容
                        if (cells == cs)        // Expand table unless stale
                         // 新数组扩容为原数组的两倍,并把旧数组拷贝到新数组
                            cells = Arrays.copyOf(cs, n << 1);
                    } finally {
                     // 释放锁
                        cellsBusy = 0;
                    }
                    // 标记当前未出现冲突
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = advanceProbe(h);
            }
            // cells未初始化,加锁
            else if (cellsBusy == 0 && cells == cs && casCellsBusy()) {
                try {                           // Initialize table
                 // 是否被其他线程初始化了,类似双重检查的单利
                    if (cells == cs) {
                     //新建一个大小为2的Cell数组
                        Cell[] rs = new Cell[2];
                        //找到当前线程hash到数组中的位置并创建其对应的Cell
                        rs[h & 1] = new Cell(x);
                        //赋值给cells数组
                        cells = rs;
                        // 退出循环
                        break done;
                    }
                } finally {
                 // 类似最终释放锁
                    cellsBusy = 0;
                }
            }
            // Fall back on using base
            // 如果有其它线程在初始化cells数组中,就尝试更新base
         // 如果成功了就返回
            else if (casBase(v = base,
                             (fn == null) ? v + x : fn.applyAsLong(v, x)))
                break done;
        }
    }

这段代码的逻辑有点长,需要耐心慢慢看完。大量运用了类似双重简单的方式,下面对上面流程作了一个总结

  1. 如果cells数组未初始化,当前线程会尝试占有cellsBusy锁并创建cells数组
  2. 如果当前线程尝试创建cells数组时,发现有其它线程已经在创建了,就尝试更新base,如果成功就返回
  3. 如果cells数组已经初始化了,就通过线程的probe值找到当前线程所在cells数组的位置
  4. 如果当前线程所在的Cell未初始化,就占有占有cellsBusy锁并在相应的位置创建一个Cell
  5. 尝试CAS更新当前线程所在的Cell,如果成功就返回,如果失败说明出现冲突,修改wasUncontended值自旋
  6. 当前线程更新Cell失败后并不是立即扩容,而是尝试更新probe值后再重试一次
  7. 如果在重试的时候还是更新失败,就扩容
  8. 扩容时当前线程占有cellsBusy锁,并把数组容量扩大到两倍,再迁移原cells数组中元素到新数组中

整体流程还是比较复杂的,推荐大家多看几遍源码,自己整理一份流程图供自己理解,我这里就先不整理了

至此Longadder的核心源码就分析完了。Longadder向我们演示了什么是空间换时间,后续的一些优秀的高性能队列比如Disruptor也是参照了Longadder的实现。

如有刊误,欢迎指出留言,期待一起进步

参考

博客book:《java高并发编程》


原文始发于微信公众号(小奏技术):只会原子类AtomicLong不懂Longadder?我怀疑你多线程计数没我快

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/30410.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!