1. Zookeeper概述
Zookeeper(后续简称ZK)是一个分布式的,开放源码的分布式应用程序协调服务,通常以集群模式运转,其协调能力可以理解为是基于观察者设计模式来实现的;ZK服务会使用Znode存储使用者的数据,并将这些数据以树形目录的形式来组织管理,支持使用者以观察者的角色指定自己关注哪些节点\数据的变更,当这些变更发生时,ZK会通知其观察者;为满足本篇目标所需,着重介绍以下几个关键特性:
数据组织:数据节点以树形目录(类似文件系统)组织管理,每一个节点中都会保存数据信息和节点信息。
ZooKeeper's Hierarchical Namespace
集群模式:通常是由3、5个基数实例组成集群,当超过半数服务实例正常工作就能对外提供服务,既能避免单点故障,又尽量高可用,每个服务实例都有一个数据备份,以实现数据全局一致
ZooKeeper Service
顺序更新:更新请求都会转由leader执行,来自同一客户端的更新将按照发送的顺序被写入到ZK,处理写请求创建Znode时,Znode名称后会被分配一个全局唯一的递增编号,可以通过顺序号推断请求的顺序,利用这个特性可以实现高级协调服务
监听机制:给某个节点注册监听器,该节点一旦发生变更(例如更新或者删除),监听者就会收到一个Watch Event,可以感知到节点\数据的变更
临时节点:session链接断开临时节点就没了,不能创建子节点(很关键)
ZK的分布式锁正是基于以上特性来实现的,简单来说是:
临时节点:用于支撑异常情况下的锁自动释放能力 顺序节点:用于支撑公平锁获取锁和排队等待的能力 监听机制:用于支撑抢锁能力 集群模式:用于支撑锁服务的高可用 2. 加解锁的流程描述创建一个永久节点作为锁节点(/lock2) 试图加锁的客户端在指定锁名称节点(/lock2)下,创建临时顺序子节点 获取锁节点(/lock2)下所有子节点 对所获取的子节点按节点自增序号从小到大排序 判断自己是不是第一个子节点,若是,则获取锁 若不是,则监听比该节点小的那个节点的删除事件(这种只监听前一个节点的方式避免了惊群效应) 若是阻塞申请锁,则申请锁的操作可增加阻塞等待 若监听事件生效(说明前节点释放了,可以尝试去获取锁),则回到第3步重新进行判断,直到获取到锁 解锁时,将第一个子节点删除释放 3. ZK分布式锁的能力
可能读者是单篇阅读,这里引入上一篇《分布式锁上-初探》中的一些内容,一个分布式锁应具备这样一些功能特点:
互斥性:在同一时刻,只有一个客户端能持有锁 安全性:避免死锁,如果某个客户端获得锁之后处理时间超过最大约定时间,或者持锁期间发生了故障导致无法主动释放锁,其持有的锁也能够被其他机制正确释放,并保证后续其它客户端也能加锁,整个处理流程继续正常执行 可用性:也被称作容错性,分布式锁需要有高可用能力,避免单点故障,当提供锁的服务节点故障(宕机)时不影响服务运行,这里有两种模式:一种是分布式锁服务自身具备集群模式,遇到故障能自动切换恢复工作;另一种是客户端向多个独立的锁服务发起请求,当某个锁服务故障时仍然可以从其他锁服务读取到锁信息(Redlock) 可重入性:对同一个锁,加锁和解锁必须是同一个线程,即不能把其他线程程持有的锁给释放了 高效灵活:加锁、解锁的速度要快;支持阻塞和非阻塞;支持公平锁和非公平锁基于上文的内容,这里简单总结一下ZK的能力矩阵(其它分布式锁的情况会在后续文章中补充):
能力 |
ZK |
MySql |
Redis原生 |
Redlock |
ETCD |
互斥 |
是 |
|
|
|
|
安全 |
链接异常,session关闭后锁会自动释放 |
|
|
|
|
可用性 |
相对还好 |
|
|
|
|
可重入 |
线程可重入 |
|
|
|
|
加解锁速度 |
居中 |
|
|
|
|
阻塞非阻塞 |
都支持 |
|
|
|
|
公平非公平 |
仅公平锁 |
|
|
|
|
关于性能不太高的一种说法
因为每次在创建锁和释放锁的过程中,都要动态创建、销毁临时节点来实现锁功能。ZK中创建和删除节点只能通过Leader服务器来执行,然后Leader服务器还需要将数据同步到所有的Follower机器上,这样频繁的网络通信,性能的短板是非常突出的。在高性能,高并发的场景下,不建议使用ZooKeeper的分布式锁。
由于ZooKeeper的高可用特性,在并发量不是太高的场景,也推荐使用ZK的分布式锁。
4. InterProcessMutex 使用示例Zookeeper 客户端框架 Curator 提供的 InterProcessMutex 是分布式锁的一种实现,acquire 方法阻塞|非阻塞获取锁,release 方法释放锁,另外还提供了可撤销、可重入功能。
4.1 接口介绍
// 获取互斥锁 public void acquire ( ) throws Exception ; // 在给定的时间内获取互斥锁 public boolean acquire ( long time , TimeUnit unit ) throws Exception ; // 释放锁处理 public void release ( ) throws Exception ; // 如果当前线程获取了互斥锁,则返回true boolean isAcquiredInThisProcess ( ) ;
4.2 pom依赖
< dependency > < groupId > org .apache .logging .log4j groupId > < artifactId > log4j - core artifactId > < version > 2.8 .2 version > dependency > < dependency > < groupId > org .apache .zookeeper groupId > < artifactId > zookeeper artifactId > < version > 3.5 .7 version > dependency > < dependency > < groupId > org .apache .curator groupId > < artifactId > curator - framework artifactId > < version > 4.3 .0 version > dependency > < dependency > < groupId > org .apache .curator groupId > < artifactId > curator - recipes artifactId > < version > 4.3 .0 version > dependency > < dependency > < groupId > org .apache .curator groupId > < artifactId > curator - client artifactId > < version > 4.3 .0 version > dependency >
4.3 示例
package com .atguigu .case3 ; import org .apache .curator .framework .CuratorFramework ; import org .apache .curator .framework .CuratorFrameworkFactory ; import org .apache .curator .framework .recipes .locks .InterProcessMutex ; import org .apache .curator .retry .ExponentialBackoffRetry ; public class CuratorLockTest { public static void main ( String [ ] args ) { // 创建分布式锁1 InterProcessMutex lock1 = new InterProcessMutex ( getCuratorFramework ( ) , "/locks" ) ; // 创建分布式锁2 InterProcessMutex lock2 = new InterProcessMutex ( getCuratorFramework ( ) , "/locks" ) ; new Thread ( new Runnable ( ) { @Override public void run ( ) { try { lock1 .acquire ( ) ; System .out .println ( "线程1 获取到锁" ) ; lock1 .acquire ( ) ; System .out .println ( "线程1 再次获取到锁" ) ; Thread .sleep ( 5 * 1000 ) ; lock1 .release ( ) ; System .out .println ( "线程1 释放锁" ) ; lock1 .release ( ) ; System .out .println ( "线程1 再次释放锁" ) ; } catch ( Exception e ) { e .printStackTrace ( ) ; } } } ) .start ( ) ; new Thread ( new Runnable ( ) { @Override public void run ( ) { try { lock2 .acquire ( ) ; System .out .println ( "线程2 获取到锁" ) ; lock2 .acquire ( ) ; System .out .println ( "线程2 再次获取到锁" ) ; Thread .sleep ( 5 * 1000 ) ; lock2 .release ( ) ; System .out .println ( "线程2 释放锁" ) ; lock2 .release ( ) ; System .out .println ( "线程2 再次释放锁" ) ; } catch ( Exception e ) { e .printStackTrace ( ) ; } } } ) .start ( ) ; } private static CuratorFramework getCuratorFramework ( ) { ExponentialBackoffRetry policy = new ExponentialBackoffRetry ( 3000 , 3 ) ; CuratorFramework client = CuratorFrameworkFactory .builder ( ) .connectString ( "xxx:2181,xxx:2181,xxx:2181" ) .connectionTimeoutMs ( 2000 ) .sessionTimeoutMs ( 2000 ) .retryPolicy ( policy ) .build ( ) ; // 启动客户端 client .start ( ) ; System .out .println ( "zookeeper 启动成功" ) ; return client ; } }
5. DIY一个阉割版的分布式锁
通过这个实例对照第2节内容来理解加解锁的流程,以及如何避免惊群效应。
package com .rock .case2 ; import org .apache .zookeeper . * ; import org .apache .zookeeper .data .Stat ; import java .io .IOException ; import java .util .List ; import java .util .concurrent .CountDownLatch ; /** * zk 分布式锁 v1版本: * 完成功能 : * 1. 避免了惊群效应 * 缺失功能: * 1. 超时控制 * 2. 读写锁 * 3. 重入控制 */ public class DistributedLock { private String connectString ; private int sessionTimeout ; private ZooKeeper zk ; private CountDownLatch connectLatch = new CountDownLatch ( 1 ) ; private CountDownLatch waitLatch = new CountDownLatch ( 1 ) ; private String waitPath ; private String currentNode ; private String LOCK_ROOT_PATH ; private static String NODE_PREFIX = "w" ; public DistributedLock ( String connectString , int sessionTimeout , String lockName ) { // TODO : 数据校验 this .connectString = connectString ; this .sessionTimeout = sessionTimeout ; this .LOCK_ROOT_PATH = lockName ; } public void init ( ) throws IOException , KeeperException , InterruptedException { // 建联 zk = new ZooKeeper ( connectString , sessionTimeout , watchedEvent -> { // connectLatch 连接上zk后 释放 if ( watchedEvent .getState ( ) == Watcher .Event .KeeperState .SyncConnected ) { connectLatch .countDown ( ) ; } } ) ; connectLatch .await ( ) ; // 等待zk正常连接后 // 判断锁名称节点是否存在 Stat stat = zk .exists ( LOCK_ROOT_PATH , false ) ; if ( stat == null ) { // 创建一下锁名称节点 try { zk .create ( LOCK_ROOT_PATH , LOCK_ROOT_PATH .getBytes ( ) , ZooDefs .Ids .OPEN_ACL_UNSAFE , CreateMode .PERSISTENT ) ; } catch ( KeeperException e ) { // 并发创建冲突忽略。 if ( ! e .code ( ) .name ( ) .equals ( "NODEEXISTS" ) ) { throw e ; } } } } /** * 待补充功能: * 1. 超时设置 * 2. 读写区分 * 3. 重入控制 */ public void zklock ( ) throws KeeperException , InterruptedException { if ( ! tryLock ( ) ) { waitLock ( ) ; zklock ( ) ; } } /** * */ private void waitLock ( ) throws KeeperException , InterruptedException { try { zk .getData ( waitPath , new Watcher ( ) { @Override public void process ( WatchedEvent watchedEvent ) { // waitLatch 需要释放 if ( watchedEvent .getType ( ) == Watcher .Event .EventType .NodeDeleted && watchedEvent .getPath ( ) .equals ( waitPath ) ) { waitLatch .countDown ( ) ; } } } , new Stat ( ) ) ; // 等待监听 waitLatch .await ( ) ; } catch ( KeeperException .NoNodeException e ) { // 如果等待的节点已经被清除了 , 不等了 , 再尝试去抢锁 return ; } } private boolean tryLock ( ) throws KeeperException , InterruptedException { currentNode = zk .create ( LOCK_ROOT_PATH + "/" + NODE_PREFIX , null , ZooDefs .Ids .OPEN_ACL_UNSAFE , CreateMode .EPHEMERAL_SEQUENTIAL ) ; // 判断创建的节点是否是最小的序号节点,如果是获取到锁;如果不是,监听他序号前一个节点 List < String > children = zk .getChildren ( LOCK_ROOT_PATH , false ) ; // 如果children 只有一个值,那就直接获取锁; 如果有多个节点,需要判断,谁最小 if ( children .size ( ) == 1 ) { return true ; } else { String thisNode = currentNode .substring ( LOCK_ROOT_PATH .length ( ) + 1 ) ; // 通过w00000000获取该节点在children集合的位置 int index = children .indexOf ( thisNode ) ; if ( index == 0 ) { // 自己就是第一个节点 return true ; } // 需要监听 他前一个节点变化 waitPath = LOCK_ROOT_PATH + "/" + children .get ( index - 1 ) ; } return false ; } // 解锁 public void unZkLock ( ) { // 删除节点 try { zk .delete ( this .currentNode , - 1 ) ; } catch ( InterruptedException e ) { e .printStackTrace ( ) ; } catch ( KeeperException e ) { e .printStackTrace ( ) ; } } }
原文地址:https://mp.weixin.qq.com/s/W9rrECILCoxHhCegABmvGw
查看更多关于基于Zookeeper实现分布式锁的详细内容...