Dubbo集群容错 – 容错机制的实现




Dubbo集群容错 - 容错机制的实现
前言
微服务环境下,为了保证服务的高可用,很少会有部署单点服务,服务通常都是以集群化部署的形式出现的。在之前文章中 [Dubbo远程调用过程]我们已经了解了Dubbo远程调用的实现,但是被调用的远程服务并不是每时每刻都保持良好状态的,当某个服务节点调用抛出异常时,比如网络抖动、服务短暂不可用等情况下需要自动进行容错处理,或者开发者只想本地测试、服务降级,需要Mock(模拟)返回结果,就需要使用本篇提到集群容错机制。

往期推荐

Dubbo远程调用过程
janker

Dubbo集群容错 - 容错机制的实现
Dubbo服务消费的实现原理
janker

Dubbo集群容错 - 容错机制的实现
Dubbo服务暴露的实现原理
janker

Dubbo集群容错 - 容错机制的实现
Dubbo拓展点加载机制
janker

Dubbo集群容错 - 容错机制的实现

Dubbo集群容错 - 容错机制的实现
概述

集群容错机制主要集中在源码dubbo-cluster模块中,我们可以把Cluster看作一个集群容错层,该层中包含ClusterDirectoryRouterLoadBalance 几大核心接口

注意这里要区分Cluster层和Cluster接口,Cluster层是抽象概念,表示的是对外的整个集群容错层,Cluster是集群容错接口,提供FailoverFailfast等容错策略。

由于Cluster层的实现比较多,所以下面介绍的流程是一个基于Abstractclusterinvoker的全量流程,某些实现可能只使用了该流程的一小部分。Cluster的整体工作流程可以分为以下几个步骤:

  1. 生成Invoker对象。不同的Cluster实现会生成不同类型的Clusternvoker对象并返回。然后调用ClusterInvokerInvoker方法,正式开始调用流程。
  2. 获得可调用的服务列表。首先会做前置校验处理,检查远程服务是否已经被销毁。然后通过 Directory#list获取所有可用的服务节点列表。接着使用Router接口处理该服务列表,根据路由规则过滤掉一部分服务节点,最后返回剩下的服务节点列表。
  3. 做负载均衡。从上一步得到的可用的服务列表还需要通过不同的负载均衡策略筛选出一个服务节点,用作最后的调用。首先会根据用户的配置,调用ExtensionLoader获取不同负载均衡策略的扩展点实现,(拓展点加载机制实现原理可参看Dubbo拓展点加载机制)。然后做一些后置操作,如果是异步调用则会设置调用编号。然后调用子类实现的dolnvoke方法, 子类会根据负载均衡策略筛选出一个可以调用的服务。
  4. RPC调用。首先保存每次调用的InvokerRPC上下文,并做RPC调用。然后处理调用结果,对于调用出现异常、成功、失败等情况,每种容错策略会有不同的处理方式。在本文后面会讲到不同容错策略的实现。

总体调用流程如下:

Dubbo集群容错 - 容错机制的实现

图是一个全量的通用流程,其中1〜3步都是在抽象方法Abstractclusterinvoker中 实现的,可以理解为通用的模板流程,主要做了校验、参数准备等工作,最终会调用子类实现的 dolnvoke方法。不同的ClusterInvoker子类都继承了这个抽象类,子类会在上述流程中做个性化的定制。

Dubbo集群容错 - 容错机制的实现
容错机制的实现

Cluster接口一共有9种不同的实现,每种实现分别对应不同的Clusterlnvoker。本篇会介绍继承了 AbstractclusterInvoker 的 7 种Clusterinvoker 实现,MergeMock属于特殊机制会在下一篇给大家介绍。

Dubbo集群容错 - 容错机制的实现
容错机制概述

Dubbo容错机制可以增强应用的鲁棒性,集群容错过程对上层用户是完全透明的,但用户也可以通过不同的配置选项来配置不同的容错策略。每一种容错策略又有自己特有的配置选项。Dubbo 中现有 FailoverFailfastFailsafeFallbackForkingBroadcast 等集群容错机制,容错机制的特性如下表。

