Back to Javatutorial

SpringCloudAlibabaNacos源码分析:概览

docs/Spring全家桶/SpringCloudAlibaba源码分析/SpringCloudAlibabaNacos源码分析:概览.md

1.0.013.0 KB
Original Source

ڿƪ֮ǰöNACOSع˽⣬ƼSpring Cloud Alibaba Nacosƪ

ԹܣĿĵȥӦԴ룬һ˽⹦αʵֳġ

һԴĶȺ̫ϸڣҪߴԴ٣ᡣ

һ

GitHubӦҳNACOScloneĿ¼ļ߳ǶڿԴаIJֲࡣ

<figure data-size="normal">

<figcaption>nacosĿ¼ṹ</figcaption> </figure> <figure data-size="normal">

<figcaption>ģͼ</figcaption> </figure> <figure data-size="normal">

<figcaption>nacosģ</figcaption> </figure>

ͼ˳ҵͻƿˣݾͼnacos-consolenacos-namingnacos-config˳ϣܿˡ

Ǹо޴ֵĻǾƲnacos-exampleҪҵĵڣһ֪

÷

ȴһ˵com.alibaba.nacos.api.NacosFactory

ľ̬ڴConfigServiceNamingServiceƣԴConfigServiceΪ

public static ConfigService createConfigService(Properties properties) throws NacosException {
        try {
            Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
            Constructor constructor = driverImplClass.getConstructor(Properties.class);
            ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
            return vendorImpl;
        } catch (Throwable e) {
            throw new NacosException(-400, e.getMessage());
        }
}

ûʲôӵ߼ʹõǻķԭpropertiesЩԿͨbootstrap.ymlָӦNacosConfigProperties

Ҫϸǹ캯жnamespaceʼDzݡ

private void initNamespace(Properties properties) {
        String namespaceTmp = null;

        String isUseCloudNamespaceParsing =
            properties.getProperty(PropertyKeyConst.IS_USE_CLOUD_NAMESPACE_PARSING,
                System.getProperty(SystemPropertyKeyConst.IS_USE_CLOUD_NAMESPACE_PARSING,
                    String.valueOf(Constants.DEFAULT_USE_CLOUD_NAMESPACE_PARSING)));

        if (Boolean.valueOf(isUseCloudNamespaceParsing)) {
            namespaceTmp = TemplateUtils.stringBlankAndThenExecute(namespaceTmp, new Callable<String>() {
                @Override
                public String call() {
                    return TenantUtil.getUserTenantForAcm();
                }
            });

            namespaceTmp = TemplateUtils.stringBlankAndThenExecute(namespaceTmp, new Callable<String>() {
                @Override
                public String call() {
                    String namespace = System.getenv(PropertyKeyConst.SystemEnv.ALIBABA_ALIWARE_NAMESPACE);
                    return StringUtils.isNotBlank(namespace) ? namespace : EMPTY;
                }
            });
        }

        if (StringUtils.isBlank(namespaceTmp)) {
            namespaceTmp = properties.getProperty(PropertyKeyConst.NAMESPACE);
        }
        namespace = StringUtils.isNotBlank(namespaceTmp) ? namespaceTmp.trim() : EMPTY;
        properties.put(PropertyKeyConst.NAMESPACE, namespace);
}

propertiesָǷƻеnamespaceǵģȥȡƻϵͳǣôͶȡpropertiesָnamespaceûָĻսǿַӴϿȡƻnamespace첽ʽĿǰ汾ʹõͬá

ConfigService涨һϵнӿڷҪġ

ÿҵʵնΪHttp󣬾õserverAddrַתʹãȻһʱʱ󣬶󲻳ɹˣǾͻ׳쳣

nacos-clientշն䵽nacos-configϣʹJdbcTemplateݳ־û

һֵĴһףãȡúɾö֣Ͳչˡ

صһüֵԴ롣

Ƚעcom.alibaba.nacos.client.config.impl.CacheDataݽṹϣǸ͵ijѪģͣҪdz䵱listenerߵĽɫȡòôѺˡ

ʵϣԿCacheDataϢnamespace, contentlistenerۺһˣΪһÿԸӶlistenerʵʩΪlistenerӿڿжʵ֣ÿlistenerֻһʵϡ

public void addListener(Listener listener) {
        if (null == listener) {
            throw new IllegalArgumentException("listener is null");
        }
        ManagerListenerWrap wrap = new ManagerListenerWrap(listener);
        if (listeners.addIfAbsent(wrap)) {
            LOGGER.info("[{}] [add-listener] ok, tenant={}, dataId={}, group={}, cnt={}", name, tenant, dataId, group,
                listeners.size());
        }
}

