在《深入理解CAS》中,介绍了可以通过CAS来实现线程安全的共享变量自增操作,但在平时的开发中,我们并不会直接去使用Unsafe类来进行CAS的操作,在多线程情况下对共享变量的操作,Java的java.util.concurrent.atomic
包提供了一系列操作简单、性能高效并且能保证线程安全的类
这些类按照操作的变量类型可以分为四大类:
基本类型:AtomicInteger、AtomicBoolean、AtomicLong
引用类型:AtomicReference、AtomicMarkableReference、AtomicStampedReference
数组类型:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
对象属性原子修改器:AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater
除了上面这些之外,在JDK1.8,增加了几个更高性能的原子类型累加器:DoubleAccumulator、DoubleAdder、LongAccumulator、LongAdder、Striped64
下面我们分别任选其一来简单介绍一下这四大类型Atomic类的源码
一、基本类型
以AtomicInteger为例
getAndIncrement()方法调用getAndAddInt()方法,该方法通过对象实例和偏移量拿到内存中的值,然后进行CAS操作,如果操作失败就椅子进行自旋,最后返回的是内存中的原始值
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
incrementAndGet()方法返回的就是修改后的值
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
以原子方式将输入的值与实例中的原值相加,并返回最后相加后的结果
public final int addAndGet(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}
将实例中的值更改为新值,然后返回旧值
public final int getAndSet(int newValue) {
return unsafe.getAndSetInt(this, valueOffset, newValue);
}
上面这些方法只要调用了Unsafe的getAndAddInt()方法,在CAS失败时,就会一直自旋,这也就导致了如果在非常大的高并发环境下,这种方式会有很严重的性能问题,而JDK1.8新增的这些原子类型的累加器就是为了解决这个问题的。
二、引用类型
以AtomicReference为例,AtomicReference作用是对普通对象的封装,它可以保证你在修改对象引用时的线程安全性。其他两个应用类型主要用来结局CAS的ABA问题
compareAndSet()方法比较原始对象,设置新的对象
public final boolean compareAndSet(V expect, V update) {
return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}
public class AtomicReferenceTest {
public static void main( String[] args ) {
User user1 = new User("张三", 23);
User user2 = new User("李四", 25);
User user3 = new User("王五", 20);
//初始化为 user1
AtomicReference<User> atomicReference = new AtomicReference<>();
atomicReference.set(user1);
//把 user2 赋给 atomicReference
atomicReference.compareAndSet(user1, user2);
System.out.println(atomicReference.get());
//把 user3 赋给 atomicReference
atomicReference.compareAndSet(user1, user3);
System.out.println(atomicReference.get());
}
}
@Data
@AllArgsConstructor
class User {
private String name;
private Integer age;
}
getAndSet()方法设置新的对象,然后返回原始对象
public final V getAndSet(V newValue) {
return (V)unsafe.getAndSetObject(this, valueOffset, newValue);
}
public final Object getAndSetObject(Object var1, long var2, Object var4) {
Object var5;
do {
var5 = this.getObjectVolatile(var1, var2);
} while(!this.compareAndSwapObject(var1, var2, var5, var4));
return var5;
}
三、数组类型
以AtomicIntegerArray为例
getAndIncrement()方法将数组索引下标为i
的元素进行+1,然后返回原值
public final int getAndIncrement(int i) {
return getAndAdd(i, 1);
}
public final int getAndAdd(int i, int delta) {
return unsafe.getAndAddInt(array, checkedByteOffset(i), delta);
}
addAndGet()方法将数组索引下标为i
的元素加上指定的值delta
,然后返回最终的值
public final int addAndGet(int i, int delta) {
return getAndAdd(i, delta) + delta;
}
compareAndSet()方法根据提供的期望值,对数组下标为i
的元素值进行更新
public final boolean compareAndSet(int i, int expect, int update) {
return compareAndSetRaw(checkedByteOffset(i), expect, update);
}
private boolean compareAndSetRaw(long offset, int expect, int update) {
return unsafe.compareAndSwapInt(array, offset, expect, update);
}
四、对象属性原子修改器
以AtomicIntegerFieldUpdater为例,AtomicIntegerFieldUpdater可以线程安全地更新对象中的整型变量。
构造方法需要指定对象的类和属性名称
public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass,
String fieldName) {
return new AtomicIntegerFieldUpdaterImpl<U>
(tclass, fieldName, Reflection.getCallerClass());
}
incrementAndGet()方法首先根据属性的偏移量拿到对象内存中的属性值,然后进行+1操作,最后通过CAS自旋来更新对象的属性值,最后返回更新后的值
public int incrementAndGet(T obj) {
int prev, next;
do {
prev = get(obj);
next = prev + 1;
} while (!compareAndSet(obj, prev, next));
return next;
}
public final int get(T obj) {
accessCheck(obj);
return U.getIntVolatile(obj, offset);
}
下面的代码展示了通过三种方式进行递增,最后的值是一样的
public class AtomicIntegerFieldUpdaterTest {
public static class Candidate {
volatile int score = 0;
AtomicInteger score2 = new AtomicInteger();
}
public static final AtomicIntegerFieldUpdater<Candidate> scoreUpdater =
AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score");
public static AtomicInteger realScore = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
final Candidate candidate = new Candidate();
Thread[] t = new Thread[10000];
for (int i = 0; i < 10000; i++) {
t[i] = new Thread(new Runnable() {
@Override
public void run() {
if (Math.random() > 0.4) {
candidate.score2.incrementAndGet();
scoreUpdater.incrementAndGet(candidate);
realScore.incrementAndGet();
}
}
});
t[i].start();
}
for (int i = 0; i < 10000; i++) {
t[i].join();
}
System.out.println("AtomicIntegerFieldUpdater Score=" + candidate.score);
System.out.println("AtomicInteger Score=" + candidate.score2.get());
System.out.println("realScore=" + realScore.get());
}
}
对于AtomicIntegerFieldUpdater 的使用有一些限制和约束:
- 对象的属性字段必须是volatile类型的,在线程之间共享变量时立即可见
volatile int score = 0
- 字段的描述类型(修饰符public/protected/default/private)与调用者与操作对象字段的关系一致。也就是说调用者能够直接操作对象字段,那么就可以反射进行原子操作。但是对于父类的字段,子类是不能直接操作的,尽管子类可以访问父类的字段。
- 只能是实例变量,不能是类变量,也就是说不能加static关键字
- 只能是可修改变量,即不能使用final变量,因为final的语义是不可修改的。实际上final和volatile是有冲突的,这两个关键字不能同时存在
- 对于对于AtomicIntegerFieldUpdater和AtomicLongFieldUpdater只能修改int/long类型的字段,不能修改其包装类型(Integer/Long)。如果要修改包装类型就需要使用AtomicReferenceFieldUpdater。
五、DoubleAdder/LongAdder详解
在上面AtomicIntege中介绍过了,它的自增方法使用CAS+自旋的方式来实现,如果在高并发的环境下,N个线程同时执行CAS操作,只有一个会成功,就会出现大量线程失败并不断自旋的情况,此时AtomicIntege/AtomicLong的自旋就会成为瓶颈
这就是DoubleAdder/LongAdder引入的初衷——解决高并发环境下AtomicIntege和AtomicLong的自旋瓶颈问题。
可以通过下面的代码自行测试一下,这两者的区别:
public class LongAdderTest {
public static void main(String[] args) {
testAtomicLongVSLongAdder(10, 10000);
System.out.println("==================");
testAtomicLongVSLongAdder(10, 200000);
System.out.println("==================");
testAtomicLongVSLongAdder(100, 200000);
}
static void testAtomicLongVSLongAdder(final int threadCount, final int times) {
try {
long start = System.currentTimeMillis();
testLongAdder(threadCount, times);
long end = System.currentTimeMillis() - start;
System.out.println("条件>>>>>>线程数:" + threadCount + ", 单线程操作计数" + times);
System.out.println("结果>>>>>>LongAdder方式增加计数" + (threadCount * times) + "次,共计耗时:" + end);
long start2 = System.currentTimeMillis();
testAtomicLong(threadCount, times);
long end2 = System.currentTimeMillis() - start2;
System.out.println("条件>>>>>>线程数:" + threadCount + ", 单线程操作计数" + times);
System.out.println("结果>>>>>>AtomicLong方式增加计数" + (threadCount * times) + "次,共计耗时:" + end2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static void testAtomicLong(final int threadCount, final int times) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(threadCount);
AtomicLong atomicLong = new AtomicLong();
for (int i = 0; i < threadCount; i++) {
new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < times; j++) {
atomicLong.incrementAndGet();
}
countDownLatch.countDown();
}
}, "my-thread" + i).start();
}
countDownLatch.await();
}
static void testLongAdder(final int threadCount, final int times) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(threadCount);
LongAdder longAdder = new LongAdder();
for (int i = 0; i < threadCount; i++) {
new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < times; j++) {
longAdder.add(1);
}
countDownLatch.countDown();
}
}, "my-thread" + i).start();
}
countDownLatch.await();
}
}
5.1 LongAdder原理
基本设计思路:
AtomicLong中有个内部变量value保存着实际的long值,所有的操作都是针对该变量进行。也就是说,高并发环境下,value变量其实是一个热点,也就是N个线程竞争一个热点。LongAdder的基本思路就是分散热点,将value值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。
如上图所示,LongAdder内部有一个base变量和Cell[]数组:
base变量:非竞态条件下,直接累加到该变量上
Cell[]数组:竞态条件下,直接累加到各个线程自己的槽Cell[i]中
上面这两句话的意思通俗一点儿理解就是,先对base变量进行CAS操作,如果失败了,并不会像AtomicLong自行自旋,而是通过每个线程的hash值找到自己对应的在Cell[]数组中的位置,然后对Cell中的value
属性进行CAS操作
// CPU核数,用来决定数组槽的大小
static final int NCPU = Runtime.getRuntime().availableProcessors();
// 数组槽,大小为2的次幂
transient volatile Cell[] cells;
/**
* 基数,在两种情况下会使用:
* 1. 没有遇到并发竞争时,直接使用base累加数值
* 2. 初始化cells数组时,必须要保证cells数组只能被初始化一次(即只有一个线程能对cells初始化),
* 其他竞争失败的线程会讲数值累加到base上
*/
transient volatile long base;
// Spinlock (locked via CAS) used when resizing and/or creating Cells.
transient volatile int cellsBusy;
下面的源码展示了Cell的结构,可以通过它的cas()方法进行CAS操作,替换原始的value
值
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
5.2 LongAdder的add()方法
在add()方法中,先去判断cells数组是否为空,为空说明处于非静态条件,就去调用casBase()方法对base属性值进行CAS操作
如果cells不为空,或者casBase()方法失败,就是判断cells是否经过了初始化,getProbe()方法用于获取当前线程的threadLocalRandomProbe
值,然后对当前cells的长度-1进行按位与操作,得到线程对应的槽位(Cell对象),然后对Cell对象的value值进行CAS操作
如果没有经过初始化或者Cell的CAS操作失败,就去调用longAccumulate()方法,LongAdder继承自Striped64类,longAccumulate()方法是在Striped64中实现的
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
add()方法对应的逻辑图如下:
只有从未出现过并发冲突的时候,base基数才会使用到,一旦出现了并发冲突,之后所有的操作都只针对Cell[]数组中的单元Cell。
如果Cell[]数组未初始化,会调用父类的longAccumelate去初始化Cell[],如果Cell[]已经初始化但是冲突发生在Cell单元内,则也调用父类的longAccumelate,此时可能就需要对Cell[]扩容了。
这也是LongAdder设计的精妙之处:尽量减少热点冲突,不到最后万不得已,尽量将CAS操作延迟。
5.3 LongAdder的sum()方法
返回累加的和,也就是当前时刻
的计数值 ,在高并发时,除非全局加锁,否则得不到程序运行中某个时刻绝对准确的值 。 此返回值可能不是绝对准确的,因为调用这个方法时还有其他线程可能正在进行计数累加,,方法的返回时刻和调用时刻不是同一个点,在有并发的情况下,这个值只是近似准确的计数值
由于计算总和时没有对Cell数组进行加锁,所以在累加过程中可能有其他线程对Cell中的值进 行了修改,也有可能对数组进行了扩容,所以sum返回的值并不是非常精确的,其返回值并不是 一个调用sum方法时的原子快照值。
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/153630.html