容错机制
简介
Failover 该策略下当服务调用出现失败时,会重试其他服务节点。可以通过retries参数设置重试次数。这是Dubbo的默认容错机制,会对请求做负载均衡。
通常使用在读操作或幂等的写操作上
Failfast 该策略下服务调用快速失败,当请求失败后,快速返回异常结果,不做任何重试。该容错机制会对请求做负载均衡,通常使用在非幂等接口的调用上
Failsafe 该策略下当出现异常时,直接忽略异常。会对请求做负载均衡。
Fallback 该策略下服务调用请求失败后,自动记录生成一个重试Task并添加到Timer中,适用于一些异步或最终一致性的请求。请求会做负载均衡
Forking 该策略下同时调用多个相同的服务,只要其中一个节点返回,则立即返回结果。
通常使用在对接口实时性要求非常高的服务/接口调用上,同时也会占用更多的系统资源
Broadcast 该策略下广播调用所有可用的服务,任意一个服务节点报错则报错。由于是广播模式,因此请求不需要做负载均衡。
通常用于服务状态更新后的广播
Mock 该策略下提供调用失败时,返回模拟的响应结果。或直接强制返回模拟的结果,并不会发起远程调用。
Available 最简单的容错策略,请求不会做负载均衡,遍历所有服务节点列表,找到第一个可用的节点,直接请求并返回结果。如果没有可用的服务节点,则调用直接抛出异常
Mergeable Mergeable模式下可以自动把多个服务节点请求得到的结果进行合并

Cluseter的具体实现:开发者可以在<dubbo :service><dubbo:reference><dubbo:consumer>

<dubbo:provider>标签上通过cluster属性来设置。对于Failover容错模式,用户可以通过retries属性来设置最大重试次数。可以设置在dubbo: reference标签上,也可以设置在细粒度的方法标签dubbo:method上。

对于Forking容错模式,用户可通过forks="最大并行数"属性来设置最大并行数。假设设置 的forks数为n,可用的服务数为m,当n < m时,即可用的服务数大于配置的并行数,则并行请 求 n 个服务;当 n > m 时,即可用的服务数小于配置的并行数,则请求所有可用的服务m

对于Mergeable容错模式,可以dubbo:reference标签中通过merger="true"开启,合并时可以通过group="*"属性指定需要合并哪些分组的结果。默认会根据方法的返回值自动匹配合并器,如果同一个类型有两个不同的合并器实现,则需要在参数中指定合并器的名字(merger="合并器名")。例如:用户根据某List类型的返回结果实现了多个合并器,则需要手动指定合并器名称,否则系统不知道要用哪个。如果想调用返回结果的指定方法进行合并(如返 回了一个Set,想调用Set#addAll方法),则可以通过merger=".addAll'配置来实现。

官方 Mergeable配置示例如下。

<!-- 搜索所有分组,根据返回结果的类型自动查找合并器。该接口中getMenuItems方法不做合并 --><dubbo:reference interface="com.xxx.MenuService" group="*">
    <dubbo:method name="getMenuItems" merger="true" />
</dubbo:reference>
<!-- 指定方法合并结果 -->
<dubbo:reference interface="com.xxx.MenuService" group="*">
    <dubbo:method name="getMenuItems" merger="mymerge" />
</dubbo:reference>

<!-- 调用返回结果的指定方法进行合并 -->
<dubbo:reference interface="com.xxx.MenuService" group="*">
    <dubbo:method name="getMenuItems" merger=".addAll" />
</dubbo:reference>

Dubbo集群容错 - 容错机制的实现





Cluster 类接口关系

在微服务环境中,可能多个节点同时都提供同一个服务。当上层调用Invoker时,无论实际存在多少个Invoker,只需要通过Cluster层,即可完成整个调用的容错逻辑,包括获取服务列表、服务路由、负载均衡等,整个处理过程对上层都是透明的。当然,Cluster接口只是将整个逻辑串联起来, 其中Clusterlnvoker只实现了容错策略部分,其他逻辑则是调用了 DirectoryRouterLoadBalance 等接口实现。

容错的接口主要分为两大类,第一类是Cluster类,第二类是ClusterInvoker类。ClusterClusterInvoker之间的关系也比较简单,Cluster接口下面有多种不同的实现,每种实现中都需要实现接口的join方法,在方法中会 new 一个对应的ClusterInvoker实现。我们以FailoverCluster 实现为例来进行说明,代码如下:

ClusterClusterInvoker之间的关系

public abstract class AbstractCluster implements Cluster {
 //.....
}
public class FailoverCluster extends AbstractCluster {

    public final static String NAME = "failover";

    @Override
    public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker<>(directory);
    }

}

