整体框架设计
图例说明:
- 图中左边淡蓝背景的为服务消费⽅使⽤的接⼝,右边淡绿⾊背景的为服务提供⽅使⽤的接⼝,位于中轴线上的为双⽅都⽤到的接⼝。
- 图中从下⾄上分为⼗层,各层均为单向依赖,右边的⿊⾊箭头代表层之间的依赖关系,每⼀层都可以剥离上层被复⽤,其中, Service 和 Config 层为 API,其它各层均为 SPI。
- 图中绿⾊⼩块的为扩展接⼝,蓝⾊⼩块为实现类,图中只显示⽤于关联各层的实现类。
- 图中蓝⾊虚线为初始化过程,即启动时组装链,红⾊实线为⽅法调⽤过程,即运⾏时调时链,紫⾊三⻆箭头为继承,可以把⼦类看作⽗类的同⼀个节点,线上的⽂字为调⽤的⽅法。
基本过程:
比如发起一个请求如何找到服务提供者?
- 首先将请求体编码,包含请求唯一标识符、包含对象体(方法名、参数)
- 将请求对象打包扔给调度中心,如果既是Provider又是Consumer,则直接Invoker本地调用
- Proxy持有Invoker对象通过路由和负载情况,路由根据权重或hash等,根据路由规则找到服务提供者的ip,对ip发起请求,到达Provider
- Provider从请求中解析出协议头、序列化请求参数、解析对象、版本号、方法名称等
- 通过过滤器、上下文解析、限流、计数,然后反序列化成为一个对象
- 根据对象找到具体方法,找到具体线程池执行,然后原路返回
各层说明
- Service 接⼝服务层:该层与业务逻辑相关,根据 provider 和 consumer 的业务设计对应的接
⼝和实现 - config 配置层:对外配置接口,以ServiceConfig、ReferenceConfig为中心,可以直接初始化配置类,也可以通过spring解析配置生成配置类
- proxy 服务代理层:服务接口透明代理,生成服务的客户端Stub和服务端Skeleton,以ServiceProxy为中心,扩展接口为ProxyFactory
- registry 注册中心层:封装服务地址的注册与发现,以服务URL为中心,扩展接口为RegistryFactory、Registry、RegistryService
- cluster 路由层:封装多个提供者的路由及负载均衡,并桥接注册中心,以Invoker为中心,扩展接口为Cluster、Directory、Router、LoadBalance
- monitor 监控层:RPC调用次数和调用时间监控,以Statistics为中心,扩展接口为MonitorFactory、Monitor、MonitorService
- protocol 远程调用层:封装RPC调用,以Invocation、Result为中心,扩展接口为Protocol、Invoker、Exporter
- exchange 信息交换层:封装请求响应模式,同步转异步,以Request、Response为中心,扩展接口为Exchanger、ExchangeChannel、ExchangeClient、ExchangeServer
- transport 网络传输层:抽象mina和netty为统一接口,以Message为中心,扩展接口为Channel、Transporter、Client、Server、Codec
- serialize 数据序列化层:可复用的一些工具,扩展接口为Serialization、ObjectInput、ObjectOutput、ThreadPool
关系说明
- 在 RPC 中, Protocol 是核⼼层,也就是只要有 Protocol + Invoker + Exporter 就可以完成⾮透明的 RPC 调⽤,然后在 Invoker 的主过程上 Filter 拦截点。
- 图中的 Consumer 和 Provider 是抽象概念,只是想让看图者更直观的了解哪些类分属于客户端与服务器端,不⽤ Client 和 Server 的原因是 Dubbo 在很多场景下都使⽤ Provider,
Consumer, Registry, Monitor 划分逻辑拓普节点,保持统⼀概念。 - ⽽ Cluster 是外围概念,所以 Cluster 的⽬的是将多个 Invoker 伪装成⼀个 Invoker,这样其它⼈只要关注 Protocol 层 Invoker 即可,加上 Cluster 或者去掉 Cluster 对其它层都不会造成影响,因为只有⼀个提供者时,是不需要 Cluster 的。
- Proxy 层封装了所有接⼝的透明化代理,⽽在其它层都以 Invoker 为中⼼,只有到了暴露给⽤户使⽤时,才⽤ Proxy 将 Invoker 转成接⼝,或将接⼝实现转成 Invoker,也就是去掉 Proxy层 RPC 是可以 Run 的,只是不那么透明,不那么看起来像调本地服务⼀样调远程服务。
- ⽽ Remoting 实现是 Dubbo 协议的实现,如果你选择 RMI 协议,整个 Remoting 都不会⽤上, Remoting 内部再划为 Transport 传输层和 Exchange 信息交换层, Transport 层只负责单向消息传输,是对 Mina, Netty, Grizzly 的抽象,它也可以扩展 UDP 传输,⽽ Exchange 层是在传输层之上封装了 Request-Response 语义。
- Registry 和 Monitor 实际上不算⼀层,⽽是⼀个独⽴的节点,只是为了全局概览,⽤层的⽅式画在⼀起。
配置解析过程分析
以timeout属性为例,⽤官⽹的⼀张图展示dubbo标签的优先级:
总的原则就是:
- ⽅法级优先,接⼝级次之,全局配置再次之。
- 如果级别⼀样,则消费⽅优先,提供⽅次之。
配置解析源码
在配置解析时序图中可以看到,主要是在DubboBeanDefinitionParser进行配置解析
private static RootBeanDefinition parse(Element element, ParserContext parserContext, Class<?> beanClass, boolean required) {
//生成spring的bean定义,指定beanClass交给spring反射创建实例
RootBeanDefinition beanDefinition = new RootBeanDefinition();
beanDefinition.setBeanClass(beanClass);
beanDefinition.setLazyInit(false);
String id = resolveAttribute(element, "id", parserContext);
//确保spring容器没有重复的bean
if (StringUtils.isEmpty(id) && required) {
//取name属性值作为bean id
String generatedBeanName = resolveAttribute(element, "name", parserContext);
if (StringUtils.isEmpty(generatedBeanName)) {
//如果protocol标签没有指定name,则默认用“dubbo”做bean id
if (ProtocolConfig.class.equals(beanClass)) {
generatedBeanName = "dubbo";
} else {
//其他标签没有指定name属性,则默认使用interface属性作为bean id
generatedBeanName = resolveAttribute(element, "interface", parserContext);
}
}
if (StringUtils.isEmpty(generatedBeanName)) {
generatedBeanName = beanClass.getName();
}
id = generatedBeanName;
int counter = 2;
//检查重复bean,如果有重复,则生成唯一bean id
while (parserContext.getRegistry().containsBeanDefinition(id)) {
id = generatedBeanName + (counter++);
}
}
if (StringUtils.isNotEmpty(id)) {
if (parserContext.getRegistry().containsBeanDefinition(id)) {
throw new IllegalStateException("Duplicate spring bean id " + id);
}
//每次解析都会向Spring注册新的BeanDefiniton,后续会追加属性
parserContext.getRegistry().registerBeanDefinition(id, beanDefinition);
beanDefinition.getPropertyValues().addPropertyValue("id", id);
}
// 解析协议
if (ProtocolConfig.class.equals(beanClass)) {
for (String name : parserContext.getRegistry().getBeanDefinitionNames()) {
BeanDefinition definition = parserContext.getRegistry().getBeanDefinition(name);
PropertyValue property = definition.getPropertyValues().getPropertyValue("protocol");
if (property != null) {
Object value = property.getValue();
if (value instanceof ProtocolConfig && id.equals(((ProtocolConfig) value).getName())) {
definition.getPropertyValues().addPropertyValue("protocol", new RuntimeBeanReference(id));
}
}
}
// 解析Service
} else if (ServiceBean.class.equals(beanClass)) {
//拿到service标签的class属性,也就是服务接口的全限定名
String className = resolveAttribute(element, "class", parserContext);
if (StringUtils.isNotEmpty(className)) {
RootBeanDefinition classDefinition = new RootBeanDefinition();
//反射生成实例,挂在bean定义上
classDefinition.setBeanClass(ReflectUtils.forName(className));
classDefinition.setLazyInit(false);
//该方法主要解析标签中的属性,通过键值对的方式提取出来放到bean定义中,spring会自动注入这些属性。
parseProperties(element.getChildNodes(), classDefinition, parserContext);
beanDefinition.getPropertyValues().addPropertyValue("ref", new BeanDefinitionHolder(classDefinition, id + "Impl"));
}
// 解析provider
} else if (ProviderConfig.class.equals(beanClass)) {
parseNested(element, parserContext, ServiceBean.class, true, "service", "provider", id, beanDefinition);
//解析consumer
} else if (ConsumerConfig.class.equals(beanClass)) {
parseNested(element, parserContext, ReferenceBean.class, false, "reference", "consumer", id, beanDefinition);
}
Set<String> props = new HashSet<>();
ManagedMap parameters = null;
//查找配置对象的get,set,is前缀的方法,并通过反射调用方法名与属性名相同的方法,进行注入
//如果没有属性名找不到对应的get,set,is前缀方法,则当做parameters参数存储,它是一Map对象
for (Method setter : beanClass.getMethods()) {
String name = setter.getName();
if (name.length() > 3 && name.startsWith("set")
&& Modifier.isPublic(setter.getModifiers())
&& setter.getParameterTypes().length == 1) {
Class<?> type = setter.getParameterTypes()[0];
String beanProperty = name.substring(3, 4).toLowerCase() + name.substring(4);
String property = StringUtils.camelToSplitName(beanProperty, "-");
props.add(property);
// check the setter/getter whether match
Method getter = null;
try {
getter = beanClass.getMethod("get" + name.substring(3), new Class<?>[0]);
} catch (NoSuchMethodException e) {
try {
getter = beanClass.getMethod("is" + name.substring(3), new Class<?>[0]);
} catch (NoSuchMethodException e2) {
// ignore, there is no need any log here since some class implement the interface: EnvironmentAware,
// ApplicationAware, etc. They only have setter method, otherwise will cause the error log during application start up.
}
}
if (getter == null
|| !Modifier.isPublic(getter.getModifiers())
|| !type.equals(getter.getReturnType())) {
continue;
}
if ("parameters".equals(property)) {
parameters = parseParameters(element.getChildNodes(), beanDefinition, parserContext);
} else if ("methods".equals(property)) {
parseMethods(id, element.getChildNodes(), beanDefinition, parserContext);
} else if ("arguments".equals(property)) {
parseArguments(id, element.getChildNodes(), beanDefinition, parserContext);
} else {
String value = resolveAttribute(element, property, parserContext);
if (value != null) {
value = value.trim();
if (value.length() > 0) {
if ("registry".equals(property) && RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(value)) {
RegistryConfig registryConfig = new RegistryConfig();
registryConfig.setAddress(RegistryConfig.NO_AVAILABLE);
beanDefinition.getPropertyValues().addPropertyValue(beanProperty, registryConfig);
} else if ("provider".equals(property) || "registry".equals(property) || ("protocol".equals(property) && AbstractServiceConfig.class.isAssignableFrom(beanClass))) {
beanDefinition.getPropertyValues().addPropertyValue(beanProperty + "Ids", value);
} else {
Object reference;
if (isPrimitive(type)) {
if ("async".equals(property) && "false".equals(value)
|| "timeout".equals(property) && "0".equals(value)
|| "delay".equals(property) && "0".equals(value)
|| "version".equals(property) && "0.0.0".equals(value)
|| "stat".equals(property) && "-1".equals(value)
|| "reliable".equals(property) && "false".equals(value)) {
// backward compatibility for the default value in old version's xsd
value = null;
}
reference = value;
} else if (ONRETURN.equals(property) || ONTHROW.equals(property) || ONINVOKE.equals(property)) {
int index = value.lastIndexOf(".");
String ref = value.substring(0, index);
String method = value.substring(index + 1);
reference = new RuntimeBeanReference(ref);
beanDefinition.getPropertyValues().addPropertyValue(property + METHOD, method);
} else {
if ("ref".equals(property) && parserContext.getRegistry().containsBeanDefinition(value)) {
BeanDefinition refBean = parserContext.getRegistry().getBeanDefinition(value);
if (!refBean.isSingleton()) {
throw new IllegalStateException("The exported service ref " + value + " must be singleton! Please set the " + value + " bean scope to singleton, eg: <bean id=\"" + value + "\" scope=\"singleton\" ...>");
}
}
reference = new RuntimeBeanReference(value);
}
beanDefinition.getPropertyValues().addPropertyValue(beanProperty, reference);
}
}
}
}
}
}
// 当做parameters参数存储,它是一Map对象
NamedNodeMap attributes = element.getAttributes();
int len = attributes.getLength();
for (int i = 0; i < len; i++) {
Node node = attributes.item(i);
String name = node.getLocalName();
if (!props.contains(name)) {
if (parameters == null) {
parameters = new ManagedMap();
}
String value = node.getNodeValue();
parameters.put(name, new TypedStringValue(value, String.class));
}
}
if (parameters != null) {
beanDefinition.getPropertyValues().addPropertyValue("parameters", parameters);
}
return beanDefinition;
}
Provider
- dubbo provider初始化过程,包括配置的检查、待暴露的url封装
- dubbo provider的暴露过程,包括将provider的相关信息注册到注册中心、开启netty服务等待consumer的连接等
- dubbo provider接受到来自consumer的请求之后处理和应答的过程
Consumer
- dubbo consumer的初始化过程,包括配置的校验、待注册的url封装、 consumer信息注册到注册中⼼等
- dubbo consumer接受到来⾃注册中⼼的通知后创建invoker关联provider的过程
- dubbo consumer的调⽤过程,包括如何选择invoker、调⽤invoker和接收provider响应的过程
dubbo⽀持的四种负载均衡策略的实现细节
服务调用
线程派发模型
在dubbo的配置中,有如下配置
<dubbo:protocol name="dubbo" port="20880" threadpool="fixed" threads="200"
iothreads="8" accepts="0" queues="100" dispatcher="all"/>
就和我们配置线程池类似,其中有一个dispacher
属性为all
,这就是dubbo的线程派发策略,线程派发策略基于netty(boss、worker线程),有五种派发策略:
- all(默认值):所有消息都派发到线程池,包括请求、响应、连接事件、断开事件、心跳等,worker就会将事件提交到线程池中,自己再去处理其它事情
- direct:没有用到线程池,统一由IO线程处理。worker线程接收到事件后,将invoker执行到底
- message:顾名思义,只有将请求响应消息派发到线程池,其它连接断开事件、心跳等事件在IO线程上执行
- execution:只有请求消息派发到线程池,不含响应和其它连接断开事件、心跳,它们直接在IO线程上执行
- connection:在IO线程上,将连接、断开事件放入队列,有序逐个执行,其它消息派发到线程池
基本的线程派发流程(以all为例):
- 客户端的主线程发出⼀个请求后获得future,在执⾏get时进⾏阻塞等待;
- 服务端使⽤worker线程(netty通信模型)接收到请求后,将请求提交到server线程池中进⾏
处理 - server线程处理完成之后,将相应结果返回给客户端的worker线程池(netty通信模型),最
后, worker线程将响应结果提交到client线程池进⾏处理 - client线程将响应结果填充到future中,然后唤醒等待的主线程,主线程获取结果,返回给客
户端
负载均衡
负载均衡:将所有的请求均匀的分配到多台机器上,避免单个服务器响应同一请求,造成服务器宕机、崩溃等问题
AbstractLoadBalance
是所有负载均衡策略的抽象类
public abstract class AbstractLoadBalance implements LoadBalance {
/**
* 重新计算权重值的方法
*
* @param uptime 服务运行时长
* @param warmup 预热时长
* @param weight 权重
* @return 重新计算后的权重值为 【1~设置的权重值】,运行时间越长,计算出的权重值越接近设置的权重值
*/
static int calculateWarmupWeight(int uptime, int warmup, int weight) {
// 计算权重:服务运行时长/(预热时长/设置的权重值),等价于 (服务运行时长/预热时长)*设置的权重值。
// 随着服务运行时间 uptime 增大,权重计算值 ww 会慢慢接近配置值 weight
int ww = (int) ( uptime / ((float) warmup / weight));
return ww < 1 ? 1 : (Math.min(ww, weight));
}
@Override
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
if (CollectionUtils.isEmpty(invokers)) {
return null;
}
// 只有一个直接返回
if (invokers.size() == 1) {
return invokers.get(0);
}
// 否则子类实现负载均衡算法
return doSelect(invokers, url, invocation);
}
protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
/**
* 获取权重
*/
int getWeight(Invoker<?> invoker, Invocation invocation) {
int weight;
URL url = invoker.getUrl();
// Multiple registry scenario, load balance among multiple registries.
// 当有多个注册地址时,根据注册地址key 获取对应服务权重
if (REGISTRY_SERVICE_REFERENCE_PATH.equals(url.getServiceInterface())) {
weight = url.getParameter(REGISTRY_KEY + "." + WEIGHT_KEY, DEFAULT_WEIGHT);
} else {
// 从url获取权重,默认100
weight = url.getMethodParameter(invocation.getMethodName(), WEIGHT_KEY, DEFAULT_WEIGHT);
if (weight > 0) {
// 以下过程主要时判断机器的启动时间和预热时间的差值(默认为10min),
// 为什么要预热? 因为服务在启动之后JVM会对代码有一个优化的过程,预热保证了调用的体验,谨防由此引发的调用超时问题。
// 获取启动时间
long timestamp = invoker.getUrl().getParameter(TIMESTAMP_KEY, 0L);
if (timestamp > 0L) {
// 获取运行时间
long uptime = System.currentTimeMillis() - timestamp;
if (uptime < 0) {
return 1;
}
// 获取服务预热时间,默认为10分钟
int warmup = invoker.getUrl().getParameter(WARMUP_KEY, DEFAULT_WARMUP);
if (uptime > 0 && uptime < warmup) {
// 重新计算服务权重
weight = calculateWarmupWeight((int)uptime, warmup, weight);
}
}
}
}
// 主要用于保证当服务运行时长小于服务预热时间时,对服务进行降权,
// 避免让服务在启动之初就处于高负载状态。
// 服务预热是一个优化手段,与此类似的还有 JVM 预热。
// 主要目的是让服务启动后“低功率”运行一段时间,使其效率慢慢提升至最佳状态
return Math.max(weight, 0);
}
}
关键方法在doSelect()
,这是需要子类去实现的
负载均衡共有如下五种策略:
RandomLoadBalance 随机选择
加权随机算法是dubbo默认的负载均衡策略实现方式,会根据各个Invoker的权重分配随机选中的比例,基本过程如下:
- 将所有Invoker的权重求和
- 随机数就在总权重值的范围内生成
如果随机数是5,则将调用服务B
如果随机数是2,则将调用服务A
代码实现:
public class RandomLoadBalance extends AbstractLoadBalance {
public static final String NAME = "random";
/**
* 使用随机条件在列表中选择一个invoker
*/
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// Number of invokers
int length = invokers.size();
// Every invoker has the same weight?
boolean sameWeight = true;
// the maxWeight of every invokers, the minWeight = 0 or the maxWeight of the last invoker
int[] weights = new int[length];
// The sum of weights
int totalWeight = 0;
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
// Sum
totalWeight += weight;
// save for later use
weights[i] = totalWeight;
if (sameWeight && totalWeight != weight * (i + 1)) {
sameWeight = false;
}
}
if (totalWeight > 0 && !sameWeight) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
int offset = ThreadLocalRandom.current().nextInt(totalWeight);
// Return a invoker based on the random value.
// 如果服务权重不相同, 根据权重和求出一个随机数,然后计算看该数落在哪个区间段
for (int i = 0; i < length; i++) {
if (offset < weights[i]) {
return invokers.get(i);
}
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
// 如果权重相同或总权重为0,随机选一个,多线程产生随机数
return invokers.get(ThreadLocalRandom.current().nextInt(length));
}
}
基本流程:
- 根据每个Invoker类加总权重,每一个Invoker占用一段权重区间
- 在totalWeight进行随机数,获取随机数所在区间的Invoker
LeastActiveLoadBalance 最少活跃数算法
活跃数指Invoker正在处理中的请求数,接受请求+1,请求处理完成-1
对于正在处理的请求数最少的服务会被选择
代码实现:
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// Number of invokers
int length = invokers.size();
// The least active value of all invokers
// 最小的活跃数
int leastActive = -1;
// The number of invokers having the same least active value (leastActive)
// 具有相同“最小活跃数”的服务者提供者(以下用 Invoker 代称)数量
int leastCount = 0;
// The index of invokers having the same least active value (leastActive)
// leastIndexs 用于记录具有相同“最小活跃数”的 Invoker 在 invokers 列表中的下标信息
int[] leastIndexes = new int[length];
// the weight of every invokers
int[] weights = new int[length];
// The sum of the warmup weights of all the least active invokers
int totalWeight = 0;
// The weight of the first least active invoker
// 第一个最小活跃数的 Invoker 权重值,用于与其他具有相同最小活跃数的 Invoker 的权重进行对比,
// 以检测是否“所有具有相同最小活跃数的 Invoker 的权重”均相等
int firstWeight = 0;
// Every least active invoker has the same weight value?
boolean sameWeight = true;
// 遍历 invokers 列表
// Filter out all the least active invokers
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
// Get the active number of the invoker
// 获取 Invoker 对应的活跃数
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
// Get the weight of the invoker's configuration. The default value is 100.
int afterWarmup = getWeight(invoker, invocation);
// save for later use
// 获取权重
weights[i] = afterWarmup;
// If it is the first invoker or the active number of the invoker is less than the current least active number
// 发现更小的活跃数,重新开始
if (leastActive == -1 || active < leastActive) {
// Reset the active number of the current invoker to the least active number
// 使用当前活跃数 active 更新最小活跃数 leastActive
leastActive = active;
// Reset the number of least active invokers
// 更新 leastCount 为 1
leastCount = 1;
// Put the first least active invoker first in leastIndexes
// 记录当前下标值到 leastIndexs 中
leastIndexes[0] = i;
// Reset totalWeight
totalWeight = afterWarmup;
// Record the weight the first least active invoker
firstWeight = afterWarmup;
// Each invoke has the same weight (only one invoker here)
sameWeight = true;
// If current invoker's active value equals with leaseActive, then accumulating.
// 当前 Invoker 的活跃数 active 与最小活跃数 leastActive 相同
} else if (active == leastActive) {
// Record the index of the least active invoker in leastIndexes order
// 在 leastIndexs 中记录下当前 Invoker 在 invokers 集合中的下标
leastIndexes[leastCount++] = i;
// Accumulate the total weight of the least active invoker
// 累加权重
totalWeight += afterWarmup;
// If every invoker has the same weight?
// 检测当前 Invoker 的权重与 firstWeight 是否相等,
// 不相等则将 sameWeight 置为 false
if (sameWeight && afterWarmup != firstWeight) {
sameWeight = false;
}
}
}
// 当只有一个 Invoker 具有最小活跃数,此时直接返回该 Invoker 即可
// Choose an invoker from all the least active invokers
if (leastCount == 1) {
// If we got exactly one invoker having the least active value, return this invoker directly.
return invokers.get(leastIndexes[0]);
}
// 有多个 Invoker 具有相同的最小活跃数,但它们之间的权重不同
if (!sameWeight && totalWeight > 0) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on
// totalWeight.
// 随机生成一个 [0, totalWeight) 之间的数字
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
// Return a invoker based on the random value.
// 循环让随机数减去具有最小活跃数的 Invoker 的权重值,
// 当 offset 小于等于0时,返回相应的 Invoker
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexes[i];
// 获取权重值,并让随机数减去权重值
offsetWeight -= weights[leastIndex];
if (offsetWeight < 0) {
return invokers.get(leastIndex);
}
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
// 如果权重相同或权重为0时,随机返回一个 Invoker
return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
}
基本的选择过程:
- 遍历 invokers 列表,寻找活跃数最小的 Invoker
- 如果有多个 Invoker 具有相同的最小活跃数,此时记录下这些 Invoker 在 invokers 集合中的下标,并累加它们的权重,比较它们的权重值是否相等(类似于加权随机算法策略)
- 如果只有一个 Invoker 具有最小的活跃数,此时直接返回该 Invoker 即可
- 如果有多个 Invoker 具有最小活跃数,且它们的权重不相等,此时处理方式和 RandomLoadBalance 一致
- 如果有多个 Invoker 具有最小活跃数,但它们的权重相等,此时随机返回一个即可
RoundRobinLoadBalance 加权轮循算法,根据权重设置轮训比例
普通轮询会将请求均匀的分布在每个节点,但不能很好调节不同性能服务器的请求处理,所以加权负载均衡来根据权重在轮询机制中分配相对应的请求比例给每台服务器。
比如有A、B、C三台服务器,权重之比为5:3:2,一共处理10个请求。那么采用负载均衡采用轮询加权算法时,A、B、C服务一定是分别承担5、3、2个请求
代码实现:
/**
* Round robin load balance.
*/
public class RoundRobinLoadBalance extends AbstractLoadBalance {
public static final String NAME = "roundrobin";
private static final int RECYCLE_PERIOD = 60000;
protected static class WeightedRoundRobin {
private int weight;
private AtomicLong current = new AtomicLong(0);
private long lastUpdate;
public int getWeight() {
return weight;
}
public void setWeight(int weight) {
this.weight = weight;
current.set(0);
}
public long increaseCurrent() {
return current.addAndGet(weight);
}
public void sel(int total) {
current.addAndGet(-1 * total);
}
public long getLastUpdate() {
return lastUpdate;
}
public void setLastUpdate(long lastUpdate) {
this.lastUpdate = lastUpdate;
}
}
private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();
/**
* get invoker addr list cached for specified invocation
* <p>
* <b>for unit test only</b>
*
* @param invokers
* @param invocation
* @return
*/
protected <T> Collection<String> getInvokerAddrList(List<Invoker<T>> invokers, Invocation invocation) {
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
Map<String, WeightedRoundRobin> map = methodWeightMap.get(key);
if (map != null) {
return map.keySet();
}
return null;
}
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// key 为 接口名+方法名
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
// 查看缓存中是否存在相应服务接口的信息,如果没有则新添加一个元素到缓存中
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.computeIfAbsent(key, k -> new ConcurrentHashMap<>());
// 总权重
int totalWeight = 0;
long maxCurrent = Long.MIN_VALUE;
// 当前时间戳
long now = System.currentTimeMillis();
// 最大 current 的 Invoker
Invoker<T> selectedInvoker = null;
// 保存选中的 WeightedRoundRobin 对象
WeightedRoundRobin selectedWRR = null;
// 遍历 Invokers 列表
for (Invoker<T> invoker : invokers) {
// 从缓存中获取 WeightedRoundRobin 对象
String identifyString = invoker.getUrl().toIdentityString();
// 获取权重
int weight = getWeight(invoker, invocation);
// 获取当前 Invoker 对象
WeightedRoundRobin weightedRoundRobin = map.computeIfAbsent(identifyString, k -> {
WeightedRoundRobin wrr = new WeightedRoundRobin();
wrr.setWeight(weight);
return wrr;
});
// 如果当前 Invoker 权重不等于对应的 WeightedRoundRobin 对象中的权重,则重新设置当前权重到对应的 WeightedRoundRobin 对象中
if (weight != weightedRoundRobin.getWeight()) {
// weight changed
weightedRoundRobin.setWeight(weight);
}
// 累加权重到 current 中
long cur = weightedRoundRobin.increaseCurrent();
// 设置 weightedRoundRobin 对象最后更新时间
weightedRoundRobin.setLastUpdate(now);
// 最大 current 的 Invoker,并赋值给相应的变量
if (cur > maxCurrent) {
maxCurrent = cur;
selectedInvoker = invoker;
selectedWRR = weightedRoundRobin;
}
// 累加权重到总权重中
totalWeight += weight;
}
// 如果 Invokers 列表中的数量不等于缓存map中的数量
if (invokers.size() != map.size()) {
// 清理最后更新时间超过60s的item
map.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
}
// 如果存在被选中的 Invoker
if (selectedInvoker != null) {
// 计算 current = current - totalWeight
selectedWRR.sel(totalWeight);
return selectedInvoker;
}
// 正常情况这里不会到达
return invokers.get(0);
}
}
基本流程:
- 每个Invoker都有一个current值,初值为自身权重,每个Invoker中
current=current+weight
- 每一次遍历所有Invoker后,current值最大的就是本次选中的Invoker
- 被选中的Invoker会更新current值:
current=current-totalWeight
ConsistentHashLoadBalance Hash 一致性算法
一致性 Hash 负载均衡策略是让参数相同的请求分配到同一机器上。把每个服务节点分布在一个环上,请求也分布在环形中。以请求在环上的位置,顺时针寻找换上第一个服务节点。如图所示:
同时,为避免请求散列不均匀,dubbo 中会将每个 Invoker 再虚拟多个节点出来,使得请求调用更加均匀。
代码实现:
/**
* ConsistentHashLoadBalance
* ConsistentHashLoadBalance不关系权重
*/
public class ConsistentHashLoadBalance extends AbstractLoadBalance {
public static final String NAME = "consistenthash";
/**
* Hash nodes name
*/
public static final String HASH_NODES = "hash.nodes";
/**
* Hash arguments name
*/
public static final String HASH_ARGUMENTS = "hash.arguments";
private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();
@SuppressWarnings("unchecked")
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String methodName = RpcUtils.getMethodName(invocation);
String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
// 获取 invokers 原始的 hashcode
int invokersHashCode = invokers.hashCode();
ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
// 如果 invokers 是⼀个新的 List 对象,意味着服务提供者数量发⽣了变化,可能新增也可能减少了。
// 此时 selector.identityHashCode != identityHashCode 条件成⽴
if (selector == null || selector.identityHashCode != invokersHashCode) {
// 创建新的 ConsistentHashSelector
selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, invokersHashCode));
selector = (ConsistentHashSelector<T>) selectors.get(key);
}
// 调⽤ ConsistentHashSelector 的 select ⽅法选择 Invoker
return selector.select(invocation);
}
/**
* 一致性hash选择器
*
* @param <T>
*/
private static final class ConsistentHashSelector<T> {
// 使⽤ TreeMap 存储 Invoker 虚拟节点
private final TreeMap<Long, Invoker<T>> virtualInvokers;
private final int replicaNumber;
private final int identityHashCode;
private final int[] argumentIndex;
ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
this.identityHashCode = identityHashCode;
URL url = invokers.get(0).getUrl();
// 获取虚拟节点数,默认为160
this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160);
// 获取参与 hash 计算的参数下标值,默认对第⼀个参数进⾏ hash 运算
String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));
argumentIndex = new int[index.length];
for (int i = 0; i < index.length; i++) {
argumentIndex[i] = Integer.parseInt(index[i]);
}
for (Invoker<T> invoker : invokers) {
String address = invoker.getUrl().getAddress();
for (int i = 0; i < replicaNumber / 4; i++) {
// 对 address + i 进⾏ md5 运算,得到⼀个⻓度为16的字节数组
byte[] digest = Bytes.getMD5(address + i);
// 对 digest 部分字节进⾏4次 hash 运算,得到四个不同的 long 型正整数
for (int h = 0; h < 4; h++) {
// h = 0 时,取 digest 中下标为 0 ~ 3 的4个字节进⾏位运算
// h = 1 时,取 digest 中下标为 4 ~ 7 的4个字节进⾏位运算
// h = 2, h = 3 时过程同上
long m = hash(digest, h);
// 将 hash 到 invoker 的映射关系存储到 virtualInvokers中
// virtualInvokers 需要提供⾼效的查询操作,因此选⽤ TreeMap作为存储结构
virtualInvokers.put(m, invoker);
}
}
}
}
public Invoker<T> select(Invocation invocation) {
// 将参数转为 key
String key = toKey(invocation.getArguments());
// 对参数 key 进⾏ md5 运算
byte[] digest = Bytes.getMD5(key);
// 取 digest 数组的前四个字节进⾏ hash 运算,再将 hash 值传给 selectForKey ⽅法,
// 寻找合适的 Invoker
return selectForKey(hash(digest, 0));
}
private String toKey(Object[] args) {
StringBuilder buf = new StringBuilder();
for (int i : argumentIndex) {
if (i >= 0 && i < args.length) {
buf.append(args[i]);
}
}
return buf.toString();
}
private Invoker<T> selectForKey(long hash) {
// 到 TreeMap 中查找第⼀个节点值⼤于或等于当前 hash 的 Invoker
Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
// 如果 hash ⼤于 Invoker 在圆环上最⼤的位置,此时 entry = null,
// 需要将 TreeMap 的头节点赋值给 entry
if (entry == null) {
entry = virtualInvokers.firstEntry();
}
// 返回 Invoker
return entry.getValue();
}
private long hash(byte[] digest, int number) {
return (((long) (digest[3 + number * 4] & 0xFF) << 24)
| ((long) (digest[2 + number * 4] & 0xFF) << 16)
| ((long) (digest[1 + number * 4] & 0xFF) << 8)
| (digest[number * 4] & 0xFF))
& 0xFFFFFFFFL;
}
}
}
基本过程如下:
- 一致 hash 实现过程就是先创建好虚拟节点,虚拟节点保存在 TreeMap 中。
- TreeMap 的 key 为配置的参数先进行 md5 运算,然后将 md5 值进行 hash 运算。TreeMap 的 value 为被选中的 Invoker。
- 最后请求时,计算参数的 hash 值,去从 TreeMap 中获取 Invoker。
ShortestResponseLoadBalance 最短响应时间算法
代码实现:
public class ShortestResponseLoadBalance extends AbstractLoadBalance {
public static final String NAME = "shortestresponse";
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// invokers数量
int length = invokers.size();
// 默认的“最短请求时间”
long shortestResponse = Long.MAX_VALUE;
// 具有相同“最短请求时间”的invoker数量
int shortestCount = 0;
// 具有相同估计最短响应时间的invoker索引
int[] shortestIndexes = new int[length];
// 每个invoker的权重数组
int[] weights = new int[length];
// 所有最短响应请求者的预热权值之和
int totalWeight = 0;
// 第一个最短响应调用者的权重
int firstWeight = 0;
// 每个最短响应调用者都有相同的权值?
boolean sameWeight = true;
// 过滤掉所有最短响应请求者
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
// 根据活动连接的乘积和成功的平均运行时间计算估计的响应时间
// 调用成功的请求数总数对应的总耗时 / 调用成功的请求数总数 = 成功调用的平均时间
long succeededAverageElapsed = rpcStatus.getSucceededAverageElapsed();
int active = rpcStatus.getActive();
long estimateResponse = succeededAverageElapsed * active;
// 获取预热后的权重
int afterWarmup = getWeight(invoker, invocation);
weights[i] = afterWarmup;
// 如果出现更短响应的invoker,则记录更短的响应时间和当前invoker的下标
if (estimateResponse < shortestResponse) {
shortestResponse = estimateResponse;
shortestCount = 1;
shortestIndexes[0] = i;
totalWeight = afterWarmup;
firstWeight = afterWarmup;
sameWeight = true;
}
// 如果出现时间一样长的invoker,则更新shortestCount并记录下标到shortestIndexes
else if (estimateResponse == shortestResponse) {
shortestIndexes[shortestCount++] = i;
totalWeight += afterWarmup;// 计算总权重
// 判断权重是否相等
if (sameWeight && i > 0
&& afterWarmup != firstWeight) {
sameWeight = false;
}
}
}
// 如果最短响应时间invoker只有一个则直接返回
if (shortestCount == 1) {
return invokers.get(shortestIndexes[0]);
}
// 和加权随机负载均衡一样
if (!sameWeight && totalWeight > 0) {
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
for (int i = 0; i < shortestCount; i++) {
int shortestIndex = shortestIndexes[i];
offsetWeight -= weights[shortestIndex];
if (offsetWeight < 0) {
return invokers.get(shortestIndex);
}
}
}
// 在多个invoker中岁间选择一个返回
return invokers.get(shortestIndexes[ThreadLocalRandom.current().nextInt(shortestCount)]);
}
}
整个算法过程和LeastActiveLoadBalance类似,整个算法过程分为三个阶段
- 找到该服务所有提供者中响应时间最短的⼀个或多个服务,并计算其权重和
- 如果响应时间最短的服务提供者只有⼀个,则直接返回给服务
- 如果响应时间最短的服务提供者⼤于1个,则分为以下两种情况
- 如果所有服务权重值不同,则按RandomLoadBalance(2)过程选出服务提供者
- 如果所有服务权重相同,则随机返回⼀个
五种负载均衡策略总结
名称 | 说明 |
---|---|
RandomLoadBalance | 随机算法,根据权重设置随机的概率 |
LeastActiveLoadBalance | 最少活跃数算法,响应越快的服务器堆积的请求越少,对应的活跃数也少 |
RoundRobinLoadBalance | 加权轮循算法,轮询遇到加权则可以让请求(不论多少)严格按照我们的权重之比进行分配 |
ConsistentHashLoadBalance | Hash 一致性算法,相同请求参数分配到相同提供者 |
ShortestResponseLoadBalance | 最短响应时间算法,在所有服务中选出平均响应时间最短的一个 |
Dubbo 容错方案
-
Failover Cluster
失败⾃动切换,当出现失败,重试其它服务器。 (缺省)
通常⽤于读操作,但重试会带来更⻓延迟。
可通过retries=”2″来设置重试次数(不含第⼀次)。正是⽂章刚开始说的那种情况. -
Failfast Cluster
快速失败,只发起⼀次调⽤,失败⽴即报错。
通常⽤于⾮幂等性的写操作,⽐如新增记录。 -
Failsafe Cluster
失败安全,出现异常时,直接忽略。
通常⽤于写⼊审计⽇志等操作。 -
Failback Cluster
失败⾃动恢复,后台记录失败请求,定时重发。
通常⽤于消息通知操作。 -
Forking Cluster
并⾏调⽤多个服务器,只要⼀个成功即返回。
通常⽤于实时性要求较⾼的读操作,但需要浪费更多服务资源。
可通过forks=”2″来设置最⼤并⾏数。 -
Broadcast Cluster
⼴播调⽤所有提供者,逐个调⽤,任意⼀台报错则报错。 (2.1.0开始⽀持)
通常⽤于通知所有提供者更新缓存或⽇志等本地资源信息。
总结
从dubbo的框架入手,介绍了dubbo 10层架构作用与之间的调用关系,并对dubbo重要的功能实现进行源码解析
参考
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/17843.html