一、前言
一年前,写了一篇有瑕疵的博文 Redis分布式锁原理及实现 。这篇博文最后虽然给出了redis实现分布式锁的方式,但是在并发相当高的情况下,比如
Requests per second: 1453.85 [#/sec] (mean)
情况下,如果出现了一次锁超时,那么,之后的请求会有极大的概率一直持续处在被锁的状态,即出现死锁。
经过不断查资料以及实践检测,最终,得出了Redis使用单个实例下锁的正确实现
。
二、原理
有效方式使用分布式锁所需的最低保证:
- 安全属性:相互排斥。在任何给定时刻,只有一个客户端可以持有锁。
- 活力属性A:无死锁。最终,即使锁定资源的客户端崩溃或被分区,也始终可以获取锁定。
- 活力属性B:容错。只要大多数Redis节点启动,客户端就能够获取和释放锁。
由于介绍的是单实例下的Redis锁,所以 活力属性B 暂不考虑。
获得锁定
要获得锁定,可以采用以下方法:
SET resource_name my_random_value NX PX 30000
该命令仅在密钥尚不存在时才设置密钥(NX选项),到期时间为30000毫秒(PX选项)。密钥设置为“我的随机值”值。此值必须在所有客户端和所有锁定请求中都是唯一的。
以安全的方式释放锁
使用一个告诉Redis的脚本:仅当密钥存在且存储在密钥上的值恰好是我期望的值时才删除密钥。这是通过以下Lua脚本完成的:
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
这一点很重要,以避免删除由另一个客户端创建的锁。仅使用DEL是不安全的,因为客户端可能会删除另一个客户端的锁定。
三、java 实现
老话说得好,Talk is cheap ,show me the code!
接下来看看其java实现。
导入依赖并配置基础信息
导入依赖:
<!-- 本项目是 springboot 2.0.4.RELEASE 下得实现 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
配置基础信息:
spring:
redis:
# Redis数据库索引(默认为0)
database: 0
# Redis服务器地址
host: 127.0.0.1
# Redis服务器连接端口
port: 6379
# Redis 密码
password: require_pass
jedis:
pool:
# 连接池中的最小空闲连接
min-idle: 100
# 连接池中的最大空闲连接
max-idle: 500
# 连接池最大连接数(使用负值表示没有限制)
max-active: 2000
# 连接池最大阻塞等待时间(使用负值表示没有限制)
max-wait: 10000
# 连接超时时间(毫秒)
timeout: 0
配置JedisPool
新建类RedisConfig如下:
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private int port;
@Value("${spring.redis.password}")
private String auth;
@Value("${spring.redis.timeout}")
private int timeout;
@Value("${spring.redis.jedis.pool.max-idle}")
private int maxIdle;
@Value("${spring.redis.jedis.pool.max-wait}")
private long maxWaitMillis;
@Bean
public JedisPool redisPoolFactory() {
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxIdle(maxIdle);
jedisPoolConfig.setMaxWaitMillis(maxWaitMillis);
if(auth ==null || auth.equals("")){
return new JedisPool(jedisPoolConfig, host, port, timeout);
}else{
return new JedisPool(jedisPoolConfig, host, port, timeout,auth);
}
}
}
核心加锁和解锁方法
创建以下抽象RedisService :
import cc.mrbird.common.domain.RedisInfo;
import java.util.List;
import java.util.Map;
import java.util.Set;
public interface RedisService {
/**
* setnx expire
* @param key
* @param value
* @param nxxx setnx
* @param expx expire
* @param time
* @return
*/
String set(String key, String value, String nxxx, String expx, int time);
/**
* 执行redis脚本
* @param script 脚本
* @param keys 键值
* @param args 参数
* @return
*/
Object eval(String script, List<String> keys, List<String> args);
}
具体实现如下:
@Service("redisService")
public class RedisServiceImpl implements RedisService {
@Autowired
JedisPool jedisPool;
/**
* 处理jedis请求
*
* @param f 处理逻辑,通过lambda行为参数化
* @return 处理结果
*/
private Object excuteByJedis(Function<Jedis, Object> f) {
try (Jedis jedis = jedisPool.getResource()) {
return f.apply(jedis);
} catch (Exception e) {
log.error(e.getMessage());
return null;
}
}
@Override
public String set(String key, String value, String nxxx, String expx, int time) {
return (String) this.excuteByJedis(j -> j.set(key, value, nxxx, expx, time));
}
@Override
public Object eval(String script, List<String> keys, List<String> args) {
return this.excuteByJedis(j -> j.eval(script, keys, args));
}
}
有了以上准备,下面就可以创建一个工具类,专门用来处理Redis锁,如下:
@Component
@Slf4j
public class RedisLockUtils {
/**
* 用来表示 setnx 的参数
*/
private static final String SET_IF_NOT_EXIST = "NX";
/**
* EX = seconds(秒); PX = milliseconds(毫秒)
*/
private static final String SET_WITH_EXPIRE_TIME = "EX";
/**
* 释放锁成功返回值
*/
private static final Long RELEASE_SUCCESS = 1L;
/**
* 加锁成功返回值
*/
private static final String LOCK_SUCCESS = "OK";
/**
* 超时时间 10s,单位是由 {@code SET_WITH_EXPIRE_TIME }
*/
public static final int TIMEOUT = 10;
/**
* 常量前缀
*/
private static final String REDIS_LOCK_KEY_PREFIX = "redis_lock_key_prefix";
/**
* 常量连接符
*/
private static final String REDIS_LOCK_PLUS = "@";
/**
* 可用 key前缀
*/
public static final String REDIS_LOCK_KEY = REDIS_LOCK_KEY_PREFIX + REDIS_LOCK_PLUS;
@Autowired
RedisService redisService;
/**
* 生成分布式锁密钥
*
* @param key
* @return
*/
public String generateLockKey(String key) {
if (StringUtils.isBlank(key)) {
return "";
}
return REDIS_LOCK_KEY + MD5Utils.encrypt(key);
}
/**
* 尝试获取分布式锁
*
* @param lockKey 锁
* @param requestId 请求标识
* @param expireTime 超期时间
* @return 是否获取成功 true:成功获取锁;false:未获取锁资格
*/
public boolean tryGetDistributedLock(String lockKey, String requestId, int expireTime) {
String result = redisService.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
if (LOCK_SUCCESS.equals(result)) {
return true;
}
return false;
}
/**
* 释放分布式锁
*
* @param lockKey 锁
* @param requestId 请求标识
* @return 是否释放成功 true:手动解锁成功;false:手动解锁失败
*/
public boolean releaseDistributedLock(String lockKey, String requestId) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = redisService.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
if (RELEASE_SUCCESS.equals(result)) {
return true;
}
log.info("解锁失败,解锁用户:{}, 锁值为:{}", requestId, lockKey);
return false;
}
}
到这,就可以愉快得撸代码了。
四、拓展
以上方法实现的redis锁,每次只有一个用户占用锁资源,这样会照成当一个用户A获取锁执行业务逻辑时,其他的用户都不能执行这块的业务逻辑,只有当A执行完成后,其他用户再次请求过来时,才有机会和获取到锁资格去执行相应的业务逻辑。
下面我有一个业务场景,比如发红包,如果用上面这样的锁机制,先点击“抢红包”按钮的用户,也不一定能比后点击“抢红包”按钮的用户先抢到红包。因为先点击的用户可能此时锁正被占用,而后点击的后点击的用户 可能这时候锁正好释放被他给碰上了。所以这样情况,以上的锁机制是不符合这个抢红包的逻辑的。
那么应该怎么办呢???其实也很简单。利用redis的单线程特性和其api中的 DECR
或是INCR
可以实现。
下面来看一看基础 命令:
INCR key
简述:
-
为键 key 储存的数字值加上一。
-
如果键 key 不存在, 那么它的值会先被初始化为 0 , 然后再执行 INCR 命令。
-
如果键 key 储存的值不能被解释为数字, 那么 INCR 命令将返回一个错误。
-
本操作的值限制在 64 位(bit)有符号数字表示之内。
-
INCR 命令是一个针对字符串的操作。 因为 Redis 并没有专用的整数类型, 所以键 key 储存的值在执行 INCR 命令时会被解释为十进制 64 位有符号整数。
返回值: INCR 命令会返回键 key 在执行加一操作之后的值。
代码示例:
redis> SET page_view 20
OK
redis> INCR page_view
(integer) 21
redis> GET page_view # 数字值在 Redis 中以字符串的形式保存
"21"
DECR key
简述:
-
为键 key 储存的数字值减去一。
-
如果键 key 不存在, 那么键 key 的值会先被初始化为 0 , 然后再执行 DECR 操作。
-
如果键 key 储存的值不能被解释为数字, 那么 DECR 命令将返回一个错误。
-
本操作的值限制在 64 位(bit)有符号数字表示之内。
返回值: DECR 命令会返回键 key 在执行减一操作之后的值。
代码示例:
对储存数字值的键 key 执行 DECR 命令:
redis> SET failure_times 10
OK
redis> DECR failure_times
(integer) 9
对不存在的键执行 DECR 命令:
redis> EXISTS count
(integer) 0
redis> DECR count
(integer) -1
测试
在上面的RedisServiceImpl 类中添加一个方法:
public Long incr(String key) {
return (Long)this.excuteByJedis(j -> j.incr(key));
}
测试方法逻辑如下:
public String testIncr() {
long tmp = 0;
if ((tmp = redisService.incr("numkey")) > 100) { //100是红包数量,假定限制只能 发100个
return "已经发完了!!!";
}
System.out.println("第" + tmp + “抢到红包的幸运儿!”) ;
return "成功发出一个红包!";
}
这样处理后,那么可以保证先点击的前一百个用户可以抢到红包,后面点击的用户则不会抢到红包了。
ps:你也可以使用 DECR 执行减法操作来计数,同时还可以利用set方法设置一个初值,并给定一个红包过期时间,留给读者发挥,这里就不赘述了。
参考文档:
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/15995.html