docs/Spring全家桶/SpringCloud源码分析/SpringCloudHystrix源码分析.md
学习目标
ʽԭ֮ǰҪȷһ㣬 @HystrixCommand עʵַHystrix ڲDzAOPķʽشģݣҲϸʵһ¼װ Hystrix һ£ҪΪ²
Լ@HystrixCommand ע⡣
ʵĴ
Եá 1.Զע
/**
 * Marks a method whose invocation should be guarded by a timeout and an
 * optional fallback method.
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MyHystrixCommand {

    /** Default timeout for the guarded call (1000 unless overridden). */
    int timeout() default 1000;

    /** Name of the fallback method; the empty default means no fallback is configured. */
    String fallback() default "";
}
2.Զ
/**
 * Aspect that gives methods annotated with {@code @MyHystrixCommand} a simple
 * timeout-plus-fallback behaviour: the target method runs on a worker thread,
 * and if it fails, is interrupted, or does not finish within the configured
 * timeout, the configured fallback method is invoked instead.
 */
@Aspect
@Component
public class MyHystrixCommandAspect {

    private static final java.util.logging.Logger LOG =
            java.util.logging.Logger.getLogger(MyHystrixCommandAspect.class.getName());

    /** Worker pool on which intercepted methods are executed. */
    ExecutorService executorService = Executors.newFixedThreadPool(10);

    /** Pointcut matching every method annotated with {@code @MyHystrixCommand}. */
    @Pointcut(value = "@annotation(MyHystrixCommand)")
    public void pointCut() {
    }

