





集群容错机制主要集中在源码dubbo-cluster
模块中,我们可以把Cluster
看作一个集群容错层,该层中包含Cluster
、Directory
、Router
、LoadBalance
几大核心接口。
注意这里要区分Cluster
层和Cluster
接口,Cluster
层是抽象概念,表示的是对外的整个集群容错层,Cluster
是集群容错接口,提供Failover
、Failfast
等容错策略。
由于Cluster
层的实现比较多,所以下面介绍的流程是一个基于Abstractclusterinvoker
的全量流程,某些实现可能只使用了该流程的一小部分。Cluster
的整体工作流程可以分为以下几个步骤:
-
生成 Invoker
对象。不同的Cluster
实现会生成不同类型的Clusternvoker
对象并返回。然后调用ClusterInvoker
的Invoker
方法,正式开始调用流程。 -
获得可调用的服务列表。首先会做前置校验处理,检查远程服务是否已经被销毁。然后通过 Directory#list
获取所有可用的服务节点列表。接着使用Router
接口处理该服务列表,根据路由规则过滤掉一部分服务节点,最后返回剩下的服务节点列表。 -
做负载均衡。从上一步得到的可用的服务列表还需要通过不同的负载均衡策略筛选出一个服务节点,用作最后的调用。首先会根据用户的配置,调用 ExtensionLoader
获取不同负载均衡策略的扩展点实现,(拓展点加载机制实现原理可参看Dubbo拓展点加载机制)。然后做一些后置操作,如果是异步调用则会设置调用编号。然后调用子类实现的dolnvoke
方法, 子类会根据负载均衡策略筛选出一个可以调用的服务。 -
做 RPC
调用。首先保存每次调用的Invoker
到RPC
上下文,并做RPC调用。然后处理调用结果,对于调用出现异常、成功、失败等情况,每种容错策略会有不同的处理方式。在本文后面会讲到不同容错策略的实现。
总体调用流程如下:
上图是一个全量的通用流程,其中1〜3步都是在抽象方法Abstractclusterinvoker
中 实现的,可以理解为通用的模板流程,主要做了校验、参数准备等工作,最终会调用子类实现的 dolnvoke
方法。不同的ClusterInvoker
子类都继承了这个抽象类,子类会在上述流程中做个性化的定制。

Cluster
接口一共有9种不同的实现,每种实现分别对应不同的Clusterlnvoker
。本篇会介绍继承了 AbstractclusterInvoker
的 7 种Clusterinvoker
实现,Merge
和Mock
属于特殊机制会在下一篇给大家介绍。

