docs/Spring全家桶/SpringCloud源码分析/SpringCloudEureka源码分析.md
Eureka源码分析
**1 ** 1.1 Eurekaʲô
ǶףEurekaעģעҪʵʲôأȷˡ
ʱǾͻ뵽SpringBootԶװStarter˺beanԶע룬ײֱõbeanȻStarterӦԶǵAPIģOKǻعͷҷ֣ҵEureka-clientһstarterٺ٣е㶫ˡclient˺϶ǰǷװ˸beanȻǵ˺apiˡ
EurekaĺĹһܽ
Ƶ֮ͨԴһ֤
2 Դ 2.1 ע עspring bootӦʱġִ·ҲȻعһǰǽ֪ʶ
˵spring cloudһ̬ṩһױױͨͬʵ֣оͰע/֡۶ϡؾȣspring-cloud-commonУ org.springframework.cloud.client.serviceregistry ·£ԿһעĽӿڶ ServiceRegistry Ƕspring cloudзעһӿڡ
ǿһϵͼӿһΨһʵ EurekaServiceRegistry ʾõEureka ServerΪעġ
2.1.1 עʱ עķǿԲ²һӦʲôʱɣҪʵӦòѲ²עȡڷǷѾˡspring bootУȵspring еö֮עᡣspring bootеrefreshContextɡ
ǹ۲һfinishRefreshϿԿˢµIJҲˢ֮ҪĺõIJҪ
ջ
ʼһLifecycleProcessorSpringʱbeanspringʱbean
LifecycleProcessoronRefreshʵLifecycleӿڵbean
ContextRefreshedEvent
עBeanͨJMXмغ
/**
 * Final step of the context refresh. In order, it clears the resource caches,
 * initializes the lifecycle processor, calls {@code onRefresh()} on it,
 * publishes a {@code ContextRefreshedEvent} and registers this context with
 * {@code LiveBeansView}.
 */
protected void finishRefresh() {
// Clear context-level resource caches (such as ASM metadata from scanning).
clearResourceCaches();
// Initialize lifecycle processor for this context.
initLifecycleProcessor();
// Propagate refresh to lifecycle processor first.
getLifecycleProcessor().onRefresh();
// Publish the final event.
publishEvent(new ContextRefreshedEvent(this));
// Participate in LiveBeansView MBean, if active.
LiveBeansView.registerApplicationContext(this);
}
Уصע getLifecycleProcessor().onRefresh() ǵڴonrefreshҵSmartLifecycleӿڵʵಢstart
2.1.2 SmartLifeCycle չһSmartLifeCycle֪ʶ SmartLifeCycleһӿڣSpringеBeanҳʼ֮صʵSmartLifeCycleӿڵжӦķ磨start
ʵԼҲչspringboot̵mainͬĿ¼£дһ࣬ʵSmartLifeCycleӿڣͨ @Service ΪһbeanΪҪspringȥأȵbean
/**
 * Minimal {@link SmartLifecycle} bean used to observe the lifecycle callbacks:
 * it only prints a marker line from {@code start()} and {@code stop()}.
 */
@Service
public class TestSmartLifeCycle implements SmartLifecycle {

    /**
     * Opts in to automatic startup.
     *
     * @return always {@code true}
     */
    @Override
    public boolean isAutoStartup() {
        return true;
    }

    /**
     * Phase used to order this bean relative to other lifecycle beans.
     *
     * @return always {@code 0}
     */
    @Override
    public int getPhase() {
        return 0;
    }

    /** Start callback; prints a marker so the invocation is visible on the console. */
    @Override
    public void start() {
        System.out.println("----------start-----------");
    }

    /**
     * Reports whether this component is running.
     * NOTE(review): this always answers {@code false}; per the SmartLifecycle
     * contract that presumably means the container never calls stop() - confirm.
     *
     * @return always {@code false}
     */
    @Override
    public boolean isRunning() {
        return false;
    }

    /** Stop callback; prints a marker so the invocation is visible on the console. */
    @Override
    public void stop() {
        System.out.println("----------stop-----------");
    }

    /**
     * Stop variant with a completion callback: stops first, then runs the callback.
     *
     * @param runnable callback to invoke once stopping has finished
     */
    @Override
    public void stop(Runnable runnable) {
        stop();
        runnable.run();
    }
}
ţspring bootӦúԿ̨ start ַ
DefaultLifecycleProcessor.startBeansϼһdebugԺԵĿԼTestSmartLifeCycleɨ赽ˣøbeanstartstartBeansУǿԿȻʵSmartLifeCycleBeanȻѭʵSmartLifeCyclebeanstart¡
/**
 * Buckets the lifecycle beans by phase and starts the buckets in ascending
 * phase order.
 *
 * @param autoStartupOnly when {@code true}, only {@code SmartLifecycle} beans
 *        whose {@code isAutoStartup()} returns {@code true} are included;
 *        when {@code false}, every lifecycle bean is included
 */
private void startBeans(boolean autoStartupOnly) {
    Map<String, Lifecycle> candidates = this.getLifecycleBeans();
    Map<Integer, DefaultLifecycleProcessor.LifecycleGroup> groupsByPhase = new HashMap<>();

    for (Map.Entry<String, Lifecycle> entry : candidates.entrySet()) {
        String beanName = entry.getKey();
        Lifecycle bean = entry.getValue();

        boolean eligible = !autoStartupOnly
                || (bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup());
        if (!eligible) {
            continue;
        }

        int phase = this.getPhase(bean);
        DefaultLifecycleProcessor.LifecycleGroup group = groupsByPhase.get(phase);
        if (group == null) {
            // First bean seen for this phase: create its group lazily.
            group = new DefaultLifecycleProcessor.LifecycleGroup(
                    phase, this.timeoutPerShutdownPhase, candidates, autoStartupOnly);
            groupsByPhase.put(phase, group);
        }
        group.add(beanName, bean);
    }

    if (groupsByPhase.isEmpty()) {
        return;
    }

    // Lower phase values start first.
    List<Integer> orderedPhases = new ArrayList<>(groupsByPhase.keySet());
    Collections.sort(orderedPhases);
    for (Integer phase : orderedPhases) {
        groupsByPhase.get(phase).start();
    }
}
2.1.3 doStart
/**
 * Starts the named bean after recursively starting the beans it depends on.
 * The bean is removed from {@code lifecycleBeans} up front, so each bean is
 * handled at most once per pass.
 *
 * @param lifecycleBeans  beans still waiting to be started, keyed by name
 * @param beanName        name of the bean to start
 * @param autoStartupOnly when {@code true}, a {@code SmartLifecycle} bean is
 *        started only if its {@code isAutoStartup()} returns {@code true}
 * @throws ApplicationContextException if the bean's {@code start()} throws
 */
