好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

浅谈Java(SpringBoot)基于zookeeper的分布式锁实现

通过zookeeper实现分布式锁

1、创建zookeeper的client

首先通过curatorframeworkfactory创建一个连接zookeeper的连接curatorframework client

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

public class curatorfactorybean implements factorybean<curatorframework>, initializingbean, disposablebean {

  private static final logger logger = loggerfactory.getlogger(contractfileinfocontroller. class );

 

  private string connectionstring;

  private int sessiontimeoutms;

  private int connectiontimeoutms;

  private retrypolicy retrypolicy;

  private curatorframework client;

 

  public curatorfactorybean(string connectionstring) {

   this (connectionstring, 500 , 500 );

  }

 

  public curatorfactorybean(string connectionstring, int sessiontimeoutms, int connectiontimeoutms) {

   this .connectionstring = connectionstring;

   this .sessiontimeoutms = sessiontimeoutms;

   this .connectiontimeoutms = connectiontimeoutms;

  }

 

  @override

  public void destroy() throws exception {

   logger.info( "closing curator framework..." );

   this .client.close();

   logger.info( "closed curator framework." );

  }

 

  @override

  public curatorframework getobject() throws exception {

   return this .client;

  }

 

  @override

  public class <?> getobjecttype() {

    return this .client != null ? this .client.getclass() : curatorframework. class ;

  }

 

  @override

  public boolean issingleton() {

   return true ;

  }

 

  @override

  public void afterpropertiesset() throws exception {

   if (stringutils.isempty( this .connectionstring)) {

    throw new illegalstateexception( "connectionstring can not be empty." );

   } else {

    if ( this .retrypolicy == null ) {

     this .retrypolicy = new exponentialbackoffretry( 1000 , 2147483647 , 180000 );

    }

 

    this .client = curatorframeworkfactory.newclient( this .connectionstring, this .sessiontimeoutms, this .connectiontimeoutms, this .retrypolicy);

    this .client.start();

    this .client.blockuntilconnected( 30 , timeunit.milliseconds);

   }

  }

  public void setconnectionstring(string connectionstring) {

   this .connectionstring = connectionstring;

  }

 

  public void setsessiontimeoutms( int sessiontimeoutms) {

   this .sessiontimeoutms = sessiontimeoutms;

  }

 

  public void setconnectiontimeoutms( int connectiontimeoutms) {

   this .connectiontimeoutms = connectiontimeoutms;

  }

 

  public void setretrypolicy(retrypolicy retrypolicy) {

   this .retrypolicy = retrypolicy;

  }

 

  public void setclient(curatorframework client) {

   this .client = client;

  }

}

2、封装分布式锁

根据curatorframework创建interprocessmutex(分布式可重入排它锁)对一行数据进行上锁

?

1

2

3

public interprocessmutex(curatorframework client, string path) {

  this (client, path, new standardlockinternalsdriver());

}

使用 acquire方法
1、acquire() :入参为空,调用该方法后,会一直堵塞,直到抢夺到锁资源,或者zookeeper连接中断后,上抛异常。
2、acquire(long time, timeunit unit):入参传入超时时间、单位,抢夺时,如果出现堵塞,会在超过该时间后,返回false。

?

1

2

3

4

5

6

7

8

9

public void acquire() throws exception {

  if (! this .internallock(-1l, (timeunit) null )) {

   throw new ioexception( "lost connection while trying to acquire lock: " + this .basepath);

  }

}

 

public boolean acquire( long time, timeunit unit) throws exception {

  return this .internallock(time, unit);

}

释放锁 mutex.release();

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

