1、接收续约请求
和前面提到服务注册的流程类似,Eureka服务端续约请求接收是InstanceResource类中提供的renewLease()方法实现。该方法主要实现接收客户端或者集群其他节点的服务续约请求,其中包括了以下参数:
- isReplication 表示是否是从其他节点同步数据的操作
- overriddenStatus 重写状态
- status 实例的状态
- lastDirtyTimestamp 实例上次更新时间
@PUT
public Response renewLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("overriddenstatus") String overriddenStatus,
@QueryParam("status") String status,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
//转成boolean类型,表示是否是数据同步操作
boolean isFromReplicaNode = "true".equals(isReplication);
//调用registry的renew()方法,进行服务续约(核心代码)
boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
// 续约失败时,直接返回响应对象(状态404)
if (!isSuccess) {
logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
return Response.status(Status.NOT_FOUND).build();
}
// 比较当前请求的lastDirtyTimestamp 和实例对象的lastDirtyTimestamp
Response response;
//lastDirtyTimestamp 不为null且syncWhenTimestampDiffers=true时
if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
//验证两个时间大小,并返回对应的响应内容
response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
// 满足四个条件:1、请求时间大于实例的更新时间;2、重写状态不为null;3、重写状态不为UNKNOWN;4、同步操作
if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
&& (overriddenStatus != null)
&& !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
&& isFromReplicaNode) {
//满足要求后,设置instanceInfo实例的重写状态,该操作主要发生在续约请求的数据同步的请求中
registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
}
} else {
response = Response.ok().build();
}
logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
return response;
}
根据上述方法,我们梳理了接收并处理续约请求的步骤如下:
- 首先调用registry.renew()方法,进行服务续约,并返回是否续约成功的结果isSuccess
- 如果续约失败时,直接返回响应对象(状态为NOT_FOUND)
- 续约成功后,继续判断是否满足(lastDirtyTimestamp!=null && syncWhenTimestampDiffers==true)条件,不满足条件时,则直接构建注册情况响应结果(即lastDirtyTimestamp为空,或不处理不一致情况时)
- 当前面条件满足时,即需要处理请求lastDirtyTimestamp时间戳和当前实例对象中lastDirtyTimestamp时间不一致的情况,首先调用validateDirtyTimestamp()方法,三种返回结果:1、NOT_FOUND,表示请求中的lastDirtyTimestamp大于实例中的lastDirtyTimestamp时间戳;2、CONFLICT,表示实例中的lastDirtyTimestamp大于请求中的lastDirtyTimestamp;3、其他情况,直接方法OK状态。
- 当validateDirtyTimestamp()方法返回NOT_FOUND状态时,且符合其他条件时,就会调用registry.storeOverriddenStatusIfRequired()方法,把请求中的重写状态赋值给当前实例。
- 最后,返回响应结果。
1.2、validateDirtyTimestamp()方法
当满足请求的lastDirtyTimestamp参数 不为null且syncWhenTimestampDiffers=true时,会进入调用该方法的逻辑,该方法主要根据请求时间lastDirtyTimestamp 和实例的lastDirtyTimestamp 时间戳进行比较,并返回对应的响应结果,如果lastDirtyTimestamp > appInfo.getLastDirtyTimestamp(),则返回NOT_FOUND状态,而lastDirtyTimestamp < appInfo.getLastDirtyTimestamp(),则返回CONFLICT状态,其他情况返回OK。
private Response validateDirtyTimestamp(Long lastDirtyTimestamp, boolean isReplication) {
InstanceInfo appInfo = registry.getInstanceByAppAndId(app.getName(), id, false);
if (appInfo != null) {
//请求时间不为空,且与实例中的更新时间不相等时
if ((lastDirtyTimestamp != null) && (!lastDirtyTimestamp.equals(appInfo.getLastDirtyTimestamp()))) {
//构建日志输出需要的参数
Object[] args = {id, appInfo.getLastDirtyTimestamp(), lastDirtyTimestamp, isReplication};
//请求时间lastDirtyTimestamp 大时,直接返回NOT_FOUND
if (lastDirtyTimestamp > appInfo.getLastDirtyTimestamp()) {
logger.debug(
"Time to sync, since the last dirty timestamp differs -"
+ " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",
args);
return Response.status(Status.NOT_FOUND).build();
} else if (appInfo.getLastDirtyTimestamp() > lastDirtyTimestamp) {//否则返回CONFLICT
if (isReplication) {
logger.debug(
"Time to sync, since the last dirty timestamp differs -"
+ " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",
args);
return Response.status(Status.CONFLICT).entity(appInfo).build();
} else {//因为外层判断依据处理了相等情况,这里应该进不来才对??!!
return Response.ok().build();
}
}
}
}
return Response.ok().build();
}
1.3、storeOverriddenStatusIfRequired()方法
该方法实际上是在AbstractInstanceRegistry抽象类中定义和实现的,主要用来用来更新实例的重写状态。当满足请求的lastDirtyTimestamp 大于实例的lastDirtyTimestamp时间戳,且重写状态不为空且不为UNKNOWN,且isFromReplicaNode=true(表示是节点间数据同步时)时,执行该方法。该方法主要实现了把当前重写状态存储到overriddenInstanceStatusMap变量中,同时修改实例的重写状态。
@Override
public void storeOverriddenStatusIfRequired(String appName, String id, InstanceStatus overriddenStatus) {
//获取实例的重写状态,overriddenInstanceStatusMap集合存储了所有实例的重写状态
InstanceStatus instanceStatus = overriddenInstanceStatusMap.get(id);
//当实例重写状态不存在或者与要设置的不匹配时
if ((instanceStatus == null) || (!overriddenStatus.equals(instanceStatus))) {
logger.info("Adding overridden status for instance id {} and the value is {}",
id, overriddenStatus.name());
//维护overriddenInstanceStatusMap集合
overriddenInstanceStatusMap.put(id, overriddenStatus);
//获取当前对应的实例对象
InstanceInfo instanceInfo = this.getInstanceByAppAndId(appName, id, false);
//为当前实例设置新的重写对象
instanceInfo.setOverriddenStatus(overriddenStatus);
logger.info("Set the overridden status for instance (appname:{}, id:{}} and the value is {} ",
appName, id, overriddenStatus.name());
}
}
2、InstanceRegistry类的renew()方法
在上述renewLease()方法中,最核心的逻辑其实是调用了registry.renew()方法,该方法用来进行服务续约。这里的registry实例,其实就是InstanceRegistry类(SpringCloud提供)的实例对象,即调用了InstanceRegistry类的renew()方法。具体实现如下:
@Override
public boolean renew(final String appName, final String serverId,
boolean isReplication) {
log("renew " + appName + " serverId " + serverId + ", isReplication {}"
+ isReplication);
//获取Application集合
List<Application> applications = getSortedApplications();
//遍历Application集合,查找符合要求的Application对象 (根据AppName判断)
for (Application input : applications) {
if (input.getName().equals(appName)) {
InstanceInfo instance = null;
//获取Application对象包含的InstanceInfo对象集合,然后通过遍历查找到符合要求的InstanceInfo 对象
for (InstanceInfo info : input.getInstances()) {
if (info.getId().equals(serverId)) {
instance = info;
break;
}
}
//为符合要求的InstanceInfo 对象进行事件广播
publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId,
instance, isReplication));
break;
}
}
//调用父类PeerAwareInstanceRegistryImpl的续约方法
return super.renew(appName, serverId, isReplication);
}
在InstanceRegistry类的renew()方法中,通过遍历所有的实例对象,然后找到符合要求的对象,发送一个EurekaInstanceRenewedEvent广播事件,用于后续的扩展。
3、PeerAwareInstanceRegistryImpl类的renew()方法
在上述InstanceRegistry类的renew()方法中,最后又调用了super.renew()方法,即调用了PeerAwareInstanceRegistryImpl类的renew()方法。
在该方法中,首先是调用父类AbstractInstanceRegistry的续约方法renew(),如果续约成功,则进行续约事件的数据同步,即通知集群其他节点该续约事件,这里通过调用replicateToPeers()方法实现。
public boolean renew(final String appName, final String id, final boolean isReplication) {
if (super.renew(appName, id, isReplication)) {
replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
return true;
}
return false;
}
在renew()方法中,通过调用replicateToPeers()方法,而在replicateToPeers()方法又调用了replicateInstanceActionsToPeers()方法,这里都是用来做集群节点间数据同步的方法,前面在《Eureka服务端——服务注册》博文中已经学习了。我们这里直接看replicateInstanceActionsToPeers()方法中关于续约(Heartbeat类型)的相关代码如下:
//获取实例对应的重写状态
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
//获取实例对象
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
//通过PeerEurekaNode的heartbeat()方法实现心跳数据同步
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
3.1、PeerEurekaNode类的heartbeat()方法
在方法中,首先创建了InstanceReplicationTask任务,其中调用replicationClient.sendHeartBeat()方法,用来发送数据同步的请求,最后调用batchingDispatcher.process()执行任务。
在创建的InstanceReplicationTask实例对象中,有一个handleFailure()方法用来处理异常结果,即出现statusCode == 404时,调用register(info)进行实例注册。
public void heartbeat(final String appName, final String id,
final InstanceInfo info, final InstanceStatus overriddenStatus,
boolean primeConnection) throws Throwable {
if (primeConnection) {
// We do not care about the result for priming request.
replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
return;
}
ReplicationTask replicationTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) {
@Override
public EurekaHttpResponse<InstanceInfo> execute() throws Throwable {
return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
}
@Override
public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
super.handleFailure(statusCode, responseEntity);
if (statusCode == 404) {
logger.warn("{}: missing entry.", getTaskName());
if (info != null) {
logger.warn("{}: cannot find instance id {} and hence replicating the instance with status {}",
getTaskName(), info.getId(), info.getStatus());
register(info);
}
} else if (config.shouldSyncWhenTimestampDiffers()) {
InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity;
if (peerInstanceInfo != null) {
syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);
}
}
}
};
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
batchingDispatcher.process(taskId("heartbeat", info), replicationTask, expiryTime);
}
其中,关于batchingDispatcher.process()方法,主要用于处理任务InstanceReplicationTask对象,并执行其中的replicationClient.sendHeartBeat()方法,详细请参考《Eureka 源码解析 —— 任务批处理》。
3.2、JerseyReplicationClient类的register()方法
JerseyReplicationClient类的sendHeartBeat()方法,是基于Jersey构建的服务心跳请求。
@Override
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
String urlPath = "apps/" + appName + '/' + id;
ClientResponse response = null;
try {
WebResource webResource = jerseyClient.getClient().resource(serviceUrl)
.path(urlPath)
.queryParam("status", info.getStatus().toString())
.queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
if (overriddenStatus != null) {
webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
}
Builder requestBuilder = webResource.getRequestBuilder();
addExtraHeaders(requestBuilder);
response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).put(ClientResponse.class);
InstanceInfo infoFromPeer = null;
if (response.getStatus() == Status.CONFLICT.getStatusCode() && response.hasEntity()) {
infoFromPeer = response.getEntity(InstanceInfo.class);
}
return anEurekaHttpResponse(response.getStatus(), infoFromPeer).type(MediaType.APPLICATION_JSON_TYPE).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("[heartbeat] Jersey HTTP PUT {}; statusCode={}", urlPath, response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
4、AbstractInstanceRegistry类的renew()方法
在PeerAwareInstanceRegistryImpl类的renew()方法中调用的super.renew()方法,其实就是调用的AbstractInstanceRegistry类的renew()方法,是最后真正进行服务续约的地方。
public boolean renew(String appName, String id, boolean isReplication) {
//EurekaMonitors计数
RENEW.increment(isReplication);
//获取appName对应应用的InstanceInfo集合
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {//获取需要进行续约的Lease<InstanceInfo>对象
leaseToRenew = gMap.get(id);
}
//如果续约对象为空,则直接返回
if (leaseToRenew == null) {
RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
return false;
} else {
//获取需要续约的InstanceInfo 对象
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
// 根据指定规则,获取续约对象的重写状态
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
//如果重写状态为UNKNOWN,则直接返回
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
+ "; re-register required", instanceInfo.getId());
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
//如果当前instanceInfo实例的状态和overriddenInstanceStatus不匹配的话,设置重写状态
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
logger.info(
"The instance status {} is different from overridden instance status {} for instance {}. "
+ "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
instanceInfo.getOverriddenStatus().name(),
instanceInfo.getId());
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
}
}
//流控
renewsLastMin.increment();
//更新时间
leaseToRenew.renew();
return true;
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/68765.html