并发工具—Atomic原子操作类

梦想不抛弃苦心追求的人,只要不停止追求,你们会沐浴在梦想的光辉之中。再美好的梦想与目标,再完美的计划和方案,如果不能尽快在行动中落实,最终只能是纸上谈兵,空想一番。只要瞄准了大方向,坚持不懈地做下去,才能够扫除挡在梦想前面的障碍,实现美好的人生蓝图。并发工具—Atomic原子操作类,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

《深入理解CAS》中,介绍了可以通过CAS来实现线程安全的共享变量自增操作,但在平时的开发中,我们并不会直接去使用Unsafe类来进行CAS的操作,在多线程情况下对共享变量的操作,Java的java.util.concurrent.atomic包提供了一系列操作简单、性能高效并且能保证线程安全的类

并发工具—Atomic原子操作类

这些类按照操作的变量类型可以分为四大类:

基本类型:AtomicInteger、AtomicBoolean、AtomicLong

引用类型:AtomicReference、AtomicMarkableReference、AtomicStampedReference

数组类型:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray

对象属性原子修改器:AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater

除了上面这些之外,在JDK1.8,增加了几个更高性能的原子类型累加器:DoubleAccumulator、DoubleAdder、LongAccumulator、LongAdder、Striped64

下面我们分别任选其一来简单介绍一下这四大类型Atomic类的源码

一、基本类型

以AtomicInteger为例

getAndIncrement()方法调用getAndAddInt()方法,该方法通过对象实例和偏移量拿到内存中的值,然后进行CAS操作,如果操作失败就椅子进行自旋,最后返回的是内存中的原始值

public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
}

public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

incrementAndGet()方法返回的就是修改后的值

public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

以原子方式将输入的值与实例中的原值相加,并返回最后相加后的结果

public final int addAndGet(int delta) {
    return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}

将实例中的值更改为新值,然后返回旧值

public final int getAndSet(int newValue) {
    return unsafe.getAndSetInt(this, valueOffset, newValue);
}

上面这些方法只要调用了Unsafe的getAndAddInt()方法,在CAS失败时,就会一直自旋,这也就导致了如果在非常大的高并发环境下,这种方式会有很严重的性能问题,而JDK1.8新增的这些原子类型的累加器就是为了解决这个问题的。

二、引用类型

以AtomicReference为例,AtomicReference作用是对普通对象的封装,它可以保证你在修改对象引用时的线程安全性。其他两个应用类型主要用来结局CAS的ABA问题

compareAndSet()方法比较原始对象,设置新的对象

public final boolean compareAndSet(V expect, V update) {
    return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}
public class AtomicReferenceTest {

    public static void main( String[] args ) {
        User user1 = new User("张三", 23);
        User user2 = new User("李四", 25);
        User user3 = new User("王五", 20);

        //初始化为 user1
        AtomicReference<User> atomicReference = new AtomicReference<>();
        atomicReference.set(user1);

        //把 user2 赋给 atomicReference
        atomicReference.compareAndSet(user1, user2);
        System.out.println(atomicReference.get());

        //把 user3 赋给 atomicReference
        atomicReference.compareAndSet(user1, user3);
        System.out.println(atomicReference.get());

    }
}

@Data
@AllArgsConstructor
class User {
    private String name;
    private Integer age;
}

getAndSet()方法设置新的对象,然后返回原始对象

public final V getAndSet(V newValue) {
    return (V)unsafe.getAndSetObject(this, valueOffset, newValue);
}

public final Object getAndSetObject(Object var1, long var2, Object var4) {
    Object var5;
    do {
        var5 = this.getObjectVolatile(var1, var2);
    } while(!this.compareAndSwapObject(var1, var2, var5, var4));

    return var5;
}

三、数组类型

以AtomicIntegerArray为例

getAndIncrement()方法将数组索引下标为i的元素进行+1,然后返回原值

public final int getAndIncrement(int i) {
    return getAndAdd(i, 1);
}

public final int getAndAdd(int i, int delta) {
    return unsafe.getAndAddInt(array, checkedByteOffset(i), delta);
}

addAndGet()方法将数组索引下标为i的元素加上指定的值delta,然后返回最终的值

public final int addAndGet(int i, int delta) {
    return getAndAdd(i, delta) + delta;
}

compareAndSet()方法根据提供的期望值,对数组下标为i的元素值进行更新

public final boolean compareAndSet(int i, int expect, int update) {
    return compareAndSetRaw(checkedByteOffset(i), expect, update);
}

