如果你觉得内容对你有帮助的话,不如给个赞,鼓励一下更新😂。
加油啊!新的一天,新的开始😘
CAS
说到CAS,我们就得提起通常所说的并发包也就是 java.util.concurrent(JUC)及其子包,它集中了 Java 并发的各种基础工具类,具体主要包括几个方面:
- 提供了比 synchronized 更加高级的各种同步结构 CountDownLatch、Semaphore 等,可以实现更加丰富的多线程操作,比如 Semaphore 作为资源控制器,可以限制同时进行工作的线程数量。
- 各种线程安全的容器,比如面试常问到的 ConcurrentHashMap或者通过类似快照机制,实现线程安全的动态数组 CopyOnWriteArrayList 等。
- 各种并发队列实现,如各种 BlockedQueue 实现,比较典型的 ArrayBlockingQueue、 SynchorousQueue 或针对特定场景的 优先队列 PriorityBlockingQueue 等。
- 强大的 Executor 框架,可以创建各种不同类型的线程池,调度任务运行等,绝大部分情况下,不再需要自己从头实现线程池和任务调度器。
如果想要读懂 Java 中的并发包,其核心就是要先读懂 CAS 机制,因为 CAS 在并发包中可以说是随处可见,这篇文章我们就来简单学习一下CAS
神秘的CAS?
CAS全称Compare and swap,字面意思就是比较并交换,一个 CAS 涉及到以下操作:
我们假设内存中的原数据为V,旧的预期值为A,需要修改的新值为B。
1、比较 A 与 V 是否相等。
2、如果比较相等,将 B 写入 V。
3、否则,可能出现不同的选择,要么进行重试(自旋),要么就返回一个成功或者失败的结果。
当多个线程同时对某个资源进行CAS操作,只能有一个线程操作成功,但是并不会阻塞其他线程,其他线程只会收到操作失败的信号。所以 CAS 其实是一个乐观锁。
补充:什么是乐观锁?
乐观锁就是在你操作数据的时候认为不加锁也没事,然后我们可以先不加锁,当出现了冲突的时候,我们在想办法解决。
CAS 的实现
文章开头我有提到提高JUC这个包,其实在 Java实习生面试复习(八):volatile的学习 文章中我也有提及这个特殊的类,就是以Atomic开头的支持原子性操作的类,比如AtomicInteger,它是对 int 类型的一个封装,提供原子性的访问和更新操作,其原子性操作的实现是基于 CAS(compare-and-swap)技术。
那么它又是怎么实现的CAS呢?跟随AtomInteger的代码我们一路往下 => 从 AtomicInteger 的内部属性可以看出,它依赖于 Unsafe 提供的一些底层能力,进行底层操作;以 volatile 的 value 字段,记录数值,以保证可见性。
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
具体的原子操作细节,可以参考AtomInteger中的 getAndIncrement。其底层就是Unsafe 会利用 value 字段的内存地址偏移,直接完成操作。
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
也就是说,这个 CAS 的方法是使用了本地的方法。所以这几个方法的具体实现就需要我们自己去 jdk 的源码中搜索了。
再往下,,我也不会了,哈哈😭,菜哭,以后有机会再深入看看。
CAS 的应用
了解完 CAS 的原理我们继续看看 CAS 的相关应用
并发修改的情况下,使用CAS版本号机制保证数据的一致性
模拟的场景如下:
- 先从用户表中读取到用户的余额
- 在内存中对余额进行+1操作
- 把+1后的余额写到DB
建表SQL
CREATE TABLE `user` (
`id` int(11) NOT NULL COMMENT 'id',
`balance` int(255) DEFAULT NULL COMMENT '余额',
`version` int(255) DEFAULT NULL COMMENT '版本号',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_croatian_ci;
INSERT INTO `demo`.`user`(`id`, `balance`, `version`) VALUES (1, 0, 0);
Main 启动 100个线程对user表中,id为1的balance字段进行: 读取 , +1 ,写入的操作
import java.sql.DriverManager;
import java.sql.SQLException;
public class Main {
private static final String MYSQL_URL = "jdbc:mysql://localhost:3306/demo?serverTimezone=GMT%2b8";
private static final String USER = "root";
private static final String PASS = "root";
public static void main(String[] args) {
Service service = new Service();
for (int x = 0; x < 100; x++) {
new Thread(() -> {
try {
service.increment(DriverManager.getConnection(MYSQL_URL, USER, PASS));
} catch (SQLException e) {
e.printStackTrace();
}
}).start();
}
}
static {
try {
Class.forName("com.mysql.cj.jdbc.Driver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
}
Service
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
public class Service {
private static final int USER_ID = 1;
// cas 乐观锁
public void incrementCas(Connection connection) throws SQLException {
connection.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
while (true) {
connection.setAutoCommit(false);
// 从DB检索到余额和版本号
PreparedStatement preparedStatement = connection.prepareStatement("SELECT `balance` ,`version` FROM `user` WHERE `id` = ?;");
preparedStatement.setInt(1, USER_ID);
ResultSet resultSet = preparedStatement.executeQuery();
int balance = 0;
int version = 0;
if (resultSet.next()) {
balance = resultSet.getInt("balance");
version = resultSet.getInt("version");
}
// +1后写入到DB
preparedStatement = connection.prepareStatement("UPDATE `user` SET `balance` = ? ,`version` = `version` + 1 WHERE `id` = ? AND `version` = ?;");
preparedStatement.setInt(1, balance + 1);
preparedStatement.setInt(2, USER_ID);
preparedStatement.setInt(3, version); // 版本号
int result = preparedStatement.executeUpdate();
connection.commit();
if(result != 0) {
break;
}
// 更新失败,再次进入循环
}
}
}
简单自旋锁实现
自旋锁,在lock()的时候,一直while()循环,直到 cas 操作成功为止。
扩展:这里还可以考虑那种能够根据线程最近获得锁的状态来调整循环次数的自旋锁,我们称之为自适应自旋锁,因为普通的自旋锁会由于自旋的次数,出现过度消耗 CPU的问题。
public class SpinLock {
private AtomicReference<Thread> sign =new AtomicReference<>();
public void lock(){
Thread current = Thread.currentThread();
while(!sign .compareAndSet(null, current)){
}
}
public void unlock (){
Thread current = Thread.currentThread();
sign .compareAndSet(current, null);
}
}
CAS 缺点
谁偷偷更改了我的值(ABA问题)
著名的ABA问题。前面提到 CAS 是在更新时比较前值,如果对方只是恰好相同,例如期间发生了 A -> B -> A 的更新,仅仅判断数值是 A,可能导致不合理的修改操作。针对这种情况,Java 提供了 AtomicStampedReference 工具类,通过为引用建立类似版本号(stamp)的方式,来保证 CAS 的正确性。
对于基本类型的值来说,这种把数字改变了在改回原来的值是没有太大影响的,但如果是对于引用类型的话,就会产生很大的影响了。
那么我们怎么解决?
为了解决这个 ABA 的问题,我们可以引入版本控制,例如,每次有线程修改了引用的值,就会进行版本的更新,虽然两个线程持有相同的引用,但他们的版本不同,这样,我们就可以预防 ABA 问题了。
Java 中提供了 AtomicStampedReference 这个类,就可以进行版本控制了,我们可以看个简单的案例。
/**
* @author 南街
* @program JavaAdvanced
* @classname AtomicStampedReferenceDemo
* @description 版本号原子引用,可用于解决ABA问题
**/
public class AtomicStampedReferenceDemo {
private static AtomicReference<Integer> atomicReference = new AtomicReference<>(100);
private static AtomicStampedReference<Integer> stampedReference = new AtomicStampedReference<Integer>(100,1);;
public static void main(String[] args) {
System.out.println("==========以下是ABA问题的产生==============");
new Thread(()->{
atomicReference.compareAndSet(100,101);
atomicReference.compareAndSet(101,100);
},"t1").start();
Thread t2 = new Thread(() -> {
// 暂停一秒钟t2线程,保证上面的t1线程完成一次ABA操作
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(atomicReference.compareAndSet(100, 200) + "\t" + atomicReference.get());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t2");
t2.start();
while (t2.isAlive()) {
}
System.out.println("==========以下是ABA问题的解决==============");
new Thread(() -> {
System.out.println("第一次版本号stamp = " + stampedReference.getStamp());
// 暂停3秒钟t3线程
try {
TimeUnit.SECONDS.sleep(3);
stampedReference.compareAndSet(100, 101, stampedReference.getStamp(),
stampedReference.getStamp() + 1);
System.out.println("第二次版本号stamp = " + stampedReference.getStamp());
stampedReference.compareAndSet(101, 100, stampedReference.getStamp(),
stampedReference.getStamp() + 1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("第三次版本号stamp = " + stampedReference.getStamp());
},"t3").start();
new Thread(() -> {
int stamp = stampedReference.getStamp();
System.out.println("t4第一次版本号stamp = " + stamp);
// 暂停5秒钟t4线程
try {
TimeUnit.SECONDS.sleep(5);
System.out.println("t4修改:" + stampedReference.compareAndSet(100, 2019, stamp,
stamp + 1));
System.out.println("stampedReference.getReference() = " + stampedReference.getReference());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t4").start();
}
}
嘿嘿,运行结果的话,自己不妨复制下来跑跑看,程序还是要多实践滴🐱👓!
总结
- CAS 是 Java 并发中所谓 lock-free(无锁) 机制的基础。
- CAS 的使用能够避免线程的阻塞。
- 多数情况下我们使用的是 while true 直到成功为止,也叫自旋锁,在自旋锁上我们可以优化出自适应自旋锁。
- 使用 AtomicStampedReference 解决ABA问题。
这篇文章到这就结束啦,我们下期📝再见(我也不知道啥时候🤣),喜欢的话就给个赞 + 收藏 + 关注吧!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/15206.html