Back to Javatutorial

SpringCloudEureka源码分析

docs/Spring全家桶/SpringCloud源码分析/SpringCloudEureka源码分析.md

1.0.055.0 KB
Original Source

EurekaԴ

**1 ** 1.1 Eurekaʲô

ǶףEurekaעģעҪʵʲôأȷˡ

  1. ȻעģҪܱipportϢɣEureka-serverṩĻܡ
  2. ע֮󣬻ҪṩһЩ̬֪ߵĹܰɣһߣEureka-server֪һIJֵˡ
  3. Eureka-server˸֪ı仯֪֮ܵͨѶ˰ɣǣʱserver֪ͨclientػclientԼȥȡϢأڲ֪ȻȥԴ֤
  4. OKĹܶʵˣEurekaϸˣǻһȻipͶ˿ڶEureka-server棬Ѷ˵÷˵ʱͨõOpenFeignOpenFeignô֪ĸģ֮ǰдapplication.properties<servicename>.ribbon.listOfServersУEurekaôԶдȥأ
  5. 4ܻעĸеĹܣʱ˼һ£ע΢ĿУעҲΪһҲҪȺģʱǾҪһ£Ⱥô֤һԣûʲôۣ EurekaҪʵֵĵĹܣЩṩˣĿôȥأֱȥAPIɣ鷳ȥѧһEurekaapi궿ˡ

ʱǾͻ뵽SpringBootԶװStarter˺beanԶע룬ײֱõbeanȻStarterӦԶǵAPIģOKǻعͷҷ֣ҵEureka-clientһstarterٺ٣е㶫ˡclient˺߼϶ǰǷװ˸beanȻǵ˺apiˡ

EurekaĺĹһܽ᣺

  1. ʵעᣬڴ
  2. ̬֪Ľ״̬
  3. ķ֣̬֪ı仯 1.2 Ƶ ȷ˺ĹܣԼεõģ󵨵Ƶһºͼ

Ƶ֮ͨԴһ֤

2 Դ 2.1 ע עspring bootӦʱġִ·ҲȻعһǰǽ֪ʶ

˵spring cloudһ̬ṩһױ׼ױ׼ͨͬʵ֣оͰע/֡۶ϡؾȣspring-cloud-commonУ org.springframework.cloud.client.serviceregistry ·£ԿһעĽӿڶ ServiceRegistry Ƕspring cloudзעһӿڡ

ǿһϵͼӿһΨһʵ EurekaServiceRegistry ʾõEureka ServerΪעġ

2.1.1 עʱ עķǿԲ²һӦʲôʱɣҪʵӦòѲ²⵽עȡڷǷѾˡspring bootУȵspring еö֮עᡣspring bootеrefreshContextɡ

ǹ۲һfinishRefreshϿԿˢµIJҲˢ֮ҪĺõIJҪ

  • ջ

  • ʼһLifecycleProcessorSpringʱbeanspringʱbean

  • LifecycleProcessoronRefreshʵLifecycleӿڵbean

  • ContextRefreshedEvent

  • עBeanͨJMXмغ͹

    protected void finishRefresh() {
        // Clear context-level resource caches (such as ASM metadata from scanning).
        clearResourceCaches();
        // Initialize lifecycle processor for this context.
        initLifecycleProcessor();
        // Propagate refresh to lifecycle processor first.
        getLifecycleProcessor().onRefresh();
        // Publish the final event.
        publishEvent(new ContextRefreshedEvent(this));
        // Participate in LiveBeansView MBean, if active.
        LiveBeansView.registerApplicationContext(this);
    }
    

Уصע getLifecycleProcessor().onRefresh() ǵڴonrefreshҵSmartLifecycleӿڵʵಢstart

2.1.2 SmartLifeCycle չһSmartLifeCycle֪ʶ SmartLifeCycleһӿڣSpringеBeanҳʼ֮󣬻صʵSmartLifeCycleӿڵжӦķ磨start

ʵԼҲչspringboot̵mainͬĿ¼£дһ࣬ʵSmartLifeCycleӿڣͨ @Service ΪһbeanΪҪspringȥأȵbean

@Service
public class TestSmartLifeCycle implements SmartLifecycle {
    /**
     * ִ.ʾstart.
     * isAutoStartup()ֵ,ֻisAutoStartup()trueʱ,start()Żᱻִ
     */
    @Override
    public void start() {
        System.out.println("----------start-----------");
    }
    /**
     * ֹͣǰִз
     * ǰ: isRunning()trueŻᱻִ
     */
    @Override
    public void stop() {
        System.out.println("----------stop-----------");
    }
    /**
     * ط״̬,Ӱ쵽Ƿstop
     * @return
     */
    @Override
    public boolean isRunning() {
        return false;
    }
    /**
     * Ƿstart,Ҫע
     * ǰfalseDzִstart()
     * @return
     */
    @Override
    public boolean isAutoStartup() {
        return true;
    }
    @Override
    public void stop(Runnable runnable) {
        stop();
        runnable.run();
    }
    /**
     * ִָ˳
     * ǰжʵSmartLifecycle,򰴴˷ִֵ
     * @return
     */
    @Override
    public int getPhase() {
        return 0;
    }
}

ţspring bootӦú󣬿Կ̨ start ַ

DefaultLifecycleProcessor.startBeansϼһdebugԺԵĿԼTestSmartLifeCycleɨ赽ˣøbeanstartstartBeansУǿԿȻʵSmartLifeCycleBeanȻѭʵSmartLifeCyclebeanstart¡

