zookeeper curator 整合 SpringBoot

导读:本篇文章讲解 zookeeper curator 整合 SpringBoot,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

添加依赖(对应 ZooKeeper 3.8.0 版本;下方依赖未写 version,需在父 POM 或 dependencyManagement 中统一声明 Curator 版本)

<!-- Core Curator client API (org.apache.curator.framework.*), used to build the CuratorFramework bean. -->
<!-- No <version> is declared here, so it has to be managed by a parent POM / dependencyManagement. -->
<dependency>
	<groupId>org.apache.curator</groupId>
	<artifactId>curator-framework</artifactId>
</dependency>

<!-- Curator recipes (org.apache.curator.framework.recipes.*), used for the CuratorCache listener. -->
<dependency>
	<groupId>org.apache.curator</groupId>
	<artifactId>curator-recipes</artifactId>
</dependency>

application 添加配置

# ZooKeeper connect string; for a cluster, separate the host:port entries with commas.
zookeeper.curator.addresses=127.0.0.1:2181
# Connection timeout passed to the Curator builder (milliseconds).
zookeeper.curator.connectionTimeoutMs=50000
# Session timeout passed to the Curator builder (milliseconds).
zookeeper.curator.sessionTimeOut=50000
# First argument of ExponentialBackoffRetry (sleep-time parameter, milliseconds).
zookeeper.curator.sleepMsBetweenRetry=1000
# Second argument of ExponentialBackoffRetry (maximum number of retries).
zookeeper.curator.maxRetries=3
# Namespace (parent node) applied to the client.
zookeeper.curator.namespace=demo
# Digest credential; NOTE(review): the authorization call in ZookeeperConfig is commented out,
# so this value is currently bound but not applied. Do not commit real credentials here.
zookeeper.curator.digest=zs:123456

读取 application 配置

import lombok.Data;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * Holds the settings bound from the {@code zookeeper.curator.*} configuration keys.
 * Getters and setters are generated by Lombok's {@code @Data}.
 */
@Data
@Component
@ConfigurationProperties(prefix = "zookeeper.curator")
// @ConditionalOnProperty(prefix = "zookeeper.curator", name = "addresses")
public class ZookeeperProperties {

	/**
	 * Cluster address(es), used as the client's connect string.
	 */
	private String addresses;

	/**
	 * Credential of the digest-authorized user
	 * (presumably {@code user:password}, as in the sample configuration; confirm).
	 */
	private String digest;

	/**
	 * Namespace (name of the parent node).
	 */
	private String namespace;

	/**
	 * Connection timeout.
	 */
	private Integer connectionTimeoutMs;

	/**
	 * Session timeout.
	 */
	private Integer sessionTimeOut;

	/**
	 * Sleep-time parameter of the retry policy.
	 */
	private Integer sleepMsBetweenRetry;

	/**
	 * Maximum number of retries of the retry policy.
	 */
	private Integer maxRetries;

}

配置 zookeeper bean

import lombok.Data;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that builds and starts the {@link CuratorFramework} client from
 * {@link ZookeeperProperties}. It is only active when {@code zookeeper.curator.addresses} is set.
 */
@Data
@Configuration
@ConditionalOnProperty(prefix = "zookeeper.curator", name = "addresses")
public class ZookeeperConfig {

	@Autowired
	private ZookeeperProperties zookeeperProperties;

	/**
	 * Creates the Curator client, starts it and registers the IM-server node listener.
	 *
	 * @return the started client
	 * @throws Exception if the listener cannot be registered; the client is closed before the
	 *                   exception is rethrown so that a failed startup does not leak it
	 */
	@Bean
	public CuratorFramework curatorClient() throws Exception {
		CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
				.namespace(zookeeperProperties.getNamespace())
				// Digest authorization is currently disabled, so zookeeperProperties.getDigest() is not applied:
				// .authorization("digest", zookeeperProperties.getDigest().getBytes())
				// Connect string; separate cluster members with commas.
				.connectString(zookeeperProperties.getAddresses())
				.connectionTimeoutMs(zookeeperProperties.getConnectionTimeoutMs())
				// Session timeout.
				.sessionTimeoutMs(zookeeperProperties.getSessionTimeOut())
				// Exponential back-off: the first argument is the base sleep time in milliseconds,
				// the second the maximum number of retries; the delay grows with every attempt.
				.retryPolicy(new ExponentialBackoffRetry(
						zookeeperProperties.getSleepMsBetweenRetry(),
						zookeeperProperties.getMaxRetries()))
				.build();
		curatorFramework.start();

		// Register the listener. The client is already started at this point, so it must be
		// closed again if registration fails; otherwise it would never be released.
		try {
			new ZookeeperImServerWatches(curatorFramework).zNodeWatcher();
		} catch (Exception e) {
			try {
				curatorFramework.close();
			} catch (RuntimeException closeFailure) {
				e.addSuppressed(closeFailure);
			}
			throw e;
		}
		return curatorFramework;
	}

}