    /**
     * Around advice: runs the target method with a timeout and falls back on failure.
     *
     * @param joinPoint      the intercepted invocation
     * @param hystrixCommand the annotation instance bound from the intercepted method
     * @return the target method's result, or the fallback method's result
     * @throws Exception if the call fails and no fallback is configured, or if the
     *                   fallback itself cannot be resolved or invoked
     */
    @Around(value = "pointCut()&&@annotation(hystrixCommand)")
    public Object doPointCut(ProceedingJoinPoint joinPoint, MyHystrixCommand hystrixCommand) throws Exception {
        int timeout = hystrixCommand.timeout();
        // Let failures of the target propagate through the Future so that they
        // surface as ExecutionException and trigger the fallback, instead of
        // being swallowed and silently turned into a null result.
        Future<Object> future = executorService.submit(() -> {
            try {
                return joinPoint.proceed();
            } catch (Exception | Error e) {
                throw e;
            } catch (Throwable t) {
                // Callable cannot throw a bare Throwable; wrap the exotic case.
                throw new IllegalStateException(t);
            }
        });
        try {
            // Bounded wait on the async result implements the timeout.
            return future.get(timeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
            return recover(joinPoint, hystrixCommand, future, e);
        } catch (ExecutionException | TimeoutException e) {
            return recover(joinPoint, hystrixCommand, future, e);
        }
    }

    /** Cancels the pending call and delegates to the fallback, or fails if none is configured. */
    private Object recover(ProceedingJoinPoint joinPoint, MyHystrixCommand hystrixCommand,
                           Future<?> future, Exception cause) throws Exception {
        future.cancel(true);
        LOG.log(java.util.logging.Level.WARNING, "guarded call failed, trying fallback", cause);
        if (StringUtils.isBlank(hystrixCommand.fallback())) {
            throw new Exception("fallback is null", cause);
        }
        return invokeFallback(joinPoint, hystrixCommand.fallback());
    }

    /**
     * Looks up the public method named {@code fallback} on the target object, using the
     * intercepted method's parameter types, and invokes it with the original arguments.
     */
    private Object invokeFallback(ProceedingJoinPoint joinPoint, String fallback) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Class<?>[] parameterTypes = signature.getMethod().getParameterTypes();
        Object target = joinPoint.getTarget();
        Method fallbackMethod = target.getClass().getMethod(fallback, parameterTypes);
        // Make the *fallback* method accessible (the original toggled the intercepted method).
        fallbackMethod.setAccessible(true);
        return fallbackMethod.invoke(target, joinPoint.getArgs());
    }
}
3.Զ
// Demo REST controller whose endpoint is guarded by the custom @MyHystrixCommand annotation.
@RestController
public class MyHystrixController {
// Client used to fetch the order list; injected by Spring.
@Autowired
OrderServiceClient orderServiceClient;
// Guarded endpoint: names "fallback" as the fallback method and sets timeout to 2000.
@MyHystrixCommand(fallback = "fallback",timeout = 2000)
@GetMapping("/myhystrix/get/{num}")
public String get(@PathVariable("num") int num){
return orderServiceClient.orderLists(num);
}
// Fallback target named by the annotation above; takes the same parameter list as get(int).
public String fallback(int num){
return "Զעⷽ";
}
}
http://localhost:8080/myhystrix/get/1ʱᴥΪڷˣnum=1ʱ3s
OKǾʵһװHystrixCommandֻʵHystrixĵһһע棬ĵײԶԶûôڽԴ֮ǰһRxJavaʲôΪHystrixײǻӦʽʵֵġ
2 RxJava 2.1 RxJava RxJava һӦʽ̣¼첽⡣¼ʽáࡣ
RxJava۲ģʽĶԱ
2.2 ۲
Observer
Observer observer = new Observer() {
@Override
public void onCompleted() {
System.out.println("۲Complete¼ø÷");
}
@Override
public void onError(Throwable throwable) {
System.out.println("Error¼Ӧ");
}
@Override
public void onNext(Object o) {
System.out.println("Next¼Ӧ:" + o);
}
};
//Subscriber = RxJava õһʵ Observer ij࣬ Observer ӿڽչ
Subscriber subscriber = new Subscriber() {
@Override
public void onCompleted() {
System.out.println("۲Complete¼ø÷");
}
@Override
public void onError(Throwable throwable) {
System.out.println("Error¼Ӧ");
}
@Override
public void onNext(Object o) {
System.out.println("Next¼Ӧ:" + o);
}
};
Subscriber Observer ӿڵ
ʹ÷ʽһ£RxJavasubscribeУObserverȱתSubscriberʹã Subscriber Observer ӿڽչ
onStart()ڻδӦ¼ǰãһЩʼsubscribe ڵ̵߳ãл̣߳ԲܽнUI±絯Щ
unsubscribe()ȡġڸ÷ú۲߽ٽӦ¼onStopпԵô˷ġø÷ǰʹ isUnsubscribed() ж״̬ȷ۲ObservableǷй۲Subscriberá 2.3 ۲ RxJava ṩ˶ַ ۲߶Observable
// 1just(T...)ֱӽIJηͳ Observable observable = Observable.just("A", "B", "C"); // εã // onNext("A"); // onNext("B"); // onNext("C"); // onCompleted(); // 2fromArray(T[]) / from(Iterable<? extends T>) : / Iterable ֳɾηͳ String[] words = {"A", "B", "C"}; Observable observable = Observable.fromArray(words); // εã // onNext("A"); // onNext("B"); // onNext("C"); // onCompleted();
2.4
observable.subscribe(observer); //Ĺϵ
2.5
/**
 * Minimal RxJava demo of the observer pattern: a deferred {@code Observable}
 * emits the elements of an array to an {@code Observer}, and a completion
 * callback is attached with {@code doOnCompleted}.
 */