private void doStart(Map<String, ? extends Lifecycle> lifecycleBeans, String beanName, boolean autoStartupOnly) {
    Lifecycle bean = lifecycleBeans.remove(beanName);
    if (bean == null || bean == this) {
        return;
    }

    // Dependencies go first.
    for (String dependency : this.getBeanFactory().getDependenciesForBean(beanName)) {
        this.doStart(lifecycleBeans, dependency, autoStartupOnly);
    }

    if (bean.isRunning()) {
        return;
    }
    boolean startAllowed = !autoStartupOnly
            || !(bean instanceof SmartLifecycle)
            || ((SmartLifecycle) bean).isAutoStartup();
    if (!startAllowed) {
        return;
    }

    if (this.logger.isTraceEnabled()) {
        this.logger.trace("Starting bean '" + beanName + "' of type [" + bean.getClass().getName() + "]");
    }
    try {
        // For a Eureka client application, the bean reaching this call may be
        // EurekaAutoServiceRegistration.
        bean.start();
    } catch (Throwable cause) {
        throw new ApplicationContextException("Failed to start bean '" + beanName + "'", cause);
    }
    if (this.logger.isDebugEnabled()) {
        this.logger.debug("Successfully started bean '" + beanName + "'");
    }
}
此时，bean.start() 调用的可能是 EurekaAutoServiceRegistration 中的 start 方法，因为很显然，它实现了 SmartLifeCycle 接口。
public class EurekaAutoServiceRegistration
        implements AutoServiceRegistration, SmartLifecycle, Ordered, SmartApplicationListener {

    /**
     * Lifecycle start callback. It fills in any unset port on the registration
     * from {@code this.port}, then registers with the service registry and
     * publishes an {@code InstanceRegisteredEvent}, unless already running or
     * the non-secure port is still not positive.
     */
    @Override
    public void start() {
        // Ports are only back-filled when a port is known (this.port != 0)
        // and the corresponding registration port is still 0.
        if (this.port.get() != 0) {
            if (this.registration.getNonSecurePort() == 0) {
                this.registration.setNonSecurePort(this.port.get());
            }
            if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
                this.registration.setSecurePort(this.port.get());
            }
        }

        // Skip when already running or when no usable non-secure port exists yet.
        if (this.running.get() || this.registration.getNonSecurePort() <= 0) {
            return;
        }

        this.serviceRegistry.register(this.registration);
        this.context.publishEvent(
                new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));
        this.running.set(true);
    }
}
在 start 方法中，我们可以看到 this.serviceRegistry.register 这个方法，它实际上就是发起服务注册的机制。
ʱthis.serviceRegistryʵӦ EurekaServiceRegistry ԭ EurekaAutoServiceRegistrationĹ췽Уһֵ췽EurekaClientAutoConfiguration Զװбװͳʼģ¡
/**
 * Declares the {@code EurekaAutoServiceRegistration} bean, wiring in the
 * application context, the Eureka service registry and the registration.
 * It is only created when an {@code AutoServiceRegistrationProperties} bean
 * exists and {@code spring.cloud.service-registry.auto-registration.enabled}
 * is true or absent.
 *
 * @param context      the application context
 * @param registry     the registry the registration is sent through
 * @param registration the registration describing this instance
 * @return the auto-registration bean
 */
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
@ConditionalOnProperty(
        value = "spring.cloud.service-registry.auto-registration.enabled",
        matchIfMissing = true)
public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(
        ApplicationContext context,
        EurekaServiceRegistry registry,
        EurekaRegistration registration) {
    EurekaAutoServiceRegistration autoRegistration =
            new EurekaAutoServiceRegistration(context, registry, registration);
    return autoRegistration;
}
2.2 ע Ƿע
public class EurekaAutoServiceRegistration implements AutoServiceRegistration,
SmartLifecycle, Ordered, SmartApplicationListener {
@Override
public void start() {
//ʡ...
this.serviceRegistry.register(this.registration);
this.context.publishEvent(new InstanceRegisteredEvent<> this,this.registration.getInstanceConfig()));
}
}
this.serviceRegistry.register(this.registration); 方法最终会调用
EurekaServiceRegistry 类中的 register 方法来实现服务注册。
2.2.1 register
/**
 * Prepares the given registration: initializes the client if needed, logs the
 * application name and initial status, applies the initial status to the
 * instance, and registers a health-check handler when one is available.
 * No explicit "register" call to the Eureka server is made here.
 *
 * @param reg the Eureka registration to apply
 */
@Override
public void register(EurekaRegistration reg) {
maybeInitializeClient(reg);
if (log.isInfoEnabled()) {
log.info("Registering application "
+ reg.getApplicationInfoManager().getInfo().getAppName()
+ " with eureka with status "
+ reg.getInstanceConfig().getInitialStatus());
}
// Apply the configured initial status to the instance. NOTE(review): the status-change
// listeners behind setInstanceStatus presumably drive the actual registration - confirm.
reg.getApplicationInfoManager().setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
// If a HealthCheckHandler is available, register it with the Eureka client.
reg.getHealthCheckHandler().ifAvailable(healthCheckHandler -> reg.getEurekaClient().registerHealthCheck(healthCheckHandler));
}
ע᷽вûEurekaķȥִעᣬǽһ״̬Լý鴦Ǽһ reg.getApplicationInfoManager().setInstanceStatus
/**
 * Maps the requested status through {@code instanceStatusMapper}, stores it on
 * the instance info, and notifies every registered status-change listener when
 * {@code instanceInfo.setStatus} reports a previous status.
 * A listener that throws is logged and does not stop the remaining listeners.
 *
 * @param status the requested status; ignored when the mapper returns {@code null}
 */