private void startBeans(boolean autoStartupOnly) {
    Map<String, Lifecycle> lifecycleBeans = this.getLifecycleBeans();
    Map<Integer, DefaultLifecycleProcessor.LifecycleGroup> phases = new HashMap();
    lifecycleBeans.forEach((beanName, bean) -> {
        if (!autoStartupOnly || bean instanceof SmartLifecycle &&
            ((SmartLifecycle)bean).isAutoStartup()) {
            int phase = this.getPhase(bean);
            DefaultLifecycleProcessor.LifecycleGroup group =
                (DefaultLifecycleProcessor.LifecycleGroup)phases.get(phase);
            if (group == null) {
                group = new DefaultLifecycleProcessor.LifecycleGroup(phase,this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly);
                phases.put(phase, group);
            }
            group.add(beanName, bean);
        }
    });
    if (!phases.isEmpty()) {
        List<Integer> keys = new ArrayList(phases.keySet());
        Collections.sort(keys);
        Iterator var5 = keys.iterator();
        while(var5.hasNext()) {
            Integer key = (Integer)var5.next();
            ((DefaultLifecycleProcessor.LifecycleGroup)phases.get(key)).start();
        }
    }
}

2.1.3 doStart

private void doStart(Map<String, ? extends Lifecycle> lifecycleBeans, String beanName, boolean autoStartupOnly) {
    Lifecycle bean = (Lifecycle)lifecycleBeans.remove(beanName);
    if (bean != null && bean != this) {
        String[] dependenciesForBean = this.getBeanFactory().getDependenciesForBean(beanName);
        String[] var6 = dependenciesForBean;
        int var7 = dependenciesForBean.length;
        for(int var8 = 0; var8 < var7; ++var8) {
            String dependency = var6[var8];
            this.doStart(lifecycleBeans, dependency, autoStartupOnly);
        }
        if (!bean.isRunning() && (!autoStartupOnly || !(bean instanceof SmartLifecycle) || ((SmartLifecycle)bean).isAutoStartup())) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Starting bean '" + beanName + "' of type [" + bean.getClass().getName() + "]");
            }
            try {
                bean.start(); //ʱ BeanʵӦEurekaAutoServiceRegistration
            } catch (Throwable var10) {
                throw new ApplicationContextException("Failed to start bean '" + beanName + "'", var10);
            }
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Successfully started bean '" + beanName + "'");
            }
        }
    }
}

ʱbean.start()õĿ EurekaAutoServiceRegistrationеstartΪȻʵSmartLifeCycleӿڡ

public class EurekaAutoServiceRegistration implements AutoServiceRegistration,SmartLifecycle, Ordered, SmartApplicationListener {
    @Override
    public void start() {
        // only set the port if the nonSecurePort or securePort is 0 and this.port != 0
        if (this.port.get() != 0) {
            if (this.registration.getNonSecurePort() == 0) {
                this.registration.setNonSecurePort(this.port.get());
            }
            if (this.registration.getSecurePort() == 0 &&
                this.registration.isSecure()) {
                this.registration.setSecurePort(this.port.get());
            }
        }
        // only initialize if nonSecurePort is greater than 0 and it isn't already running
        // because of containerPortInitializer below
        if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
            this.serviceRegistry.register(this.registration);
            this.context.publishEvent(new InstanceRegisteredEvent<>(this,
                                                                    this.registration.getInstanceConfig()));
            this.running.set(true);
        }
    }
}

startУǿԿ this.serviceRegistry.register ʵϾǷעĻơ

ʱthis.serviceRegistryʵӦ EurekaServiceRegistry ԭ EurekaAutoServiceRegistrationĹ췽Уһֵ췽EurekaClientAutoConfiguration Զװбװͳʼģ¡

@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(
    ApplicationContext context, EurekaServiceRegistry registry,
    EurekaRegistration registration) {
    return new EurekaAutoServiceRegistration(context, registry, registration);
}

2.2 ע Ƿע

public class EurekaAutoServiceRegistration implements AutoServiceRegistration,
SmartLifecycle, Ordered, SmartApplicationListener {
    @Override
    public void start() {
        //ʡ...
        this.serviceRegistry.register(this.registration);
        this.context.publishEvent(new InstanceRegisteredEvent<> this,this.registration.getInstanceConfig()));
    }
}

this.serviceRegistry.register(this.registration); ջ

EurekaServiceRegistry е register ʵַע

2.2.1 register

@Override
public void register(EurekaRegistration reg) {
    maybeInitializeClient(reg);
    if (log.isInfoEnabled()) {
        log.info("Registering application "
                 + reg.getApplicationInfoManager().getInfo().getAppName()
                 + " with eureka with status "
                 + reg.getInstanceConfig().getInitialStatus());
    }
    //õǰʵ״̬һʵ״̬仯ֻҪ״̬DOWNôͻᱻִзעᡣ
    reg.getApplicationInfoManager().setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
    //ýĴ
    reg.getHealthCheckHandler().ifAvailable(healthCheckHandler -> reg.getEurekaClient().registerHealthCheck(healthCheckHandler));
}

ע᷽вûEurekaķȥִעᣬǽһ״̬Լý鴦Ǽһ reg.getApplicationInfoManager().setInstanceStatus

public synchronized void setInstanceStatus(InstanceStatus status) {
    InstanceStatus next = instanceStatusMapper.map(status);
    if (next == null) {
        return;
    }
    InstanceStatus prev = instanceInfo.setStatus(next);
    if (prev != null) {
        for (StatusChangeListener listener : listeners.values()) {
            try {
                listener.notify(new StatusChangeEvent(prev, next));
            } catch (Exception e) {
                logger.warn("failed to notify listener: {}", listener.getId(),e);
            }
        }
    }
}

Уͨһ״̬¼okʱlistenerʵStatusChangeListener Ҳǵ StatusChangeListener notify¼Ǵһ״̬Ӧеط¼Ȼ¼עᡣ

ʱΪҵ˷ȻȥһһӿڡǷǾ̬ڲӿڣ޷ֱӿʵࡣ

ҶԴĶ飬ңΪһܲ²⵽һijط˳ʼĹǣҵ EurekaServiceRegistry.registerе reg.getApplicationInfoManager ʵʲôǷApplicationInfoManagerEurekaRegistrationеԡEurekaRegistrationEurekaAutoServiceRegistrationʵġ룬DzԶװʲôҵEurekaClientAutoConfiguration࣬ȻBeanһЩԶװ䣬а EurekaClient ApplicationInfoMangager EurekaRegistration ȡ

