什么是分布式锁?Zookeeper和Redis是如何实现的?

导读:本篇文章讲解 什么是分布式锁?Zookeeper和Redis是如何实现的?,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

什么是分布式锁?

在分布式服务中,服务都是多实例部署。在高并发访问的情况下,容易出现数据的不一致。以用户下单的场景为例,用户下单时,如果连续的点击下单的操作,可能在某一时刻多次下单请求被转发到不同的订单服务实例。这时,该用户可能会生成多个订单,流程如下:

什么是分布式锁?Zookeeper和Redis是如何实现的?

这时,需要使用分布式锁在某个运行点对服务进行同步控制,在该时刻只有一个服务才能下单成功。流程如下:

什么是分布式锁?Zookeeper和Redis是如何实现的?

分布式锁应该具备哪些条件?

  1. 在分布式系统环境下,一个方法在同一时间只能被一个服务的一个线程执行。
  2. 具备可重入特性,同一个线程可以多次竞争到这个锁。
  3. 具备锁失效机制,即拿到分布式锁的服务即使服务宕机,服务不可用,该分布式锁会自动失效,防止其他未得到服务不能拿到该分布式锁而出现死锁。流程如图:

什么是分布式锁?Zookeeper和Redis是如何实现的?

    4.具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败。

常见的分布式锁实现方案

curator 基于zookeeper实现的分布式锁

curator是Netfix公司开源的一套zookeeper客户端框架,提供了分布式锁服务接口InterProcessLock,接口定义如下:

public interface InterProcessLock
{
    /**
     * 阻塞获取锁
     */
    public void acquire() throws Exception;

    /**
     * 在指定的时间内尝试获取锁
     */
    public boolean acquire(long time, TimeUnit unit) throws Exception;

    /**
     * 释放锁
     */
    public void release() throws Exception;

    /**
     * 是否被占用
     */
    boolean isAcquiredInThisProcess();
}

其子类包括,InterProcessMutex,可重入的互斥锁;InterProcessMultiLock,类似容器的锁;InterProcessReadWriteLock,读写锁;InterProcessSemaphoreMutex,不可重入的互斥锁。InterProcessMutex(可重入的互斥锁)可以实现分布式锁的场景,curator基于zookeeper临时顺序节点与watcher事件特性实现。

在InterProcessMutex中,根据分布式锁特性1和特性2,在一个服务中,一个线程可重复的获取互斥锁,获取的逻辑如下:

public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex>
{
 /**
     * 获取锁
     */
public void acquire() throws Exception
    {
        if ( !internalLock(-1, null) )
        {
            throw new IOException("Lost connection while trying to acquire lock: " + basePath);
        }
    }

 private boolean internalLock(long time, TimeUnit unit) throws Exception
    {
         // 一个服务中的当前线程
        Thread currentThread = Thread.currentThread();
        // 根据当前线程获取锁
        LockData lockData = threadData.get(currentThread);
        if ( lockData != null )
        {
            // 重入获取锁,增加获取锁的次数
            lockData.lockCount.incrementAndGet();
            return true;
        }
        // 在规定时间内创建临时顺序节点
        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null )
        {
            LockData newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }
        // 没有创建序号最小的临时节点,则获取锁失败
        return false;
    }
}

curator使用zookeeper的临时顺序节点机制(如果此时zk客户端服务宕机,服务不可用,zk服务器端检测到zk客户端如果与其断开链接,就会删除该临时节点)创建指定路径节点。源码如下:

// LockInternals.class
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes)
        throws Exception {
        final long startMillis = System.currentTimeMillis();
        final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
        final byte[] localLockNodeBytes = (revocable.get() != null)
            ? new byte[0] : lockNodeBytes;
        int retryCount = 0;

        String ourPath = null;
        boolean hasTheLock = false;
        boolean isDone = false;

        while (!isDone) {
            isDone = true;

            try {
                // 生成临时顺序节点
                ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
                // 获取序号最小的节点
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            } catch (KeeperException.NoNodeException e) {

                // 根据重试机制创建节点失败
                if (client.getZookeeperClient().getRetryPolicy()
                              .allowRetry(retryCount++,
                            System.currentTimeMillis() - startMillis,
                            RetryLoop.getDefaultRetrySleeper())) {
                    isDone = false;
                } else {
                    throw e;
                }
            }
        }
        // 如果是序号最小的临时节点则获取锁成功
        if (hasTheLock) {
            return ourPath;
        }

        return null;
    }


    // StandardLockInternalsDriver.class
    public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
    {
        String ourPath;
        // 判断节点数据是否为空,生成节点
        // CreateMode.EPHEMERAL_SEQUENTIAL即为临时顺序节点
        if ( lockNodeBytes != null )
        {
            
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
        }
        else
        {
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
        }
        return ourPath;
    }