public synchronized void setInstanceStatus(InstanceStatus status) {
    InstanceStatus mapped = instanceStatusMapper.map(status);
    if (mapped == null) {
        return;
    }

    InstanceStatus previous = instanceInfo.setStatus(mapped);
    if (previous == null) {
        // setStatus reported no previous status, so nothing is announced.
        return;
    }

    for (StatusChangeListener listener : listeners.values()) {
        try {
            listener.notify(new StatusChangeEvent(previous, mapped));
        } catch (Exception e) {
            logger.warn("failed to notify listener: {}", listener.getId(),e);
        }
    }
}
Уͨһ״̬¼okʱlistenerʵStatusChangeListener Ҳǵ StatusChangeListener notify¼Ǵһ״̬Ӧеط¼Ȼ¼עᡣ
ʱΪҵ˷ȻȥһһӿڡǷǾ̬ڲӿڣֱӿʵࡣ
ҶԴĶ飬ңΪһܲ²һijط˳ʼĹǣҵ EurekaServiceRegistry.registerе reg.getApplicationInfoManager ʵʲôǷApplicationInfoManagerEurekaRegistrationеԡEurekaRegistrationEurekaAutoServiceRegistrationʵġ룬DzԶװʲôҵEurekaClientAutoConfiguration࣬ȻBeanһЩԶװ䣬а EurekaClient ApplicationInfoMangager EurekaRegistration ȡ
2.2.2 EurekaClientConfiguration
/**
 * Bean definitions for the Eureka client when no refresh scope is present:
 * the {@code EurekaClient} itself, the {@code ApplicationInfoManager}, and the
 * {@code EurekaRegistration} used for auto-registration.
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingRefreshScope
protected static class EurekaClientConfiguration {
@Autowired
private ApplicationContext context;
@Autowired
private AbstractDiscoveryClientOptionalArgs<?> optionalArgs;
/**
 * Creates the Eureka client as a {@code CloudEurekaClient}, unless the current
 * context already defines an {@code EurekaClient}. "shutdown" is the destroy method.
 */
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class,search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager,EurekaClientConfig config) {
return new CloudEurekaClient(manager, config, this.optionalArgs,this.context);
}
/**
 * Creates the {@code ApplicationInfoManager} from the instance config and an
 * {@code InstanceInfo} built by {@code InstanceInfoFactory}, unless the current
 * context already defines one.
 */
@Bean
@ConditionalOnMissingBean(value = ApplicationInfoManager.class,search = SearchStrategy.CURRENT)
public ApplicationInfoManager eurekaApplicationInfoManager(
EurekaInstanceConfig config) {
InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
return new ApplicationInfoManager(config, instanceInfo);
}
/**
 * Builds the {@code EurekaRegistration} from the instance config, the
 * application info manager, the Eureka client and the optional health-check
 * handler. Only active when auto-registration is enabled (or unspecified).
 */
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
@ConditionalOnProperty(
value = "spring.cloud.service-registry.auto-registration.enabled",
matchIfMissing = true)
public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient,CloudEurekaInstanceConfig
instanceConfig,ApplicationInfoManager applicationInfoManager, @Autowired(required = false)
ObjectProvider<HealthCheckHandler> healthCheckHandler) {
return EurekaRegistration.builder(instanceConfig).with(applicationInfoManager).with(eurekaClient).with(healthCheckHandler).build();
}
}
ѷ֣ƺһҪBeanʱԶװ䣬ҲCloudEurekaClient ҿԺʶ²Eurekaͻ˵һ࣬ʵֺͷ˵ͨԼǺܶԴһ·Ҫôڹ췽ȥܶijʼһЩִ̨еijҪôͨ첽¼ķʽţǿһCloudEurekaClientijʼ̣Ĺ췽лͨ super øĹ췽ҲDiscoveryClientĹ졣
2.2.3 CloudEurekaClient super(applicationInfoManager, config, args);øĹ췽CloudEurekaClientĸDiscoveryClient.
/**
 * Delegates construction to the {@code DiscoveryClient} superclass, then keeps
 * the info manager and event publisher and prepares reflective access to the
 * superclass's {@code eurekaTransport} field.
 *
 * @param applicationInfoManager manager holding this instance's info
 * @param config                 Eureka client configuration
 * @param args                   optional discovery-client arguments
 * @param publisher              publisher stored for later event publication
 */
public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,EurekaClientConfig config,AbstractDiscoveryClientOptionalArgs<?> args,ApplicationEventPublisher publisher) {
// The superclass constructor does the actual client initialization.
super(applicationInfoManager, config, args);
this.applicationInfoManager = applicationInfoManager;
this.publisher = publisher;
// Look up the "eurekaTransport" field of DiscoveryClient and make it accessible.
this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,"eurekaTransport");
ReflectionUtils.makeAccessible(this.eurekaTransportField);
}
2.2.4 DiscoveryClient ǿԿյDiscoveryClient췽УзdzĴ롣ʵܶԲҪģֶһЩʼʼ˼ʱ
scheduler
heartbeatExecutor ʱ
cacheRefreshExecutor ʱȥͬ˵ʵб
/**
 * Abbreviated DiscoveryClient constructor. The part shown sets up the staleness
 * monitors, returns early when neither registration nor registry fetching is
 * enabled, creates the scheduler and the heartbeat / cache-refresh executors,
 * optionally registers right away, and finally calls initScheduledTasks().
 */
