SpringCloud Alibaba系列——13Dubbo的服务治理和监听机制(中)

导读:本篇文章讲解 SpringCloud Alibaba系列——13Dubbo的服务治理和监听机制(中),希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

第2章 动态配置的监听

在dubbo中我们可以对配置文件进行分布式管理,同时我们也可以在运行时去修改dubbo协议中的参数让它在运行时生效,这些都属性配置的动态修改,是服务治理的一种,dubbo有一个同一的服务治理后台就是dubbo-admin。dubbo-admin的下载在第一节课的文档中有,只要从GitHub上下载就可以了,我们去修改接口的属性看看。比如修改UserService接口的retries属性,我们看看zookeeper有什么变化。

SpringCloud Alibaba系列——13Dubbo的服务治理和监听机制(中)

SpringCloud Alibaba系列——13Dubbo的服务治理和监听机制(中)

SpringCloud Alibaba系列——13Dubbo的服务治理和监听机制(中)

通过上面的步骤就可以完成对retries属性的动态修改,dubbo-admin里面修改了属性后,我们来看看zookeeper的变化。

1、在configurators节点多了一个override协议节点

SpringCloud Alibaba系列——13Dubbo的服务治理和监听机制(中)

2、在分布式配置节点多了一个接口配置数据

SpringCloud Alibaba系列——13Dubbo的服务治理和监听机制(中) 

从上面的变化来看看,我们在dubbo-admin中修改一个数据其实对应zookeeper有两个节点新增了,那么这两个节点新增会不会触发客户端的事件呢?答案是肯定的。我们接下来来看看这两个新增节点的事件注册流程。

2.1 监听注册

2.1.1 说明

之前在分析providers节点注册的时候我们就分析了客户端也会对configurators节点进行监听,这块的监听注册逻辑我就不分析了,是跟providers节点的注册监听逻辑是一模一样的,我们重点来分析一下对/dubbo/config节点下的节点进行注册的逻辑。

2.1.2 源码分析

源码还是来看看RegistryProtocol中的refer,最终会走到RegistryDirectory的subscribe方法

@Override
public void subscribe(URL url) {
    setSubscribeUrl(url);
    CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
    //订阅事件 对 config中的 xxx.xx.xx.xx::.configurations
    referenceConfigurationListener = new ReferenceConfigurationListener(this, url);
    //订阅其他事件,configurations routes,providers
    registry.subscribe(url, this);
}

 config节点的注册就在referenceConfigurationListener = new ReferenceConfigurationListener(this, url);这行代码中。

ReferenceConfigurationListener(RegistryDirectory directory, URL url) {
    this.directory = directory;
    this.url = url;
    this.initWith(DynamicConfiguration.getRuleKey(url) + CONFIGURATORS_SUFFIX);
}
protected final void initWith(String key) {
    ruleRepository.addListener(key, this);
    String rawConfig = ruleRepository.getRule(key, DynamicConfiguration.DEFAULT_GROUP);
    if (!StringUtils.isEmpty(rawConfig)) {
        genConfiguratorsFromRawRule(rawConfig);
    }
}

SpringCloud Alibaba系列——13Dubbo的服务治理和监听机制(中)

可以看到对应的路径就是zookeeper中新增的这个路径,是对这个路径进行了监听的。

@Override
protected void doAddListener(String pathKey, ConfigurationListener listener) {
    cacheListener.addListener(pathKey, listener);
    zkClient.addDataListener(pathKey, cacheListener, executor);
}

 其实这里的逻辑跟之前分析的注册监听的逻辑差不多,也是有一个CacheListener,里面建立了路径和监听逻辑的映射关系,然后zookeeper的监听实现类持有了CacheListener逻辑,由zookeeper的监听实现类掉到了CacheListener中去了。

@Override
public void addDataListener(String path, DataListener listener, Executor executor) {
    ConcurrentMap<DataListener, TargetDataListener> dataListenerMap =
        listeners.computeIfAbsent(path, k -> new ConcurrentHashMap<>());
    TargetDataListener targetListener = dataListenerMap.computeIfAbsent(listener, k ->
                                                                        createTargetDataListener(path, k));
    addTargetDataListener(path, targetListener, executor);
}

这里就创建了zookeeper的监听实现类和注册了zookeeper的监听了。

监听实现类

@Override
protected CuratorZookeeperClient.NodeCacheListenerImpl createTargetDataListener(String
                                                                                path, DataListener listener) {
    return new NodeCacheListenerImpl(client, listener, path);
}

注册监听