配置 zookeeper 的 watcher 事件

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;

import java.util.Optional;

/**
 * Listens for change events of the IM server node and logs them.
 */
@Slf4j
public class ZookeeperImServerWatches {

	/** Path of the watched node. */
	private final static String CHILD_NODE = "/im_server";

	private final CuratorFramework curatorClient;

	/**
	 * @param curatorClient client used to build the cache and its listener bridges
	 */
	public ZookeeperImServerWatches(CuratorFramework curatorClient) {
		this.curatorClient = curatorClient;
	}

	/**
	 * Builds a {@link CuratorCache} for {@link #CHILD_NODE}, attaches one listener that logs
	 * every kind of callback the listener builder offers, and starts the cache.
	 *
	 * @throws Exception kept in the signature for callers that already handle it
	 */
	public void zNodeWatcher() throws Exception {
		// Do not pass CuratorCache.Options.SINGLE_NODE_CACHE here: with that option only the node
		// itself is cached and nothing below it is watched, which is an easy trap to fall into.
		CuratorCache curatorCache = CuratorCache.build(curatorClient, CHILD_NODE);

		CuratorCacheListener listener = CuratorCacheListener.builder()
				// Called once the cache has finished initializing.
				.forInitialized(() -> log.info("[forInitialized] : Cache initialized"))
				// Called when data is added to or changed in the cache.
				.forCreatesAndChanges((oldNode, node) ->
						log.info("[forCreatesAndChanges] : Node changed: Old: [{}] New: [{}]\n", oldNode, node))
				// Called when data is added to the cache.
				.forCreates(childData -> log.info("[forCreates] : Node created: [{}]\n", childData))
				// Called when data in the cache is changed.
				.forChanges((oldNode, node) ->
						log.info("[forChanges] : Node changed: Old: [{}] New: [{}]\n", oldNode, node))
				// Called when data is removed from the cache.
				.forDeletes(childData -> log.info("[forDeletes] : Node deleted: data: [{}]\n", childData))
				// Called when data is added, changed or removed.
				.forAll((type, oldData, data) ->
						log.info("[forAll] : type: [{}] [{}] [{}]\n", type, oldData, data))
				.forNodeCache(() -> log.info("[forNodeCache]\n"))
				.forPathChildrenCache(CHILD_NODE, curatorClient, (client, event) -> handleChildEvent(event))
				.forTreeCache(curatorClient, (client, event) -> log.info("[forTreeCache] [{}]\n", event))
				.build();

		curatorCache.listenable().addListener(listener);
		curatorCache.start();
	}

	/**
	 * Logs a path-children event and dispatches on its type.
	 * Events without a type or without node data are logged and then ignored.
	 */
	private void handleChildEvent(PathChildrenCacheEvent event) {
		log.info("[forPathChildrenCache] : type: [{}] data: [{}]\n", event.getType(), event.getData());

		PathChildrenCacheEvent.Type type = event.getType();
		ChildData nodeData = event.getData();
		if (type == null || nodeData == null) {
			// Nothing to dereference; switching on a null type would throw.
			return;
		}

		switch (type) {
			// A child node was added.
			case CHILD_ADDED:
				log.info("[CHILD_ADDED] : child node added : path: [{}] data: [{}]", nodeData.getPath(), decode(nodeData.getData()));
				processNodeAdded(nodeData);
				break;
			// The data of a child node was changed.
			case CHILD_UPDATED:
				log.info("[CHILD_UPDATED] : child node update : path: [{}] data: [{}]", nodeData.getPath(), decode(nodeData.getData()));
				break;
			// A child node was removed.
			case CHILD_REMOVED:
				log.info("[CHILD_REMOVED] : child node remove : path: [{}] data: [{}]", nodeData.getPath(), decode(nodeData.getData()));
				break;
			default:
				break;
		}
	}