2.2.2 EurekaClientConfiguration

@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingRefreshScope
protected static class EurekaClientConfiguration {
    @Autowired
    private ApplicationContext context;
    @Autowired
    private AbstractDiscoveryClientOptionalArgs<?> optionalArgs;
    @Bean(destroyMethod = "shutdown")
    @ConditionalOnMissingBean(value = EurekaClient.class,search = SearchStrategy.CURRENT)
    public EurekaClient eurekaClient(ApplicationInfoManager manager,EurekaClientConfig config) {
        return new CloudEurekaClient(manager, config, this.optionalArgs,this.context);
    }
    @Bean
    @ConditionalOnMissingBean(value = ApplicationInfoManager.class,search = SearchStrategy.CURRENT)
    public ApplicationInfoManager eurekaApplicationInfoManager(
        EurekaInstanceConfig config) {
        InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
        return new ApplicationInfoManager(config, instanceInfo);
    }
    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    @ConditionalOnProperty(
        value = "spring.cloud.service-registry.auto-registration.enabled",
        matchIfMissing = true)
    public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient,CloudEurekaInstanceConfig
                                                 instanceConfig,ApplicationInfoManager applicationInfoManager, @Autowired(required = false)
                                                 ObjectProvider<HealthCheckHandler> healthCheckHandler) {
        return EurekaRegistration.builder(instanceConfig).with(applicationInfoManager).with(eurekaClient).with(healthCheckHandler).build();
    }
}

ѷ֣ƺһҪBeanʱԶװ䣬ҲCloudEurekaClient ҿԺ׵ʶ𲢲²Eurekaͻ˵һ࣬ʵֺͷ˵ͨԼǺܶԴһ·Ҫôڹ췽ȥܶijʼһЩִ̨еijҪôͨ첽¼ķʽţǿһCloudEurekaClientijʼ̣Ĺ췽лͨ super øĹ췽ҲDiscoveryClientĹ졣

2.2.3 CloudEurekaClient super(applicationInfoManager, config, args);øĹ췽CloudEurekaClientĸDiscoveryClient.

public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,EurekaClientConfig config,AbstractDiscoveryClientOptionalArgs<?> args,ApplicationEventPublisher publisher) {
    super(applicationInfoManager, config, args);
    this.applicationInfoManager = applicationInfoManager;
    this.publisher = publisher;
    this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,"eurekaTransport");
    ReflectionUtils.makeAccessible(this.eurekaTransportField);
}

2.2.4 DiscoveryClient ǿԿյDiscoveryClient췽УзdzĴ롣ʵܶԲҪģ󲿷ֶһЩʼʼ˼ʱ

  • scheduler

  • heartbeatExecutor ʱ

  • cacheRefreshExecutor ʱȥͬ˵ʵб

    DiscoveryClient(ApplicationInfoManager applicationInfoManager,EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,Provider<BackupRegistry> backupRegistryProvider,EndpointRandomizer endpointRandomizer) {
        //ʡԲִ...
        //ǷҪeureka serverϻȡַϢ
        if (config.shouldFetchRegistry()) {
            this.registryStalenessMonitor = new ThresholdLevelsMetric(this,METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L,480L});
        } else {
            this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }
        //ǷҪעᵽeureka server
        if (config.shouldRegisterWithEureka()) {
            this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this,METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L,120L, 240L, 480L});
        } else {
            this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }
        //ҪעᲢҲҪ·ַ
        if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
    
            return;  // no need to setup up an network tasks and we are done
        }
        try {
            // default size of 2 - 1 each for heartbeat and cacheRefresh
            scheduler = Executors.newScheduledThreadPool(2,new ThreadFactoryBuilder()       .setNameFormat("DiscoveryClient-%d")
                                                         .setDaemon(true)
                                                         .build());
            heartbeatExecutor = new ThreadPoolExecutor(1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0,
                                                       TimeUnit.SECONDS,
                                                       new SynchronousQueue<Runnable>(),
                                                       new ThreadFactoryBuilder()
                                                       .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                                                       .setDaemon(true)
                                                       .build()
                                                      );  // use direct handoff
            cacheRefreshExecutor = new ThreadPoolExecutor(
                1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0,
                TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                .setDaemon(true)
                .build()
            );  // use direct handoff
            eurekaTransport = new EurekaTransport();
            scheduleServerEndpointTask(eurekaTransport, args);
            AzToRegionMapper azToRegionMapper;
            if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
            } else {
                azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
            }
            if (null != remoteRegionsToFetch.get()) {
    
                azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
            }
            instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper,
                                                              clientConfig.getRegion());
        } catch (Throwable e) {
            throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
        }
        //ҪעᵽEureka serverǿ˳ʼʱǿעᣬregister()ע
        if (clientConfig.shouldRegisterWithEureka() &&
            clientConfig.shouldEnforceRegistrationAtInit()) {
            try {
                if (!register() ) {
                    throw new IllegalStateException("Registration error at startup.Invalid server response.");
                }
            } catch (Throwable th) {
                logger.error("Registration error at startup: {}", th.getMessage());
                throw new IllegalStateException(th);
            }
        }
        // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat,instanceInfo replicator, fetch
        initScheduledTasks();
    }
    

2.2.5 initScheduledTasks initScheduledTasks ȥһʱ

  • ˿עˢ·бῪcacheRefreshExecutorʱ
  • ˷עᵽEurekaͨҪ.

ͨڲʵStatusChangeListener ʵ״̬ؽӿڣǰڷģnotifyķʵϻ֡

