SpringCloud Gateway——集成服务发现实现服务化路由
1. 前言
在Nacos——服务订阅
一文中分析了Gateway
集成Nacos
服务发现实现服务化路由的流程,流程图如下:
集成服务发现实现服务化路由的实质是,通过服务发现周期性订阅服务对应的实例地址,通过服务编码和负载均衡算法从实例地址中筛选一个目标地址,最后根据目标地址ip发起http请求。
接下来通过源码具体分析
2. 源码分析
1. 获取服务对应的实例
LoadBalancerClientFilter
是个全局过滤器,顺序比上篇文章中的请求转发过滤器NettyRoutingFilter
小,则在转发过滤器之前执行,所以从这里也可以看出服务化路由是根据服务获取对应的实例ip,然后进行转发。
LoadBalancerClientFilter
@Override
@SuppressWarnings("Duplicates")
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
// 只处理lb协议的转发
if (url == null
|| (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
return chain.filter(exchange);
}
// preserve the original url
addOriginalRequestUrl(exchange, url);
if (log.isTraceEnabled()) {
log.trace("LoadBalancerClientFilter url before: " + url);
}
// 根据服务获取实例
final ServiceInstance instance = choose(exchange);
if (instance == null) {
throw NotFoundException.create(properties.isUse404(),
"Unable to find instance for " + url.getHost());
}
URI uri = exchange.getRequest().getURI();
// if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
// if the loadbalancer doesn't provide one.
String overrideScheme = instance.isSecure() ? "https" : "http";
if (schemePrefix != null) {
overrideScheme = url.getScheme();
}
// 将lb替换成http,后面接实例的ip
URI requestUrl = loadBalancer.reconstructURI(
new DelegatingServiceInstance(instance, overrideScheme), uri);
if (log.isTraceEnabled()) {
log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
}
// GATEWAY_REQUEST_URL_ATTR属性替换成真正请求的http路径
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
return chain.filter(exchange);
}
这个过滤器的逻辑,就是从服务发现中获取对应服务的实例,替换GATEWAY_REQUEST_URL_ATTR属性为实例ip。
重点关注choose()
选择实例逻辑,根据上面流程图跟踪代码可知,最终从BaseLoadBalancer.allServerList
属性获取所有实例,然后再根据负载均衡策略进行筛选。
2. 何处保存实例到allServerList
在启动的时候会初始化DynamicServerListLoadBalancer
DynamicServerListLoadBalancer
public DynamicServerListLoadBalancer(IClientConfig clientConfig) {
this.isSecure = false;
this.useTunnel = false;
this.serverListUpdateInProgress = new AtomicBoolean(false);
// 更新实例任务
class NamelessClass_1 implements UpdateAction {
NamelessClass_1() {
}
public void doUpdate() {
DynamicServerListLoadBalancer.this.updateListOfServers();
}
}
this.updateAction = new NamelessClass_1();
this.initWithNiwsConfig(clientConfig);
}
public void initWithNiwsConfig(IClientConfig clientConfig) {
try {
super.initWithNiwsConfig(clientConfig);
String niwsServerListClassName = clientConfig.getPropertyAsString(CommonClientConfigKey.NIWSServerListClassName, "com.netflix.loadbalancer.ConfigurationBasedServerList");
ServerList<T> niwsServerListImpl = (ServerList)ClientFactory.instantiateInstanceWithClientConfig(niwsServerListClassName, clientConfig);
this.serverListImpl = niwsServerListImpl;
if (niwsServerListImpl instanceof AbstractServerList) {
AbstractServerListFilter<T> niwsFilter = ((AbstractServerList)niwsServerListImpl).getFilterImpl(clientConfig);
niwsFilter.setLoadBalancerStats(this.getLoadBalancerStats());
this.filter = niwsFilter;
}
String serverListUpdaterClassName = clientConfig.getPropertyAsString(CommonClientConfigKey.ServerListUpdaterClassName, "com.netflix.loadbalancer.PollingServerListUpdater");
this.serverListUpdater = (ServerListUpdater)ClientFactory.instantiateInstanceWithClientConfig(serverListUpdaterClassName, clientConfig);
this.restOfInit(clientConfig);
} catch (Exception var5) {
throw new RuntimeException("Exception while initializing NIWSDiscoveryLoadBalancer:" + clientConfig.getClientName() + ", niwsClientConfig:" + clientConfig, var5);
}
}
void restOfInit(IClientConfig clientConfig) {
boolean primeConnection = this.isEnablePrimingConnections();
this.setEnablePrimingConnections(false);
// 开启定时任务周期性更新实例
this.enableAndInitLearnNewServersFeature();
// 初始化实例
this.updateListOfServers();
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections().primeConnections(this.getReachableServers());
}
this.setEnablePrimingConnections(primeConnection);
LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
该构造方法中主要做了更新实例任务的定义、初始化实例集合、开启定时任务更新实例。
更新实例任务定义,初始化实例集合:
DynamicServerListLoadBalancer
volatile ServerList<T> serverListImpl;
class NamelessClass_1 implements UpdateAction {
NamelessClass_1() {
}
public void doUpdate() {
DynamicServerListLoadBalancer.this.updateListOfServers();
}
}
@VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList();
if (this.serverListImpl != null) {
// 调用ServerList接口获取所有实例,根据不同的注册中心有不同实现
servers = this.serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", this.getIdentifier(), servers);
if (this.filter != null) {
servers = this.filter.getFilteredListOfServers((List)servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", this.getIdentifier(), servers);
}
}
this.updateAllServerList((List)servers);
}
protected void updateAllServerList(List<T> ls) {
// CAS防止并发写入
if (this.serverListUpdateInProgress.compareAndSet(false, true)) {
try {
Iterator var2 = ls.iterator();
while(var2.hasNext()) {
T s = (Server)var2.next();
s.setAlive(true);
}
// 更新allServerList属性
this.setServersList(ls);
super.forceQuickPing();
} finally {
this.serverListUpdateInProgress.set(false);
}
}
}
ServerList
接口根据不同注册中心会有不同实现,比如集成Nacos
,实现类为NacosServerList
,作用是调用远程服务获取实例。
定时更新allServerList属性:
既然集成服务发现,那就肯定有服务的上下线监听,这里采用定时拉取的方法
DynamicServerListLoadBalancer
protected volatile ServerListUpdater serverListUpdater;
public void enableAndInitLearnNewServersFeature() {
LOGGER.info("Using serverListUpdater {}", this.serverListUpdater.getClass().getSimpleName());
// 开启定时任务更新实例
this.serverListUpdater.start(this.updateAction);
}
public synchronized void start(final UpdateAction updateAction) {
if (this.isActive.compareAndSet(false, true)) {
Runnable wrapperRunnable = new Runnable() {
public void run() {
if (!PollingServerListUpdater.this.isActive.get()) {
if (PollingServerListUpdater.this.scheduledFuture != null) {
PollingServerListUpdater.this.scheduledFuture.cancel(true);
}
} else {
try {
// 调用updateListOfServers()
updateAction.doUpdate();
PollingServerListUpdater.this.lastUpdated = System.currentTimeMillis();
} catch (Exception var2) {
PollingServerListUpdater.logger.warn("Failed one update cycle", var2);
}
}
}
};
// 开启定时任务线程池
this.scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(wrapperRunnable, this.initialDelayMs, this.refreshIntervalMs, TimeUnit.MILLISECONDS);
} else {
logger.info("Already active, no-op");
}
}
好了,到站下车,未来可期…
欢迎同频共振的那一部分人
作者公众号:Tarzan写bug
原文始发于微信公众号(Tarzan写bug):SpringCloud Gateway——服务化路由
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/45896.html