private boolean compareAndSetRaw(long offset, int expect, int update) {
    return unsafe.compareAndSwapInt(array, offset, expect, update);
}

四、对象属性原子修改器

以AtomicIntegerFieldUpdater为例,AtomicIntegerFieldUpdater可以线程安全地更新对象中的整型变量。

构造方法需要指定对象的类和属性名称

public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass,
                                                          String fieldName) {
    return new AtomicIntegerFieldUpdaterImpl<U>
        (tclass, fieldName, Reflection.getCallerClass());
}

incrementAndGet()方法首先根据属性的偏移量拿到对象内存中的属性值,然后进行+1操作,最后通过CAS自旋来更新对象的属性值,最后返回更新后的值

public int incrementAndGet(T obj) {
    int prev, next;
    do {
        prev = get(obj);
        next = prev + 1;
    } while (!compareAndSet(obj, prev, next));
    return next;
}

public final int get(T obj) {
    accessCheck(obj);
    return U.getIntVolatile(obj, offset);
}

下面的代码展示了通过三种方式进行递增,最后的值是一样的

public class AtomicIntegerFieldUpdaterTest {

    public static class Candidate {

        volatile int score = 0;

        AtomicInteger score2 = new AtomicInteger();
    }

    public static final AtomicIntegerFieldUpdater<Candidate> scoreUpdater =
        AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score");

    public static AtomicInteger realScore = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {

        final Candidate candidate = new Candidate();

        Thread[] t = new Thread[10000];
        for (int i = 0; i < 10000; i++) {
            t[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    if (Math.random() > 0.4) {
                        candidate.score2.incrementAndGet();
                        scoreUpdater.incrementAndGet(candidate);
                        realScore.incrementAndGet();
                    }
                }
            });
            t[i].start();
        }
        for (int i = 0; i < 10000; i++) {
            t[i].join();
        }
        System.out.println("AtomicIntegerFieldUpdater Score=" + candidate.score);
        System.out.println("AtomicInteger Score=" + candidate.score2.get());
        System.out.println("realScore=" + realScore.get());

    }
}

对于AtomicIntegerFieldUpdater 的使用有一些限制和约束:

  • 对象的属性字段必须是volatile类型的,在线程之间共享变量时立即可见volatile int score = 0
  • 字段的描述类型(修饰符public/protected/default/private)与调用者与操作对象字段的关系一致。也就是说调用者能够直接操作对象字段,那么就可以反射进行原子操作。但是对于父类的字段,子类是不能直接操作的,尽管子类可以访问父类的字段。
  • 只能是实例变量,不能是类变量,也就是说不能加static关键字
  • 只能是可修改变量,即不能使用final变量,因为final的语义是不可修改的。实际上final和volatile是有冲突的,这两个关键字不能同时存在
  • 对于对于AtomicIntegerFieldUpdater和AtomicLongFieldUpdater只能修改int/long类型的字段,不能修改其包装类型(Integer/Long)。如果要修改包装类型就需要使用AtomicReferenceFieldUpdater。

五、DoubleAdder/LongAdder详解

在上面AtomicIntege中介绍过了,它的自增方法使用CAS+自旋的方式来实现,如果在高并发的环境下,N个线程同时执行CAS操作,只有一个会成功,就会出现大量线程失败并不断自旋的情况,此时AtomicIntege/AtomicLong的自旋就会成为瓶颈

这就是DoubleAdder/LongAdder引入的初衷——解决高并发环境下AtomicIntege和AtomicLong的自旋瓶颈问题。

可以通过下面的代码自行测试一下,这两者的区别:

public class LongAdderTest {

    public static void main(String[] args) {
        testAtomicLongVSLongAdder(10, 10000);
        System.out.println("==================");
        testAtomicLongVSLongAdder(10, 200000);
        System.out.println("==================");
        testAtomicLongVSLongAdder(100, 200000);
    }