private void initScheduledTasks() {
    //˿עˢ·бῪcacheRefreshExecutorʱ
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds =
            clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound =
            clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        scheduler.schedule(
            new TimedSupervisorTask(
                "cacheRefresh",
                scheduler,
                cacheRefreshExecutor,
                registryFetchIntervalSeconds,
                TimeUnit.SECONDS,
                expBackOffBound,
                new CacheRefreshThread()
            ),
            registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
    //˷עᵽEurekaͨҪ
    if (clientConfig.shouldRegisterWithEureka()) {
        int renewalIntervalInSecs =
            instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound =
            clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: {}",
                    renewalIntervalInSecs);
        // Heartbeat timer
        scheduler.schedule(
            new TimedSupervisorTask(
                "heartbeat",
                scheduler,
                heartbeatExecutor,
                renewalIntervalInSecs,
                TimeUnit.SECONDS,
                expBackOffBound,
                new HeartbeatThread()
            ),
            renewalIntervalInSecs, TimeUnit.SECONDS);
        // InstanceInfo replicator ʼһ:instanceInfoReplicator
        instanceInfoReplicator = new InstanceInfoReplicator(
            this,
            instanceInfo,
            clientConfig.getInstanceInfoReplicationIntervalSeconds(),
            2); // burstSize
        statusChangeListener = new ApplicationInfoManager.StatusChangeListener()
        {
            @Override
            public String getId() {
                return "statusChangeListener";
            }
            @Override
            public void notify(StatusChangeEvent statusChangeEvent) {
                if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                    InstanceStatus.DOWN ==
                    statusChangeEvent.getPreviousStatus()) {
                    // log at warn level if DOWN was involved
                    logger.warn("Saw local status change event {}",
                                statusChangeEvent);
                } else {
                    logger.info("Saw local status change event {}",
                                statusChangeEvent);
                }
                instanceInfoReplicator.onDemandUpdate();
            }
        };
        //עʵ״̬仯ļ
        if (clientConfig.shouldOnDemandUpdateStatusChange()) {
            applicationInfoManager.registerStatusChangeListener(statusChangeListener);
        }
        //һʵϢҪΪ˿һʱ̣߳ÿ40жʵϢǷע
        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationInte
                                     rvalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}

2.2.6 onDemandUpdate ҪǸʵǷ仯עĵݡ

public boolean onDemandUpdate() {
    //ж
    if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
        if (!scheduler.isShutdown()) {
            //ύһ
            scheduler.submit(new Runnable() {
                @Override
                public void run() {
                    logger.debug("Executing on-demand update of local InstanceInfo");
                    //ȡ֮ǰѾύҲstartύĸûִɣȡ֮ǰ
                    Future latestPeriodic = scheduledPeriodicRef.get();
                    if (latestPeriodic != null && !latestPeriodic.isDone()) {
                        logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                        latestPeriodic.cancel(false);//δɣȡ
                    }
                    //ͨrunʱִУ൱еһ
                    InstanceInfoReplicator.this.run();
                }
            });
            return true;
        } else {
            logger.warn("Ignoring onDemand update due to stopped scheduler");
            return false;
        }
    } else {
        logger.warn("Ignoring onDemand update due to rate limiter");
        return false;
    }
}

2.2.7 run runʵϺǰԶװִеķע᷽һģҲǵ register зעᣬfinallyУÿ30sᶨʱִһµǰrun м顣