public void release() throws exception {

   thread currentthread = thread.currentthread();

   interprocessmutex.lockdata lockdata = (interprocessmutex.lockdata) this .threaddata.get(currentthread);

   if (lockdata == null ) {

    throw new illegalmonitorstateexception( "you do not own the lock: " + this .basepath);

   } else {

    int newlockcount = lockdata.lockcount.decrementandget();

    if (newlockcount <= 0 ) {

     if (newlockcount < 0 ) {

      throw new illegalmonitorstateexception( "lock count has gone negative for lock: " + this .basepath);

     } else {

      try {

       this .internals.releaselock(lockdata.lockpath);

      } finally {

       this .threaddata.remove(currentthread);

      }

 

     }

    }

   }

  }

封装后的dlock代码
1、调用interprocessmutex processmutex = dlock.mutex(path);

2、手动释放锁processmutex.release();

3、需要手动删除路径dlock.del(path);

推荐 使用:
都是 函数式编程
在业务代码执行完毕后 会释放锁和删除path
1、这个有返回结果
public t mutex(string path, zklockcallback zklockcallback, long time, timeunit timeunit)
2、这个无返回结果
public void mutex(string path, zkvoidcallback zklockcallback, long time, timeunit timeunit)

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

public class dlock {

  private final logger logger;

  private static final long timeout_d = 100l;

  private static final string root_path_d = "/dlock" ;

  private string lockrootpath;

  private curatorframework client;

 

  public dlock(curatorframework client) {

   this ( "/dlock" , client);

  }

 

  public dlock(string lockrootpath, curatorframework client) {

   this .logger = loggerfactory.getlogger(dlock. class );

   this .lockrootpath = lockrootpath;

   this .client = client;

  }

  public interprocessmutex mutex(string path) {

   if (!stringutils.startswith(path, "/" )) {

    path = constant.keybuilder( new object[]{ "/" , path});

   }

 

   return new interprocessmutex( this .client, constant.keybuilder( new object[]{ this .lockrootpath, "" , path}));

  }

 

  public <t> t mutex(string path, zklockcallback<t> zklockcallback) throws zklockexception {

   return this .mutex(path, zklockcallback, 100l, timeunit.milliseconds);

  }

 

  public <t> t mutex(string path, zklockcallback<t> zklockcallback, long time, timeunit timeunit) throws zklockexception {

   string finalpath = this .getlockpath(path);

   interprocessmutex mutex = new interprocessmutex( this .client, finalpath);

 

   try {

    if (!mutex.acquire(time, timeunit)) {

     throw new zklockexception( "acquire zk lock return false" );

    }

   } catch (exception var13) {

    throw new zklockexception( "acquire zk lock failed." , var13);

   }

 

   t var8;

   try {

    var8 = zklockcallback.doinlock();

   } finally {

    this .releaselock(finalpath, mutex);

   }

 

   return var8;

  }

 

  private void releaselock(string finalpath, interprocessmutex mutex) {

   try {

    mutex.release();

    this .logger.info( "delete zk node path:{}" , finalpath);

    this .deleteinternal(finalpath);

   } catch (exception var4) {

    this .logger.error( "dlock" , "release lock failed, path:{}" , finalpath, var4);

//   logutil.error(this.logger, "dlock", "release lock failed, path:{}", new object[]{finalpath, var4});

   }

 

  }

 

  public void mutex(string path, zkvoidcallback zklockcallback, long time, timeunit timeunit) throws zklockexception {

   string finalpath = this .getlockpath(path);

   interprocessmutex mutex = new interprocessmutex( this .client, finalpath);

 

   try {

    if (!mutex.acquire(time, timeunit)) {

     throw new zklockexception( "acquire zk lock return false" );

    }

   } catch (exception var13) {

    throw new zklockexception( "acquire zk lock failed." , var13);

   }

 

   try {

    zklockcallback.response();

   } finally {

    this .releaselock(finalpath, mutex);

   }

 

  }

 

  public string getlockpath(string custompath) {

   if (!stringutils.startswith(custompath, "/" )) {

    custompath = constant.keybuilder( new object[]{ "/" , custompath});

   }

 

   string finalpath = constant.keybuilder( new object[]{ this .lockrootpath, "" , custompath});

   return finalpath;

  }

 

