Zookeeper入门(三)—使用CuratorFramework操作节点并添加监视器

前言

在上一篇文章ZooKeeper入门(二)中笔者讲解了分布式协调中间件ZooKeeper的常用命令并使用Curator客户端实现了一个简单的配置中心功能。本文的目的就是带领读者朋友们一起学习如何在SpringBoot项目中使用Curator客户端对ZooKeeper节点进行简单的增删改查并对节点设置Watcher监视器等实践,让大家掌握使用Curator客户端对ZooKeeper进行基础的操作。

升级Curator版本

因为与我们使用的3.7.1版本的ZooKeeper对应的Curator客户端已升级到5.3.0版本,而且具备了幂等操作API,因此笔者也对Curator的版本由之前的4.0版本升级到了5.3.0版本

  <dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.3.0</version>
</dependency>

升级后的TreeCache类已过时, 其官方Java API文档中提示我们已使用CuratorCache类代替了TreeCache

因此,我们需要对之前项目中的ZooKeeperConfig类进行修改,鉴于CuratorFramwork类实例作为客户端工具

在对ZooKeeper节点进行操作时需要经常用到,因此我们把他注册到Spring 的IOC容器使其成为一个bean

  • 首先新建一个ZooKeeperClientConfig类,实例化CuratorFramwork bean
package org.sang.config;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ZooKeeperClientConfig {

/**
* 多参数构建ZooKeeper客户端连接
* @return client
*/

@Bean(name="zookeeperClient")
public CuratorFramework createWithOptions(){
// 连接串也可以从配置文件中取
String connectString = "119.29.117.19:2181,119.29.117.19:2182,119.29.117.19:2183";
ExponentialBackoffRetry backoffRetry = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(connectString)
.retryPolicy(backoffRetry)
.sessionTimeoutMs(30*60*1000) // 会话超时30分钟
.connectionTimeoutMs(30*1000) // 连接超时30s
.build();
client.start(); // 初始化后启动
return client;
}

}
  1. ZooKeeperConfig类中注入CuratorFramework bean 并使用CuratorCache类替换过时的TreeCache
@Component
public class ZooKeeperConfig {

private static final Logger logger = LoggerFactory.getLogger(ZooKeeperConfig.class);
@Resource
private CuratorFramework zkClient;

private Properties configProperties = new Properties();


public String getProperty(String key){
return configProperties.getProperty(key);
}

// 初始化
@PostConstruct
public void init() throws Exception {
List<String> configNames = zkClient.getChildren().forPath("/config");
for(String key: configNames){
// 获取每个路径下的值(即配置值)
byte[] value = zkClient.getData().forPath("/config/"+key);
configProperties.put(key, new String(value, "UTF-8"));
}
//保证实时性,利用zk的watch机制
CuratorCache curatorCache = CuratorCache.build(zkClient, "/config");
curatorCache.start();
// 创建监听器
curatorCache.listenable().addListener((type, oldData, data) -> {
// oldData为修改前的数据;data为将要修改的新数据,类型均为ChildData
switch (type){
case NODE_CHANGED:
// 获取变更节点的路径名
String configName = data.getPath().replace("/config/", "");
// 监听到zk的zNode发生了数据变更
logger.info(configName + "的值发生了更新, 更新后的值为:" + new String(data.getData()));
// 获取变更的值
String configValue = new String(data.getData());
configProperties.put(configName, configValue);
break;
default:
break;
}
});
}

}

ZooKeeper节点基础CRUD操作

新建ZooKeeperService服务类,在该类中完成对ZooKeeper节点的操作

package org.sang.service;

import com.alibaba.fastjson.JSON;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.List;