public class RxJavaDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        final String[] datas = new String[]{"¼1"};

        // Callback attached via doOnCompleted on the inner Observable below.
        final Action0 onComplated = new Action0() {
            @Override
            public void call() {
                System.out.println("۲Ҫ");
            }
        };

        // The observable: defer() postpones creating the inner Observable until call() is invoked.
        Observable<String> observable = Observable.defer(new Func0<Observable<String>>() {
            @Override
            public Observable<String> call() {
                Observable<String> observable1 = Observable.from(datas);
                return observable1.doOnCompleted(onComplated);
            }
        });
        // Alternative source: Observable.just(...) with a fixed list of values.

        // The observer: prints each kind of event it receives.
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onCompleted() {
                System.out.println("Comlate¼Ӧ");
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("Error¼Ӧ");
            }

            @Override
            public void onNext(String o) {
                System.out.println("Next¼Ӧ:" + o);
            }
        };

        // Subscribing connects the observer to the observable.
        observable.subscribe(observer);
        // Alternative: block for the result with
        // observable.toBlocking().toFuture().get() and print it.
    }
}
OK，基本掌握了 RxJava 的使用之后，接下来我们开始进入正题，分析源码。
3 Դ ϹṩԴͼͼϿԿʵȥɨHystrixCommandעķȻأִ涨executequeueѡһеãȻHystrixCommandע⣬Hystrix@EnableHystrixע⡣
// Spring Boot entry point of the demo application.
@SpringBootApplication
@EnableFeignClients("com.example.clients")
//@EnableDiscoveryClient // left disabled in this demo
@EnableHystrix // turns on annotation-driven Hystrix support
public class HystrixEclipseUserApplication {
// Boots the Spring application context.
public static void main(String[] args) {
SpringApplication.run(HystrixEclipseUserApplication.class, args);
}
}
뵽@EnableHystrixע
// Type-level marker annotation; it is itself meta-annotated with @EnableCircuitBreaker.
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@EnableCircuitBreaker
public @interface EnableHystrix {
}
// @EnableHystrix carries @EnableCircuitBreaker, which imports EnableCircuitBreakerImportSelector.
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableCircuitBreakerImportSelector.class)
public @interface EnableCircuitBreaker {
}
ⲽ룬źܶѧspringbootͬѧϤˣõImportע⣬ǿ϶һЩˣȻٽ EnableCircuitBreakerImportSelector;
// Import selector bound to the @EnableCircuitBreaker annotation type.
@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class EnableCircuitBreakerImportSelector
extends SpringFactoryImportSelector<EnableCircuitBreaker> {
// Enabled unless the property "spring.cloud.circuit.breaker.enabled" is set to false
// (the lookup defaults to Boolean.TRUE when the property is absent).
@Override
protected boolean isEnabled() {
return getEnvironment().getProperty("spring.cloud.circuit.breaker.enabled",
Boolean.class, Boolean.TRUE);
}
}
EnableCircuitBreakerImportSelector̳SpringFactoryImportSelectorSpringFactoryImportSelectorϤĴ룬ʵDeferredImportSelectorӿڣʵselectImportsselectImportsļspring.factoriesضӦ org.springframework.cloud.client.circuitbreaker.EnableCircuitBreakerspring.facotriesļ
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.hystrix.HystrixAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.ReactiveHystrixCircuitBreakerAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.security.HystrixSecurityAutoConfiguration
org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\
org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration
ӦEnableAutoConfigurationЩʵspringʱͨԶװƻȥʵע뵽IoCУǺĹע HystrixCircuitBreakerConfigurationࡣ
// Configuration class (excerpt; remaining members are elided by the article).
@Configuration(proxyBeanMethods = false)
public class HystrixCircuitBreakerConfiguration {
// Registers the HystrixCommandAspect bean, the aspect examined in the next section.
@Bean
public HystrixCommandAspect hystrixCommandAspect() {
return new HystrixCommandAspect();
}
...
}
뵽л֣ᷢҪעΪ@HystrixCommand@HystrixCollapserִעεķʱᱻִ methodsAnnotatedWithHystrixCommand
3.1 HystrixCommandAspect
// Aspect that intercepts methods annotated with @HystrixCommand or @HystrixCollapser
// and routes the invocation through a Hystrix invokable (excerpt).
@Aspect
public class HystrixCommandAspect {
// Maps each pointcut type to the factory that builds its MetaHolder.
private static final Map<HystrixPointcutType, MetaHolderFactory> META_HOLDER_FACTORY_MAP;
static {
// Populate the immutable lookup table once, at class-initialization time.
META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder()
.put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory())
.put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory())
.build();
}
// Pointcut: methods annotated with @HystrixCommand.
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
public void hystrixCommandAnnotationPointcut() {
}
// Pointcut: methods annotated with @HystrixCollapser.
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
public void hystrixCollapserAnnotationPointcut() {
}
// Around advice applied to either pointcut.
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
// Resolve the intercepted method on the target.
Method method = getMethodFromTarget(joinPoint);
Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
// A method may carry only one of the two annotations, never both.
if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
"annotations at the same time");
}
// Pick the MetaHolderFactory that matches the annotation found on the method.
MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
// Collect the invocation metadata (see CommandMetaHolderFactory.create below).
MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
/*
 * Build the invokable for this call from the collected metadata.
 * NOTE(review): per the surrounding article the concrete type is GenericCommand
 * (or GenericObservableCommand for observable methods), whose constructor chain
 * ends in AbstractCommand - confirm against the Hystrix javanica source.
 */
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
// Collapser-annotated methods use the collapser execution type; others use the command's.
ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
// Execute according to the execution type and capture the result.
Object result;
try {
// Observable (reactive) methods take a separate execution path.
if (!metaHolder.isObservable()) {
// Non-observable: delegate to CommandExecutor.execute.
result = CommandExecutor.execute(invokable, executionType, metaHolder);
} else {
result = executeObservable(invokable, executionType, metaHolder);
}
} catch (HystrixBadRequestException e) {
// Surface the underlying cause rather than the Hystrix wrapper.
throw e.getCause();
} catch (HystrixRuntimeException e) {
throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
}
return result;
}
// Factory that builds the MetaHolder for @HystrixCommand methods.
private static class CommandMetaHolderFactory extends MetaHolderFactory {
@Override
public MetaHolder create(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {
// Read the @HystrixCommand annotation from the method.
HystrixCommand hystrixCommand = method.getAnnotation(HystrixCommand.class);
// Derive the execution type from the method's return type.
ExecutionType executionType = ExecutionType.getExecutionType(method.getReturnType());
MetaHolder.Builder builder = metaHolderBuilder(proxy, method, obj, args, joinPoint);
if (isCompileWeaving()) {
builder.ajcMethod(getAjcMethodFromTarget(joinPoint));
}
// Record the annotation and the derived execution settings on the builder;
// the default command key is the method name.
return builder.defaultCommandKey(method.getName())
.hystrixCommand(hystrixCommand)
.observableExecutionMode(hystrixCommand.observableExecutionMode()) // execution mode from the annotation
.executionType(executionType) // execution type derived above
.observable(ExecutionType.OBSERVABLE == executionType)
.build();
}
}
}
// From the ExecutionType enum: maps a method return type to an execution type.
// Future (or subtype) -> ASYNCHRONOUS, Observable (or subtype) -> OBSERVABLE,
// anything else -> SYNCHRONOUS.
public static ExecutionType getExecutionType(Class<?> type) {
if (Future.class.isAssignableFrom(type)) {
return ExecutionType.ASYNCHRONOUS;
} else if (Observable.class.isAssignableFrom(type)) {
return ExecutionType.OBSERVABLE;
} else {
return ExecutionType.SYNCHRONOUS;
}
}
صͬͨǿԿHystrixInvokable GenericCommandͬĿ CommandExecutor.execute(invokable, executionType, metaHolder)
// Dispatches a Hystrix invokable according to its execution type.
public class CommandExecutor {
// Returns the execution result (SYNCHRONOUS), a Future (ASYNCHRONOUS) or an
// Observable (OBSERVABLE); throws RuntimeException for any other execution type.
public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
Validate.notNull(invokable);
Validate.notNull(metaHolder);
switch (executionType) {
case SYNCHRONOUS: {
// Synchronous path: cast to HystrixExecutable and call execute().
return castToExecutable(invokable, executionType).execute();
}
case ASYNCHRONOUS: {
// Asynchronous path: cast to HystrixExecutable and queue the command.
HystrixExecutable executable = castToExecutable(invokable, executionType);
// When a fallback method exists and it is itself asynchronous, wrap the Future in a FutureDecorator.
if (metaHolder.hasFallbackMethodCommand()
&& ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
return new FutureDecorator(executable.queue());
}
return executable.queue();
}
case OBSERVABLE: {
// Reactive path: cast to HystrixObservable.
HystrixObservable observable = castToObservable(invokable);
// EAGER mode uses observe(); any other mode uses toObservable().
return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
}
default:
throw new RuntimeException("unsupported execution type: " + executionType);
}
}
}
ҪִӴпԿֱִͣͬ첽ԼӦʽУӦʽַΪCold Observableobservable.toObservable() HotObservableobservable.observe()ĬϵexecutionType=SYNCHRONOUS ͬ
通过 GenericCommand 一层层往上翻，最终定位到 HystrixCommand 中有个 execute() 方法：
// Excerpt of HystrixCommand showing the synchronous and asynchronous entry points
// (parts are elided by the article with "...").
public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {
// Synchronous execution.
public R execute() {
try {
// Implemented as queue().get(): block on the Future returned by the async path.
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}
// Asynchronous execution: returns a Future; the caller decides when to call get().
public Future<R> queue() {
// The chain below converts this command to an Observable (toObservable),
// then to a blocking observable (toBlocking), then to a Future (toFuture).
final Future<R> delegate = toObservable().toBlocking().toFuture();
final Future<R> f = new Future<R>() {
.....
@Override
public R get() throws InterruptedException, ExecutionException {
return delegate.get();
}
@Override
public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return delegate.get(timeout, unit);
}
};
// If the command has already finished, call get() immediately
// (exception handling is elided in this excerpt).
if (f.isDone()) {
try {
f.get();
return f;
} catch (Exception e) {
...
}
}
return f;
}
}
Уصˣһ java.util.concurrent.Future Ȼ getʱίɸ delegate delegate toObservable().toBlocking().toFuture(); ô롣ڵصӦ÷ toObservable() У
3.2 toObservable ͨObservableһ۲ߣ۲ᱻtoObservable().toBlocking().toFuture() ʵдĺĺȥһЩ۶жִʵҵִfallbackĻصȻظFuture run() ִҵҪ¼£
۲ȥִapplyHystrixSemanticsĶ
// Builds the Observable that represents this command's execution (excerpt; several
// callback bodies are elided by the article with "...").
public Observable<R> toObservable() {
final AbstractCommand<R> _cmd = this;
// Callback registered below via doOnTerminate (body elided).
final Action0 terminateCommandCleanup = new Action0() {
...
};
// Callback registered below via doOnUnsubscribe (body elided).
final Action0 unsubscribeCommandCleanup = new Action0() {
@Override
public void call() {
...
}
};
// Deferred factory for the observable that actually runs the command.
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
// Already unsubscribed: return an Observable that never emits.
return Observable.never();
}
// Otherwise build the execution observable for this command.
return applyHystrixSemantics(_cmd);
}
};
// Mapping function applied to each emitted value (body elided).
final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {
@Override
public R call(R r) {
...
}
};
// Callback registered below via doOnCompleted (body elided).
final Action0 fireOnCompletedHook = new Action0() {
@Override
public void call() {
...
}
};
// The returned Observable is deferred: nothing below runs until call() is invoked.
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
// The CAS from NOT_STARTED guarantees a command instance is executed only once.
if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
//TODO make a new error type for this
throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
}
// Record the command start time.
commandStartTimestamp = System.currentTimeMillis();
// Request logging, when enabled.
if (properties.requestLogEnabled().get()) {
// log this command execution regardless of what happened
if (currentRequestLog != null) {
currentRequestLog.addExecutedCommand(_cmd);
}
}
// Request-cache switch and cache key for this command.
final boolean requestCacheEnabled = isRequestCachingEnabled();
final String cacheKey = getCacheKey();
// With caching enabled, try to serve the response from the request cache first.
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}
// Deferred execution observable: applyHystrixSemantics produces it, and
// wrapWithAllOnNextHooks is mapped over every emitted value.
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
Observable<R> afterCache;
// put in cache
if (requestCacheEnabled && cacheKey != null) {
// wrap it for caching
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
if (fromCache != null) {
// another thread beat us so we'll use the cached value instead
toCache.unsubscribe();
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
} else {
// we just created an ObservableCommand so we cast and return it
afterCache = toCache.toObservable();
}
} else {
afterCache = hystrixObservable;
}
// Attach the lifecycle callbacks declared at the top of this method.
return afterCache
// invoked when the Observable terminates
.doOnTerminate(terminateCommandCleanup)
// invoked when the subscription is cancelled
.doOnUnsubscribe(unsubscribeCommandCleanup)
// invoked when the Observable completes
.doOnCompleted(fireOnCompletedHook);
}
});
}
applyHystrixSemantics
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
return Observable.never();
}
return applyHystrixSemantics(_cmd);
}
};
ﴫ_cmdһGenericCommandջִеGenericCommandеrun
circuitBreaker.allowRequest() жǷ۶״̬ģtrueʾûд۶״ִ̬У handleShortCircuitViaFallback ʵַջصԶfallbackС
ǰhystrixδ۶״̬
executeCommandAndObserve
// Applies the circuit-breaker and semaphore checks, then either runs the command
// or routes to the matching fallback handler.
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
executionHook.onStart(_cmd);
// Ask the circuit breaker whether this request may proceed.
if (circuitBreaker.allowRequest()) {
// Obtain the execution semaphore.
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
// Release callback; the AtomicBoolean ensures the semaphore is released at most once
// even though the callback is registered for both terminate and unsubscribe.
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
executionSemaphore.release();
}
}
};
// Error callback: records an EXCEPTION_THROWN event.
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
@Override
public void call(Throwable t) {
eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
}
};
// Try to acquire the semaphore and build the execution Observable on success.
// NOTE(review): the article states a no-op semaphore (TryableSemaphoreNoOp) is used
// when semaphore isolation is not configured, so tryAcquire passes - confirm.
if (executionSemaphore.tryAcquire()) {
try {
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
return executeCommandAndObserve(_cmd) // run the command and attach observers
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
// Semaphore acquisition failed: go to the semaphore-rejection fallback.
return handleSemaphoreRejectionViaFallback();
}
} else {
// Circuit breaker denied the request: go to the short-circuit fallback.
return handleShortCircuitViaFallback();
}
}
һִʧܽ뽵ֱӽ뵽 HystrixCommand#getFallbackObservable
// Excerpt of HystrixCommand showing how the fallback is exposed as an Observable.
public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {
// Returns a deferred Observable that emits the value of getFallback(), or an
// error Observable if getFallback() throws.
@Override
final protected Observable<R> getFallbackObservable() {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
try {
return Observable.just(getFallback());
} catch (Throwable ex) {
return Observable.error(ex);
}
}
});
}
}
getFallback 方法最终会回调我们自定义的 fallback 方法。
صexecuteCommandAndObserveҪ
ִʱԿ Observable.liftʵִʱܡ
// Builds the execution Observable and attaches the callbacks that record
// metrics, route errors to the fallback handlers and restore the request context.
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
// Action callbacks return nothing; Func callbacks return a value.
// doOnNext callback: invoked for each emitted value.
final Action1<R> markEmits = new Action1<R>() {
@Override
public void call(R r) {
if (shouldOutputOnNextEvents()) {
executionResult = executionResult.addEvent(HystrixEventType.EMIT);
eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
}
// Scalar commands record SUCCESS here, on the emitted value.
if (commandIsScalar()) {
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
circuitBreaker.markSuccess();
}
}
};
// doOnCompleted callback: non-scalar commands record SUCCESS on completion instead.
final Action0 markOnCompleted = new Action0() {
@Override
public void call() {
if (!commandIsScalar()) {
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
circuitBreaker.markSuccess();
}
}
};
// onErrorResumeNext callback: picks the handler that matches the failure type.
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
if (e instanceof RejectedExecutionException) {
// Rejected execution (thread-pool rejection handler).
return handleThreadPoolRejectionViaFallback(e);
} else if (t instanceof HystrixTimeoutException) {
// Timeout.
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {
// Bad request: emitted as an error rather than sent to a fallback.
return handleBadRequestByEmittingError(e);
} else {
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
}
// Any other failure goes to the generic failure fallback.
return handleFailureViaFallback(e);
}
}
};
// doOnEach callback: invoked for every notification of the Observable.
final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
@Override
public void call(Notification<? super R> rNotification) {
setRequestContextIfNeeded(currentRequestContext);
}
};
// The Observable that performs the isolated execution.
Observable<R> execution;
// When execution timeouts are enabled, lift in the timeout operator.
if (properties.executionTimeoutEnabled().get()) {
// HystrixObservableTimeoutOperator wraps the execution Observable.
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
// Attach the callbacks declared above.
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
3.3 executeCommandWithSpecifiedIsolation ǸݵǰͬԴִвͬTHREADSEMAPHORE
// Builds the execution Observable for the configured isolation strategy:
// the THREAD branch subscribes on the thread pool's scheduler, the other branch does not.
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
// THREAD isolation.
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
// Deferred Observable; call() runs on subscription.
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
executionResult = executionResult.setExecutionOccurred();
// State must advance from OBSERVABLE_CHAIN_CREATED to USER_CODE_EXECUTED exactly once.
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
// If the command already timed out before reaching this point, fail fast
// without touching any of the counters below.
if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
// the command timed out in the wrapping thread so we will return immediately
// and not increment any of the counters below or other such logic
return Observable.error(new RuntimeException("timed out before executing run()"));
}
// Mark the thread as started (fails if already unsubscribed).
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
//we have not been unsubscribed, so should proceed
HystrixCounters.incrementGlobalConcurrentThreads();
threadPool.markThreadExecution();
// store the command that is being run
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
executionResult = executionResult.setExecutedInThread();
try {
executionHook.onThreadStart(_cmd);
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
// Return the Observable that wraps the user's code.
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
return Observable.error(ex);
}
} else {
//command has already been unsubscribed, so return immediately
return Observable.error(new RuntimeException("unsubscribed before executing run()"));
}
}
}).doOnTerminate(new Action0() {
@Override
public void call() {
// On termination: move the thread state to TERMINAL, running the
// thread-end handling only if the thread had actually started.
if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
handleThreadEnd(_cmd);
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
}
}
}).doOnUnsubscribe(new Action0() {
@Override
public void call() {
// On unsubscribe: same pattern, with UNSUBSCRIBED as the target state.
if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
handleThreadEnd(_cmd);
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
}
}
}).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
// True only when interrupt-on-timeout is enabled and the command has timed out.
return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
}
}));
} else {
// Non-THREAD isolation (metrics are marked as SEMAPHORE below).
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
executionResult = executionResult.setExecutionOccurred();
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
try {
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
// Return the Observable that wraps the user's code.
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
//If the above hooks throw, then use that as the result of the run method
return Observable.error(ex);
}
}
});
}
}
жǷǻڶ·ʵ֣·жӦصʧܻ
· رգȻȡźţȡʧӦص
ȡɹɷ executeCommandAndObserve Ӧ Observable ʵ ̸߳롢 Ȳͬʱע˶Ӧ ڻص 3.4 getUserExecutionObservable Ȼִ HystrixCommand#getExecutionObservable
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> { private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) { Observable<R> userObservable; try { userObservable = getExecutionObservable(); } catch (Throwable ex) { userObservable = Observable.error(ex); } return userObservable .lift(new ExecutionHookApplication(_cmd)) .lift(new DeprecatedOnRunHookApplication(_cmd)); } } public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> { @Override final protected Observable<R> getExecutionObservable() { return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { try { return Observable.just(run()); } catch (Throwable ex) { return Observable.error(ex); } } }).doOnSubscribe(new Action0() { @Override public void call() { // Save thread on which we get subscribed so that we can interrupt it later if needed executionThread.set(Thread.currentThread()); } }); } }
run() Ѿˣҵִз
// Command implementation whose run() delegates to the command action held by this command.
@ThreadSafe
public class GenericCommand extends AbstractHystrixCommand<Object> {
// Logs the command key, then executes the command action for the current
// execution type through process(...).
@Override
protected Object run() throws Exception {
LOGGER.debug("execute command: {}", getCommandKey().name());
return process(new Action() {
@Override
Object execute() {
return getCommandAction().execute(getExecutionType());
}
});
}
}
最终调用到我们自己的业务逻辑。
https://lijunyi.xyz/docs/SpringCloud/SpringCloud.html#_2-2-x-%E5%88%86%E6%94%AF https://mp.weixin.qq.com/s/2jeovmj77O9Ux96v3A0NtA https://juejin.cn/post/6931922457741770760 https://github.com/D2C-Cai/herring http://c.biancheng.net/springcloud https://github.com/macrozheng/springcloud-learning