DiscoveryClient(ApplicationInfoManager applicationInfoManager,EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,Provider<BackupRegistry> backupRegistryProvider,EndpointRandomizer endpointRandomizer) {
// ... (part of the constructor omitted)
// Whether the registry should be fetched from the Eureka server
if (config.shouldFetchRegistry()) {
this.registryStalenessMonitor = new ThresholdLevelsMetric(this,METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L,480L});
} else {
this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
// Whether this instance should register with the Eureka server
if (config.shouldRegisterWithEureka()) {
this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this,METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L,120L, 240L, 480L});
} else {
this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
// Neither registration nor registry fetching is enabled
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
return; // no need to setup up an network tasks and we are done
}
try {
// default size of 2 - 1 each for heartbeat and cacheRefresh
scheduler = Executors.newScheduledThreadPool(2,new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
heartbeatExecutor = new ThreadPoolExecutor(1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
AzToRegionMapper azToRegionMapper;
if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
} else {
azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
}
if (null != remoteRegionsToFetch.get()) {
azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
}
instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper,
clientConfig.getRegion());
} catch (Throwable e) {
throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
}
// When registration is enabled and enforced at init, call register() immediately
if (clientConfig.shouldRegisterWithEureka() &&
clientConfig.shouldEnforceRegistrationAtInit()) {
try {
if (!register() ) {
throw new IllegalStateException("Registration error at startup.Invalid server response.");
}
} catch (Throwable th) {
logger.error("Registration error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch)
initScheduledTasks();
}
2.2.5 initScheduledTasks initScheduledTasks ȥһʱ
ͨڲʵStatusChangeListener ʵ״̬ؽӿڣǰڷģnotifyķʵϻ֡
private void initScheduledTasks() {
//˿עˢ·бῪcacheRefreshExecutorʱ
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds =
clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound =
clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
//˷עᵽEurekaͨҪ
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs =
instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound =
clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}",
renewalIntervalInSecs);
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator ʼһ:instanceInfoReplicator
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
statusChangeListener = new ApplicationInfoManager.StatusChangeListener()
{
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN ==
statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}",
statusChangeEvent);
} else {
logger.info("Saw local status change event {}",
statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
//עʵ״̬仯ļ
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
//һʵϢҪΪ˿һʱ̣߳ÿ40жʵϢǷע
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationInte
rvalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
2.2.6 onDemandUpdate ҪǸʵǷ仯עĵݡ
/**
 * Requests an immediate replication run outside the periodic schedule.
 * The request is dropped when the rate limiter refuses it or when the
 * scheduler has been shut down.
 *
 * @return {@code true} if an update task was submitted, {@code false} if the
 *         request was ignored
 */
public boolean onDemandUpdate() {
    if (!rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
        logger.warn("Ignoring onDemand update due to rate limiter");
        return false;
    }
    if (scheduler.isShutdown()) {
        logger.warn("Ignoring onDemand update due to stopped scheduler");
        return false;
    }

    scheduler.submit(new Runnable() {
        @Override
        public void run() {
            logger.debug("Executing on-demand update of local InstanceInfo");

            // Cancel the pending periodic run, if it has not finished yet.
            Future pending = scheduledPeriodicRef.get();
            if (pending != null && !pending.isDone()) {
                logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                pending.cancel(false);
            }

            // Run the replicator body directly on this scheduler thread.
            InstanceInfoReplicator.this.run();
        }
    });
    return true;
}
2.2.7 run runʵϺǰԶװִеķע᷽һģҲǵ register зעᣬfinallyУÿ30sᶨʱִһµǰrun м顣
/**
 * One replication pass: refreshes the local instance info and, when it is
 * marked dirty, registers it through the discovery client and clears the dirty
 * flag. Failures are only logged. The next pass is always scheduled afterwards
 * and its future is stored in {@code scheduledPeriodicRef}.
 */
public void run() {
    try {
        discoveryClient.refreshInstanceInfo();

        Long dirtySince = instanceInfo.isDirtyWithTime();
        if (dirtySince != null) {
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtySince);
        }
    } catch (Throwable problem) {
        logger.warn("There was a problem with the instance info replicator", problem);
    } finally {
        // Reschedule regardless of the outcome above.
        Future upcoming = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(upcoming);
    }
}
2.2.8 register գҵעˣ eurekaTransport.registrationClient.register յõ AbstractJerseyEurekaHttpClient#register(...)` ȻԼȥ룬ͻᷢȥ֮ǰкܶȥĴ룬繤ģʽװģʽȡ
/**
 * Sends this instance's info to the server through the registration client.
 *
 * @return {@code true} when the response status equals {@code NO_CONTENT}
 * @throws Throwable any exception from the registration call, rethrown after logging
 */
boolean register() throws Throwable {
    logger.info(PREFIX + "{}: registering service...", appPathIdentifier);

    EurekaHttpResponse<Void> response;
    try {
        response = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception failure) {
        logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier,failure.getMessage(), failure);
        throw failure;
    }

    if (logger.isInfoEnabled()) {
        logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier,response.getStatusCode());
    }
    boolean accepted = response.getStatusCode() == Status.NO_CONTENT.getStatusCode();
    return accepted;
}
很显然，这里是发起了一次http请求，访问Eureka-Server的apps/${APP_NAME}接口，将当前服务实例的信息发送到Eureka Server进行保存。
至此，我们基本上已经知道Spring Cloud Eureka 是如何在启动的时候把服务信息注册到Eureka Server上的了。
/**
 * Registers the instance by POSTing it as JSON to {@code apps/<appName>} under
 * the service URL, and wraps the HTTP status and headers in the returned response.
 * The Jersey response is always closed afterwards.
 *
 * @param info the instance to register
 * @return a response carrying the HTTP status code and headers
 */
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
try {
Builder resourceBuilder =
jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, info);
return
anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
// response stays null if the request failed before a response was obtained
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
ǣƺʼûнҲSpring BootӦʱstartյ StatusChangeListener.notify ȥ·һ״̬ûֱӵregisterעᡣǼȥһ statusChangeListener.notify
2.2.9 ܽ ˣ֪Eureka Clientעʱطִзע
ڣ com.netflix.eureka.resources.ApplicationResource.addInstance()
ҿԷ֣ṩRESTõjerseyʵֵġJerseyǻJAX-RSṩRESTʵֵ֧֣Ͳչˡ
2.3.1 addInstance() EurekaClientregisterעʱ ApplicationResource.addInstance
עǷһPOSTϵǰʵϢ ApplicationResource addInstanceзעᡣ
/**
 * REST endpoint (POST, JSON or XML) that registers an instance.
 * When the data-center info is a {@code UniqueIdentifier} with a blank id, the
 * request is rejected with 400 if the experimental validation flag is "true";
 * otherwise an {@code AmazonInfo} gets the instance id filled in when missing,
 * and other types only produce a warning. The instance is then handed to the
 * registry and 204 is returned.
 *
 * @param info          the instance being registered
 * @param isReplication value of the "x-netflix-discovery-replication" header;
 *                      the literal "true" marks the call as a replication
 * @return 400 on failed validation, otherwise 204
 */
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info, @HeaderParam("x-netflix-discovery-replication") String isReplication) {
logger.debug("Registering instance {} (replication={})", info.getId(),
isReplication);
DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
if (dataCenterInfo instanceof UniqueIdentifier) {
String dataCenterInfoId =
((UniqueIdentifier)dataCenterInfo).getId();
if (this.isBlank(dataCenterInfoId)) {
// Strict validation is opt-in through an experimental server setting.
boolean experimental = "true".equalsIgnoreCase(this.serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
if (experimental) {
String entity = "DataCenterInfo of type " +
dataCenterInfo.getClass() + " must contain a valid id";
return Response.status(400).entity(entity).build();
}
if (dataCenterInfo instanceof AmazonInfo) {
AmazonInfo amazonInfo = (AmazonInfo)dataCenterInfo;
String effectiveId = amazonInfo.get(MetaDataKey.instanceId);
if (effectiveId == null) {
// Fall back to the instance's own id as the Amazon instanceId metadata.
amazonInfo.getMetadata().put(MetaDataKey.instanceId.getName(), info.getId());
}
} else {
logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
}
}
}
// Hand over to the registry; only the exact string "true" counts as a replication.
this.registry.register(info, "true".equals(isReplication));
return Response.status(204).build();
}
2.3.2 register
PeerAwareInstanceRegistryImplϵͼϵͼԿPeerAwareInstanceRegistryӿΪLeaseManagerLookupService,
addInstance Уյõ PeerAwareInstanceRegistryImpl.register
leaseDuration ʾԼʱ䣬Ĭ90sҲǵ˳90sûյͻ˵ýڵ
super.registerڵע
ϢƵEureka ServerȺеϣͬʵҲܼǻüȺенڵ㣬Ȼע
/**
 * Registers the instance on this node and then replicates the registration to
 * the peer nodes.
 *
 * @param info          the instance to register
 * @param isReplication whether this call is itself a replication from a peer
 */
public void register(final InstanceInfo info, final boolean isReplication) {
    // Use the lease duration supplied by the client when it is positive;
    // otherwise fall back to the default.
    int leaseDuration =
            (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0)
                    ? info.getLeaseInfo().getDurationInSecs()
                    : Lease.DEFAULT_DURATION_IN_SECS;

    // Local registration first ...
    super.register(info, leaseDuration, isReplication);
    // ... then propagate to the other nodes.
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
2.3.3 AbstractInstanceRegistry.register ˵Eureka-Serverķעᣬʵǽͻ˴ݹʵݱ浽Eureka-ServerеConcurrentHashMapС
/**
 * Stores the registrant in the in-memory registry under its application name
 * and instance id, wrapped in a new {@code Lease}. It also reconciles with any
 * existing lease by dirty timestamp, applies overridden-status rules, records
 * the change in the recent queues and invalidates the related cache entries.
 * Runs while holding the read lock.
 *
 * @param registrant    the instance being registered
 * @param leaseDuration lease duration, in seconds, for the new lease
 * @param isReplication whether this registration is a replication from another node
 */
public void register(InstanceInfo registrant, int leaseDuration, boolean
isReplication) {
try {
read.lock();
// Look up the per-application lease map in the registry by appName
Map<String, Lease<InstanceInfo>> gMap =
registry.get(registrant.getAppName());
REGISTER.increment(isReplication); // count this registration
if (gMap == null) {// first registration for this appName: create a new ConcurrentHashMap
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new
ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
// Look up any lease already stored for this instance id
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// A lease with a holder already exists: compare dirty timestamps and keep the newer InstanceInfo
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp =
existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp =
registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}",
existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
// this is a > instead of a >= because if the timestamps are equal,we still take the remote transmitted
// InstanceInfo instead of the server local copy.
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}",
existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
// No usable existing lease: treat this as a new registration
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// Since the client wants to register it, increase the number of clients sending renews
this.expectedNumberOfClientsSendingRenews =
this.expectedNumberOfClientsSendingRenews + 1;
updateRenewsPerMinThreshold();
}
}
logger.debug("No previous lease information found; it is new registration");
}
// Build the new lease
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant,
leaseDuration);
if (existingLease != null) {
// Carry over the serviceUpTimestamp of the existing lease
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease);
synchronized (recentRegisteredQueue) {// record in the recently-registered queue
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
}
// If the registrant carries an overridden status other than UNKNOWN and the map has none yet, store it
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(),
registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it",
registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(),
registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap =
overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map",
overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus =
getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the resulting status is UP, mark the lease as up
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
// Mark the action type as ADDED
registrant.setActionType(ActionType.ADDED);
// Record the change in the recently-changed queue
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
// Invalidate the cache entries for this app and its VIP addresses
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(),
registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(),
registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
2.3.4 С ˣǾͰѷעڿͻ˺ͷ˵ĴһϸķʵEureka Serverˣѿͻ˵ĵַϢ浽ConcurrentHashMapд洢ҷṩߺע֮䣬ὨһơڼطṩߵĽ״̬
2.4 Eureka Ķ༶ Eureka Server(registryreadWriteCacheMapreadOnlyCacheMap)עϢĬ¶ʱÿ30sreadWriteCacheMapͬreadOnlyCacheMapÿ60s90sδԼĽڵ㣬Eureka Clientÿ30sreadOnlyCacheMap·עϢͻ˷עregistry·עϢ
2.4.1 ༶ ΪʲôҪƶ༶أԭܼǵڴģķעʱֻһConcurrentHashMapݣôƱΪĴڵ¾Ӱܡ
EurekaAPģֻͣҪտþСõ༶ʵֶд롣ע᷽дʱֱдڴעд֮ʧЧд档
ȡעϢӿȴֻȡֻûȥдȡдûȥڴעȡֻȡ˴ϸӣңд»дֻ
/**
 * Evicts the given keys from the read-write cache, together with any
 * region-specific keys recorded for them in {@code regionSpecificKeys}.
 *
 * @param keys the cache keys to invalidate
 */
