Dubbo 框架设计与源码解读(配置解析优先级、线程分配、负载均衡、容错方案)

导读:本篇文章讲解 Dubbo 框架设计与源码解读(配置解析优先级、线程分配、负载均衡、容错方案),希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com




  • 图中左边淡蓝背景的为服务消费⽅使⽤的接⼝,右边淡绿⾊背景的为服务提供⽅使⽤的接⼝,位于中轴线上的为双⽅都⽤到的接⼝。
  • 图中从下⾄上分为⼗层,各层均为单向依赖,右边的⿊⾊箭头代表层之间的依赖关系,每⼀层都可以剥离上层被复⽤,其中, Service 和 Config 层为 API,其它各层均为 SPI。
  • 图中绿⾊⼩块的为扩展接⼝,蓝⾊⼩块为实现类,图中只显示⽤于关联各层的实现类。
  • 图中蓝⾊虚线为初始化过程,即启动时组装链,红⾊实线为⽅法调⽤过程,即运⾏时调时链,紫⾊三⻆箭头为继承,可以把⼦类看作⽗类的同⼀个节点,线上的⽂字为调⽤的⽅法。


  1. 首先将请求体编码,包含请求唯一标识符、包含对象体(方法名、参数)
  2. 将请求对象打包扔给调度中心,如果既是Provider又是Consumer,则直接Invoker本地调用
  3. Proxy持有Invoker对象通过路由和负载情况,路由根据权重或hash等,根据路由规则找到服务提供者的ip,对ip发起请求,到达Provider
  4. Provider从请求中解析出协议头、序列化请求参数、解析对象、版本号、方法名称等
  5. 通过过滤器、上下文解析、限流、计数,然后反序列化成为一个对象
  6. 根据对象找到具体方法,找到具体线程池执行,然后原路返回


  • Service 接⼝服务层:该层与业务逻辑相关,根据 provider 和 consumer 的业务设计对应的接
  • config 配置层:对外配置接口,以ServiceConfig、ReferenceConfig为中心,可以直接初始化配置类,也可以通过spring解析配置生成配置类
  • proxy 服务代理层:服务接口透明代理,生成服务的客户端Stub和服务端Skeleton,以ServiceProxy为中心,扩展接口为ProxyFactory
  • registry 注册中心层:封装服务地址的注册与发现,以服务URL为中心,扩展接口为RegistryFactory、Registry、RegistryService
  • cluster 路由层:封装多个提供者的路由及负载均衡,并桥接注册中心,以Invoker为中心,扩展接口为Cluster、Directory、Router、LoadBalance
  • monitor 监控层:RPC调用次数和调用时间监控,以Statistics为中心,扩展接口为MonitorFactory、Monitor、MonitorService
  • protocol 远程调用层:封装RPC调用,以Invocation、Result为中心,扩展接口为Protocol、Invoker、Exporter
  • exchange 信息交换层:封装请求响应模式,同步转异步,以Request、Response为中心,扩展接口为Exchanger、ExchangeChannel、ExchangeClient、ExchangeServer
  • transport 网络传输层:抽象mina和netty为统一接口,以Message为中心,扩展接口为Channel、Transporter、Client、Server、Codec
  • serialize 数据序列化层:可复用的一些工具,扩展接口为Serialization、ObjectInput、ObjectOutput、ThreadPool


  • 在 RPC 中, Protocol 是核⼼层,也就是只要有 Protocol + Invoker + Exporter 就可以完成⾮透明的 RPC 调⽤,然后在 Invoker 的主过程上 Filter 拦截点。
  • 图中的 Consumer 和 Provider 是抽象概念,只是想让看图者更直观的了解哪些类分属于客户端与服务器端,不⽤ Client 和 Server 的原因是 Dubbo 在很多场景下都使⽤ Provider,
    Consumer, Registry, Monitor 划分逻辑拓普节点,保持统⼀概念。
  • ⽽ Cluster 是外围概念,所以 Cluster 的⽬的是将多个 Invoker 伪装成⼀个 Invoker,这样其它⼈只要关注 Protocol 层 Invoker 即可,加上 Cluster 或者去掉 Cluster 对其它层都不会造成影响,因为只有⼀个提供者时,是不需要 Cluster 的。
  • Proxy 层封装了所有接⼝的透明化代理,⽽在其它层都以 Invoker 为中⼼,只有到了暴露给⽤户使⽤时,才⽤ Proxy 将 Invoker 转成接⼝,或将接⼝实现转成 Invoker,也就是去掉 Proxy层 RPC 是可以 Run 的,只是不那么透明,不那么看起来像调本地服务⼀样调远程服务。
  • ⽽ Remoting 实现是 Dubbo 协议的实现,如果你选择 RMI 协议,整个 Remoting 都不会⽤上, Remoting 内部再划为 Transport 传输层和 Exchange 信息交换层, Transport 层只负责单向消息传输,是对 Mina, Netty, Grizzly 的抽象,它也可以扩展 UDP 传输,⽽ Exchange 层是在传输层之上封装了 Request-Response 语义。
  • Registry 和 Monitor 实际上不算⼀层,⽽是⼀个独⽴的节点,只是为了全局概览,⽤层的⽅式画在⼀起。



  • ⽅法级优先,接⼝级次之,全局配置再次之。
  • 如果级别⼀样,则消费⽅优先,提供⽅次之。



    private static RootBeanDefinition parse(Element element, ParserContext parserContext, Class<?> beanClass, boolean required) {
        RootBeanDefinition beanDefinition = new RootBeanDefinition();
        String id = resolveAttribute(element, "id", parserContext);
        if (StringUtils.isEmpty(id) && required) {
            //取name属性值作为bean id
            String generatedBeanName = resolveAttribute(element, "name", parserContext);
            if (StringUtils.isEmpty(generatedBeanName)) {
                //如果protocol标签没有指定name,则默认用“dubbo”做bean id
                if (ProtocolConfig.class.equals(beanClass)) {
                    generatedBeanName = "dubbo";
                } else {
                    //其他标签没有指定name属性,则默认使用interface属性作为bean id
                    generatedBeanName = resolveAttribute(element, "interface", parserContext);
            if (StringUtils.isEmpty(generatedBeanName)) {
                generatedBeanName = beanClass.getName();
            id = generatedBeanName;
            int counter = 2;
            //检查重复bean,如果有重复,则生成唯一bean id
            while (parserContext.getRegistry().containsBeanDefinition(id)) {
                id = generatedBeanName + (counter++);
        if (StringUtils.isNotEmpty(id)) {
            if (parserContext.getRegistry().containsBeanDefinition(id)) {
                throw new IllegalStateException("Duplicate spring bean id " + id);
            parserContext.getRegistry().registerBeanDefinition(id, beanDefinition);
            beanDefinition.getPropertyValues().addPropertyValue("id", id);

        // 解析协议
        if (ProtocolConfig.class.equals(beanClass)) {
            for (String name : parserContext.getRegistry().getBeanDefinitionNames()) {
                BeanDefinition definition = parserContext.getRegistry().getBeanDefinition(name);
                PropertyValue property = definition.getPropertyValues().getPropertyValue("protocol");
                if (property != null) {
                    Object value = property.getValue();
                    if (value instanceof ProtocolConfig && id.equals(((ProtocolConfig) value).getName())) {
                        definition.getPropertyValues().addPropertyValue("protocol", new RuntimeBeanReference(id));

        // 解析Service
        } else if (ServiceBean.class.equals(beanClass)) {
            String className = resolveAttribute(element, "class", parserContext);
            if (StringUtils.isNotEmpty(className)) {
                RootBeanDefinition classDefinition = new RootBeanDefinition();
                parseProperties(element.getChildNodes(), classDefinition, parserContext);
                beanDefinition.getPropertyValues().addPropertyValue("ref", new BeanDefinitionHolder(classDefinition, id + "Impl"));
        // 解析provider
        } else if (ProviderConfig.class.equals(beanClass)) {
            parseNested(element, parserContext, ServiceBean.class, true, "service", "provider", id, beanDefinition);
        } else if (ConsumerConfig.class.equals(beanClass)) {
            parseNested(element, parserContext, ReferenceBean.class, false, "reference", "consumer", id, beanDefinition);
        Set<String> props = new HashSet<>();
        ManagedMap parameters = null;

        for (Method setter : beanClass.getMethods()) {
            String name = setter.getName();
            if (name.length() > 3 && name.startsWith("set")
                    && Modifier.isPublic(setter.getModifiers())
                    && setter.getParameterTypes().length == 1) {
                Class<?> type = setter.getParameterTypes()[0];
                String beanProperty = name.substring(3, 4).toLowerCase() + name.substring(4);
                String property = StringUtils.camelToSplitName(beanProperty, "-");
                // check the setter/getter whether match
                Method getter = null;
                try {
                    getter = beanClass.getMethod("get" + name.substring(3), new Class<?>[0]);
                } catch (NoSuchMethodException e) {
                    try {
                        getter = beanClass.getMethod("is" + name.substring(3), new Class<?>[0]);
                    } catch (NoSuchMethodException e2) {
                        // ignore, there is no need any log here since some class implement the interface: EnvironmentAware,
                        // ApplicationAware, etc. They only have setter method, otherwise will cause the error log during application start up.
                if (getter == null
                        || !Modifier.isPublic(getter.getModifiers())
                        || !type.equals(getter.getReturnType())) {
                if ("parameters".equals(property)) {
                    parameters = parseParameters(element.getChildNodes(), beanDefinition, parserContext);
                } else if ("methods".equals(property)) {
                    parseMethods(id, element.getChildNodes(), beanDefinition, parserContext);
                } else if ("arguments".equals(property)) {
                    parseArguments(id, element.getChildNodes(), beanDefinition, parserContext);
                } else {
                    String value = resolveAttribute(element, property, parserContext);
                    if (value != null) {
                        value = value.trim();
                        if (value.length() > 0) {
                            if ("registry".equals(property) && RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(value)) {
                                RegistryConfig registryConfig = new RegistryConfig();
                                beanDefinition.getPropertyValues().addPropertyValue(beanProperty, registryConfig);
                            } else if ("provider".equals(property) || "registry".equals(property) || ("protocol".equals(property) && AbstractServiceConfig.class.isAssignableFrom(beanClass))) {
                           beanDefinition.getPropertyValues().addPropertyValue(beanProperty + "Ids", value);
                            } else {
                                Object reference;
                                if (isPrimitive(type)) {
                                    if ("async".equals(property) && "false".equals(value)
                                            || "timeout".equals(property) && "0".equals(value)
                                            || "delay".equals(property) && "0".equals(value)
                                            || "version".equals(property) && "0.0.0".equals(value)
                                            || "stat".equals(property) && "-1".equals(value)
                                            || "reliable".equals(property) && "false".equals(value)) {
                                        // backward compatibility for the default value in old version's xsd
                                        value = null;
                                    reference = value;
                                } else if (ONRETURN.equals(property) || ONTHROW.equals(property) || ONINVOKE.equals(property)) {
                                    int index = value.lastIndexOf(".");
                                    String ref = value.substring(0, index);
                                    String method = value.substring(index + 1);
                                    reference = new RuntimeBeanReference(ref);
                                    beanDefinition.getPropertyValues().addPropertyValue(property + METHOD, method);
                                } else {
                                    if ("ref".equals(property) && parserContext.getRegistry().containsBeanDefinition(value)) {
                                        BeanDefinition refBean = parserContext.getRegistry().getBeanDefinition(value);
                                        if (!refBean.isSingleton()) {
                                            throw new IllegalStateException("The exported service ref " + value + " must be singleton! Please set the " + value + " bean scope to singleton, eg: <bean id=\"" + value + "\" scope=\"singleton\" ...>");
                                    reference = new RuntimeBeanReference(value);
                                beanDefinition.getPropertyValues().addPropertyValue(beanProperty, reference);
		// 当做parameters参数存储,它是一Map对象
        NamedNodeMap attributes = element.getAttributes();
        int len = attributes.getLength();
        for (int i = 0; i < len; i++) {
            Node node = attributes.item(i);
            String name = node.getLocalName();
            if (!props.contains(name)) {
                if (parameters == null) {
                    parameters = new ManagedMap();
                String value = node.getNodeValue();
                parameters.put(name, new TypedStringValue(value, String.class));
        if (parameters != null) {
            beanDefinition.getPropertyValues().addPropertyValue("parameters", parameters);
        return beanDefinition;



  1. dubbo provider初始化过程,包括配置的检查、待暴露的url封装
  2. dubbo provider的暴露过程,包括将provider的相关信息注册到注册中心、开启netty服务等待consumer的连接等
  3. dubbo provider接受到来自consumer的请求之后处理和应答的过程



  1. dubbo consumer的初始化过程,包括配置的校验、待注册的url封装、 consumer信息注册到注册中⼼等
  2. dubbo consumer接受到来⾃注册中⼼的通知后创建invoker关联provider的过程
  3. dubbo consumer的调⽤过程,包括如何选择invoker、调⽤invoker和接收provider响应的过程





<dubbo:protocol name="dubbo" port="20880" threadpool="fixed" threads="200"
iothreads="8" accepts="0" queues="100" dispatcher="all"/>


  • all(默认值):所有消息都派发到线程池,包括请求、响应、连接事件、断开事件、心跳等,worker就会将事件提交到线程池中,自己再去处理其它事情
  • direct没有用到线程池,统一由IO线程处理。worker线程接收到事件后,将invoker执行到底
  • message:顾名思义,只有将请求响应消息派发到线程池,其它连接断开事件、心跳等事件在IO线程上执行
  • execution:只有请求消息派发到线程池,不含响应和其它连接断开事件、心跳,它们直接在IO线程上执行
  • connection:在IO线程上,将连接、断开事件放入队列,有序逐个执行,其它消息派发到线程池


  1. 客户端的主线程发出⼀个请求后获得future,在执⾏get时进⾏阻塞等待
  2. 服务端使⽤worker线程(netty通信模型)接收到请求后,将请求提交到server线程池中进⾏
  3. server线程处理完成之后,将相应结果返回给客户端的worker线程池(netty通信模型),最
    后, worker线程将响应结果提交到client线程池进⾏处理
  4. client线程将响应结果填充到future中,然后唤醒等待的主线程,主线程获取结果,返回给客




public abstract class AbstractLoadBalance implements LoadBalance {
     * 重新计算权重值的方法
     * @param uptime 服务运行时长
     * @param warmup 预热时长
     * @param weight 权重
     * @return 重新计算后的权重值为 【1~设置的权重值】,运行时间越长,计算出的权重值越接近设置的权重值
    static int calculateWarmupWeight(int uptime, int warmup, int weight) {

        // 计算权重:服务运行时长/(预热时长/设置的权重值),等价于 (服务运行时长/预热时长)*设置的权重值。
        // 随着服务运行时间 uptime 增大,权重计算值 ww 会慢慢接近配置值 weight
        int ww = (int) ( uptime / ((float) warmup / weight));
        return ww < 1 ? 1 : (Math.min(ww, weight));

    public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        if (CollectionUtils.isEmpty(invokers)) {
            return null;

        // 只有一个直接返回
        if (invokers.size() == 1) {
            return invokers.get(0);

        // 否则子类实现负载均衡算法
        return doSelect(invokers, url, invocation);

    protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);

     * 获取权重
    int getWeight(Invoker<?> invoker, Invocation invocation) {
        int weight;
        URL url = invoker.getUrl();
        // Multiple registry scenario, load balance among multiple registries.

        // 当有多个注册地址时,根据注册地址key 获取对应服务权重
        if (REGISTRY_SERVICE_REFERENCE_PATH.equals(url.getServiceInterface())) {
            weight = url.getParameter(REGISTRY_KEY + "." + WEIGHT_KEY, DEFAULT_WEIGHT);
        } else {
            // 从url获取权重,默认100
            weight = url.getMethodParameter(invocation.getMethodName(), WEIGHT_KEY, DEFAULT_WEIGHT);
            if (weight > 0) {
                // 以下过程主要时判断机器的启动时间和预热时间的差值(默认为10min),
                // 为什么要预热? 因为服务在启动之后JVM会对代码有一个优化的过程,预热保证了调用的体验,谨防由此引发的调用超时问题。

                // 获取启动时间
                long timestamp = invoker.getUrl().getParameter(TIMESTAMP_KEY, 0L);
                if (timestamp > 0L) {
                    // 获取运行时间
                    long uptime = System.currentTimeMillis() - timestamp;
                    if (uptime < 0) {
                        return 1;
                    // 获取服务预热时间,默认为10分钟
                    int warmup = invoker.getUrl().getParameter(WARMUP_KEY, DEFAULT_WARMUP);
                    if (uptime > 0 && uptime < warmup) {
                        // 重新计算服务权重
                        weight = calculateWarmupWeight((int)uptime, warmup, weight);

        // 主要用于保证当服务运行时长小于服务预热时间时,对服务进行降权,
        // 避免让服务在启动之初就处于高负载状态。
        // 服务预热是一个优化手段,与此类似的还有 JVM 预热。
        // 主要目的是让服务启动后“低功率”运行一段时间,使其效率慢慢提升至最佳状态

        return Math.max(weight, 0);



RandomLoadBalance 随机选择


  1. 将所有Invoker的权重求和
  2. 随机数就在总权重值的范围内生成


public class RandomLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "random";

     * 使用随机条件在列表中选择一个invoker
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        // Number of invokers
        int length = invokers.size();
        // Every invoker has the same weight?
        boolean sameWeight = true;
        // the maxWeight of every invokers, the minWeight = 0 or the maxWeight of the last invoker
        int[] weights = new int[length];
        // The sum of weights
        int totalWeight = 0;
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            // Sum
            totalWeight += weight;
            // save for later use
            weights[i] = totalWeight;
            if (sameWeight && totalWeight != weight * (i + 1)) {
                sameWeight = false;
        if (totalWeight > 0 && !sameWeight) {
            // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
            int offset = ThreadLocalRandom.current().nextInt(totalWeight);
            // Return a invoker based on the random value.
            // 如果服务权重不相同, 根据权重和求出一个随机数,然后计算看该数落在哪个区间段
            for (int i = 0; i < length; i++) {
                if (offset < weights[i]) {
                    return invokers.get(i);
        // If all invokers have the same weight value or totalWeight=0, return evenly.
        // 如果权重相同或总权重为0,随机选一个,多线程产生随机数
        return invokers.get(ThreadLocalRandom.current().nextInt(length));



  1. 根据每个Invoker类加总权重,每一个Invoker占用一段权重区间
  2. 在totalWeight进行随机数,获取随机数所在区间的Invoker

LeastActiveLoadBalance 最少活跃数算法



    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        // Number of invokers
        int length = invokers.size();
        // The least active value of all invokers
        // 最小的活跃数
        int leastActive = -1;
        // The number of invokers having the same least active value (leastActive)
        // 具有相同“最小活跃数”的服务者提供者(以下用 Invoker 代称)数量
        int leastCount = 0;
        // The index of invokers having the same least active value (leastActive)
        // leastIndexs 用于记录具有相同“最小活跃数”的 Invoker 在 invokers 列表中的下标信息
        int[] leastIndexes = new int[length];
        // the weight of every invokers
        int[] weights = new int[length];
        // The sum of the warmup weights of all the least active invokers
        int totalWeight = 0;
        // The weight of the first least active invoker
        // 第一个最小活跃数的 Invoker 权重值,用于与其他具有相同最小活跃数的 Invoker 的权重进行对比,
        // 以检测是否“所有具有相同最小活跃数的 Invoker 的权重”均相等
        int firstWeight = 0;
        // Every least active invoker has the same weight value?
        boolean sameWeight = true;

        // 遍历 invokers 列表
        // Filter out all the least active invokers
        for (int i = 0; i < length; i++) {
            Invoker<T> invoker = invokers.get(i);
            // Get the active number of the invoker
            // 获取 Invoker 对应的活跃数
            int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
            // Get the weight of the invoker's configuration. The default value is 100.
            int afterWarmup = getWeight(invoker, invocation);
            // save for later use
            // 获取权重
            weights[i] = afterWarmup;
            // If it is the first invoker or the active number of the invoker is less than the current least active number
            // 发现更小的活跃数,重新开始
            if (leastActive == -1 || active < leastActive) {
                // Reset the active number of the current invoker to the least active number
                // 使用当前活跃数 active 更新最小活跃数 leastActive
                leastActive = active;
                // Reset the number of least active invokers
                // 更新 leastCount 为 1
                leastCount = 1;
                // Put the first least active invoker first in leastIndexes
                // 记录当前下标值到 leastIndexs 中
                leastIndexes[0] = i;
                // Reset totalWeight
                totalWeight = afterWarmup;
                // Record the weight the first least active invoker
                firstWeight = afterWarmup;
                // Each invoke has the same weight (only one invoker here)
                sameWeight = true;
                // If current invoker's active value equals with leaseActive, then accumulating.

             // 当前 Invoker 的活跃数 active 与最小活跃数 leastActive 相同
            } else if (active == leastActive) {
                // Record the index of the least active invoker in leastIndexes order
                // 在 leastIndexs 中记录下当前 Invoker 在 invokers 集合中的下标
                leastIndexes[leastCount++] = i;
                // Accumulate the total weight of the least active invoker
                // 累加权重
                totalWeight += afterWarmup;
                // If every invoker has the same weight?
                // 检测当前 Invoker 的权重与 firstWeight 是否相等,
                // 不相等则将 sameWeight 置为 false
                if (sameWeight && afterWarmup != firstWeight) {
                    sameWeight = false;

        // 当只有一个 Invoker 具有最小活跃数,此时直接返回该 Invoker 即可
        // Choose an invoker from all the least active invokers
        if (leastCount == 1) {
            // If we got exactly one invoker having the least active value, return this invoker directly.
            return invokers.get(leastIndexes[0]);

        // 有多个 Invoker 具有相同的最小活跃数,但它们之间的权重不同
        if (!sameWeight && totalWeight > 0) {
            // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on 
            // totalWeight.
            // 随机生成一个 [0, totalWeight) 之间的数字
            int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
            // Return a invoker based on the random value.
            // 循环让随机数减去具有最小活跃数的 Invoker 的权重值,
            // 当 offset 小于等于0时,返回相应的 Invoker
            for (int i = 0; i < leastCount; i++) {
                int leastIndex = leastIndexes[i];
                // 获取权重值,并让随机数减去权重值
                offsetWeight -= weights[leastIndex];
                if (offsetWeight < 0) {
                    return invokers.get(leastIndex);

        // If all invokers have the same weight value or totalWeight=0, return evenly.
        // 如果权重相同或权重为0时,随机返回一个 Invoker
        return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);


  • 遍历 invokers 列表,寻找活跃数最小的 Invoker
  • 如果有多个 Invoker 具有相同的最小活跃数,此时记录下这些 Invoker 在 invokers 集合中的下标,并累加它们的权重,比较它们的权重值是否相等(类似于加权随机算法策略)
  • 如果只有一个 Invoker 具有最小的活跃数,此时直接返回该 Invoker 即可
  • 如果有多个 Invoker 具有最小活跃数,且它们的权重不相等,此时处理方式和 RandomLoadBalance 一致
  • 如果有多个 Invoker 具有最小活跃数,但它们的权重相等,此时随机返回一个即可

RoundRobinLoadBalance 加权轮循算法,根据权重设置轮训比例



 * Round robin load balance.
public class RoundRobinLoadBalance extends AbstractLoadBalance {
    public static final String NAME = "roundrobin";

    private static final int RECYCLE_PERIOD = 60000;

    protected static class WeightedRoundRobin {
        private int weight;
        private AtomicLong current = new AtomicLong(0);
        private long lastUpdate;

        public int getWeight() {
            return weight;

        public void setWeight(int weight) {
            this.weight = weight;

        public long increaseCurrent() {
            return current.addAndGet(weight);

        public void sel(int total) {
            current.addAndGet(-1 * total);

        public long getLastUpdate() {
            return lastUpdate;

        public void setLastUpdate(long lastUpdate) {
            this.lastUpdate = lastUpdate;

    private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();

     * get invoker addr list cached for specified invocation
     * <p>
     * <b>for unit test only</b>
     * @param invokers
     * @param invocation
     * @return
    protected <T> Collection<String> getInvokerAddrList(List<Invoker<T>> invokers, Invocation invocation) {
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        Map<String, WeightedRoundRobin> map = methodWeightMap.get(key);
        if (map != null) {
            return map.keySet();
        return null;

    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        // key 为 接口名+方法名
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        // 查看缓存中是否存在相应服务接口的信息,如果没有则新添加一个元素到缓存中
        ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.computeIfAbsent(key, k -> new ConcurrentHashMap<>());
        // 总权重
        int totalWeight = 0;
        long maxCurrent = Long.MIN_VALUE;
        // 当前时间戳
        long now = System.currentTimeMillis();
        // 最大 current 的 Invoker
        Invoker<T> selectedInvoker = null;
        // 保存选中的 WeightedRoundRobin 对象
        WeightedRoundRobin selectedWRR = null;

        // 遍历 Invokers 列表
        for (Invoker<T> invoker : invokers) {
            // 从缓存中获取 WeightedRoundRobin 对象
            String identifyString = invoker.getUrl().toIdentityString();
            // 获取权重
            int weight = getWeight(invoker, invocation);
            // 获取当前 Invoker 对象
            WeightedRoundRobin weightedRoundRobin = map.computeIfAbsent(identifyString, k -> {
                WeightedRoundRobin wrr = new WeightedRoundRobin();
                return wrr;

            // 如果当前 Invoker 权重不等于对应的 WeightedRoundRobin 对象中的权重,则重新设置当前权重到对应的 WeightedRoundRobin 对象中
            if (weight != weightedRoundRobin.getWeight()) {
                // weight changed
            // 累加权重到 current 中
            long cur = weightedRoundRobin.increaseCurrent();
            // 设置 weightedRoundRobin 对象最后更新时间
            // 最大 current 的 Invoker,并赋值给相应的变量
            if (cur > maxCurrent) {
                maxCurrent = cur;
                selectedInvoker = invoker;
                selectedWRR = weightedRoundRobin;
            // 累加权重到总权重中
            totalWeight += weight;
        // 如果 Invokers 列表中的数量不等于缓存map中的数量
        if (invokers.size() != map.size()) {
            // 清理最后更新时间超过60s的item
            map.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
        // 如果存在被选中的 Invoker
        if (selectedInvoker != null) {
            // 计算 current = current - totalWeight
            return selectedInvoker;
        // 正常情况这里不会到达
        return invokers.get(0);



  1. 每个Invoker都有一个current值,初值为自身权重,每个Invoker中 current=current+weight
  2. 每一次遍历所有Invoker后,current值最大的就是本次选中的Invoker
  3. 被选中的Invoker会更新current值:current=current-totalWeight

ConsistentHashLoadBalance Hash 一致性算法

一致性 Hash 负载均衡策略是让参数相同的请求分配到同一机器上。把每个服务节点分布在一个环上,请求也分布在环形中。以请求在环上的位置,顺时针寻找换上第一个服务节点。如图所示:
同时,为避免请求散列不均匀,dubbo 中会将每个 Invoker 再虚拟多个节点出来,使得请求调用更加均匀。


 * ConsistentHashLoadBalance
 * ConsistentHashLoadBalance不关系权重
public class ConsistentHashLoadBalance extends AbstractLoadBalance {
    public static final String NAME = "consistenthash";

     * Hash nodes name
    public static final String HASH_NODES = "hash.nodes";

     * Hash arguments name
    public static final String HASH_ARGUMENTS = "hash.arguments";

    private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();

    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        String methodName = RpcUtils.getMethodName(invocation);
        String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
        // 获取 invokers 原始的 hashcode
        int invokersHashCode = invokers.hashCode();
        ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
        // 如果 invokers 是⼀个新的 List 对象,意味着服务提供者数量发⽣了变化,可能新增也可能减少了。
        // 此时 selector.identityHashCode != identityHashCode 条件成⽴
        if (selector == null || selector.identityHashCode != invokersHashCode) {
            // 创建新的 ConsistentHashSelector
            selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, invokersHashCode));
            selector = (ConsistentHashSelector<T>) selectors.get(key);
        // 调⽤ ConsistentHashSelector 的 select ⽅法选择 Invoker
        return selector.select(invocation);

     * 一致性hash选择器
     * @param <T>
    private static final class ConsistentHashSelector<T> {

        // 使⽤ TreeMap 存储 Invoker 虚拟节点
        private final TreeMap<Long, Invoker<T>> virtualInvokers;

        private final int replicaNumber;

        private final int identityHashCode;

        private final int[] argumentIndex;

        ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
            this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
            this.identityHashCode = identityHashCode;
            URL url = invokers.get(0).getUrl();
            // 获取虚拟节点数,默认为160
            this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160);
            // 获取参与 hash 计算的参数下标值,默认对第⼀个参数进⾏ hash 运算
            String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));
            argumentIndex = new int[index.length];
            for (int i = 0; i < index.length; i++) {
                argumentIndex[i] = Integer.parseInt(index[i]);
            for (Invoker<T> invoker : invokers) {
                String address = invoker.getUrl().getAddress();
                for (int i = 0; i < replicaNumber / 4; i++) {
                    // 对 address + i 进⾏ md5 运算,得到⼀个⻓度为16的字节数组
                    byte[] digest = Bytes.getMD5(address + i);
                    // 对 digest 部分字节进⾏4次 hash 运算,得到四个不同的 long 型正整数
                    for (int h = 0; h < 4; h++) {
                        // h = 0 时,取 digest 中下标为 0 ~ 3 的4个字节进⾏位运算
                        // h = 1 时,取 digest 中下标为 4 ~ 7 的4个字节进⾏位运算
                        // h = 2, h = 3 时过程同上
                        long m = hash(digest, h);
                        // 将 hash 到 invoker 的映射关系存储到 virtualInvokers中
                        // virtualInvokers 需要提供⾼效的查询操作,因此选⽤ TreeMap作为存储结构
                        virtualInvokers.put(m, invoker);

        public Invoker<T> select(Invocation invocation) {
            // 将参数转为 key
            String key = toKey(invocation.getArguments());
            // 对参数 key 进⾏ md5 运算
            byte[] digest = Bytes.getMD5(key);
            // 取 digest 数组的前四个字节进⾏ hash 运算,再将 hash 值传给 selectForKey ⽅法,
            // 寻找合适的 Invoker
            return selectForKey(hash(digest, 0));

        private String toKey(Object[] args) {
            StringBuilder buf = new StringBuilder();
            for (int i : argumentIndex) {
                if (i >= 0 && i < args.length) {
            return buf.toString();

        private Invoker<T> selectForKey(long hash) {
            // 到 TreeMap 中查找第⼀个节点值⼤于或等于当前 hash 的 Invoker
            Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
            // 如果 hash ⼤于 Invoker 在圆环上最⼤的位置,此时 entry = null,
            // 需要将 TreeMap 的头节点赋值给 entry
            if (entry == null) {
                entry = virtualInvokers.firstEntry();
            // 返回 Invoker
            return entry.getValue();

        private long hash(byte[] digest, int number) {
            return (((long) (digest[3 + number * 4] & 0xFF) << 24)
                    | ((long) (digest[2 + number * 4] & 0xFF) << 16)
                    | ((long) (digest[1 + number * 4] & 0xFF) << 8)
                    | (digest[number * 4] & 0xFF))
                    & 0xFFFFFFFFL;



  1. 一致 hash 实现过程就是先创建好虚拟节点,虚拟节点保存在 TreeMap 中。
  2. TreeMap 的 key 为配置的参数先进行 md5 运算,然后将 md5 值进行 hash 运算。TreeMap 的 value 为被选中的 Invoker。
  3. 最后请求时,计算参数的 hash 值,去从 TreeMap 中获取 Invoker。

ShortestResponseLoadBalance 最短响应时间算法


public class ShortestResponseLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "shortestresponse";

    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        // invokers数量
        int length = invokers.size();
        // 默认的“最短请求时间”
        long shortestResponse = Long.MAX_VALUE;
        // 具有相同“最短请求时间”的invoker数量
        int shortestCount = 0;
        // 具有相同估计最短响应时间的invoker索引
        int[] shortestIndexes = new int[length];
        // 每个invoker的权重数组
        int[] weights = new int[length];
        // 所有最短响应请求者的预热权值之和
        int totalWeight = 0;
        // 第一个最短响应调用者的权重
        int firstWeight = 0;
        // 每个最短响应调用者都有相同的权值?
        boolean sameWeight = true;

        // 过滤掉所有最短响应请求者
        for (int i = 0; i < length; i++) {
            Invoker<T> invoker = invokers.get(i);
            RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
            // 根据活动连接的乘积和成功的平均运行时间计算估计的响应时间
            // 调用成功的请求数总数对应的总耗时 / 调用成功的请求数总数 = 成功调用的平均时间
            long succeededAverageElapsed = rpcStatus.getSucceededAverageElapsed();
            int active = rpcStatus.getActive();
            long estimateResponse = succeededAverageElapsed * active;

            // 获取预热后的权重
            int afterWarmup = getWeight(invoker, invocation);
            weights[i] = afterWarmup;
            // 如果出现更短响应的invoker,则记录更短的响应时间和当前invoker的下标
            if (estimateResponse < shortestResponse) {
                shortestResponse = estimateResponse;
                shortestCount = 1;
                shortestIndexes[0] = i;
                totalWeight = afterWarmup;
                firstWeight = afterWarmup;
                sameWeight = true;

            // 如果出现时间一样长的invoker,则更新shortestCount并记录下标到shortestIndexes
            else if (estimateResponse == shortestResponse) {
                shortestIndexes[shortestCount++] = i;
                totalWeight += afterWarmup;// 计算总权重
                // 判断权重是否相等
                if (sameWeight && i > 0
                        && afterWarmup != firstWeight) {
                    sameWeight = false;
        // 如果最短响应时间invoker只有一个则直接返回
        if (shortestCount == 1) {
            return invokers.get(shortestIndexes[0]);
        // 和加权随机负载均衡一样
        if (!sameWeight && totalWeight > 0) {
            int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
            for (int i = 0; i < shortestCount; i++) {
                int shortestIndex = shortestIndexes[i];
                offsetWeight -= weights[shortestIndex];
                if (offsetWeight < 0) {
                    return invokers.get(shortestIndex);
        // 在多个invoker中岁间选择一个返回
        return invokers.get(shortestIndexes[ThreadLocalRandom.current().nextInt(shortestCount)]);


  1. 找到该服务所有提供者中响应时间最短的⼀个或多个服务,并计算其权重和
  2. 如果响应时间最短的服务提供者只有⼀个,则直接返回给服务
  3. 如果响应时间最短的服务提供者⼤于1个,则分为以下两种情况
    • 如果所有服务权重值不同,则按RandomLoadBalance(2)过程选出服务提供者
    • 如果所有服务权重相同,则随机返回⼀个


名称 说明
RandomLoadBalance 随机算法,根据权重设置随机的概率
LeastActiveLoadBalance 最少活跃数算法,响应越快的服务器堆积的请求越少,对应的活跃数也少
RoundRobinLoadBalance 加权轮循算法,轮询遇到加权则可以让请求(不论多少)严格按照我们的权重之比进行分配
ConsistentHashLoadBalance Hash 一致性算法,相同请求参数分配到相同提供者
ShortestResponseLoadBalance 最短响应时间算法,在所有服务中选出平均响应时间最短的一个

Dubbo 容错方案


  • Failover Cluster
    失败⾃动切换,当出现失败,重试其它服务器。 (缺省)

  • Failfast Cluster

  • Failsafe Cluster

  • Failback Cluster

  • Forking Cluster

  • Broadcast Cluster
    ⼴播调⽤所有提供者,逐个调⽤,任意⼀台报错则报错。 (2.1.0开始⽀持)


从dubbo的框架入手,介绍了dubbo 10层架构作用与之间的调用关系,并对dubbo重要的功能实现进行源码解析


版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。



