通过 ZooKeeper 实现分布式锁
1、创建 ZooKeeper 的 client
首先通过 CuratorFrameworkFactory 创建一个连接 ZooKeeper 的客户端 CuratorFramework client
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
public class curatorfactorybean implements factorybean<curatorframework>, initializingbean, disposablebean { private static final logger logger = loggerfactory.getlogger(contractfileinfocontroller. class );
private string connectionstring; private int sessiontimeoutms; private int connectiontimeoutms; private retrypolicy retrypolicy; private curatorframework client;
public curatorfactorybean(string connectionstring) { this (connectionstring, 500 , 500 ); }
public curatorfactorybean(string connectionstring, int sessiontimeoutms, int connectiontimeoutms) { this .connectionstring = connectionstring; this .sessiontimeoutms = sessiontimeoutms; this .connectiontimeoutms = connectiontimeoutms; }
@override public void destroy() throws exception { logger.info( "closing curator framework..." ); this .client.close(); logger.info( "closed curator framework." ); }
@override public curatorframework getobject() throws exception { return this .client; }
@override public class <?> getobjecttype() { return this .client != null ? this .client.getclass() : curatorframework. class ; }
@override public boolean issingleton() { return true ; }
@override public void afterpropertiesset() throws exception { if (stringutils.isempty( this .connectionstring)) { throw new illegalstateexception( "connectionstring can not be empty." ); } else { if ( this .retrypolicy == null ) { this .retrypolicy = new exponentialbackoffretry( 1000 , 2147483647 , 180000 ); }
this .client = curatorframeworkfactory.newclient( this .connectionstring, this .sessiontimeoutms, this .connectiontimeoutms, this .retrypolicy); this .client.start(); this .client.blockuntilconnected( 30 , timeunit.milliseconds); } } public void setconnectionstring(string connectionstring) { this .connectionstring = connectionstring; }
public void setsessiontimeoutms( int sessiontimeoutms) { this .sessiontimeoutms = sessiontimeoutms; }
public void setconnectiontimeoutms( int connectiontimeoutms) { this .connectiontimeoutms = connectiontimeoutms; }
public void setretrypolicy(retrypolicy retrypolicy) { this .retrypolicy = retrypolicy; }
public void setclient(curatorframework client) { this .client = client; } } |
2、封装分布式锁
根据 CuratorFramework 创建 InterProcessMutex(分布式可重入排它锁),对一行数据进行上锁
1 2 3 |
/**
 * Creates a mutex on the given ZooKeeper path using the standard lock driver.
 *
 * @param client the Curator client
 * @param path   the ZooKeeper path to lock
 */
public InterProcessMutex(CuratorFramework client, String path) {
    this(client, path, new StandardLockInternalsDriver());
}
使用 acquire 方法
1、acquire():无入参,调用该方法后会一直阻塞,直到抢到锁资源;如果 ZooKeeper 连接中断,则向上抛出异常。
2、acquire(long time, TimeUnit unit):入参为超时时间及其单位;抢锁时如果发生阻塞,超过该时间后会返回 false。
1 2 3 4 5 6 7 8 9 |
/**
 * Acquires the lock without a timeout.
 *
 * @throws IOException if the internal lock attempt reports failure
 * @throws Exception   on other errors raised by the internal lock attempt
 */