@Service
public class ZooKeeperService {

private static final Logger logger = LoggerFactory.getLogger(ZooKeeperService.class);

// 注入ZkClient bean
@Resource
private CuratorFramework curatorFramework;

/**
* 创建永久节点
* @param path
* @param data
* @throws Exception
*/

public void createNode(String path, String data) throws Exception{
curatorFramework.create().forPath(path, data.getBytes(StandardCharsets.UTF_8));
}

/**
* 创建临时节点
* @param path
* @param data
* @throws Exception
*/

public void createEphemeralNode(String path, String data) throws Exception {
curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath(path, data.getBytes(StandardCharsets.UTF_8));
}

/**
* 创建临时有序节点
* @param path
* @param data
* @throws Exception
*/

public void crateEphemeralSequentialNode(String path, String data) throws Exception {
curatorFramework.create()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath(path, data.getBytes(StandardCharsets.UTF_8));
}

/**
* 往节点种设置数据
* @param path
* @param data
* @throws Exception
*/

public void setData(String path, String data) throws Exception{
curatorFramework.setData().forPath(path, data.getBytes(StandardCharsets.UTF_8));
}

/**
* 异步修改数据
* @param path
* @param data
* @throws Exception
*/

public void setDataAsync(String path, String data) throws Exception{
// 添加回调监听器, set数据成功后会对节点进行监听
CuratorListener listener = (client, event) -> {
Stat stat = event.getStat();
logger.info("stat=" + JSON.toJSONString(stat));
CuratorEventType eventType = event.getType();
logger.info("eventType="+eventType.name());
};
curatorFramework.getCuratorListenable().addListener(listener);
curatorFramework.setData().inBackground().forPath(path, data.getBytes(StandardCharsets.UTF_8));
}


/**
* 删除节点
* @param path
* @throws Exception
*/

public void deleteData(String path) throws Exception{
curatorFramework.delete().forPath(path);
}

/**
* 安全删除节点
* @param path
* @throws Exception
*/

public void guaranteedDeleteData(String path) throws Exception {
curatorFramework.delete().guaranteed().forPath(path);
}

/**
* 获取子节点下的全部子节点路径集合
* @param path 指定节点路径
* @return List<String> 子节点路径集合
* @throws Exception
*/

public List<String> watchedGetChildren(String path) throws Exception {
List<String> children = curatorFramework.getChildren().watched().forPath(path);
return children;
}


/**
* 获取节点数据
* @param path 节点路径
* @param fullClassName 数据转换对象全类名
* @return Object
* @throws Exception
*/

public Object getDataByPath(String path, String fullClassName) throws Exception {
String jsonStr = new String(curatorFramework.getData().forPath(path), StandardCharsets.UTF_8);
Class clazz = Class.forName(fullClassName);
return JSON.parseObject(jsonStr, clazz);
}

}

  1. 新建ZookeepeController类, 通过接口操作ZookeeperService
package org.sang.controller;

import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.sang.config.ZooKeeperConfig;
import org.sang.pojo.RespBean;
import org.sang.service.ZooKeeperService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.*;

import javax.annotation.Resource;
import java.util.List;
import java.util.Map;

