好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

基于Zookeeper实现分布式锁

1. Zookeeper概述

Zookeeper(后续简称ZK)是一个分布式的,开放源码的分布式应用程序协调服务,通常以集群模式运转,其协调能力可以理解为是基于观察者设计模式来实现的;ZK服务会使用Znode存储使用者的数据,并将这些数据以树形目录的形式来组织管理,支持使用者以观察者的角色指定自己关注哪些节点\数据的变更,当这些变更发生时,ZK会通知其观察者;为满足本篇目标所需,着重介绍以下几个关键特性:

数据组织:数据节点以树形目录(类似文件系统)组织管理,每一个节点中都会保存数据信息和节点信息。

ZooKeeper's Hierarchical Namespace

集群模式:通常是由3、5个基数实例组成集群,当超过半数服务实例正常工作就能对外提供服务,既能避免单点故障,又尽量高可用,每个服务实例都有一个数据备份,以实现数据全局一致

ZooKeeper Service

顺序更新:更新请求都会转由leader执行,来自同一客户端的更新将按照发送的顺序被写入到ZK,处理写请求创建Znode时,Znode名称后会被分配一个全局唯一的递增编号,可以通过顺序号推断请求的顺序,利用这个特性可以实现高级协调服务

监听机制:给某个节点注册监听器,该节点一旦发生变更(例如更新或者删除),监听者就会收到一个Watch Event,可以感知到节点\数据的变更

临时节点:session链接断开临时节点就没了,不能创建子节点(很关键)

ZK的分布式锁正是基于以上特性来实现的,简单来说是:

临时节点:用于支撑异常情况下的锁自动释放能力 顺序节点:用于支撑公平锁获取锁和排队等待的能力 监听机制:用于支撑抢锁能力 集群模式:用于支撑锁服务的高可用 2. 加解锁的流程描述

创建一个永久节点作为锁节点(/lock2) 试图加锁的客户端在指定锁名称节点(/lock2)下,创建临时顺序子节点 获取锁节点(/lock2)下所有子节点 对所获取的子节点按节点自增序号从小到大排序 判断自己是不是第一个子节点,若是,则获取锁 若不是,则监听比该节点小的那个节点的删除事件(这种只监听前一个节点的方式避免了惊群效应) 若是阻塞申请锁,则申请锁的操作可增加阻塞等待 若监听事件生效(说明前节点释放了,可以尝试去获取锁),则回到第3步重新进行判断,直到获取到锁 解锁时,将第一个子节点删除释放 3. ZK分布式锁的能力

可能读者是单篇阅读,这里引入上一篇《分布式锁上-初探》中的一些内容,一个分布式锁应具备这样一些功能特点:

互斥性:在同一时刻,只有一个客户端能持有锁 安全性:避免死锁,如果某个客户端获得锁之后处理时间超过最大约定时间,或者持锁期间发生了故障导致无法主动释放锁,其持有的锁也能够被其他机制正确释放,并保证后续其它客户端也能加锁,整个处理流程继续正常执行 可用性:也被称作容错性,分布式锁需要有高可用能力,避免单点故障,当提供锁的服务节点故障(宕机)时不影响服务运行,这里有两种模式:一种是分布式锁服务自身具备集群模式,遇到故障能自动切换恢复工作;另一种是客户端向多个独立的锁服务发起请求,当某个锁服务故障时仍然可以从其他锁服务读取到锁信息(Redlock) 可重入性:对同一个锁,加锁和解锁必须是同一个线程,即不能把其他线程程持有的锁给释放了 高效灵活:加锁、解锁的速度要快;支持阻塞和非阻塞;支持公平锁和非公平锁

基于上文的内容,这里简单总结一下ZK的能力矩阵(其它分布式锁的情况会在后续文章中补充):

能力

ZK

MySql

Redis原生

Redlock

ETCD

互斥

 

 

 

 

安全

链接异常,session关闭后锁会自动释放

 

 

 

 

可用性

相对还好

 

 

 

 

可重入

线程可重入

 

 

 

 

加解锁速度

居中

 

 

 

 

阻塞非阻塞

都支持

 

 

 

 

公平非公平

仅公平锁

 

 

 

 

关于性能不太高的一种说法

因为每次在创建锁和释放锁的过程中,都要动态创建、销毁临时节点来实现锁功能。ZK中创建和删除节点只能通过Leader服务器来执行,然后Leader服务器还需要将数据同步到所有的Follower机器上,这样频繁的网络通信,性能的短板是非常突出的。在高性能,高并发的场景下,不建议使用ZooKeeper的分布式锁。

由于ZooKeeper的高可用特性,在并发量不是太高的场景,也推荐使用ZK的分布式锁。

4. InterProcessMutex 使用示例