@Override
protected void addTargetDataListener(String path,
                                     CuratorZookeeperClient.NodeCacheListenerImpl nodeCacheListener, Executor executor) {
    try {
        NodeCache nodeCache = new NodeCache(client, path);
        if (nodeCacheMap.putIfAbsent(path, nodeCache) != null) {
            return;
        }
        if (executor == null) {
            nodeCache.getListenable().addListener(nodeCacheListener);
        } else {
            nodeCache.getListenable().addListener(nodeCacheListener, executor);
        }
        nodeCache.start();
    } catch (Exception e) {
        throw new IllegalStateException("Add nodeCache listener for path:" + path, e);
    }
}

从上面的分析我们可以看出,已经注册了对分布式配置中心的某个节点的监听了。

2.2 监听触发

2.2.1 说明

前面我们分析了监听的注册,下面我们来看看监听的触发逻辑,看看监听触发后到底是干了些什么。

前面我们通过dubbo-admin修改了一个属性,已经写了数据倒相应的zookeeper节点中了,那么事件一定会触发的。我们来看看触发的逻辑。下面是监听实现类

2.2.2 源码分析

static class NodeCacheListenerImpl implements NodeCacheListener {
    private CuratorFramework client;
    private volatile DataListener dataListener;
    private String path;
    protected NodeCacheListenerImpl() {
    }
    public NodeCacheListenerImpl(CuratorFramework client, DataListener dataListener, String
                                 path) {
        this.client = client;
        this.dataListener = dataListener;
        this.path = path;
    }
    @Override
    public void nodeChanged() throws Exception {
        ChildData childData = nodeCacheMap.get(path).getCurrentData();
        String content = null;
        EventType eventType;
        if (childData == null) {
            eventType = EventType.NodeDeleted;
        } else {
            content = new String(childData.getData(), CHARSET);
            eventType = EventType.NodeDataChanged;
        }
        dataListener.dataChanged(path, content, eventType);
    }
}

当事件触发后就会掉到nodeChanged()方法。

SpringCloud Alibaba系列——13Dubbo的服务治理和监听机制(中)

SpringCloud Alibaba系列——13Dubbo的服务治理和监听机制(中)

可以看到根据path,path就是新增的config节点的路径,根据path从CacheListener中后去到了一个客户端的监听类。那么就会掉到该监听类的process方法中。  

@Override
public void dataChanged(String path, Object value, EventType eventType) {
    ConfigChangeType changeType;
    if (value == null) {
        changeType = ConfigChangeType.DELETED;
    } else {
        changeType = ConfigChangeType.MODIFIED;
    }
    String key = pathToKey(path);
    ConfigChangedEvent configChangeEvent = new ConfigChangedEvent(key, getGroup(path),
                                                                  (String) value, changeType);
    Set<ConfigurationListener> listeners = keyListeners.get(path);
    if (CollectionUtils.isNotEmpty(listeners)) {
        listeners.forEach(listener -> listener.process(configChangeEvent));
    }
}

其实最终也是掉到了RegistryDirectory的refreshOverrideAndInvoker逻辑,这里我们上面分析过就是为了刷新服务列表的,那么我们看看

这里是从zookeeper的分布式配置中心中获取配置,然后根据override协议生成配置类

private void overrideDirectoryUrl() {
    // merge override parameters
    this.overrideDirectoryUrl = directoryUrl;
    List<Configurator> localConfigurators = this.configurators; // local reference
    //根据override://协议对原始协议进行修改
    doOverrideUrl(localConfigurators);
    List<Configurator> localAppDynamicConfigurators =
        CONSUMER_CONFIGURATION_LISTENER.getConfigurators(); // local reference
    doOverrideUrl(localAppDynamicConfigurators);
    if (referenceConfigurationListener != null) {
        //获取配置中心的属性覆盖协议
        List<Configurator> localDynamicConfigurators =
            referenceConfigurationListener.getConfigurators(); // local reference
        doOverrideUrl(localDynamicConfigurators);
    }
}

SpringCloud Alibaba系列——13Dubbo的服务治理和监听机制(中)

然后根据配置类的configure方法把原始的dubbo协议然后根据override协议来修改原始的dubbo协议,比如把override协议中的retries=8替换掉之前dubbo协议中的retries=3,这就是配置类的作用,一个override协议会获取一个Configurator的实例,根据这个实例修改原始的dubbo协议。其实就是属性替换或者属性新增。

private void doOverrideUrl(List<Configurator> configurators) {
    if (CollectionUtils.isNotEmpty(configurators)) {
        for (Configurator configurator : configurators) {
            this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
        }
    }
}

 现在原始的dubbo协议修改了,那么接下来就是服务列表刷新了,其实刷新逻辑跟之前是一样的,也是会在toInvokers方法中去根据protocol.refer方法根据新的dubbo协议生成一个invoker对象,然后刷新invokers服务列表,这里就不贴代码了,跟之前的一样。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/76695.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!