public void invalidate(Key... keys) {
    for (Key key : keys) {
        logger.debug("Invalidating the response cache key : {} {} {} {}, {}",
                key.getEntityType(), key.getName(), key.getVersion(),
                key.getType(), key.getEurekaAccept());
        readWriteCacheMap.invalidate(key);

        Collection<Key> regionalKeys = regionSpecificKeys.get(key);
        if (regionalKeys == null || regionalKeys.isEmpty()) {
            continue;
        }
        for (Key regionalKey : regionalKeys) {
            // The log line reports the fields of the outer key, as in the original code.
            logger.debug("Invalidating the response cache key : {} {} {} {} {}",
                    key.getEntityType(), key.getName(),
                    key.getVersion(), key.getType(), key.getEurekaAccept());
            readWriteCacheMap.invalidate(regionalKey);
        }
    }
}
2.4.3 ʱͬ ResponseCacheImplĹ췽Уһʱᶨʱдеݱ仯иºͬ
/**
 * Builds the timer task that brings the read-only cache in line with the
 * read-write cache: for every key currently in the read-only cache, the value
 * is replaced when the read-write cache holds a different object.
 *
 * @return the cache-update task
 */
private TimerTask getCacheUpdateTask() {
    return new TimerTask() {
        @Override
        public void run() {
            logger.debug("Updating the client cache from response cache");
            for (Key key : readOnlyCacheMap.keySet()) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
                            key.getEntityType(), key.getName(),
                            key.getVersion(), key.getType());
                }
                syncKey(key);
            }
        }

        /** Copies one key's value across when the two caches disagree; errors are logged per key. */
        private void syncKey(Key key) {
            try {
                CurrentRequestVersion.set(key.getVersion());
                Value latest = readWriteCacheMap.get(key);
                Value published = readOnlyCacheMap.get(key);
                // Reference comparison: only a different object triggers the put.
                if (latest != published) {
                    readOnlyCacheMap.put(key, latest);
                }
            } catch (Throwable th) {
                logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
            } finally {
                CurrentRequestVersion.remove();
            }
        }
    };
}
2.5 Լ νķԼʵһơͻ˻ᶨڷԼôҿһ´ʵ
2.5.1 initScheduledTasks ͻ˻ DiscoveryClient.initScheduledTasks УһĶʱ
// Heartbeat timer
// Schedules a TimedSupervisorTask named "heartbeat" that runs a HeartbeatThread on
// heartbeatExecutor; renewalIntervalInSecs is used both as the task's interval
// argument and as the initial delay.
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
2.5.2 HeartbeatThread ȻʱУִһ HearbeatThread ̣̻߳߳ᶨʱrenew()Լ
/**
 * Heartbeat task: attempts a lease renewal and, on success, records the time
 * of the last successful heartbeat.
 * NOTE(review): the source comment says this fires every 30s; the actual
 * interval comes from the lease's renewal-interval setting - confirm the default.
 */
