docs/Spring全家桶/SpringCloud源码分析/SpringCloudLoadBalancer源码分析.md
Spring Cloud LoadBalancerĿǰSpringٷǷspring-cloud-commonsSpring Cloud°汾Ϊ2021.0.2
Spring Cloud LoadBalancer ĵַ https://docs.spring.io/spring-cloud-commons/docs/3.1.2/reference/html/#spring-cloud-loadbalancer
Spring Cloudĵַ https://docs.spring.io/spring-cloud/docs/current/reference/html/
һNetflix Ribbonֹͣ£Spring Cloud LoadBalancerSpring CloudٷԼṩĿͻ˸ؾ,ʵ֣Ribbon
Spring CloudṩԼĿͻ˸ƽʵ֡ڸؾƣReactiveLoadBalancerӿڣṩ˻round-robinѯRandomʵ֡Ϊ˴ӦʽServiceInstanceListSupplierѡʵҪʹServiceInstanceListSupplierĿǰ֧ServiceInstanceListSupplierĻڷֵʵ֣ʵʹ·еķֿͻ˴Service Discoveryмõʵ
ͨSpring Cloud LoadBalance
spring:
cloud:
loadbalancer:
enabled: false
ǰsimple-ecommerceĿڸPomϸԿǰ<<SpringCloudAlibabaע֮NacosʵսԴ>>Spring Cloudİ汾Ϊ2021.0.1ǰҲ˵Spring Cloud Alibabaspring-cloud-starter-alibaba-nacos-discoveryspring-cloud-loadbalancer
עHoxton֮ǰİ汾ĬϸؾΪRibbonҪƳRibbonúspring.cloud.loadbalancer.ribbon.enabled: false
Spring BootĿstarterҲSpring Boot Caching and Evictor.
<dependency>
<groupId>org.springframework.cloud</groupId>
spring-cloud-starter-loadbalancer
</dependency>
ʹSpringٷṩ˸ؾĿͻ֮һRestTemplateRestTemplateSpringṩڷRestĿͻˣRestTemplateṩ˶ֱݷԶHttpķܹ߿ͻ˵ıдЧʡĬ£RestTemplateĬjdkHTTPӹߡRestTemplateConfig࣬ע @LoadBalancedע⣬ĬʹõReactiveLoadBalancerʵRoundRobinLoadBalancer
package cn.itxs.ecom.order.config;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;
@Configuration
public class RestTemplateConfig {
@LoadBalanced
@Bean
public RestTemplate restTemplate() {
return new RestTemplate() ;
}
}
жdeductRest
package cn.itxs.ecom.order.controller;
import cn.itxs.ecom.commons.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;
/**
* @Name OrderController
* @Description
* @Author itxs
* @Date 2022/4/10 20:15
* @Version 1.0
* @History
*/
@RestController
public class OrderController {
@Autowired
OrderService orderService;
@Autowired
private RestTemplate restTemplate;
@RequestMapping("/create/{userId}/{commodityCode}/{count}")
public String create(@PathVariable("userId") String userId,@PathVariable("commodityCode") String commodityCode, @PathVariable("count") int count){
return orderService.create(userId,commodityCode,count).toString();
}
@RequestMapping("/deductRest/{commodityCode}/{count}")
public String deductRest(@PathVariable("commodityCode") String commodityCode, @PathVariable("count") int count){
String url = "http://ecom-storage-service/deduct/"+commodityCode+"/"+count;
return restTemplate.getForObject(url, String.class);
}
}
ǰserver.portǷNacosעNacosĵ÷ڱļbootstrap.ymlֱΪ4080408140823ʵ
server:
port: 4080
鿴nacos-б飬Կ3Ŀʵ1ʵ
6ζdedectӿڣhttp://localhost:4070/deductRest/1001/1 ӲԵĽҲ֤LoadBalancerĬѯؾԡ
Զ帺ؾCustomLoadBalancerConfiguration
package cn.itxs.ecom.order.config;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.loadbalancer.core.RandomLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ReactorLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;
public class CustomLoadBalancerConfiguration {
@Bean
ReactorLoadBalancer<ServiceInstance> randomLoadBalancer(Environment environment,
LoadBalancerClientFactory loadBalancerClientFactory) {
String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
return new RandomLoadBalancer(loadBalancerClientFactory
.getLazyProvider(name, ServiceInstanceListSupplier.class),
name);
}
}
RestTemplateConfigLoadBalancerClientָ࣬valueֵΪṩҲǿơ
package cn.itxs.ecom.order.config;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;
@Configuration
@LoadBalancerClient(value = "ecom-storage-service", configuration = CustomLoadBalancerConfiguration.class)
public class RestTemplateConfig {
@LoadBalanced
@Bean
public RestTemplate restTemplate(RestTemplateBuilder builder) {
return builder.build() ;
}
}
ηʶdedectӿڲȷлΪؾԡ
ṩ3мSpring Cloud LoadBalancerķʽ˵һʹù֧Spring Web FluxӦʽ̣WebClientǴSpring WebFlux 5.0汾ʼṩһĻӦʽ̵ĽHttpĿͻ˹ߡӦʽ̵ĻReactorġWebClientṩ˱HttpʽӦgetpostputdeleteȷӦ
ڶspring-boot-starter-webflux
<dependency>
<groupId>org.springframework.boot</groupId>
spring-boot-starter-webflux
</dependency>
WebClientConfig
package cn.itxs.ecom.order.config;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;
@Configuration
public class WebClientConfig {
@LoadBalanced
@Bean
WebClient.Builder webClientBuilder() {
return WebClient.builder();
}
@Bean
WebClient webClient() {
return webClientBuilder().build();
}
}
WebClientӿʵ֣
@Autowired
private WebClient webClient;
@RequestMapping(value = "/deductWebClient/{commodityCode}/{count}")
public Mono<String> deductWebClient(@PathVariable("commodityCode") String commodityCode, @PathVariable("count") int count) {
String url = "http://ecom-storage-service/deduct/"+commodityCode+"/"+count;
// WebClient
Mono<String> result = webClient.get().uri(url)
.retrieve().bodyToMono(String.class);
return result;
}
ʶеļWebClientӿڣhttp://localhost:4070/deductWebClient/1001/1 سɹ
ǻûڹķʽͨWebClientʹReactiveLoadBalancerĿSpring Cloud LoadBalancer starterSpring -webflux·УReactorLoadBalancerExchangeFilterFunctionԶõġ
WebClientʹReactiveLoadBalancerӿʵ֣
@Autowired
private ReactorLoadBalancerExchangeFilterFunction lbFunction;
@RequestMapping(value = "/deductWebFluxReactor/{commodityCode}/{count}")
public Mono<String> deductWebFluxReactor(@PathVariable("commodityCode") String commodityCode, @PathVariable("count") int count) {
String url = "/deduct/"+commodityCode+"/"+count;
Mono<String> result = WebClient.builder().baseUrl("http://ecom-storage-service")
.filter(lbFunction)
.build()
.get()
.uri(url)
.retrieve()
.bodyToMono(String.class);
return result;
}
ʶеļWebClientӿڣhttp://localhost:4070/deductWebFluxReactor/1001/1 سɹ
LoadBalancerṩܶܣȤϸĺͶʵ
Spring Cloud LoadBalancerԴȴRestTemplateؾļʵ֣֧֮Spring Web FluxӦʽ̵ʵԭ˼Ҳͬͨͻʵָؾ⡣RestTemplateԴп֪̳InterceptingHttpAccessor
InterceptingHttpAccessorṩһsetInterceptorsҪʵClientHttpRequestInterceptorӿڼɣʵԶ˽ӿ֮ǰȵintercept൱ServletеFilter
// ʵڳInterceptingHttpAccessor
// RestTemplate.InterceptingHttpAccessor#setInterceptors
public void setInterceptors(List<ClientHttpRequestInterceptor> interceptors) {
Assert.noNullElements(interceptors, "'interceptors' must not contain null elements");
// Take getInterceptors() List as-is when passed in here
if (this.interceptors != interceptors) {
this.interceptors.clear();
this.interceptors.addAll(interceptors);
AnnotationAwareOrderComparator.sort(this.interceptors);
}
}
ӹ֪Spring Cloud LoadBalancerspring-cloud-commonsҲΪĵ@LoadBalancedעҲspring-cloud-commonsʵ֣SpringBootԶװԭȲ鿴ʵѷspring-cloud-commonsԶLoadBalancerAutoConfigurationReactorLoadBalancerClientAutoConfiguration
ʱ@ConditionalΪע⣩ԶLoadBalancerInterceptorע뵽RestTemplateС
LoadBalancerInterceptorʵClientHttpRequestInterceptorӿڣҲʵinterceptʵָؾش
LoadBalancerClientڽиؾ̳ServiceInstanceChooserӿڣӷбѡһַеáLoadBalancerClientִexecute()ִģreconstructURI()عURL
LoadBalancerClientӿSpring Cloud LoadBalancerṩĬʵΪBlockingLoadBalancerClient
@SuppressWarnings({ "unchecked", "rawtypes" })
public class BlockingLoadBalancerClient implements LoadBalancerClient {
private final ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory;
/**
* @deprecated in favour of
* {@link BlockingLoadBalancerClient#BlockingLoadBalancerClient(ReactiveLoadBalancer.Factory)}
*/
@Deprecated
public BlockingLoadBalancerClient(LoadBalancerClientFactory loadBalancerClientFactory,
LoadBalancerProperties properties) {
this.loadBalancerClientFactory = loadBalancerClientFactory;
}
public BlockingLoadBalancerClient(ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory) {
this.loadBalancerClientFactory = loadBalancerClientFactory;
}
@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
String hint = getHint(serviceId);
LoadBalancerRequestAdapter<T, DefaultRequestContext> lbRequest = new LoadBalancerRequestAdapter<>(request,
new DefaultRequestContext(request, hint));
Set<LoadBalancerLifecycle> supportedLifecycleProcessors = getSupportedLifecycleProcessors(serviceId);
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
ServiceInstance serviceInstance = choose(serviceId, lbRequest);
// ѡ
if (serviceInstance == null) {
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
new CompletionContext<>(CompletionContext.Status.DISCARD, lbRequest, new EmptyResponse())));
throw new IllegalStateException("No instances available for " + serviceId);
}
return execute(serviceId, serviceInstance, lbRequest);
}
@Override
public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request)
throws IOException {
DefaultResponse defaultResponse = new DefaultResponse(serviceInstance);
Set<LoadBalancerLifecycle> supportedLifecycleProcessors = getSupportedLifecycleProcessors(serviceId);
Request lbRequest = request instanceof Request ? (Request) request : new DefaultRequest<>();
supportedLifecycleProcessors
.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, new DefaultResponse(serviceInstance)));
try {
T response = request.apply(serviceInstance);
Object clientResponse = getClientResponse(response);
supportedLifecycleProcessors
.forEach(lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.SUCCESS,
lbRequest, defaultResponse, clientResponse)));
return response;
}
catch (IOException iOException) {
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
new CompletionContext<>(CompletionContext.Status.FAILED, iOException, lbRequest, defaultResponse)));
throw iOException;
}
catch (Exception exception) {
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
new CompletionContext<>(CompletionContext.Status.FAILED, exception, lbRequest, defaultResponse)));
ReflectionUtils.rethrowRuntimeException(exception);
}
return null;
}
private <T> Object getClientResponse(T response) {
ClientHttpResponse clientHttpResponse = null;
if (response instanceof ClientHttpResponse) {
clientHttpResponse = (ClientHttpResponse) response;
}
if (clientHttpResponse != null) {
try {
return new ResponseData(clientHttpResponse, null);
}
catch (IOException ignored) {
}
}
return response;
}
private Set<LoadBalancerLifecycle> getSupportedLifecycleProcessors(String serviceId) {
return LoadBalancerLifecycleValidator.getSupportedLifecycleProcessors(
loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
DefaultRequestContext.class, Object.class, ServiceInstance.class);
}
@Override
public URI reconstructURI(ServiceInstance serviceInstance, URI original) {
return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
}
@Override
public ServiceInstance choose(String serviceId) {
return choose(serviceId, REQUEST);
}
// ͨͬĸؾͻʵѡͬķ
@Override
public <T> ServiceInstance choose(String serviceId, Request<T> request) {
ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);
if (loadBalancer == null) {
return null;
}
Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
if (loadBalancerResponse == null) {
return null;
}
return loadBalancerResponse.getServer();
}
private String getHint(String serviceId) {
LoadBalancerProperties properties = loadBalancerClientFactory.getProperties(serviceId);
String defaultHint = properties.getHint().getOrDefault("default", "default");
String hintPropertyValue = properties.getHint().get(serviceId);
return hintPropertyValue != null ? hintPropertyValue : defaultHint;
}
}
BlockingLoadBalancerClientгLoadBalancerClientFactoryͨgetInstanceȡĸؾͻˡͨLoadBalancerClientFactoryȡĸؾʵloadBalancer.choose(request)ӿchoose()ʵָݸؾ㷨ѡһɸؾ⣬ReactiveLoadBalancer<t> getInstance(String serviceId) ĬʵLoadBalancerClientFactory </t>
LoadBalancerClientFactoryͻʵ˲ͬĸؾ㷨ѯȡLoadBalancerClientFactory̳NamedContextFactoryNamedContextFactory̳ApplicationContextAwareʵSpring ApplicationContext
ReactiveLoadBalancerؾʵַѡSpring Cloud BalancerʵѯRoundRobinLoadBalancerRandomLoadBalancerNacosLoadBalancer㷨
ûʽָؾ㷨ĬȱʡֵΪRoundRobinLoadBalancer
@Bean
@ConditionalOnMissingBean
public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment,
LoadBalancerClientFactory loadBalancerClientFactory) {
String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
return new RoundRobinLoadBalancer(
loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
}
LoadBalancerRequestcreateRequestڴLoadBalancerRequestڲLoadBalancerClientҲBlockingLoadBalancerClient
ճĿУһ㸺ؾⶼǽFeignʹãʱFeignLoadBalancerԶFeignLoadBalancerAutoConfigurationʵ
Ҳһ»WebClient@Loadbalanced̵룬ؾReactorLoadBalancerClientAutoConfigurationһԶװ࣬Ŀ WebClient ReactiveLoadBalancer ֮Զװ̾ͿʼУʼһʵ ExchangeFilterFunction ʵںʵΪע뵽WebClientȤо
֪LoadBalancerClientFactoryǴͻؾͿͻʵĹݿͻƴһSpring ApplicationContextȡbean˽뵽LoadBalancerClientFactoryУҪȥʵӽӿReactorServiceInstanceLoadBalancerΪȥȡؾʵʱͨȥвReactorServiceInstanceLoadBalancer͵beanʵֵģԲRandomLoadBalancerʵִ
package org.springframework.cloud.loadbalancer.core;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.Request;
import org.springframework.cloud.client.loadbalancer.Response;
public class RandomLoadBalancer implements ReactorServiceInstanceLoadBalancer {
private static final Log log = LogFactory.getLog(RandomLoadBalancer.class);
private final String serviceId;
private ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
/**
* @param serviceInstanceListSupplierProvider a provider of
* {@link ServiceInstanceListSupplier} that will be used to get available instances
* @param serviceId id of the service for which to choose an instance
*/
public RandomLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,
String serviceId) {
this.serviceId = serviceId;
this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
}
@SuppressWarnings("rawtypes")
@Override
public Mono<Response<ServiceInstance>> choose(Request request) {
ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
.getIfAvailable(NoopServiceInstanceListSupplier::new);
return supplier.get(request).next()
.map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
}
private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,
List<ServiceInstance> serviceInstances) {
Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances);
if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
}
return serviceInstanceResponse;
}
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
if (instances.isEmpty()) {
if (log.isWarnEnabled()) {
log.warn("No servers available for service: " + serviceId);
}
return new EmptyResponse();
}
int index = ThreadLocalRandom.current().nextInt(instances.size());
ServiceInstance instance = instances.get(index);
return new DefaultResponse(instance);
}
}
ʵֽмд
package cn.itxs.ecom.order.config;
import java.util.List;
import java.util.Random;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.client.loadbalancer.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.Request;
import org.springframework.cloud.client.loadbalancer.Response;
import reactor.core.publisher.Mono;
public class ItxsRandomLoadBalancerClient implements ReactorServiceInstanceLoadBalancer {
// б
private ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
public ItxsRandomLoadBalancerClient(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider) {
this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
}
@Override
public Mono<Response<ServiceInstance>> choose(Request request) {
ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider.getIfAvailable();
return supplier.get().next().map(this::getInstanceResponse);
}
/**
* ʹȡ
* @param instances
* @return
*/
private Response<ServiceInstance> getInstanceResponse(
List<ServiceInstance> instances) {
System.out.println("ItxsRandomLoadBalancerClient start");
if (instances.isEmpty()) {
return new EmptyResponse();
}
System.out.println("ItxsRandomLoadBalancerClient random");
// 㷨
int size = instances.size();
Random random = new Random();
ServiceInstance instance = instances.get(random.nextInt(size));
return new DefaultResponse(instance);
}
}
CustomLoadBalancerConfiguration滻Ϊ
package cn.itxs.ecom.order.config;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.context.annotation.Bean;
public class CustomLoadBalancerConfiguration {
@Bean
public ReactorServiceInstanceLoadBalancer customLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider) {
return new ItxsRandomLoadBalancerClient(serviceInstanceListSupplierProvider);
}
}
Ͷhttp://localhost:4070/deductRest/1001/1 ̨ѴӡԶItxsRandomLoadBalancerClientе־ͳɹʽ
https://lijunyi.xyz/docs/SpringCloud/SpringCloud.html#_2-2-x-%E5%88%86%E6%94%AF https://mp.weixin.qq.com/s/2jeovmj77O9Ux96v3A0NtA https://juejin.cn/post/6931922457741770760 https://github.com/D2C-Cai/herring http://c.biancheng.net/springcloud https://github.com/macrozheng/springcloud-learning