基于redis的分布式锁 RedissonLock实现分析

在分布式锁的实现当中,都是通过另一个中央服务来存储相应的状态,来达到一个应用分布处理的目的。这里分析了一种通过redis来进行锁实现的细节,以描述在整个实现细节中的处理点。

通常在锁实现当中, 都要实现获取锁,等待锁,释放锁这几种关键的业务场景。然后在这几种场景的基础之上,还需要实现更多的语义,比如过期时间,等待时间等。通过redis的setNx可以达到获取锁的语义,因此大多数的实现均是采用这种手法来进行锁判断和处理(类似的手法还包括concurrentHashMap的putIfAbsent)

本文基于redission版本1.2.0,类RedissionLock.

线程间协作

1. 获取锁

获取锁即通过redis的setNx命令来实现,此命令的意义即仅当相应的值不存在时,才能设置成功。如果设置成功的话,即认为当前能够获取到相应的锁了。相应的主要代码如下所示:

Boolean res = connection.setnx(getName(), currentLock);

其中name即可认为是lock的一个内部表示名字,其中多个线程共享同一个lock,即在操作命令时会使用同一个name值。

2. 等待锁

在第一步的代码中,只会有1个线程获取到相应的锁,其它的线程则会进行等待。这里面,首先是先获取当前的状态为不能获取到锁的,然后再尝试进行等待处理。

在第1步中,我们已经设置失败,即表示已经有其它线程(进程)获取锁了,因此需要返回相应的状态。在redission中,是通过返回相应的ttl值来表示的。(TTL的目的为了实现waitTime语义),如下所示:

                    Long ttl = connection.pttl(getName());
                    return ttl;

在接下来的代码中,因为已经有其它线程(进程)获取到锁了,这里要监听相应的锁变化,即监听在redis中的值是否发生了变化。比较常用的手法是查看相应的值是否存在(或者是否被删除),但这种方法需要不断地进行轮询,对于redis的压力很大。另一种方法即是建立相应的监听器,当锁释放进进行通知,这种方法即对应于redis的发布/订阅模型。通过在redis中订阅相应的通道,当释放时获取相应的消息即可。相应的主要代码如下所示:

        //这里进行信息订阅,即监听相应的数据是否发生了变化(如锁被释放了)
        subscribe().awaitUninterruptibly();

        try {
            while (true) {
                    //这里重新进行获取,因为在这期间可能其它锁已被释放了,而这里的监听并没有监听到
                    ttl = tryLockInner();

                if (ttl == null) {
                    break;//成功获取,
                }

                // waiting for message
                RedissonLockEntry entry = ENTRIES.get(getEntryName());
               //这里采用了Semaphore信号量来进行进行相应的许可获取
                entry.getLatch().acquire();
            }
        } finally {
          //因为已经获取到锁了,因此这里一定要释放相应的监听(已经不需要监听了) 备注:这里不一定是取消监听,只不过是表示在相应锁的关注上少了一个关注对象。取消监听一定要没有线程关注此锁才可以。因此redission是共用监听器,同一个channel只使用同一个监听器.
            unsubscribe();
        }

3. 释放锁

释放锁即将相应的redis key删除掉,同时需要去通知其它线程(进程)。这里即通过删除+publish来实现。如下所示:

            connection.multi();
            connection.del(getName());
            connection.publish(getChannelName(), unlockMessage);
            List<Object> res = connection.exec();

这里redission采用了multi+exec,更好地保证redis执行的原子性,主要的保证在于需要同时将name删掉,同时还要发送相应的通知信息。

4. 重新获得锁

因为第3步发送了相应的通知,在之前等待的锁的订阅当中,即会订阅到相应的通知信息。那么,一旦获得了相应的通知,即表示之前的锁已经被释放,那么相应的信号量许可即可release,并且相应的等待线程即可重新进行获取锁的操作。如下代码所示:

            public void message(String channel, Object message) {
                if (message.equals(unlockMessage) && getChannelName().equals(channel)) {
                    value.getLatch().release();//释放许可
                }
            }

