Back to Javatutorial

SpringCloudHystrix源码分析

docs/Spring全家桶/SpringCloud源码分析/SpringCloudHystrix源码分析.md

1.0.038.5 KB
Original Source

学习目标

  1. 手写Mini版Hystrix
  2. RxJava知识讲解
  3. Hystrix的核心流程分析
  4. Դ֤ 1 дMini ѾҽܹHystrixĺĹܺʹˣ޷Ǿṩ۶ϡȹܣ۶Ϻ͸Ŀģǽʹùʵĵע⣺@EnableHystrix@HystrixCommand@HystrixCollapserͨע @HystrixCommand߼̳ HystrixCommand ʵֽԼһЩϲȲ

ʽԭ֮ǰҪȷһ㣬 @HystrixCommand עʵַ񽵼Hystrix ڲDzAOPķʽشģݣҲϸʵһ¼װ Hystrix һ£ҪΪ²

  • Լ@HystrixCommand ע⡣

  • ʵĴ߼

  • Եá 1.Զע

    /**
     * Marks a method that should be protected by the hand-written Hystrix-style aspect:
     * the call is limited by {@link #timeout()} and falls back to {@link #fallback()}.
     */
    @Target({ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface MyHystrixCommand {
        /** Maximum time to wait for the annotated method, in milliseconds (default 1000). */
        int timeout() default 1000;

        /** Name of the fallback method to invoke instead; empty (the default) means none. */
        String fallback() default "";
    }

2.Զ

/**
 * Minimal Hystrix-style aspect.
 *
 * <p>Every method annotated with {@link MyHystrixCommand} is executed on a worker thread. The
 * caller waits at most {@code timeout} milliseconds for the result; when the call times out,
 * fails or is interrupted, the fallback method named by the annotation is invoked instead.
 */
@Aspect
@Component
public class MyHystrixCommandAspect {
    /** Worker pool on which the intercepted methods are executed. */
    ExecutorService executorService = Executors.newFixedThreadPool(10);

    /** Pointcut matching every method annotated with {@link MyHystrixCommand}. */
    @Pointcut(value = "@annotation(MyHystrixCommand)")
    public void pointCut() {
    }

    /**
     * Around advice: runs the target method with a timeout and degrades to the fallback.
     *
     * @param joinPoint      the intercepted invocation
     * @param hystrixCommand the annotation instance found on the intercepted method
     * @return the target method's result, or the fallback's result on timeout/failure
     * @throws Exception if the call fails and no fallback is configured, or if the fallback
     *                   cannot be looked up or invoked
     */
    @Around(value = "pointCut()&&@annotation(hystrixCommand)")
    public Object doPointCut(ProceedingJoinPoint joinPoint, MyHystrixCommand hystrixCommand) throws Exception {
        int timeout = hystrixCommand.timeout();
        Future<Object> future = executorService.submit(() -> {
            try {
                // proceed() runs the intercepted target method.
                return joinPoint.proceed();
            } catch (Exception | Error e) {
                // Propagate instead of swallowing: Future.get() then reports the failure as an
                // ExecutionException, which triggers the fallback below.
                throw e;
            } catch (Throwable other) {
                throw new ExecutionException(other);
            }
        });
        try {
            // Bounded wait on the asynchronous result implements the timeout.
            return future.get(timeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            future.cancel(true);
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to the caller's thread.
                Thread.currentThread().interrupt();
            }
            if (StringUtils.isBlank(hystrixCommand.fallback())) {
                throw new Exception("fallback is null", e);
            }
            return invokeFallback(joinPoint, hystrixCommand.fallback());
        }
    }

    /**
     * Invokes the public method named {@code fallback} on the target object, using the same
     * parameter types and arguments as the intercepted method.
     */
    private Object invokeFallback(ProceedingJoinPoint joinPoint, String fallback) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
        // The intercepted method's signature supplies the parameter types for the lookup.
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Class<?>[] parameterTypes = signature.getMethod().getParameterTypes();
        Object target = joinPoint.getTarget();
        Method fallbackMethod = target.getClass().getMethod(fallback, parameterTypes);
        // Must be set on the fallback method (the one actually invoked), not on the intercepted one.
        fallbackMethod.setAccessible(true);
        return fallbackMethod.invoke(target, joinPoint.getArgs());
    }
}

3.Զ

/**
 * Demo controller for the hand-written {@link MyHystrixCommand} annotation: the endpoint
 * delegates to the order service and names {@code fallback} as its degradation method.
 */
@RestController
public class MyHystrixController {
    /** Client used to call the order service. */
    @Autowired
    OrderServiceClient orderServiceClient;

    /** Guarded endpoint: 2000 ms timeout, falls back to {@link #fallback(int)}. */
    @GetMapping("/myhystrix/get/{num}")
    @MyHystrixCommand(timeout = 2000, fallback = "fallback")
    public String get(@PathVariable("num") int num) {
        final String orders = orderServiceClient.orderLists(num);
        return orders;
    }

    /** Fallback with the same parameter list as {@link #get(int)}. */
    public String fallback(int num) {
        return "Զעⷽ";
    }
}

http://localhost:8080/myhystrix/get/1ʱᴥΪڷˣnum=1ʱ3s

OKǾʵһװHystrixCommandֻʵHystrixĵһһע棬ĵײ߼ԶԶûô򵥣ڽԴ֮ǰһRxJavaʲôΪHystrixײ߼ǻӦʽʵֵġ

2 RxJava 2.1 RxJava RxJava һӦʽ̣¼첽⡣¼ʽá߼ࡣ

RxJava۲ģʽĶԱ

  • ͳ۲һ۲߶۲ߣ۲߷ıʱʱ֪ͨй۲
  • RxJavaһ۲߶۲ߣ۲һڱ۲֮䳯һ򴫵ݣֱݸ۲ ʵ˵ˣRxJavaд2ָһDZ۲ߣһǹ۲ߣ۲߶ͬһ۲ߵʱôű۲ij¼ʱͻȥص۲ߡ

2.2 ۲

Observer

// Observer with one callback per event kind; parameterized instead of using the raw type.
Observer<Object> observer = new Observer<Object>() {
    @Override
    public void onCompleted() {
        System.out.println("۲Complete¼ø÷");
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error¼Ӧ");
    }

    @Override
    public void onNext(Object o) {
        System.out.println("Next¼Ӧ:" + o);
    }
};











// Subscriber is RxJava's built-in abstract class that implements Observer and extends it
// with extra operations; it is used here with the same three callbacks as a plain Observer.
Subscriber<Object> subscriber = new Subscriber<Object>() {
    @Override
    public void onCompleted() {
        System.out.println("۲Complete¼ø÷");
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error¼Ӧ");
    }

    @Override
    public void onNext(Object o) {
        System.out.println("Next¼Ӧ:" + o);
    }
};

Subscriber Observer ӿڵ

߻ʹ÷ʽһ£RxJavasubscribeУObserverȱתSubscriberʹã Subscriber Observer ӿڽչ

  • onStart()ڻδӦ¼ǰãһЩʼsubscribe ڵ̵߳ãл̣߳ԲܽнUI±絯Щ

  • unsubscribe()ȡġڸ÷ú󣬹۲߽ٽӦ¼onStopпԵô˷ġø÷ǰʹ isUnsubscribed() ж״̬ȷ۲ObservableǷ񻹳й۲Subscriberá 2.3 ۲ RxJava ṩ˶ַ ۲߶Observable

    // 1just(T...)ֱӽIJηͳ Observable observable = Observable.just("A", "B", "C"); // εã // onNext("A"); // onNext("B"); // onNext("C"); // onCompleted(); // 2fromArray(T[]) / from(Iterable<? extends T>) : / Iterable ֳɾηͳ String[] words = {"A", "B", "C"}; Observable observable = Observable.fromArray(words); // εã // onNext("A"); // onNext("B"); // onNext("C"); // onCompleted();

2.4

observable.subscribe(observer); // subscribe: connects the observer to the observable

2.5

/**
 * Small RxJava 1.x demo of the observer pattern: builds a deferred Observable over a
 * one-element array, attaches a completion callback and subscribes a printing Observer.
 */
public class RxJavaDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        final String[] datas = new String[]{"¼1"};
        // Callback attached through doOnCompleted below.
        final Action0 onCompleted = new Action0() {
            @Override
            public void call() {
                System.out.println("۲Ҫ");
            }
        };
        // Observable: the source is created inside the Func0 passed to defer().
        Observable<String> observable = Observable.defer(new Func0<Observable<String>>() {
            @Override
            public Observable<String> call() {
                Observable<String> source = Observable.from(datas);
                return source.doOnCompleted(onCompleted);
            }
        });
        // Alternative source:
//        Observable<String> observable = Observable.just("¼1","¼2","");
        // Observer: prints each event it receives.
        Observer<Object> observer = new Observer<Object>() {
            @Override
            public void onCompleted() {
                System.out.println("Comlate¼Ӧ");
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("Error¼Ӧ");
            }

            @Override
            public void onNext(Object o) {
                System.out.println("Next¼Ӧ:" + o);
            }
        };
        observable.subscribe(observer); // establish the subscription

        // Alternative: block on the result through a Future (the reason for the throws clause).
//        String s = observable.toBlocking().toFuture().get();
//        System.out.println(s);
    }
}

OKָʹRxJavaˣǿʼߣԴ롣

3 Դ ϹṩԴͼͼϿԿʵȥɨHystrixCommandעķȻأִ߼涨executequeueѡһеãȻ߼HystrixCommandע⣬Hystrix@EnableHystrixע⡣

/**
 * Spring Boot entry point of the sample user service; Hystrix is switched on through
 * the {@code @EnableHystrix} annotation.
 */
//@EnableDiscoveryClient // service-discovery registration, currently disabled
@EnableHystrix // enable Hystrix in annotation mode
@EnableFeignClients("com.example.clients")
@SpringBootApplication
public class HystrixEclipseUserApplication {

    /** Boots the Spring application context. */
    public static void main(String[] args) {
        SpringApplication.run(HystrixEclipseUserApplication.class, args);
    }

}

뵽@EnableHystrixע

/**
 * Type-level marker annotation. It declares no members of its own; it is itself annotated
 * with {@code @EnableCircuitBreaker}.
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@EnableCircuitBreaker
public @interface EnableHystrix {
}
// @EnableHystrix is meta-annotated with @EnableCircuitBreaker, declared here.
/**
 * Type-level annotation that imports {@code EnableCircuitBreakerImportSelector}
 * through {@code @Import}.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableCircuitBreakerImportSelector.class)
public @interface EnableCircuitBreaker {
}

ⲽ룬źܶѧspringbootͬѧϤˣõImportע⣬ǿ϶һЩˣȻٽ EnableCircuitBreakerImportSelector;

/**
 * Import selector for {@code @EnableCircuitBreaker}. The selector is enabled unless the
 * property {@code spring.cloud.circuit.breaker.enabled} is set to {@code false}.
 */
@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class EnableCircuitBreakerImportSelector
		extends SpringFactoryImportSelector<EnableCircuitBreaker> {

	/** Reads the enable flag from the environment; defaults to {@code true}. */
	@Override
	protected boolean isEnabled() {
		final Boolean enabled = getEnvironment().getProperty(
				"spring.cloud.circuit.breaker.enabled", Boolean.class, Boolean.TRUE);
		return enabled;
	}
}

EnableCircuitBreakerImportSelector继承了SpringFactoryImportSelector，SpringFactoryImportSelector是我们熟悉的代码，它实现了DeferredImportSelector接口，实现了selectImports方法，selectImports方法会加载spring.factories文件中对应 org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker 的配置类。spring.factories文件如下：

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.hystrix.HystrixAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.ReactiveHystrixCircuitBreakerAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.security.HystrixSecurityAutoConfiguration
org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\
org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration

ӦEnableAutoConfigurationЩʵspringʱͨԶװƻȥʵע뵽IoCУǺĹע HystrixCircuitBreakerConfigurationࡣ

/**
 * Configuration class that registers the {@code HystrixCommandAspect} bean
 * (other bean definitions are elided in this excerpt).
 */
@Configuration(proxyBeanMethods = false)
public class HystrixCircuitBreakerConfiguration {
	// The core bean: the aspect that intercepts Hystrix-annotated methods.
	@Bean
	public HystrixCommandAspect hystrixCommandAspect() {
		return new HystrixCommandAspect();
	}
	...
}

뵽л֣ᷢҪעΪ@HystrixCommand@HystrixCollapserִעεķʱᱻִ methodsAnnotatedWithHystrixCommand

3.1 HystrixCommandAspect

/**
 * Aspect that intercepts methods annotated with {@code @HystrixCommand} or
 * {@code @HystrixCollapser}, wraps the invocation in a Hystrix invokable and executes it.
 */
@Aspect
public class HystrixCommandAspect {
    private static final Map<HystrixPointcutType, MetaHolderFactory> META_HOLDER_FACTORY_MAP;
    static {
        // Static initializer: one MetaHolderFactory per pointcut (annotation) type.
        META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder()
            .put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory())
            .put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory())
            .build();
    }
    // Pointcut: methods annotated with @HystrixCommand.
    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
    public void hystrixCommandAnnotationPointcut() {
    }
    // Pointcut: methods annotated with @HystrixCollapser (request collapsing).
    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
    public void hystrixCollapserAnnotationPointcut() {
    }
    // Around advice applied to both pointcuts.
    @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
    public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
        // Resolve the target method.
        Method method = getMethodFromTarget(joinPoint);
        Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
        // A method may carry only one of the two annotations, never both.
        if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
            throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
                                            "annotations at the same time");
        }
        // Pick the MetaHolderFactory that matches the annotation found on the method.
        MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
        // Build the MetaHolder: the metadata describing this invocation.
        MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
        /**
         * Create the invokable command from the metadata.
         * Per the original article's note (not visible in this excerpt): the factory creates a
         * GenericCommand (synchronous) or GenericObservableCommand (asynchronous); GenericCommand's
         * constructor chains through AbstractHystrixCommand and HystrixCommand up to
         * AbstractCommand, which is analysed later in the article.
         */
        HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
        // Execution type: taken from the collapser when that annotation is present, else from the command.
        ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
            metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
        // Execute according to the execution type and return the result.
        Object result;
        try {
            // Observable (reactive) commands take a different execution path.
            if (!metaHolder.isObservable()) {
                // Non-observable: run through CommandExecutor.execute.
                result = CommandExecutor.execute(invokable, executionType, metaHolder);
            } else {
                result = executeObservable(invokable, executionType, metaHolder);
            }
        } catch (HystrixBadRequestException e) {
            throw e.getCause();
        } catch (HystrixRuntimeException e) {
            throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
        }
        return result;
    }
    // Factory that builds the MetaHolder for methods annotated with @HystrixCommand.
    private static class CommandMetaHolderFactory extends MetaHolderFactory {
        @Override
        public MetaHolder create(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {
            // Read the @HystrixCommand annotation from the method.
            HystrixCommand hystrixCommand = method.getAnnotation(HystrixCommand.class);
            // Derive the execution type from the method's return type.
            ExecutionType executionType = ExecutionType.getExecutionType(method.getReturnType());
            MetaHolder.Builder builder = metaHolderBuilder(proxy, method, obj, args, joinPoint);
            if (isCompileWeaving()) {
                builder.ajcMethod(getAjcMethodFromTarget(joinPoint));
            }
            // Copy the annotation's settings into the MetaHolder; the command key defaults to the method name.
            return builder.defaultCommandKey(method.getName())
                .hystrixCommand(hystrixCommand)
                .observableExecutionMode(hystrixCommand.observableExecutionMode())  // observable execution mode
                .executionType(executionType) // execution type
                .observable(ExecutionType.OBSERVABLE == executionType)
                .build();
        }
    }
}
// Defined on the ExecutionType enum: maps a return type to the matching execution type.
// Future -> ASYNCHRONOUS, Observable -> OBSERVABLE, anything else -> SYNCHRONOUS.
public static ExecutionType getExecutionType(Class<?> type) {
    if (Future.class.isAssignableFrom(type)) {
        return ExecutionType.ASYNCHRONOUS;
    }
    return Observable.class.isAssignableFrom(type)
            ? ExecutionType.OBSERVABLE
            : ExecutionType.SYNCHRONOUS;
}

صͬͨǿԿHystrixInvokable GenericCommandͬĿ CommandExecutor.execute(invokable, executionType, metaHolder)

/** Dispatches a Hystrix invokable according to its execution type. */
public class CommandExecutor {
    /**
     * Executes {@code invokable} synchronously, asynchronously or as an observable.
     *
     * @return the command result, a Future, or an Observable, depending on {@code executionType}
     * @throws RuntimeException for an unsupported execution type
     */
    public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
        Validate.notNull(invokable);
        Validate.notNull(metaHolder);

        switch (executionType) {
            case SYNCHRONOUS:
                // Synchronous path: cast to HystrixExecutable and block on execute().
                return castToExecutable(invokable, executionType).execute();
            case ASYNCHRONOUS: {
                // Asynchronous path: cast to HystrixExecutable and queue the command.
                final HystrixExecutable executable = castToExecutable(invokable, executionType);
                // When the fallback is asynchronous as well, the Future is wrapped in a decorator.
                final boolean asyncFallback = metaHolder.hasFallbackMethodCommand()
                        && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType();
                return asyncFallback ? new FutureDecorator(executable.queue()) : executable.queue();
            }
            case OBSERVABLE: {
                // Observable path: cast to HystrixObservable.
                final HystrixObservable observable = castToObservable(invokable);
                // EAGER mode uses observe(); otherwise toObservable() is used.
                if (ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode()) {
                    return observable.observe();
                }
                return observable.toObservable();
            }
            default:
                throw new RuntimeException("unsupported execution type: " + executionType);
        }
    }
}

ҪִӴпԿֱִͣͬ첽ԼӦʽУӦʽַΪCold Observableobservable.toObservable() HotObservableobservable.observe()ĬϵexecutionType=SYNCHRONOUS ͬ

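  • execute()：同步执行，返回一个单一的对象结果，发生错误时抛出异常。
  • queue()：异步执行，返回一个 Future 对象，包含着执行结束后返回的单一结果。
  • observe()：这个方法返回一个 Observable 对象，它代表操作的多个结果，但是已经被订阅者消费掉了。
  • toObservable()：这个方法返回一个 Observable 对象，它代表操作的多个结果，需要我们自己手动订阅并消费掉。 类图关系如下：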

ͨGenericCommandһϷնλHystrixCommandиexecute()

/**
 * Excerpt of HystrixCommand showing the blocking {@code execute()} and the
 * Future-returning {@code queue()} entry points (parts elided with dots).
 */
public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {
    // Synchronous execution.
    public R execute() {
        try {
            // Implemented as queue().get(): block on the asynchronous result.
            return queue().get();
        } catch (Exception e) {
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }
   // Asynchronous execution: the caller decides when to call get() on the returned Future.
   public Future<R> queue() {
        // The real work is in toObservable():
        // toObservable() yields an Observable, toBlocking() a BlockingObservable,
        // and toFuture() a Future to which the wrapper below delegates.
        final Future<R> delegate = toObservable().toBlocking().toFuture();   	
        final Future<R> f = new Future<R>() {
            .....
            @Override
            public R get() throws InterruptedException, ExecutionException {
                return delegate.get();
            }
            @Override
            public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                return delegate.get(timeout, unit);
            }       	
        };
        // Special case: the command has already finished, so get() is called right away.
        if (f.isDone()) {
            try {
                f.get();
                return f;
            } catch (Exception e) {
                ...
            }
        }
        return f;
    }
}

Уصˣһ java.util.concurrent.Future Ȼ getʱίɸ delegate delegate toObservable().toBlocking().toFuture(); ô롣ڵصӦ÷ toObservable() У

3.2 toObservable ͨObservableһ۲ߣ۲߻ᱻtoObservable().toBlocking().toFuture() ʵдĺĺȥһЩ۶߼жִʵҵ߼ִfallbackĻصȻ󽫽ظFuture run() ִҵ߼Ҫ¼£

  • һѵĶҲ֪ЩǸɶģҪ
  • жǷ˻棬ˣҲˣȥObservableʽһ
  • һ۲ߣ۲ߺȥصʵҵ߼fallback

߼۲߻ȥִapplyHystrixSemanticsĶ

/**
 * Builds the Observable that represents this command's execution: enforces single
 * execution, consults the request cache, defers the actual work to
 * {@code applyHystrixSemantics} and attaches the lifecycle callbacks.
 * (Callback bodies are elided in this excerpt.)
 */
public Observable<R> toObservable() {
    final AbstractCommand<R> _cmd = this;
    // Cleanup callback attached through doOnTerminate below.
    final Action0 terminateCommandCleanup = new Action0() {
		...
    };
    // Cleanup callback attached through doOnUnsubscribe below.
    final Action0 unsubscribeCommandCleanup = new Action0() {
        @Override
        public void call() {
			...
        }
    };
    // Deferred callback that performs the actual command execution.
    final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                // Already unsubscribed: stop here and return Observable.never().
                return Observable.never();
            }
            // Build the Observable that executes the command.
            return applyHystrixSemantics(_cmd);
        }
    };
    final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {
        @Override
        public R call(R r) {
			...
        }
    };
    final Action0 fireOnCompletedHook = new Action0() {
        @Override
        public void call() {
			...
        }
    };
    // Create the outer Observable; its body runs when the Func0 passed to defer() is called.
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            // State flag: the CAS guarantees that a command instance is executed only once.
            if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
                IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
                //TODO make a new error type for this
                throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
            }
            // Record the command start time.
            commandStartTimestamp = System.currentTimeMillis();
            // Request log.
            if (properties.requestLogEnabled().get()) {
                // log this command execution regardless of what happened
                if (currentRequestLog != null) {
                    currentRequestLog.addExecutedCommand(_cmd);
                }
            }
            // Request cache switch and key: with caching enabled, a response cached under
            // the same key is returned directly instead of executing the command.
            final boolean requestCacheEnabled = isRequestCachingEnabled();
            final String cacheKey = getCacheKey();
            // Caching enabled: try to serve the response from the cache first.
            if (requestCacheEnabled) {
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                if (fromCache != null) {
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                }
            }
            // Observable that executes the command:
            // deferred applyHystrixSemantics, with each emitted value passed through the onNext hooks.
            Observable<R> hystrixObservable =
                Observable.defer(applyHystrixSemantics)
                .map(wrapWithAllOnNextHooks);
            Observable<R> afterCache;
            // put in cache 
            if (requestCacheEnabled && cacheKey != null) {
                // wrap it for caching
                HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                if (fromCache != null) {
                    // another thread beat us so we'll use the cached value instead
                    toCache.unsubscribe();
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                } else {
                    // we just created an ObservableCommand so we cast and return it
                    afterCache = toCache.toObservable();
                }
            } else {
                afterCache = hystrixObservable;
            }
            // Attach the lifecycle callbacks.
            return afterCache
                // runs when the Observable terminates
                .doOnTerminate(terminateCommandCleanup)     
                // runs when the subscription is cancelled
                .doOnUnsubscribe(unsubscribeCommandCleanup) 
                // runs when the Observable completes
                .doOnCompleted(fireOnCompletedHook);
        }
    });
}

߼applyHystrixSemantics

// Defer the command execution to applyHystrixSemantics and pass every emitted value
// through the onNext hooks.
Observable<R> hystrixObservable =
    Observable.defer(applyHystrixSemantics)
    .map(wrapWithAllOnNextHooks);











// Deferred entry point of the command execution.
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
    @Override
    public Observable<R> call() {
        final boolean unsubscribed = commandState.get().equals(CommandState.UNSUBSCRIBED);
        if (!unsubscribed) {
            return applyHystrixSemantics(_cmd);
        }
        // Command was already unsubscribed: do not execute it.
        return Observable.never();
    }
};

ﴫ_cmdһGenericCommandջִеGenericCommandеrun

circuitBreaker.allowRequest() жǷ۶״̬ģtrueʾûд۶״ִ̬У򣬵 handleShortCircuitViaFallback ʵַ񽵼ջصԶfallbackС

ǰhystrixδ۶״̬

  • getExecutionSemaphore жϵǰǷΪź̳߳أȻĬ̳߳أȻٵtryAcquireʱдΪtrue

executeCommandAndObserve

/**
 * Applies the circuit-breaker and semaphore checks before executing the command.
 * Falls back when the circuit breaker rejects the request or the semaphore cannot be
 * acquired; otherwise executes the command and releases the semaphore exactly once.
 */
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {

    executionHook.onStart(_cmd);

    // Circuit breaker does not allow the request: degrade immediately.
    if (!circuitBreaker.allowRequest()) {
        return handleShortCircuitViaFallback();
    }

    final TryableSemaphore semaphore = getExecutionSemaphore();
    final AtomicBoolean released = new AtomicBoolean(false);

    // Releases the semaphore; the CAS makes sure this happens at most once.
    final Action0 releaseOnce = new Action0() {
        @Override
        public void call() {
            if (released.compareAndSet(false, true)) {
                semaphore.release();
            }
        }
    };

    // Records that the execution threw an exception.
    final Action1<Throwable> recordException = new Action1<Throwable>() {
        @Override
        public void call(Throwable t) {
            eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
        }
    };

    // Semaphore could not be acquired: degrade through the rejection fallback.
    if (!semaphore.tryAcquire()) {
        return handleSemaphoreRejectionViaFallback();
    }

    try {
        executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
        return executeCommandAndObserve(_cmd)
                .doOnError(recordException)
                .doOnTerminate(releaseOnce)
                .doOnUnsubscribe(releaseOnce);
    } catch (RuntimeException e) {
        return Observable.error(e);
    }
}

һִʧܽ뽵߼ֱӽ뵽 HystrixCommand#getFallbackObservable

/** Excerpt of HystrixCommand showing how the fallback is exposed as an Observable. */
public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {
    /**
     * Wraps {@code getFallback()} in a deferred Observable: its value is emitted on success,
     * and anything it throws is turned into an error Observable.
     */
    @Override
    final protected Observable<R> getFallbackObservable() {
        final Func0<Observable<R>> fallbackSource = new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                try {
                    return Observable.just(getFallback());
                } catch (Throwable ex) {
                    return Observable.error(ex);
                }
            }
        };
        return Observable.defer(fallbackSource);
    }
}

getFallbackջصԶfallback

صexecuteCommandAndObserveҪ

  • 岻ͬĻصdoOnNextdoOnCompletedonErrorResumeNextdoOnEach
  • executeCommandWithSpecifiedIsolation

ִʱԿ Observable.liftʵִʱܡ

/**
 * Executes the command with the configured isolation (optionally wrapped in the timeout
 * operator) and attaches the callbacks for emitted values, completion, errors and
 * request-context propagation.
 */
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
    // Action callbacks return nothing, Func callbacks return a value.
    // doOnNext callback: runs for every emitted value.
    final Action1<R> markEmits = new Action1<R>() {
        @Override
        public void call(R r) {
            if (shouldOutputOnNextEvents()) {
                executionResult = executionResult.addEvent(HystrixEventType.EMIT);
                eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
            }
            if (commandIsScalar()) {
                long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                circuitBreaker.markSuccess();
            }
        }
    };
    // doOnCompleted callback: runs once the execution has completed.
    final Action0 markOnCompleted = new Action0() {
        @Override
        public void call() {
            if (!commandIsScalar()) {
                long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                circuitBreaker.markSuccess();
            }
        }
    };
    // onErrorResumeNext callback: chooses the fallback path after a failed execution.
    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
        @Override
        public Observable<R> call(Throwable t) {
            Exception e = getExceptionFromThrowable(t);
            executionResult = executionResult.setExecutionException(e);
            if (e instanceof RejectedExecutionException) {
                // Thread-pool rejection.
                return handleThreadPoolRejectionViaFallback(e);
            } else if (t instanceof HystrixTimeoutException) {
                // Timeout.
                return handleTimeoutViaFallback();
            } else if (t instanceof HystrixBadRequestException) {
                // HystrixBadRequestException: emitted as an error.
                return handleBadRequestByEmittingError(e);
            } else {
                if (e instanceof HystrixBadRequestException) {
                    eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                    return Observable.error(e);
                }
                // Any other failure: degrade through the failure fallback.
                return handleFailureViaFallback(e);
            }
        }
    };
    // doOnEach callback: runs for every notification of the Observable.
    final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
        @Override
        public void call(Notification<? super R> rNotification) {
            setRequestContextIfNeeded(currentRequestContext);
        }
    };
    // Build the execution Observable (isolation is applied inside).
    Observable<R> execution;
    // Is the execution timeout feature enabled?
    if (properties.executionTimeoutEnabled().get()) {
        // Add the timeout operator on top of the isolated execution.
        execution = executeCommandWithSpecifiedIsolation(_cmd)
            .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }
    // Attach the callbacks.
    return execution.doOnNext(markEmits)
        .doOnCompleted(markOnCompleted)
        .onErrorResumeNext(handleFallback)
        .doOnEach(setRequestContext);
}

3.3 executeCommandWithSpecifiedIsolation ǸݵǰͬԴִвͬ߼THREADSEMAPHORE

/**
 * Builds the Observable that runs the user code under the configured isolation strategy:
 * THREAD (subscribed on the thread pool's scheduler) or, otherwise, on the calling path.
 */
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    // THREAD isolation strategy.
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
        // Deferred Observable, subscribed on the thread pool's scheduler (see subscribeOn below).
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                executionResult = executionResult.setExecutionOccurred();
                if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                    return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                }

                metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);

                // The command timed out before run() started: return an error without touching the counters below.
                if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                    // the command timed out in the wrapping thread so we will return immediately
                    // and not increment any of the counters below or other such logic
                    return Observable.error(new RuntimeException("timed out before executing run()"));
                }

                // Mark the thread as started.
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                    //we have not been unsubscribed, so should proceed
                    HystrixCounters.incrementGlobalConcurrentThreads();
                    threadPool.markThreadExecution();
                    // store the command that is being run
                    endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                    executionResult = executionResult.setExecutedInThread();

                    try {
                        executionHook.onThreadStart(_cmd);
                        executionHook.onRunStart(_cmd);
                        executionHook.onExecutionStart(_cmd);
                        // Observable that wraps the user's run() logic.
                        return getUserExecutionObservable(_cmd);
                    } catch (Throwable ex) {
                        return Observable.error(ex);
                    }
                } else {
                    //command has already been unsubscribed, so return immediately
                    return Observable.error(new RuntimeException("unsubscribed before executing run()"));
                }
            }
        }).doOnTerminate(new Action0() {
            @Override
            public void call() {
                if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
                    handleThreadEnd(_cmd);
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
                    // thread was never started: only the state transition happens
                }
            }
        }).doOnUnsubscribe(new Action0() {
            @Override
            public void call() {
                if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
                    handleThreadEnd(_cmd);
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
                    // thread was never started: only the state transition happens
                }
            }
        }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
            @Override
            public Boolean call() {
                return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
            }
        }));
    } else {
        // Any other strategy (metrics are marked as SEMAPHORE): no subscribeOn is applied.
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                executionResult = executionResult.setExecutionOccurred();
                if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                    return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                }
                metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
                endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                try {
                    executionHook.onRunStart(_cmd);
                    executionHook.onExecutionStart(_cmd);
                    // Execute the user code.
                    return getUserExecutionObservable(_cmd); 
                } catch (Throwable ex) {
                    //If the above hooks throw, then use that as the result of the run method
                    return Observable.error(ex);
                }
            }
        });
    }
}
  • жǷǻڶ·ʵ֣·򿪣жӦصʧܻ򽵼

  • · رգȻȡźţȡʧӦص

  • ȡɹɷ executeCommandAndObserve Ӧ Observable ʵ ̸߳롢 Ȳͬʱע˶Ӧ ڻص 3.4 getUserExecutionObservable Ȼִ HystrixCommand#getExecutionObservable

    abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> { private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) { Observable<R> userObservable; try { userObservable = getExecutionObservable(); } catch (Throwable ex) { userObservable = Observable.error(ex); } return userObservable .lift(new ExecutionHookApplication(_cmd)) .lift(new DeprecatedOnRunHookApplication(_cmd)); } } public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> { @Override final protected Observable<R> getExecutionObservable() { return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { try { return Observable.just(run()); } catch (Throwable ex) { return Observable.error(ex); } } }).doOnSubscribe(new Action0() { @Override public void call() { // Save thread on which we get subscribed so that we can interrupt it later if needed executionThread.set(Thread.currentThread()); } }); } }

run() Ѿˣҵִз

/** Command whose {@code run()} executes the command action of the annotated method. */
@ThreadSafe
public class GenericCommand extends AbstractHystrixCommand<Object> {
    /** Logs the command key, then runs the command action through {@code process}. */
    @Override
    protected Object run() throws Exception {
        LOGGER.debug("execute command: {}", getCommandKey().name());
        final Action commandInvocation = new Action() {
            @Override
            Object execute() {
                return getCommandAction().execute(getExecutionType());
            }
        };
        return process(commandInvocation);
    }
}

յõԼҵ߼

ο

https://lijunyi.xyz/docs/SpringCloud/SpringCloud.html#_2-2-x-%E5%88%86%E6%94%AF https://mp.weixin.qq.com/s/2jeovmj77O9Ux96v3A0NtA https://juejin.cn/post/6931922457741770760 https://github.com/D2C-Cai/herring http://c.biancheng.net/springcloud https://github.com/macrozheng/springcloud-learning