  private void deleteinternal(string finalpath) {

   try {

    ((errorlistenerpathable) this .client.delete().inbackground()).forpath(finalpath);

   } catch (exception var3) {

    this .logger.info( "delete zk node path:{} failed" , finalpath);

   }

 

  }

 

  public void del(string custompath) {

   string lockpath = "" ;

 

   try {

    lockpath = this .getlockpath(custompath);

    ((errorlistenerpathable) this .client.delete().inbackground()).forpath(lockpath);

   } catch (exception var4) {

    this .logger.info( "delete zk node path:{} failed" , lockpath);

   }

 

  }

}

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

@functionalinterface

public interface zklockcallback<t> {

  t doinlock();

}

 

@functionalinterface

public interface zkvoidcallback {

  void response();

}

 

public class zklockexception extends exception {

  public zklockexception() {

  }

 

  public zklockexception(string message) {

   super (message);

  }

 

  public zklockexception(string message, throwable cause) {

   super (message, cause);

  }

}

配置curatorconfig

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

@configuration

public class curatorconfig {

  @value ( "${zk.connectionstring}" )

  private string connectionstring;

 

  @value ( "${zk.sessiontimeoutms:500}" )

  private int sessiontimeoutms;

 

  @value ( "${zk.connectiontimeoutms:500}" )

  private int connectiontimeoutms;

 

  @value ( "${zk.dlockroot:/dlock}" )

  private string dlockroot;

 

  @bean

  public curatorfactorybean curatorfactorybean() {

   return new curatorfactorybean(connectionstring, sessiontimeoutms, connectiontimeoutms);

  }

 

  @bean

  @autowired

  public dlock dlock(curatorframework client) {

   return new dlock(dlockroot, client);

  }

}

测试代码

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

@restcontroller

@requestmapping ( "/dlock" )

public class lockcontroller {

 

  @autowired

  private dlock dlock;

 

  @requestmapping ( "/lock" )

  public map testdlock(string no){

   final string path = constant.keybuilder( "/test/no/" , no);

   long mutex=0l;

   try {

    system.out.println( "在拿锁:" +path+system.currenttimemillis());

     mutex = dlock.mutex(path, () -> {

     try {

      system.out.println( "拿到锁了" + system.currenttimemillis());

      thread.sleep( 10000 );

      system.out.println( "操作完成了" + system.currenttimemillis());

     } finally {

      return system.currenttimemillis();

     }

    }, 1000 , timeunit.milliseconds);

   } catch (zklockexception e) {

    system.out.println( "拿不到锁呀" +system.currenttimemillis());

   }

   return collections.singletonmap( "ret" ,mutex);

  }

 

  @requestmapping ( "/dlock" )

  public map testdlock1(string no){

   final string path = constant.keybuilder( "/test/no/" , no);

   long mutex=0l;

   try {

    system.out.println( "在拿锁:" +path+system.currenttimemillis());

    interprocessmutex processmutex = dlock.mutex(path);

    processmutex.acquire();

    system.out.println( "拿到锁了" + system.currenttimemillis());

    thread.sleep( 10000 );

    processmutex.release();

    system.out.println( "操作完成了" + system.currenttimemillis());

   } catch (zklockexception e) {

    system.out.println( "拿不到锁呀" +system.currenttimemillis());

    e.printstacktrace();

   } catch (exception e){

    e.printstacktrace();

   }

   return collections.singletonmap( "ret" ,mutex);

  }

  @requestmapping ( "/del" )

  public map deldlock(string no){

   final string path = constant.keybuilder( "/test/no/" , no);

   dlock.del(path);

   return collections.singletonmap( "ret" , 1 );

  }

}

以上所述是小编给大家介绍的java(springboot)基于zookeeper的分布式锁实现详解整合,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对网站的支持!

原文链接:https://blog.csdn.net/LJY_SUPER/article/details/87807091

查看更多关于浅谈Java(SpringBoot)基于zookeeper的分布式锁实现的详细内容...

  阅读:15次