public void run() {
    try {
        discoveryClient.refreshInstanceInfo();
        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        Future next = scheduler.schedule(this, replicationIntervalSeconds,
                                         TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}

2.2.8 register գҵעˣ eurekaTransport.registrationClient.register յõ AbstractJerseyEurekaHttpClient#register(...)` ȻԼȥ룬ͻᷢȥ֮ǰкܶȥĴ룬繤ģʽװģʽȡ

boolean register() throws Throwable {
    logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
    EurekaHttpResponse<Void> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier,e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier,httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}

ȻǷһhttp󣬷Eureka-Serverapps/${APP_NAME}ӿڣǰʵϢ͵Eureka Serverб档

ˣǻѾ֪Spring Cloud Eureka ʱѷϢעᵽEureka Serverϵˡ

public EurekaHttpResponse<Void> register(InstanceInfo info) {
    String urlPath = "apps/" + info.getAppName();
    ClientResponse response = null;
    try {
        Builder resourceBuilder =
            jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
        addExtraHeaders(resourceBuilder);
        response = resourceBuilder
            .header("Accept-Encoding", "gzip")
            .type(MediaType.APPLICATION_JSON_TYPE)
            .accept(MediaType.APPLICATION_JSON)
            .post(ClientResponse.class, info);
        return
            anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                         response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}

ǣƺʼ⻹ûнҲSpring BootӦʱstartյ StatusChangeListener.notify ȥ·һ״̬ûֱӵregisterעᡣǼȥһ statusChangeListener.notify

2.2.9 ܽ ˣ֪Eureka Clientעʱطִзע

  1. Spring BootʱԶװƽCloudEurekaClientע뵽ִ˹췽ڹ췽һʱÿ40sִһжϣжʵϢǷ˱仯ᷢע
  2. Spring BootʱͨrefreshյStatusChangeListener.notifyз״̬ļķܵ¼֮ȥִзעᡣ 2.3 Server߼ ûԴʵ֮ǰһ֪϶ķʵݽ˴洢ôȥEureka Server˿һ´̡

ڣ com.netflix.eureka.resources.ApplicationResource.addInstance()

ҿԷ֣ṩREST񣬲õjerseyʵֵġJerseyǻJAX-RS׼ṩRESTʵֵ֧֣Ͳչˡ

2.3.1 addInstance() EurekaClientregisterעʱ ApplicationResource.addInstance

עǷһPOSTϵǰʵϢ ApplicationResource addInstanceзעᡣ

@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info, @HeaderParam("x-netflix-discovery-replication") String isReplication) {
    logger.debug("Registering instance {} (replication={})", info.getId(),
                 isReplication);
    DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
    if (dataCenterInfo instanceof UniqueIdentifier) {
        String dataCenterInfoId =
            ((UniqueIdentifier)dataCenterInfo).getId();
        if (this.isBlank(dataCenterInfoId)) {
            boolean experimental =            "true".equalsIgnoreCase(this.serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
            if (experimental) {
                String entity = "DataCenterInfo of type " +
                    dataCenterInfo.getClass() + " must contain a valid id";
                return Response.status(400).entity(entity).build();
            }
            if (dataCenterInfo instanceof AmazonInfo) {
                AmazonInfo amazonInfo = (AmazonInfo)dataCenterInfo;
                String effectiveId = amazonInfo.get(MetaDataKey.instanceId);
                if (effectiveId == null) {
                    amazonInfo.getMetadata().put(MetaDataKey.instanceId.getName(), info.getId());
                }
            } else {
                logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
            }
        }
    }
    this.registry.register(info, "true".equals(isReplication));
    return Response.status(204).build();
}

2.3.2 register

PeerAwareInstanceRegistryImplϵͼϵͼԿPeerAwareInstanceRegistryӿΪLeaseManagerLookupService,

  • LookupServiceķʾΪ
  • LeaseManager˴ͻעᣬԼעȲ

addInstance Уյõ PeerAwareInstanceRegistryImpl.register

  • leaseDuration ʾԼʱ䣬Ĭ90sҲǵ˳90sûյͻ˵޳ýڵ

  • super.registerڵע

  • ϢƵEureka ServerȺеϣͬʵҲܼ򵥣ǻüȺенڵ㣬Ȼע

    public void register(final InstanceInfo info, final boolean isReplication) {
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() >0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs(); //ͻԼʱʱ䣬ÿͻ˵ʱ
        }
        super.register(info, leaseDuration, isReplication); //ڵע
        //ƵEureka ServerȺеڵ
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info,
                         null, isReplication);
    }
    

2.3.3 AbstractInstanceRegistry.register ˵Eureka-Serverķעᣬʵǽͻ˴ݹʵݱ浽Eureka-ServerеConcurrentHashMapС

public void register(InstanceInfo registrant, int leaseDuration, boolean
                     isReplication) {
    try {
        read.lock();
        //registryлõǰʵϢappName
        Map<String, Lease<InstanceInfo>> gMap =
            registry.get(registrant.getAppName());
        REGISTER.increment(isReplication); //עϢ
        if (gMap == null) {//ǰappNameǵһעᣬʼһConcurrentHashMap
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new
                ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        //gMapвѯѾڵLeaseϢLeaseķΪԼʵѷṩߵʵϢװһleaseṩ˶ڸķʵԼ
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        // instanceѾǣͿͻ˵instanceϢȽϣʱµǸΪЧinstanceϢ
        if (existingLease != null && (existingLease.getHolder() != null)) {
            Long existingLastDirtyTimestamp =
                existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp =
                registrant.getLastDirtyTimestamp();
            logger.debug("Existing lease found (existing={}, provided={}",
                         existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
            // this is a > instead of a >= because if the timestamps are equal,we still take the remote transmitted
            // InstanceInfo instead of the server local copy.
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}",
                            existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                registrant = existingLease.getHolder();
            }
        } else {
            //leaseʱ뵽δ룬
            synchronized (lock) {
                if (this.expectedNumberOfClientsSendingRenews > 0) {
                    // Since the client wants to register it, increase the number of clients sending renews
                    this.expectedNumberOfClientsSendingRenews =
                        this.expectedNumberOfClientsSendingRenews + 1;
                    updateRenewsPerMinThreshold();
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        //һlease
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant,
                                                            leaseDuration);
        if (existingLease != null) {
            // ԭLeaseϢʱserviceUpTimestamp, ֤ʱһֱǵһעǸ
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        gMap.put(registrant.getId(), lease);
        synchronized (recentRegisteredQueue) {//ӵעĶ
            recentRegisteredQueue.add(new Pair<Long, String>(
                System.currentTimeMillis(),
                registrant.getAppName() + "(" + registrant.getId() + ")"));
        }
        // ʵ״̬Ƿ仯DzҴڣ򸲸ԭ״̬
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                         + "overrides", registrant.getOverriddenStatus(),
                         registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                logger.info("Not found overridden id {} and hence adding it",
                            registrant.getId());
                overriddenInstanceStatusMap.put(registrant.getId(),
                                                registrant.getOverriddenStatus());
            }
        }
        InstanceStatus overriddenStatusFromMap =
            overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            logger.info("Storing overridden status {} from map",
                        overriddenStatusFromMap);
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }
        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus =
            getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);
        // õinstanceStatusжǷUP״̬
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
        // עΪ
        registrant.setActionType(ActionType.ADDED);
        // Լ¼У¼ʵÿα仯 עϢȡ
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        registrant.setLastUpdatedTimestamp();
        //ûʧЧ
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(),
                        registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(),
                    registrant.getStatus(), isReplication);
    } finally {
        read.unlock();
    }
}

2.3.4 С ˣǾͰѷעڿͻ˺ͷ˵ĴһϸķʵEureka Serverˣѿͻ˵ĵַϢ浽ConcurrentHashMapд洢ҷṩߺע֮䣬ὨһơڼطṩߵĽ״̬

2.4 Eureka Ķ༶ Eureka Server(registryreadWriteCacheMapreadOnlyCacheMap)עϢĬ¶ʱÿ30sreadWriteCacheMapͬreadOnlyCacheMapÿ60s90sδԼĽڵ㣬Eureka Clientÿ30sreadOnlyCacheMap·עϢͻ˷עregistry·עϢ

2.4.1 ༶ ΪʲôҪƶ༶أԭܼ򵥣ǵڴģķע͸ʱֻ޸һConcurrentHashMapݣôƱΪĴڵ¾Ӱܡ

EurekaAPģֻͣҪտþСõ༶ʵֶд롣ע᷽дʱֱдڴעд֮ʧЧд档

ȡעϢӿȴֻȡֻûȥдȡдûȥڴעȡֻȡ˴ϸӣңд»дֻ

  • responseCacheUpdateIntervalMs readOnlyCacheMap µĶʱʱĬΪ30
  • responseCacheAutoExpirationInSeconds : readWriteCacheMap ʱ䣬ĬΪ 180 롣 2.4.2 עĻʧЧ AbstractInstanceRegistry.register󣬻invalidateCache(registrant.getAppName(), registrant.getVIPAddress(),registrant.getSecureVipAddress()); ʹöдʧЧ
