本文基于ZooKeeper Curator进行分布式锁的实践
搭建ZooKeeper环境
这里基于Docker搭建ZooKeeper环境
# 拉取 ZooKeeper 镜像
docker pull zookeeper:3.4
# 创建 ZooKeeper 容器
docker run -p 2181:2181 -d
--name ZooKeeper-Service-2
zookeeper:3.4
POM依赖
Curator,作为Netflix开源的ZooKeeper客户端框架,大大简化了我们操作、使用ZooKeeper的难度,并且提供了非常丰富的基于链式调用的API。故这里首先在POM中引入Curator依赖,其中我们需要在Curator依赖中排除ZooKeeper依赖,然后单独引入与服务端版本一致的ZooKeeper依赖
<dependencies>
<!-- ZooKeeper Client: Curator -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
<exclusions>
<exclusion>
<!--排除自带ZooKeeper依赖-->
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Zookeeper依赖版本需与服务端保持一致 -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.0</version>
</dependency>
</dependencies>
Curator基本实践
创建节点
众所周知,在ZooKeeper中节点支持两种类型:持久/临时、有序/无序。即两两组合则共计四种节点。其中临时节点会在客户端连接断开后自动被删除,而持久节点则不会;有序节点在创建过程中则会被分配一个唯一的单调递增的序号,并将序号追加在节点名称中,而无序节点则不会。下面即是一个基于Curator创建节点的示例
/**
* ZooKeeper Curator 基本实践
* @author Aaron Zhu
* @date 2022-03-28
*/
public class Basic {
public static void main(String[] args) throws Exception {
// 创建客户端
CuratorFramework zkClient = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181") // ZK Server地址信息
.connectionTimeoutMs(15 * 1000) // 连接超时时间: 15s
.sessionTimeoutMs( 60 * 1000 ) // 会话超时时间: 60s
// 重试策略: 重试3次, 每次间隔1s
.retryPolicy(new RetryNTimes(3, 1000))
.build();
// 启动客户端
zkClient.start();
System.out.println("---------------------- 系统上线 ----------------------");
/******************************* 创建节点 *******************************/
zkClient.create()
.creatingParentsIfNeeded() //递归创建, 如果没有父节点则自动创建(持久类型的)父节点
.withMode( PERSISTENT ) // 节点类型: 持久节点
.forPath("/AaronTest/nodeA", "Hello, Node A".getBytes());
zkClient.create()
.creatingParentsIfNeeded() //递归创建, 如果没有父节点则自动创建(持久类型的)父节点
.withMode( PERSISTENT_SEQUENTIAL ) // 节点类型: 持久有序节点
.forPath("/AaronTest/nodeB", "Hello, Node B".getBytes());
zkClient.create()
.creatingParentsIfNeeded() //递归创建, 如果没有父节点则自动创建(持久类型的)父节点
.withMode( EPHEMERAL ) // 节点类型: 临时节点
.forPath("/AaronTest/nodeC", "Hello, Node C".getBytes());
zkClient.create()
.creatingParentsIfNeeded() //递归创建, 如果没有父节点则自动创建(持久类型的)父节点
.withMode( EPHEMERAL_SEQUENTIAL ) // 节点类型: 临时有序节点
.forPath("/AaronTest/nodeD", "Hello, Node D".getBytes());
List<String> childrenNodeNameList = zkClient.getChildren()
.forPath("/AaronTest");
for(String childrenNodeName : childrenNodeNameList) {
// 获取节点的状态信息、数据信息
Stat stat = new Stat();
byte[] bytes = zkClient.getData()
.storingStatIn(stat)
.forPath( "/AaronTest/" + childrenNodeName );
String data = new String( bytes );
System.out.println("--------------------------------");
System.out.println("childrenNodeName: " + childrenNodeName);
System.out.println("data: " + data);
System.out.println("stat: " + stat);
}
// 关闭客户端
zkClient.close();
System.out.println("---------------------- 系统下线 ----------------------");
}
}
测试结果如下所示,符合预期
Watcher机制
ZooKeeper通过引入Watched机制实现发布/订阅功能,但原生的Watcher机制一旦触发一次后就会失效。如果期望一直监听,则必须每次重复注册Watcher,使用起来较为繁琐。为此Curator对其进行了优化,实现了自动注册,以便进行重复监听。具体地,Curator中提供了三种监听器:NodeCache、PathChildrenCache、TreeCache。其中,NodeCache只可监听指定路径所在节点的创建、修改、删除;PathChildrenCache只可监听指定路径下的第一级子节点的创建、修改、删除,无法监听指定路径所在节点的事件,无法监听指定路径的子节点的子节点的事件;TreeCache可监听指定路径所在节点的创建、修改、删除,可监听指定路径下的所有各级子节点的创建、修改、删除
NodeCache实践
下面即是一个基于Curator实践NodeCache的示例
/**
* NodeCache Demo
* @author Aaron Zhu
* @date 2022-03-29
*/
public class NodeCacheDemo {
public static void main(String[] args) throws Exception {
// 创建客户端
CuratorFramework zkClient = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181") // ZK Server地址信息
.connectionTimeoutMs(15 * 1000) // 连接超时时间: 15s
.sessionTimeoutMs( 60 * 1000 ) // 会话超时时间: 60s
// 重试策略: 重试3次, 每次间隔1s
.retryPolicy(new RetryNTimes(3, 1000))
.build();
// 启动客户端
zkClient.start();
System.out.println("---------------------- 系统上线 ----------------------");
// 只可监听指定路径所在节点的创建、修改、删除
String node = "/Aaron/Bob";
NodeCache nodeCache = new NodeCache(zkClient, node);
nodeCache.start();
nodeCache.getListenable()
.addListener( () -> {
System.out.print("监听当前节点的事件");
ChildData currentNode = nodeCache.getCurrentData();
if( currentNode==null ) {
System.out.println(": 当前节点已被删除");
} else {
getNodeInfo( currentNode );
}
});
System.out.println("Test 1: 创建当前节点");
zkClient.create()
.creatingParentsIfNeeded() //递归创建, 如果没有父节点则自动创建(持久类型的)父节点
.withMode( PERSISTENT ) // 节点类型: 持久节点
.forPath( node, "Good Morning".getBytes() );
Thread.sleep(2000);
System.out.println("Test 2: 修改当前节点的数据");
zkClient.setData()
.forPath( node, "Good Night".getBytes() );
Thread.sleep(2000);
System.out.println("Test 3: 删除当前节点");
zkClient.delete()
.forPath(node);
Thread.sleep(2000);
// 主线程等待执行完毕
try{ Thread.sleep( 120*1000 ); } catch (Exception e) {}
// 关闭客户端
zkClient.close();
System.out.println("---------------------- 系统下线 ----------------------");
}
private static void getNodeInfo(ChildData currentNode) {
String info = ", Current Data Info: n"
+ "path: " + currentNode.getPath()
+ ", data: " + new String(currentNode.getData())
+ ", stat="+currentNode.getStat();
System.out.println(info);
}
}
测试结果如下所示,符合预期
PathChildrenCache实践
下面即是一个基于Curator实践PathChildrenCache的示例
/**
* PathChildrenCache Demo
* @author Aaron Zhu
* @date 2022-03-29
*/
public class PathChildrenCacheDemo {
public static void main(String[] args) throws Exception {
// 创建客户端
CuratorFramework zkClient = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181") // ZK Server地址信息
.connectionTimeoutMs(15 * 1000) // 连接超时时间: 15s
.sessionTimeoutMs( 60 * 1000 ) // 会话超时时间: 60s
// 重试策略: 重试3次, 每次间隔1s
.retryPolicy(new RetryNTimes(3, 1000))
.build();
// 启动客户端
zkClient.start();
System.out.println("---------------------- 系统上线 ----------------------");
// 只可监听指定路径下的第一级子节点的创建、修改、删除
// 无法监听指定路径所在节点的事件
// 无法监听指定路径的子节点的子节点的事件
String parentPath = "/Aaron/Tony";
PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient, parentPath, true);
pathChildrenCache.start();
pathChildrenCache.getListenable()
.addListener( (client, event) -> {
PathChildrenCacheEvent.Type eventType = event.getType();
ChildData currentNode = event.getData();
if( PathChildrenCacheEvent.Type.CHILD_ADDED.equals( eventType ) ) {
System.out.println("添加 子节点, Current Data Info:");
getNodeInfo(currentNode);
} else if( PathChildrenCacheEvent.Type.CHILD_UPDATED.equals( eventType ) ) {
System.out.println("修改 子节点, Current Data Info:");
getNodeInfo(currentNode);
} else if( PathChildrenCacheEvent.Type.CHILD_REMOVED.equals( eventType ) ) {
System.out.println("删除 子节点, Current Data Info:");
getNodeInfo(currentNode);
}
});
System.out.println("Test 1: 创建子节点");
String childNode1 = parentPath + "/Lucy";
zkClient.create()
.creatingParentsIfNeeded() //递归创建, 如果没有父节点则自动创建(持久类型的)父节点
.withMode( PERSISTENT_SEQUENTIAL ) // 节点类型: 持久有序节点
.forPath( childNode1, "I'm a Dog".getBytes() );
Thread.sleep(2000);
System.out.println("Test 2: 创建子节点");
String childNode2 = parentPath + "/Tony";
zkClient.create()
.creatingParentsIfNeeded() //递归创建, 如果没有父节点则自动创建(持久类型的)父节点
.withMode( PERSISTENT ) // 节点类型: 持久节点
.forPath( childNode2, "Good Morning".getBytes() );
Thread.sleep(2000);
System.out.println("Test 3: 修改子节点的数据");
zkClient.setData()
.forPath( childNode2, "Good Night".getBytes() );
Thread.sleep(2000);
System.out.println("Test 4: 删除子节点");
zkClient.delete()
.forPath(childNode2);
Thread.sleep(2000);
// 主线程等待执行完毕
try{ Thread.sleep( 120*1000 ); } catch (Exception e) {}
// 关闭客户端
zkClient.close();
System.out.println("---------------------- 系统下线 ----------------------");
}
private static void getNodeInfo(ChildData currentNode) {
String info = "path: " + currentNode.getPath()
+ ", data: " + new String(currentNode.getData())
+ ", stat="+currentNode.getStat();
System.out.println(info);
}
}
测试结果如下所示,符合预期
TreeCache实践
下面即是一个基于Curator实践TreeCache的示例
/**
* TreeCache Demo
* @author Aaron Zhu
* @date 2022-03-29
*/
public class TreeCacheDemo {
public static void main(String[] args) throws Exception {
// 创建客户端
CuratorFramework zkClient = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181") // ZK Server地址信息
.connectionTimeoutMs(15 * 1000) // 连接超时时间: 15s
.sessionTimeoutMs( 60 * 1000 ) // 会话超时时间: 60s
// 重试策略: 重试3次, 每次间隔1s
.retryPolicy(new RetryNTimes(3, 1000))
.build();
// 启动客户端
zkClient.start();
System.out.println("---------------------- 系统上线 ----------------------");
// 可监听指定路径所在节点的创建、修改、删除
// 可监听指定路径下的所有各级子节点的创建、修改、删除
String parentPath = "/Aaron/Luca";
TreeCache treeCache = new TreeCache(zkClient, parentPath);
treeCache.start();
treeCache.getListenable()
.addListener( (client, event) -> {
TreeCacheEvent.Type eventType = event.getType();
ChildData currentNode = event.getData();
if( TreeCacheEvent.Type.NODE_ADDED.equals( eventType ) ) {
System.out.println("添加 节点, Current Data Info:");
getNodeInfo(currentNode);
} else if( TreeCacheEvent.Type.NODE_UPDATED.equals( eventType ) ) {
System.out.println("修改 节点, Current Data Info:");
getNodeInfo(currentNode);
} else if( TreeCacheEvent.Type.NODE_REMOVED.equals( eventType ) ) {
System.out.println("删除 节点, Current Data Info:");
getNodeInfo(currentNode);
}
});
System.out.println("Test 1: 创建指定路径所在节点");
zkClient.create()
.creatingParentsIfNeeded() //递归创建, 如果没有父节点则自动创建(持久类型的)父节点
.withMode( PERSISTENT ) // 节点类型: 持久节点
.forPath( parentPath, "I'm a Dog".getBytes() );
Thread.sleep(2000);
System.out.println("Test 2: 修改指定路径所在节点");
zkClient.setData()
.forPath( parentPath, "Good Night".getBytes() );
Thread.sleep(2000);
System.out.println("Test 3: 创建一级子节点");
String childNode1 = parentPath + "/Cat";
zkClient.create()
.withMode( PERSISTENT ) // 节点类型: 持久节点
.forPath( childNode1, "I'm a Cat".getBytes() );
Thread.sleep(2000);
System.out.println("Test 4: 创建二级子节点");
String childNode2 = childNode1 + "/David";
zkClient.create()
.withMode( PERSISTENT ) // 节点类型: 持久节点
.forPath( childNode2, "My Name is David".getBytes() );
Thread.sleep(2000);
System.out.println("Test 5: 修改二级子节点");
zkClient.setData()
.forPath( childNode2, "I'm Sorry".getBytes() );
Thread.sleep(2000);
System.out.println("Test 6: 删除二级子节点");
zkClient.delete()
.forPath(childNode2);
Thread.sleep(2000);
System.out.println("Test 7: 删除一级子节点");
zkClient.delete()
.forPath(childNode1);
Thread.sleep(2000);
System.out.println("Test 8: 删除指定路径所在节点");
zkClient.delete()
.forPath(parentPath);
Thread.sleep(2000);
// 主线程等待执行完毕
try{ Thread.sleep( 60*1000 ); } catch (Exception e) {}
// 关闭客户端
zkClient.close();
System.out.println("---------------------- 系统下线 ----------------------");
}
private static void getNodeInfo(ChildData currentNode) {
String info = "path: " + currentNode.getPath()
+ ", data: " + new String(currentNode.getData())
+ ", stat="+currentNode.getStat();
System.out.println(info);
}
}
测试结果如下所示,符合预期
分布式锁
Curator还进一步地提供了非常丰富的分布式锁特性,具体包括:
-
InterProcessMutex 分布式可重入互斥锁 -
InterProcessReadWriteLock 分布式可重入读写锁 -
InterProcessSemaphoreMutex 分布式不可重入互斥锁 -
InterProcessSemaphoreV2 分布式信号量
InterProcessMutex分布式可重入互斥锁
InterProcessMutex是一个分布式的可重入的互斥锁,示例代码如下所示
/**
* InterProcessMutex Demo: 分布式可重入互斥锁
* @author Aaron Zhu
* @date 2022-03-31
*/
public class InterProcessMutexDemo {
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
private static String zkLockPath = "/Aaron/Lock1";
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(10);
// 创建客户端
CuratorFramework zkClient = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181") // ZK Server地址信息
.connectionTimeoutMs(15 * 1000) // 连接超时时间: 15s
.sessionTimeoutMs( 60 * 1000 ) // 会话超时时间: 60s
// 重试策略: 重试3次, 每次间隔1s
.retryPolicy(new RetryNTimes(3, 1000))
.build();
// 启动客户端
zkClient.start();
System.out.println("---------------------- 系统上线 ----------------------");
for(int i=1; i<=3; i++) {
String taskName = "任务#"+i;
Task task = new Task(taskName, zkClient, zkLockPath);
threadPool.execute( task );
}
// 主线程等待所有任务执行完毕
try{ Thread.sleep( 120*1000 ); } catch (Exception e) {}
// 关闭客户端
zkClient.close();
System.out.println("---------------------- 系统下线 ----------------------");
}
/**
* 打印信息
* @param msg
*/
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
String log = "["+time+"] "+ " <"+ thread +"> " + msg;
System.out.println(log);
}
private static class Task implements Runnable {
private String taskName;
private InterProcessMutex lock;
public Task(String taskName, CuratorFramework zkClient, String zkLockPath) {
this.taskName = taskName;
this.lock = new InterProcessMutex(zkClient, zkLockPath);
}
@Override
public void run() {
try{
lock.acquire();
info(taskName + ": 成功获取锁 #1");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
methodA();
} catch (Exception e) {
System.out.println( taskName + ": Happen Exception: " + e.getMessage());
} finally {
info(taskName + ": 释放锁 #1n");
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void methodA() {
try{
lock.acquire();
info(taskName + ": 成功获取锁 #2");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
} catch (Exception e) {
System.out.println(taskName + ": Happen Exception: " + e.getMessage());
} finally {
info(taskName + ": 释放锁 #2");
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
测试结果如下所示,符合预期
InterProcessReadWriteLock分布式可重入读写锁
读写互斥
InterProcessReadWriteLock是一个分布式可重入读写锁,其中读锁为共享锁、写锁为互斥锁。示例代码如下所示
/**
* InterProcessReadWriteLock Demo: 分布式读写锁, 读锁为共享锁、写锁为互斥锁
* @author Aaron Zhu
* @date 2022-03-31
*/
public class InterProcessReadWriteLockDemo1 {
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
private static ExecutorService threadPool = Executors.newFixedThreadPool(10);
private static String zkLockPath = "/Aaron/Lock2";
private static CuratorFramework zkClient;
@BeforeClass
public static void init() {
// 创建客户端
zkClient = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181") // ZK Server地址信息
.connectionTimeoutMs(15 * 1000) // 连接超时时间: 15s
.sessionTimeoutMs( 60 * 1000 ) // 会话超时时间: 60s
// 重试策略: 重试3次, 每次间隔1s
.retryPolicy(new RetryNTimes(3, 1000))
.build();
// 启动客户端
zkClient.start();
System.out.println("---------------------- 系统上线 ----------------------");
}
/**
* 测试: 读锁为共享锁
*/
@Test
public void test1Read() {
System.out.println("n---------------------- Test 1 : Read ----------------------");
for(int i=1; i<=3; i++) {
String taskName = "读任务#"+i;
Runnable task = new ReadTask(taskName, zkClient, zkLockPath);
threadPool.execute( task );
}
// 主线程等待所有任务执行完毕
try{ Thread.sleep( 10*1000 ); } catch (Exception e) {}
}
/**
* 测试: 写锁为互斥锁
*/
@Test
public void test2Write() {
System.out.println("n---------------------- Test 2 : Write ----------------------");
for(int i=1; i<=3; i++) {
String taskName = "写任务#"+i;
Runnable task = new WriteTask(taskName, zkClient, zkLockPath);
threadPool.execute( task );
}
// 主线程等待所有任务执行完毕
try{ Thread.sleep( 30*1000 ); } catch (Exception e) {}
}
/**
* 测试: 读写互斥
*/
@Test
public void test2ReadWrite() {
System.out.println("n---------------------- Test 3 : Read Write ----------------------");
for(int i=1; i<=8; i++) {
Runnable task = null;
Boolean isReadTask = RandomUtils.nextBoolean();
if( isReadTask ) {
task = new ReadTask( "读任务#"+i, zkClient, zkLockPath );
} else {
task = new WriteTask( "写任务#"+i, zkClient, zkLockPath );
}
threadPool.execute( task );
}
// 主线程等待所有任务执行完毕
try{ Thread.sleep( 40*1000 ); } catch (Exception e) {}
}
@AfterClass
public static void close() {
// 关闭客户端
zkClient.close();
System.out.println("---------------------- 系统下线 ----------------------");
}
/**
* 打印信息
* @param msg
*/
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
String log = "["+time+"] "+ " <"+ thread +"> " + msg;
System.out.println(log);
}
/**
* 读任务
*/
private static class ReadTask implements Runnable {
private String taskName;
private InterProcessMutex readLock;
public ReadTask(String taskName, CuratorFramework zkClient, String zkLockPath) {
this.taskName = taskName;
InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient, zkLockPath);
this.readLock = interProcessReadWriteLock.readLock();
}
@Override
public void run() {
try{
readLock.acquire();
info(taskName + ": 成功获取读锁 #1");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
} catch (Exception e) {
System.out.println( taskName + ": Happen Exception: " + e.getMessage());
} finally {
info(taskName + ": 释放读锁 #1");
try {
readLock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
/**
* 写任务
*/
private static class WriteTask implements Runnable {
private String taskName;
private InterProcessMutex writeLock;
public WriteTask(String taskName, CuratorFramework zkClient, String zkLockPath) {
this.taskName = taskName;
InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient, zkLockPath);
this.writeLock = interProcessReadWriteLock.writeLock();
}
@Override
public void run() {
try{
writeLock.acquire();
info(taskName + ": 成功获取写锁 #1");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
} catch (Exception e) {
System.out.println( taskName + ": Happen Exception: " + e.getMessage());
} finally {
info(taskName + ": 释放写锁 #1n");
try {
writeLock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
测试结果如下所示,符合预期
可重入性
由于读锁、写锁分别是基于InterProcessMutex实现的,故这二者自然也是支持可重入的。示例代码如下所示
/**
* InterProcessReadWriteLock Demo: 分布式读写锁, 可重入性测试
* @author Aaron Zhu
* @date 2022-03-31
*/
public class InterProcessReadWriteLockDemo2 {
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
private static ExecutorService threadPool = Executors.newFixedThreadPool(10);
private static String zkLockPath = "/Aaron/Lock3";
private static CuratorFramework zkClient;
@BeforeClass
public static void init() {
System.out.println("---------------------- 系统上线 ----------------------");
// 创建客户端
zkClient = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181") // ZK Server地址信息
.connectionTimeoutMs(15 * 1000) // 连接超时时间: 15s
.sessionTimeoutMs( 60 * 1000 ) // 会话超时时间: 60s
// 重试策略: 重试3次, 每次间隔1s
.retryPolicy(new RetryNTimes(3, 1000))
.build();
// 启动客户端
zkClient.start();
}
/**
* 测试: 读锁具有可重入性
*/
@Test
public void test1Read() {
System.out.println("n---------------------- Test 1 : Read ----------------------");
InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient, zkLockPath);
InterProcessMutex readLock = interProcessReadWriteLock.readLock();
Runnable task = new Task("读任务", readLock, readLock);
threadPool.execute( task );
// 主线程等待所有任务执行完毕
try{ Thread.sleep( 5*1000 ); } catch (Exception e) {}
}
/**
* 测试: 写锁具有可重入性
*/
@Test
public void test2Write() {
System.out.println("n---------------------- Test 2 : Write ----------------------");
InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient, zkLockPath);
InterProcessMutex writeLock = interProcessReadWriteLock.writeLock();
Runnable task = new Task("写任务", writeLock, writeLock);
threadPool.execute( task );
// 主线程等待所有任务执行完毕
try{ Thread.sleep( 5*1000 ); } catch (Exception e) {}
}
@AfterClass
public static void close() {
// 关闭客户端
zkClient.close();
System.out.println("---------------------- 系统下线 ----------------------");
}
/**
* 打印信息
* @param msg
*/
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
String log = "["+time+"] "+ " <"+ thread +"> " + msg;
System.out.println(log);
}
/**
* 任务
*/
private static class Task implements Runnable {
private String taskName;
private InterProcessMutex firstLock;
private InterProcessMutex secondLock;
public Task(String taskName, InterProcessMutex firstLock, InterProcessMutex secondLock) {
this.taskName = taskName;
this.firstLock = firstLock;
this.secondLock = secondLock;
}
@Override
public void run() {
try{
firstLock.acquire();
info(taskName + ": 成功获取锁 firstLock");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
methodB();
} catch (Exception e) {
System.out.println( taskName + ": Happen Exception: " + e.getMessage());
} finally {
info(taskName + ": 释放锁 firstLockn");
try {
firstLock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public void methodB() {
try{
secondLock.acquire();
info(taskName + ": 成功获取锁 secondLock");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
} catch (Exception e) {
System.out.println( taskName + ": Happen Exception: " + e.getMessage());
} finally {
info(taskName + ": 释放锁 secondLock");
try {
secondLock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
测试结果如下所示,符合预期
锁升级、锁降级
所谓锁升级指的是读锁升级为写锁。当一个线程先获取到读锁再去申请写锁,显然其是不支持的。理由也很简单,读锁是可以多个服务实例同时持有的。若其中一个服务实例此锁线程能够进行锁升级,成功获得写锁。显然与我们之前的所说的读写互斥相违背。因为其在获得写锁的同时,其他服务实例依然持有读锁;反之,其是支持锁降级的,即写锁降级为读锁。当一个服务实例的线程在获得写锁后,该线程依然可以获得读锁。这个时候当其释放写锁,则将只持有读锁,即完成了锁降级过程。锁降级的使用价值也很大,其一方面保证了安全,读锁在写锁释放前获取;另一方面保证了高效,因为读锁是共享的。
锁升级示例代码如下所示
/**
* InterProcessReadWriteLock Demo: 分布式读写锁, 锁升级、锁降级测试
* @author Aaron Zhu
* @date 2022-03-31
*/
public class InterProcessReadWriteLockDemo3 {
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
private static String zkLockPath = "/Aaron/Lock4";
private static CuratorFramework zkClient;
/**
* 测试: 锁升级
*/
@Test
public void test1Read2Write() {
System.out.println("---------------------- 系统上线 ----------------------");
// 创建客户端
zkClient = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181") // ZK Server地址信息
.connectionTimeoutMs(15 * 1000) // 连接超时时间: 15s
.sessionTimeoutMs( 60 * 1000 ) // 会话超时时间: 60s
// 重试策略: 重试3次, 每次间隔1s
.retryPolicy(new RetryNTimes(3, 1000))
.build();
// 启动客户端
zkClient.start();
System.out.println("---------------------- Test 1 : Read -> Write ----------------------n");
InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient, zkLockPath);
InterProcessMutex readLock = interProcessReadWriteLock.readLock();
InterProcessMutex writeLock = interProcessReadWriteLock.writeLock();
try {
readLock.acquire();
info("成功获取读锁");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
writeLock.acquire();
info("成功获取写锁");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
readLock.release();
info("成功释放读锁");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
writeLock.release();
info("成功释放写锁");
} catch (Exception e) {
System.out.println("Happen Exception: " + e.getMessage());
}
zkClient.close();
System.out.println("---------------------- 系统下线 ----------------------");
}
/**
* 打印信息
* @param msg
*/
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
String log = "["+time+"] "+ " <"+ thread +"> " + msg;
System.out.println(log);
}
}
测试结果如下所示,在持有读锁的情况下,继续尝试获取写锁会被一直阻塞
锁降级示例代码如下所示
/**
* InterProcessReadWriteLock Demo: 分布式读写锁, 锁升级、锁降级测试
* @author Aaron Zhu
* @date 2022-03-31
*/
public class InterProcessReadWriteLockDemo3 {
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
private static String zkLockPath = "/Aaron/Lock4";
private static CuratorFramework zkClient;
/**
* 测试: 锁降级
*/
@Test
public void test2Write2Read() {
System.out.println("---------------------- 系统上线 ----------------------");
// 创建客户端
zkClient = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181") // ZK Server地址信息
.connectionTimeoutMs(15 * 1000) // 连接超时时间: 15s
.sessionTimeoutMs( 60 * 1000 ) // 会话超时时间: 60s
// 重试策略: 重试3次, 每次间隔1s
.retryPolicy(new RetryNTimes(3, 1000))
.build();
// 启动客户端
zkClient.start();
System.out.println("---------------------- Test 2 : Write -> Read ----------------------n");
InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient, zkLockPath);
InterProcessMutex readLock = interProcessReadWriteLock.readLock();
InterProcessMutex writeLock = interProcessReadWriteLock.writeLock();
try {
writeLock.acquire();
info("成功获取写锁");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
readLock.acquire();
info("成功获取读锁");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
writeLock.release();
info("成功释放写锁");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
readLock.release();
info("成功释放读锁");
} catch (Exception e) {
System.out.println("Happen Exception: " + e.getMessage());
}
zkClient.close();
System.out.println("---------------------- 系统下线 ----------------------");
}
/**
* 打印信息
* @param msg
*/
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
String log = "["+time+"] "+ " <"+ thread +"> " + msg;
System.out.println(log);
}
}
测试结果如下所示,符合预期
InterProcessSemaphoreMutex分布式不可重入互斥锁
InterProcessSemaphoreMutex则是一个分布式不可重入互斥锁,示例代码如下所示
/**
* InterProcessSemaphoreMutex Demo : 分布式不可重入互斥锁
* @author Aaron Zhu
* @date 2022-04-03
*/
public class InterProcessSemaphoreMutexDemo {
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
private static ExecutorService threadPool = Executors.newFixedThreadPool(10);
private static String zkLockPath = "/Aaron/Lock5";
private static CuratorFramework zkClient;
@BeforeClass
public static void init() {
System.out.println("---------------------- 系统上线 ----------------------");
// 创建客户端
zkClient = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181") // ZK Server地址信息
.connectionTimeoutMs(15 * 1000) // 连接超时时间: 15s
.sessionTimeoutMs( 60 * 1000 ) // 会话超时时间: 60s
// 重试策略: 重试3次, 每次间隔1s
.retryPolicy(new RetryNTimes(3, 1000))
.build();
// 启动客户端
zkClient.start();
}
/**
* InterProcessSemaphoreMutex 是互斥锁
*/
@Test
public void test1() {
System.out.println("n---------------------- Test 1 ----------------------");
Runnable task = () -> {
InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(zkClient, zkLockPath);
try{
lock.acquire();
info("成功获取锁 #1");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
} catch (Exception e) {
System.out.println("Happen Exception: " + e.getMessage());
} finally {
info("释放锁 #1n");
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
};
for(int i=1; i<=3; i++) {
threadPool.execute( task );
}
// 主线程等待所有任务执行完毕
try{ Thread.sleep( 10*1000 ); } catch (Exception e) {}
}
/**
* InterProcessSemaphoreMutex 是不可重入锁
*/
@Test
public void test2() {
System.out.println("n---------------------- Test 2 ----------------------");
InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(zkClient, zkLockPath);
try{
lock.acquire();
info("成功获取锁 #1");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
lock.acquire();
info("成功获取锁 #2");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextLong(100, 500));
lock.release();
info("释放锁 #1n");
lock.release();
info("释放锁 #1n");
} catch (Exception e) {
System.out.println("Happen Exception: " + e.getMessage());
}
}
/**
* 打印信息
* @param msg
*/
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
String log = "["+time+"] "+ " <"+ thread +"> " + msg;
System.out.println(log);
}
}
测试结果如下所示,符合预期。Test 1结果证明其是一个互斥锁,而Test 2则在第二次获取锁时被阻塞,证明其不可重入
InterProcessSemaphoreV2分布式信号量
InterProcessSemaphoreV2是一个的分布式信号量,示例代码如下所示
/**
* InterProcessSemaphoreV2 Demo : 分布式信号量
* @author Aaron Zhu
* @date 2022-04-03
*/
public class InterProcessSemaphoreV2Demo {
private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
private static ExecutorService threadPool = Executors.newFixedThreadPool(10);
private static String zkLockPath = "/Aaron/Lock6";
private static CuratorFramework zkClient;
@Test
public void test1() {
System.out.println("---------------------- 系统上线 ----------------------");
// 创建客户端
zkClient = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181") // ZK Server地址信息
.connectionTimeoutMs(15 * 1000) // 连接超时时间: 15s
.sessionTimeoutMs( 60 * 1000 ) // 会话超时时间: 60s
// 重试策略: 重试3次, 每次间隔1s
.retryPolicy(new RetryNTimes(3, 1000))
.build();
// 启动客户端
zkClient.start();
// 系统最大并发处理量
int maxLimit = 2;
IntStream.rangeClosed(1,5)
.mapToObj( num -> new UserReq("用户#"+num, zkClient, zkLockPath, maxLimit) )
.forEach( threadPool::execute );
// 主线程等待所有任务执行完毕
try{ Thread.sleep( 20*1000 ); } catch (Exception e) {}
// 关闭客户端
zkClient.close();
System.out.println("---------------------- 系统下线 ----------------------");
}
/**
* 打印信息
* @param msg
*/
private static void info(String msg) {
String time = formatter.format(LocalTime.now());
String thread = Thread.currentThread().getName();
String log = "["+time+"] "+ " <"+ thread +"> " + msg;
System.out.println(log);
}
private static class UserReq implements Runnable {
private String name;
private InterProcessSemaphoreV2 interProcessSemaphoreV2;
private Integer maxLimit;
public UserReq(String name, CuratorFramework zkClient, String zkLockPath, Integer maxLimit) {
this.name = name;
this.maxLimit = maxLimit;
this.interProcessSemaphoreV2 = new InterProcessSemaphoreV2(zkClient, zkLockPath, maxLimit);
}
@Override
public void run() {
try {
// 模拟用户不定时发起请求
Thread.sleep(RandomUtils.nextLong(500, 2000));
String msg = name + ": 发起请求";
info(msg);
// 阻塞等待,直到获取许可
Lease lease = interProcessSemaphoreV2.acquire();
info(name + ": 系统开始处理请求");
// 模拟业务耗时
Thread.sleep(RandomUtils.nextInt(5, 20)*1000);
// 用户请求处理完毕,释放许可
interProcessSemaphoreV2.returnLease( lease );
info(name + ": 系统处理完毕");
}catch (Exception e) {
System.out.println("Happen Exception: " + e.getMessage());
}
}
}
}
测试结果如下所示,符合预期。其每次同时处理的用户请求数最大只有2个
原文始发于微信公众号(青灯抽丝):分布式锁(二):基于ZooKeeper Curator的分布式锁实践
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/156673.html