Dubbo
容错机制可以增强应用的鲁棒性,集群容错过程对上层用户是完全透明的,但用户也可以通过不同的配置选项来配置不同的容错策略。每一种容错策略又有自己特有的配置选项。Dubbo
中现有 Failover
、 Failfast
、 Failsafe
、Fallback
、 Forking
、 Broadcast
等集群容错机制,容错机制的特性如下表。
容错机制 |
简介 |
---|---|
Failover | 该策略下当服务调用出现失败时,会重试其他服务节点。可以通过retries 参数设置重试次数。这是Dubbo的默认容错机制,会对请求做负载均衡。通常使用在读操作或幂等的写操作上 |
Failfast | 该策略下服务调用快速失败,当请求失败后,快速返回异常结果,不做任何重试。该容错机制会对请求做负载均衡,通常使用在非幂等接口的调用上 |
Failsafe | 该策略下当出现异常时,直接忽略异常。会对请求做负载均衡。 |
Fallback | 该策略下服务调用请求失败后,自动记录生成一个重试Task并添加到Timer中,适用于一些异步或最终一致性的请求。请求会做负载均衡 |
Forking | 该策略下同时调用多个相同的服务,只要其中一个节点返回,则立即返回结果。 通常使用在对接口实时性要求非常高的服务/接口调用上,同时也会占用更多的系统资源 |
Broadcast | 该策略下广播调用所有可用的服务,任意一个服务节点报错则报错。由于是广播模式,因此请求不需要做负载均衡。 通常用于服务状态更新后的广播 |
Mock | 该策略下提供调用失败时,返回模拟的响应结果。或直接强制返回模拟的结果,并不会发起远程调用。 |
Available | 最简单的容错策略,请求不会做负载均衡,遍历所有服务节点列表,找到第一个可用的节点,直接请求并返回结果。如果没有可用的服务节点,则调用直接抛出异常 |
Mergeable | Mergeable模式下可以自动把多个服务节点请求得到的结果进行合并 |
Cluseter的具体实现:开发者可以在<dubbo :service>
、<dubbo:reference>
、<dubbo:consumer>
、
<dubbo:provider>
标签上通过cluster
属性来设置。对于Failover
容错模式,用户可以通过retries
属性来设置最大重试次数。可以设置在dubbo: reference
标签上,也可以设置在细粒度的方法标签dubbo:method
上。
对于Forking
容错模式,用户可通过forks="最大并行数"
属性来设置最大并行数。假设设置 的forks
数为n
,可用的服务数为m
,当n
< m
时,即可用的服务数大于配置的并行数,则并行请 求 n
个服务;当 n
> m
时,即可用的服务数小于配置的并行数,则请求所有可用的服务m
。
对于Mergeable
容错模式,可以在dubbo:reference
标签中通过merger="true"
开启,合并时可以通过group="*"
属性指定需要合并哪些分组的结果。默认会根据方法的返回值自动匹配合并器,如果同一个类型有两个不同的合并器实现,则需要在参数中指定合并器的名字(merger="合并器名"
)。例如:用户根据某List
类型的返回结果实现了多个合并器,则需要手动指定合并器名称,否则系统不知道要用哪个。如果想调用返回结果的指定方法进行合并(如返 回了一个Set
,想调用Set#addAll
方法),则可以通过merger=".addAll'
配置来实现。
官方 Mergeable
配置示例如下。
<!-- 搜索所有分组,根据返回结果的类型自动查找合并器。该接口中getMenuItems方法不做合并 -->
<dubbo:reference interface="com.xxx.MenuService" group="*">
<dubbo:method name="getMenuItems" merger="true" />
</dubbo:reference><!-- 指定方法合并结果 -->
<dubbo:reference interface="com.xxx.MenuService" group="*">
<dubbo:method name="getMenuItems" merger="mymerge" />
</dubbo:reference><!-- 调用返回结果的指定方法进行合并 -->
<dubbo:reference interface="com.xxx.MenuService" group="*">
<dubbo:method name="getMenuItems" merger=".addAll" />
</dubbo:reference>

在微服务环境中,可能多个节点同时都提供同一个服务。当上层调用Invoker时,无论实际存在多少个Invoker,只需要通过Cluster层,即可完成整个调用的容错逻辑,包括获取服务列表、服务路由、负载均衡等,整个处理过程对上层都是透明的。当然,Cluster接口只是将整个逻辑串联起来, 其中Clusterlnvoker只实现了容错策略部分,其他逻辑则是调用了 Directory、Router、LoadBalance 等接口实现。
容错的接口主要分为两大类,第一类是Cluster类,第二类是ClusterInvoker类。Cluster和 ClusterInvoker之间的关系也比较简单,Cluster接口下面有多种不同的实现,每种实现中都需要实现接口的join方法,在方法中会 new 一个对应的ClusterInvoker实现。我们以FailoverCluster 实现为例来进行说明,代码如下:
Cluster与ClusterInvoker之间的关系
public abstract class AbstractCluster implements Cluster {
//.....
}
public class FailoverCluster extends AbstractCluster {
public final static String NAME = "failover";
@Override
public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<>(directory);
}
}
FailoverCluster
是Cluster
的其中的一种实现,FailoverCluster
中直接创建了一个新的 FailoverClusterlnvoker
并返回。FailoverClusterlnvoker
继承的接口是 Invoker
。只看文字描述理解起来还是不是那么容易。因此,在理解集群容错的详细原理之前,我们先从”上帝视角”看一下整个集群容错的接口之间的关系。Cluster
接口的类图关系如下图。
ClusterInvoker总体类结构关系
Invoker
接口在最上层,它下面分别有 AbstractClusterlnvoker
、 MockClusterlnvoker
和 MergeableClusterlnvoker
3个类。其中,AbstractClusterlnvoker
是一个抽象类,其封装了通用的模板逻辑,如获取服务列表、负载均衡、调用服务提供者等,并留了一个dolnvoke
方法需要子类自行拓展实现。AbstractClusterlnvoker
下面有7个子类,分别实现了不同的集群容错机制。
MockClusterlnvoker
和MergeableClusterlnvoker
由于并不适用于正常的集群容错逻辑,所以没有挂在AbstractClusterlnvoker
抽象类下面,而是直接继承了 Invoker
接口。
以上就是容错的接口,Directory
、Router
和LoadBalance
的接口会在以后的文章中详细探讨。