    static void testAtomicLongVSLongAdder(final int threadCount, final int times) {
        try {
            long start = System.currentTimeMillis();
            testLongAdder(threadCount, times);
            long end = System.currentTimeMillis() - start;
            System.out.println("条件>>>>>>线程数:" + threadCount + ", 单线程操作计数" + times);
            System.out.println("结果>>>>>>LongAdder方式增加计数" + (threadCount * times) + "次,共计耗时:" + end);

            long start2 = System.currentTimeMillis();
            testAtomicLong(threadCount, times);
            long end2 = System.currentTimeMillis() - start2;
            System.out.println("条件>>>>>>线程数:" + threadCount + ", 单线程操作计数" + times);
            System.out.println("结果>>>>>>AtomicLong方式增加计数" + (threadCount * times) + "次,共计耗时:" + end2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static void testAtomicLong(final int threadCount, final int times) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        AtomicLong atomicLong = new AtomicLong();
        for (int i = 0; i < threadCount; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < times; j++) {
                        atomicLong.incrementAndGet();
                    }
                    countDownLatch.countDown();
                }
            }, "my-thread" + i).start();
        }
        countDownLatch.await();
    }

    static void testLongAdder(final int threadCount, final int times) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        LongAdder longAdder = new LongAdder();
        for (int i = 0; i < threadCount; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < times; j++) {
                        longAdder.add(1);
                    }
                    countDownLatch.countDown();
                }
            }, "my-thread" + i).start();
        }

        countDownLatch.await();
    }
}

5.1 LongAdder原理

基本设计思路

AtomicLong中有个内部变量value保存着实际的long值,所有的操作都是针对该变量进行。也就是说,高并发环境下,value变量其实是一个热点,也就是N个线程竞争一个热点。LongAdder的基本思路就是分散热点,将value值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。

并发工具—Atomic原子操作类

如上图所示,LongAdder内部有一个base变量和Cell[]数组:

base变量:非竞态条件下,直接累加到该变量上

Cell[]数组:竞态条件下,直接累加到各个线程自己的槽Cell[i]中

上面这两句话的意思通俗一点儿理解就是,先对base变量进行CAS操作,如果失败了,并不会像AtomicLong自行自旋,而是通过每个线程的hash值找到自己对应的在Cell[]数组中的位置,然后对Cell中的value属性进行CAS操作

// CPU核数,用来决定数组槽的大小
static final int NCPU = Runtime.getRuntime().availableProcessors();
// 数组槽,大小为2的次幂
transient volatile Cell[] cells;

/**
 *  基数,在两种情况下会使用:
 *  1. 没有遇到并发竞争时,直接使用base累加数值
 *  2. 初始化cells数组时,必须要保证cells数组只能被初始化一次(即只有一个线程能对cells初始化),
 *  其他竞争失败的线程会讲数值累加到base上
 */
transient volatile long base;

// Spinlock (locked via CAS) used when resizing and/or creating Cells.
transient volatile int cellsBusy;

下面的源码展示了Cell的结构,可以通过它的cas()方法进行CAS操作,替换原始的value

@sun.misc.Contended static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

5.2 LongAdder的add()方法

在add()方法中,先去判断cells数组是否为空,为空说明处于非静态条件,就去调用casBase()方法对base属性值进行CAS操作

如果cells不为空,或者casBase()方法失败,就是判断cells是否经过了初始化,getProbe()方法用于获取当前线程的threadLocalRandomProbe值,然后对当前cells的长度-1进行按位与操作,得到线程对应的槽位(Cell对象),然后对Cell对象的value值进行CAS操作

如果没有经过初始化或者Cell的CAS操作失败,就去调用longAccumulate()方法,LongAdder继承自Striped64类,longAccumulate()方法是在Striped64中实现的

public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}

final boolean casBase(long cmp, long val) {
    return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}

add()方法对应的逻辑图如下:

并发工具—Atomic原子操作类

只有从未出现过并发冲突的时候,base基数才会使用到,一旦出现了并发冲突,之后所有的操作都只针对Cell[]数组中的单元Cell。

如果Cell[]数组未初始化,会调用父类的longAccumelate去初始化Cell[],如果Cell[]已经初始化但是冲突发生在Cell单元内,则也调用父类的longAccumelate,此时可能就需要对Cell[]扩容了。

这也是LongAdder设计的精妙之处:尽量减少热点冲突,不到最后万不得已,尽量将CAS操作延迟。

5.3 LongAdder的sum()方法

返回累加的和,也就是当前时刻的计数值 ,在高并发时,除非全局加锁,否则得不到程序运行中某个时刻绝对准确的值 。 此返回值可能不是绝对准确的,因为调用这个方法时还有其他线程可能正在进行计数累加,,方法的返回时刻和调用时刻不是同一个点,在有并发的情况下,这个值只是近似准确的计数值

由于计算总和时没有对Cell数组进行加锁,所以在累加过程中可能有其他线程对Cell中的值进 行了修改,也有可能对数组进行了扩容,所以sum返回的值并不是非常精确的,其返回值并不是 一个调用sum方法时的原子快照值。

public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/153630.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!