二、负载均衡器:Netflix Ribbon

导读:本篇文章讲解 二、负载均衡器:Netflix Ribbon,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

所谓的负载均衡,就是将负载分摊到多个执行单元上,常见的负载均衡有两种方式。一种是独立进程单元,通过负载均衡策略,将请求转发到不同的执行单元上,如:Nginx。另一种是将负载均衡逻辑以代码的形式封装到服务消费者的客户端上,服务消费者客户端维护了一份服务提供者的信息列表,有了信息列表,通过负载均衡策略将请求分摊给多个服务提供者,从而达到负载均衡的目的,如Ribbon。

Ribbon作为服务消费者的负载均衡器,有两种使用方式,一种是和RestTemplate结合使用,另一种是和Feign结合使用。其中,Feign已经默认集成了Ribbon。Ribbon子模块:

1. ribbon-loadbalancer:可以独立使用或与其他模块一起使用的负载均衡器API 。    
2. ribbon-eureka :ribbon结合Eureka客户端的API ,为负载均衡器提供动态服务注册列表信息。
3. ribbon-core: Ribbon 的核心API 。
Copy

ribbon:

1.1 Ribbon与RestTemplate结合使用(因Feign默认集成Ribbon,故Ribbon与Feign的结合使用不做介绍): 在使用springcloud ribbon客户端负载均衡的时候,可以给RestTemplate bean 加一个@LoadBalanced注解,就能让这个RestTemplate在请求时拥有客户端负载均衡的能力:

 @Bean
 @LoadBalanced
 RestTemplate restTemplate () {
      return new RestTemplate();
 }
Copy

使用RestTemplateCustomizer对所有标注了@LoadBalanced的RestTemplate Bean添加了一个LoadBalancerInterceptor拦截器,而这个拦截器的作用就是对请求的URI进行转换获取到具体应该请求哪个服务实例ServiceInstance。

1.2 Ribbon负载均衡原理: Ribbon负载均衡器的核心类: LoadBalancerClient, 它是从Eureka Client获取服务注册列表信息的,并将服务注册列表信息缓存一份。在LoadBalancerClient调用choose()方法时,根据负载均衡策略选择一个服务实例的信息,从而进行负载均衡。(LoadBalancerClient 也可以不从Eureka Client获取注册列表信息, 这时就需要手动维护一份服务注册列表信息)。

LoadBalancerClient是一个接口类,继承了ServiceInstanceChooser,它的实现类RibbonLoadBalancerClient,最终的负载均衡的请求处理都是由RibbonLoadBalancerClient 来执行的。LoadBalancerClient 是一个负载均衡的客户端, 有如下3 种方法。其中有2 个execute()方法,均用来执行请求, reconstructURI()用于重构Uri ,代码如下:

public interface LoadBalancerClient extends ServiceinstanceChooser {
    <T> T execute(String serviceid, LoadBalancerRequest<T> request) throws IOException;
    <T> T execute(String serviceid, Servicelnstance servicelnstance, LoadBalancerRequest<T> request) throws IOException;
    URI reconstructURI(Serviceinstance instance, URI original);
}
Copy

ServiceInstanceChooser接口有一个choose()方法,用于根据serviceId获取ServiceInstance, 即通过服务名来选择服务实例。

public interface ServiceInstanceChooser {
ServiceInstance choose(String serviceId);
}
Copy

在RibbonLoadBalancerClient的源码里,choose()方法用于选择具体服务实例。该方法通过getServer()方法去获取实例,最终交给ILoadBalancer类去选择服务实例。 ILoadBalancer接口定义了一系列实现负载均衡的方法。代码如下:

publiC interface ILoadBalancer {
    Public void addServers(List<Server> newServers) ;
    public Server chooseServer(Object key) ;
    public void markServerDown (Server server);
    public List<Server> getReachableServers();
    public List<Server> getAllServers ();
}
Copy

其中, addServers ()方法用于添加一个Server 集合, chooseServer()方法用于根据key 去获取Server, markServerDown()方法用于标记某个服务下线, getReachabI eServers()获取可用的Server 集合, getAllServers()获取所有的Server 集合。