FailoverClusterCluster的其中的一种实现,FailoverCluster中直接创建了一个新的 FailoverClusterlnvoker 并返回。FailoverClusterlnvoker 继承的接口是 Invoker。只看文字描述理解起来还是不是那么容易。因此,在理解集群容错的详细原理之前,我们先从”上帝视角”看一下整个集群容错的接口之间的关系。Cluster接口的类图关系如下图。


Dubbo集群容错 - 容错机制的实现

ClusterInvoker总体类结构关系

Dubbo集群容错 - 容错机制的实现

Invoker 接口在最上层,它下面分别有 AbstractClusterlnvokerMockClusterlnvokerMergeableClusterlnvoker 3个类。其中,AbstractClusterlnvoker 是一个抽象类,其封装了通用的模板逻辑,如获取服务列表、负载均衡、调用服务提供者等,并留了一个dolnvoke方法需要子类自行拓展实现。AbstractClusterlnvoker 下面有7个子类,分别实现了不同的集群容错机制。

MockClusterlnvokerMergeableClusterlnvoker由于并不适用于正常的集群容错逻辑,所以没有挂在AbstractClusterlnvoker抽象类下面,而是直接继承了 Invoker接口。


以上就是容错的接口,DirectoryRouterLoadBalance的接口会在以后的文章中详细探讨。

Dubbo集群容错 - 容错机制的实现
Failover 策略

Cluster接口上有SPI注解@SPI(FailoverCluster.NAME),即默认实现是Failover。该策略的 代码逻辑如下:

  1. 对传入的参数进行校验。校验从AbstractClusterlnvoker传入的Invoker列表是否为空。
  2. 获取重试配置参数。从调用URL中获取对应的retries重试次数。
  3. 初始化一些集合和对象。用于保存调用过程中出现的异常、记录调用了哪些节点(这个会在负载均衡中使用,在某些配置下,尽量不要一直调用同一个服务)。
  4. 使用for循环实现重试,for循环的次数就是重试的次数。成功则返回,否则继续循环。如果for循环完,还没有一个成功的返回,则抛出异常,把(3)中记录的信息抛出去。

前3步都是做一些参数校验、相关数据的准备工作。第4步才是真正的调用逻辑。以下步骤是 for 循环中的逻辑:

  • 对传入的参数进行校验。如果for循环次数大于1,即有过一次失败,则会再次校验节点是否被销毁、传 入的Invoker列表是否为空。
  • 做负载均衡。调用select方法做负载均衡,得到要调用的节点,并记录这个节点到步骤3 的集合里,再把已经调用的节点信息放进RPC上下文中。
  • 发起远程调用。调用invoker#invoke方法做远程调用,成功则返回,异常则记录异常信息, 再做下次循环。

Failover流程如下图

Dubbo集群容错 - 容错机制的实现
Failover总体流程

Dubbo集群容错 - 容错机制的实现
Failfast 策略

Failfast会在调用失败后直接抛出异常并返回,逻辑实现非常简单,具体执行逻辑如下:

  • 对传入参数进行参数校验。校验从AbstractClusterlnvoker传入的Invoker列表是否为空。
  • 负载均衡。调用select方法做负载均衡,得到要调用的节点。
  • 发起远程调用。在try代码块中调用invoker#invoke方法做远程调用。如果捕获到Exception,则直接封装成RpcException 然后抛出。


 