// LockInternals.class 循环获取锁
 private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
    {
        boolean     haveTheLock = false;
        boolean     doDelete = false;
        try
        {
            if ( revocable.get() != null )
            {
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }

            while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
            {
                // 获取所有顺序节点
                List<String>        children = getSortedChildren();
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
                // 根据最小序号获取锁
                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if ( predicateResults.getsTheLock() )
                {
                    haveTheLock = true;
                }
                else
                {
                    // 获取锁实现,对前一个节点进行监听,如果前一个节点数据变化则触发watcher事件,唤起阻塞线程
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                    synchronized(this)
                    {
                        try
                        {
// 监听previousSequencePath 节点                 client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                            if ( millisToWait != null )
                            {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if ( millisToWait <= 0 )
                                {
                                    doDelete = true;    // timed out - delete our node
                                    break;
                                }

                                wait(millisToWait);
                            }
                            else
                            {
                                wait();
                            }
                        }
                        catch ( KeeperException.NoNodeException e )
                        {
                            // it has been deleted (i.e. lock released). Try to acquire again
                        }
                    }
                }
            }
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            doDelete = true;
            throw e;
        }
        finally
        {
            if ( doDelete )
            {
                deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }


    // LockInternals.class 唤醒线程的Watcher
    private final Watcher watcher = new Watcher()
    {
        @Override
        public void process(WatchedEvent event)
        {
            client.postSafeNotify(LockInternals.this);
        }
    };

    // CuratorFramework.class 具体的唤醒逻辑
  default CompletableFuture<Void> postSafeNotify(Object monitorHolder)
    {
        return runSafe(() -> {
            synchronized(monitorHolder) {
                monitorHolder.notifyAll();
            }
        });
    }

curator释放分布式锁,或根据当前获取锁的线程判断,是否是当前线程在执行释放锁操作。并且,检查重入锁是否已经完全释放,最后才可以真正的释放分布式锁。源码如下:

public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex>{
   
 @Override
    public void release() throws Exception {
        // 释放锁的当前线程
        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
        // 如果释放锁的当前线程没有锁则抛出异常
        if (lockData == null) {
            throw new IllegalMonitorStateException("You do not own the lock: " +
                basePath);
        }

        int newLockCount = lockData.lockCount.decrementAndGet();
        // 检查重入锁次数
        if (newLockCount > 0) {
            return;
        }

        if (newLockCount < 0) {
            throw new IllegalMonitorStateException(
                "Lock count has gone negative for lock: " + basePath);
        }
        // 重入锁次数为0时,执行释放锁逻辑
        try {
            internals.releaseLock(lockData.lockPath);
        } finally {
            threadData.remove(currentThread);
        }
    }
}

使用redis基于StringRedisTemplate实现分布式锁

基于redis实现分布式锁的业务代码需要自己实现。在我实现中,定义在规定时间内获取锁和释放锁的方法,接口定义如下:

    /**
     * 超时获取锁
     *
     * @param condition 获取锁条件
     */
    boolean tryLock(TryLockPrams condition);

    /**
     * 释放锁
     *
     * @param key   key
     * @param owner 锁的owner
     */
    boolean unlock(String key, String owner);

在竞争获取锁时,使用redis的set方法(原子性的操作key,value和失效时间,以及操作类型),使redis获取锁的操作具有原子性,并且具备锁失效机制,防止死锁。此种方式具备排他性,但是不具备可重入性。set方法具体代码如下:

Boolean set(byte[] key, byte[] value, Expiration expiration, SetOption option);

竞争获取锁时,根据用户设置的时间设置,在规定的时间内尝试获取锁。详细代码如下:

@Override
    public boolean tryLock(TryLockPrams prams) {

        String key;// 设置锁的key
        String owner;// 拥有锁的标识
        long expireTime;// 锁失效的时间
        TimeUnit expireTimeUnit;// 锁失效的时间单位
        if (StringUtils.isBlank(key = prams.getKey())
                || StringUtils.isBlank(owner = prams.getOwner())
                || (expireTime = prams.getExpireTime()) <= 0
                || Objects.isNull(expireTimeUnit = prams.getExpireTimeUnit())) {

            log.warn("prams [{}] is invalid!", JSON.toJSONString(prams));
            return false;
        }
        long startTime = System.currentTimeMillis();
        long tryLockMaxTime = prams.getTryLockMaxTime();
        long executeTime;
        do {
            try {
                RedisSerializer<String> keySerializer = (RedisSerializer<String>) redisTemplate.getKeySerializer();
                RedisSerializer<String> valueSerializer = (RedisSerializer<String>) redisTemplate.getValueSerializer();
                boolean success = redisTemplate.execute((RedisCallback<Boolean>) 
// 调用set方法进行原子性,并且具有时效性的设置
connection -> connection.set(
                        keySerializer.serialize(key),
                        valueSerializer.serialize(owner),
                        Expiration.from(expireTime, expireTimeUnit),
                        RedisStringCommands.SetOption.ifAbsent()));
                // 设置成功
                if (success) {
                    // 守护线程监控该key的失效时间,如果在即将失效时,锁还未释放,则增加锁的失效时间
                    prams.setDeadLine(System.currentTimeMillis() + expireTimeUnit.toMillis(expireTime));
                    extendExpiredTimeMap.putIfAbsent(key, prams);
                    return success;
                }
            } catch (Exception e) {

                log.warn("set if absent error!key {},value {},expireTime {}", key, owner, expireTime);
            }
        }
        while ((executeTime = System.currentTimeMillis() - startTime) < expireTimeUnit.toMillis(tryLockMaxTime));

        log.info("try lock error!prams {},executeTime {},expireTimeUnit {}",
                JSON.toJSONString(prams), executeTime, expireTimeUnit.name());
        return false;
    }

释放锁时,使用lua脚本实现,保证操作的原子性。实现逻辑是,根据key获取的值与传入的对于的value值进行比较,如果相同则删除,不删除则操作失败。一般业务场景中,key为业务数据对应的标识(例如,限制单人的重复操作,则key为用户id),value为存储当前操作线程的标识。详细代码如下:

   /**
     * 解锁lua脚本
     */
    private static final String LUA_UNLOCK = "if redis.pcall('get', KEYS[1]) == ARGV[1] then return redis.pcall('del', KEYS[1]) else return 0 end";

@Override
    public boolean unlock(String key, String owner) {

        Long result = redisTemplate.execute((RedisCallback<Long>) connection ->
                connection.eval(LUA_UNLOCK.getBytes(), ReturnType.INTEGER, 1, key.getBytes(), owner.getBytes()));
        boolean success = Objects.nonNull(result) && result > 0;
        if (success) {
            // 移除延长失效时间数据
            extendExpiredTimeMap.remove(key);
        } else {
            log.warn("unlock error!key {},owner {}", key, owner);
        }
        return success;
    }

由于redis是设置的指定失效时间来保证锁的失效性,即获取锁的服务宕机服务不可用时,在指定的失效到期后锁会释放。但是,如果在指定的时间内,服务的逻辑还未执行完毕,锁就自动释放了,这种情况如何避免呢?可以使用ScheduledExecutorService定时执行重新设置失效时间的任务,避免获取锁的服务还未执行完毕就释放锁。详细代码如下:

  public RedisLock() {
        scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
        extendExpiredTimeMap = new ConcurrentHashMap<>();
        init();
    }

    private void init() {
        scheduledExecutorService.scheduleAtFixedRate(this, 0, executingTimePeriod, TimeUnit.SECONDS);
    }


/**
     * 定时任务,对即将过期的key再次设置失效时间
     */
    @Override
    public void run() {

        if (log.isDebugEnabled()) {
            log.debug("execute extend expired time task!extend expired time map size {}", extendExpiredTimeMap.size());
        }
        if (extendExpiredTimeMap.size() <= 0) {
            return;
        }
        List<TryLockPrams> tryLockPrams = new LinkedList<>();
        for (ConcurrentHashMap.Entry<String, TryLockPrams> entry : extendExpiredTimeMap.entrySet()) {
            // 校验参数
            if (!isValid(entry) || !isExpiringTime(entry.getValue())) {
                continue;
            }
            tryLockPrams.add(entry.getValue());
            if (tryLockPrams.size() > 0 && 0 == tryLockPrams.size() % PIPELINE_LIMIT) {

                setIfPresentPipeline(tryLockPrams);
                tryLockPrams.clear();
            }
        }
        if (tryLockPrams.size() > 0) {

            setIfPresentPipeline(tryLockPrams);
            tryLockPrams.clear();
        }
    }

/**
     * 在缓存中即将过期数据才会重新设置失效时间,降低操作redis的频率
     *
     * @param tryLockPrams
     * @return
     */
    private boolean isExpiringTime(TryLockPrams tryLockPrams) {

        long deadLine = tryLockPrams.getDeadLine();
        long current = System.currentTimeMillis();
        long validTime = deadLine - current;
        if (BigDecimal.valueOf(validTime).divide(BigDecimal.valueOf(TimeUnit.SECONDS.toMillis(executingTimePeriod))).intValue() <= 2) {

            if (log.isDebugEnabled()) {
                log.debug("tryLockPrams [{}] is expiring!current is {}", JSON.toJSONString(tryLockPrams), current);
            }
            tryLockPrams.setDeadLine(current + tryLockPrams.getExpireTimeUnit().toMillis(tryLockPrams.getExpireTime()));
            return true;
        } else {
            return false;
        }
    }

    private boolean isValid(ConcurrentHashMap.Entry<String, TryLockPrams> entry) {

        TryLockPrams prams;
        if (Objects.isNull(prams = entry.getValue())
                || StringUtils.isBlank(prams.getKey())
                || StringUtils.isBlank(prams.getOwner())
                || (prams.getExpireTime()) <= 0
                || Objects.isNull(prams.getExpireTimeUnit())) {

            log.warn("prams [{}] is invalid!", JSON.toJSONString(prams));
            return false;
        }
        return true;
    }

    private void setIfPresentPipeline(List<TryLockPrams> tryLockPrams) {
        redisTemplate.execute((RedisCallback<Object>) connection -> {

            // 开启Pipeline
            connection.openPipeline();
            for (TryLockPrams tryLockPram : tryLockPrams) {
                redisTemplate.opsForValue().setIfPresent(tryLockPram.getKey(), tryLockPram.getOwner(),
                        tryLockPram.getExpireTime(), tryLockPram.getExpireTimeUnit());
            }
            // 关闭Pipeline
            connection.closePipeline();
            return null;
        });
    }

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/13644.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!