1、状态更新
服务端InstanceInfo对象状态更新的流程和续约流程类似,不过该过程不再经过SpringCloud提供的InstanceRegistry类了,即SpringCloud没有再对状态更新过程进行扩展,即没有增加状态变更事件的广播。基本流程如下:
- InstanceResource类的statusUpdate()方法,是状态更新的入口,用来处理状态更新请求。
- 然后,调用PeerAwareInstanceRegistryImpl类的statusUpdate()方法,在该过程主要实现状态更新事件同步到集群中的其他节点
- 最后,调用AbstractInstanceRegistry类的statusUpdate()方法,该方法就是更新实例状态的真正方法。
1.1、InstanceResource类的statusUpdate()方法
InstanceResource类的statusUpdate()方法,是状态更新的入口,主要用来处理状态更新请求,包括客户端请求和集群节点间数据同步请求。
@Path("status")
@PUT
public Response statusUpdate(
@QueryParam("value") String newStatus,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
try {
//没有对应的InstanceInfo对象信息,则直接返回NOT_FOUND状态的响应
if (registry.getInstanceByAppAndId(app.getName(), id) == null) {
logger.warn("Instance not found: {}/{}", app.getName(), id);
return Response.status(Status.NOT_FOUND).build();
}
//核心逻辑,调用PeerAwareInstanceRegistryImpl对象的状态更新方法
boolean isSuccess = registry.statusUpdate(app.getName(), id,
InstanceStatus.valueOf(newStatus), lastDirtyTimestamp,
"true".equals(isReplication));
//处理返回结果
if (isSuccess) {
logger.info("Status updated: {} - {} - {}", app.getName(), id, newStatus);
return Response.ok().build();
} else {
logger.warn("Unable to update status: {} - {} - {}", app.getName(), id, newStatus);
return Response.serverError().build();
}
} catch (Throwable e) {
logger.error("Error updating instance {} for status {}", id,
newStatus);
return Response.serverError().build();
}
}
1.2、PeerAwareInstanceRegistryImpl类的statusUpdate()方法
PeerAwareInstanceRegistryImpl类的statusUpdate()方法,和续约、取消租约等操作的逻辑类似,先是调用父类AbstractInstanceRegistry的statusUpdate()方法,如果返回true时,再调用replicateToPeers()方法进行集群节点之间的数据同步。
@Override
public boolean statusUpdate(final String appName, final String id, final InstanceStatus newStatus, String lastDirtyTimestamp, final boolean isReplication) {
if (super.statusUpdate(appName, id, newStatus, lastDirtyTimestamp, isReplication)) {
replicateToPeers(Action.StatusUpdate, appName, id, null, newStatus, isReplication);
return true;
}
return false;
}
在statusUpdate()方法中,通过调用replicateToPeers()方法,而在replicateToPeers()方法又调用了replicateInstanceActionsToPeers()方法,这里都是用来做集群节点间数据同步的方法,前面在《Eureka服务端——服务注册》博文中已经学习了。我们这里直接看replicateInstanceActionsToPeers()方法中关于状态更新的相关代码如下:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
1.2.1、PeerEurekaNode的statusUpdate()方法
进行状态更新的同步操作是通过PeerEurekaNode的statusUpdate()方法实现,在该方法中,首先构建了同步任务InstanceReplicationTask,然后再通过batchingDispatcher.process()方法执行该任务,具体实现如下:
public void statusUpdate(final String appName, final String id, final InstanceStatus newStatus, final InstanceInfo info) {
long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
batchingDispatcher.process(
taskId("statusUpdate", appName, id),
new InstanceReplicationTask(targetHost, Action.StatusUpdate, info, null, false) {
@Override
public EurekaHttpResponse<Void> execute() {
return replicationClient.statusUpdate(appName, id, newStatus, info);
}
},
expiryTime
);
}
其中,关于batchingDispatcher.process()方法,主要用于处理任务InstanceReplicationTask对象,并执行其中的replicationClient.cancel()方法,详细请参考《Eureka 源码解析 —— 任务批处理》。
1.2.2、AbstractJerseyEurekaHttpClient类的statusUpdate()方法
AbstractJerseyEurekaHttpClient类的cancel()方法,是基于Jersey构建的状态更新请求。
@Override
public EurekaHttpResponse<Void> statusUpdate(String appName, String id, InstanceStatus newStatus, InstanceInfo info) {
String urlPath = "apps/" + appName + '/' + id + "/status";
ClientResponse response = null;
try {
Builder requestBuilder = jerseyClient.resource(serviceUrl)
.path(urlPath)
.queryParam("value", newStatus.name())
.queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString())
.getRequestBuilder();
addExtraHeaders(requestBuilder);
response = requestBuilder.put(ClientResponse.class);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
1.3、AbstractInstanceRegistry类的statusUpdate()方法
AbstractInstanceRegistry类的statusUpdate()方法是真正实现InstanceInfo对象状态更新的方法,具体实现如下:
@Override
public boolean statusUpdate(String appName, String id, InstanceStatus newStatus, String lastDirtyTimestamp, boolean isReplication) {
try {
read.lock();
//EurekaMonitors计数
STATUS_UPDATE.increment(isReplication);
//获取appName对应应用的InstanceInfo集合
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> lease = null;
if (gMap != null) {//获取需要更新状态的Lease<InstanceInfo>对象
lease = gMap.get(id);
}
if (lease == null) {//为null时,直接返回false
return false;
} else {
//更新状态的时候,同时会更新lastUpdateTimestamp
lease.renew();
//获取租约持有的InstanceInfo 对象
InstanceInfo info = lease.getHolder();
//当InstanceInfo 为null事,直接打印错误日志
if (info == null) {
logger.error("Found Lease without a holder for instance id {}", id);
}
//当InstanceInfo 对象不为null,且InstanceInfo 对象状态发送了变化
if ((info != null) && !(info.getStatus().equals(newStatus))) {
// 如果变为UP状态,则更新serviceUpTimestamp时间
if (InstanceStatus.UP.equals(newStatus)) {
lease.serviceUp();
}
// 在overriddenInstanceStatusMap中记录当前实例的状态
overriddenInstanceStatusMap.put(id, newStatus);
//更新实例的重写状态
info.setOverriddenStatus(newStatus);
long replicaDirtyTimestamp = 0;
//更新实例的状态
info.setStatusWithoutDirty(newStatus);
if (lastDirtyTimestamp != null) {
replicaDirtyTimestamp = Long.valueOf(lastDirtyTimestamp);
}
// 更新时间
if (replicaDirtyTimestamp > info.getLastDirtyTimestamp()) {
info.setLastDirtyTimestamp(replicaDirtyTimestamp);
}
//设置操作类型
info.setActionType(ActionType.MODIFIED);
//在recentlyChangedQueue队列中记录变化记录,主要用于统计和debug
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
//更新lastUpdatedTimestamp时间
info.setLastUpdatedTimestamp();
//使得缓存失效
invalidateCache(appName, info.getVIPAddress(), info.getSecureVipAddress());
}
return true;
}
} finally {
read.unlock();
}
}
2、删除状态更新
服务端删除状态覆盖的流程和状态更新流程类似,所以这里只是简单的分析了其特有的几个关键方法。
2.1、工作流程
基本流程如下:
- InstanceResource类的deleteStatusUpdate()方法,是删除状态覆盖的入口,用来处理删除状态覆盖的请求。
- 然后,调用PeerAwareInstanceRegistryImpl类的deleteStatusOverride()方法,在该过程主要实现删除状态覆盖事件同步到集群中的其他节点
- 最后,调用AbstractInstanceRegistry类的deleteStatusOverride()方法,该方法就是进行删除状态覆盖的真正方法。
2.2、集群节点间信息同步
和状态更新等操作类似,最终还是通过PeerEurekaNode的deleteStatusOverride()方法实现删除状态覆盖的数据同步,在该方法中,首先构建了同步任务InstanceReplicationTask,然后再通过batchingDispatcher.process()方法执行该任务,具体实现如下:
public void deleteStatusOverride(final String appName, final String id, final InstanceInfo info) {
long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
batchingDispatcher.process(
taskId("deleteStatusOverride", appName, id),
new InstanceReplicationTask(targetHost, Action.DeleteStatusOverride, info, null, false) {
@Override
public EurekaHttpResponse<Void> execute() {
return replicationClient.deleteStatusOverride(appName, id, info);
}
},
expiryTime);
}
2.3、AbstractInstanceRegistry类的deleteStatusOverride()方法
AbstractInstanceRegistry类的deleteStatusOverride()方法是真正进行删除状态覆盖的操作。
@Override
public boolean deleteStatusOverride(String appName, String id,InstanceStatus newStatus, String lastDirtyTimestamp, boolean isReplication) {
try {
read.lock();
//EurekaMonitors计数
STATUS_OVERRIDE_DELETE.increment(isReplication);
//获取appName对应应用的InstanceInfo集合
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> lease = null;
if (gMap != null) {//获取对应的Lease<InstanceInfo>对象
lease = gMap.get(id);
}
if (lease == null) {
return false;
} else {
//更新租约的lastUpdateTimestamp时间
lease.renew();
//获取租约对应的InstanceInfo 实例
InstanceInfo info = lease.getHolder();
if (info == null) {
logger.error("Found Lease without a holder for instance id {}", id);
}
//从overriddenInstanceStatusMap中删除对应的数据
InstanceStatus currentOverride = overriddenInstanceStatusMap.remove(id);
if (currentOverride != null && info != null) {
//如果删除了对应的覆盖状态,就把当前info的覆盖状态设置成UNKNOWN
info.setOverriddenStatus(InstanceStatus.UNKNOWN);
//更新状态
info.setStatusWithoutDirty(newStatus);
long replicaDirtyTimestamp = 0;
if (lastDirtyTimestamp != null) {
replicaDirtyTimestamp = Long.valueOf(lastDirtyTimestamp);
}
//更新info的时间戳
if (replicaDirtyTimestamp > info.getLastDirtyTimestamp()) {
info.setLastDirtyTimestamp(replicaDirtyTimestamp);
}
//记录变更数据
info.setActionType(ActionType.MODIFIED);
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
//更新更新时间
info.setLastUpdatedTimestamp();
//使得缓存失效
invalidateCache(appName, info.getVIPAddress(), info.getSecureVipAddress());
}
return true;
}
} finally {
read.unlock();
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/68763.html