private class HeartbeatThread implements Runnable {

    @Override
    public void run() {
        boolean renewed = renew();
        if (renewed) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}
2.5.3 յĴ ApplicationResource.getInstanceInfoӿУ᷵һInstanceResourceʵڸʵ£һstatusUpdateĽӿ״̬
/**
 * Sub-resource locator for a single instance of this application.
 *
 * @param id the instance id taken from the request path
 * @return a new {@code InstanceResource} bound to this application resource and {@code id}
 */
@Path("{id}")
public InstanceResource getInstanceInfo(@PathParam("id") String id) {
    InstanceResource instanceResource = new InstanceResource(this, id, serverConfig, registry);
    return instanceResource;
}
2.5.4 InstanceResource.statusUpdate() ڸ÷Уصע registry.statusUpdate AbstractInstanceRegistry.statusUpdateָṩڷ˴洢Ϣеı仯
/**
 * REST endpoint (PUT {@code status}) that changes the status of this instance
 * in the registry.
 *
 * @param newStatus          name of the new {@code InstanceStatus}
 * @param isReplication      replication header; the literal "true" marks a replicated call
 * @param lastDirtyTimestamp last dirty timestamp passed through to the registry
 * @return 404 when the instance is unknown, 200 when the registry accepted the
 *         update, and a server error when it refused or anything threw
 */
@PUT
@Path("status")
public Response statusUpdate(
        @QueryParam("value") String newStatus,
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
        @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    try {
        if (registry.getInstanceByAppAndId(app.getName(), id) == null) {
            logger.warn("Instance not found: {}/{}", app.getName(), id);
            return Response.status(Status.NOT_FOUND).build();
        }

        boolean updated = registry.statusUpdate(
                app.getName(),
                id,
                InstanceStatus.valueOf(newStatus),
                lastDirtyTimestamp,
                "true".equals(isReplication));

        if (!updated) {
            logger.warn("Unable to update status: {} - {} - {}", app.getName(),
                    id, newStatus);
            return Response.serverError().build();
        }

        logger.info("Status updated: {} - {} - {}", app.getName(), id,
                newStatus);
        return Response.ok().build();
    } catch (Throwable e) {
        logger.error("Error updating instance {} for status {}", id,
                newStatus);
        return Response.serverError().build();
    }
}
2.5.5 AbstractInstanceRegistry.statusUpdate УõӦöӦʵбȻLease.renew()ȥԼ
/**
 * Updates the status of a registered instance. The lease is renewed first;
 * when the stored status differs from {@code newStatus}, the override map,
 * the instance's status fields, its dirty timestamp, the recently-changed
 * queue and the caches are all updated. Runs while holding the read lock.
 *
 * @param appName            application name the instance is registered under
 * @param id                 instance id
 * @param newStatus          status to apply
 * @param lastDirtyTimestamp dirty timestamp supplied by the caller, may be null
 * @param isReplication      whether the call is a replication from another node
 * @return {@code false} when no lease exists for the instance, otherwise {@code true}
 */
public boolean statusUpdate(String appName, String id,
InstanceStatus newStatus, String
lastDirtyTimestamp,
boolean isReplication) {
try {
read.lock();
// Count this status update
STATUS_UPDATE.increment(isReplication);
// Look up the instance's lease in the registry
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> lease = null;
if (gMap != null) {
lease = gMap.get(id);
}
// No lease for this instance: report failure
if (lease == null) {
return false;
} else {
// Renew the lease
lease.renew();
// Fetch the instance info held by the lease
InstanceInfo info = lease.getHolder();
// Lease is always created with its instance info object.
// This log statement is provided as a safeguard, in case this invariant is violated.
if (info == null) {
logger.error("Found Lease without a holder for instance id {}",
id);
}
// Only act when the instance info exists and its status actually changes
if ((info != null) && !(info.getStatus().equals(newStatus))) {
// When switching to UP, mark the lease as up
if (InstanceStatus.UP.equals(newStatus)) {
lease.serviceUp();
}
// Remember the new status for this instance id in the override map
overriddenInstanceStatusMap.put(id, newStatus);
// Set it for transfer of overridden status to replica on
// replica start up
info.setOverriddenStatus(newStatus);
long replicaDirtyTimestamp = 0;
info.setStatusWithoutDirty(newStatus);
if (lastDirtyTimestamp != null) {
replicaDirtyTimestamp = Long.valueOf(lastDirtyTimestamp);
}
// If the replication's dirty timestamp is more than the existing one, just update
// it to the replica's.
if (replicaDirtyTimestamp > info.getLastDirtyTimestamp()) {
info.setLastDirtyTimestamp(replicaDirtyTimestamp);
}
info.setActionType(ActionType.MODIFIED);
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
info.setLastUpdatedTimestamp();
// Invalidate the cache entries for this app and its VIP addresses
invalidateCache(appName, info.getVIPAddress(),
info.getSecureVipAddress());
}
return true;
}
} finally {
read.unlock();
}
}
至此，服务续约的功能就分析完成了。
**2.6 ** Ǽоķֹ̣ǿͻҪܹ
ʱȡָṩߵĵַб Eureka server˵ַ仯ʱҪ̬֪ 2.6.1 DiscoveryClientʱѯ 췽УǰĿͻĬϿfetchRegistryeureka-serverȡݡ
/**
 * Excerpt of the DiscoveryClient constructor showing only the initial registry
 * fetch: when fetching is enabled and fetchRegistry(false) fails, the registry
 * is loaded from the backup instead.
 */
DiscoveryClient(ApplicationInfoManager applicationInfoManager,
EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider,
EndpointRandomizer endpointRandomizer) {
// fetchRegistry(false) does not force a full registry fetch
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
}
2.6.2 fetchRegistry
/**
 * Refreshes the local registry cache from the server, either with a full fetch
 * or with a delta update, then fires the cache-refreshed callback and updates
 * the instance's remote status.
 *
 * @param forceFullRegistryFetch when {@code true}, always performs a full fetch
 * @return {@code true} when the refresh succeeded, {@code false} when any
 *         exception was thrown during the fetch
 */
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// If the delta is disabled or if it is the first time, get all
// applications
Applications applications = getApplications();
if (clientConfig.shouldDisableDelta()
||
(!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
// Log each condition that can force a full fetch.
// NOTE(review): the last two log lines dereference applications, which this
// branch also allows to be null - confirm getApplications() never returns null.
logger.info("Disable delta property : {}",
clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}",
clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}",
forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
logger.info("Registered Applications size is zero : {}",
(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}",
(applications.getVersion() == -1));
getAndStoreFullRegistry();
} else {
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// Notify about cache refresh before updating the instance remote status
onCacheRefreshed();
// Update remote status based on refreshed data held in the cache
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
return true;
}
2.6.3 定时刷新本地地址列表 每30s一次 DiscoveryClient构造时，会初始化一些任务，这个在前面我们分析过了。其中有一个任务动态更新本地服务地址列表，叫 cacheRefreshTask。
这个任务最终执行的是CacheRefreshThread这个线程。它是一个周期性执行的任务，我们来看一下。
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds =
clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound =
clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
);
scheduler.schedule(
cacheRefreshTask,
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
2.6.4 TimedSupervisorTask 从整体上看，TimedSupervisorTask是固定间隔的周期性任务，一旦遇到超时就会将下一个周期的间隔时间调大，如果连续超时，那么每次间隔时间都会增大一倍，一直到达外部参数设定的上限为止，一旦新任务不再超时，间隔时间又会自动恢复为初始值，这种设计还是值得学习的。
/**
 * Executes the supervised task once with a bounded wait, then re-schedules itself.
 *
 * On success the delay is reset to {@code timeoutMillis}. On timeout the delay is
 * doubled, capped at {@code maxDelay}. Rejections and other failures are logged and
 * counted. In every case the future is cancelled, and unless the scheduler has been
 * shut down this task is scheduled again after the current delay.
 */
public void run() {
    Future<?> future = null;
    try {
        // Run the task on the executor so this thread can bound how long it waits.
        future = executor.submit(task);
        threadPoolLevelGauge.set((long) executor.getActiveCount());
        // Wait at most timeoutMillis for the task to finish.
        future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout
        // Success: reset the delay used for the next run back to timeoutMillis.
        delay.set(timeoutMillis);
        threadPoolLevelGauge.set((long) executor.getActiveCount());
    } catch (TimeoutException e) {
        logger.error("task supervisor timed out", e);
        timeoutCounter.increment();
        long currentDelay = delay.get();
        // Back off: double the delay, but never beyond maxDelay.
        long newDelay = Math.min(maxDelay, currentDelay * 2);
        // Compare-and-set so an update made concurrently elsewhere is not overwritten.
        delay.compareAndSet(currentDelay, newDelay);
    } catch (RejectedExecutionException e) {
        // The executor refused the task; the log level depends on whether
        // a shutdown is in progress.
        if (executor.isShutdown() || scheduler.isShutdown()) {
            logger.warn("task supervisor shutting down, reject the task", e);
        } else {
            logger.error("task supervisor rejected the task", e);
        }
        rejectedCounter.increment();
    } catch (Throwable e) {
        // Any other failure is logged and counted the same way.
        if (executor.isShutdown() || scheduler.isShutdown()) {
            logger.warn("task supervisor shutting down, can't accept the task");
        } else {
            logger.error("task supervisor threw an exception", e);
        }
        throwableCounter.increment();
    } finally {
        // Whether the task finished, timed out or failed, cancel the future to clean up.
        if (future != null) {
            future.cancel(true);
        }
        // This is what makes the task periodic: as long as the scheduler is alive,
        // a new one-shot run is scheduled after the current delay. That delay is
        // timeoutMillis after a success, or the backed-off value (at most maxDelay)
        // after a timeout.
        if (!scheduler.isShutdown()) {
            scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
        }
    }
}
2.6.5 refreshRegistry 这段代码主要有两个逻辑：
判断remoteRegions是否发生了变化
调用fetchRegistry获取本地服务地址缓存
@VisibleForTesting
void refreshRegistry() {
try {
boolean isFetchingRemoteRegionRegistries =
isFetchingRemoteRegionRegistries();
boolean remoteRegionsModified = false;
//awsϣжһԶµϢ͵ǰԶϢбȽϣȣ
String latestRemoteRegions =
clientConfig.fetchRegistryForRemoteRegions();
if (null != latestRemoteRegions) {
String currentRemoteRegions = remoteRegionsToFetch.get();
if (!latestRemoteRegions.equals(currentRemoteRegions)) {
//жһ
}
boolean success = fetchRegistry(remoteRegionsModified);
if (success) {
registrySize = localRegionApps.get().size();
lastSuccessfulRegistryFetchTimestamp =
System.currentTimeMillis();
}
// ʡ
} catch (Throwable e) {
logger.error("Cannot fetch registry from server", e);
}
}
2.6.6 fetchRegistry
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// If the delta is disabled or if it is the first time, get all
// applications
// ȡػķбϢ
Applications applications = getApplications();
//ж϶ȷǷȫ£һ㶼ȫ£
//1\. Ƿ£
//2\. Ƿijregionرע
//3\. ⲿʱǷָͨȫ£
//4\. ػδЧķбϢ
if (clientConfig.shouldDisableDelta()
||
(!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not
have latest library supporting delta
{
//ȫ
getAndStoreFullRegistry();
} else {
//
getAndUpdateDelta(applications);
}
//¼һhash
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances(); //־ӡӦõʵ֮
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
//ػµ¼㲥עļע÷ѱCloudEurekaClientд
onCacheRefreshed();
// Update remote status based on refreshed data held in the cache
//ոոµĻУEureka serverķба˵ǰӦõ״̬
//ǰʵijԱlastRemoteInstanceStatus¼һθµĵǰӦ״̬
//״̬updateInstanceRemoteStatusȽ һ£lastRemoteInstanceStatusҹ㲥Ӧ¼
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
return true;
}
2.6.7 getAndStoreFullRegistry 从eureka server端获取服务注册中心的地址信息，然后更新并设置到本地缓存 localRegionApps 。
private void getAndStoreFullRegistry() throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
logger.info("Getting all instance registry info from the eureka server");
Applications apps = null;
EurekaHttpResponse<Applications> httpResponse =
clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
:
eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddre
ss(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
logger.info("The response status is {}", httpResponse.getStatusCode());
if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration,
currentUpdateGeneration + 1)) {
localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}",
apps.getAppsHashCode());
} else {
logger.warn("Not updating applications as another thread is updating it already");
}
}
2.6.8 服务端查询服务地址 前面我们知道，客户端发起服务地址的查询有两种，一种是全量，另一种是增量。对于全量查询请求，会调用Eureka-server的ApplicationsResource的getContainers方法；对于增量请求，会调用
ApplicationsResource.getContainerDifferential。
2.6.9 ApplicationsResource.getContainers 接收客户端发送的获取全量注册信息请求
/**
 * Handles a request for the full registry.
 *
 * Responds with 403 when the registry does not allow access. Otherwise the payload is
 * looked up in the response cache under a key built from the requested format,
 * version, accept setting and (sorted, lower-cased) regions. JSON is used only when
 * the Accept header contains the JSON marker; XML otherwise. A gzipped payload is
 * returned when the Accept-Encoding header contains the gzip marker.
 */