public void invalidate(Key... keys) {
    for (Key key : keys) {
        logger.debug("Invalidating the response cache key : {} {} {} {}, {}",
                     key.getEntityType(), key.getName(), key.getVersion(),
                     key.getType(), key.getEurekaAccept());
        readWriteCacheMap.invalidate(key);
        Collection<Key> keysWithRegions = regionSpecificKeys.get(key);
        if (null != keysWithRegions && !keysWithRegions.isEmpty()) {
            for (Key keysWithRegion : keysWithRegions) {
                logger.debug("Invalidating the response cache key : {} {} {} {} {}",
                             key.getEntityType(), key.getName(),
                             key.getVersion(), key.getType(), key.getEurekaAccept());
                readWriteCacheMap.invalidate(keysWithRegion);
            }
        }
    }
}

2.4.3 ʱͬ ResponseCacheImplĹ췽Уһʱᶨʱдеݱ仯иºͬ

private TimerTask getCacheUpdateTask() {
    return new TimerTask() {
        @Override
        public void run() {
            logger.debug("Updating the client cache from response cache");
            for (Key key : readOnlyCacheMap.keySet()) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
                                 key.getEntityType(), key.getName(),
                                 key.getVersion(), key.getType());
                }
                try {
                    CurrentRequestVersion.set(key.getVersion());
                    Value cacheValue = readWriteCacheMap.get(key);
                    Value currentCacheValue = readOnlyCacheMap.get(key);
                    if (cacheValue != currentCacheValue) {
                        readOnlyCacheMap.put(key, cacheValue);
                    }
                } catch (Throwable th) {
                    logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
                } finally {
                    CurrentRequestVersion.remove();
                }
            }
        }
    };
}

2.5 Լ νķԼʵһơͻ˻ᶨڷԼô򵥸ҿһ´ʵ

2.5.1 initScheduledTasks ͻ˻ DiscoveryClient.initScheduledTasks УһĶʱ

// Heartbeat timer
scheduler.schedule(
    new TimedSupervisorTask(
        "heartbeat",
        scheduler,
        heartbeatExecutor,
        renewalIntervalInSecs,
        TimeUnit.SECONDS,
        expBackOffBound,
        new HeartbeatThread()
    ),
    renewalIntervalInSecs, TimeUnit.SECONDS);

2.5.2 HeartbeatThread ȻʱУִһ HearbeatThread ̣̻߳߳ᶨʱrenew()Լ

//ÿ30sһ
private class HeartbeatThread implements Runnable {
    public void run() {
        if (renew()) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}

2.5.3 յĴ ApplicationResource.getInstanceInfoӿУ᷵һInstanceResourceʵڸʵ£һstatusUpdateĽӿ״̬

@Path("{id}")
public InstanceResource getInstanceInfo(@PathParam("id") String id) {
    return new InstanceResource(this, id, serverConfig, registry);
}

2.5.4 InstanceResource.statusUpdate() ڸ÷Уصע registry.statusUpdate AbstractInstanceRegistry.statusUpdateָṩڷ˴洢Ϣеı仯

@PUT
@Path("status")
public Response statusUpdate(
    @QueryParam("value") String newStatus,
    @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
    @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    try {
        if (registry.getInstanceByAppAndId(app.getName(), id) == null) {
            logger.warn("Instance not found: {}/{}", app.getName(), id);
            return Response.status(Status.NOT_FOUND).build();
        }
        boolean isSuccess = registry.statusUpdate(app.getName(), id,                      
                                                  InstanceStatus.valueOf(newStatus), lastDirtyTimestamp,
                                                  "true".equals(isReplication));
        if (isSuccess) {
            logger.info("Status updated: {} - {} - {}", app.getName(), id,
                        newStatus);
            return Response.ok().build();
        } else {
            logger.warn("Unable to update status: {} - {} - {}", app.getName(),
                        id, newStatus);
            return Response.serverError().build();
        }
    } catch (Throwable e) {
        logger.error("Error updating instance {} for status {}", id,
                     newStatus);
        return Response.serverError().build();
    }
}

2.5.5 AbstractInstanceRegistry.statusUpdate УõӦöӦʵбȻLease.renew()ȥԼ

public boolean statusUpdate(String appName, String id,
                            InstanceStatus newStatus, String
                            lastDirtyTimestamp,
                            boolean isReplication) {
    try {
        read.lock();
        // ״̬Ĵ ״̬ͳ
        STATUS_UPDATE.increment(isReplication);
        // ӱȡʵϢ
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> lease = null;
        if (gMap != null) {
            lease = gMap.get(id);
        }
        // ʵڣֱӷأʾʧ
        if (lease == null) {
            return false;
        } else {
            // ִһleaserenewҪǸinstanceʱ䡣
            lease.renew();
            // ȡinstanceʵϢ
            InstanceInfo info = lease.getHolder();
            // Lease is always created with its instance info object.
            // This log statement is provided as a safeguard, in case this invariant is violated.
            if (info == null) {
                logger.error("Found Lease without a holder for instance id {}",
                             id);
            }
            // instanceϢΪʱʵ״̬˱仯
            if ((info != null) && !(info.getStatus().equals(newStatus))) {
                // ״̬UP״̬ôһserviceUp() , ҪǸ·עʱ
                
                    if (InstanceStatus.UP.equals(newStatus)) {
                        lease.serviceUp();
                    }
                // instance Id ״̬ӳϢ븲ǻMAPȥ
                overriddenInstanceStatusMap.put(id, newStatus);
                // Set it for transfer of overridden status to replica on
                // ø״̬ʵϢȥ
                info.setOverriddenStatus(newStatus);
                long replicaDirtyTimestamp = 0;
                info.setStatusWithoutDirty(newStatus);
                if (lastDirtyTimestamp != null) {
                    replicaDirtyTimestamp = Long.valueOf(lastDirtyTimestamp);
                }
                // If the replication's dirty timestamp is more than the existing one, just update
                // it to the replica's.
                // replicaDirtyTimestamp ʱinstancegetLastDirtyTimestamp() ,

                if (replicaDirtyTimestamp > info.getLastDirtyTimestamp()) {
                    info.setLastDirtyTimestamp(replicaDirtyTimestamp);
                }
                info.setActionType(ActionType.MODIFIED);
                recentlyChangedQueue.add(new RecentlyChangedItem(lease));
                info.setLastUpdatedTimestamp();
                //д
                invalidateCache(appName, info.getVIPAddress(),
                                info.getSecureVipAddress());
            }
            return true;
        }
    } finally {
        read.unlock();
    }
}

ˣԼܾͷˡ

**2.6 ** Ǽоķֹ̣ǿͻҪܹ

ʱȡָṩߵĵַб Eureka server˵ַ仯ʱҪ̬֪ 2.6.1 DiscoveryClientʱѯ 췽УǰĿͻĬϿfetchRegistryeureka-serverȡݡ

DiscoveryClient(ApplicationInfoManager applicationInfoManager,
                EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider,
                EndpointRandomizer endpointRandomizer) {
    if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
        fetchRegistryFromBackup();
    }
}

