系列文章目录
二.SpringCloud源码剖析-Eureka Client 初始化过程
五.SpringCloud源码剖析-Eureka Client服务续约
六.SpringCloud源码剖析-Eureka Client取消注册
七.SpringCloud源码剖析-Eureka Server的自动配置
八.SpringCloud源码剖析-Eureka Server初始化流程
九.SpringCloud源码剖析-Eureka Server服务注册流程
十.SpringCloud源码剖析-Eureka Server服务续约
十一.SpringCloud源码剖析-Eureka Server服务注册表拉取
十二.SpringCloud源码剖析-Eureka Server服务剔除
十三.SpringCloud源码剖析-Eureka Server服务下线
前言
这一章我们来分析一下Eureka Server 服务注册表的拉取流程,请结合《Eureka Client服务发现》
在《Eureka Client服务发现》我们分析了,客户端会通过两种方式从服务端拉取注册表,在客户端系统启动的时候会进行全量拉取,随后默认30s/次会进行差异更新,那么在Eureka Server 服务端是如何处理服务注册表全量拉取和差异更新的呢?
全量拉取
Eureka Client向Eureka Server发请求,拉取服务注册表,Server端还是通过ServeltContainer接待请求,最终交给com.netflix.eureka.resources.ApplicationsResource#getContainers处理
/**
返回所有的应用
* Get information about all {@link com.netflix.discovery.shared.Applications}.
*/
@GET
public Response getContainers(@PathParam("version") String version,
@HeaderParam(HEADER_ACCEPT) String acceptHeader,
@HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
@Context UriInfo uriInfo,
@Nullable @QueryParam("regions") String regionsStr) {
boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
String[] regions = null;
if (!isRemoteRegionRequested) {
//注册表全量拉取统计计数增加
EurekaMonitors.GET_ALL.increment();
} else {
regions = regionsStr.toLowerCase().split(",");
Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
}
// Check if the server allows the access to the registry. The server can
// restrict access if it is not
// ready to serve traffic depending on various reasons.
//检查服务端时候准备好可以被访问
if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
return Response.status(Status.FORBIDDEN).build();
}
CurrentRequestVersion.set(Version.toEnum(version));
//处理返回的数据类型默认JSON
KeyType keyType = Key.KeyType.JSON;
String returnMediaType = MediaType.APPLICATION_JSON;
if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
//请求头么有指定格式,返回XML格式
keyType = Key.KeyType.XML;
returnMediaType = MediaType.APPLICATION_XML;
}
//创建缓存key
Key cacheKey = new Key(Key.EntityType.Application,
ResponseCacheImpl.ALL_APPS, //通过ALL_APPS构建key
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
Response response;
//这里判断是否是GZIP格式,返回结果的编码类型不一样,获取方式是一致的
if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
//如果格式是gzip,调用responseCache.getGZIP(cacheKey)获取
//底层会从一个ConcurrentMap<Key, Value> readOnlyCacheMap 只读缓存中去获取全量注册表
response = Response.ok(responseCache.getGZIP(cacheKey))
.header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
.header(HEADER_CONTENT_TYPE, returnMediaType)
.build();
} else {
//普通获取responseCache.get(cacheKey)
//底层会从一个ConcurrentMap<Key, Value> readOnlyCacheMap 只读缓存中去获取全量注册表
response = Response.ok(responseCache.get(cacheKey))
.build();
}
return response;
}
responseCache.getGZIP(cacheKey)最终会调用 com.netflix.eureka.registry.ResponseCacheImpl#getValue
/**
* Get the payload in both compressed and uncompressed form.
*/
@VisibleForTesting
Value getValue(final Key key, boolean useReadOnlyCache) {
Value payload = null;
try {
if (useReadOnlyCache) {
//从只读缓存中获取
final Value currentPayload = readOnlyCacheMap.get(key);
if (currentPayload != null) {
payload = currentPayload;
} else {
//如果只读缓存中获取不到,从读写缓存中获取
payload = readWriteCacheMap.get(key);
readOnlyCacheMap.put(key, payload);
}
} else {
payload = readWriteCacheMap.get(key);
}
} catch (Throwable t) {
logger.error("Cannot get value for key : {}", key, t);
}
return payload;
}
差异更新
差异更新也在ApplicationsResource中:com.netflix.eureka.resources.ApplicationsResource#getContainerDifferential,源码如下
/**
获取Applications服务注册表中有改变的服务,注册,取消,状态更改和过期都会造成服务的改变
* Get information about all delta changes in {@link com.netflix.discovery.shared.Applications}.
*
* <p>
* The delta changes represent the registry information change for a period
* as configured by
* {@link EurekaServerConfig#getRetentionTimeInMSInDeltaQueue()}. The
* changes that can happen in a registry include
* <em>Registrations,Cancels,Status Changes and Expirations</em>. Normally
* the changes to the registry are infrequent and hence getting just the
* delta will be much more efficient than getting the complete registry.
* </p>
*
* <p>
* Since the delta information is cached over a period of time, the requests
* may return the same data multiple times within the window configured by
* {@link EurekaServerConfig#getRetentionTimeInMSInDeltaQueue()}.The clients
* are expected to handle this duplicate information.
* <p>
*
* @param version the version of the request.
* @param acceptHeader the accept header to indicate whether to serve JSON or XML data.
* @param acceptEncoding the accept header to indicate whether to serve compressed or uncompressed data.
* @param eurekaAccept an eureka accept extension, see {@link com.netflix.appinfo.EurekaAccept}
* @param uriInfo the {@link java.net.URI} information of the request made.
* @return response containing the delta information of the
* {@link AbstractInstanceRegistry}.
*/
@Path("delta")
@GET
public Response getContainerDifferential(
@PathParam("version") String version,
@HeaderParam(HEADER_ACCEPT) String acceptHeader,
@HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
@Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {
boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
// If the delta flag is disabled in discovery or if the lease expiration
// has been disabled, redirect clients to get all instances
//如果禁用了Delta注册表差异化拉取,或者服务不可访问,返回拒绝
if ((serverConfig.shouldDisableDelta()) || (!registry.shouldAllowAccess(isRemoteRegionRequested))) {
return Response.status(Status.FORBIDDEN).build();
}
String[] regions = null;
if (!isRemoteRegionRequested) {
EurekaMonitors.GET_ALL_DELTA.increment();
} else {
regions = regionsStr.toLowerCase().split(",");
Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
EurekaMonitors.GET_ALL_DELTA_WITH_REMOTE_REGIONS.increment();
}
CurrentRequestVersion.set(Version.toEnum(version));
//处理反会的数据格式JSON默认
KeyType keyType = Key.KeyType.JSON;
String returnMediaType = MediaType.APPLICATION_JSON;
if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
keyType = Key.KeyType.XML;
returnMediaType = MediaType.APPLICATION_XML;
}
//构建缓存key
Key cacheKey = new Key(Key.EntityType.Application,
ResponseCacheImpl.ALL_APPS_DELTA, //通过ALL_APPS_DELTA构建key
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
if (acceptEncoding != null
&& acceptEncoding.contains(HEADER_GZIP_VALUE)) {
//从responseCache获取内容
return Response.ok(responseCache.getGZIP(cacheKey))
.header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
.header(HEADER_CONTENT_TYPE, returnMediaType)
.build();
} else {
return Response.ok(responseCache.get(cacheKey))
.build();
}
}
总结
Eureka Server 拉取服务注册表的逻辑还是比较简单的,不管是全量拉取,还是差别拉取都是通过ApplicationsResource中处理,然后构建出不同的key,从ResponseCache中去获取服务。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/149324.html