@GET
public Response getContainers(@PathParam("version") String version,
                              @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                              @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
                              @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                              @Context UriInfo uriInfo,
                              @Nullable @QueryParam("regions") String regionsStr) {
    boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
    String[] regions = null;
    if (isRemoteRegionRequested) {
        regions = regionsStr.toLowerCase().split(",");
        Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
        EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
    } else {
        EurekaMonitors.GET_ALL.increment();
    }

    // The server cannot serve the request right now: answer 403.
    if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
        return Response.status(Status.FORBIDDEN).build();
    }

    CurrentRequestVersion.set(Version.toEnum(version));

    // Response format: JSON only when the Accept header asks for it, XML otherwise.
    boolean jsonRequested = acceptHeader != null && acceptHeader.contains(HEADER_JSON_VALUE);
    KeyType keyType = jsonRequested ? Key.KeyType.JSON : Key.KeyType.XML;
    String returnMediaType = jsonRequested ? MediaType.APPLICATION_JSON : MediaType.APPLICATION_XML;

    // Build the cache key.
    Key cacheKey = new Key(Key.EntityType.Application,
            ResponseCacheImpl.ALL_APPS,
            keyType, CurrentRequestVersion.get(),
            EurekaAccept.fromString(eurekaAccept), regions
    );

    // Both encodings are served from the response cache; only the accessor differs.
    boolean gzipRequested = acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE);
    Response response;
    if (!gzipRequested) {
        response = Response.ok(responseCache.get(cacheKey))
                .build();
    } else {
        response = Response.ok(responseCache.getGZIP(cacheKey))
                .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                .header(HEADER_CONTENT_TYPE, returnMediaType)
                .build();
    }
    CurrentRequestVersion.remove();
    return response;
}
2.6.10 responseCache.getGZIP 从缓存中读取数据。
/**
 * Returns the gzipped payload cached for the given key.
 *
 * @param key cache key to look up
 * @return the gzipped bytes, or null when no value could be obtained for the key
 */