2.6.2 fetchRegistry

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
    try {
        // If the delta is disabled or if it is the first time, get all
        // applications
        Applications applications = getApplications();
        if (clientConfig.shouldDisableDelta()
            ||
            (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
            || forceFullRegistryFetch
            || (applications == null)
            || (applications.getRegisteredApplications().size() == 0)
            || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            logger.info("Disable delta property : {}",
                        clientConfig.shouldDisableDelta());
            logger.info("Single vip registry refresh property : {}",
                        clientConfig.getRegistryRefreshSingleVipAddress());
            logger.info("Force full registry fetch : {}",
                        forceFullRegistryFetch);
            logger.info("Application is null : {}", (applications == null));
            logger.info("Registered Applications size is zero : {}",
                        (applications.getRegisteredApplications().size() == 0));
            logger.info("Application version is -1: {}",
                        (applications.getVersion() == -1));
            getAndStoreFullRegistry();
        } else {
            getAndUpdateDelta(applications);
        }
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }
    // Notify about cache refresh before updating the instance remote status
    onCacheRefreshed();
    // Update remote status based on refreshed data held in the cache
    updateInstanceRemoteStatus();
    // registry was fetched successfully, so return true
    return true;
}

2.6.3 ʱˢ±صַб ÿ30sһ DiscoveryClientʱ򣬻ʼһЩǰǷˡһ̬±طַб cacheRefreshTask

ִеCacheRefreshThread̡߳һִе񣬾һ¡

private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds =
            clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound =
            clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        cacheRefreshTask = new TimedSupervisorTask(
            "cacheRefresh",
            scheduler,
            cacheRefreshExecutor,
            registryFetchIntervalSeconds,
            TimeUnit.SECONDS,
            expBackOffBound,
            new CacheRefreshThread()
        );
        scheduler.schedule(
            cacheRefreshTask,
            registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }

2.6.4 TimedSupervisorTask ϿTimedSupervisorTaskǹ̶һʱͻὫһڵļʱʱôÿμʱ䶼һһֱⲿ趨ΪֹһٳʱʱֻԶָΪʼֵƻֵѧϰġ

public void run() {
      Future future = null;
  try {
    //ʹFuture趨̵߳ijʱʱ䣬ǰ߳̾Ͳ޵ȴ
    future = executor.submit(task);
    threadPoolLevelGauge.set((long) executor.getActiveCount());
    //ָȴ̵߳ʱ
    future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout
    //delayǸõıõǵÿִɹὫdelay
    delay.set(timeoutMillis);
    threadPoolLevelGauge.set((long) executor.getActiveCount());
 } catch (TimeoutException e) {
    logger.error("task supervisor timed out", e);
    timeoutCounter.increment();
    long currentDelay = delay.get();
    //̳߳ʱʱ򣬾Ͱdelayᳬⲿʱ趨ʱʱ
    long newDelay = Math.min(maxDelay, currentDelay * 2);
    //Ϊµֵǵ̣߳CAS
    delay.compareAndSet(currentDelay, newDelay);
 } catch (RejectedExecutionException e) {
    //һ̳߳صз˴񣬴˾ܾԣͻὫͣ
    if (executor.isShutdown() || scheduler.isShutdown()) {
      logger.warn("task supervisor shutting down, reject the task", e);
   } else {
      logger.error("task supervisor rejected the task", e);
   }
    rejectedCounter.increment();
 } catch (Throwable e) {
    //һδ֪쳣ͣ
    if (executor.isShutdown() || scheduler.isShutdown()) {
      logger.warn("task supervisor shutting down, can't accept the task");
   } else {
      logger.error("task supervisor threw an exception", e);
   }
    throwableCounter.increment();
 } finally {
    //ҪôִϣҪô쳣cancel
    if (future != null) {
      future.cancel(true);
   }
    //ֻҪûָֹͣȴʱִ֮һͬ
    if (!scheduler.isShutdown()) {
      //ԭֻҪûֹͣٴһִʱʱdealyֵ
      //ⲿʱijʱʱΪ30루췽timeoutʱΪ50(췽expBackOffBound)
      //һûгʱô30ʼ
      //һʱˣô50ʼ쳣иԶIJԶ60볬50룩
      scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
   }
 }
}

2.6.5 refreshRegistry δҪ߼

  • жremoteRegionsǷ˱仯

  • fetchRegistryȡطַ

    @VisibleForTesting
    void refreshRegistry() {
        try {
            boolean isFetchingRemoteRegionRegistries =
                isFetchingRemoteRegionRegistries();
            boolean remoteRegionsModified = false;
            //awsϣжһԶµϢ͵ǰԶϢбȽϣȣ
            String latestRemoteRegions =
                clientConfig.fetchRegistryForRemoteRegions();
            if (null != latestRemoteRegions) {
                String currentRemoteRegions = remoteRegionsToFetch.get();
                if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                    //жһ
                }
                boolean success = fetchRegistry(remoteRegionsModified);
                if (success) {
                    registrySize = localRegionApps.get().size();
                    lastSuccessfulRegistryFetchTimestamp =
                        System.currentTimeMillis();
                }
                // ʡ
            } catch (Throwable e) {
                logger.error("Cannot fetch registry from server", e);
            }
        }
    