Cluster
接口上有SPI
注解@SPI(FailoverCluster.NAME)
,即默认实现是Failover
。该策略的 代码逻辑如下:
-
对传入的参数进行校验。校验从 AbstractClusterlnvoker
传入的Invoker
列表是否为空。 -
获取重试配置参数。从调用 URL
中获取对应的retries
重试次数。 -
初始化一些集合和对象。用于保存调用过程中出现的异常、记录调用了哪些节点(这个会在负载均衡中使用,在某些配置下,尽量不要一直调用同一个服务)。 -
使用for循环实现重试,for循环的次数就是重试的次数。成功则返回,否则继续循环。如果for循环完,还没有一个成功的返回,则抛出异常,把(3)中记录的信息抛出去。
前3步都是做一些参数校验、相关数据的准备工作。第4步才是真正的调用逻辑。以下步骤是 for 循环中的逻辑:
-
对传入的参数进行校验。如果for循环次数大于1,即有过一次失败,则会再次校验节点是否被销毁、传 入的 Invoker
列表是否为空。 -
做负载均衡。调用 select
方法做负载均衡,得到要调用的节点,并记录这个节点到步骤3 的集合里,再把已经调用的节点信息放进RPC
上下文中。 -
发起远程调用。调用 invoker#invoke
方法做远程调用,成功则返回,异常则记录异常信息, 再做下次循环。
Failover流程如下图


Failfast
会在调用失败后直接抛出异常并返回,逻辑实现非常简单,具体执行逻辑如下:
-
对传入参数进行参数校验。校验从 AbstractClusterlnvoker
传入的Invoker列表是否为空。 -
负载均衡。调用 select
方法做负载均衡,得到要调用的节点。 -
发起远程调用。在 try
代码块中调用invoker#invoke
方法做远程调用。如果捕获到Exception
,则直接封装成RpcException
然后抛出。

Failsafe
模式下调用时如果出现异常,则会直接忽略异常:捕获异常,返回空的结果集。实现逻辑也比较简单,步骤如下:
-
对传入的参数进行校验。校验从 AbstractClusterlnvoker
传入的Invoker
列表是否为空。 -
负载均衡。调用 select
方法做负载均衡,获取要调用的服务节点。 -
发起远程调用。在 try
代码块中调用invoker#invoke
方法做远程调用,”catch
“到任何异常都直接”吞掉”,返回一个空的结果集。
Failsafe调用代码
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
//1 验传入的参数
checkInvokers(invokers, invocation);
//2 做负载均衡
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
//3 进行远程调用,调用成功则直接返回
return invokeWithContext(invoker, invocation);
} catch (Throwable e) {
logger.error("Failsafe ignore exception: " + e.getMessage(), e);
//捕获到异常,直接返回一个空的结果集
return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
}
}

Fallback
模式下如果调用失败,则会定期重试。FailbackClusterlnvoker
里面定义了一个 HashedWheelTimer
,里面保存了定时失败需要重试的Task,重试失败重新放回HashedWheelTimer
中,如果调用重试成功,则会把serviceContext
清除。dolnvoke
的调用逻辑如下:
-
对传入的参数进行校验。校验从 AbstractClusterlnvoker
传入的Invoker
列表是否为空。 -
负载均衡。调用select方法做负载均衡,得到要调用的节点。 -
发起远程调用。在 try
代码块中调用invoker#invoke
方法做远程调用,”catch
” 到异常后 直接把invocation
、loadbalance
、invokers
、consumerUrl
等信息构建了一个RetryTimerTask
重试任务。 -
定时 HashedWheelTimer
会定时执行RetryTimerTask
,如果请求仍然失败,则会重新添加到HashedWheelTimer
中。
Fallback重试代码
protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
Invoker<T> invoker = null;
URL consumerUrl = RpcContext.getServiceContext().getConsumerUrl();
try {
//校验传入的参数
checkInvokers(invokers, invocation);
//负载均衡
invoker = select(loadbalance, invocation, invokers, null);
//远程调用
return invokeWithContextAsync(invoker, invocation, consumerUrl);
} catch (Throwable e) {
logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
+ e.getMessage() + ", ", e);
if (retries > 0) {
////添加失败的请求到重试failTimer中
addFailed(loadbalance, invocation, invokers, invoker, consumerUrl);
}
return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
}
}
//添加失败的请求到重试failTimer中
private void addFailed(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker, URL consumerUrl) {
if (failTimer == null) {
synchronized (this) {
if (failTimer == null) {
failTimer = new HashedWheelTimer(
new NamedThreadFactory("failback-cluster-timer", true),
1,
TimeUnit.SECONDS, 32, failbackTasks);
}
}
}
//构建RetryTimerTask
RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD, consumerUrl);
try {
failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS);
} catch (Throwable e) {
logger.error("Failback background works error, invocation->" + invocation + ", exception: " + e.getMessage());
}
}

