1. 概述
使用原生的zookeeper Java API开发存在一下问题:
- 会话连接时异步的,需要自己处理
- Watch需要重复注册,否则不能生效
- 开发的复杂性较高
- 不支持多节点删除和创建,需要自己递归
Curator是一种用于分布式调度任务的Zookeeper Java客户端框架,专门用于解决分布式锁,其解决了原生Java API开发分布式锁遇到的问题,提供分布式锁、集群Leader选举、共享计数器、缓存机制、分布式队列、节点管理和监听等。更多详情可参考Curator官网
2. 引入核心依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>5.2.0</version>
</dependency>
3. 参数配置
zookeeper:
connect:
address: xx.xx.xx.xx:2181 #zookeeper连接地址
connection-time-out: 10000 #连接超时时间 毫秒
session-time-out: 10000 #session超时时间 毫秒
sleep-time-out: 3000 #重试初试时间 毫秒
max-retries: 3 #重试次数
wait-time: 20 #连接等待时间 毫秒
name-space: curator #命名空间
4. 配置文件
@Configuration
public class CuratorConfig {
@Value("${zookeeper.connect.address}")
private String connectStr;
@Value("${zookeeper.connect.connection-time-out}")
private int connectionTimeout;
@Value("${zookeeper.connect.session-time-out}")
private int sessionTimeout;
@Value("${zookeeper.connect.sleep-time-out}")
private int sleepTimeOut;
@Value("${zookeeper.connect.max-retries}")
private int maxRetries;
@Value("${zookeeper.connect.name-space}")
private String namespace;
/**
* 初始化curator客户端
* @return
*/
@Bean
public CuratorFramework getCuratorClient() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(sleepTimeOut, maxRetries);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(connectStr)
.connectionTimeoutMs(connectionTimeout)
.sessionTimeoutMs(sessionTimeout)
.namespace(namespace)
.retryPolicy(retryPolicy).build();
client.start();
return client;
/*RetryPolicy retryPolicy = new ExponentialBackoffRetry(sleepTimeOut, maxRetries);
CuratorFramework client = CuratorFrameworkFactory.newClient(connectStr, retryPolicy);
client.start();
return client;*/
}
}
CuratorFrameworkFactory提供两个方法初始化curator连接,一个是工厂方法newClient,一个是构建方法build,使用工厂方法可以创建一个默认的curator实例,build构建方法可以定制创建实例。
5. 接口封装
public interface CuratorService {
/**
* 创建节点
* @param path
* @param data
* @return
* @throws Exception
*/
String createNode(String path, String data) throws Exception;
/**
* 创建指定类型的无序节点(持久或临时)
* @param nodeType
* @param path
* @param data
* @return
* @throws Exception
*/
String createTypeNode(CreateMode nodeType, String path, String data) throws Exception;
/**
* 创建指定类型的有序节点
* @param nodeType
* @param path
* @param data
* @return
*/
String createTypeSeqNode(CreateMode nodeType, String path, String data) throws Exception;
/**
* 设置值
* @param path
* @param data
* @return
*/
Stat setData(String path, String data) throws Exception;
/**
* 异步设置值
* @param path
* @param data
* @return
* @throws Exception
*/
Stat setDataAsync(String path, String data) throws Exception;
/**
* 删除节点
* @param path
* @throws Exception
*/
void deleteNode(String path) throws Exception;
/**
* 查看子节点
* @param path
* @return
* @throws Exception
*/
List<String> watchedGetChildren(String path) throws Exception;
/**
* 查看子节点
* @param path
* @param watcher
* @return
* @throws Exception
*/
List<String> watchedGetChildren(String path, Watcher watcher) throws Exception;
/**
* 创建分布式锁
* @return
*/
void getLock(String path);
/**
* 获取分布式ID
* @param path
* @param data
* @return
* @throws Exception
*/
String getDistributedId(String path, String data) throws Exception;
}
6. 接口实现
@Service
public class CuratorServiceImpl implements CuratorService {
@Value("${zookeeper.connect.wait-time}")
private int waitTime;
@Autowired
private CuratorFramework curatorClient;
@Override
public String createNode(String path, String data) throws Exception {
String nodePath = curatorClient.create().creatingParentsIfNeeded().forPath(path, data.getBytes());
return nodePath;
}
@Override
public String createTypeNode(CreateMode nodeType, String path, String data) throws Exception {
String nodePath = curatorClient.create().creatingParentsIfNeeded().withMode(nodeType).forPath(path, data.getBytes());
return nodePath;
}
@Override
public String createTypeSeqNode(CreateMode nodeType, String path, String data) throws Exception {
String nodePath = curatorClient.create().creatingParentsIfNeeded().withProtection().withMode(nodeType).forPath(path, data.getBytes());
return nodePath;
}
@Override
public Stat setData(String path, String data) throws Exception {
Stat stat = curatorClient.setData().forPath(path, data.getBytes());
return stat;
}
@Override
public Stat setDataAsync(String path, String data) throws Exception {
CuratorListener listener = new CuratorListener() {
@Override
public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
//examine event for details
}
};
curatorClient.getCuratorListenable().addListener(listener);
Stat stat = curatorClient.setData().inBackground().forPath(path, data.getBytes());
return stat;
}
@Override
public void deleteNode(String path) throws Exception {
curatorClient.delete().deletingChildrenIfNeeded().forPath(path);
}
@Override
public List<String> watchedGetChildren(String path) throws Exception {
List<String> childrenList = curatorClient.getChildren().watched().forPath(path);
return childrenList;
}
@Override
public List<String> watchedGetChildren(String path, Watcher watcher) throws Exception {
List<String> childrenList = curatorClient.getChildren().usingWatcher(watcher).forPath(path);
return childrenList;
}
@Override
public void getLock(String path) {
InterProcessLock lock = new InterProcessMutex(curatorClient, path);
try {
if (lock.acquire(waitTime, TimeUnit.MILLISECONDS)) {
System.out.println("to do something");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
@Override
public String getDistributedId(String path, String data) throws Exception {
String seqNode = this.createTypeSeqNode(CreateMode.EPHEMERAL_SEQUENTIAL, "/" + path, data);
System.out.println(seqNode);
int index = seqNode.lastIndexOf(path);
if (index >= 0) {
index += path.length();
return index <= seqNode.length() ? seqNode.substring(index) : "";
}
return seqNode;
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/76807.html