2.6.6 fetchRegistry

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
    try {
        // If the delta is disabled or if it is the first time, get all
        // applications
        // ȡػķбϢ
        Applications applications = getApplications();
        //ж϶ȷǷ񴥷ȫ£һ㶼ȫ£
        //1\. Ƿ£
        //2\. Ƿijregionرע
        //3\. ⲿʱǷָͨȫ£
        //4\. ػδЧķбϢ
        if (clientConfig.shouldDisableDelta()
            ||
            (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
            || forceFullRegistryFetch
            || (applications == null)
            || (applications.getRegisteredApplications().size() == 0)
            || (applications.getVersion() == -1)) //Client application does not
            have latest library supporting delta
        {
            //ȫ
            getAndStoreFullRegistry();
        } else {
            //
            getAndUpdateDelta(applications);
        }
        //¼һhash
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances(); //־ӡӦõʵ֮
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }
    //ػµ¼㲥עļע÷ѱCloudEurekaClientд
    onCacheRefreshed();
    // Update remote status based on refreshed data held in the cache
    //ոոµĻУEureka serverķба˵ǰӦõ״̬
    //ǰʵijԱlastRemoteInstanceStatus¼һθµĵǰӦ״̬
    //״̬updateInstanceRemoteStatusȽ һ£͸lastRemoteInstanceStatusҹ㲥Ӧ¼
    updateInstanceRemoteStatus();
    // registry was fetched successfully, so return true
    return true;
}

2.6.7 getAndStoreFullRegistry eureka server˻ȡעĵĵַϢȻ²õػ localRegionApps

private void getAndStoreFullRegistry() throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();
    logger.info("Getting all instance registry info from the eureka server");
    Applications apps = null;
    EurekaHttpResponse<Applications> httpResponse =
        clientConfig.getRegistryRefreshSingleVipAddress() == null
        ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
        :
    eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddre
                                       ss(), remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        apps = httpResponse.getEntity();
    }
    logger.info("The response status is {}", httpResponse.getStatusCode());
    if (apps == null) {
        logger.error("The application is null for some reason. Not storing this information");
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration,
                                                     currentUpdateGeneration + 1)) {
        localRegionApps.set(this.filterAndShuffle(apps));
        logger.debug("Got full registry with apps hashcode {}",
                     apps.getAppsHashCode());
    } else {
        logger.warn("Not updating applications as another thread is updating it already");
    }
}

2.6.8 ˲ѯַ ǰ֪ͻ˷ַIJѯ֣һȫһȫѯ󣬻Eureka-serverApplicationsResourcegetContainers

󣬻 ApplicationsResource.getContainerDifferential

2.6.9 ApplicationsResource.getContainers տͻ˷͵ĻȡȫעϢ

@GET
public Response getContainers(@PathParam("version") String version,
                              @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                              @HeaderParam(HEADER_ACCEPT_ENCODING) String
                              acceptEncoding,
                              @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT)
                              String eurekaAccept,
                              @Context UriInfo uriInfo,
                              @Nullable @QueryParam("regions") String
                              regionsStr) {
    boolean isRemoteRegionRequested = null != regionsStr &&
        !regionsStr.isEmpty();
    String[] regions = null;
    if (!isRemoteRegionRequested) {
        EurekaMonitors.GET_ALL.increment();
    } else {
        regions = regionsStr.toLowerCase().split(",");
        Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
        EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
    }
    // EurekaServer޷ṩ񣬷403
    if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
        return Response.status(Status.FORBIDDEN).build();
    }
    CurrentRequestVersion.set(Version.toEnum(version));
    KeyType keyType = Key.KeyType.JSON;// ÷ݸʽĬJSON
    String returnMediaType = MediaType.APPLICATION_JSON;
    if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
        // յͷûоʽϢ򷵻ظʽΪXML
        keyType = Key.KeyType.XML;
        returnMediaType = MediaType.APPLICATION_XML;
    }
    // 
    Key cacheKey = new Key(Key.EntityType.Application,
                           ResponseCacheImpl.ALL_APPS,
                           keyType, CurrentRequestVersion.get(),
                           EurekaAccept.fromString(eurekaAccept), regions
                          );
    // زͬı͵ݣȥȡݵķһ
    Response response;
    if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
        response = Response.ok(responseCache.getGZIP(cacheKey))
            .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
            .header(HEADER_CONTENT_TYPE, returnMediaType)
            .build();
    } else {
        response = Response.ok(responseCache.get(cacheKey))
            .build();
    }
    CurrentRequestVersion.remove();
    return response;
}

2.6.10 responseCache.getGZIP ӻжȡݡ

public byte[] getGZIP(Key key) {
    Value payload = getValue(key, shouldUseReadOnlyResponseCache);
    if (payload == null) {
        return null;
    }
    return payload.getGzipped();
}
Value getValue(final Key key, boolean useReadOnlyCache) {
    Value payload = null;
    try {
        if (useReadOnlyCache) {
            final Value currentPayload = readOnlyCacheMap.get(key);
            if (currentPayload != null) {
                payload = currentPayload;
            } else {
                payload = readWriteCacheMap.get(key);
                readOnlyCacheMap.put(key, payload);
            }
        } else {
            payload = readWriteCacheMap.get(key);
        }
    } catch (Throwable t) {
        logger.error("Cannot get value for key : {}", key, t);
    }
    return payload;
}

ο

https://lijunyi.xyz/docs/SpringCloud/SpringCloud.html#_2-2-x-%E5%88%86%E6%94%AF https://mp.weixin.qq.com/s/2jeovmj77O9Ux96v3A0NtA https://juejin.cn/post/6931922457741770760 https://github.com/D2C-Cai/herring http://c.biancheng.net/springcloud https://github.com/macrozheng/springcloud-learning