ILoadBalancer的实现类为BaseLoadBalancer, BaseLoadBalancer的子类为DynamicServerListLoadBalancer。关系如下: DynamicServerListLoadBalancer与其接口类的关系查看DynamicServerListLoadBalancer 类的源码, DynamicServerListLoadBalancer 需要配置IClientConfig, IRule 、IPing 、ServerList 、ServerListFilter 和ILoadBalancer 。查看BaseLoadBalancer类的源码, 在默认的情况下, 实现了如下配置。

IClientConfig ribbonClientConfig: DefaultClientConfiglmpl。
IRule ribbonRule: RoundRobinRule。
IPing ribbonPing: DummyPing。
ServerList ribbonServerList: ConfigurationBasedServerList。
ServerListFilter ribbonServerListFilter: ZonePreferenceServerListFilter。
ILoadBalancer ribbonLoadBalancer: ZoneAwareLoadBalancer。
Copy

其中,IClientConfig 用于配置负载均衡的客户端, IClientConfig 的默认实现类为DefaultClientConfiglmpl。IRule 用于配置负载均衡的策略,IRule 有3 个方法, 其中choose()是根据key 来获取server实例的, setLoadBalancer()和getLoadBalancer()是用来设置和获取ILoadBalancer 的,它的源码如下:

public interface IRule{
    public Server choose(Object key);
    public void setLoadBalancer(ILoadBalancer lb);
    public ILoadBalancer getLoadBalancer();    
}
Copy

IRule 有很多默认的实现类,这些实现类根据不同的算法和逻辑来处理负载均衡的策略。IRule 的默认实现类有以下7 种。在大多数情况下,这些默认的实现类是可以满足需求的,如果有特殊的需求, 可以自己实现。IRule和其实现类之间的关系如下:

  1. BestAvailableRule: 选择最小请求数。
  2. ClientConfigEnabledRoundRobinRule :轮询。
  3. RandornRule: 随机选择一个server 。
  4. RoundRobinRule: 轮询选择se rver 。
  5. RetryRule: 根据轮询的方式重试。
  6. WeightedResponseTirneRule: 根据响应时间去分配一个weight , weight越低,被选择的可能性就越低。
  7. ZoneAvoidanceRule:根据server的zone 区域和可用性来轮询选择。 IRule及其实现类

IPing 用于向server 发送“ ping”,来判断该server 是否有响应,从而判断该server 是否可用。它有一个isAlive()方法,源码如下:

public interface IPing {
    public boolean isAlive(Server server);
}
Copy

IPing 的实现类有PingUrl , PingConstant 、NoOpPing 、DummyPing 和 NIWSDiscoveryPing。

  1. PingUri: 真实地去ping 某个Uri ,判断其是否可用。
  2. PingConstant: 固定返回某服务是否可用, 默认返回住时,即可用。
  3. NoOpPing:不去ping , 直接返回true ,即可用。
  4. DummyPing: 直接返回true , 并实现了initWithNiwsConfig 方法。
  5. NIWSDiscoveryPing :根据DiscoveryEnabledServer 的Instancelnfo 的InstanceStatus 去判断,如果为Instancestatus.UP ,则可用, 否则不可用。

查看DynamicServerListLoadBalancer 的源码,DynamicServerListLoadBalancer 的构造函数中有一个initWithNiwsConfig()方法。在该方法中经过一系列的初始化配置, 最终执行了restOflnit()方法。DynamicServerListLoadBalancer 的部分源码如下:

public DynamicServerListLoadBalancer(IClientConfig clientConfig) {
    initWithNiwsConfig(clientConfig);
}
@Override
public void initWithNiwsConfig ( IClientConfig clientConfig) {
    try {
        //省略代码
        this . serverListUpdater = (ServerListUpdater) ClientFactory.instantiateinstanceWithClientConfig(serverListUpdaterClassName, clientConfig);

        restOfInit(clientConfig);
    } catch (Exception e) {
    ...//省略代码
Copy

在restOfInit() 方法中, 有一个updateListOfServers()的方法,该方法是用来获取所有的ServerList 的。

void restOfInit(IClientConfig clientConfig) {
    ...//省略代码
    updateListOfServers () ;
    ...//省略代码
}
Copy

进一步跟踪updateListOfServers()方法的源码,最终由serverListlmpl.getUpdatedListOfServers()获取所有的服务列表,代码如下:

public void updateListOfServers() {
    List<T> servers = new ArrayList<T>();
    if (serverListImpl != null) {
        servers = serverListImpl.getUpdatedListOfServers();
    }
    updateAllServerList(servers);
}
Copy

而serverListlmpl 是ServerList 接口的具体实现类。跟踪源码, ServerList 的实现类为DiscoveryEnabledNIWSServerList,这个类在ribbon-eureka.jar 的com. netflix.niws.loadbalancer包下。其中, DiscoveryEnabledNIWSServerList 有getInitialListOfServers()和getUpdatedListOfServers()方法,具体代码如下:

@Override
public List<DiscoveryEnabledServer> getlnitialListOfServers () {
    return obtainServersViaDiscovery();
}
@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
    return obtainServersViaDiscovery ();
}
Copy

继续跟踪源码, obtainServers ViaDiscovery()方法是根据eurekaClientProvider.get()方法来获取EurekaClient 的, 再根据EurekaClient 来获取服务注册列表信息,代码如下:

    private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
        List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();

       ...//省略代码

        EurekaClient eurekaClient = eurekaClientProvider.get();
        if (vipAddresses !=null){
            for (String vipAddress : vipAddresses.split(",")) {

                ...//省略代码
                for (InstanceInfo ii : listOfInstanceInfo) {
                    if (ii.getStatus().equals(InstanceStatus.UP)) {   

                        ...//省略代码
                        DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
                        des.setZone(DiscoveryClient.getZone(ii));
                        serverList.add(des);
                    }
                }
               ...//省略代码
            }
        }
        return serverList;
}
Copy

其中, eurekaClientProvider 的实现类是LegacyEurekaClientProvider, LegacyEurekaClientProvider是一个获取eurekaClient 实例的类, 其代码如下:

class LegacyEurekaClientProvider implements Provider {

private volatile EurekaClient eurekaClient;

@Override
public synchronized EurekaClient get() {
    if (eurekaClient == null) {
        eurekaClient = DiscoveryManager.getInstance().getDiscoveryClient();
    }

    return eurekaClient;
}
Copy

EurekaClient 的实现类为DiscoveryClient ,DiscoveryClient 具有服务注册、获取服务注册列表等功能。

由此可见,负载均衡器是从Eureka Client 获取服务列表信息的, 井根据IRule 的策略去路由,根据IPing 去判断服务的可用性。
Copy

那么现在还有一个问题,负载均衡器每隔多长时间从Eureka Client 获取注册信息呢?在BaseLoadBalancer 类的源码中, 在BaseLoadBalancer 的构造方法开启了一个PingTask任务,代码如下:

public BaseLoadBalance r( Stri ng name , IRule rule , LoadBalancerStats stats ,IPing ping , IPingStrategy pingStrategy) (
    .. //代码省略
    setupPingTask ();
    ..//代码省略
}
Copy

在setupPingTask ()的具体代码逻辑里, 开启了ShutdownEnabledTimer 的PingTask 任务,在默认情况下, 变盘pinglntervalSeconds 的值为10 ,即每10 秒向EurekaClient 发送一次心跳“ ping ”。

void setupPingTask () {
    if (canSkipPing ()) {
        return ;
    }
    if (lbTimer != null) {
        lbTimer.cancel() ;
    }
    lbTimer = new ShutdownEnabledT 工mer ( "NFLoadBalancer-PingTimer-" + name ,true);
    lbTimer.schedule(new PingTask() , 0, pingintervalSeconds * 1000);
    forceQuickPing();
}
Copy

查看Ping Task 的源码, Ping Task 创建了一个Pinger 对象,并执行了runPinger()方法。

class PingTask extends TimerTask {
    public void run() {
        try {
            new Pinger(pingStrategy).runPinger();
        } catch (Exception e) {
            logger.error("LoadBalancer [{}]: Error pinging", name, e);
        }
    }
}
Copy

查看Pinger 的runPinger()方法, 最终根据pingerStrategy.pingServers(ping, allServers )来获取服务的可用性,如果该返回结果与之前相同,则不向EurekaClient 获取注册列表:如果不同,则通知ServerStatusChangeListener 服务注册列表信息发生了改变,进行更新或者重新拉取,代码如下:

public void runPinger() throws Exception {
        if (!pingInProgress.compareAndSet(false, true)) { 
            return; 
        }         

        Server[] allServers = null;
        boolean[] results = null;

        Lock allLock = null;
        Lock upLock = null;

        try {

            allLock = allServerLock.readLock();
            allLock.lock();
            allServers = allServerList.toArray(new Server[allServerList.size()]);
            allLock.unlock();

            int numCandidates = allServers.length;
            results = pingerStrategy.pingServers(ping, allServers);

            final List<Server> newUpList = new ArrayList<Server>();
            final List<Server> changedServers = new ArrayList<Server>();

            for (int i = 0; i < numCandidates; i++) {
                boolean isAlive = results[i];
                Server svr = allServers[i];
                boolean oldIsAlive = svr.isAlive();

                svr.setAlive(isAlive);

                if (oldIsAlive != isAlive) {
                    changedServers.add(svr);
                    logger.debug("LoadBalancer [{}]:  Server [{}] status changed to {}", 
                        name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
                }

                if (isAlive) {
                    newUpList.add(svr);
                }
            }
            upLock = upServerLock.writeLock();
            upLock.lock();
            upServerList = newUpList;
            upLock.unlock();

            notifyServerStatusChangeListener(changedServers);
        } finally {
            pingInProgress.set(false);
        }
}
Copy

由此可见, LoadBalancerClient 是在初始化时向Eureka 获取服务注册列表信息, 并且每10秒向EurekaClient 发送“ ping ”,来判断服务的可用性。如果服务的可用性发生了改变或者服务数量和之前的不一致,则更新或者重新拉取。LoadBalancerClient 有了这些服务注册列表信息,就可以根据具体的IRule 的策略来进行负载均衡。

最后,回到问题的本身,为什么在RestTemplate 类的Bean 上加一个@LoadBalance 注解就可以使用Ribbon 的负载均衡呢?全局搜索( IDEA 的快捷键为“ Ctrl "+ “ Shift "+ “ F ”)查看有哪些类用到了@LoadBalanced注解。通过搜索,可以发现LoadBalancerAutoConfiguration 类( LoadBalancer 自动配置类〉使用到了该注解。

在LoadBalancerAutoConfiguration 类中, 首先维护了一个被@LoadBalanced 修饰的RestTemplate 对象的List。在初始化的过程中,通过调用customizer.customize(restTemplate)方法来给RestTemplate 增加拦截器LoadBalancerlnterceptor 。LoadBalancerinterceptor 用于实时拦截,在LoadBalancerlnterceptor 中实现了负载均衡的方法。LoadBalancerlnterceptor 类的拦截方法的代码如下:

@Override
publiC ClientHttpResponse intercept(final HttpRequest request , final byte[] body, final ClientHttpRequestExecution execution) throws IOException {
    final URI originalUri= request.getURI();
    String serviceName = originalUri.getHost() ;
    return this.loadBalancer.execute ( serviceName , requestFactory.createRequest (request , body , execution) );
}
Copy

综上所述, Ribbon 的负载均衡主要是通过LoadBalancerClient 来实现的,而LoadBalancerClient具体交给了ILoadBalancer 来处理, ILoadBalancer 通过配置IRule、IPing 等,向EurekaClient获取注册列表的信息,默认每10 秒向EurekaClient 发送一次“ping ”, 进而检查是否需要更新服务的注册列表信息。最后,在得到服务注册列表信息后, ILoadBalancer 根据IRule 的策略进行负载均衡。而RestTemplate 加上@LoadBalance 注解后,在远程调度时能够负载均衡, 主要是维护了一个被@LoadBalance 注解的RestTemplate 列表,并给该列表中的RestTemplate 对象添加了拦截器。在拦截器的方法中,将远程调度方法交给了Ribbon 的负载均衡器LoadBalancerClient去处理,从而达到了负载均衡的目的。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/71429.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!