ʹCopyOnWriteArrayList.addIfAbsentҪequalsManagerListenerWrapǶlistenerһʽİʵequals

@Override
public boolean equals(Object obj) {
        if (null == obj || obj.getClass() != getClass()) {
            return false;
        }
        if (obj == this) {
            return true;
        }
        ManagerListenerWrap other = (ManagerListenerWrap) obj;
        return listener.equals(other.listener);
}

ϲ㷭ҵlistener߲ĹAPIcom.alibaba.nacos.client.config.impl.ClientWorker

ͬǶlistenerĹظУ飬cacheMapǹؼ¶壺

private final AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<Map<String, CacheData>>()

ʹ˾ԭԲԵAtomicReferenceԱⲢݲһµ⣬һHashMapvalueCacheData󣬶keyһɹģGroupKeyпҵ

static public String getKeyTenant(String dataId, String group, String tenant) {
        StringBuilder sb = new StringBuilder();
        urlEncode(dataId, sb);
        sb.append('+');
        urlEncode(group, sb);
        if (StringUtils.isNotEmpty(tenant)) {
            sb.append('+');
            urlEncode(tenant, sb);
        }
        return sb.toString();
}

ʵǽϢá+ŽƴӣϢбˡ+͡%ʹurlEncodeбת塣ȻҲ׵ĽͲչˡ

޷ǾǾcacheMapһϵgetsetάlistenerرעǣÿθ²һcopy󣬲˶֮setǣcacheMapС

˵һlistenerġ

ȻClientWorkerпҵעתƵ캯СУע⵽ʼ̳߳أ

    executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });

        executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });

        executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    checkConfigInfo();
                } catch (Throwable e) {
                    LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
                }
            }
       }, 1L, 10L, TimeUnit.MILLISECONDS);

ִжʱscheduledThreadPool̳߳صķֹҲǶ׵ģexecutorڷü񣬶executorServiceĽߣִĽɫ

Էֻ̳߳1ִ̳߳߳صĺ߳CPU

ΪüһѯḶ́һִܼҪõƣNACOSĿǰʹһȽϼ򵥵ķ

public void checkConfigInfo() {
        // 
        int listenerSize = cacheMap.get().size();
        // ȡΪ
        int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
        if (longingTaskCount > currentLongingTaskCount) {
            for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
                // ҪжǷִ Ҫú롣 бġ仯̿
                executorService.execute(new LongPollingRunnable(i));
            }
            currentLongingTaskCount = longingTaskCount;
        }
}

ParamUtil.getPerTaskConfigSize()зصÿܼޣĬ3000ͨϵͳPER_TASK_CONFIG_SIZEޡ

ӴϿԿǰlistenerûг3000ü̳߳ػתϸֵĴ룬ǻᷢһЩģҪΧһϵ⡣

ѯҪ߼

  • 鱾ãCacheData洢Ϣһ£
  • serverãCacheData洢Ϣ

ע뷢

Ļⲿִ뿴Ƚˣṹϻơ

ֱӽcom.alibaba.nacos.api.naming.NamingServiceжregisterInstanceعڷעᡣ

ȿInstanceʵݣidipportserviceNameclusterNameڼȺweightȨأhealthyǷenabledǷãephemeralǷʱģ9ȫConsole֡

Ȼֱӿעķ

   @Override
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {

        if (instance.isEphemeral()) {
            BeatInfo beatInfo = new BeatInfo();
            beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
            beatInfo.setIp(instance.getIp());
            beatInfo.setPort(instance.getPort());
            beatInfo.setCluster(instance.getClusterName());
            beatInfo.setWeight(instance.getWeight());
            beatInfo.setMetadata(instance.getMetadata());
            beatInfo.setScheduled(false);
            beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
        }
        serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
    }

ǰһδǶʱʵĴڹһ͸NACOS

registerServiceǷװHTTPInstanceControllerд