Zookeeper 客户端框架 Curator 提供的 InterProcessMutex 是分布式锁的一种实现,acquire 方法阻塞|非阻塞获取锁,release 方法释放锁,另外还提供了可撤销、可重入功能。

4.1 接口介绍

 //  获取互斥锁
public void acquire (  )  throws Exception ;   //  在给定的时间内获取互斥锁
public  boolean  acquire (  long   time  ,  TimeUnit unit )  throws Exception ;   //  释放锁处理
public void release (  )  throws Exception ;   //  如果当前线程获取了互斥锁,则返回true  boolean  isAcquiredInThisProcess (  )  ; 

4.2 pom依赖

 

 < dependency >   < groupId > org .apache  .logging  .log4j   groupId >   < artifactId > log4j - core  artifactId >   < version >  2.8  .2   version >    dependency >   < dependency >   < groupId > org .apache  .zookeeper   groupId >   < artifactId > zookeeper  artifactId >   < version >  3.5  .7   version >    dependency >   < dependency >   < groupId > org .apache  .curator   groupId >   < artifactId > curator - framework  artifactId >   < version >  4.3  .0   version >    dependency >   < dependency >   < groupId > org .apache  .curator   groupId >   < artifactId > curator - recipes  artifactId >   < version >  4.3  .0   version >    dependency >   < dependency >   < groupId > org .apache  .curator   groupId >   < artifactId > curator - client  artifactId >   < version >  4.3  .0   version >    dependency >                     

 

4.3 示例

 

package com .atguigu  .case3  ;  import org .apache  .curator  .framework  .CuratorFramework  ;  import org .apache  .curator  .framework  .CuratorFrameworkFactory  ;  import org .apache  .curator  .framework  .recipes  .locks  .InterProcessMutex  ;  import org .apache  .curator  .retry  .ExponentialBackoffRetry  ;  public class CuratorLockTest  {  public static void main ( String [  ]  args )   {   //  创建分布式锁1
        InterProcessMutex lock1  =  new InterProcessMutex ( getCuratorFramework (  )  ,   "/locks"  )  ;   //  创建分布式锁2
        InterProcessMutex lock2  =  new InterProcessMutex ( getCuratorFramework (  )  ,   "/locks"  )  ;  new Thread ( new Runnable (  )   {  @Override
            public void run (  )   {  try  {  lock1 .acquire  (  )  ;  System .out  .println  (  "线程1 获取到锁"  )  ;  lock1 .acquire  (  )  ;  System .out  .println  (  "线程1 再次获取到锁"  )  ;  Thread .sleep  (  5   *   1000  )  ;  lock1 .release  (  )  ;  System .out  .println  (  "线程1 释放锁"  )  ;  lock1 .release  (  )  ;  System .out  .println  (  "线程1  再次释放锁"  )  ;   }  catch  ( Exception e )   {  e .printStackTrace  (  )  ;   }   }   }  )  .start  (  )  ;  new Thread ( new Runnable (  )   {  @Override
            public void run (  )   {  try  {  lock2 .acquire  (  )  ;  System .out  .println  (  "线程2 获取到锁"  )  ;  lock2 .acquire  (  )  ;  System .out  .println  (  "线程2 再次获取到锁"  )  ;  Thread .sleep  (  5   *   1000  )  ;  lock2 .release  (  )  ;  System .out  .println  (  "线程2 释放锁"  )  ;  lock2 .release  (  )  ;  System .out  .println  (  "线程2  再次释放锁"  )  ;   }  catch  ( Exception e )   {  e .printStackTrace  (  )  ;   }   }   }  )  .start  (  )  ;   }  private static CuratorFramework getCuratorFramework (  )   {  ExponentialBackoffRetry policy  =  new ExponentialBackoffRetry (  3000  ,   3  )  ;  CuratorFramework client  =  CuratorFrameworkFactory .builder  (  )  .connectString  (  "xxx:2181,xxx:2181,xxx:2181"  )   .connectionTimeoutMs  (  2000  )   .sessionTimeoutMs  (  2000  )   .retryPolicy  ( policy )  .build  (  )  ;   //  启动客户端
        client .start  (  )  ;  System .out  .println  (  "zookeeper 启动成功"  )  ;  return client ;   }   } 

 

5. DIY一个阉割版的分布式锁

通过这个实例对照第2节内容来理解加解锁的流程,以及如何避免惊群效应。

 