	/**
	 * Decodes a node payload as UTF-8 instead of the platform charset; a missing payload
	 * becomes an empty string.
	 */
	private static String decode(byte[] payload) {
		if (payload == null) {
			return "";
		}
		return new String(payload, java.nio.charset.StandardCharsets.UTF_8);
	}

	/**
	 * Hook invoked for every added child node; currently does nothing.
	 */
	private void processNodeAdded(ChildData data) {

	}
}

配置 zookeeper service

import java.util.List;
import org.apache.zookeeper.CreateMode;

/**
 * Basic node operations on ZooKeeper.
 */
public interface ZookeeperService {

	/** Bean name under which the implementation is registered. */
	String CODE = "com.zookeeper.service.ZookeeperService";

	/**
	 * Checks whether a node exists.
	 *
	 * @param path path of the node
	 * @return {@code true} if the node exists
	 */
	boolean isExistNode(final String path);

	/**
	 * Creates a node.
	 *
	 * @param mode create mode of the node
	 * @param path path of the node
	 */
	void createNode(CreateMode mode, String path);

	/**
	 * Sets the data of a node.
	 *
	 * @param path     path of the node
	 * @param nodeData data to store
	 */
	void setNodeData(String path, String nodeData);

	/**
	 * Creates a node and sets its data at the same time.
	 *
	 * @param mode     create mode of the node
	 * @param path     path of the node
	 * @param nodeData data to store
	 * @return presumably the path of the created node; confirm against the implementation
	 */
	String createNodeAndData(CreateMode mode, String path, String nodeData);

	/**
	 * Reads the data of a node.
	 *
	 * @param path path of the node
	 * @return the node data
	 */
	String getNodeData(String path);

	/**
	 * Lists what is stored below a node (its children).
	 *
	 * @param path path of the parent node
	 * @return the children of the node
	 */
	List<String> getNodeChild(String path);

	/**
	 * Deletes a node.
	 *
	 * @param path      path of the node
	 * @param recursive whether the deletion is recursive
	 */
	void deleteNode(String path, Boolean recursive);

	/**
	 * Counts the child nodes below a node.
	 *
	 * @param path path of the parent node
	 * @return number of child nodes
	 */
	Integer getChildNodeNum(String path);

}
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.GetChildrenBuilder;
import org.apache.zookeeper.CreateMode;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import static com.zookeeper.service.ZookeeperService.CODE;

/**
 * Curator-backed implementation of {@link ZookeeperService}.
 *
 * <p>Every Curator failure is rethrown as a {@link RuntimeException} that keeps the original
 * exception as its cause and names the affected path.
 */
@Slf4j
@Service(CODE)
// NOTE(review): @AutoConfigureAfter and @ConditionalOnBean are documented for auto-configuration
// classes; on a component-scanned @Service their effect may depend on registration order. Confirm.
@AutoConfigureAfter(value = CuratorFramework.class)
@ConditionalOnBean(value = CuratorFramework.class)
public class ZookeeperServiceImpl implements ZookeeperService {

	@Resource(name = "curatorClient")
	private CuratorFramework curatorClient;

	/**
	 * Checks whether a node exists.
	 *
	 * @param path path of the node
	 * @return {@code true} if {@code checkExists()} reports a stat for the path
	 * @throws RuntimeException if the lookup fails
	 */
	@Override
	public boolean isExistNode(String path) {
		// The former bare curatorClient.sync() call only created a builder and never ran
		// (no forPath), so it has been removed.
		try {
			return curatorClient.checkExists().forPath(path) != null;
		} catch (Exception e) {
			throw new RuntimeException("Failed to check existence of node " + path, e);
		}
	}