public void acquire() throws Exception {
    // -1 with a null unit means "no timeout" for internalLock.
    if (!this.internalLock(-1L, null)) {
        throw new IOException("lost connection while trying to acquire lock: " + this.basePath);
    }
}
public boolean acquire( long time, timeunit unit) throws exception { return this .internallock(time, unit); } |
释放锁 mutex.release();
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
public void release() throws exception { thread currentthread = thread.currentthread(); interprocessmutex.lockdata lockdata = (interprocessmutex.lockdata) this .threaddata.get(currentthread); if (lockdata == null ) { throw new illegalmonitorstateexception( "you do not own the lock: " + this .basepath); } else { int newlockcount = lockdata.lockcount.decrementandget(); if (newlockcount <= 0 ) { if (newlockcount < 0 ) { throw new illegalmonitorstateexception( "lock count has gone negative for lock: " + this .basepath); } else { try { this .internals.releaselock(lockdata.lockpath); } finally { this .threaddata.remove(currentthread); }
} } } } |
封装后的dlock代码
1、调用 InterProcessMutex processMutex = dLock.mutex(path); 获取锁对象,再调用 processMutex.acquire() 加锁
2、手动释放锁 processMutex.release();
3、需要手动删除路径 dLock.del(path);
推荐使用:
以下两个方法都是函数式编程的写法
在业务代码执行完毕后,会自动释放锁并删除 path
1、这个有返回结果
public <T> T mutex(String path, ZkLockCallback<T> zkLockCallback, long time, TimeUnit timeUnit)
2、这个无返回结果
public void mutex(String path, ZkVoidCallback zkLockCallback, long time, TimeUnit timeUnit)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
public class dlock { private final logger logger; private static final long timeout_d = 100l; private static final string root_path_d = "/dlock" ; private string lockrootpath; private curatorframework client;
public dlock(curatorframework client) { this ( "/dlock" , client); }
public dlock(string lockrootpath, curatorframework client) { this .logger = loggerfactory.getlogger(dlock. class ); this .lockrootpath = lockrootpath; this .client = client; } public interprocessmutex mutex(string path) { if (!stringutils.startswith(path, "/" )) { path = constant.keybuilder( new object[]{ "/" , path}); }
return new interprocessmutex( this .client, constant.keybuilder( new object[]{ this .lockrootpath, "" , path})); }
public <t> t mutex(string path, zklockcallback<t> zklockcallback) throws zklockexception { return this .mutex(path, zklockcallback, 100l, timeunit.milliseconds); }
public <t> t mutex(string path, zklockcallback<t> zklockcallback, long time, timeunit timeunit) throws zklockexception { string finalpath = this .getlockpath(path); interprocessmutex mutex = new interprocessmutex( this .client, finalpath);
try { if (!mutex.acquire(time, timeunit)) { throw new zklockexception( "acquire zk lock return false" ); } } catch (exception var13) { throw new zklockexception( "acquire zk lock failed." , var13); }
t var8; try { var8 = zklockcallback.doinlock(); } finally { this .releaselock(finalpath, mutex); }
return var8; }
private void releaselock(string finalpath, interprocessmutex mutex) { try { mutex.release(); this .logger.info( "delete zk node path:{}" , finalpath); this .deleteinternal(finalpath); } catch (exception var4) { this .logger.error( "dlock" , "release lock failed, path:{}" , finalpath, var4); // logutil.error(this.logger, "dlock", "release lock failed, path:{}", new object[]{finalpath, var4}); }
}
public void mutex(string path, zkvoidcallback zklockcallback, long time, timeunit timeunit) throws zklockexception { string finalpath = this .getlockpath(path); interprocessmutex mutex = new interprocessmutex( this .client, finalpath);
try { if (!mutex.acquire(time, timeunit)) { throw new zklockexception( "acquire zk lock return false" ); } } catch (exception var13) { throw new zklockexception( "acquire zk lock failed." , var13); }
try { zklockcallback.response(); } finally { this .releaselock(finalpath, mutex); }
}
public string getlockpath(string custompath) { if (!stringutils.startswith(custompath, "/" )) { custompath = constant.keybuilder( new object[]{ "/" , custompath}); }
string finalpath = constant.keybuilder( new object[]{ this .lockrootpath, "" , custompath}); return finalpath; }
private void deleteinternal(string finalpath) { try { ((errorlistenerpathable) this .client.delete().inbackground()).forpath(finalpath); } catch (exception var3) { this .logger.info( "delete zk node path:{} failed" , finalpath); }
}
public void del(string custompath) { string lockpath = "" ;
try { lockpath = this .getlockpath(custompath); ((errorlistenerpathable) this .client.delete().inbackground()).forpath(lockpath); } catch (exception var4) { this .logger.info( "delete zk node path:{} failed" , lockpath); }
} } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
/**
 * Work to run while a ZooKeeper lock is held, producing a result.
 *
 * @param <T> type of the result
 */
@FunctionalInterface
public interface ZkLockCallback<T> {