@RestController
@RequestMapping("/zookeeper")
public class ZooKeeperController {

@Resource
private ZooKeeperService zooKeeperService;
@Resource
private ZooKeeperConfig zooKeeperConfig;

private final static Logger logger = LoggerFactory.getLogger(ZooKeeperController.class);

/**
* 获取配置变量接口
*/

@GetMapping("/getConfigValueByKey")
public RespBean<String> getConfigValueByKey(@RequestParam("configKey") String configKey){
logger.info("configKey={}", configKey);
String configValue = zooKeeperConfig.getProperty(configKey);
RespBean<String> respBean = RespBean.success(configValue);
return respBean;
}

/**
* 创建持久节点接口
*/

@PostMapping(value = "/create/persistent")
public RespBean<String> createPersistentNode(@RequestBody Map<String, Object> postData) {
RespBean respBean;
try {
checkPostData(postData);
String path = (String) postData.get("path");
Object data = postData.get("data");
zooKeeperService.createNode(path, JSON.toJSONString(data));
respBean = RespBean.success("create node " + path + " success");
} catch (Exception e) {
respBean = RespBean.error("create node failed");
logger.error("create node error", e);
}
return respBean;
}

/**
* 创建临时节点接口
*/

@RequestMapping("/create/ephemeral")
public RespBean<String> createTempNode(@RequestBody Map<String, Object> postData) {
RespBean<String> respBean;
try {
checkPostData(postData);
String path = (String) postData.get("path");
Object data = postData.get("data");
zooKeeperService.createEphemeralNode(path, JSON.toJSONString(data));
respBean = RespBean.success("create ephemeral node " + path + " success");
} catch (Exception e) {
respBean = RespBean.error("create ephemeral node failed");
logger.error("create ephemeral node error", e);
}
return respBean;
}

/**
* 创建临时有序节点接口
*/

@PostMapping("/ephemeral/sequence")
public RespBean<String> createEphemeralSequenceNode(@RequestBody Map<String, Object> postData){
RespBean respBean;
try{
checkPostData(postData);
String path = (String) postData.get("path");
Object data = postData.get("data");
zooKeeperService.crateEphemeralSequentialNode(path, JSON.toJSONString(data));
respBean = RespBean.success("create ephemeral sequence node " + postData.get("path") + " success");
} catch (Exception e) {
respBean = RespBean.error("create ephemeral node failed");
logger.error("create ephemeral sequence node error", e);
}
return respBean;
}

/**
* 根据节点路径获取节点中的数据接口
*/

@PostMapping("getDataByPath")
public RespBean<Object> getDataByPath(@RequestBody Map<String, String> paramMap){
RespBean respBean;
try {
String path = paramMap.get("path");
String fullClassName = paramMap.get("fullClassName");
Object data = zooKeeperService.getDataByPath(path, fullClassName);
respBean = RespBean.success(data);
} catch (Exception e) {
respBean = RespBean.error("get data failed caused by " + e.getMessage());
logger.error("get data error", e);
}
return respBean;
}

/**
* 同步修改节点数据接口
*/

@PostMapping("/setData/sync")
public RespBean<String> setData(@RequestBody Map<String, Object> paramMap){
checkPostData(paramMap);
RespBean<String> respBean;
try {
zooKeeperService.setData((String) paramMap.get("path"), JSON.toJSONString(paramMap.get("data")));
respBean = RespBean.success("set data success");
} catch (Exception e) {
logger.error("set data failed", e);
respBean = RespBean.error("set data failed, caused by " + e.getMessage());
}
return respBean;
}

@PostMapping("/setData/async")
public RespBean<String> asyncSetData(@RequestBody Map<String, Object> paramMap){
checkPostData(paramMap);
RespBean<String> respBean;
try {
zooKeeperService.setDataAsync((String) paramMap.get("path"), JSON.toJSONString(paramMap.get("data")));
respBean = RespBean.success("async set data success");
} catch (Exception e) {
logger.error("async set data failed", e);
respBean = RespBean.error("async set data failed, caused by " + e.getMessage());
}
return respBean;
}

/**
* 获取被监听的子节点路径集合接口
*/

@GetMapping("/getWatchedChildren")
public RespBean<List<String>> getWatchedChildren(@RequestParam("path") String path){
RespBean<List<String>> respBean;
try {
List<String> watchedChildren = zooKeeperService.watchedGetChildren(path);
respBean = RespBean.success(watchedChildren);
} catch (Exception e) {
logger.error("getWatchedChildren error", e);
respBean = RespBean.error("getWatchedChildren failed, caused by " + e.getMessage());
}
return respBean;
}

/**
* 删除节点接口
*/

@DeleteMapping("/deleteByPath")
public RespBean<String> deleteDataByPath(@RequestParam("path") String path){
logger.info("delete ZNode " + path);
RespBean<String> respBean;
try {
zooKeeperService.deleteData(path);
respBean = RespBean.success("delete ZNode success");
} catch (Exception e) {
logger.error("delete ZNode of " + path + "failed", e);
respBean = RespBean.error("delete ZNode failed, caused by " + e.getMessage());
}
return respBean;
}

/**
* 安全删除节点接口
*/

@DeleteMapping("/guaranteedDeleteData")
public RespBean<String> guaranteedDeleteData(@RequestParam("path") String path){
logger.info("guaranteed delete ZNode " + path);
RespBean<String> respBean;
try {
zooKeeperService.guaranteedDeleteData(path);
respBean = RespBean.success("guaranteed delete data success");
} catch (Exception e) {
logger.error("guaranteed delete data failed", e);
respBean = RespBean.error("guaranteed delete data failed, caused by " + e.getMessage());
}
return respBean;
}

/**
* 校验POST请求入参数据
*/

private void checkPostData(Map<String, Object> postData){
String path = (String) postData.get("path");
if(StringUtils.isEmpty(path)){
throw new IllegalArgumentException("path cannot be null");
}
Object data = postData.get("data");
if(data==null || "".equals(data)){
throw new IllegalArgumentException("data cannot be null");
}
}

}

测试CRUD基础操作

首先我们在项目的SpringSecurity配置文件WebSecurityConfig.java中对操作zookeeper的接口放开认证要求

SpringSecurity#configure(HttpSecurity http)方法

 http.authorizeRequests()
.antMatchers("/user/reg").anonymous()
.antMatchers("/zookeeper/**").anonymous()

参考笔者之前发布的文章Zookeeepr入门(一)启动ZooKeeper集群服务,然后启动本地的Redis和MySql服务后

再在IDEA中启动blogserver服务,服务启动成功之后就可以通过在postman中调用接口进行验证了

创建持久节点

在postman中调用创建持久节点接口

POST http://localhost:8081/blog/zookeeper/create/persistent
{
"path": "/test",
"data": {
"serviceName": "zooKeeperService",
"serverAddress": "192.110.119.201:2181,192.110.119.202:2181,192.110.119.203:2181",
"contextPath": "/zooKeeper",
"apiList": []
}
}