Dubbo集群容错 - 容错机制的实现
Failsafe策略


Failsafe模式下调用时如果出现异常,则会直接忽略异常:捕获异常,返回空的结果集。实现逻辑也比较简单,步骤如下:

  • 对传入的参数进行校验。校验从AbstractClusterlnvoker传入的Invoker列表是否为空。
  • 负载均衡。调用select方法做负载均衡,获取要调用的服务节点。
  • 发起远程调用。在try代码块中调用invoker#invoke方法做远程调用,”catch“到任何异常都直接”吞掉”,返回一个空的结果集。

Failsafe调用代码

    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
           //1 验传入的参数
            checkInvokers(invokers, invocation);
           //2 做负载均衡
            Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
           //3 进行远程调用,调用成功则直接返回
            return invokeWithContext(invoker, invocation);
        } catch (Throwable e) {
            logger.error("Failsafe ignore exception: " + e.getMessage(), e);
           //捕获到异常,直接返回一个空的结果集
            return AsyncRpcResult.newDefaultAsyncResult(nullnull, invocation); // ignore
        }
    }



Dubbo集群容错 - 容错机制的实现
Fallback策略

Fallback模式下如果调用失败,则会定期重试。FailbackClusterlnvoker里面定义了一个 HashedWheelTimer,里面保存了定时失败需要重试的Task,重试失败重新放回HashedWheelTimer中,如果调用重试成功,则会把serviceContext清除。dolnvoke的调用逻辑如下:

  1. 对传入的参数进行校验校验从AbstractClusterlnvoker传入的Invoker列表是否为空。
  2. 负载均衡。调用select方法做负载均衡,得到要调用的节点。
  3. 发起远程调用。try代码块中调用invoker#invoke方法做远程调用,” catch” 到异常后 直接把invocationloadbalanceinvokersconsumerUrl等信息构建了一个RetryTimerTask重试任务。
  4. 定时HashedWheelTimer会定时执行RetryTimerTask,如果请求仍然失败,则会重新添加到HashedWheelTimer中。

Fallback重试代码

    protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        Invoker<T> invoker = null;
        URL consumerUrl = RpcContext.getServiceContext().getConsumerUrl();
        try {
           //校验传入的参数
            checkInvokers(invokers, invocation);
           //负载均衡
            invoker = select(loadbalance, invocation, invokers, null);
            //远程调用
            return invokeWithContextAsync(invoker, invocation, consumerUrl);
        } catch (Throwable e) {
            logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
                + e.getMessage() + ", ", e);
            if (retries > 0) {
               ////添加失败的请求到重试failTimer中
                addFailed(loadbalance, invocation, invokers, invoker, consumerUrl);
            }
            return AsyncRpcResult.newDefaultAsyncResult(nullnull, invocation); // ignore
        }
    }
 
//添加失败的请求到重试failTimer中
private void addFailed(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker, URL consumerUrl) {
        if (failTimer == null) {
            synchronized (this) {
                if (failTimer == null) {
                    failTimer = new HashedWheelTimer(
                        new NamedThreadFactory("failback-cluster-timer"true),
                        1,
                        TimeUnit.SECONDS, 32, failbackTasks);
                }
            }
        }
         //构建RetryTimerTask
        RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD, consumerUrl);
        try {
            failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS);
        } catch (Throwable e) {
            logger.error("Failback background works error, invocation->" + invocation + ", exception: " + e.getMessage());
        }
}



Dubbo集群容错 - 容错机制的实现
Broadcast 策略

Broadcast模式下会调用时广播给所有可用的节点,如果任何一个节点有报错,则返回异常。步骤如下:

  1. 做前置处理。校验从AbstractClusterlnvoker传入的Invoker列表是否为空;在RPC上下 文中设置Invoker列表;初始化了一些对象,用于保存调用过程中产生的异常和结果信息等。(校验参数、初始化用于保存异常和结果的对象)
  2. 循环遍历所有Invoker,直接做RPC调用。任何一个节点调用出错,并不会中断整个广播过程,会先记录异常,在最后一个广播完成后再抛出。如果多个节点异常,则只有最后一个节点的异常时会被抛出,前面的异常将会被覆盖。

