1. 基本用法
1 2 3 4 5 |
<dependency> <groupid>org.redisson</groupid> <artifactid>redisson</artifactid> <version> 3.8 . 2 </version> </dependency> |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
config config = new config(); config.useclusterservers() .setscaninterval( 2000 ) // cluster state scan interval in milliseconds .addnodeaddress( "redis://127.0.0.1:7000" , "redis://127.0.0.1:7001" ) .addnodeaddress( "redis://127.0.0.1:7002" );
redissonclient redisson = redisson.create(config);
rlock lock = redisson.getlock( "anylock" );
lock.lock();
try { ... } finally { lock.unlock(); } |
针对上面这段代码,重点看一下redisson是如何基于redis实现 分布式锁 的
redisson中提供的加锁的方法有很多,但大致类似,此处只看lock()方法
更多请参见 https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers
2. 加锁
可以看到,调用getlock()方法后实际返回一个redissonlock对象,在redissonlock对象的lock()方法主要调用tryacquire()方法
由于leasetime == -1,于是走trylockinnerasync()方法,这个方法才是关键
首先,看一下evalwriteasync方法的定义
复制代码 代码如下:
<t, r> rfuture<r> evalwriteasync(string key, codec codec, rediscommand<t> evalcommandtype, string script, list<object> keys, object ... params);
最后两个参数分别是keys和params
实际调用是这样的:
单独将调用的那一段摘出来看
1 2 3 4 5 6 7 8 9 10 11 12 13 |
commandexecutor.evalwriteasync(getname(), longcodec.instance, command, "if (redis.call('exists', keys[1]) == 0) then " + "redis.call('hset', keys[1], argv[2], 1); " + "redis.call('pexpire', keys[1], argv[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', keys[1], argv[2]) == 1) then " + "redis.call('hincrby', keys[1], argv[2], 1); " + "redis.call('pexpire', keys[1], argv[1]); " + "return nil; " + "end; " + "return redis.call('pttl', keys[1]);" , collections.<object>singletonlist(getname()), internallockleasetime, getlockname(threadid)); |
结合上面的参数声明,我们可以知道,这里keys[1]就是getname(),argv[2]是getlockname(threadid)
假设前面获取锁时传的name是[abc],假设调用的线程id是thread-1,假设成员变量uuid类型的id是6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c
那么keys[1]=abc,argv[2]=6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:thread-1
因此,这段脚本的意思是
1、判断有没有一个叫[abc]的key
2、如果没有,则在其下设置一个字段为[6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:thread-1],值为[1]的键值对 ,并设置它的过期时间
3、如果存在,则进一步判断[6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:thread-1]是否存在,若存在,则其值加1,并重新设置过期时间
4、返回[abc]的生存时间(毫秒)
这里用的数据结构是hash,hash的结构是: key 字段1 值1 字段2 值2 。。。
用在锁这个场景下,key就表示锁的名称,也可以理解为临界资源,字段就表示当前获得锁的线程
所有竞争这把锁的线程都要判断在这个key下有没有自己线程的字段,如果没有则不能获得锁,如果有,则相当于重入,字段值加1(次数)
3. 解锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
protected rfuture< boolean > unlockinnerasync( long threadid) { return commandexecutor.evalwriteasync(getname(), longcodec.instance, rediscommands.eval_boolean, "if (redis.call('exists', keys[1]) == 0) then " + "redis.call('publish', keys[2], argv[1]); " + "return 1; " + "end;" + "if (redis.call('hexists', keys[1], argv[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', keys[1], argv[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', keys[1], argv[2]); " + "return 0; " + "else " + "redis.call('del', keys[1]); " + "redis.call('publish', keys[2], argv[1]); " + "return 1; " + "end; " + "return nil;" , arrays.<object>aslist(getname(), getchannelname()), lockpubsub.unlockmessage, internallockleasetime, getlockname(threadid));
} |
我们还是假设name=abc,假设线程id是thread-1
同理,我们可以知道
keys[1]是getname(),即keys[1]=abc
keys[2]是getchannelname(),即keys[2]=redisson_lock__channel:{abc}
argv[1]是lockpubsub.unlockmessage,即argv[1]=0
argv[2]是生存时间
argv[3]是getlockname(threadid),即argv[3]=6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:thread-1
因此,上面脚本的意思是:
1、判断是否存在一个叫[abc]的key
2、如果不存在,向channel中广播一条消息,广播的内容是0,并返回1
3、如果存在,进一步判断字段6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:thread-1是否存在
4、若字段不存在,返回空,若字段存在,则字段值减1
5、若减完以后,字段值仍大于0,则返回0
6、减完后,若字段值小于或等于0,则广播一条消息,广播内容是0,并返回1;
可以猜测,广播0表示资源可用,即通知那些等待获取锁的线程现在可以获得锁了
4. 等待
以上是正常情况下获取到锁的情况,那么当无法立即获取到锁的时候怎么办呢?
再回到前面获取锁的位置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
@override public void lockinterruptibly( long leasetime, timeunit unit) throws interruptedexception { long threadid = thread.currentthread().getid(); long ttl = tryacquire(leasetime, unit, threadid); // lock acquired if (ttl == null ) { return ; }
// 订阅 rfuture<redissonlockentry> future = subscribe(threadid); commandexecutor.syncsubscription(future);
try { while ( true ) { ttl = tryacquire(leasetime, unit, threadid); // lock acquired if (ttl == null ) { break ; }
// waiting for message if (ttl >= 0 ) { getentry(threadid).getlatch().tryacquire(ttl, timeunit.milliseconds); } else { getentry(threadid).getlatch().acquire(); } } } finally { unsubscribe(future, threadid); } // get(lockasync(leasetime, unit)); }
protected static final lockpubsub pubsub = new lockpubsub();
protected rfuture<redissonlockentry> subscribe( long threadid) { return pubsub.subscribe(getentryname(), getchannelname(), commandexecutor.getconnectionmanager().getsubscribeservice()); }
protected void unsubscribe(rfuture<redissonlockentry> future, long threadid) { pubsub.unsubscribe(future.getnow(), getentryname(), getchannelname(), commandexecutor.getconnectionmanager().getsubscribeservice()); } |
这里会订阅channel,当资源可用时可以及时知道,并抢占,防止无效的轮询而浪费资源
当资源可用用的时候,循环去尝试获取锁,由于多个线程同时去竞争资源,所以这里用了信号量,对于同一个资源只允许一个线程获得锁,其它的线程阻塞
5. 小结
6. 其它相关
《 基于redis的分布式锁的简单实现 》
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
原文链接:http://www.cnblogs.com/cjsblog/p/9831423.html
查看更多关于Java使用Redisson分布式锁实现原理的详细内容...