Broadcast
模式下会调用时广播给所有可用的节点,如果任何一个节点有报错,则返回异常。步骤如下:
-
做前置处理。校验从 AbstractClusterlnvoker
传入的Invoker
列表是否为空;在RPC
上下 文中设置Invoker
列表;初始化了一些对象,用于保存调用过程中产生的异常和结果信息等。(校验参数、初始化用于保存异常和结果的对象) -
循环遍历所有 Invoker
,直接做RPC
调用。任何一个节点调用出错,并不会中断整个广播过程,会先记录异常,在最后一个广播完成后再抛出。如果多个节点异常,则只有最后一个节点的异常时会被抛出,前面的异常将会被覆盖。

Available
模式下调用时,当找到第一个可用的服务节点直接调用,并返回结果。步骤如下:
-
遍历 AbstractClusterlnvoker
传入的Invoker
列表,如果Invoker
是可用的,就直接调用并返回。 -
如果遍历整个列表仍然没找到可用的 Invoker
,则会抛出异常。

Forking
模式下调用可以同时并发请求多个服务,有任何一个服务节点返回,则直接返回。相对于其他调用策略,Forking
的实现是最复杂的。其步骤如下:
-
准备工作。校验传入的
Invoker
列表是否可用;初始化一个Invoker
集合,用于保存真正要调用的Invoker
列表,从URL
中得到最大并行数、超时时间。 -
获取最终要调用的
Invoker
列表。假设用户设置最大的并行数为n
, 实际可以调用的 最大服务数为m
。如果n
< 0或n
<m
,则说明可用的服务数小于用户的设置,因此最终要调用的Invoker
只能有m
个;如果n
<m
,则会循环调用负载均衡方法,不断得到可调用的Invoker
,加入 步骤1中的Invoker
集合里。这里有一点需要注意:在
Invoker
加入集合时,会做去重操作。因此,如果用户设置的负载 均衡策略每次返回的都是同一个Invoker
,那么集合中最后只会存在一个Invoker
,也就是只会 调用一个节点。 -
调用前的准备工作。设置要调用的
Invoker
列表到RPC
上下文,初始化一个异常计数器,初始化一个阻塞队列,用于记录并行调用的结果。 -
执行调用。循环使用线程池并行调用,调用成功,则把结果加入阻塞队列,调用失败, 则失败计数+1。如果所有线程的调用都失败了,即失败计数大于等于所有可调用的
Invoker
时,则把异常信息加入阻塞队列中。注意:并行调用是如何保证个别调用失败不返回异常信息,只有全部失败才返回异常信息的呢?因为有判断条件,当失败计数N所有可调用的
Invoker
时,才会把异常信息放入阻塞队列,所以只有当最后一个Invoker
也调用失败时才会把异常信息保存到阻塞队列, 从而达到全部失败才返回异常的效果。 -
同步等待结果。由于步骤4中的步骤是在线程池中执行的,因此主线程还会继续往下执行,主线程中会使用阻塞队列的
poll
(“超时时间”)方法,同步等待阻塞队列中的第一个结果, 如果是正常结果则返回,如果是异常则抛出异常。
从上面的处理逻辑我们可以知道,Forking
模式下的超时机制是通过在阻塞队列的poll
方法中传入超时时间参数来实现的,线程池中的并发调用会获得第一个正常返回的处理结果。只有所有请求都失败了,Forking
才会返回失败。
Forking
调用流程如图









原文始发于微信公众号(爪哇干货分享):Dubbo集群容错 – 容错机制的实现
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/172081.html