添加 Maven 依赖(对应 ZooKeeper 3.8.0 版本)
<!-- Curator core client (CuratorFramework). No <version> is declared here; presumably it is managed by a parent POM or BOM (TODO confirm). -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</dependency>
<!-- Curator recipes; the org.apache.curator.framework.recipes.cache classes used for node watching come from this artifact. -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</dependency>
在 application 配置文件(application.properties)中添加配置
# ZooKeeper connect string; separate multiple cluster nodes with commas
zookeeper.curator.addresses=127.0.0.1:2181
# Connection timeout, in milliseconds
zookeeper.curator.connectionTimeoutMs=50000
# Session timeout, in milliseconds
zookeeper.curator.sessionTimeOut=50000
# Base sleep time between retries, in milliseconds (first argument of ExponentialBackoffRetry)
zookeeper.curator.sleepMsBetweenRetry=1000
# Maximum number of retries (second argument of ExponentialBackoffRetry)
zookeeper.curator.maxRetries=3
# Namespace (parent node name) passed to the Curator client builder
zookeeper.curator.namespace=demo
# Digest credentials; the authorization call that would consume them is commented out in ZookeeperConfig
zookeeper.curator.digest=zs:123456
读取 application 配置
import lombok.Data;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* Holds the {@code zookeeper.curator.*} settings bound from the application configuration.
* Getters and setters are generated by Lombok's {@code @Data}.
*/
@Data
@Component
@ConfigurationProperties(prefix = "zookeeper.curator")
// Disabled condition: would make this bean depend on zookeeper.curator.addresses being set.
// @ConditionalOnProperty(prefix = "zookeeper.curator", name = "addresses")
public class ZookeeperProperties {
/**
* ZooKeeper connect string (cluster addresses).
*/
private String addresses;
/**
* Credentials of the digest-authentication user.
*/
private String digest;
/**
* Namespace (name of the parent node).
*/
private String namespace;
/**
* Connection timeout, in milliseconds.
*/
private Integer connectionTimeoutMs;
/**
* Session timeout; ZookeeperConfig passes it to the builder's sessionTimeoutMs(...), i.e. milliseconds.
*/
private Integer sessionTimeOut;
/**
* Sleep-time parameter of the retry policy, in milliseconds.
*/
private Integer sleepMsBetweenRetry;
/**
* Number of retries allowed by the retry policy.
*/
private Integer maxRetries;
}
配置 zookeeper bean
import lombok.Data;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Data
@Configuration
@ConditionalOnProperty(prefix = "zookeeper.curator", name = "addresses")
public class ZookeeperConfig {
@Autowired
private ZookeeperProperties zookeeperProperties;
@Bean
public CuratorFramework curatorClient() throws Exception {
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.namespace(zookeeperProperties.getNamespace())
// .authorization("digest", zookeeperProperties.getDigest().getBytes())
//连接地址 集群用,隔开
.connectString(zookeeperProperties.getAddresses())
.connectionTimeoutMs(zookeeperProperties.getConnectionTimeoutMs())
//会话超时时间
.sessionTimeoutMs(zookeeperProperties.getSessionTimeOut())
//设置重试机制 每三秒重连一次,总等待时间超过个`10`秒后停止重连
.retryPolicy(new ExponentialBackoffRetry(zookeeperProperties.getSleepMsBetweenRetry(), zookeeperProperties.getMaxRetries()))
.build();
curatorFramework.start();
//注册监听器
ZookeeperImServerWatches watches = new ZookeeperImServerWatches(curatorFramework);
watches.zNodeWatcher();
return curatorFramework;
}
}
配置 zookeeper 的 watcher 事件
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import java.util.Optional;
/**
 * Listens for change events on the IM server node and logs them.
 */
@Slf4j
public class ZookeeperImServerWatches {

    /** Path of the watched node. */
    private final static String CHILD_NODE = "/im_server";

    private final CuratorFramework curatorClient;

    /**
     * @param curatorClient client used to build the cache and the bridged listeners
     */
    public ZookeeperImServerWatches(CuratorFramework curatorClient) {
        this.curatorClient = curatorClient;
    }

    /**
     * Builds a {@link CuratorCache} on {@link #CHILD_NODE}, attaches one logging listener for every
     * listener flavour offered by {@link CuratorCacheListener#builder()}, and starts the cache.
     *
     * <p>NOTE(review): the cache is never closed by this class; presumably it is released together
     * with the client - confirm.
     *
     * @throws Exception declared for callers; propagated from cache setup
     */
    public void zNodeWatcher() throws Exception {
        // Pitfall: CuratorCache.Options.SINGLE_NODE_CACHE would watch only the node itself and not
        // its descendants, so it is deliberately not passed here.
        CuratorCache curatorCache = CuratorCache.build(curatorClient, CHILD_NODE);
        CuratorCacheListener listener = CuratorCacheListener.builder()
                // Called once the cache has finished initializing.
                .forInitialized(() -> log.info("[forInitialized] : Cache initialized"))
                // Called when data is added to or changed in the cache.
                .forCreatesAndChanges(
                        (oldNode, node) -> log.info("[forCreatesAndChanges] : Node changed: Old: [{}] New: [{}]\n",
                                oldNode, node))
                // Called when data is added to the cache.
                .forCreates(childData -> log.info("[forCreates] : Node created: [{}]\n", childData))
                // Called when data in the cache is changed.
                .forChanges(
                        (oldNode, node) -> log.info("[forChanges] : Node changed: Old: [{}] New: [{}]\n", oldNode, node))
                // Called when data is removed from the cache.
                .forDeletes(childData -> log.info("[forDeletes] : Node deleted: data: [{}]\n", childData))
                // Called for every add, change or delete.
                .forAll(
                        (type, oldData, data) -> log.info("[forAll] : type: [{}] [{}] [{}]\n", type, oldData, data))
                .forNodeCache(
                        () -> log.info("[forNodeCache]\n"))
                .forPathChildrenCache(
                        CHILD_NODE, curatorClient, (client, event) -> {
                            log.info("[forPathChildrenCache] : type: [{}] data: [{}]\n", event.getType(), event.getData());
                            handleChildEvent(event);
                        })
                .forTreeCache(
                        curatorClient, (client, event) -> log.info("[forTreeCache] [{}]\n", event))
                .build();
        curatorCache.listenable().addListener(listener);
        curatorCache.start();
    }

