
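Redisson distributed lock: a source-code walkthrough

This article walks through how Redisson implements its distributed lock: acquisition, lease renewal, retry under contention, and release.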

Dependency version

implementation 'org.redisson:redisson-spring-boot-starter:3.17.0'

Test code

The example below simulates a flash-sale scenario for a single product:

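import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;

public class RedissonTest {
    public static void main(String[] args) {
        // 1. Configuration
        Config config = new Config();
        String address = "redis://127.0.0.1:6379";
        SingleServerConfig serverConfig = config.useSingleServer();
        serverConfig.setAddress(address);
        serverConfig.setDatabase(0);
        // Watchdog lease of 5 s (the default is 30 s)
        config.setLockWatchdogTimeout(5000);
        RedissonClient redisson = Redisson.create(config);
        RLock rLock = redisson.getLock("goods:1000:1");
        // 2. Acquire the lock
        rLock.lock();
        try {
            System.out.println("TODO: business logic 1000000.");
        } finally {
            if (rLock.isLocked() && rLock.isHeldByCurrentThread()) {
                // 3. Release the lock
                rLock.unlock();
            }
            // Stop Redisson's threads so the JVM can exit
            redisson.shutdown();
        }
    }
}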

Lock acquisition design

rLock.lock() is the entry point for acquiring the lock. Following the call stack from there, the method that does the real work is org.redisson.RedissonLock#tryLockInnerAsync:

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

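In essence, acquisition is a single Lua script, so the check and the write happen atomically on the Redis server. If the key does not exist, or the hash already contains this thread's field (a reentrant acquisition), the script increments the hold count, resets the expiry, and returns nil. Otherwise another holder owns the lock and the script returns the key's remaining TTL in milliseconds.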
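To make the three branches of the script easier to follow, here is a minimal in-memory sketch in plain Java. It is not Redisson code and does not talk to Redis: the class `LockScriptSketch`, its maps, and the explicit `nowMillis` clock parameter are all assumptions made for illustration. It only mirrors the script's return convention (null for "acquired", a TTL otherwise).

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory model of the lock script's branches; hypothetical, not Redisson code. */
public class LockScriptSketch {
    // key -> (holder field -> hold count); stands in for the Redis hash
    private final Map<String, Map<String, Integer>> hashes = new HashMap<>();
    // key -> absolute expiry time in ms; stands in for PEXPIRE
    private final Map<String, Long> expiresAt = new HashMap<>();

    /** Returns null when the lock is acquired, otherwise the remaining TTL in ms. */
    public Long tryLock(String key, String holder, long leaseMillis, long nowMillis) {
        // Drop the key if its TTL has elapsed, as Redis would.
        Long expiry = expiresAt.get(key);
        if (expiry != null && expiry <= nowMillis) {
            hashes.remove(key);
            expiresAt.remove(key);
        }
        Map<String, Integer> hash = hashes.get(key);
        // Branch 1: key absent, first acquisition.
        // Branch 2: field present, reentrant acquisition by the same holder.
        if (hash == null || hash.containsKey(holder)) {
            hashes.computeIfAbsent(key, k -> new HashMap<>()).merge(holder, 1, Integer::sum);
            expiresAt.put(key, nowMillis + leaseMillis);
            return null;
        }
        // Branch 3: held by someone else, report the remaining TTL (PTTL).
        return expiresAt.get(key) - nowMillis;
    }

    /** Current hold count of the given holder, 0 if it does not hold the lock. */
    public int holdCount(String key, String holder) {
        Map<String, Integer> hash = hashes.get(key);
        return hash == null ? 0 : hash.getOrDefault(holder, 0);
    }

    public static void main(String[] args) {
        LockScriptSketch redis = new LockScriptSketch();
        System.out.println(redis.tryLock("goods:1000:1", "client-a:1", 5000, 0));    // null
        System.out.println(redis.tryLock("goods:1000:1", "client-a:1", 5000, 1000)); // null
        System.out.println(redis.tryLock("goods:1000:1", "client-b:7", 5000, 2000)); // 4000
    }
}
```

The second call succeeds because the holder field already exists, which is how reentrancy is modelled; the third caller gets back 4000 because the reentrant call at t=1000 pushed the expiry to t=6000.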

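Lease renewal design

Lease renewal is started from org.redisson.RedissonLock#tryAcquireAsync, which calls scheduleExpirationRenewal once the lock has been acquired without an explicit leaseTime.

One thing to note about renewal: the watchdog does not run on the thread that acquired the lock. The renewal is a delayed task submitted to the timer owned by the connection manager, and it runs on that timer's thread.

tryAcquireAsync looks like this: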

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    if (leaseTime != -1) {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }

    CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
        // lock acquired
        if (ttlRemaining == null) {
            if (leaseTime != -1) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                // no explicit lease time: start renewing the lock's expiry
                scheduleExpirationRenewal(threadId);
            }
        }
        return ttlRemaining;
    });
    return new CompletableFutureWrapper<>(f);
}

scheduleExpirationRenewal, which starts the renewal, looks like this:

protected void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        try {
            renewExpiration();
        } finally {
            if (Thread.currentThread().isInterrupted()) {
                cancelExpirationRenewal(threadId);
            }
        }
    }
}

It then calls renewExpiration() to run the renewal logic:

private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    // create the delayed task
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            // the actual renewal: a Lua script resets the expiry
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.whenComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getRawName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }

                // renewal succeeded
                if (res) {
                    // reschedule itself
                    renewExpiration();
                } else {
                    cancelExpirationRenewal(null);
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    ee.setTimeout(task);
}

renewExpirationAsync is again a Lua script. It resets the lock's expiry, but only if the given thread still holds the lock:

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return 0;",
            Collections.singletonList(getRawName()),
            internalLockLeaseTime, getLockName(threadId));
}
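The self-rescheduling pattern in renewExpiration can be sketched without Redis. The version below is only an illustration under stated assumptions: it uses a JDK `ScheduledExecutorService` in place of the connection manager's timer, and the class `WatchdogSketch`, the `renew` callback, and the helper `renewalsUntil` are hypothetical names. What it keeps from the source is the shape of the loop: wait a third of the lease, renew, and schedule the next round only if the renewal reported success.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

/** Hypothetical stand-in for the watchdog loop; not Redisson code. */
public class WatchdogSketch {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final AtomicBoolean active = new AtomicBoolean(true);
    private final long leaseMillis;
    private final BooleanSupplier renew; // stands in for the renewal script

    public WatchdogSketch(long leaseMillis, BooleanSupplier renew) {
        this.leaseMillis = leaseMillis;
        this.renew = renew;
    }

    /** Schedules one renewal a third of the lease from now. */
    public void schedule() {
        if (!active.get()) {
            return;
        }
        try {
            timer.schedule(() -> {
                // Reschedule only while active and only if the renewal succeeded.
                if (active.get() && renew.getAsBoolean()) {
                    schedule();
                }
            }, leaseMillis / 3, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException ignored) {
            // cancel() shut the timer down between the check and the submit.
        }
    }

    /** Stops the renewal chain, as unlocking would. */
    public void cancel() {
        active.set(false);
        timer.shutdownNow();
    }

    /** Runs a watchdog until it has renewed 'target' times (or 5 s pass), then cancels it. */
    public static int renewalsUntil(long leaseMillis, int target) {
        AtomicInteger renewals = new AtomicInteger();
        CountDownLatch reached = new CountDownLatch(target);
        WatchdogSketch dog = new WatchdogSketch(leaseMillis, () -> {
            renewals.incrementAndGet();
            reached.countDown();
            return true; // pretend the lock is still held
        });
        dog.schedule();
        try {
            reached.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            dog.cancel();
        }
        return renewals.get();
    }

    public static void main(String[] args) {
        System.out.println("renewals: " + renewalsUntil(90, 3));
    }
}
```

With the 5000 ms watchdog timeout from the test code, the same arithmetic gives a renewal roughly every 1666 ms, and each renewal resets the expiry to the full 5000 ms.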

Spin-and-retry while waiting

When org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean) fails to acquire the lock on its first attempt, it enters a retry loop: the while (true) loop inside the try block below.

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return;
    }

    CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
    RedissonLockEntry entry;
    if (interruptibly) {
        entry = commandExecutor.getInterrupted(future);
    } else {
        entry = commandExecutor.get(future);
    }

    try {
        while (true) {
            ttl = tryAcquire(-1, leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                break;
            }

            // waiting for message
            if (ttl >= 0) {
                try {
                    // block for at most the holder's remaining TTL, then retry
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
                if (interruptibly) {
                    entry.getLatch().acquire();
                } else {
                    entry.getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        unsubscribe(entry, threadId);
    }
//        get(lockAsync(leaseTime, unit));
}

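entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS) makes this an intermittent spin rather than a busy loop: the thread sleeps until the current holder's lease would expire (or until it is woken earlier), then tries to grab the lock again. If the TTL is negative, the thread instead blocks on entry.getLatch().acquire() until it is woken.

The other piece of logic is:

CompletableFuture<RedissonLockEntry> future = subscribe(threadId);

This subscribes to the lock's channel. When the lock is released, an unlock message is published on that channel.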
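The interplay between the timed wait and the unlock message can be shown with a plain `java.util.concurrent.Semaphore`. This is a sketch under assumptions, not Redisson's implementation: `LatchWaitSketch`, `awaitRetry`, and `onUnlockMessage` are hypothetical names, and the pub/sub delivery is replaced by a direct method call.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of a waiter's latch; not Redisson code. */
public class LatchWaitSketch {
    // Starts with zero permits, so a waiter blocks until someone releases one.
    private final Semaphore latch = new Semaphore(0);

    /** Stand-in for the listener that runs when an unlock message arrives. */
    public void onUnlockMessage() {
        latch.release();
    }

    /**
     * Blocks for at most ttlMillis.
     * Returns true if woken by an unlock message, false if the TTL elapsed first.
     * Either way the caller would loop back and try to acquire the lock again.
     */
    public boolean awaitRetry(long ttlMillis) {
        try {
            return latch.tryAcquire(ttlMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        LatchWaitSketch entry = new LatchWaitSketch();
        // No message arrives: the wait ends when the holder's TTL runs out.
        System.out.println(entry.awaitRetry(50));    // false
        // A message has already arrived: the wait returns immediately.
        entry.onUnlockMessage();
        System.out.println(entry.awaitRetry(5_000)); // true
    }
}
```

The timeout is what keeps waiters from hanging if an unlock message is never delivered: at worst they retry once the holder's lease has run out.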

Unlock design

rLock.unlock() does three things: it releases the lock, cancels lease renewal, and wakes the threads waiting for the lock (by publishing an unlock message).

Core unlock method: org.redisson.RedissonLock#unlockInnerAsync

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                    "else " +
                    "redis.call('del', KEYS[1]); " +
                    // publish the unlock message
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return nil;",
            Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

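Again, the work is done by a Lua script. It returns nil if the calling thread does not hold the lock, 0 if the hold count is still positive after the decrement (a reentrant hold remains, and the expiry is refreshed), and 1 once the key has been deleted and the unlock message published.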
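The three outcomes of the unlock script can be modelled in a few lines of plain Java. As before, this is a hedged sketch rather than Redisson code: `UnlockScriptSketch` and its methods are hypothetical, a single lock key is modelled as one map, and publishing is reduced to appending to a list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of the unlock script for one lock key; hypothetical, not Redisson code. */
public class UnlockScriptSketch {
    // holder field -> hold count; stands in for the Redis hash behind the lock
    private final Map<String, Integer> holders = new HashMap<>();
    // messages "published" to the lock's channel
    private final List<String> published = new ArrayList<>();

    /** Test helper: pretend the holder has acquired the lock 'count' times. */
    public void hold(String holder, int count) {
        holders.put(holder, count);
    }

    /** null: caller does not hold the lock; false: still held; true: fully released. */
    public Boolean unlock(String holder) {
        if (!holders.containsKey(holder)) {
            return null;
        }
        int counter = holders.merge(holder, -1, Integer::sum);
        if (counter > 0) {
            return false; // reentrant hold remains (the real script also refreshes the expiry)
        }
        holders.clear();          // the script deletes the whole key
        published.add("UNLOCK");  // the script publishes the unlock message
        return true;
    }

    public List<String> published() {
        return published;
    }

    public static void main(String[] args) {
        UnlockScriptSketch lock = new UnlockScriptSketch();
        lock.hold("client-a:1", 2);
        System.out.println(lock.unlock("client-a:1")); // false
        System.out.println(lock.unlock("client-a:1")); // true
        System.out.println(lock.unlock("client-b:7")); // null
        System.out.println(lock.published());          // [UNLOCK]
    }
}
```

Only the final release publishes a message, so waiters are not woken while a reentrant hold is still outstanding.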

Cancelling lease renewal

Core method: org.redisson.RedissonBaseLock#unlockAsync(long)

@Override
public RFuture<Void> unlockAsync(long threadId) {
    // release the lock
    RFuture<Boolean> future = unlockInnerAsync(threadId);
    // cancel lease renewal
    CompletionStage<Void> f = future.handle((opStatus, e) -> {
        cancelExpirationRenewal(threadId);
        if (e != null) {
            throw new CompletionException(e);
        }
        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            throw new CompletionException(cause);
        }
        return null;
    });
    return new CompletableFutureWrapper<>(f);
}
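One detail worth noticing in unlockAsync is that cancelExpirationRenewal runs first inside handle, so renewal is cancelled whether the unlock script succeeded, failed, or reported that the caller was not the holder. The sketch below reproduces that control flow with the JDK's `CompletableFuture` only; `UnlockFlowSketch` and the `renewalActive` flag are hypothetical stand-ins for the real lock and its renewal entry.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical model of the unlock control flow; not Redisson code. */
public class UnlockFlowSketch {

    /**
     * scriptResult stands in for the unlock script's result:
     * true/false for a holder, null when the caller does not hold the lock.
     */
    public static CompletableFuture<Void> unlock(CompletableFuture<Boolean> scriptResult,
                                                 AtomicBoolean renewalActive) {
        return scriptResult.handle((opStatus, e) -> {
            // Always stop the watchdog first, whatever the outcome.
            renewalActive.set(false);
            if (e != null) {
                throw new CompletionException(e);
            }
            if (opStatus == null) {
                throw new CompletionException(
                        new IllegalMonitorStateException("not locked by current thread"));
            }
            return null;
        });
    }

    public static void main(String[] args) {
        AtomicBoolean renewalActive = new AtomicBoolean(true);

        // Holder unlocks: completes normally, renewal stopped.
        CompletableFuture<Void> ok =
                unlock(CompletableFuture.completedFuture(Boolean.TRUE), renewalActive);
        System.out.println(ok.isCompletedExceptionally() + " " + renewalActive.get()); // false false

        // Non-holder unlocks: completes exceptionally, renewal still stopped.
        renewalActive.set(true);
        CompletableFuture<Void> notOwner =
                unlock(CompletableFuture.<Boolean>completedFuture(null), renewalActive);
        System.out.println(notOwner.isCompletedExceptionally() + " " + renewalActive.get()); // true false
    }
}
```

Using handle rather than thenApply is what makes the cleanup unconditional: handle is invoked on both normal and exceptional completion.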

Waking queued threads after a successful unlock

org.redisson.pubsub.LockPubSub#onMessage wakes the blocked threads so that they go back into the retry loop shown earlier:

@Override
protected void onMessage(RedissonLockEntry value, Long message) {
    if (message.equals(UNLOCK_MESSAGE)) {
        Runnable runnableToExecute = value.getListeners().poll();
        if (runnableToExecute != null) {
            runnableToExecute.run();
        }
        value.getLatch().release();
    } else if (message.equals(READ_UNLOCK_MESSAGE)) {
        while (true) {
            Runnable runnableToExecute = value.getListeners().poll();
            if (runnableToExecute == null) {
                break;
            }
            runnableToExecute.run();
        }
        value.getLatch().release(value.getLatch().getQueueLength());
    }
}


Original article: https://juejin.cn/post/7093149727260147749
