系列文章目录
二.SpringCloud源码剖析-Eureka Client 初始化过程
五.SpringCloud源码剖析-Eureka Client服务续约
六.SpringCloud源码剖析-Eureka Client取消注册
七.SpringCloud源码剖析-Eureka Server的自动配置
八.SpringCloud源码剖析-Eureka Server初始化流程
九.SpringCloud源码剖析-Eureka Server服务注册流程
十.SpringCloud源码剖析-Eureka Server服务续约
十一.SpringCloud源码剖析-Eureka Server服务注册表拉取
十二.SpringCloud源码剖析-Eureka Server服务剔除
十三.SpringCloud源码剖析-Eureka Server服务下线
前言
上一章我们分析了一下EureakServer的自动配置,这章节我们来详细分析一下Eureak Server中的核心组件以及初始化流程
一.Eureka Server 核心组件介绍
1.EurekaServerContext
Eureka服务端上下文对象,包含了初始化,关闭,获取服务配置,获取集群节点,获取服务注册器,获取服务信息管理器等方法,默认实现类是DefaultEurekaServerContext
public interface EurekaServerContext {
//初始化
void initialize() throws Exception;
//关闭
void shutdown() throws Exception;
//获取服务配置
EurekaServerConfig getServerConfig();
//获取集群节点管理管理类
PeerEurekaNodes getPeerEurekaNodes();
//服务器编解码器
ServerCodecs getServerCodecs();
//服务注册器
PeerAwareInstanceRegistry getRegistry();
//instanceInfo实例信息管理器
ApplicationInfoManager getApplicationInfoManager();
}
DefaultEurekaServerContext实现类代码
/**
* Represent the local server context and exposes getters to components of the
* local server such as the registry.
*
* @author David Liu
*/
@Singleton
public class DefaultEurekaServerContext implements EurekaServerContext {
private static final Logger logger = LoggerFactory.getLogger(DefaultEurekaServerContext.class);
private final EurekaServerConfig serverConfig;
private final ServerCodecs serverCodecs;
private final PeerAwareInstanceRegistry registry;
private final PeerEurekaNodes peerEurekaNodes;
private final ApplicationInfoManager applicationInfoManager;
@Inject
public DefaultEurekaServerContext(EurekaServerConfig serverConfig,
ServerCodecs serverCodecs,
PeerAwareInstanceRegistry registry,
PeerEurekaNodes peerEurekaNodes,
ApplicationInfoManager applicationInfoManager) {
this.serverConfig = serverConfig;
this.serverCodecs = serverCodecs;
this.registry = registry;
this.peerEurekaNodes = peerEurekaNodes;
this.applicationInfoManager = applicationInfoManager;
}
// @PostConstruct :EurekaServerContext初始化的时候initialize方法被执行,调用 peerEurekaNodes.start();开启EurekaServer的初始化,
//然后再调用 peerAwareInstanceRegistry的.init(peerEurekaNodes);方法初始化
@PostConstruct
@Override
public void initialize() {
logger.info("Initializing ...");
//PeerEurekaNodes开始初始化
peerEurekaNodes.start();
try {
//peerAwareInstanceRegistry开始初始化
registry.init(peerEurekaNodes);
} catch (Exception e) {
throw new RuntimeException(e);
}
logger.info("Initialized");
}
//EurekaServerContext销毁之前(@PreDestroy)调用shutdown,
//peerAwareInstanceRegistry 注册器的shutdown执行关闭流程
@PreDestroy
@Override
public void shutdown() {
logger.info("Shutting down ...");
//服务注册器关闭
registry.shutdown();
//peerEurekaNodes集群节点关闭
peerEurekaNodes.shutdown();
logger.info("Shut down");
}
...省略...
}
DefaultEurekaServerContext
的initialize初始化方法中做的事情就是在初始化的时候,调用peerEurekaNodes.start();
初始化集群节点, 调用PeerAwareInstanceRegistry.init
初始化注册器,在shutdown
销毁方法中调用PeerAwareInstanceRegistry.shudown
执行注册器的关闭流程,调用peerEurekaNodes.shutdown
执行集群节点的关闭
2.PeerEurekaNodes
PeerEurekaNodes
用来管理Eureka集群节点PeerEurekaNode
生命周期的工具被DefaultEurekaServerContext
的initialize初始化方法中执行,源码如下
/**
* Helper class to manage lifecycle of a collection of {@link PeerEurekaNode}s.
*
* @author Tomasz Bak
*/
@Singleton
public class PeerEurekaNodes {
private static final Logger logger = LoggerFactory.getLogger(PeerEurekaNodes.class);
//服务注册接口
protected final PeerAwareInstanceRegistry registry;
//服务端配置对象
protected final EurekaServerConfig serverConfig;
//客户端配置
protected final EurekaClientConfig clientConfig;
protected final ServerCodecs serverCodecs;
//InstanceInfo实例管理器
private final ApplicationInfoManager applicationInfoManager;
//Eureka集群节点集合
private volatile List<PeerEurekaNode> peerEurekaNodes = Collections.emptyList();
//Eureka集群节点的url集合
private volatile Set<String> peerEurekaNodeUrls = Collections.emptySet();
//定时任务执行器
private ScheduledExecutorService taskExecutor;
//初始化节点工具
@Inject
public PeerEurekaNodes(
PeerAwareInstanceRegistry registry,
EurekaServerConfig serverConfig,
EurekaClientConfig clientConfig,
ServerCodecs serverCodecs,
ApplicationInfoManager applicationInfoManager) {
this.registry = registry;
this.serverConfig = serverConfig;
this.clientConfig = clientConfig;
this.serverCodecs = serverCodecs;
this.applicationInfoManager = applicationInfoManager;
}
//获取集群节点集合,不可修改
public List<PeerEurekaNode> getPeerNodesView() {
return Collections.unmodifiableList(peerEurekaNodes);
}
//获取集群节点集合
public List<PeerEurekaNode> getPeerEurekaNodes() {
return peerEurekaNodes;
}
//此实例提供对等复制实例的最小数量,被认为是健康的
public int getMinNumberOfAvailablePeers() {
return serverConfig.getHealthStatusMinNumberOfAvailablePeers();
}
//开始
public void start() {
//创建 一个名字为Eureka-PeerNodesUpdater"单线程的定时执行器
taskExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
thread.setDaemon(true);
return thread;
}
}
);
try {
//更新集群中的节点中的注册信息
updatePeerEurekaNodes(resolvePeerUrls());
//创建runnable线程,业务逻辑为:updatePeerEurekaNodes(resolvePeerUrls());
Runnable peersUpdateTask = new Runnable() {
@Override
public void run() {
try {
updatePeerEurekaNodes(resolvePeerUrls());
} catch (Throwable e) {
logger.error("Cannot update the replica Nodes", e);
}
}
};
//
taskExecutor.scheduleWithFixedDelay(
peersUpdateTask,
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
//定时器时间间隔默认:10分钟peerEurekaNodesUpdateIntervalMs=10 * MINUTES
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
} catch (Exception e) {
throw new IllegalStateException(e);
}
for (PeerEurekaNode node : peerEurekaNodes) {
logger.info("Replica node URL: {}", node.getServiceUrl());
}
}
//关闭,关闭节点更新的定时任务,清空peerEurekaNodes ,peerEurekaNodeUrls ,调用每个节点的shutDown方法
public void shutdown() {
taskExecutor.shutdown();
List<PeerEurekaNode> toRemove = this.peerEurekaNodes;
this.peerEurekaNodes = Collections.emptyList();
this.peerEurekaNodeUrls = Collections.emptySet();
for (PeerEurekaNode node : toRemove) {
node.shutDown();
}
}
/**
基于相同的Zone得到Eureka集群中多个节点的url,过滤掉当前节点
* Resolve peer URLs.
*
* @return peer URLs with node's own URL filtered out
*/
protected List<String> resolvePeerUrls() {
InstanceInfo myInfo = applicationInfoManager.getInfo();
String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
//配置的eureka地址url
List<String> replicaUrls = EndpointUtils
.getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
int idx = 0;
while (idx < replicaUrls.size()) {
//移除当前eureka节点的url
if (isThisMyUrl(replicaUrls.get(idx))) {
replicaUrls.remove(idx);
} else {
idx++;
}
}
return replicaUrls;
}
/**
更新集群节点列表的方法,在定时器中被执行,newPeerUrls是集群中的eureka server节点的url,过滤了本地节点的url
做法是删除老的不可用的节点调用shutDown方法,使用createPeerEurekaNode创建新的节点添加新的节点
* Given new set of replica URLs, destroy {@link PeerEurekaNode}s no longer available, and
* create new ones.
*
* @param newPeerUrls peer node URLs; this collection should have local node's URL filtered out
*/
//修改集群节点
protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
if (newPeerUrls.isEmpty()) {
logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
return;
}
//需要关闭的节点
Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
//移除掉新的节点,新的节点不需要关闭
toShutdown.removeAll(newPeerUrls);
//新的节点需要添加
Set<String> toAdd = new HashSet<>(newPeerUrls);
//新的节点中移除老的节点
toAdd.removeAll(peerEurekaNodeUrls);
if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
return;
}
// Remove peers no long available 移除不可用的节点
//节点集合,本地缓存的所有节点
List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
//如果需要关闭的节点集合不为空
if (!toShutdown.isEmpty()) {
logger.info("Removing no longer available peer nodes {}", toShutdown);
int i = 0;
while (i < newNodeList.size()) {
PeerEurekaNode eurekaNode = newNodeList.get(i);
//如果当前节点需要关闭,包含在toShutdown中
if (toShutdown.contains(eurekaNode.getServiceUrl())) {
//从newNodeList中移除掉
newNodeList.remove(i);
//执行节点的关闭方法
eurekaNode.shutDown();
} else {
i++;
}
}
}
// Add new peers 如果需要添加新的节点
if (!toAdd.isEmpty()) {
logger.info("Adding new peer nodes {}", toAdd);
for (String peerUrl : toAdd) {
//调用 createPeerEurekaNode 创建新的节点,添加到节点集合中
newNodeList.add(createPeerEurekaNode(peerUrl));
}
}
this.peerEurekaNodes = newNodeList;
this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}
//创建集群节点PeerEurekaNode
protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
String targetHost = hostFromUrl(peerEurekaNodeUrl);
if (targetHost == null) {
targetHost = "host";
}
return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
}
...省略...
PeerEurekaNodes
主要定义了eureka
集群节点更新逻辑,通过定时任务定时更新,默认10分钟更新一次,更新逻辑是删除旧的节点,添加新的节点,旧的节点调用shutdown
做关闭操作,新的节点调用createPeerEurekaNode
进行创建,集群节点最终存储在List<PeerEurekaNode>
结构中
3.PeerAwareInstanceRegistry
PeerAwareInstanceRegistry
翻译为“对等感知实例注册表” ,其实就是服务注册器,只是这个注册器会考虑集群中的其它节点的数据同步,
@Bean
public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
ServerCodecs serverCodecs) {
this.eurekaClient.getApplications(); // force initialization
return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
serverCodecs, this.eurekaClient,
this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
}
看一下继承关系:
- LookupService:服务查找接口,提供了服务的方法,在之前有介绍过,在客户端EurekaClient继承该接口,在服务端InstanceRegistry继承了该接口
public interface LookupService<T> {
//获取应用
Application getApplication(String var1);
//获取应用注册表
Applications getApplications();
//获取实例信息列表
List<InstanceInfo> getInstancesById(String var1);
//获取下一台服务实例信息
InstanceInfo getNextServerFromEureka(String var1, boolean var2);
}
- LeaseManager:租约管理器接口,提供了register注册,cancel取消,renew续约,evict过期等服务相关的操作
public interface LeaseManager<T> {
//注册
void register(T r, int leaseDuration, boolean isReplication);
//取消,下线
boolean cancel(String appName, String id, boolean isReplication);
//续约
boolean renew(String appName, String id, boolean isReplication);
//过期
void evict();
}
- InstanceRegistry:应用实例注册表接口继承了 LookupService 、LeaseManager ,提供应用实例的注册与发现服务,
public interface InstanceRegistry extends LeaseManager<InstanceInfo>, LookupService<String> {
//允许开始传输数据
void openForTraffic(ApplicationInfoManager applicationInfoManager, int count);
//关闭
void shutdown();
@Deprecated
void storeOverriddenStatusIfRequired(String id, InstanceStatus overriddenStatus);
//存储实例 覆盖状态:使用的是InstanceInfo.InstanceStatus overriddenStatus 覆盖状态,
//使用该状态来修改注册中心服务的注册状态· ·
void storeOverriddenStatusIfRequired(String appName, String id, InstanceStatus overriddenStatus);
//更新服务注册状态
boolean statusUpdate(String appName, String id, InstanceStatus newStatus,
String lastDirtyTimestamp, boolean isReplication);
//删除覆盖的状态
boolean deleteStatusOverride(String appName, String id, InstanceStatus newStatus,
String lastDirtyTimestamp, boolean isReplication);
//服务状态快照
Map<String, InstanceStatus> overriddenInstanceStatusesSnapshot();
//获取本地服务注册表,从本地ConcurrentHashMap缓存的服务注册表中获取
Applications getApplicationsFromLocalRegionOnly();
//获取服务注册表
List<Application> getSortedApplications();
//根据名字获取服务
Application getApplication(String appName, boolean includeRemoteRegion);
//根据名字和id获取实例信息
InstanceInfo getInstanceByAppAndId(String appName, String id);
//根据名字和id获取实例信息
InstanceInfo getInstanceByAppAndId(String appName, String id, boolean includeRemoteRegions);
//完全清除注册表
//overriddenInstanceStatusMap.clear(); 覆盖状态清除
//recentCanceledQueue.clear(); 最近取消队列
//recentRegisteredQueue.clear(); 最近注册队列
//recentlyChangedQueue.clear(); 最近更改队列
//registry.clear(); 清除注册表
void clearRegistry();
//初始化的响应缓存
void initializedResponseCache();
//获取响应缓存
ResponseCache getResponseCache();
//最后一分钟续约次数,用作自我保护计算值
long getNumOfRenewsInLastMin();
//获取每分钟续约次数,用作自我保护计算值
int getNumOfRenewsPerMinThreshold();
//检查续订次数是否小于阈值。
int isBelowRenewThresold();
//最近注册的实例
List<Pair<Long, String>> getLastNRegisteredInstances();
//最近取消的实例
List<Pair<Long, String>> getLastNCanceledInstances();
//最近过期的实例
boolean isLeaseExpirationEnabled();
//是否开启自我保护
boolean isSelfPreservationModeEnabled();
}
-
AbstractInstanceRegistry:InstanceRegistry的实现类,应用对象注册表抽象,处理客户端的注册请求,包括 register注册,Renewals续约,Cancels下线,Expirations过期,Status Changes状态改变,服务注册表以增量的方式增加
-
PeerAwareInstanceRegistry: InstanceRegistry的子接口,应用对象注册表接口,实现了 Eureka-Server 集群内注册信息同步功能
public interface PeerAwareInstanceRegistry extends InstanceRegistry {
//初始化PeerEurekaNodes 集群节点
void init(PeerEurekaNodes peerEurekaNodes) throws Exception;
//注册表信息同步, 如果节点之间通信失败,列表中耗尽该操作故障转移到其他节点
int syncUp();
//检查是否有访问权限
boolean shouldAllowAccess(boolean remoteRegionRequired);
//注册InstanceInfo到其他Eureka节点
void register(InstanceInfo info, boolean isReplication);
//修改状态
void statusUpdate(final String asgName, final ASGResource.ASGStatus newStatus, final boolean isReplication);
}
- PeerAwareInstanceRegistryImpl:PeerAwareInstanceRegistry的子类,,应用对象注册的具体实现,同时继承了AbstractInstanceRegistry
- InstanceRegistry :PeerAwareInstanceRegistryImpl的子类,
有些实现类没拉开看,后面会详细分析
4.PeerAwareInstanceRegistryImpl
服务注册器,继承AbstractInstanceRegistry抽象类, 实现 PeerAwareInstanceRegistry服务注册接口,包含了服务注册,续约,下线,过期,状态改变等等功能。
/**
集群之间节点同步的服务注册器, 所有操作都在其父类 AbstractInstanceRegistry 中,
* Handles replication of all operations to {@link AbstractInstanceRegistry} to peer
* <em>Eureka</em> nodes to keep them all in sync.
*
* <p>
* 主要操作是副本的注册,续约,取消,到期和状态更改
* Primary operations that are replicated are the
* <em>Registers,Renewals,Cancels,Expirations and Status Changes</em>
* </p>
*
* <p>
* 当eureka服务器启动时,它将尝试从对等的eureka节点获取所有注册表信息,如果由于某种原因该操作失败,
* 则服务器将不允许用户在指定的时间段内获取注册表信息。
* When the eureka server starts up it tries to fetch all the registry
* information from the peer eureka nodes.If for some reason this operation
* fails, the server does not allow the user to get the registry information for
* a period specified in
* {@link com.netflix.eureka.EurekaServerConfig#getWaitTimeInMsWhenSyncEmpty()}.
* </p>
*
* <p>
* *关于续约的重要注意事项。如果续约失败次数超过EurekaServerConfig.getRenewalPercentThreshold()中指定的指定阈值,则在EurekaServerConfig#getRenewalThresholdUpdateIntervalMs()时间内,eureka将其视为危险,并停止实例过期
*
* One important thing to note about <em>renewals</em>.If the renewal drops more
* than the specified threshold as specified in
* {@link com.netflix.eureka.EurekaServerConfig#getRenewalPercentThreshold()} within a period of
* {@link com.netflix.eureka.EurekaServerConfig#getRenewalThresholdUpdateIntervalMs()}, eureka
* perceives this as a danger and stops expiring instances.
* </p>
*
* @author Karthik Ranganathan, Greg Kim
*
*/
@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
private static final Logger logger = LoggerFactory.getLogger(PeerAwareInstanceRegistryImpl.class);
private static final String US_EAST_1 = "us-east-1";
private static final int PRIME_PEER_NODES_RETRY_MS = 30000;
private long startupTime = 0;
private boolean peerInstancesTransferEmptyOnStartup = true;
//把功能抽成枚举,心跳检查,注册,取消注册,状态改变,删除覆盖状态
public enum Action {
Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;
private com.netflix.servo.monitor.Timer timer = Monitors.newTimer(this.name());
public com.netflix.servo.monitor.Timer getTimer() {
return this.timer;
}
}
private static final Comparator<Application> APP_COMPARATOR = new Comparator<Application>() {
public int compare(Application l, Application r) {
return l.getName().compareTo(r.getName());
}
};
private final MeasuredRate numberOfReplicationsLastMin;
//客户端
protected final EurekaClient eurekaClient;
//集群节点管理
protected volatile PeerEurekaNodes peerEurekaNodes;
private final InstanceStatusOverrideRule instanceStatusOverrideRule;
private Timer timer = new Timer(
"ReplicaAwareInstanceRegistry - RenewalThresholdUpdater", true);
@Inject
public PeerAwareInstanceRegistryImpl(
EurekaServerConfig serverConfig,
EurekaClientConfig clientConfig,
ServerCodecs serverCodecs,
EurekaClient eurekaClient
) {
super(serverConfig, clientConfig, serverCodecs);
this.eurekaClient = eurekaClient;
//最后一分钟的复制次数
this.numberOfReplicationsLastMin = new MeasuredRate(1000 * 60 * 1);
// We first check if the instance is STARTING or DOWN, then we check explicit overrides,
// then we check the status of a potentially existing lease.
this.instanceStatusOverrideRule = new FirstMatchWinsCompositeRule(new DownOrStartingRule(),
new OverrideExistsRule(overriddenInstanceStatusMap), new LeaseExistsRule());
}
@Override
protected InstanceStatusOverrideRule getInstanceInfoOverrideRule() {
return this.instanceStatusOverrideRule;
}
//初始化方法
@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
//最后一分钟的复制次数定时器Timer开始
this.numberOfReplicationsLastMin.start();
this.peerEurekaNodes = peerEurekaNodes;
//初始化 ResponseCache ,负责缓存客户端查询的注册表信息 30s/1次
initializedResponseCache();
//续约阈值定时更新任务,15min/1次 调用 updateRenewalThreshold()方法 更新
scheduleRenewalThresholdUpdateTask();
//初始化远程注册表,默认么有远程Region
initRemoteRegionRegistry();
try {
//注册到对象监视器
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
}
}
/**
执行所有清理和关闭操作。
* Perform all cleanup and shutdown operations.
*/
@Override
public void shutdown() {
try {
//注销对象监视
DefaultMonitorRegistry.getInstance().unregister(Monitors.newObjectMonitor(this));
} catch (Throwable t) {
logger.error("Cannot shutdown monitor registry", t);
}
try {
//集群节点关闭
peerEurekaNodes.shutdown();
} catch (Throwable t) {
logger.error("Cannot shutdown ReplicaAwareInstanceRegistry", t);
}
//最后一分钟的复制次数定时器 Timer停止
numberOfReplicationsLastMin.stop();
//执行所有清理和关闭操作。
//deltaRetentionTimer.cancel(); 增量保留计时器
//evictionTimer.cancel(); 服务剔除计时器
//renewsLastMin.stop(); 最后一分钟的复制次数机器停止
super.shutdown();
}
//续约阈值定时更新任务,15min/1次 调用 updateRenewalThreshold()方法 更新
private void scheduleRenewalThresholdUpdateTask() {
timer.schedule(new TimerTask() {
@Override
public void run() {
updateRenewalThreshold();
}
}, serverConfig.getRenewalThresholdUpdateIntervalMs(),
serverConfig.getRenewalThresholdUpdateIntervalMs());
}
/**集群数据同步,从集群中eureka节点复制注册表信息。如果通信失败,此操作将故障转移到其他节点,直到列表用尽。
* Populates the registry information from a peer eureka node. This
* operation fails over to other nodes until the list is exhausted if the
* communication fails.
*/
@Override
public int syncUp() {
// Copy entire entry from neighboring DS node
int count = 0;
//getRegistrySyncRetries重试次数默认5次
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
try {
//通信中断,等待下一次切换实例
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
//获取注册表
Applications apps = eurekaClient.getApplications();
//循环服务列表,依次注册
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
try {
if (isRegisterable(instance)) {
//获取InstanceInfo之后注册到当前节点,保存到 ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry 中缓存起来
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}
//运行开始传输数据
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
// Renewals happen every 30 seconds and for a minute it should be a factor of 2.
//每分钟的预期续订次数 2次,30s/一次续约
this.expectedNumberOfRenewsPerMin = count * 2;
//每分钟续约次数阈值 = expectedNumberOfRenewsPerMin每分钟续约次数 * 85%
//如果客户端续约低于这个阈值,将会开启服务端的自我保护功能
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
logger.info("Got {} instances from neighboring DS node", count);
logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
this.startupTime = System.currentTimeMillis();
if (count > 0) {
this.peerInstancesTransferEmptyOnStartup = false;
}
DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
boolean isAws = Name.Amazon == selfName;
if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
logger.info("Priming AWS connections for all replicas..");
primeAwsReplicas(applicationInfoManager);
}
//改变服务的状态为UP
logger.info("Changing status to UP");
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
//这里使用定时任务开启新的 服务剔除任务
super.postInit();
}
//取消注册,服务下线
@Override
public boolean cancel(final String appName, final String id,
final boolean isReplication) {
//调用父类的下线方法
if (super.cancel(appName, id, isReplication)) {
replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
synchronized (lock) {
if (this.expectedNumberOfRenewsPerMin > 0) {
// Since the client wants to cancel it, reduce the threshold (1 for 30 seconds, 2 for a minute)
//客户下线,降低续约阈值
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin - 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
return true;
}
return false;
}
//服务注册
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
//调用父类的注册
super.register(info, leaseDuration, isReplication);
//注册信息同步到集群中其他节点
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
//续约
public boolean renew(final String appName, final String id, final boolean isReplication) {
//调用父类的续约
if (super.renew(appName, id, isReplication)) {
//同步到集群中的其他节点
replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
return true;
}
return false;
}
//修改服务状态
@Override
public boolean statusUpdate(final String appName, final String id,
final InstanceStatus newStatus, String lastDirtyTimestamp,
final boolean isReplication) {
if (super.statusUpdate(appName, id, newStatus, lastDirtyTimestamp, isReplication)) {
//状态同步到其他节点
replicateToPeers(Action.StatusUpdate, appName, id, null, newStatus, isReplication);
return true;
}
return false;
}
//删除状态
@Override
public boolean deleteStatusOverride(String appName, String id,
InstanceStatus newStatus,
String lastDirtyTimestamp,
boolean isReplication) {
if (super.deleteStatusOverride(appName, id, newStatus, lastDirtyTimestamp, isReplication)) {
replicateToPeers(Action.DeleteStatusOverride, appName, id, null, null, isReplication);
return true;
}
return false;
}
//是否启用租约到期
@Override
public boolean isLeaseExpirationEnabled() {
if (!isSelfPreservationModeEnabled()) {
// The self preservation mode is disabled, hence allowing the instances to expire.
return true;
}
return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
}
//更新续约阈值
private void updateRenewalThreshold() {
try {
Applications apps = eurekaClient.getApplications();
//统计有多少个实例
int count = 0;
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
if (this.isRegisterable(instance)) {
++count;
}
}
}
synchronized (lock) {
//仅当阈值大于当前的预期阈值,或禁用了自我保留时才更新阈值。
// Update threshold only if the threshold is greater than the
// current expected threshold or if self preservation is disabled.
if ((count * 2) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfRenewsPerMin)
|| (!this.isSelfPreservationModeEnabled())) {
this.expectedNumberOfRenewsPerMin = count * 2;
this.numberOfRenewsPerMinThreshold = (int) ((count * 2) * serverConfig.getRenewalPercentThreshold());
}
}
logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
} catch (Throwable e) {
logger.error("Cannot update renewal threshold", e);
}
}
/**
集群之间的节点复制
* Replicates all eureka actions to peer eureka nodes except for replication
* traffic to this node.
*
*/
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
if (isReplication) {
numberOfReplicationsLastMin.increment();
}
// If it is a replication already, do not replicate again as this will create a poison replication
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}
/**
集群之间的节点复制
* Replicates all instance changes to peer eureka nodes except for
* replication traffic to this node.
*
*/
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry = null;
CurrentRequestVersion.set(Version.V2);
switch (action) {
case Cancel:
node.cancel(appName, id);
break;
case Heartbeat:
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
node.register(info);
break;
case StatusUpdate:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
}
}
....省略一些代码....
}
这个服务注册器实现类看起来很复杂它做了那些事情呢
init初始化
:注册表缓存ResponseCache初始化,续约阈值定时更新任务初始化,初始化远程注册表showdown
:执行所有清理和关闭操作syncUp
:集群之间的数据同步节点复制cancel
:服务下线,并同步到其他节点register
:服务注册,并同步到其他节点renew
: 续约,并同步到其他节点
5.EurekaServerInitializerConfiguration
EurekaServerAutoConfiguration
通过 @Import(EurekaServerInitializerConfiguration.class)
进行初始化,EurekaServerInitializerConfiguration
实现了SmartLifecycle
,其中的start
方法会再Spring启动过程中,执行LifecycleProcessor().onRefresh()
生命周期处理器刷新的时候被调用,然后再调用EurekaServerBootstrap.contextInitialized
进行初始化Eureka和启动Eureka
/**
* @author Dave Syer
*/
@Configuration
public class EurekaServerInitializerConfiguration
implements ServletContextAware, SmartLifecycle, Ordered {
private static final Log log = LogFactory.getLog(EurekaServerInitializerConfiguration.class);
//EurekaServer 配置
@Autowired
private EurekaServerConfig eurekaServerConfig;
//Servlet上下文
private ServletContext servletContext;
//应用上下文对象
@Autowired
private ApplicationContext applicationContext;
//启动引导
@Autowired
private EurekaServerBootstrap eurekaServerBootstrap;
private boolean running;
private int order = 1;
//初始化Servlet上下文
@Override
public void setServletContext(ServletContext servletContext) {
this.servletContext = servletContext;
}
//开始方法,复写于 SmartLifecycle 在Spring启动的时候,该方法会被地调用,
@Override
public void start() {
new Thread(new Runnable() {
@Override
public void run() {
try {
//TODO: is this class even needed now?
//初始化EurekaServer上下文,启动EurekaServer
eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
log.info("Started Eureka Server");
//发布一个EurekaRegistryAvailableEvent注册事件
publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
//改变running状态true
EurekaServerInitializerConfiguration.this.running = true;
//发布EurekaServer启动事件EurekaServerStartedEvent
publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
}
catch (Exception ex) {
// Help!
log.error("Could not initialize Eureka servlet context", ex);
}
}
}).start();
}
private EurekaServerConfig getEurekaServerConfig() {
return this.eurekaServerConfig;
}
private void publish(ApplicationEvent event) {
this.applicationContext.publishEvent(event);
}
//生命周期,停止,销毁eurekaServer
@Override
public void stop() {
this.running = false;
eurekaServerBootstrap.contextDestroyed(this.servletContext);
}
@Override
public boolean isRunning() {
return this.running;
}
@Override
public int getPhase() {
return 0;
}
@Override
public boolean isAutoStartup() {
return true;
}
@Override
public void stop(Runnable callback) {
callback.run();
}
@Override
public int getOrder() {
return this.order;
}
}
EurekaServerInitializerConfiguration
通过starter初始化和启动eureka,并抛出两个事件:EurekaRegistryAvailableEvent
服务注册事件,EurekaServerStartedEvent
服务启动事件,EurekaServer初始化核心的代码在eurekaServerBootstrap.contextInitialized
中
6.EurekaServerBootstrap
/**
* @author Spencer Gibb
*/
public class EurekaServerBootstrap {
private static final Log log = LogFactory.getLog(EurekaServerBootstrap.class);
private static final String TEST = "test";
private static final String ARCHAIUS_DEPLOYMENT_ENVIRONMENT = "archaius.deployment.environment";
private static final String EUREKA_ENVIRONMENT = "eureka.environment";
private static final String DEFAULT = "default";
private static final String ARCHAIUS_DEPLOYMENT_DATACENTER = "archaius.deployment.datacenter";
private static final String EUREKA_DATACENTER = "eureka.datacenter";
protected EurekaServerConfig eurekaServerConfig;
protected ApplicationInfoManager applicationInfoManager;
protected EurekaClientConfig eurekaClientConfig;
protected PeerAwareInstanceRegistry registry;
protected volatile EurekaServerContext serverContext;
protected volatile AwsBinder awsBinder;
public EurekaServerBootstrap(ApplicationInfoManager applicationInfoManager,
EurekaClientConfig eurekaClientConfig, EurekaServerConfig eurekaServerConfig,
PeerAwareInstanceRegistry registry, EurekaServerContext serverContext) {
this.applicationInfoManager = applicationInfoManager;
this.eurekaClientConfig = eurekaClientConfig;
this.eurekaServerConfig = eurekaServerConfig;
this.registry = registry;
this.serverContext = serverContext;
}
//Eureka初始化
public void contextInitialized(ServletContext context) {
try {
//初始化环境
initEurekaEnvironment();
//初始化上下文
initEurekaServerContext();
//设置上下文属性
context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
}
catch (Throwable e) {
log.error("Cannot bootstrap eureka server :", e);
throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}
//eureka上下文销毁
public void contextDestroyed(ServletContext context) {
try {
log.info("Shutting down Eureka Server..");
context.removeAttribute(EurekaServerContext.class.getName());
destroyEurekaServerContext();
destroyEurekaEnvironment();
}
catch (Throwable e) {
log.error("Error shutting down eureka", e);
}
log.info("Eureka Service is now shutdown...");
}
//初始化环境,设置一些环境参数
protected void initEurekaEnvironment() throws Exception {
log.info("Setting the eureka configuration..");
//设置数据中心
String dataCenter = ConfigurationManager.getConfigInstance()
.getString(EUREKA_DATACENTER);
if (dataCenter == null) {
log.info(
"Eureka data center value eureka.datacenter is not set, defaulting to default");
ConfigurationManager.getConfigInstance()
.setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, DEFAULT);
}
else {
ConfigurationManager.getConfigInstance()
.setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, dataCenter);
}
//设置Eureka环境
String environment = ConfigurationManager.getConfigInstance()
.getString(EUREKA_ENVIRONMENT);
if (environment == null) {
ConfigurationManager.getConfigInstance()
.setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, TEST);
log.info(
"Eureka environment value eureka.environment is not set, defaulting to test");
}
else {
ConfigurationManager.getConfigInstance()
.setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, environment);
}
}
//初始化eurekaServer上下文
protected void initEurekaServerContext() throws Exception {
// For backward compatibility
JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
XStream.PRIORITY_VERY_HIGH);
XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
XStream.PRIORITY_VERY_HIGH);
if (isAws(this.applicationInfoManager.getInfo())) {
this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
this.eurekaClientConfig, this.registry, this.applicationInfoManager);
this.awsBinder.start();
}
//把EurekaServerContext设置到EurekaServerContextHolder中
EurekaServerContextHolder.initialize(this.serverContext);
log.info("Initialized server context");
// Copy registry from neighboring eureka node
//从相邻的eureka节点复制注册表,使用的是PeerAwareInstanceRegistryImpl的实现
int registryCount = this.registry.syncUp();
this.registry.openForTraffic(this.applicationInfoManager, registryCount);
// Register all monitoring statistics.
//注册所有监视统计信息。
EurekaMonitors.registerAllStats();
}
/**
* Server context shutdown hook. Override for custom logic
*/
protected void destroyEurekaServerContext() throws Exception {
EurekaMonitors.shutdown();
if (this.awsBinder != null) {
this.awsBinder.shutdown();
}
if (this.serverContext != null) {
this.serverContext.shutdown();
}
}
/**
* Users can override to clean up the environment themselves.
*/
protected void destroyEurekaEnvironment() throws Exception {
}
protected boolean isAws(InstanceInfo selfInstanceInfo) {
boolean result = DataCenterInfo.Name.Amazon == selfInstanceInfo
.getDataCenterInfo().getName();
log.info("isAws returned " + result);
return result;
}
}
EurekaServerBootstrap 的contextInitialized方法中做了两个事情
- 通过
initEurekaEnvironment();
方法初始化环境,通过ConfigurationManager
设置环境相关的参数 - 通过
initEurekaServerContext();
初始化上下文,使用PeerAwareInstanceRegistryImpl.syncUp
从相邻的eureka节点复制注册表
7.JerseyFilter
在EurekaServerAutoConfiguration中注册了JerseyFilter用来处理所有的/eureka开头的请求
/**
* Register the Jersey filter
*/
@Bean
public FilterRegistrationBean jerseyFilterRegistration(
javax.ws.rs.core.Application eurekaJerseyApp) {
FilterRegistrationBean bean = new FilterRegistrationBean();
bean.setFilter(new ServletContainer(eurekaJerseyApp));
bean.setOrder(Ordered.LOWEST_PRECEDENCE);
bean.setUrlPatterns(
Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
return bean;
}
通过FilterRegistrationBean来注册filter,其核心逻辑是交给 ServletContainer 来完成的
public class ServletContainer extends HttpServlet implements Filter {
...省略...
}
二.EurekaServer初始化流程
这里我们整理一下EurekaServer启动时是以什么样的流程进行初始化的,下面是根据Eureka Server启动断点跟踪出来的流程
1.ServletContainer 初始化
首先ServletContainer 会被创建并进行初始化,调用configure方法进行配置,至于 doFilter方法会在接受到请求时被执行
public class ServletContainer extends HttpServlet implements Filter {
...省略...
public void init(FilterConfig filterConfig) throws ServletException {
this.filterConfig = filterConfig;
this.init((WebConfig)(new WebFilterConfig(filterConfig)));
}
protected void configure(WebConfig wc, ResourceConfig rc, WebApplication wa) {
if (this.getServletConfig() != null) {
this.configure(this.getServletConfig(), rc, wa);
} else if (this.filterConfig != null) {
this.configure(this.filterConfig, rc, wa);
}
if (rc instanceof ReloadListener) {
List<ContainerNotifier> notifiers = new ArrayList();
Object o = rc.getProperties().get("com.sun.jersey.spi.container.ContainerNotifier");
Iterator i$;
if (o instanceof ContainerNotifier) {
notifiers.add((ContainerNotifier)o);
} else if (o instanceof List) {
i$ = ((List)o).iterator();
while(i$.hasNext()) {
Object elem = i$.next();
if (elem instanceof ContainerNotifier) {
notifiers.add((ContainerNotifier)elem);
}
}
}
i$ = ServiceFinder.find(ContainerNotifier.class).iterator();
while(i$.hasNext()) {
ContainerNotifier cn = (ContainerNotifier)i$.next();
notifiers.add(cn);
}
rc.getProperties().put("com.sun.jersey.spi.container.ContainerNotifier", notifiers);
}
}
protected void configure(FilterConfig fc, ResourceConfig rc, WebApplication wa) {
rc.getSingletons().add(new ServletContainer.ContextInjectableProvider(FilterConfig.class, fc));
String regex = (String)rc.getProperty("com.sun.jersey.config.property.WebPageContentRegex");
if (regex != null && regex.length() > 0) {
try {
this.staticContentPattern = Pattern.compile(regex);
} catch (PatternSyntaxException var6) {
throw new ContainerException("The syntax is invalid for the regular expression, " + regex + ", associated with the initialization parameter " + "com.sun.jersey.config.property.WebPageContentRegex", var6);
}
}
this.forwardOn404 = rc.getFeature("com.sun.jersey.config.feature.FilterForwardOn404");
this.filterContextPath = this.filterConfig.getInitParameter("com.sun.jersey.config.feature.FilterContextPath");
if (this.filterContextPath != null) {
if (this.filterContextPath.isEmpty()) {
this.filterContextPath = null;
} else {
if (!this.filterContextPath.startsWith("/")) {
this.filterContextPath = '/' + this.filterContextPath;
}
if (this.filterContextPath.endsWith("/")) {
this.filterContextPath = this.filterContextPath.substring(0, this.filterContextPath.length() - 1);
}
}
}
}
...省略...
}
2.Eureka上下文初始化
紧接着EureakServerContext的initialize方法被调用,该方法有 @PostConstruct注解决定了它是初始化方法
@Singleton
public class DefaultEurekaServerContext implements EurekaServerContext {
@PostConstruct
@Override
public void initialize() {
logger.info("Initializing ...");
//开始集群节点更新
peerEurekaNodes.start();
try {
//服务注册器初始化
registry.init(peerEurekaNodes);
} catch (Exception e) {
throw new RuntimeException(e);
}
logger.info("Initialized");
}
这里做了两个事情
- 调用 peerEurekaNodes.start(); 定时更新Eureka集群中的节点
- 调用服务注册器PeerAwareInstanceRegistryImpl的初始化init
3.启动PeerEurekaNodes集群节点更新
PeerEurekaNodes.start被调用,这里通过定时器定时更新Eureka集群节点,默认10m/次
@Singleton
public class PeerEurekaNodes {
//开始
public void start() {
//创建 一个名字为Eureka-PeerNodesUpdater"单线程的定时执行器
taskExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
thread.setDaemon(true);
return thread;
}
}
);
try {
//更新集群中的节点中的注册信息
updatePeerEurekaNodes(resolvePeerUrls());
//创建runnable线程,业务逻辑为:updatePeerEurekaNodes(resolvePeerUrls());
Runnable peersUpdateTask = new Runnable() {
@Override
public void run() {
try {
updatePeerEurekaNodes(resolvePeerUrls());
} catch (Throwable e) {
logger.error("Cannot update the replica Nodes", e);
}
}
};
//定时任务
taskExecutor.scheduleWithFixedDelay(
peersUpdateTask,
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
//定时器时间间隔默认:10分钟peerEurekaNodesUpdateIntervalMs=10 * MINUTES
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
} catch (Exception e) {
throw new IllegalStateException(e);
}
for (PeerEurekaNode node : peerEurekaNodes) {
logger.info("Replica node URL: {}", node.getServiceUrl());
}
}
}
定时调用updatePeerEurekaNodes
方法更新集群,默认10分钟更新一次,更新逻辑是删除旧的节点,添加新的节点,旧的节点调用shutdown做关闭操作,新的节点调用createPeerEurekaNode进行创建,集群节点最终存储在List结构中
4.服务注册器初始化
在DefaultEurekaServerContext
中调用完peerEurekaNodes.start();
方法后调用PeerAwareInstanceRegistryImpl
.init方法进行注册器的初始化
//初始化方法
@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
//最后一分钟的复制次数定时器Timer开始
this.numberOfReplicationsLastMin.start();
this.peerEurekaNodes = peerEurekaNodes;
//初始化 ResponseCache(ResponseCacheImpl) ,负责缓存客户端查询的注册表信息
initializedResponseCache();
//续约阈值定时更新任务,15min/1次 调用 updateRenewalThreshold()方法 更新
scheduleRenewalThresholdUpdateTask();
//初始化远程注册表,默认么有远程Region
initRemoteRegionRegistry();
try {
//注册到对象监视器
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
}
}
这里我们主要分析两个东西
- initializedResponseCache 初始化注册表响应缓存
- scheduleRenewalThresholdUpdateTask 定时更新续约阈值
initializedResponseCache初始化响应缓存
注意:这里有这么一句代码initializedResponseCache,它初始化了一个ResponseCache 响应缓存,ResponseCacheImpl是具体实现,该类中构造了一个readWriteCacheMap读写缓存的Map,和一个只读缓存readOnlyCacheMap的Map。为什么是响应缓存,以为客户端在获取服务注册表的时候就会从readOnlyCacheMap缓存中去获取
public class ResponseCacheImpl implements ResponseCache {
...省略...
//只读缓存
private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
//读写缓存
private final LoadingCache<Key, Value> readWriteCacheMap;
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
this.serverConfig = serverConfig;
this.serverCodecs = serverCodecs;
//获取配置,是否是只读缓存,默认true,拉取注册表的时候还会从只读缓存拉取
this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
this.registry = registry;
//获取响应缓存更新时间间隔 30s
long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
//构建一个 readWriteCacheMap
this.readWriteCacheMap =
CacheBuilder.newBuilder().initialCapacity(1000)
.expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
.removalListener(new RemovalListener<Key, Value>() {
@Override
public void onRemoval(RemovalNotification<Key, Value> notification) {
Key removedKey = notification.getKey();
if (removedKey.hasRegions()) {
Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
}
}
})
.build(new CacheLoader<Key, Value>() {
@Override
public Value load(Key key) throws Exception {
if (key.hasRegions()) {
Key cloneWithNoRegions = key.cloneWithoutRegions();
regionSpecificKeys.put(cloneWithNoRegions, key);
}
Value value = generatePayload(key);
return value;
}
});
//如果使用只读响应缓存,
if (shouldUseReadOnlyResponseCache) {
//每隔responseCacheUpdateIntervalMs=30s执行getCacheUpdateTask
timer.schedule(getCacheUpdateTask(),
new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
+ responseCacheUpdateIntervalMs),
responseCacheUpdateIntervalMs);
}
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
}
}
private TimerTask getCacheUpdateTask() {
return new TimerTask() {
@Override
public void run() {
//如果数据不一致,从readWriteCacheMap缓存更新readOnlyCacheMap缓存
logger.debug("Updating the client cache from response cache");
for (Key key : readOnlyCacheMap.keySet()) {
if (logger.isDebugEnabled()) {
logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
key.getEntityType(), key.getName(), key.getVersion(), key.getType());
}
try {
CurrentRequestVersion.set(key.getVersion());
Value cacheValue = readWriteCacheMap.get(key);
Value currentCacheValue = readOnlyCacheMap.get(key);
if (cacheValue != currentCacheValue) {
readOnlyCacheMap.put(key, cacheValue);
}
} catch (Throwable th) {
logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
}
}
}
};
}
}
scheduleRenewalThresholdUpdateTask 定时更新续约阈值
定时任务每renewalThresholdUpdateIntervalMs=900秒 更新一次续约阀值
/**
每renewalThresholdUpdateIntervalMs=900秒 更新一次续约阀值
* Schedule the task that updates <em>renewal threshold</em> periodically.
* The renewal threshold would be used to determine if the renewals drop
* dramatically because of network partition and to protect expiring too
* many instances at a time.
*
*/
private void scheduleRenewalThresholdUpdateTask() {
//定时任务
timer.schedule(new TimerTask() {
@Override
public void run() {
//更新续约阈值
updateRenewalThreshold();
}
}, serverConfig.getRenewalThresholdUpdateIntervalMs(),
serverConfig.getRenewalThresholdUpdateIntervalMs()); //900s
}
updateRenewalThreshold是具体的更新逻辑
// PeerAwareInstanceRegistryImpl#updateRenewalThreshold()
/**
* Updates the <em>renewal threshold</em> based on the current number of
* renewals. The threshold is a percentage as specified in
* {@link EurekaServerConfig#getRenewalPercentThreshold()} of renewals
* received per minute {@link #getNumOfRenewsInLastMin()}.
*/
private void updateRenewalThreshold() {
try {
//获取到注册表
Applications apps = eurekaClient.getApplications();
int count = 0;
// 计算有多少个注册的服务实例
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
if (this.isRegisterable(instance)) {
++count;
}
}
}
//枷锁
synchronized (lock) {
// Update threshold only if the threshold is greater than the
// current expected threshold of if the self preservation is disabled.
// 只有当阀值大于当前预期值时或者关闭了自我保护模式才更新
if ((count * 2) > (serverConfig.getRenewalPercentThreshold() * numberOfRenewsPerMinThreshold)
|| (!this.isSelfPreservationModeEnabled())) {
//判断如果阈值时候大于预期的阈值 或者 关闭了我保护
//更新每分钟的预期续订次数:服务数 * 2 ,每个客户端30s/次,1分钟2次
this.expectedNumberOfRenewsPerMin = count * 2;
//更新每分钟阈值的续订次数 :服务数 * 2 * 0.85 (百分比阈值)
this.numberOfRenewsPerMinThreshold = (int) ((count * 2) * serverConfig.getRenewalPercentThreshold());
}
}
logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
} catch (Throwable e) {
logger.error("Cannot update renewal threshold", e);
}
}
当关闭自我保护,或者当前阈值大于预期阈值,就会更新续约的阈值,那么这是怎么样的一个更新算法呢?
- 每分钟的预期续订次数 = 服务数 * 2 ,因为: 一个服务30s/一次续约
- 每分钟阈值 = 服务数 * 2 * 0.85
5.EurekaServer初始化配置
EurekaServerInitializerConfiguration 的start方法会在Spring容器刷新的时候调用,因为它实现了SmartLifecycle接口 , start方法中新开线程调用eurekaServerBootstrap.contextInitialized进行初始化
public void start() {
new Thread(new Runnable() {
@Override
public void run() {
try {
//TODO: is this class even needed now?
eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
log.info("Started Eureka Server");
publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
EurekaServerInitializerConfiguration.this.running = true;
publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
}
catch (Exception ex) {
// Help!
log.error("Could not initialize Eureka servlet context", ex);
}
}
}).start();
}
6.Eureka启动引导
EurekaServerBootstrap .contextInitialized 负责初始化Eureak环境和初始化上下文
//Eureka初始化
public void contextInitialized(ServletContext context) {
try {
//初始化环境
initEurekaEnvironment();
//初始化上下文
initEurekaServerContext();
//设置上下文属性
context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
}
catch (Throwable e) {
log.error("Cannot bootstrap eureka server :", e);
throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}
在初始化上下文的时候会调用 PeerAwareInstanceRegistryImpl.syncUp(); 从相邻的集群节点同步注册表,通过PeerAwareInstanceRegistryImpl.register注册到当前Eureka节点
//初始化eurekaServer上下文
protected void initEurekaServerContext() throws Exception {
...省略...
//把EurekaServerContext设置到EurekaServerContextHolder中
EurekaServerContextHolder.initialize(this.serverContext);
log.info("Initialized server context");
// Copy registry from neighboring eureka node
//从相邻的eureka节点复制注册表,使用的是PeerAwareInstanceRegistryImpl的实现
int registryCount = this.registry.syncUp();
this.registry.openForTraffic(this.applicationInfoManager, registryCount);
// Register all monitoring statistics.
//注册所有监视统计信息。
EurekaMonitors.registerAllStats();
}
同步相邻节点的注册表PeerAwareInstanceRegistryImpl.syncUp()
@Override
public int syncUp() {
// Copy entire entry from neighboring DS node
int count = 0;
//getRegistrySyncRetries重试次数默认5次
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
try {
//通信中断,等待下一次切换实例
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
//获取注册表
Applications apps = eurekaClient.getApplications();
//循环服务列表,依次注册
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
try {
if (isRegisterable(instance)) {
//获取InstanceInfo之后注册到当前节点,保存到 ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry 中缓存起来
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}
到这里EurekaServer就算初始化完成了
总结
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/149329.html