接口返回信息:

{
"status": 200,
"msg": "success",
"data": "create node /test success"
}

然后更换入参调用同一个接口创建子节点/test/oderService

POST http://localhost:8081/blog/zookeeper/create/persistent
{
"path": "/test/oderService",
"data": {
"serviceName": "orderService",
"serverAddress": "192.110.119.201:8080,192.110.119.202:8080,192.110.119.203:8080",
"contextPath": "/orderService",
"apiList": [
{
"apiId": 1,
"apiPath": "/order/create",
"apiName": "创建订单接口API",
"requestType": "POST",
"argumentTypes": "java.lang.String, java.lang.Double",
"returnType": "org.sang.pojo.RespBean"
},
{
"apiId": 2,
"apiPath": "/order/get",
"apiName": "查询订单接口API",
"requestType": "GET",
"argumentTypes": "java.lang.Long",
"returnType": "org.sang.pojo.RespBean"
}
]
}
}

注意:为了创建路径为/test/orderService的持久节点成功,必须先创建/test节点,否则在没有父节点的情况下直接创建/test`的子节点zookeeper客户端会会报错

此时我们在Linux客户端连接ZooKeeper服务, 通过ls /test命令可查看到/test路径下的所有子节点

[zk: localhost:2181(CONNECTED) 19] ls /test
[orderService]

然后再通过get /test/orderService命令可以直接查看该节点下的数据

[zk: localhost:2181(CONNECTED) 21] get /test/orderService
{"serviceName":"orderService","serverAddress":"192.110.119.201:8080,192.110.119.202:8080,192.110.119.203:8080","contextPath":"/orderService","apiList":[{"apiId":1,"apiPath":"/order/create","apiName":"创建订单接口API","requestType":"POST","argumentTypes":"java.lang.String, java.lang.Double","returnType":"org.sang.pojo.RespBean"},{"apiId":2,"apiPath":"/order/get","apiName":"查询订单接口API","requestType":"GET","argumentTypes":"java.lang.Long","returnType":"org.sang.pojo.RespBean"}]}
  1. 查看节点数据

为了将/test节点及其子节点中存放的数据在取数据时能反序列化为一个对象,我们新建了一两个实体类

ServiceInfo.javaApiInfo.java

public class ServiceInfo implements Serializable {
// 服务名称
private String serviceName;
// 服务地址,IP+端口号,多个使用逗号分隔
private String serverAddress;
// 上下文
private String contextPath;
// 服务中的api列表
private List<ApiInfo> apiList;
// 省略setter和getter方法
}
public class ApiInfo implements Serializable {
// APIID
private Long apiId;
// API 路径
private String apiPath;
// API名称
private String apiName;
// 请求类型:GET|POST|PUT|DELETE
private String requestType;
// 参数类型,参数全类名, 多个以逗号分隔
private String argumentTypes;
// 返回值类型
private String returnType;
// 省略setter和getter方法
}

在postman中调用查询节点数据接口

  POST http://localhost:8081/blog/zookeeper/getDataByPath
{
"path": "/test",
"fullClassName": "org.sang.pojo.ServiceInfo"
}
// 第一参数为节点路径,第二个参数为实体类全类名

接口返回信息:

{
"status": 200,
"msg": "success",
"data": {
"serviceName": "zooKeeperService",
"serverAddress": "192.110.119.201:2181,192.110.119.202:2181,192.110.119.203:2181",
"contextPath": "/zooKeeper",
"apiList": []
}
}

再次调用相同接口获取/test/orderService节点中的数据

POST http://localhost:8081/blog/zookeeper/getDataByPath
{
"path": "/test/orderService",
"fullClassName": "org.sang.pojo.ServiceInfo"
}

接口返回结果:

{
"status": 200,
"msg": "success",
"data": {
"serviceName": "orderService",
"serverAddress": "192.110.119.201:8080,192.110.119.202:8080,192.110.119.203:8080",
"contextPath": "/orderService",
"apiList": [
{
"apiId": 1,
"apiPath": "/order/create",
"apiName": "创建订单接口API",
"requestType": "POST",
"argumentTypes": "java.lang.String, java.lang.Double",
"returnType": "org.sang.pojo.RespBean"
},
{
"apiId": 2,
"apiPath": "/order/get",
"apiName": "查询订单接口API",
"requestType": "GET",
"argumentTypes": "java.lang.Long",
"returnType": "org.sang.pojo.RespBean"
}
]
}
}

创建临时节点

调用创建临时节点接口

POST http://localhost:8081/blog/zookeeper/create/ephemeral
{
"path": "/test/ephemralOne",
"data": "ephemeral node1"
}

接口返回信息:

{
"status": 200,
"msg": "success",
"data": "create ephemeral node /test/ephemralOne success"
}

然后在连接ZooKeeper服务的Linux客户端中执行命令ls /test 查看临时节点

[zk: localhost:2181(CONNECTED) 22] ls /test
[ephemralOne, orderService]

我们发现/test节点下多了一个子节点ephemralOne

然后重启blogserver服务后我们再次执行ls /test命令会发现ephemralOne子节点消失不见了,证明了它是    一个临时节点,在会话关闭后就会消失。

创建临时有序节点

调用创建临时有序节点接口

POST http://localhost:8081/blog/zookeeper/ephemeral/sequence
{
"path": "/test/ephemeralSequence",
"data": "ephemeralSequence"
}

连续调用以上接口三次

每次调用都会返回以下响应信息:

{
"status": 200,
"msg": "success",
"data": "create ephemeral sequence node /test/ephemeralSequence success"
}

再次执行ls /test命令查看/test节点下的子节点

[zk: localhost:2181(CONNECTED) 23] ls /test
[ephemeralSequence0000000004, ephemeralSequence0000000005, ephemeralSequence0000000006, orderService]

可以看到创建的临时有序节点在指定的路径名ephemeralSequence后面都带有一个十位长度的数字字符串

在获取节点路径时需要将后面的数字字符也带上

  [zk: localhost:2181(CONNECTED) 24] get /test/ephemeralSequence0000000004
"ephemeralSequence"

重置节点数据

调用同步设置数据接口

POST 
{
"path": "/test",
"data": {
"serverAddress": "localhost:2181,localhost:2182,localhost:2183",
"contextPath": "/cloudService",
"serviceName": "registerService",
"apiList":[]
}
}

返回结果:

  {
"status": 200,
"msg": "success",
"data": "set data success"
}

然后通过客户端zkCli命令可以查看到test节点数据发送了变化

[zk: localhost:2181(CONNECTED) 0] get /test
{"serverAddress":"localhost:2181,localhost:2182,localhost:2183","contextPath":"/cloudService","serviceName":"registerService","apiList":[]}

删除节点

调用删除节点接口

DELETE http://localhost:8081/blog/zookeeper/deleteByPath?path=/test/ephemralOne

返回结果

{
"status": 200,
"msg": "success",
"data": "delete ZNode success"
}

注意DeleteBuilder#forPath方法只能删除没有子节点的节点,不能用来删除有子节点的节点

另一种删除方式curatorFramework.delete().guaranteed().forPath(path)   也只能删除子节点,这种方式是显示地表示可删除节点位子节点

ZooKeeper中的回调、监听器和Watcher

ZooKeeper中间件之所以能作为一个分布式协调器的一个重要原因就在于它的Watch机制, 当节点创建、修改、删除以及重连和连接失效时都能通过watch机制得到通知

通过回调监听

这种方式是通过设置BackgroundCallback回调函数监听节点

BackgroundCallback是一个接口

public interface BackgroundCallback {
void processResult(CuratorFramework zkClient, CuratorEvent event) throws Exception;
}

构造一个BackgroundCallback实例需要实现processResult抽象方法

第一个参数是CuratorFramework类型的zkClient, 通过这个客户端可以在回调种继续操作ZNode节点, 添加监视器等

第二个参数是CuratorEvent类型的事件对象, CuratorEvent也是一个接口,它的实现类是CuratorEventImpl

public interface CuratorEvent {
CuratorEventType getType();

int getResultCode();

String getPath();

Object getContext();

Stat getStat();

byte[] getData();

String getName();

List<String> getChildren();

List<ACL> getACLList();

List<CuratorTransactionResult> getOpResults();

WatchedEvent getWatchedEvent();
}

通过CuratorEvent#getType方法可以获得事件类型CuratorEventType,是一个枚举类型可以看到有下面这些事件类型

public enum CuratorEventType {
CREATE, // 创建节点
DELETE, // 删除节点
EXISTS, // 存在节点
GET_DATA, // 获取节点数据
SET_DATA, // 重置节点数据
CHILDREN, // 子节点
SYNC, // 同步
GET_ACL, // 获取权限
SET_ACL, // 设置权限
TRANSACTION, // 事务
GET_CONFIG, // 获取zookeeper配置信息
RECONFIG, // 重新配置zookeeper
WATCHED, // 监听
REMOVE_WATCHES, // 移除监听
CLOSING, 会话关闭
ADD_WATCH; // 添加watch
private CuratorEventType() {
}
}

通过添加监听器监听

监听器为CuratorListener, 它是一个接口,有一个抽象方法

 public interface CuratorListener {
void eventReceived(CuratorFramework zkClient, CuratorEvent event) throws Exception;
}

可以看到CuratorListenerBackgroundCallback两个接口具有相同类型的入参,可以说二者实现监视节点的效果和底层原理都是一样的

给节点添加Watcher

watcher是一个接口

@Public
public interface Watcher {
void process(WatchedEvent watchedEvent);
}

要构造一个Watcher实例需要实现process抽象方法,只有一个WatchedEvent类型的构造参数

@Public
public class WatchedEvent {
private final KeeperState keeperState;
private final EventType eventType;
private String path;
// 三个参数的构造函数
public WatchedEvent(EventType eventType, KeeperState keeperState, String path) {
this.keeperState = keeperState;
this.eventType = eventType;
this.path = path;
}
// 一个参数的构造方法
public WatchedEvent(WatcherEvent eventMessage) {
this.keeperState = KeeperState.fromInt(eventMessage.getState());
this.eventType = EventType.fromInt(eventMessage.getType());
this.path = eventMessage.getPath();
}

public KeeperState getState() {
return this.keeperState;
}

public EventType getType() {
return this.eventType;
}

public String getPath() {
return this.path;
}

public String toString() {
return "WatchedEvent state:" + this.keeperState + " type:" + this.eventType + " path:" + this.path;
}

public WatcherEvent getWrapper() {
return new WatcherEvent(this.eventType.getIntValue(), this.keeperState.getIntValue(), this.path);
}
}

通过WatchedEvent#type方法可以获得事件类型参数EventType对象

EventType也是一个枚举类, 囊括了以下几种ZNode事件

None(-1),  // 无事件
NodeCreated(1), // 创建节点
NodeDeleted(2), // 节点被删除
NodeDataChanged(3), // 节点发送改变
NodeChildrenChanged(4), // 子节点发生改变
DataWatchRemoved(5), // 数据监视器被删除
ChildWatchRemoved(6), // 子节点监视器被删除
PersistentWatchRemoved(7); // 持久化监视器被删除

Watcher内部具有EventTypeKeeperState两个枚举类

BackgroundCallback与CuratorListener的用法

ZooKeeeprService类中定义一个全局变量callback, 这个回调可以在异步创建节点、异步修改节点数据以及异步删除节点时对节点进行监听处理

 // 定义回调函数,对节点进行监听
private BackgroundCallback callback = ((zkClient, curatorEvent) -> {
logger.info("event data="+ ((curatorEvent.getData()==null)?"null data": new String(curatorEvent.getData())));
switch (curatorEvent.getType()){
case SET_DATA:
// 只会触发SET_DATA事件
logger.info("node data changed");
// 回调做其他事情
break;
case CREATE:
logger.info("node created");
break;
case CHILDREN:
logger.info("children");
break;
case DELETE:
logger.info("node deleted");
break;
default:
logger.info("eventType="+curatorEvent.getName());
break;
}
});
// 定义监听器,对节点进行监听
private CuratorListener listener = (client, event) -> {
Stat stat = event.getStat();
logger.info("stat=" + JSON.toJSONString(stat));
CuratorEventType eventType = event.getType();
logger.info("eventType="+eventType.name());
};

这里我们只在回调里打印日志

然后在异步创建节点、异步修改节点数据及异步删除节点方法中使用

 /**
* 异步创建节点
* @param path
* @param data
* @throws Exception
*/

public void asyncCreateNode(String path, String data) throws Exception {
curatorFramework.create().inBackground(callback).forPath(path, data.getBytes(StandardCharsets.UTF_8));
}

/**
* 异步修改节点数据
* @param path
* @param data
* @throws Exception
*/

public void setDataAsyncWithCallback(String path, String data) throws Exception {
curatorFramework.setData().inBackground(callback).forPath(path, data.getBytes(StandardCharsets.UTF_8));
}

/**
* 使用监听器异步修改数据
* @param path
* @param data
* @throws Exception
*/

public void setDataAsyncWithListener(String path, String data) throws Exception {
// 通过监听器修节点改数据
curatorFramework.getCuratorListenable().addListener(listener);
curatorFramework.setData().inBackground().forPath(path, data.getBytes(StandardCharsets.UTF_8));
}
/**
* 异步删除数据
* @param path
* @throws Exception
*/

public void asyncDeleteData(String path) throws Exception {
curatorFramework.delete().inBackground(callback).forPath(path);
}

然后在ZooKeeperController类中加上对应的控制器方法

@PostMapping("/create/async")
public RespBean<String> asyncCreateNode(@RequestBody Map<String, Object> postData){
RespBean respBean;
try {
checkPostData(postData);
String path = (String) postData.get("path");
Object data = postData.get("data");
zooKeeperService.asyncCreateNode(path, JSON.toJSONString(data));
respBean = RespBean.success(" async create node " + path + " success");
} catch (Exception e){
respBean = RespBean.error("async create node failed");
logger.error("async create node error", e);
}
return respBean;
}

@PostMapping("/setData/async/callback")
public RespBean<String> setDataAsyncWithCallback(@RequestBody Map<String, Object> paramMap){
checkPostData(paramMap);
RespBean<String> respBean;
try {
zooKeeperService.setDataAsyncWithCallback((String) paramMap.get("path"), JSON.toJSONString(paramMap.get("data")));
respBean = RespBean.success("async set data with callback success");
} catch (Exception e) {
logger.error("set data with callback failed", e);
respBean = RespBean.error("async set data with callback failed, caused by " + e.getMessage());
}
return respBean;
}

@DeleteMapping("/deleteByPath/async")
public RespBean<String> asyncDeleteByPath(@RequestParam("path") String path){
logger.info(" async delete ZNode " + path);
RespBean<String> respBean;
try {
zooKeeperService.asyncDeleteData(path);
respBean = RespBean.success("delete ZNode success");
} catch (Exception e) {
logger.error("async delete ZNode of " + path + "failed", e);
respBean = RespBean.error("async delete ZNode failed, caused by " + e.getMessage());
}
return respBean;
}

然后重启服务开始测试效果

测试异步添加节点

POST http://localhost:8081/blog/zookeeper/create/async
{
"path": "/test/person",
"data": "personInfo collection"
}

返回信息:

{
"status": 200,
"msg": "success",
"data": " async create node /test/person success"
}

可以看到控制台中打印出了节点创建的信息,这是我们在回调函数中根据事件类型判断打印的日志

2022-08-21 13:44:02.629  INFO 20120 --- [ain-EventThread] org.sang.service.ZooKeeperService        : event data=null data
2022-08-21 13:44:02.630 INFO 20120 --- [ain-EventThread] org.sang.service.ZooKeeperService : node created

测试使用回调异步修改节点数据接口

POST http://localhost:8081/blog/zookeeper/setData/async/callback
{
"path": "/test/person",
"data": "person infomation collection"
}

返回结果:

{
"status": 200,
"msg": "success",
"data": "async set data with callback success"
}

控制台打印如下日志:

2022-08-21 22:55:02.011  INFO 13724 --- [ain-EventThread] org.sang.service.ZooKeeperService        : event data=null data
2022-08-21 22:55:02.012 INFO 13724 --- [ain-EventThread] org.sang.service.ZooKeeperService : node data changed

测试使用监听器异步修改节点数据

POST http://localhost:8081/blog/zookeeper/setData/async/listener
{
"path": "/test/person/2",
"data": {
"name": "李四",
"age": 24,
"height": 173.2,
"salary": 15800.85
}
}

返回信息:

{
"status": 200,
"msg": "success",
"data": "async set data with callback success"
}

控制台打印日志:

2022-08-21 23:03:54.778  INFO 13724 --- [ain-EventThread] org.sang.service.ZooKeeperService        : stat=null
2022-08-21 23:03:54.779 INFO 13724 --- [ain-EventThread] org.sang.service.ZooKeeperService : eventType=SET_DATA

测试异步删除节点

DELETE http://localhost:8081/blog/zookeeper/deleteByPath/async?path=/test/person/2

返回结果:

{
"status": 200,
"msg": "success",
"data": "delete ZNode success"
}

控制台打印出日志:

2022-08-21 23:09:06.306  INFO 13724 --- [ain-EventThread] org.sang.service.ZooKeeperService        : event data=null data2022-08-21 23:09:06.306  INFO 13724 --- [ain-EventThread] org.sang.service.ZooKeeperService        : node deleted

Watcher的用法

ZooKeeperService类里定义个全局的CuratorWatcher, 并定义一个为节点添加这个watcher的方法

private CuratorWatcher watcher = watchedEvent -> {
String eventName = watchedEvent.getType().name();
// 监听的节点路径
String watchedPath = watchedEvent.getPath();
logger.info("watchedPath={}", watchedPath);
switch (eventName){
case "NodeCreated":
logger.info("node created, add Lock success");
break;
case "NodeDeleted":
logger.info("node deleted, release lock success");
break;
case "NodeDataChanged":
logger.info("node data changed");
break;
case "NodeChildrenChanged":
logger.info("node children changed");
break;
case "DataWatchRemoved":
logger.info("data watcher removed");
break;
case "ChildWatchRemoved":
logger.info("child watcher removed");
break;
case "PersistentWatchRemoved":
logger.info("persistent watcher removed");


/**
* 给节点添加watcher
* @param path
* @throws Exception
*/

public void addWatchByPath(String path) throws Exception {
curatorFramework.getData().usingWatcher(watcher).forPath(path);
}break;
default:
logger.info("none event");
break;
}
};

/**
* 给节点添加watcher
* @param path
* @throws Exception
*/

public void addWatchByPath(String path) throws Exception {
curatorFramework.getData().usingWatcher(watcher).forPath(path);
}

然后再在ZooKeeperController类中添加对应的控制器方法

@PostMapping("/addWatcherByPath")
public RespBean<String> addWatcherByPath(@RequestParam("path") String path){
RespBean<String> respBean;
try {
zooKeeperService.addWatchByPath(path);
respBean = RespBean.success("add watcher success");
} catch (Exception e) {
logger.info("add watcher failed", e);
respBean = RespBean.error("add watcher failed, caused by " + e.getMessage());
}
return respBean;
}

重启应用后对/test/person/1这个节点进行监听

POST http://localhost:8081/blog/zookeeper/addWatcherByPath?path=/test/person/1

返回信息:

{
"status": 200,
"msg": "success",
"data": "add watcher success"
}

然后调用同步setData接口对这个节点数据进行修改

POST http://localhost:8081/blog/zookeeper/setData/sync
{
"path": "/test/person/1",
"data": {
"name": "王五",
"age": 32,
"height": 173.6,
"salary": 16800.58
}
}

返回信息:

{
"status": 200,
"msg": "success",
"data": "set data success"
}

控制台打印出日志:

2022-08-21 23:29:19.349  INFO 3732 --- [ain-EventThread] org.sang.service.ZooKeeperService        : watchedPath=/test/person/1
2022-08-21 23:29:19.349 INFO 3732 --- [ain-EventThread] org.sang.service.ZooKeeperService : node data changed

注意: Watcher还会监听一次,后面继续对节点进行操作就不会进入CuratorWatcher#process方法,如果需要继续监视节点的变化,则需要重新对节点添加Watcher

我们来测试一下效果,继续修改/test/person/1节点

POST http://localhost:8081/blog/zookeeper/setData/sync
{
"path": "/test/person/1",
"data": {
"name": "赵六",
"age": 33,
"height": 174.6,
"salary": 18800.58
}
}

返回信息:

{
"status": 200,
"msg": "success",
"data": "set data success"
}

但是控制台却看不到我们定义的watcherprocess方法中打印的日志,说明被调用了一次的watcher已经失效

不过要给节点添加持久类型的Watcher可通过下面这种链式调用方式实现

CuratorFramework.watchers()
.add()
.withMode(AddWatchMode.PERSISTENT)
.usingWatcher(watcher)
.forPath(path)

小结

本文主要详细讲解了使用CuratorFramework客户端在SpringBoot项目中对ZooKeeper节点实现增删改查以及对ZooKeeper节点添加BackgroundCallback回调、CuratorListener监听器和Watcher监视器等操作,既能实现对节点的异步操作,也能监听节点的变化。从而让我们并根据ZooKeeper节点事件类型作出响应的业务逻辑处理.

关于使用CuratorFramework客户端以非事务的方式操作ZooKeeper节点就介绍到这里,想要更深入的学习CuratorFramework的用法可通过阅读该类及其方法相关类的源码进一步掌握。下一篇文章,笔者将继续介绍

使用CuratorFramework客户端在一个事务中完成多个操作,并介绍使用ZooKeeper实现分布式事务锁。

原创不易, 欢迎看到这里的读者朋友帮忙点赞转发,记得点亮下面的【在看】,鼓励笔者持续创作!

往期原创文章

【1】Zookeeper入门(一)

【2】ZooKeeper入门(二):ZooKeeper常用命令介绍及使用Curator客户端实现分布式配置中心

【3】Kafka快速上手基础实践教程(一)

【4】看官网自学Kafka太慢,我选择在掘金小册订阅《图解Kafka之实战指南》

【5】SpringBoot项目整合Vue做一个完整的用户注册功能


原文始发于微信公众号(阿福谈Web编程):Zookeeper入门(三)—使用CuratorFramework操作节点并添加监视器

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/42339.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!