	/**
	 * Creates the node, including missing parents, unless it already exists.
	 *
	 * <p>The existence check and the creation are two separate calls; if another client creates
	 * the node in between, the resulting failure is reported as a {@link RuntimeException}.
	 *
	 * @param mode create mode of the node
	 * @param path path of the node
	 * @throws RuntimeException if the check or the creation fails
	 */
	@Override
	public void createNode(CreateMode mode, String path) {
		if (isExistNode(path)) {
			return;
		}
		try {
			curatorClient.create()
					// Create missing parent nodes as well.
					.creatingParentsIfNeeded()
					.withMode(mode)
					.forPath(path);
		} catch (Exception e) {
			throw new RuntimeException("Failed to create node " + path, e);
		}
	}

	/**
	 * Overwrites the data of a node with the UTF-8 bytes of {@code nodeData}.
	 *
	 * @param path     path of the node
	 * @param nodeData data to store
	 * @throws RuntimeException if the update fails
	 */
	@Override
	public void setNodeData(String path, String nodeData) {
		try {
			curatorClient.setData()
					.forPath(path, nodeData.getBytes(StandardCharsets.UTF_8));
		} catch (Exception e) {
			throw new RuntimeException("Failed to set data of node " + path, e);
		}
	}

	/**
	 * Creates the node, including missing parents, with the UTF-8 bytes of {@code nodeData},
	 * unless it already exists.
	 *
	 * @param mode     create mode of the node
	 * @param path     path of the node
	 * @param nodeData data to store
	 * @return the value returned by Curator's create call, or {@code null} if the node already existed
	 * @throws RuntimeException if the check or the creation fails
	 */
	@Override
	public String createNodeAndData(CreateMode mode, String path, String nodeData) {
		if (isExistNode(path)) {
			return null;
		}
		try {
			return curatorClient.create()
					.creatingParentsIfNeeded()
					.withMode(mode)
					.forPath(path, nodeData.getBytes(StandardCharsets.UTF_8));
		} catch (Exception e) {
			throw new RuntimeException("Failed to create node " + path + " with data", e);
		}
	}

	/**
	 * Reads the data of a node and decodes it as UTF-8.
	 *
	 * @param path path of the node
	 * @return the decoded data, or {@code null} if the node holds no data
	 * @throws RuntimeException if the read fails
	 */
	@Override
	public String getNodeData(String path) {
		try {
			byte[] raw = curatorClient.getData().forPath(path);
			// A missing or empty payload is reported as "no data" rather than as an error.
			if (raw == null || raw.length == 0) {
				return null;
			}
			return new String(raw, StandardCharsets.UTF_8);
		} catch (Exception e) {
			throw new RuntimeException("Failed to read data of node " + path, e);
		}
	}

	/**
	 * Lists the children of a node.
	 *
	 * @param path path of the parent node
	 * @return the list returned by Curator's {@code getChildren()} for the path
	 * @throws RuntimeException if the lookup fails
	 */
	@Override
	public List<String> getNodeChild(String path) {
		try {
			return curatorClient.getChildren().forPath(path);
		} catch (Exception e) {
			throw new RuntimeException("Failed to list children of node " + path, e);
		}
	}

	/**
	 * Deletes a node; a node that does not exist is silently ignored.
	 *
	 * @param path      path of the node
	 * @param recursive {@code true} to delete the children as well; {@code null} is treated as {@code false}
	 * @throws RuntimeException if the deletion fails
	 */
	@Override
	public void deleteNode(String path, Boolean recursive) {
		try {
			// quietly() suppresses the "no node" failure, so no separate (and racy)
			// existence check is needed before deleting.
			if (Boolean.TRUE.equals(recursive)) {
				curatorClient.delete().quietly().guaranteed().deletingChildrenIfNeeded().forPath(path);
			} else {
				curatorClient.delete().quietly().guaranteed().forPath(path);
			}
		} catch (Exception e) {
			throw new RuntimeException("Failed to delete node " + path, e);
		}
	}

	/**
	 * Counts the children of a node.
	 *
	 * @param path path of the parent node
	 * @return number of children, or {@code 0} if the node does not exist
	 * @throws RuntimeException if the check or the lookup fails
	 */
	@Override
	public Integer getChildNodeNum(String path) {
		if (!isExistNode(path)) {
			return 0;
		}
		try {
			return curatorClient.getChildren().forPath(path).size();
		} catch (Exception e) {
			throw new RuntimeException("Failed to count children of node " + path, e);
		}
	}

}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/72495.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!