在第2步的操作中,采取了while 重试的方式,这是一种典型的处理方式,防虚假唤醒或者再次也获取不到锁的情况(为什么,因为并发及多进程).然后整个步骤再次循环,一直到获取到锁为止。

waitTime实现

redission是实现了javaSE中的Lock接口的,因此有类似tryLock(waitTime)的实现,即如果在多久内获取不到锁,则返回false的情况。

首先看 tryLock()不带参数的实现,实际上就是在上节中提到的tryInnerLock,如果尝试调用setNx失败,也就是直接返回false了。

然后是带参数的time,在上节的整个获取过程中,需要等待的场景主要有2个地方,一是在订阅时处理,二是在等待锁的时候。那么在处理的场景中,首先是在订阅时,等待相应的命令执行时间,必须在指定的时间内完成。如下的代码所示:

        if (!subscribe().awaitUninterruptibly(time, unit)) {
            return false;
        }

然后,在等待锁的过程中,需要不断地等待指定的间隔,并且在重试之后,需要减掉之前已经等待的时间(总共需要等待3分钟,之前已经等待了2分钟,这次再等1分钟即可)。如下所示:

                long current = System.currentTimeMillis();
                RedissonLockEntry entry = ENTRIES.get(getEntryName());

                if (ttl >= 0 && ttl < time) {
                    //这里即如果之前的锁只需要2分钟即结束了,那么这里只需要等待2分钟即可
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    entry.getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

               //减掉相应的时间,重试处理
                long elapsed = System.currentTimeMillis() - current;
                time -= elapsed;

可重入锁

可重入锁,即在已经获取锁的基础之上,再次获取当前的锁。这在javaSE的语义上是允许的,并且重入锁不需要任何的条件,即检测到当前锁是当前线程在使用,那么重入锁可直接放行。同时,在释放时,之前获取了X次,那么也需要相应的unlock X次,才表示最终锁被成功释放。

1. 获取重入锁

在redission中,在相应的数据记录中使用LockValue对象来表示在锁中嵌入的对象,即在进行setNx时所使用的值。其中存在3个对象,id,threadId,counter。其中id用于表示一个惟一的锁对象(使用UUID,保证多个进程中不一样,此id与当前lock对象中的id一致)。threadId表示同一个进程的不同线程,counter即表示当前已经重入了多少次。

在获取之前的锁时,只需要获取相应的对象,并且与当前的lockValue比较一次即可,主要比较id和threadId(两者都很好获取),如果相同,即表示是一个锁,那么在之前数据的基础之上,累加相应的counter值,再重新set回去即可。如下所示:

                    LockValue lock = (LockValue) connection.get(getName());
                    if (lock != null && lock.equals(currentLock)) {
                        lock.incCounter();
                        connection.multi();
                        connection.set(getName(), lock);
                        if (connection.exec().size() == 1) {
                            return null;
                        }
                    }

2. 释放重入锁

与获取重入锁机制相同,如果之前所存储的counter值在于1,则进行–即可。否则则直接释放此锁。如下所示:

LockValue currentLock = new LockValue(id, Thread.currentThread().getId());
                    if (lock.equals(currentLock)) {
                        if (lock.getCounter() > 1) {
                            lock.decCounter();
                            connection.set(getName(), lock);
                        } else {
                            unlock(connection);
                        }

多线程(进程)并发

1. 多进程

如果是多进程中获取锁,在redission中即采用了重试在记录uid的方式处理。即在lockValue中记录了当前进程内产生的lock对象的uuid值,并且此uuid值每个进程1份。多个进程自然不一样。在进行处理时,均先判断是否是同一份lockValue。如果不同的话,那么自然表示即最开始不是当前进程产生的。

但是在进行订阅监听时,主需要关注具体的name值,并不关心uuid,因此在监听通道时仍然使用name值用于多个进程监听同一个channel.

2. 多线程

为了表示同一个进程中对于同一个name的争用,redission使用了LockEntry表示对同一个锁的使用情况,同时使用Semaphore来表示许可情况。因此同一个uuid和name的关系,因此在一个进程中,同一个name仅存在单个RedissonLockEntry,并且由于特殊的使用方式(采用了copy赋值的方式),保证了同一个锁只有同一个entry。为了保证多线程下线程安全,将相应的latch和promise都final化,即保证不会被修改。

redission采取了类似全局map来记录所有的lockEntry信息。

在单进程中,保证entry的惟一性采用concurrentHashMap最简单的,包括赋值,替换等语义,均提供了很好的实现。如在初次获取锁时,为了记录锁许可情况,需要在map中放置相应的计数信息,通过采用putIfAbsent,并且循环处理的方式,保证相应的counter值是最新的。在释放锁时,需要对计数-1,redission采用了使用新entry替换的方式(实际上也就将counter-1,再设置回去),采用replace也是很好的处理方式。这里的counter为什么不使用atomicInteger,感觉还是可以使用的。

同时,在针对移除的时候,为了保证线程安全性,仅在计数为0时,才移除。这里仍然采用了加锁并且二次判断的方式,即保证在加锁之后,之前的判断仍然是有效的。同时使用了remove(key,value)的方式。

相应的代码如下所示:

//订阅条件时
    private Future<Boolean> subscribe() {
......
        Promise<Boolean> newPromise = newPromise();
        final RedissonLockEntry value = new RedissonLockEntry(newPromise);
        value.aquire();
        RedissonLockEntry oldValue = ENTRIES.putIfAbsent(getEntryName(), value);
}
//取消订阅时
    private void unsubscribe() {
        while (true) {
......
            RedissonLockEntry newEntry = new RedissonLockEntry(entry);
            newEntry.release();
            if (ENTRIES.replace(getEntryName(), entry, newEntry)) {
//如果这时相应的entry已经为空,并且移除其为安全的
if (newEntry.isFree()
                        && ENTRIES.remove(getEntryName(), newEntry)) {
                    synchronized (ENTRIES) {//加锁进行移除数据
                        // maybe added during subscription
                        if (!ENTRIES.containsKey(getEntryName())) {
                            connectionManager.unsubscribe(getChannelName());
    }

解锁超时

由于使用了基于redis的实现,其网络的不稳定性并不能保证所有的操作均是安全的。比如在最后的unlock中,并不能保证相应的key值能正确地被删除。如果不能被正确的删除,那么整个分布锁就不能有效的工作,并且产生了饿死锁的情况,即一直不能获取锁。考虑以下的场景,del命令被发送至redis成功接收。但这时主redis down掉,从redis起来,但相应的数据没有被正确同步。to see http://redis.io/topics/distlock

因此,在实现当中,需要考虑锁的过期时间的问题。一个可以实现的方案即将相应的过期时间设置为一个可以接受的值,比如1个小时(取决于具体的业务)。如果超过指定的时间锁还未被释放(认为没被释放),则被自动释放。同时,系统中需要作相应的记录,以检查具体的问题。

在redission中,提供了基于leastTime的选项,以设置相应的锁过期时间。同时提供了forceUnlock选项,以强制性地释放指定的锁。这些方法取决于具体业务的使用(比如采用其它监听器来进行处理等)

总结

从redission的实现来看,其所基于的技术以及考虑的场景都非常的全面。比如从网上随处可copy的代码来说,这个实现是相当的完备,并且所提供的功能也远非简单的demo可比。因此,在具体的业务当中,可尝试使用其实现。

转载请标明出处:i flym
本文地址:https://www.iflym.com/index.php/code/201507050001.html

相关文章:

作者: flym

I am flym,the master of the site:)

发表评论

邮箱地址不会被公开。 必填项已用*标注