Ŀspring-cloud-starter-alibaba-nacos-discoveryĬԶעġ뿴ԶעḶ́ԴAbstractAutoServiceRegistrationʼ֣һδ룺

	@EventListener(WebServerInitializedEvent.class)
	public void bind(WebServerInitializedEvent event) {
		ApplicationContext context = event.getApplicationContext();
		if (context instanceof ConfigurableWebServerApplicationContext) {
			if ("management".equals(
					((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {
				return;
			}
		}
		this.port.compareAndSet(0, event.getWebServer().getPort());
		this.start();
	}

Webʼɵ¼ջִstart

	public void start() {
		if (!isEnabled()) {
			if (logger.isDebugEnabled()) {
				logger.debug("Discovery Lifecycle disabled. Not starting");
			}
			return;
		}
		// only initialize if nonSecurePort is greater than 0 and it isn't already running
		// because of containerPortInitializer below
		if (!this.running.get()) {
			register();
			if (shouldRegisterManagement()) {
				registerManagement();
			}
			this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration()));
			this.running.compareAndSet(false, true);
		}

	}

УregisterĵIJˣԴNacosServiceRegistryʵ֣

	@Override
	public void register(NacosRegistration registration) {

		if (!registration.isRegisterEnabled()) {
			logger.info("Nacos Registration is disabled...");
			return;
		}
		if (StringUtils.isEmpty(registration.getServiceId())) {
			logger.info("No service to register for nacos client...");
			return;
		}
		NamingService namingService = registration.getNacosNamingService();
		String serviceId = registration.getServiceId();

		Instance instance = new Instance();
		instance.setIp(registration.getHost());
		instance.setPort(registration.getPort());
		instance.setWeight(registration.getRegisterWeight());
		instance.setClusterName(registration.getCluster());
		instance.setMetadata(registration.getMetadata());
		try {
			namingService.registerInstance(serviceId, instance);
			logger.info("nacos registry, {} {}:{} register finished", serviceId, instance.getIp(), instance.getPort());
		}catch (Exception e) {
			logger.error("nacos registry, {} register failed...{},", serviceId, registration.toString(), e);
		}
	}

δͷdzϤˣվͻصnamingService.registerInstance

    /**
     * Map<namespace, Map<group::serviceName, Service>>
     */
    private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

ϳһʵࣺcom.alibaba.nacos.naming.core.ServiceServiceǰInstanceһServiceжInstanceһCluster

<figure data-size="normal">

<figcaption>ʵȺ</figcaption> </figure>

ڵregisterInstanceעʵʱֶӦServiceûбעᣬôregisterServiceһʼӦClusterĶʱ

registerInstance෴deregisterInstanceΪȡעᣬҲΪǷʵߡ

NACOSʵַֹܡ

ߣ÷ĽǶɵstarterĿиࣺNacosServerListҪǼ̳AbstractServerListʵؼĽӿڷ൱NACOSRibbonĶԽӵ㡣

public interface ServerList<T extends Server> {

    public List<T> getInitialListOfServers();

    /**
     * Return updated list of servers. This is called say every 30 secs
     * (configurable) by the Loadbalancer's Ping cycle
     * 
     */
    public List<T> getUpdatedListOfServers();   

}

NACOSӿڵʵ֣ʹgetServers뵽getServers棬ʵ˵NacosNamingService.selectInstancesͨserviceIdȡServiceInfoȻȡServiceЧInstance

ṩߣ÷ĽǶȿNACOSͨʱʵʱServiceInfoҪҵ߼HostReactorʵֵġǰserviceMapһHostReactorάserviceInfoMap

private Map<String, ServiceInfo> serviceInfoMap;

HostReactorFailoverReactorServiceInfo˴̻棬Ȼ˶ʱָĿ¼лServiceInfoԴʵFailoverơfailover-modeҲпصģʵһضļһݣЩõļҲͨʱʵֵġ

File switchFile = new File(failoverDir + UtilAndComs.FAILOVER_SWITCH);

ͼʾ

<figure data-size="normal">

<figcaption>ָͼ</figcaption> </figure>

ġ̨(Console)

һǹ̨ʵ֣ʵһdz͵WEBĿ

ʹSpring Security + JWTаȫƣǰ˼ReactJsJdbcTemplateݿ־û

Ҫעǣ̨ṩĹܲǴnacos-consoleлȡݣǷɢ˸С

nacos-consoleṩ˿̨¼namespacę״̬ùͷֱnacos-confignacos-namingṩAPIЩAPIǹᵽOpen-API

塢ܽ

NACOSԴͨ׶ûʲôҲûнвװͰһ̾ijԱڰСʱ֮ڰĿ硣

ȻҲһЩɺӵȱ㣬磬ע͹٣뻹кܴعռ䣬tenantnamespaceʹá

Spring Cloud Alibaba NacosĽܵ˾ͽˣϣ

ο

https://lijunyi.xyz/docs/SpringCloud/SpringCloud.html#_2-2-x-%E5%88%86%E6%94%AF https://mp.weixin.qq.com/s/2jeovmj77O9Ux96v3A0NtA https://juejin.cn/post/6931922457741770760 https://github.com/D2C-Cai/herring http://c.biancheng.net/springcloud https://github.com/macrozheng/springcloud-learning