Redis 实现分布式锁+执行lua脚本

本篇来看看Redis 实现分布式锁的 步步演进过程 ,包括 setnx -> set -> 过期时间 -> 误删锁 -> uuid控制锁误删-> lua脚本控制删锁的原子性

Redis 实现分布式锁+执行lua脚本

分布式锁,即分布式系统中的锁。在单体应用中我们通过锁解决的是控制共享资源访问的问题,而分布式锁,就是解决了分布式系统中控制共享资源访问的问题。与单体应用不同的是,分布式系统中竞争共享资源的最小粒度从线程升级成了进程。

假设现在在redis中有 5000个 iphone14商品 下面来通过扣减这5000个iphone的案例 来一步步完善分布式的实现,让我们能够更加理解这些改进的原因.

set iphone14 5000

1.无 分布式锁控制

无锁控制的时候 多个线程获取会同时获取到库存 然后进行扣减 会导致并发问题

String stock = stringRedisTemplate.opsForValue().get("iphone14");
if (stock != null) {
// 3.比较并且扣减库存
long stockCount = Long.parseLong(stock);
if (stockCount > 0) {
// 4.设置库存
stringRedisTemplate.opsForValue().set("iphone14", String.valueOf(--stockCount));
}
}

ab测试后发现严重的超卖问题

ab -c 100 -n 5000 http://127.0.0.1:10010/deduct

Redis 实现分布式锁+执行lua脚本

2.使用 setnx 命令

可以通过 redis 的 setnx 命令 来添加锁, 这个命令的意思是 如果key不存在 才设置, 这就模拟了如果别人没抢到锁我就加锁的意思

