前言
继上次小编所讲RPC协议暴露服务并且远程调用之后,小编这次给大家带来注册中心协议整体流程原理以及源码精讲,Dubbo协议服务暴露与引用以及源码分析文章中,远程服务暴露可以只通过RPC协议即可,那为什么要注册中心呢,不知道大家有没有考虑到当多服务多调用的时候,我们的服务端url不可能写死,那么我们的注册中心就上场了。那怎样才可以与RPC协议进行交互并且注册服务到注册中心,这就是本篇小编给大家带来的内容。
注册协议
如上图所示,RegistryProtocol与DubboProtocol都是实现了Protocol。
在服务暴露的时候,调用的是RegistryProtocol,这个RegistryProtocol内部包含了一个协议DubboProtocol,然后RegistryProtocol把url注册到ZookeeperRegistry。上图可以看到registry协议地址中export属性就是dubbo协议(上面registry协议地址只包含了服务暴露)。
这里服务暴露是这样,引用也是这样,通过RegistryProtocol去注册消费者,在服务引用的时候自动找到服务提供者之后建立连接然后发起调用。
注册协议服务暴露
服务暴露流程:
使用RegistryProtocol调用,进行服务的暴露然后进行url的注册。
小编从RegistryProtocol开始通过代码示例讲一下流程。(记得开启zookeeper)
public class RegistryProtocolTest {
private static final String REGISTRY_URL_TEXT = "zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=boot-server&dubbo=2.0.2&export=dubbo://127.0.0.1:20880/com.learn.code.IUserService";
private static final String URL_TEXT = "dubbo://127.0.0.1:20880/com.learn.code.IUserService";
@Before
public void init() {
ApplicationModel.getConfigManager().setApplication(new ApplicationConfig("test"));
}
@Test
public void exportServer() throws IOException {
//准备环境
UserService userService = new UserService();
ServiceRepository serviceRepository = ApplicationModel.getServiceRepository();
ServiceDescriptor serviceDescriptor = serviceRepository.registerService(IUserService.class);
serviceRepository.registerProvider("com.learn.code.IUserService",
userService, serviceDescriptor,
new ServiceConfig<>(), null);
//注册协议
RegistryProtocol registryProtocol = new RegistryProtocol();
//依赖实际协议
registryProtocol.setProtocol(new DubboProtocol());
//依赖注册工厂
ZookeeperRegistryFactory zookeeperRegistryFactory = new ZookeeperRegistryFactory();
//工厂的话依赖zookeeper的客户端
zookeeperRegistryFactory.setZookeeperTransporter(new CuratorZookeeperTransporter());
registryProtocol.setRegistryFactory(zookeeperRegistryFactory);
//ProxyFactory jdkProxyFactory = new JdkProxyFactory();
ProxyFactory javassistProxyFactory = new JavassistProxyFactory();
//通过javassistProxyFactory(dubbo默认)获取到invoker后进行服务暴露
registryProtocol.export(javassistProxyFactory.getInvoker(userService,IUserService.class, URL.valueOf(REGISTRY_URL_TEXT)));
System.in.read();
}
}
//直接使用dubbo协议进行测试
@Test
public void referServer() throws IOException {
DubboProtocol dubboProtocol = new DubboProtocol();
Invoker<IUserService> invoker = dubboProtocol.refer(IUserService.class, URL.valueOf(URL_TEXT));
ProxyFactory proxyFactory = new JdkProxyFactory();
IUserService proxy = proxyFactory.getProxy(invoker);
System.out.println(proxy.getUser(1L));
}
测试结果
UserDto(id=1, age=18, name=Bob, desc=当前服务:null)
JavassistProxyFactory是dubbo默认的(基于动态生成代理类),与JdkProxyFactory通过反射不同,生成的代理类然后实例化后可以直接调用方法,而不是通过反射。大家有兴趣可自己看一下源码,小编不做详解。(有机会给大家看下生成代理类后的代码以及如何获取)
源码阅读
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 注册地址
URL registryUrl = getRegistryUrl(originInvoker);
// url to export locally 服务提供者地址 即dubbo
URL providerUrl = getProviderUrl(originInvoker);
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
// the same service. Because the subscribed is cached key with the name of the service, it causes the
// subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
//export invoker 暴露远程服务
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// url to registry
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
// decide if we need to delay publish
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
// 将url进行注册
register(registryUrl, registeredProviderUrl);
}
// register stated url on provider model
// 在提供者模型上注册指定的url
registerStatedUrl(registryUrl, registeredProviderUrl, register);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
// Deprecated! Subscribe to override rules in 2.6.x or before.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
notifyExport(exporter);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}
上面最重要两步骤就是
//export invoker 暴露远程服务 上篇文章所讲的dubbo远程服务的暴露并且放入一个map中,以便后续操作(销毁)
final ExporterChangeableWrapper exporter = doLocalExport(originInvoker, providerUrl);
// 将url进行注册 连接zookeeper然后写入url
register(registryUrl, registeredProviderUrl);
注册协议服务引用
与服务暴露流程不同的是,这边先写入消费者url然后创建invoker。创建invoker过程中包括了服务订阅以及服务引用和构建集群的Invoker。这是大致流程。下面小编详细讲一下每个步骤的过程。
- 注册消费者URL:这个没上面可讲的,根据消费端的配置封装成所需的url写入注册中心
- 服务订阅:消费端监听服务的提供者配置和路由,这个主要是在每次服务端有发生变更时触发通知,刚刚启动触发第一次通知,刷新invokers,这边如果提供端有多个urls,然后多次调用DubboProtocl进行进行引用。
- 构建集群Invoker:基于多个invokers构建一个集群Invoker即ClusterInvoker。
- 最后调用的时候,通过集群Invoker通过负载均衡机制选择一个invoker发起远程调用。
小编下面通过代码示例进行证明:
private static final String REF_URL_TEXT = "zookeeper://127.0.0.1:2181//org.apache.dubbo.registry.RegistryService?application=boot-client&dubbo=2.0.2&interface=com.learn.code.IUserService";
@Test
public void refServer(){
//注册协议
RegistryProtocol registryProtocol = new RegistryProtocol();
//依赖实际协议
registryProtocol.setProtocol(new DubboProtocol());
//依赖注册工厂
ZookeeperRegistryFactory zookeeperRegistryFactory = new ZookeeperRegistryFactory();
//工厂的话依赖zookeeper的客户端
zookeeperRegistryFactory.setZookeeperTransporter(new CuratorZookeeperTransporter());
registryProtocol.setRegistryFactory(zookeeperRegistryFactory);
//服务监听调用引用创建集群Invoker
Invoker<IUserService> refer = registryProtocol.refer(IUserService.class, URL.valueOf(REF_URL_TEXT));
//使用代理对象
ProxyFactory javassistProxyFactory = new JavassistProxyFactory();
IUserServiceproxy = javassistProxyFactory.getProxy(refer);
System.out.println(proxy.getUser(1L));
}
测试结果和上面一样。
小编先带大家看下集群Invoker的结构是怎样的。
Dubbo在消费端封装的Invoker非常多。不管怎么封装其实原理不变的,下面小编给大家捋捋Invoker
Cluster接口中可以根据Directory我们构建一个ClusterInvoker,本身Cluster实现类中如FailbackCluster同样帮助创建一个FailbackClusterInvoker。这个大家不用特别关心。重点是AbstractClusterInvoker有一个了Directory,而服务注册表就保存在他里面,同时含有多个invoker,invoker底层为DubboInvoker,其包含了多个client,这个client则为NettyClient,这样就和上述debug图中的invoker对应起来了。当然集群模式下通过负载均衡机制挑选一个invoker发起调用。
解释完集群Invoker的结构,继续说一下服务引用的时序图
服务引用时序图
小编根据源码给大家解释一下(摘重要源码):主要看注释
org.apache.dubbo.registry.integration.RegistryProtocol#refer
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
//取出消费端提供的url使用registerFactory构建register
url = getRegistryUrl(url);
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
//取出配置所需要采用的集群,没有配置默认失败重试,开始服务引用
Cluster cluster = Cluster.getCluster(qs.get(CLUSTER_KEY));
return doRefer(cluster, registry, type, url);
}
org.apache.dubbo.registry.integration.RegistryProtocol#doRefer
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
//创建最重要的对象RegistryDirectory,与注册中心与建立连接需要两个对象registry和远程协议
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (directory.isShouldRegister()) {
directory.setRegisteredConsumerUrl(subscribeUrl);
//注册消费者url,创建消费者临时节点
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
//服务订阅,这里代码比较复杂,请看下一块调用最终的地方
directory.subscribe(toSubscribeUrl(subscribeUrl));
//封装,directory保存了所有的invoker,即clusterInvoker
Invoker<T> invoker = cluster.join(directory);
List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
if (CollectionUtils.isEmpty(listeners)) {
return invoker;
}
RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker);
for (RegistryProtocolListener listener : listeners) {
listener.onRefer(this, registryInvokerWrapper);
}
return registryInvokerWrapper;
}
org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doSubscribe
//这边走的是else代码
List<URL> urls = new ArrayList<>();
//监听provider,config,router
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
//添加子节点并且notify
ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
zkClient.create(path, false);
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
//添加所有的提供者信息
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
//触发第一次刷新invoker,里面方法进行了try catch,进行失败重试,最终调用listener.notify
notify(url, listener, urls);
org.apache.dubbo.registry.integration.RegistryDirectory#notify
@Override
public synchronized void notify(List<URL> urls) {
//过滤一些信息进行分类
Map<String, List<URL>> categoryUrls = urls.stream()
.filter(Objects::nonNull)
.filter(this::isValidCategory)
.filter(this::isNotCompatibleFor26x)
.collect(Collectors.groupingBy(this::judgeCategory));
// providers 重点看提供者信息
List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
//刷新invoker,这里面有比较新旧invokers来判断是否要销毁没有用的url
refreshOverrideAndInvoker(providerURLs);
}
org.apache.dubbo.registry.integration.RegistryDirectory#toInvokers
//建立连接封装DubboInvoker
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
总结
本篇文章与上一篇RPC协议远程服务暴露才是完整的Dubbo服务暴露与引用的全过程。缺一不可,这次小编并没有提到服务的销毁流程,后面有时间补上,大家也可以自己去了解和熟悉一下。当然这里还不是面向使用者,更像是面对dubbo开发人员,服务的暴露和引用实际使用的时候为ServiceConfig和ReferenceConfig,小编拆散揉碎了来从底层一步一步往上层扩展。先是dubbo协议可以直接远程调用,然后使用registry协议将dubbo协议封装注册到注册中心上去,最后我们从使用者角度使用ServiceConfig和ReferenceConfig来调用registry协议来完成dubbo的整个调用过程。当然这边小编在dubbo的第一章节中Dubbo整体架构与核⼼组件认知以及简单示例就有其调用示例。
到这里小编的Dubbo源码之旅快接近尾声了,后面还有Dubbo远程通讯时编解码源码分析,Dubbo整个调用链路流程以及Dubbo服务销毁。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/13570.html