一、背景
本文一方面是抱怨一下现下的面试体验,另一方面看一下redisson(3.15.1)的trylock api的实现(我也不喜欢针对面试去看源码,只是想找个出口宣泄一下)。
笔者是因为喜欢写代码才干的java开发这一行,但是现实很残酷,要找一个技术氛围好的团队愉快的写代码并不是一件容易的事情,从作为应聘者的角度只能看到功利的八股文。
最近陆陆续续面试了一些大厂和中厂,在沟通顺利的情况下(双方在同一个频道上),八股文基本上没什么问题(至少我自己认为是这样的),但是面试体验并不是很好。
举个简单的例子,最近面试了一个中厂,对方在谈到分布式锁的时候,问我们分布式锁是怎么用的。
我:比如我们在xxx业务场景下需要用到分布式锁,防止在并发场景下同一个单据在内存中的数据计算逻辑错误。我们用了redisson这个工具,在redis官方的单机锁的基础上(set nx px 加锁,lua脚本解锁),利用watchdog解决续约问题。比如trylock 60秒,他会默认在锁过期前续约30秒。
面试官:trylock如果设置了超时时间,他会开启watchdog吗?
我:默认就会。
面试官:设置了超时时间,他默认不会开启watchdog。
我:怎么会呢?肯定会啊。
面试官:那你们一直都是用带超时时间的api吗?
我:我们都用默认的无参的tryLock,刚才我tryLock60秒只是随便举个例子。
面试官:你去看过源码吗?
我:这个redisson的源码我倒是没有看过,毕竟是一个工具,没遇到问题的时候不会去追溯源码。
我不知道对方的想法是什么,我比较介意的是针对一个工具的api问底层源码,毕竟不是每个人摸鱼的时候都会点进redissonclient的(我承认我点进去过,但是不是为了背诵来面试,只是好奇)。在我看来工具是工具,只要你思路是对的,就没有问题,看源码只是解决问题和培养解决问题思路的一种方式。
此外,redisson在我们团队内部被引入其实是因为某个开发的个人行为,他说我们以前的用法不对(redis官方的单机锁),要用redisson,我也无法反驳,那就用吧。
但是这位开发引入新组件也没有去看官方文档,导致解锁用法有误,这部分代码一直在线上,因为也没什么严重的影响(后续的开发过来直接复制粘贴,错可以一直错下去)。
RLock lock = redissonClient.getLock(key);
try {
if (lock.tryLock()) {
// 业务方法
}
} finally {
if (lock.isLocked() && lock.isHeldByCurrentThread()) { // ?
lock.unlock();
}
}
主要错误的地方在于lock.isLocked和lock.isHeldByCurrentThread会增加两次对redis的远程调用。
在不看redisson的官方文档的情况下,只看JDK中Lock接口的java doc,我们就可以看出tryLock和unlock应该怎么用。
很明显,在tryLock成功的情况下,try/catch包裹所有业务逻辑,而不是在tryLock之外通过try/catch包裹。Redisson既然实现了JDK的Lock接口,就不会改变他的语义。
本身团队内部的规范是直接使用的redis官方的单机分布式锁,没有watchdog,至于为什么没用redisson主要原因有几个:
-
必要性:以前台接口为例,如果用redis官方的单机锁(set nx px 加锁,lua脚本解锁)超时时间设置为60秒,你说你业务超过60秒,tomcat一共就200线程,你这种业务存在,一个接口响应时间在60秒,直接就可能导致tomcat线程池打满夯住,你还没到需要watchdog续约的情况就挂了;
-
系统复杂度:在不理解原理的情况下,我不会引入额外的中间件或者工具,一方面是出问题不好排查,另一方面越多的中间件和工具会导致系统越复杂,非必要不引入;
-
团队:团队成员对开发对中间件和工具的认知不足,引入新的组件会提高开发的门槛,我们不能要求团队里每个人都对所有的中间件了如指掌,也不能对于每一行改动的代码都做代码审查;
二、redisson客户端
我们先从自动配置入手,所有的api调用都是由RedissonClient发起的。
@Bean(
destroyMethod = "shutdown"
)
@ConditionalOnMissingBean({RedissonClient.class})
public RedissonClient redisson() throws IOException {
Config config = null;
// 装载Config配置,比如连接的
return Redisson.create(config);
}
Redisson.create根据Config配置创建RedissonClient的实现Redisson。
public static RedissonClient create(Config config) {
Redisson redisson = new Redisson(config);
if (config.isReferenceEnabled()) {
redisson.enableRedissonReferenceSupport();
}
return redisson;
}
Redisson在构造时new了三个对象:
-
ConnectionManager:负责底层通讯,不看底层了,直接给答案就是Netty
-
EvictionScheduler:不知道是什么,也不想追究
-
WriteBehindService:不知道是什么,也不想追究
public class Redisson implements RedissonClient {
static {
RedissonReference.warmUp();
}
protected final QueueTransferService queueTransferService = new QueueTransferService();
protected final EvictionScheduler evictionScheduler;
protected final WriteBehindService writeBehindService;
protected final ConnectionManager connectionManager;
protected final ConcurrentMap<Class<?>, Class<?>> liveObjectClassCache = new ConcurrentHashMap<>();
protected final Config config;
protected final ConcurrentMap<String, ResponseEntry> responses = new ConcurrentHashMap<>();
protected Redisson(Config config) {
this.config = config;
Config configCopy = new Config(config);
connectionManager = ConfigSupport.createConnectionManager(configCopy);
evictionScheduler = new EvictionScheduler(connectionManager.getCommandExecutor());
writeBehindService = new WriteBehindService(connectionManager.getCommandExecutor());
}
}
三、getLock
Redisson.getLock将ConnectionManager里的CommandExecutor带进RedissonLock,目的肯定是为了RedissonLock可以调用底层通讯方法请求redis。name对应的就是我们的要锁的对象的标识,比如订单号。
@Override
public RLock getLock(String name) {
return new RedissonLock(connectionManager.getCommandExecutor(), name);
}
RedissonLock
-
internalLockLeaseTime:watchDog的超时时间,默认30s;
-
pubsub:暂时不知道有啥用,感觉是用到了redis的pubsub;
public class RedissonLock extends RedissonBaseLock {
protected long internalLockLeaseTime;
protected final LockPubSub pubSub;
final CommandAsyncExecutor commandExecutor;
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
}
RedissonBaseLock是RedissonLock的基类
-
id:ConnectionManager的id,UUID.randomUUID().toString(),如果一个进程只有一个RedissonClient,那么可以认为这个id在进程内唯一,后续我们就称id是进程id;
-
entryName:进程id拼接业务对象标识name,比如UUID.randomUUID().toString()+”:”+订单号;
-
internalLockLeaseTime:watchDog的超时时间,默认30s;
protected long internalLockLeaseTime;
final String id;
final String entryName;
final CommandAsyncExecutor commandExecutor;
public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = commandExecutor.getConnectionManager().getId();
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.entryName = id + ":" + name;
}
RedissonObject是RedssionClient构造出来的大多数对象的基类,也是RedissonBaseLock的基类,没什么特别的。
public abstract class RedissonObject implements RObject {
protected final CommandAsyncExecutor commandExecutor;
protected String name;
protected final Codec codec;
public RedissonObject(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
this.codec = codec;
this.name = name;
this.commandExecutor = commandExecutor;
if (name == null) {
throw new NullPointerException("name can't be null");
}
}
public RedissonObject(CommandAsyncExecutor commandExecutor, String name) {
this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name);
}
}
四、tryLock无参方法
无参的tryLock底层调用tryAcquireOnceAsync返回一个Future,再通过get阻塞等待Future完成。
tryAcquireOnceAsync有四个入参:
-
waitTime:如果当前其他人获取了锁,你愿意等多少时间;
-
leaseTime:如果你获取了锁,你想要多久自动释放;
-
unit:针对leaseTime的单位;
-
threadId:当前线程id;
针对无参tryLock,waitTime和leaseTime都是-1。
waitTime=-1,意味着不等待,如果当前锁被其他人占用,快速返回false;
leaseTime=-1,意味深长,如果是-1,代表启用自动续期watchDog,如果不是-1,代表用户自己控制锁过期时间,比如60秒。为什么这么说,继续看源码。
@Override
public boolean tryLock() {
return get(tryLockAsync());
}
@Override
public RFuture<Boolean> tryLockAsync() {
return tryLockAsync(Thread.currentThread().getId());
}
@Override
public RFuture<Boolean> tryLockAsync(long threadId) {
return tryAcquireOnceAsync(-1, -1, null, threadId);
}
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
// case1
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
// case2
RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
if (ttlRemaining) {
// watchDog
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
无论leaseTime和waitTime的赋值如何,都会先调用tryLockInnerAsync。只是针对leaseTime=-1的情况,还要在tryLockInnerAsync获取锁成功后,执行scheduleExpirationRenewal。
tryLockInnerAsync是个核心方法,就是执行lua脚本,参数如下:
-
KEYS[1]:我们Redisson.getLock传入的name,代表需要锁定的业务对象,比如订单号;
-
ARGV[1]:leaseTime,锁自动释放时间,比如无参tryLock这里是internalLockLeaseTime,就是watchDog的续期时间,如果是用户传入leaseTime,那就是用户指定锁定过期时间,对于redis来说就是key的ttl;
-
ARGV[2]:进程id拼接threadid;
整体lua脚本不算复杂,redisson采用map来存储获取锁成功的客户端id(id+threadId)和重入次数,如果获取成功,返回nil,如果获取失败,返回当前获取锁的剩余ttl。至于为什么是map,还没看到原因
RedisCommands.EVAL_NULL_BOOLEAN会解析,如果返回nil代表true获取锁成功,如果返回非nil代表false获取锁失败。
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
protected String getLockName(long threadId) {
return id + ":" + threadId;
}
ok由于我们调用的是tryLock的无参方法,leaseTime是-1,对于资源key的超时时间默认就是30秒,如果获取锁成功,接下来还要开启watchDog。
RedissonBaseLock.scheduleExpirationRenewal,构造一个ExpirationEntry放到全局map中,key是entryName(进程id+订单号)将当前线程id放入,并执行renewExpiration。
private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
renewExpiration();
}
}
protected String getEntryName() {
return entryName;
}
重点来了,watchDog在renewExpiration被提交,按照internalLockLeaseTime/3,也就是默认30s/3=10s延迟执行renew延长当前线程获取锁的时间,直到当前锁被释放取消这个Timeout。也就是说watchDog默认在获取锁成功的10秒后,就会执行续期。
这个延迟任务Timeout是Netty的实现,底层是Netty实现的HashedWheelTimer时间轮。
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
renew逻辑也比较简单,判断是否是当前线程持有锁,如果是的话按照internalLockLeaseTime设置新的过期时间。
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
五、tryLock两个参数方法
两个参数的tryLock方法也是实现了JDK的Lock接口,是指定waitTime,就是如果其他人获取锁了,最多允许等待waitTime时间获取锁,否则返回false。底层调用tryLock三个参数方法,相应的leaseTime还是-1,代表要走watchDog逻辑。
@Override
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
return tryLock(waitTime, -1, unit);
}
带waitTime的tryLock主要逻辑就两部分:
-
tryAcquire竞争锁;
-
订阅一个channel,接收key过期消息(那就是unlock的时候,会publish一个消息),底层通过JDK的Semaphore实现同步,这也是RedissonLock的LockPubSub的作用,用于发布锁定key过期和接收锁定key过期;
以上逻辑都建立在不超过waitTime的基础之上。
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
// 1. 竞争锁
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
current = System.currentTimeMillis();
// 2. 利用pubsub订阅一个channel,为了接收锁被unlock的消息
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
}
try {
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
while (true) {
// 3. 竞争锁
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// 4. 等待锁过期消息,重新进入3
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
// 5. 取消订阅
unsubscribe(subscribeFuture, threadId);
}
}
tryAcquire的逻辑都和tryAcquireOnceAsync一致,只是这里会返回key的过期时间ttl。
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
发布订阅channel=redisson_lock__channel:{锁定业务对象,如订单号}。
protected final LockPubSub pubSub;
protected RFuture<RedissonLockEntry> subscribe(long threadId) {
return pubSub.subscribe(getEntryName(), getChannelName());
}
String getChannelName() {
return prefixName("redisson_lock__channel", getName());
}
public static String prefixName(String prefix, String name) {
if (name.contains("{")) {
return prefix + ":" + name;
}
return prefix + ":{" + name + "}";
}
六、tryLock三个参数方法
其实tryLock两个参数方法,底层调用的就是tryLock三个参数方法,只不过leaseTime是-1。
而三个参数的tryLock方法,顶层接口不是JDK的Lock,而是Redissson的RLock。
/**
* Tries to acquire the lock with defined <code>leaseTime</code>.
* Waits up to defined <code>waitTime</code> if necessary until the lock became available.
*
* Lock will be released automatically after defined <code>leaseTime</code> interval.
*
* @param waitTime the maximum time to acquire the lock
* @param leaseTime lease time
* @param unit time unit
* @return <code>true</code> if lock is successfully acquired,
* otherwise <code>false</code> if lock is already set.
* @throws InterruptedException - if the thread is interrupted
*/
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
七、unlock
有始有终吧,把unlock也写了。
其实根据前面lock的逻辑,unlock也大致能猜到。
-
lua脚本解锁:要注意判断自己的id是否是当前获取锁成功的客户端id;要注意重入逻辑;
-
无论使用什么方式加锁,解锁都要publish对应key删除的消息到channel=redisson_lock__channel:{锁定业务对象,如订单号};
-
取消watchDog;
@Override
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
}
@Override
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<>();
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
// 3. 取消watchDog(HashTimeWheel里的那个timeout)
cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
return;
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
result.trySuccess(null);
});
return result;
}
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + // 判断是否是自己上的锁
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + // 减少重入次数
"if (counter > 0) then " + // 如果重入次数仍然大于0
"redis.call('pexpire', KEYS[1], ARGV[2]); " + // renew续租
"return 0; " +
"else " + // 如果重入次数为0
"redis.call('del', KEYS[1]); " + // 删除key
"redis.call('publish', KEYS[2], ARGV[1]); " + // 发布解锁消息
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
八、总结
本文分析了Redisson分布式锁tryLock api的实现,我们这里看的只是独占可重入的RedissonLock实现,其实Redisson不仅仅支持这种独占分布式锁,还支持许多其他的高级功能,只不过用到的机会比较少(我没用到过)。
对于tryLock api有两个重要的参数:
-
waitTime:代表最多等待多久获取锁,如果设置为-1,获取锁失败后直接返回false;如果设置非-1,底层会利用redis的pubsub监听解锁消息,在未超时的情况下获取锁;
-
leaseTime:代表锁自动过期的时间,如果设置为-1,代表开启watchDog,只要业务还在进行,就会自动续期;如果设置非-1,由leaseTime决定锁过期时间,不会开启watchDog;
回过头来说,面试官问的trylock如果设置了超时时间,他会开启watchdog吗?该怎么回答?
waitTime是你所谓的超时时间吗,还是leaseTime是你所谓的超时时间?如果是前者,是两个参数的tryLock方法,那么watchDog会开启;如果是后者,面试官是你对了。
so what?这吗?
原文始发于微信公众号(程序猿阿越):Redisson分布式锁实现原理
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/214785.html