下面是使用 setnx 命令实现的分布式 看看会有什么问题?

    public void deduct() {
// 1.获取redis 锁

while (Boolean.FALSE.equals(stringRedisTemplate.opsForValue().setIfAbsent("lock", "lockvalue"))) {
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
// 2.获取库存
String stock = stringRedisTemplate.opsForValue().get("iphone14");
if (stock != null) {
// 3.比较并且扣减库存
long stockCount = Long.parseLong(stock);
if (stockCount > 0) {
// 4.设置库存
stringRedisTemplate.opsForValue().set("iphone14", String.valueOf(--stockCount));
}
}
} finally {
// 5.释放锁
stringRedisTemplate.delete("lock");
}

问题 (没有设置过期时间)

可以看到在finally 里面我们进行了释放锁的操作, 但是如果还没执行finally这里就宕机了 那这个lock 锁会一直存在就导致没有释放 产生了死锁, 所以需要给锁添加一个过期时间

改进一

        while (Boolean.FALSE.equals(stringRedisTemplate.opsForValue().setIfAbsent("lock", "lockvalue"))) {
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//给锁 添加过期时间 ???问题: 刚刚要执行这行代码的时候 就宕机了 就会产生死锁
stringRedisTemplate.expire("lock",30,TimeUnit.SECONDS);

这里虽然在获取锁后 去给锁设置了过期时间, 但是如果 刚刚获取锁就宕机了 也会导致死锁

我们会发现需要 在设置锁的时候同时设置过期时间, 需要这2个操作是原子性的, 那么 setnx 命令就不合适了需要使用 set 命令

3. 使用 set 命令替代setnx (保证原子性)

2.6.12版本开始,redis 为SET命令增加了一系列选项:

EX seconds 设置key的过期时间,单位是秒

PX milliseconds 设置key的过期时间,单位是毫秒

NX 只有键key不存在时,才能设置key的值

XX 只有键key存在时,才能设置key的值

set 命令是支持 NX XX 判断的 , NX 代表不存在才设置, 并且同时支持 设置过期时间

对应到 redistemplate 的方法就是 setifAbsent 是NX setIfPresent 对应XX

       //保证了 设置key 和 过期时间 两条命令的 的原子性 
while (Boolean.FALSE.equals(
stringRedisTemplate.opsForValue().setIfAbsent("lock", "lockvalue", 3, TimeUnit.SECONDS))) {
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
// 2.获取库存
String stock = stringRedisTemplate.opsForValue().get("iphone14");
if (stock != null) {
// 3.比较并且扣减库存
long stockCount = Long.parseLong(stock);
if (stockCount > 0) {
// 4.设置库存
stringRedisTemplate.opsForValue().set("iphone14", String.valueOf(--stockCount));
}
}
} finally {
// 5.释放锁
stringRedisTemplate.delete("lock");
}

问题(误删锁)

从上面代码可以发现 如果某个线程获取了锁 并且在执行业务的因为某些原因执行较慢 导致锁已经到了过期时间 自动释放了, 那么 其他线程会获取到锁, 然后第一个线程执行完成后 又会去删除锁,而此时的锁已经是其他线程的锁 , 导致了误删锁的情况

4. 添加UUID 防止误删锁

这里简单演示通过添加 uuid 来防止误删其他线程的锁

  String uuid = UUID.randomUUID().toString();
while (Boolean.FALSE.equals(
stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS))) {
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
.....
finally {
// 5.释放锁 对锁进行了 uuid 的判断 判断是否是自己的锁
String lockValue = stringRedisTemplate.opsForValue().get("lock");
if (lockValue.equals(uuid)) {
stringRedisTemplate.delete("lock");
}
}

问题(误删锁 原子性问题)

可以发现还是有问题 , 虽然我们比较了 锁是不是自己的, 但是 还是有可能在刚刚比较完成 equals 进去后 里面锁失效了 又被其他线程添加了新的锁, 此时还是会有误删的可能

5. Lua 脚本 控制删除锁 的原子性

redis给lua 脚本留了口子通过 eval 命令运行lua脚本来 原子性执行lua脚本里的逻辑

lua 脚本多的就不介绍了, 只需要知道看懂下面的代码即可

 if  10 > 11 
then return '10>11'
elseif 10 > 9
then return '10>9'
else
return 'nil'
end

Redis 实现分布式锁+执行lua脚本

同时 在lua里也可以调用 redis的命令 通过 redis.call(‘set’, ‘name’, ‘johnny’)

Redis 实现分布式锁+执行lua脚本

好了不介绍了 下面来看看 如何通过lua脚本来控制 删除锁的原子性

5.1 lua 脚本删除锁逻辑

if redis.call('get' , 'lock') == uuid 
then
redis.call('del' , 'lock')
return 1
else
return 0
end

假如 此时 redis中 lock 锁的 uuid = 91dbc829-d44e-4f03-96d8-95a06f3ff975

转化成 可以执行的

if redis.call('get' , KEYS[1]) == ARGV[1] then  redis.call('del' , KEYS[1])  return 1 else  return 0 end 1 lock 91dbc829-d44e-4f03-96d8-95a06f3ff975**

Redis 实现分布式锁+执行lua脚本

5.2 redistemplate 执行lua脚本实现

 private final String deleteLockLua =
"if redis.call('get' , KEYS[1]) == ARGV[1] then redis.call('del' , KEYS[1]) return 1 else return 0 end";

public void deduct() {
// 1.获取redis 锁
String uuid = UUID.randomUUID().toString();
while (Boolean.FALSE.equals(
stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS))) {
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
// 2.获取库存
String stock = stringRedisTemplate.opsForValue().get("iphone14");
if (stock != null) {
// 3.比较并且扣减库存
long stockCount = Long.parseLong(stock);
if (stockCount > 0) {
// 4.设置库存
stringRedisTemplate.opsForValue().set("iphone14", String.valueOf(--stockCount));
}
}
} finally {
//通过 DefaultRedisScript 来执行 lua脚本
DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>();
//Boolean 对应 lua脚本返回的 0 1
redisScript.setResultType(Boolean.class);
//指定需要执行的 lua脚本
redisScript.setScriptText(deleteLockLua);
// 5.释放锁
//注意 需要提供 List<K> keys, Object... args 代表 keys 和 ARGV
stringRedisTemplate.execute(redisScript, Collections.singletonList("lock"), uuid);
}
}

压测一下发现正常控制了库存

总结

本篇主要一步步演进手写redis分布式锁的实现, 包括 setnx -> set -> 过期时间 -> 误删锁 -> uuid控制锁误删-> lua脚本控制删锁的原子性等等.. 其实目前还有问题, 包括锁续期问题 以及redis 可重入锁的问题 有机会在完善吧

需要注意 redis 中如何使用 lua脚本的, 因为一些原子性操作就是需要lua脚本来控制 包括 redission 框架也是通过lua脚本实现的.


原文始发于微信公众号(Johnny屋):Redis 实现分布式锁+执行lua脚本

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/105513.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!