public byte[] getGZIP(Key key) {
    final Value cached = getValue(key, shouldUseReadOnlyResponseCache);
    return (cached == null) ? null : cached.getGzipped();
}
/**
 * Looks up the cached value for a key.
 *
 * With the read-only cache enabled, that cache is consulted first; on a miss the
 * value is taken from the read-write cache and copied into the read-only cache.
 * With it disabled, the read-write cache is queried directly. Any Throwable is
 * logged and whatever value was obtained so far (possibly null) is returned.
 *
 * @param key              cache key to look up
 * @param useReadOnlyCache whether to go through the read-only cache first
 * @return the cached value, or null when none could be obtained
 */
Value getValue(final Key key, boolean useReadOnlyCache) {
    Value payload = null;
    try {
        if (!useReadOnlyCache) {
            payload = readWriteCacheMap.get(key);
        } else {
            final Value readOnlyHit = readOnlyCacheMap.get(key);
            if (readOnlyHit == null) {
                // Miss: load from the read-write cache and remember it for next time.
                payload = readWriteCacheMap.get(key);
                readOnlyCacheMap.put(key, payload);
            } else {
                payload = readOnlyHit;
            }
        }
    } catch (Throwable t) {
        logger.error("Cannot get value for key : {}", key, t);
    }
    return payload;
}
https://lijunyi.xyz/docs/SpringCloud/SpringCloud.html#_2-2-x-%E5%88%86%E6%94%AF https://mp.weixin.qq.com/s/2jeovmj77O9Ux96v3A0NtA https://juejin.cn/post/6931922457741770760 https://github.com/D2C-Cai/herring http://c.biancheng.net/springcloud https://github.com/macrozheng/springcloud-learning