    /** Logs a child-level event and dispatches additions to {@link #processNodeAdded(ChildData)}. */
    private void handleChildEvent(PathChildrenCacheEvent event) {
        PathChildrenCacheEvent.Type type = event.getType();
        ChildData nodeData = event.getData();
        // Events without a type or payload carry nothing to report; guarding here also avoids a
        // NullPointerException inside the listener.
        if (type == null || nodeData == null) {
            return;
        }
        switch (type) {
            // A child node was added.
            case CHILD_ADDED:
                log.info("[CHILD_ADDED] : child node added : path: [{}] data: [{}]", nodeData.getPath(), dataAsString(nodeData));
                processNodeAdded(nodeData);
                break;
            // A child node's data was modified.
            case CHILD_UPDATED:
                log.info("[CHILD_UPDATED] : child node update : path: [{}] data: [{}]", nodeData.getPath(), dataAsString(nodeData));
                break;
            // A child node was removed.
            case CHILD_REMOVED:
                log.info("[CHILD_REMOVED] : child node remove : path: [{}] data: [{}]", nodeData.getPath(), dataAsString(nodeData));
                break;
            default:
                break;
        }
    }

    /**
     * Decodes the node payload as UTF-8 (the charset ZookeeperServiceImpl writes with).
     *
     * @return the decoded text, or {@code null} when the node carries no data
     */
    private static String dataAsString(ChildData nodeData) {
        byte[] raw = nodeData.getData();
        return raw == null ? null : new String(raw, java.nio.charset.StandardCharsets.UTF_8);
    }

    /** Hook for reacting to a newly added child node; intentionally empty for now. */
    private void processNodeAdded(ChildData data) {
    }
}
编写 zookeeper service(接口及其实现类)
import java.util.List;
public interface ZookeeperService {
String CODE = "com.zookeeper.service.ZookeeperService";
/**
* 判断节点是否存在
*/
boolean isExistNode(final String path);
/**
* 创建节点
*/
void createNode(CreateMode mode, String path);
/**
* 设置节点数据
*/
void setNodeData(String path, String nodeData);
/**
* 创建节点 的同时 设置数据
*/
String createNodeAndData(CreateMode mode, String path, String nodeData);
/**
* 获取节点数据
*/
String getNodeData(String path);
/**
* 获取节点下数据
*/
List<String> getNodeChild(String path);
/**
* 是否递归删除节点
*/
void deleteNode(String path, Boolean recursive);
/**
* 获取节点下的子节点数量
*/
Integer getChildNodeNum(String path);
}
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.GetChildrenBuilder;
import org.apache.zookeeper.CreateMode;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import static com.zookeeper.service.ZookeeperService.CODE;
@Slf4j
@Service(CODE)
@AutoConfigureAfter(value = CuratorFramework.class)
@ConditionalOnBean(value = CuratorFramework.class)
public class ZookeeperServiceImpl implements ZookeeperService {
@Resource(name = "curatorClient")
private CuratorFramework curatorClient;
@Override
public boolean isExistNode(String path) {
curatorClient.sync();
try {
return curatorClient.checkExists().forPath(path) != null;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void createNode(CreateMode mode, String path) {
try {
boolean existNode = isExistNode(path);
if (!existNode) {
curatorClient.create()
// 递归创建父节点
.creatingParentsIfNeeded()
.withMode(mode)
.forPath(path);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void setNodeData(String path, String nodeData) {
try {
// 设置节点数据
curatorClient.setData()
.forPath(path, nodeData.getBytes(StandardCharsets.UTF_8));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public String createNodeAndData(CreateMode mode, String path, String nodeData) {
try {
boolean existNode = isExistNode(path);
if (!existNode) {
// 创建节点,关联数据
return curatorClient.create()
.creatingParentsIfNeeded()
.withMode(mode)
.forPath(path, nodeData.getBytes(StandardCharsets.UTF_8));
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return null;
}
@Override
public String getNodeData(String path) {
try {
// 数据读取和转换
byte[] dataByte = curatorClient.getData().forPath(path);
String data = new String(dataByte, StandardCharsets.UTF_8);
if (!StringUtils.isEmpty(data)) {
return data;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return null;
}
@Override
public List<String> getNodeChild(String path) {
List<String> nodeChildDataList = new ArrayList<>();
try {
// 节点下数据集
nodeChildDataList = curatorClient.getChildren().forPath(path);
} catch (Exception e) {
throw new RuntimeException(e);
}
return nodeChildDataList;
}
@Override
public void deleteNode(String path, Boolean recursive) {
try {
boolean existNode = isExistNode(path);
if (existNode) {
if (recursive) {
// 递归删除节点
curatorClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
} else {
// 删除单个节点
curatorClient.delete().guaranteed().forPath(path);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public Integer getChildNodeNum(String path) {
try {
boolean existNode = isExistNode(path);
if (existNode) {
GetChildrenBuilder children = curatorClient.getChildren();
List<String> childNodeNameList = children.forPath(path);
return childNodeNameList.size();
} else {
return 0;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/72495.html