系列文章目录
二.SpringCloud源码剖析-Eureka Client 初始化过程
五.SpringCloud源码剖析-Eureka Client服务续约
六.SpringCloud源码剖析-Eureka Client取消注册
七.SpringCloud源码剖析-Eureka Server的自动配置
八.SpringCloud源码剖析-Eureka Server初始化流程
九.SpringCloud源码剖析-Eureka Server服务注册流程
十.SpringCloud源码剖析-Eureka Server服务续约
十一.SpringCloud源码剖析-Eureka Server服务注册表拉取
十二.SpringCloud源码剖析-Eureka Server服务剔除
十三.SpringCloud源码剖析-Eureka Server服务下线
前言
本片文章的目的是分析Eureka Server的注册流程,您可以结合《Eureka Client服务注册》更容易理解
Eureka Server服务注册流程
在《Eureka Server初始化流程》 文章中我们知道,在EurekaServerAutoConfiguration中注册了JerseyFilter用来处理所有的/eureka开头的请求,当Eureka Client客户端发起注册请求,请求被该Filter接待
/**
* Register the Jersey filter
* 注册Jersey filter
*/
@Bean
public FilterRegistrationBean jerseyFilterRegistration(
javax.ws.rs.core.Application eurekaJerseyApp) {
FilterRegistrationBean bean = new FilterRegistrationBean();
//ServletContainer 是核心处理类
bean.setFilter(new ServletContainer(eurekaJerseyApp));
bean.setOrder(Ordered.LOWEST_PRECEDENCE);
//处理的请求/eureka/**
bean.setUrlPatterns(
Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
return bean;
}
具体的实现是在ServletContainer中完成
//处理请求
private void doFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain, String requestURI, String servletPath, String queryString) throws IOException, ServletException {
Pattern p = this.getStaticContentPattern();
//处理 ContextPath 上下文路径
if (p != null && p.matcher(servletPath).matches()) {
chain.doFilter(request, response);
} else {
if (this.filterContextPath != null) {
if (!servletPath.startsWith(this.filterContextPath)) {
throw new ContainerException("The servlet path, \"" + servletPath + "\", does not start with the filter context path, \"" + this.filterContextPath + "\"");
}
if (servletPath.length() == this.filterContextPath.length()) {
if (this.webComponent.getResourceConfig().getFeature("com.sun.jersey.config.feature.Redirect")) {
URI l = UriBuilder.fromUri(request.getRequestURL().toString()).path("/").replaceQuery(queryString).build(new Object[0]);
response.setStatus(307);
response.setHeader("Location", l.toASCIIString());
return;
}
requestURI = requestURI + "/";
}
}
UriBuilder absoluteUriBuilder = UriBuilder.fromUri(request.getRequestURL().toString());
URI baseUri = this.filterContextPath == null ? absoluteUriBuilder.replacePath(request.getContextPath()).path("/").build(new Object[0]) : absoluteUriBuilder.replacePath(request.getContextPath()).path(this.filterContextPath).path("/").build(new Object[0]);
URI requestUri = absoluteUriBuilder.replacePath(requestURI).replaceQuery(queryString).build(new Object[0]);
//这里调用service方法
int status = this.service(baseUri, requestUri, request, response);
if (this.forwardOn404 && status == 404 && !response.isCommitted()) {
response.setStatus(200);
chain.doFilter(request, response);
}
}
}
// 处理请求,调用WebComponent的service方法
public int service(URI baseUri, URI requestUri, HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
return this.webComponent.service(baseUri, requestUri, request, response);
}
最终会调用ApplicationResource实现服务注册 , Eureka Server对于Eureka client注册服务实例,获取服务实例的的REST请求的都交给ApplicationResource处理,其中用来服务注册的方法是addInstance,我们来看一下他的源码
/**
服务实例注册,InstanceInfo是服务注册信息,isReplicationd为true代表是从其他Eureka Server节点复制实例,如果是isReplication为false,代表是Eureka Client 注册的
* Registers information about a particular instance for an
* {@link com.netflix.discovery.shared.Application}.
*
* @param info
* {@link InstanceInfo} information of the instance.
* @param isReplication
* a header parameter containing information whether this is
* replicated from other nodes.
*/
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
//验证instanceinfo包含所有必需的必填字段
// validate that the instanceinfo contains all the necessary required fields
if (isBlank(info.getId())) {
return Response.status(400).entity("Missing instanceId").build();
} else if (isBlank(info.getHostName())) {
return Response.status(400).entity("Missing hostname").build();
} else if (isBlank(info.getIPAddr())) {
return Response.status(400).entity("Missing ip address").build();
} else if (isBlank(info.getAppName())) {
return Response.status(400).entity("Missing appName").build();
} else if (!appName.equals(info.getAppName())) {
return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
} else if (info.getDataCenterInfo() == null) {
return Response.status(400).entity("Missing dataCenterInfo").build();
} else if (info.getDataCenterInfo().getName() == null) {
return Response.status(400).entity("Missing dataCenterInfo Name").build();
}
//处理客户端可能在数据缺失的情况下向错误的DataCenterInfo注册的情况
// handle cases where clients may be registering with bad DataCenterInfo with missing data
DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
if (dataCenterInfo instanceof UniqueIdentifier) {
String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
if (isBlank(dataCenterInfoId)) {
boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
if (experimental) {
String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
return Response.status(400).entity(entity).build();
} else if (dataCenterInfo instanceof AmazonInfo) {
AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
if (effectiveId == null) {
amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
}
} else {
logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
}
}
}
//【重要】:这里在调用PeerAwareInstanceRegistry的register注册服务,使用的是实现类:InstanceRegistry
registry.register(info, "true".equals(isReplication));
return Response.status(204).build(); // 204 to be backwards compatible
}
ApplicationResource.addInstance看方法名就能推测出他是用来注册实例的方法,其中参数InstanceInfo 是客户端提交的注册信息,请求头中isReplicationd为true代表是从其他Eureka Server节点复制实例,如果是isReplication为false,代表是Eureka Client 注册的
在做了一些列参数判断之后,这里在调用PeerAwareInstanceRegistry的register注册服务,使用的是实现类:InstanceRegistry,这个类在之前有介绍过,就是Eureak Server用来实现服务注册,服务发现,服务续约,取消注册等的具体实现,他的继承关系如下:
跟踪下去,InstanceRegistry .register方法源码如下
public class InstanceRegistry extends PeerAwareInstanceRegistryImpl implements ApplicationContextAware {
public void register(final InstanceInfo info, final boolean isReplication) {
//调用handleRegistration方法,抛出事件:EurekaInstanceRegisteredEvent
this.handleRegistration(info, this.resolveInstanceLeaseDuration(info), isReplication);
//调用父类PeerAwareInstanceRegistryImpl的register方法
super.register(info, isReplication);
}
//服务注册,抛出EurekaInstanceRegisteredEvent事件
private void handleRegistration(InstanceInfo info, int leaseDuration, boolean isReplication) {
this.log("register " + info.getAppName() + ", vip " + info.getVIPAddress() + ", leaseDuration " + leaseDuration + ", isReplication " + isReplication);
this.publishEvent(new EurekaInstanceRegisteredEvent(this, info, leaseDuration, isReplication));
}
...省略...
}
这里在调用handleRegistration方法,抛出事件:EurekaInstanceRegisteredEvent后继续调用了super.register方法,即:PeerAwareInstanceRegistryImpl.register,继续跟踪下去:
@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
/**
注册服务信息 InstanceInfo,并将此信息InstanceInfo复制到所有对等的eureka server节点。
如果这是来自其他副本节点的复制事件,则不会复制它。
* Registers the information about the {@link InstanceInfo} and replicates
* this information to all peer eureka nodes. If this is replication event
* from other replica nodes then it is not replicated.
*
* @param info
* the {@link InstanceInfo} to be registered and replicated.
* @param isReplication
* true if this is a replication event from other replica nodes,
* false otherwise.
*/
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
//租期失效时间 90 s
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
//如果服务的租期失效时间大于默认的90s,则重新赋值租期时间
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
//服务注册
super.register(info, leaseDuration, isReplication);
//把注册的服务信息复制到其他的Eureka Server 节点,注意这里是Action.Register
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
}
该方法做了3个事情,
- 1是更新租期失效时间,
- 2是调用super.register服务注册(AbstractInstanceRegistry.register)
- 3是调用replicateToPeers把服务实例拷贝到其他的Eureak Server节点
我们先看下super.register方法即:AbstractInstanceRegistry.register方法的源码
/**
* Registers a new instance with a given duration.
*
* @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
*/
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
//获取锁:ReentrantReadWriteLock.lock()
read.lock();
//根据注册的服务的名字取本地服务注册表中获取服务注册信息,如果该服务已经被注册了,那么registry中将会存在它
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
if (gMap == null) {
//如果该服务实例没被注册,就把服务实例注册到本地的registry中,本质是一个Map
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
//没有的话就添加到registry中
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
//根据服务实例id获取服务的租约对象
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
//如果已经有租约,则保留最后的脏时间戳而不覆盖它
if (existingLease != null && (existingLease.getHolder() != null)) {
//registry中已经存在的当前服务的最后修改时间的时间戳
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
//提交注册的当前服务的最后修改时间
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
//因为如果时间戳相等,我们仍然采用远程传输的InstanceInfo而不是服务器本地副本
// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
// InstanceInfo instead of the server local copy.
//如果已存在的该服务的修改时间 大于 当前提交注册的该服务的最后修改时间,
//则采用registy中已存在的服务为准,因为要选择修改时间靠后的
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
//使用现有的instanceInfo代替新的instanceInfo作为注册者
registrant = existingLease.getHolder();
}
} else {
//执行到这里,说明该服务是新注册
// The lease does not exist and hence it is a new registration
synchronized (lock) {
//这里在计算服务的续约频率值
if (this.expectedNumberOfRenewsPerMin > 0) {
// Since the client wants to cancel it, reduce the threshold
// (1
// for 30 seconds, 2 for a minute)
//(expectedNumberOfRenewsPerMin)期待的每分钟续订次数,默认是30s/个,给他增加到2,每分钟2个请求
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
//修改numberOfRenewsPerMinThreshold每分钟续约阀值 = 2 *(85%),
//RenewalPercentThreshold是获取续订阈值百分比
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
logger.debug("No previous lease information found; it is new registration");
}
//创建租约对象,把注册实例和租期放进去
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
//设置服务上线时间
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
//以注册的实例的ID为key把服务实例存封装到 Map
gMap.put(registrant.getId(), lease);
//添加到注册队列
synchronized (recentRegisteredQueue) {
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
}
// This is where the initial state transfer of overridden status happens
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
//添加服务的OverriddenStatus
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
//根据覆盖的状态规则设置状态
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
//如果租约已注册为UP状态,请设置租约服务启动时间戳记
if (InstanceStatus.UP.equals(registrant.getStatus())) {
//更新服务上线时间戳
lease.serviceUp();
}
//服务动作:ADDED添加,MODIFIED修改,DELETED删除
registrant.setActionType(ActionType.ADDED);
//添加到最近更改的队列
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
//最后更新时间
registrant.setLastUpdatedTimestamp();
//使当前应用的ResponseCache失效
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
总结一下,这个方法做了什么呢
- 判断当前服务是否已经被注册,如果是,则以最后更新时间为准,选择更新时间靠后的服务实例进行注册
- 维护实例的租约信息Lease,并放到Eureka Server本地维护维护的registry注册表中,本质是一个Map(ConcurrentHashMap<String, Map<String, Lease>>)
- 如果是服务是新注册的,把注册的实例封装成Leaset存储到registry注册表中,并更新每分钟续约阀值numberOfRenewsPerMinThreshold
- 维护了两个队列,recentRegisteredQueue最近注册队列,recentlyChangedQueue最近更改队列,这个队列可以用来获取最近操作的信息。
- 维护当前实例的OverriddenStatus
- 更新服务实例的最后更新时间戳
- 使ResponseCache缓存失效
该方法结束,我们回到PeerAwareInstanceRegistryImpl.register方法中,继续跟踪replicateToPeers 方法
@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
/**
* Replicates all eureka actions to peer eureka nodes except for replication
* traffic to this node.
* 将所有eureka操作复制到对等eureka节点
*/
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
//开始计时
Stopwatch tracer = action.getTimer().start();
try {
//是否是其他节点复制过来的
if (isReplication) {
//最后一分钟的复制次数+1
numberOfReplicationsLastMin.increment();
}
// If it is a replication already, do not replicate again as this will create a poison replication
//如果已经是复制,则不要再次复制
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
//遍历集群所有节点
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
//如果该URL代表此主机,请不要复制到您自己,当前节点不复制
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
//复制实例到其他某个Eureka
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}
继续跟踪replicateInstanceActionsToPeers的源码
/**
* Replicates all instance changes to peer eureka nodes except for
* replication traffic to this node.
*
*/
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry = null;
CurrentRequestVersion.set(Version.V2);
//判断请求的是什么操作
switch (action) {
//取消注册,调用 PeerEurekaNode.cancel
case Cancel:
node.cancel(appName, id);
break;
//心跳请求,调用PeerEurekaNode.heartbeat
case Heartbeat:
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
//服务注册调用PeerEurekaNode.register
case Register:
node.register(info);
break;
//状态修改调用PeerEurekaNode.statusUpdate
case StatusUpdate:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
//状态删除调用PeerEurekaNode.deleteStatusOverride
case DeleteStatusOverride:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
}
}
这里在根据请求的动作类型选择PeerEurekaNode的不同方法,我们这里是服务注册.register,调用的是 PeerEurekaNode.register ,源码如下:
/**
* Sends the registration information of {@link InstanceInfo} receiving by
* this node to the peer node represented by this class.
*
* @param info
* the instance information {@link InstanceInfo} of any instance
* that is send to this instance.
* @throws Exception
*/
public void register(final InstanceInfo info) throws Exception {
//到期时间当,前时间加上30s过期
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
//封装InstanceReplicationTask 实例赋值任务到调度器中
batchingDispatcher.process(
taskId("register", info),
new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
public EurekaHttpResponse<Void> execute() {
//复制器客户端 HttpReplicationClient(JerseyReplicationClient),执行注册
//调用AbstractJerseyEurekaHttpClient#register
return replicationClient.register(info);
}
},
expiryTime
);
}
把InstanceInfo注册实例封装成InstanceReplicationTask实例复制任务,交给batchingDispatcher批量任务调度器去执行,replicationClient是HttpReplicationClient它的默认实现是JerseyEurekaHttpClient,底层会调用AbstractJerseyEurekaHttpClient#register的方法完成实例的注册,这里其实就和当时我们分析Eureka Client 服务注册的最后注册请求一样了
public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient {
public EurekaHttpResponse<Void> register(InstanceInfo info) {
//请求地址
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
EurekaHttpResponse var5;
try {
Builder resourceBuilder = this.jerseyClient.resource(this.serviceUrl).path(urlPath).getRequestBuilder();
this.addExtraHeaders(resourceBuilder);
//把InstanceInfo作为参数,发送post请求提交服务注册
response = (ClientResponse)((Builder)((Builder)((Builder)resourceBuilder.header("Accept-Encoding", "gzip")).type(MediaType.APPLICATION_JSON_TYPE)).accept(new String[]{"application/json"})).post(ClientResponse.class, info);
var5 = EurekaHttpResponse.anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", new Object[]{this.serviceUrl, urlPath, info.getId(), response == null ? "N/A" : response.getStatus()});
}
if (response != null) {
response.close();
}
}
return var5;
}
到这里就结束了,在PeerEurekaNode中封装InstanceReplicationTask实例服务任务,通过EurekaHttpClient去发起请求(JerseyEurekaHttpClient),最终通过JerseyEurekaHttpClient父类AbstractJerseyEurekaHttpClient#register方法注册,方法中把InstanceInfo作为参数使用POST提交请求。
总结
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/149326.html