第3章 动态路由的监听
3.1 监听注册
3.1.1 说明
动态路由这块我们只分析条件动态路由,也是要根据dubbo-admin服务治理的方式去修改路由配置,然后在运行时作用到调用流程中去。我们去看看dubbo-admin中如何去修改一个动态路由。
我们以读写分离路由为例去配置,关于动态路由配置规则,请参照dubbo官网的路由配置规则:路由规则 | Apache Dubbo
method = find*,list*,get*,is* => host = 172.22.3.94,172.22.3.95,172.22.3.96
method != find*,list*,get*,is* => host = 192.168.67.1,172.22.3.98
保存后我们看看zookeeper的变化
1、在routers节点多了一个route协议节点
2、分布式配置config节点下新增了节点
3.1.2 源码分析
routers节点下的监听之前已经分析过,我这里就不分析了,我们主要来分析一下config节点下的监听注册逻辑。
还是在RegistryProtocol的refer作为入口,最终来到
protected <T> ClusterInvoker<T> doCreateInvoker(DynamicDirectory<T> directory, Cluster
cluster, Registry registry, Class<T> type) {
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>
(directory.getConsumerUrl().getParameters());
URL urlToRegistry = new ServiceConfigURL(
parameters.get(PROTOCOL_KEY) == null ? DUBBO : parameters.get(PROTOCOL_KEY),
parameters.remove(REGISTER_IP_KEY), 0, getPath(parameters, type), parameters);
if (directory.isShouldRegister()) {
directory.setRegisteredConsumerUrl(urlToRegistry);
//把协议注册到 /dubbo/cn.enjoy.userService/consumers节点下面
registry.register(directory.getRegisteredConsumerUrl());
}
//创建路由链
directory.buildRouterChain(urlToRegistry);
//订阅事件,对 configurations,providers,routes节点建立监听
directory.subscribe(toSubscribeUrl(urlToRegistry));
//返回默认的 FailoverClusterInvoker对象
return (ClusterInvoker<T>) cluster.join(directory);
}
directory.buildRouterChain(urlToRegistry);在这行代码这里完成了路由的监听
private RouterChain(URL url) {
loopPool = executorRepository.nextExecutorExecutor();
List<RouterFactory> extensionFactories =
ExtensionLoader.getExtensionLoader(RouterFactory.class)
.getActivateExtension(url, ROUTER_KEY);
List<Router> routers = extensionFactories.stream()
.map(factory -> factory.getRouter(url))
.collect(Collectors.toList());
initWithRouters(routers);
List<StateRouterFactory> extensionStateRouterFactories =
ExtensionLoader.getExtensionLoader(
StateRouterFactory.class)
.getActivateExtension(url, STATE_ROUTER_KEY);
List<StateRouter> stateRouters = extensionStateRouterFactories.stream()
.map(factory -> factory.getRouter(url, this))
.sorted(StateRouter::compareTo)
.collect(Collectors.toList());
// init state routers
initWithStateRouters(stateRouters);
}
最终代码会走到
this.getRuleRepository().addListener(routerKey, this);
在这行代码这里完成了注册流程,注册跟之前是一样的就不细看了
3.2 监听触发
3.2.1 说明
当我们写节点时就会触发事件
3.2.2 源码分析
当触发了事件时,一样的会走到CacheListener类中,然后根据path后去到path对应的监听回调类,该类就是ServiceRouter
我们看看该类的process逻辑
@Override
public synchronized void process(ConfigChangedEvent event) {
if (logger.isDebugEnabled()) {
logger.debug("Notification of condition rule, change type is: " +
event.getChangeType() +
", raw rule is:\n " + event.getContent());
}
if (event.getChangeType().equals(ConfigChangeType.DELETED)) {
routerRule = null;
conditionRouters = Collections.emptyList();
} else {
try {
routerRule = ConditionRuleParser.parse(event.getContent());
generateConditions(routerRule);
} catch (Exception e) {
logger.error("Failed to parse the raw condition rule and it will not take effect, please check " +
"if the condition rule matches with the template, the raw rule is:\n "
+ event.getContent(), e);
}
}
}
这里是解析配置规则,然后根据配置规则生成路由规则类,这里就生成了两个条件路由规则类
那么这个规则类最终是如何进行路由的呢?
其实说白了,就是在调用过程中,当我们获取到服务列表的时候,对这个服务列表进行条件路由规则的过滤,过滤剩下的invoker对象才是我们需要调用的invoker对象。
那我们来看看调用流程
在doList方法中就会根据路由规则来路由了,我们看看doList逻辑
@Override
public List<Invoker<T>> doList(Invocation invocation) {
if (forbidden) {
// 1. No service provider 2. Service providers are disabled
throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " +
getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey()
+ " on consumer " +
NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() +
", please check status of providers(disabled, not registered or in blacklist).");
}
if (multiGroup) {
return this.invokers == null ? Collections.emptyList() : this.invokers;
}
List<Invoker<T>> invokers = null;
try {
// Get invokers from cache, only runtime routers will be executed.
invokers = routerChain.route(getConsumerUrl(), invocation);
} catch (Throwable t) {
logger.error("Failed to execute router: " + getUrl() + ", cause: " +
t.getMessage(), t);
}
return invokers == null ? Collections.emptyList() : invokers;
}
invokers = routerChain.route(getConsumerUrl(), invocation);这行代码就是根据路由规则路由的。
最终会走到ServiceRouter中,根据事件触发后生成的两个条件路由规则来进行路由匹配。
@Override
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
if (!enabled) {
return invokers;
}
if (CollectionUtils.isEmpty(invokers)) {
return invokers;
}
try {
if (!matchWhen(url, invocation)) {
return invokers;
}
List<Invoker<T>> result = new ArrayList<Invoker<T>>();
if (thenCondition == null) {
logger.warn("The current consumer in the service blacklist. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey());
return result;
}
for (Invoker<T> invoker : invokers) {
if (matchThen(invoker.getUrl(), url)) {
result.add(invoker);
}
}
if (!result.isEmpty()) {
return result;
} else if (this.isForce()) {
logger.warn("The route result is empty and force execute. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey() + ", router: " + url.getParameterAndDecoded(RULE_KEY));
return result;
}
} catch (Throwable t) {
logger.error("Failed to execute condition router rule: " + getUrl() + ", invokers:" + invokers + ", cause: " + t.getMessage(), t);
}
return invokers;
}
路由匹配规则就是根据你的配置信息来进行一一匹配,这块逻辑同学们可以自己去看,这里就不分析了,路由还算是比较容易看懂的。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/76694.html