在分布式锁的实现当中,都是通过另一个中央服务来存储相应的状态,来达到一个应用分布处理的目的。这里分析了一种通过redis来进行锁实现的细节,以描述在整个实现细节中的处理点。
通常在锁实现当中, 都要实现获取锁,等待锁,释放锁这几种关键的业务场景。然后在这几种场景的基础之上,还需要实现更多的语义,比如过期时间,等待时间等。通过redis的setNx可以达到获取锁的语义,因此大多数的实现均是采用这种手法来进行锁判断和处理(类似的手法还包括concurrentHashMap的putIfAbsent)
本文基于redission版本1.2.0,类RedissionLock.
线程间协作
1. 获取锁
获取锁即通过redis的setNx命令来实现,此命令的意义即仅当相应的值不存在时,才能设置成功。如果设置成功的话,即认为当前能够获取到相应的锁了。相应的主要代码如下所示:
Boolean res = connection.setnx(getName(), currentLock);
其中name即可认为是lock的一个内部表示名字,其中多个线程共享同一个lock,即在操作命令时会使用同一个name值。
2. 等待锁
在第一步的代码中,只会有1个线程获取到相应的锁,其它的线程则会进行等待。这里面,首先是先获取当前的状态为不能获取到锁的,然后再尝试进行等待处理。
在第1步中,我们已经设置失败,即表示已经有其它线程(进程)获取锁了,因此需要返回相应的状态。在redission中,是通过返回相应的ttl值来表示的。(TTL的目的为了实现waitTime语义),如下所示:
Long ttl = connection.pttl(getName()); return ttl;
在接下来的代码中,因为已经有其它线程(进程)获取到锁了,这里要监听相应的锁变化,即监听在redis中的值是否发生了变化。比较常用的手法是查看相应的值是否存在(或者是否被删除),但这种方法需要不断地进行轮询,对于redis的压力很大。另一种方法即是建立相应的监听器,当锁释放进进行通知,这种方法即对应于redis的发布/订阅模型。通过在redis中订阅相应的通道,当释放时获取相应的消息即可。相应的主要代码如下所示:
//这里进行信息订阅,即监听相应的数据是否发生了变化(如锁被释放了) subscribe().awaitUninterruptibly(); try { while (true) { //这里重新进行获取,因为在这期间可能其它锁已被释放了,而这里的监听并没有监听到 ttl = tryLockInner(); if (ttl == null) { break;//成功获取, } // waiting for message RedissonLockEntry entry = ENTRIES.get(getEntryName()); //这里采用了Semaphore信号量来进行进行相应的许可获取 entry.getLatch().acquire(); } } finally { //因为已经获取到锁了,因此这里一定要释放相应的监听(已经不需要监听了) 备注:这里不一定是取消监听,只不过是表示在相应锁的关注上少了一个关注对象。取消监听一定要没有线程关注此锁才可以。因此redission是共用监听器,同一个channel只使用同一个监听器. unsubscribe(); }
3. 释放锁
释放锁即将相应的redis key删除掉,同时需要去通知其它线程(进程)。这里即通过删除+publish来实现。如下所示:
connection.multi(); connection.del(getName()); connection.publish(getChannelName(), unlockMessage); List<Object> res = connection.exec();
这里redission采用了multi+exec,更好地保证redis执行的原子性,主要的保证在于需要同时将name删掉,同时还要发送相应的通知信息。
4. 重新获得锁
因为第3步发送了相应的通知,在之前等待的锁的订阅当中,即会订阅到相应的通知信息。那么,一旦获得了相应的通知,即表示之前的锁已经被释放,那么相应的信号量许可即可release,并且相应的等待线程即可重新进行获取锁的操作。如下代码所示:
public void message(String channel, Object message) { if (message.equals(unlockMessage) && getChannelName().equals(channel)) { value.getLatch().release();//释放许可 } }
在第2步的操作中,采取了while 重试的方式,这是一种典型的处理方式,防虚假唤醒或者再次也获取不到锁的情况(为什么,因为并发及多进程).然后整个步骤再次循环,一直到获取到锁为止。
waitTime实现
redission是实现了javaSE中的Lock接口的,因此有类似tryLock(waitTime)的实现,即如果在多久内获取不到锁,则返回false的情况。
首先看 tryLock()不带参数的实现,实际上就是在上节中提到的tryInnerLock,如果尝试调用setNx失败,也就是直接返回false了。
然后是带参数的time,在上节的整个获取过程中,需要等待的场景主要有2个地方,一是在订阅时处理,二是在等待锁的时候。那么在处理的场景中,首先是在订阅时,等待相应的命令执行时间,必须在指定的时间内完成。如下的代码所示:
if (!subscribe().awaitUninterruptibly(time, unit)) { return false; }
然后,在等待锁的过程中,需要不断地等待指定的间隔,并且在重试之后,需要减掉之前已经等待的时间(总共需要等待3分钟,之前已经等待了2分钟,这次再等1分钟即可)。如下所示:
long current = System.currentTimeMillis(); RedissonLockEntry entry = ENTRIES.get(getEntryName()); if (ttl >= 0 && ttl < time) { //这里即如果之前的锁只需要2分钟即结束了,那么这里只需要等待2分钟即可 entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { entry.getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } //减掉相应的时间,重试处理 long elapsed = System.currentTimeMillis() - current; time -= elapsed;
可重入锁
可重入锁,即在已经获取锁的基础之上,再次获取当前的锁。这在javaSE的语义上是允许的,并且重入锁不需要任何的条件,即检测到当前锁是当前线程在使用,那么重入锁可直接放行。同时,在释放时,之前获取了X次,那么也需要相应的unlock X次,才表示最终锁被成功释放。
1. 获取重入锁
在redission中,在相应的数据记录中使用LockValue对象来表示在锁中嵌入的对象,即在进行setNx时所使用的值。其中存在3个对象,id,threadId,counter。其中id用于表示一个惟一的锁对象(使用UUID,保证多个进程中不一样,此id与当前lock对象中的id一致)。threadId表示同一个进程的不同线程,counter即表示当前已经重入了多少次。
在获取之前的锁时,只需要获取相应的对象,并且与当前的lockValue比较一次即可,主要比较id和threadId(两者都很好获取),如果相同,即表示是一个锁,那么在之前数据的基础之上,累加相应的counter值,再重新set回去即可。如下所示:
LockValue lock = (LockValue) connection.get(getName()); if (lock != null && lock.equals(currentLock)) { lock.incCounter(); connection.multi(); connection.set(getName(), lock); if (connection.exec().size() == 1) { return null; } }
2. 释放重入锁
与获取重入锁机制相同,如果之前所存储的counter值在于1,则进行–即可。否则则直接释放此锁。如下所示:
LockValue currentLock = new LockValue(id, Thread.currentThread().getId()); if (lock.equals(currentLock)) { if (lock.getCounter() > 1) { lock.decCounter(); connection.set(getName(), lock); } else { unlock(connection); }
多线程(进程)并发
1. 多进程
如果是多进程中获取锁,在redission中即采用了重试在记录uid的方式处理。即在lockValue中记录了当前进程内产生的lock对象的uuid值,并且此uuid值每个进程1份。多个进程自然不一样。在进行处理时,均先判断是否是同一份lockValue。如果不同的话,那么自然表示即最开始不是当前进程产生的。
但是在进行订阅监听时,主需要关注具体的name值,并不关心uuid,因此在监听通道时仍然使用name值用于多个进程监听同一个channel.
2. 多线程
为了表示同一个进程中对于同一个name的争用,redission使用了LockEntry表示对同一个锁的使用情况,同时使用Semaphore来表示许可情况。因此同一个uuid和name的关系,因此在一个进程中,同一个name仅存在单个RedissonLockEntry,并且由于特殊的使用方式(采用了copy赋值的方式),保证了同一个锁只有同一个entry。为了保证多线程下线程安全,将相应的latch和promise都final化,即保证不会被修改。
redission采取了类似全局map来记录所有的lockEntry信息。
在单进程中,保证entry的惟一性采用concurrentHashMap最简单的,包括赋值,替换等语义,均提供了很好的实现。如在初次获取锁时,为了记录锁许可情况,需要在map中放置相应的计数信息,通过采用putIfAbsent,并且循环处理的方式,保证相应的counter值是最新的。在释放锁时,需要对计数-1,redission采用了使用新entry替换的方式(实际上也就将counter-1,再设置回去),采用replace也是很好的处理方式。这里的counter为什么不使用atomicInteger,感觉还是可以使用的。
同时,在针对移除的时候,为了保证线程安全性,仅在计数为0时,才移除。这里仍然采用了加锁并且二次判断的方式,即保证在加锁之后,之前的判断仍然是有效的。同时使用了remove(key,value)的方式。
相应的代码如下所示:
//订阅条件时 private Future<Boolean> subscribe() { ...... Promise<Boolean> newPromise = newPromise(); final RedissonLockEntry value = new RedissonLockEntry(newPromise); value.aquire(); RedissonLockEntry oldValue = ENTRIES.putIfAbsent(getEntryName(), value); } //取消订阅时 private void unsubscribe() { while (true) { ...... RedissonLockEntry newEntry = new RedissonLockEntry(entry); newEntry.release(); if (ENTRIES.replace(getEntryName(), entry, newEntry)) { //如果这时相应的entry已经为空,并且移除其为安全的 if (newEntry.isFree() && ENTRIES.remove(getEntryName(), newEntry)) { synchronized (ENTRIES) {//加锁进行移除数据 // maybe added during subscription if (!ENTRIES.containsKey(getEntryName())) { connectionManager.unsubscribe(getChannelName()); }
解锁超时
由于使用了基于redis的实现,其网络的不稳定性并不能保证所有的操作均是安全的。比如在最后的unlock中,并不能保证相应的key值能正确地被删除。如果不能被正确的删除,那么整个分布锁就不能有效的工作,并且产生了饿死锁的情况,即一直不能获取锁。考虑以下的场景,del命令被发送至redis成功接收。但这时主redis down掉,从redis起来,但相应的数据没有被正确同步。to see http://redis.io/topics/distlock
因此,在实现当中,需要考虑锁的过期时间的问题。一个可以实现的方案即将相应的过期时间设置为一个可以接受的值,比如1个小时(取决于具体的业务)。如果超过指定的时间锁还未被释放(认为没被释放),则被自动释放。同时,系统中需要作相应的记录,以检查具体的问题。
在redission中,提供了基于leastTime的选项,以设置相应的锁过期时间。同时提供了forceUnlock选项,以强制性地释放指定的锁。这些方法取决于具体业务的使用(比如采用其它监听器来进行处理等)
总结
从redission的实现来看,其所基于的技术以及考虑的场景都非常的全面。比如从网上随处可copy的代码来说,这个实现是相当的完备,并且所提供的功能也远非简单的demo可比。因此,在具体的业务当中,可尝试使用其实现。
转载请标明出处:i flym
本文地址:https://www.iflym.com/index.php/code/201507050001.html