Dubbo集群容错 - 容错机制的实现
Available 策略

Available 模式下调用时,当找到第一个可用的服务节点直接调用,并返回结果。步骤如下:

  1. 遍历AbstractClusterlnvoker传入的Invoker列表,如果Invoker是可用的,就直接调用并返回。
  2. 如果遍历整个列表仍然没找到可用的Invoker,则会抛出异常。




Dubbo集群容错 - 容错机制的实现
Forking 策略

Forking模式下调用可以同时并发请求多个服务,有任何一个服务节点返回,则直接返回。相对于其他调用策略,Forking的实现是最复杂的。其步骤如下:

  1. 准备工作。校验传入的Invoker列表是否可用;初始化一个Invoker集合,用于保存真正要调用的Invoker列表,从URL中得到最大并行数、超时时间。

  2. 获取最终要调用的Invoker列表。假设用户设置最大的并行数为n, 实际可以调用的 最大服务数为m。如果n< 0或 n < m,则说明可用的服务数小于用户的设置,因此最终要调用的 Invoker 只能有m个;如果 n < m ,则会循环调用负载均衡方法,不断得到可调用的Invoker,加入 步骤1中的Invoker集合里。

    这里有一点需要注意:在Invoker加入集合时,会做去重操作。因此,如果用户设置的负载 均衡策略每次返回的都是同一个Invoker,那么集合中最后只会存在一个Invoker,也就是只会 调用一个节点。

  3. 调用前的准备工作。设置要调用的Invoker列表到RPC上下文,初始化一个异常计数器,初始化一个阻塞队列,用于记录并行调用的结果。

  4. 执行调用。循环使用线程池并行调用,调用成功,则把结果加入阻塞队列,调用失败, 则失败计数+1。如果所有线程的调用都失败了,即失败计数大于等于所有可调用的Invoker时,则把异常信息加入阻塞队列中。

    注意:并行调用是如何保证个别调用失败不返回异常信息,只有全部失败才返回异常信息的呢?因为有判断条件,当失败计数N所有可调用的Invoker时,才会把异常信息放入阻塞队列,所以只有当最后一个Invoker也调用失败时才会把异常信息保存到阻塞队列, 从而达到全部失败才返回异常的效果。

  5. 同步等待结果。由于步骤4中的步骤是在线程池中执行的,因此主线程还会继续往下执行,主线程中会使用阻塞队列的poll(“超时时间”)方法,同步等待阻塞队列中的第一个结果, 如果是正常结果则返回,如果是异常则抛出异常。

从上面的处理逻辑我们可以知道,Forking模式下的超时机制是通过在阻塞队列的poll方法中传入超时时间参数来实现的,线程池中的并发调用会获得第一个正常返回的处理结果。只有所有请求都失败了,Forking才会返回失败。

Forking调用流程如图


Dubbo集群容错 - 容错机制的实现

Dubbo集群容错 - 容错机制的实现
总结
以上就是所有普通容错策略的实现原理,在Cluster模块中它是一个非常重要的组件之一,类似的在这个模块非常重要的组件还包括,DirectoryRouterLoadBalance其实现原理,在下一篇中会深入探索,敬请期待。



喜欢此内容的人还喜欢

RocketMQ核心概念及架构
janker

Dubbo集群容错 - 容错机制的实现
Dubbo远程调用过程
janker

Dubbo集群容错 - 容错机制的实现
Mybatis源码分析
janker

Dubbo集群容错 - 容错机制的实现
Dubbo服务暴露的实现原理
janker

Dubbo集群容错 - 容错机制的实现
Dubbo集群容错 - 容错机制的实现
点分享
Dubbo集群容错 - 容错机制的实现
点收藏
Dubbo集群容错 - 容错机制的实现
点点赞
Dubbo集群容错 - 容错机制的实现
点在看








原文始发于微信公众号(爪哇干货分享):Dubbo集群容错 – 容错机制的实现

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/172081.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!