package com .rock  .case2  ;  import org .apache  .zookeeper . *  ;  import org .apache  .zookeeper  .data  .Stat  ;  import java .io  .IOException  ;  import java .util  .List  ;  import java .util  .concurrent  .CountDownLatch  ;   /**    * zk 分布式锁 v1版本:    * 完成功能 :    *      1. 避免了惊群效应    * 缺失功能:    *      1. 超时控制    *      2. 读写锁    *      3. 重入控制    */  public class DistributedLock  {  private String connectString ;  private  int  sessionTimeout ;  private ZooKeeper zk ;  private CountDownLatch connectLatch  =  new CountDownLatch (  1  )  ;  private CountDownLatch waitLatch  =  new CountDownLatch (  1  )  ;  private String waitPath ;  private String currentNode ;  private String LOCK_ROOT_PATH ;  private static String NODE_PREFIX  =   "w"  ;  public DistributedLock ( String connectString ,   int  sessionTimeout ,  String lockName )  {   // TODO : 数据校验
        this .connectString   =  connectString ;  this .sessionTimeout   =  sessionTimeout ;  this .LOCK_ROOT_PATH   =  lockName ;   }  public void init (  )  throws IOException ,  KeeperException ,  InterruptedException  {   //  建联
        zk  =  new ZooKeeper ( connectString ,  sessionTimeout ,  watchedEvent  ->   {   //  connectLatch  连接上zk后  释放
            if  ( watchedEvent .getState  (  )   ==  Watcher .Event  .KeeperState  .SyncConnected  )   {  connectLatch .countDown  (  )  ;   }   }  )  ;  connectLatch .await  (  )  ;  //  等待zk正常连接后  //  判断锁名称节点是否存在
        Stat stat  =  zk .exists  ( LOCK_ROOT_PATH ,   false  )  ;  if  ( stat  ==   null  )   {   //  创建一下锁名称节点
            try  {  zk .create  ( LOCK_ROOT_PATH ,  LOCK_ROOT_PATH .getBytes  (  )  ,  ZooDefs .Ids  .OPEN_ACL_UNSAFE  ,  CreateMode .PERSISTENT  )  ;   }  catch  ( KeeperException e )   {   // 并发创建冲突忽略。
                if  (  ! e .code  (  )  .name  (  )  .equals  (  "NODEEXISTS"  )  )   {  throw e ;   }   }   }   }   /**    * 待补充功能:    * 1. 超时设置    * 2. 读写区分    * 3. 重入控制    */  public void zklock (  )  throws KeeperException ,  InterruptedException  {  if  (  ! tryLock (  )  )   {  waitLock (  )  ;  zklock (  )  ;   }   }   /**    *    */  private void waitLock (  )  throws KeeperException ,  InterruptedException  {  try  {  zk .getData  ( waitPath ,  new Watcher (  )   {  @Override
                public void process ( WatchedEvent watchedEvent )  {   //  waitLatch  需要释放
                    if  ( watchedEvent .getType  (  )   ==  Watcher .Event  .EventType  .NodeDeleted   &&  watchedEvent .getPath  (  )  .equals  ( waitPath )  )   {  waitLatch .countDown  (  )  ;   }   }   }  ,  new Stat (  )  )  ;   //  等待监听
            waitLatch .await  (  )  ;   }  catch  ( KeeperException .NoNodeException  e )   {   // 如果等待的节点已经被清除了 , 不等了 , 再尝试去抢锁
            return ;   }   }  private  boolean  tryLock (  )  throws KeeperException ,  InterruptedException  {  currentNode  =  zk .create  ( LOCK_ROOT_PATH  +   "/"   +  NODE_PREFIX ,   null  ,  ZooDefs .Ids  .OPEN_ACL_UNSAFE  ,  CreateMode .EPHEMERAL_SEQUENTIAL  )  ;   //  判断创建的节点是否是最小的序号节点,如果是获取到锁;如果不是,监听他序号前一个节点
        List < String >  children  =  zk .getChildren  ( LOCK_ROOT_PATH ,   false  )  ;   //  如果children 只有一个值,那就直接获取锁; 如果有多个节点,需要判断,谁最小
        if  ( children .size  (  )   ==   1  )   {  return  true  ;   }  else  {  String thisNode  =  currentNode .substring  ( LOCK_ROOT_PATH .length  (  )   +   1  )  ;   //  通过w00000000获取该节点在children集合的位置  int  index  =  children .indexOf  ( thisNode )  ;  if  ( index  ==   0  )   {   // 自己就是第一个节点
                return  true  ;   }   //  需要监听  他前一个节点变化
            waitPath  =  LOCK_ROOT_PATH  +   "/"   +  children .get  ( index  -   1  )  ;   }  return  false  ;   }   //  解锁
    public void unZkLock (  )  {   //  删除节点
        try  {  zk .delete  ( this .currentNode  ,   -  1  )  ;   }  catch  ( InterruptedException e )   {  e .printStackTrace  (  )  ;   }  catch  ( KeeperException e )   {  e .printStackTrace  (  )  ;   }   }   } 

原文地址:https://mp.weixin.qq.com/s/W9rrECILCoxHhCegABmvGw

查看更多关于基于Zookeeper实现分布式锁的详细内容...

  阅读:14次