    /**
     * Runs the protected work.
     *
     * @return the result of the work
     */
    T doInLock();
}
/**
 * Work to run while a ZooKeeper lock is held, producing no result.
 */
@FunctionalInterface
public interface ZkVoidCallback {

    /** Runs the protected work. */
    void response();
}
/**
 * Checked exception signalling that a ZooKeeper lock could not be acquired.
 */
public class ZkLockException extends Exception {

    /** Creates an exception without message or cause. */
    public ZkLockException() {
    }

    /**
     * @param message description of the failure
     */
    public ZkLockException(String message) {
        super(message);
    }

    /**
     * @param message description of the failure
     * @param cause   underlying error
     */
    public ZkLockException(String message, Throwable cause) {
        super(message, cause);
    }
}
配置curatorconfig
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
@configuration public class curatorconfig { @value ( "${zk.connectionstring}" ) private string connectionstring;
@value ( "${zk.sessiontimeoutms:500}" ) private int sessiontimeoutms;
@value ( "${zk.connectiontimeoutms:500}" ) private int connectiontimeoutms;
@value ( "${zk.dlockroot:/dlock}" ) private string dlockroot;
@bean public curatorfactorybean curatorfactorybean() { return new curatorfactorybean(connectionstring, sessiontimeoutms, connectiontimeoutms); }
@bean @autowired public dlock dlock(curatorframework client) { return new dlock(dlockroot, client); } } |
测试代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
@restcontroller @requestmapping ( "/dlock" ) public class lockcontroller {
@autowired private dlock dlock;
@requestmapping ( "/lock" ) public map testdlock(string no){ final string path = constant.keybuilder( "/test/no/" , no); long mutex=0l; try { system.out.println( "在拿锁:" +path+system.currenttimemillis()); mutex = dlock.mutex(path, () -> { try { system.out.println( "拿到锁了" + system.currenttimemillis()); thread.sleep( 10000 ); system.out.println( "操作完成了" + system.currenttimemillis()); } finally { return system.currenttimemillis(); } }, 1000 , timeunit.milliseconds); } catch (zklockexception e) { system.out.println( "拿不到锁呀" +system.currenttimemillis()); } return collections.singletonmap( "ret" ,mutex); }
@requestmapping ( "/dlock" ) public map testdlock1(string no){ final string path = constant.keybuilder( "/test/no/" , no); long mutex=0l; try { system.out.println( "在拿锁:" +path+system.currenttimemillis()); interprocessmutex processmutex = dlock.mutex(path); processmutex.acquire(); system.out.println( "拿到锁了" + system.currenttimemillis()); thread.sleep( 10000 ); processmutex.release(); system.out.println( "操作完成了" + system.currenttimemillis()); } catch (zklockexception e) { system.out.println( "拿不到锁呀" +system.currenttimemillis()); e.printstacktrace(); } catch (exception e){ e.printstacktrace(); } return collections.singletonmap( "ret" ,mutex); } @requestmapping ( "/del" ) public map deldlock(string no){ final string path = constant.keybuilder( "/test/no/" , no); dlock.del(path); return collections.singletonmap( "ret" , 1 ); } } |
以上所述是小编给大家介绍的java(springboot)基于zookeeper的分布式锁实现详解整合,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对网站的支持!
原文链接:https://blog.csdn.net/LJY_SUPER/article/details/87807091
查看更多关于浅谈Java(SpringBoot)基于zookeeper的分布式锁实现的详细内容...