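Learning objectives
- Dubbo's service online/offline flow and how consumers detect it dynamically
- Dubbo's dynamic configuration listening
- Dubbo's route listening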
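Chapter 1 Listening for services going online and offline
1.1 Registering the listener
1.1.1 Overview
We use the cn.enjoy.service.UserService interface going online and offline as the example.
When a service goes online, a node holding the provider's dubbo:// URL is written under /dubbo/cn.enjoy.service.UserService/providers; when it goes offline, that node is deleted. So how is a listener registered on this node? Let's walk through the source code.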
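To make the node layout concrete, here is a minimal sketch that builds the ZooKeeper path a provider URL would occupy. It uses only the JDK; the class ProviderPathSketch and its helper methods are hypothetical names for illustration, not Dubbo API, and the real path-building logic lives inside Dubbo's registry classes.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

final class ProviderPathSketch {
    // Hypothetical helper: /<root>/<interface>/<category>
    static String categoryPath(String root, String serviceInterface, String category) {
        return "/" + root + "/" + serviceInterface + "/" + category;
    }

    // The provider URL is URL-encoded so the whole URL fits into one child node name.
    static String providerNodePath(String root, String serviceInterface, String providerUrl) {
        String encoded = URLEncoder.encode(providerUrl, StandardCharsets.UTF_8);
        return categoryPath(root, serviceInterface, "providers") + "/" + encoded;
    }

    public static void main(String[] args) {
        System.out.println(providerNodePath("dubbo", "cn.enjoy.service.UserService",
                "dubbo://192.168.67.3:20990/cn.enjoy.service.UserService?timeout=5000"));
    }
}
```

The encoded child name has the same shape as the dubbo%3A%2F%2F... string used in the registration test later in this chapter.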
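1.1.2 Source code analysis
Anyone who has worked with ZooKeeper knows that to observe changes on a node, a client only needs to register a watch on it. Let's see how Dubbo does this. First, who should be notified when the providers node changes? The consumer, of course, so the client here is the consumer. We therefore look at the consumer's protocol.refer method, which registers the listener while the service is being referenced. The call chain eventually reaches the doCreateInvoker method in RegistryProtocol.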
protected <T> ClusterInvoker<T> doCreateInvoker(DynamicDirectory<T> directory, Cluster
cluster, Registry registry, Class<T> type) {
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>
(directory.getConsumerUrl().getParameters());
URL urlToRegistry = new ServiceConfigURL(
parameters.get(PROTOCOL_KEY) == null ? DUBBO : parameters.get(PROTOCOL_KEY),
parameters.remove(REGISTER_IP_KEY), 0, getPath(parameters, type), parameters);
if (directory.isShouldRegister()) {
directory.setRegisteredConsumerUrl(urlToRegistry);
// Register the consumer URL under /dubbo/cn.enjoy.service.UserService/consumers
registry.register(directory.getRegisteredConsumerUrl());
}
// Build the router chain
directory.buildRouterChain(urlToRegistry);
// Subscribe: set up listeners on the configurators, providers and routers nodes
directory.subscribe(toSubscribeUrl(urlToRegistry));
// Return the cluster invoker (a FailoverClusterInvoker by default)
return (ClusterInvoker<T>) cluster.join(directory);
}
directory.subscribe(toSubscribeUrl(urlToRegistry)); is the call that registers the listener on the providers node.
@Override
public void subscribe(URL url) {
setSubscribeUrl(url);
CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
// Subscribe to the service-level rule in the config center (key of the form xxx.xx.xx.xx::.configurators)
referenceConfigurationListener = new ReferenceConfigurationListener(this, url);
// Subscribe to the remaining categories: configurators, routers, providers
registry.subscribe(url, this);
}
The line registry.subscribe(url, this); is what registers the listener. It actually covers three category directories: configurators, routers and providers. The call eventually reaches the doSubscribe method in ZookeeperRegistry.
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners =
zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> {
for (String child : currentChilds) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), k);
}
}
});
zkClient.create(root, false);
List<String> services = zkClient.addChildListener(root, zkListener);
if (CollectionUtils.isNotEmpty(services)) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else {
CountDownLatch latch = new CountDownLatch(1);
List<URL> urls = new ArrayList<>();
// These paths correspond to the configurators, providers and routers directories
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners =
zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
// RegistryChildListenerImpl is the callback class that ZooKeeper events are dispatched to
ChildListener zkListener = listeners.computeIfAbsent(listener, k -> new RegistryChildListenerImpl(url, path, k, latch));
if (zkListener instanceof RegistryChildListenerImpl) {
((RegistryChildListenerImpl) zkListener).setLatch(latch);
}
zkClient.create(path, false);
// Registers the ZooKeeper watch and maps it to the RegistryChildListenerImpl
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
// Falls back to an empty:// URL when there are no children, so the listener can initialize (e.g. clear its collections)
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// Fire the listener once at startup for initialization
notify(url, listener, urls);
// tells the listener to run only after the sync notification of main thread finishes.
latch.countDown();
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() +
", cause: " + e.getMessage(), e);
}
}
The toCategoriesPath(url) method derives the directories to watch from the value of the category parameter on the URL. Here there are three: configurators, providers and routers.
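The idea behind toCategoriesPath can be sketched as follows. This is a simplified stand-in, not Dubbo's implementation: the class CategoryPathSketch and its plain-string signature are assumptions made for illustration, and the real method works on a Dubbo URL object and handles more cases.

```java
import java.util.ArrayList;
import java.util.List;

final class CategoryPathSketch {
    // Hypothetical stand-in: split the comma-separated category value and
    // map each entry to /<root>/<interface>/<category>.
    static List<String> toCategoriesPath(String root, String serviceInterface, String categoryParam) {
        List<String> paths = new ArrayList<>();
        for (String category : categoryParam.split(",")) {
            paths.add("/" + root + "/" + serviceInterface + "/" + category.trim());
        }
        return paths;
    }

    public static void main(String[] args) {
        toCategoriesPath("dubbo", "cn.enjoy.service.UserService",
                "providers,configurators,routers").forEach(System.out::println);
    }
}
```

Each resulting path is one directory that doSubscribe creates (if missing) and then watches.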
RegistryChildListenerImpl holds the callback logic that the ZooKeeper watcher invokes.
List<String> children = zkClient.addChildListener(path, zkListener);
This line registers the ZooKeeper watch and passes the RegistryChildListenerImpl instance along.
@Override
public List<String> addChildListener(String path, final ChildListener listener) {
ConcurrentMap<ChildListener, TargetChildListener> listeners =
childListeners.computeIfAbsent(path, k -> new ConcurrentHashMap<>());
// Map the ChildListener to a ZooKeeper watcher
TargetChildListener targetListener = listeners.computeIfAbsent(listener, k ->
createTargetChildListener(path, k));
// Add the ZooKeeper watch
return addTargetChildListener(path, targetListener);
}
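The two nested computeIfAbsent calls above make registration idempotent: the same listener on the same path always maps to the same watcher object. Below is a minimal JDK-only sketch of that pattern. The names ListenerRegistrySketch, ChildListener and TargetListener are hypothetical stand-ins for illustration and do not mirror Dubbo's actual types.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class ListenerRegistrySketch {
    interface ChildListener {
        void childChanged(String path, List<String> children);
    }

    // Hypothetical wrapper standing in for the ZooKeeper-side watcher object.
    static final class TargetListener {
        final String path;
        final ChildListener delegate;

        TargetListener(String path, ChildListener delegate) {
            this.path = path;
            this.delegate = delegate;
        }
    }

    // path -> (listener -> watcher), the same two-level shape as childListeners above
    private final ConcurrentMap<String, ConcurrentMap<ChildListener, TargetListener>> childListeners =
            new ConcurrentHashMap<>();

    TargetListener register(String path, ChildListener listener) {
        return childListeners
                .computeIfAbsent(path, k -> new ConcurrentHashMap<>())
                .computeIfAbsent(listener, k -> new TargetListener(path, k));
    }

    public static void main(String[] args) {
        ListenerRegistrySketch registry = new ListenerRegistrySketch();
        ChildListener listener = (path, children) -> System.out.println(path + " -> " + children);
        TargetListener first = registry.register("/dubbo/demo/providers", listener);
        TargetListener second = registry.register("/dubbo/demo/providers", listener);
        System.out.println(first == second); // same watcher is reused
    }
}
```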
Creating the watcher instance:
@Override
public CuratorZookeeperClient.CuratorWatcherImpl createTargetChildListener(String path,
ChildListener listener) {
return new CuratorZookeeperClient.CuratorWatcherImpl(client, listener, path);
}
// Register the ZooKeeper watch
@Override
public List<String> addTargetChildListener(String path, CuratorWatcherImpl listener) {
try {
return client.getChildren().usingWatcher(listener).forPath(path);
} catch (NoNodeException e) {
return null;
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
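From the analysis above, we know that for a given interface, listeners are registered on three nodes: configurators, providers and routers.
1.2 Triggering the listener
1.2.1 Overview
Anyone who has worked with ZooKeeper knows that a watch fires on the client as soon as the watched node changes. For example, a service going online or offline adds or deletes a child under the providers node, which triggers the event.
1.2.2 Source code analysis
We have covered how the listener is registered; now let's look at how it is triggered. If we write a node under providers, the listener should fire. To do so, we use the Dubbo API to register a dubbo:// URL under the providers node, as follows: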
@Test
public void providerReg() {
String url = "dubbo%3A%2F%2F192.168.67.3%3A20990%2Fcn.enjoy.service.UserService%3Fanyhost%3Dtrue%26application%3Ddubbo_provider%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dcn.enjoy.service.UserService%26metadata-type%3Dremote%26methods%3DdoKill%2CqueryUser%26pid%3D14092%26release%3D3.0.2.1%26retries%3D7%26revision%3D1.0-SNAPSHOT%26service-name-mapping%3Dtrue%26side%3Dprovider%26threadpool%3Dfixed%26threads%3D100%26timeout%3D5000%26timestamp%3D1635058443480";
RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();
Registry registry = registryFactory.getRegistry(URL.valueOf("zookeeper://127.0.0.1:2181"));
registry.register(URL.valueOf(URL.decode(url)));
}
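This writes the dubbo:// URL under the /dubbo/cn.enjoy.service.UserService/providers/ node.
The write triggers the listener on the client side. Looking at the watcher class, ZooKeeper calls back into its process method to deliver the event. The childListener field is the RegistryChildListenerImpl instance we analyzed earlier.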
static class CuratorWatcherImpl implements CuratorWatcher {
private CuratorFramework client;
private volatile ChildListener childListener;
private String path;
public CuratorWatcherImpl(CuratorFramework client, ChildListener listener, String path)
{
this.client = client;
this.childListener = listener;
this.path = path;
}
protected CuratorWatcherImpl() {
}
public void unwatch() {
this.childListener = null;
}
@Override
public void process(WatchedEvent event) throws Exception {
// if client connect or disconnect to server, zookeeper will queue
// watched event(Watcher.Event.EventType.None, .., path = null).
if (event.getType() == Watcher.Event.EventType.None) {
return;
}
if (childListener != null) {
childListener.childChanged(path,client.getChildren().usingWatcher(this).forPath(path));
}
}
}
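Notice that process calls getChildren().usingWatcher(this) again. A standard ZooKeeper watch fires only once, so the watcher has to re-arm itself while reading the latest children. The sketch below imitates that behavior with an in-memory stand-in; FakeNode, Watcher and RearmingWatcher are hypothetical classes written for this illustration and involve no real ZooKeeper or Curator calls.

```java
import java.util.ArrayList;
import java.util.List;

final class OneShotWatchSketch {
    interface Watcher {
        void process(FakeNode node);
    }

    // Hypothetical in-memory stand-in for a ZooKeeper node with one-shot child watches.
    static final class FakeNode {
        private final List<String> children = new ArrayList<>();
        private final List<Watcher> watchers = new ArrayList<>();

        // Plays the role of getChildren().usingWatcher(w): read children and arm the watch once.
        List<String> getChildren(Watcher w) {
            watchers.add(w);
            return new ArrayList<>(children);
        }

        void addChild(String child) {
            children.add(child);
            List<Watcher> fired = new ArrayList<>(watchers);
            watchers.clear(); // a watch is consumed when it fires
            for (Watcher w : fired) {
                w.process(this);
            }
        }
    }

    // Re-arms itself on every event, in the spirit of CuratorWatcherImpl.process above.
    static final class RearmingWatcher implements Watcher {
        final List<List<String>> seen = new ArrayList<>();

        @Override
        public void process(FakeNode node) {
            seen.add(node.getChildren(this)); // read latest children and register again
        }
    }

    public static void main(String[] args) {
        FakeNode node = new FakeNode();
        RearmingWatcher watcher = new RearmingWatcher();
        node.getChildren(watcher); // initial registration
        node.addChild("a");
        node.addChild("b");
        System.out.println(watcher.seen);
    }
}
```

Without the re-registration inside process, the second addChild would go unnoticed.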
Next, let's see what happens after the event fires.
private class RegistryChildListenerImpl implements ChildListener {
private RegistryNotifier notifier;
private long lastExecuteTime;
private volatile CountDownLatch latch;
public RegistryChildListenerImpl(URL consumerUrl, String path, NotifyListener listener,
CountDownLatch latch) {
this.latch = latch;
notifier = new RegistryNotifier(ZookeeperRegistry.this.getDelay()) {
@Override
public void notify(Object rawAddresses) {
long delayTime = getDelayTime();
if (delayTime <= 0) {
this.doNotify(rawAddresses);
} else {
long interval = delayTime - (System.currentTimeMillis() -
lastExecuteTime);
if (interval > 0) {
try {
Thread.sleep(interval);
} catch (InterruptedException e) {
// ignore
}
}
lastExecuteTime = System.currentTimeMillis();
this.doNotify(rawAddresses);
}
}
@Override
protected void doNotify(Object rawAddresses) {
ZookeeperRegistry.this.notify(consumerUrl, listener, ZookeeperRegistry.this.toUrlsWithEmpty(consumerUrl, path, (List<String>) rawAddresses));
}
};
}
public void setLatch(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void childChanged(String path, List<String> children) {
try {
latch.await();
} catch (InterruptedException e) {
logger.warn("Zookeeper children listener thread was interrupted unexpectedly,may cause race condition with the main thread.");
}
notifier.notify(children);
}
}
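The latch.await() call in childChanged pairs with latch.countDown() at the end of doSubscribe: a ZooKeeper callback that arrives early must wait until the subscribing thread has finished its initial notify. Here is a minimal sketch of that ordering guarantee using only java.util.concurrent; the class name LatchGateSketch and the string markers are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

final class LatchGateSketch {
    static List<String> run() {
        List<String> order = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch latch = new CountDownLatch(1);

        // Plays the role of the ZooKeeper event thread invoking childChanged().
        Thread callback = new Thread(() -> {
            try {
                latch.await(); // blocked until the initial notify is done
                order.add("async-notify");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        callback.start();

        order.add("initial-notify"); // the subscribing thread's first notify
        latch.countDown();           // release any waiting callbacks
        try {
            callback.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

However the threads are scheduled, the callback's entry can only be appended after the initial one, which is the race the latch is there to prevent.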
The logic is triggered here:
protected void doNotify(Object rawAddresses) {
ZookeeperRegistry.this.notify(consumerUrl, listener,
ZookeeperRegistry.this.toUrlsWithEmpty(consumerUrl, path, (List<String>) rawAddresses));
}
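The registry's notify method then runs the loop below, where listener.notify(categoryList); triggers the notify logic of RegistryDirectory. This is the part to focus on.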
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
listener.notify(categoryList);
// We will update our cache file after each notification.
// When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
if (localCacheEnabled) {
saveProperties(url);
}
}
The notify logic of RegistryDirectory:
// Listener callback
@Override
public synchronized void notify(List<URL> urls) {
if (isDestroyed()) {
return;
}
// Group the notified URLs by category, recognized by protocol:
// route://
// override://
// dubbo://
Map<String, List<URL>> categoryUrls = urls.stream()
.filter(Objects::nonNull)
.filter(this::isValidCategory)
.filter(this::isNotCompatibleFor26x)
.collect(Collectors.groupingBy(this::judgeCategory));
List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY,
Collections.emptyList());
this.configurators =
Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY,
Collections.emptyList());
// Build the routing rules and add them to the router chain
toRouters(routerURLs).ifPresent(this::addRouters);
// providers
List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY,
Collections.emptyList());
/**
* 3.x added for extend URL address
*/
ExtensionLoader<AddressListener> addressListenerExtensionLoader =
ExtensionLoader.getExtensionLoader(AddressListener.class);
List<AddressListener> supportedListeners =
addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
if (supportedListeners != null && !supportedListeners.isEmpty()) {
for (AddressListener addressListener : supportedListeners) {
providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this);
}
}
// Refresh the local service list
refreshOverrideAndInvoker(providerURLs);
}
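The grouping step at the start of notify can be sketched in isolation. The version below classifies plain URL strings by scheme only, which is a deliberate simplification: Dubbo's judgeCategory works on URL objects and considers more than the protocol. CategoryGroupingSketch and its methods are hypothetical names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

final class CategoryGroupingSketch {
    // Simplified, hypothetical classification by URL scheme only.
    static String judgeCategory(String url) {
        if (url.startsWith("override://")) {
            return "configurators";
        }
        if (url.startsWith("route://")) {
            return "routers";
        }
        return "providers";
    }

    static Map<String, List<String>> group(List<String> urls) {
        return urls.stream()
                .filter(Objects::nonNull)
                .collect(Collectors.groupingBy(CategoryGroupingSketch::judgeCategory));
    }

    public static void main(String[] args) {
        Map<String, List<String>> grouped = group(Arrays.asList(
                "dubbo://192.168.67.3:20990/cn.enjoy.service.UserService",
                "override://0.0.0.0/cn.enjoy.service.UserService"));
        System.out.println(grouped.getOrDefault("providers", Collections.emptyList()));
        System.out.println(grouped.getOrDefault("routers", Collections.emptyList()));
    }
}
```

A category with no URLs is simply absent from the map, which is why the real code reads each one with getOrDefault(..., Collections.emptyList()).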
If the notification carries only provider URLs, configuratorURLs and routerURLs are both empty, so execution goes straight to refreshOverrideAndInvoker(providerURLs);
private synchronized void refreshOverrideAndInvoker(List<URL> urls) {
// mock zookeeper://xxx?mock=return null
overrideDirectoryUrl();
// Refresh the local list
refreshInvoker(urls);
}
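This goes straight to refreshInvoker(urls);. Roughly speaking, the method refreshes the local service list, that is, it updates the value of the invokers field.
// Refresh the local service list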
private void refreshInvoker(List<URL> invokerUrls) {
Assert.notNull(invokerUrls, "invokerUrls should not be null");
if (invokerUrls.size() == 1
&& invokerUrls.get(0) != null
&& EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // Forbid to access
this.invokers = Collections.emptyList();
routerChain.setInvokers(this.invokers);
destroyAllInvokers(); // Close all invokers
} else {
this.forbidden = false; // Allow to access
Map<URL, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls == Collections.<URL>emptyList()) {
invokerUrls = new ArrayList<>();
}
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<>();
this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
}
if (invokerUrls.isEmpty()) {
return;
}
// Build the mapping from URL to Invoker
Map<URL, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
/**
* If the calculation is wrong, it is not processed.
*
* 1. The protocol configured by the client is inconsistent with the protocol of the server.
* eg: consumer protocol = dubbo, provider only has other protocol services(rest).
* 2. The registration center is not robust and pushes illegal specification data.
*
*/
if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls
.toString()));
return;
}
// All the Invoker objects
List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>
(newUrlInvokerMap.values()));
// pre-route and build cache, notice that route cache should build on original Invoker list.
// toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.
routerChain.setInvokers(newInvokers);
this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
this.urlInvokerMap = newUrlInvokerMap;
try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
// notify invokers refreshed
this.invokersChanged();
}
}
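The first branch of refreshInvoker reacts to a single empty:// URL, which is what toUrlsWithEmpty produces when a category has no children. The sketch below shows how the two pieces fit together under simplifying assumptions: URLs are plain strings, and the exact shape of the empty URL (host 0.0.0.0, a category query parameter) is made up for illustration rather than copied from Dubbo. EmptyProtocolSketch is a hypothetical class.

```java
import java.util.List;

final class EmptyProtocolSketch {
    // Hypothetical stand-in: with no children, return one empty:// placeholder URL.
    static List<String> toUrlsWithEmpty(String serviceInterface, String category, List<String> children) {
        if (children == null || children.isEmpty()) {
            return List.of("empty://0.0.0.0/" + serviceInterface + "?category=" + category);
        }
        return children;
    }

    // Mirrors the first condition in refreshInvoker: exactly one URL, with the empty protocol.
    static boolean isForbidden(List<String> invokerUrls) {
        return invokerUrls.size() == 1 && invokerUrls.get(0).startsWith("empty://");
    }

    public static void main(String[] args) {
        List<String> none = toUrlsWithEmpty("cn.enjoy.service.UserService", "providers", List.of());
        System.out.println(none + " forbidden=" + isForbidden(none));
    }
}
```

In other words, when the last provider goes offline the consumer still receives a notification, and that notification is what clears its invoker list and sets forbidden.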
The line Map<URL, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls); builds Invoker objects from the provider URLs passed in. For each URL that is not already cached, it calls protocol.refer to create the Invoker.
private Map<URL, Invoker<T>> toInvokers(List<URL> urls) {
Map<URL, Invoker<T>> newUrlInvokerMap = new ConcurrentHashMap<>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
for (URL providerUrl : urls) {
// If protocol is configured at the reference side, only the matching protocol is selected
if (queryProtocols != null && queryProtocols.length() > 0) {
boolean accept = false;
String[] acceptProtocols = queryProtocols.split(",");
for (String acceptProtocol : acceptProtocols) {
if (providerUrl.getProtocol().equals(acceptProtocol)) {
accept = true;
break;
}
}
if (!accept) {
continue;
}
}
if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
continue;
}
if
(!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol()
)) {
logger.error(new IllegalStateException("Unsupported protocol " +
providerUrl.getProtocol() +
" in notified url: " + providerUrl + " from registry " +
getUrl().getAddress() +
" to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
continue;
}
URL url = mergeUrl(providerUrl);
// Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
Map<URL, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null :
localUrlInvokerMap.remove(url);
if (invoker == null) { // Not in the cache, refer again
try {
boolean enabled = true;
if (url.hasParameter(DISABLED_KEY)) {
enabled = !url.getParameter(DISABLED_KEY, false);
} else {
enabled = url.getParameter(ENABLED_KEY, true);
}
if (enabled) {
// Create the Invoker object
invoker = protocol.refer(serviceType, url);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:" + serviceType +
",url:(" + url + ")" + t.getMessage(), t);
}
if (invoker != null) { // Put new invoker in cache
newUrlInvokerMap.put(url, invoker);
}
} else {
newUrlInvokerMap.put(url, invoker);
}
}
return newUrlInvokerMap;
}
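Taken together, toInvokers and destroyUnusedInvokers amount to a diff between the old and new URL sets: cached invokers are reused, new URLs get a fresh invoker, and whatever is left over is destroyed. Here is a compact sketch of that idea with strings as URLs. InvokerRefreshSketch and FakeInvoker are hypothetical, and creating a FakeInvoker merely stands in for protocol.refer.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class InvokerRefreshSketch {
    static final class FakeInvoker {
        final String url;
        boolean destroyed;

        FakeInvoker(String url) {
            this.url = url;
        }
    }

    private Map<String, FakeInvoker> urlInvokerMap = new HashMap<>();
    int created = 0;

    FakeInvoker get(String url) {
        return urlInvokerMap.get(url);
    }

    void refresh(List<String> providerUrls) {
        Map<String, FakeInvoker> old = urlInvokerMap;
        Map<String, FakeInvoker> fresh = new HashMap<>();
        for (String url : providerUrls) {
            FakeInvoker invoker = old.remove(url); // reuse the cached invoker if present
            if (invoker == null) {
                invoker = new FakeInvoker(url);    // stands in for protocol.refer(...)
                created++;
            }
            fresh.put(url, invoker);
        }
        // Whatever remains in the old map is no longer advertised: close it.
        for (FakeInvoker unused : old.values()) {
            unused.destroyed = true;
        }
        urlInvokerMap = fresh;
    }

    public static void main(String[] args) {
        InvokerRefreshSketch directory = new InvokerRefreshSketch();
        directory.refresh(List.of("dubbo://a:20880", "dubbo://b:20880"));
        directory.refresh(List.of("dubbo://a:20880", "dubbo://c:20880"));
        System.out.println("created=" + directory.created);
    }
}
```

Reusing cached invokers matters because creating one usually means opening a connection to the provider, so only genuinely new addresses should pay that cost.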
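Then this line
this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
refreshes the service list.
From the analysis above, the ultimate purpose of the triggered event is to refresh the consumer's local service list: the newly registered dubbo:// URL is turned into a new Invoker via protocol.refer and then added to the local service list.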