docs/Spring全家桶/SpringCloudAlibaba源码分析/SpringCloudSentinel源码分析.md
| һߣJava Դ |ͷ
ѧϰĿ
ͼΪ˼չʾͼ Slot ˳Ѻ° Sentinel Slot Chain ˳һ һⲿ֮ᴴһEntryEntryͬʱҲᴴһϵеslot һÿslotвͬĹְ
Spring Cloud Sentinelԭ
Spring Cloud мSentinelǻʵ֣ʵ·¡
SentinelWebAutoConfiguration>addInterceptors>SentinelWebInterceptor->AbstractSentinelInterceptor
public boolean preHandle(HttpServletRequest request, HttpServletResponseresponse, Object handler) throws Exception { try { String resourceName = this.getResourceName(request); if (StringUtil.isEmpty(resourceName)) { return true; } else if (this.increaseReferece(request,this.baseWebMvcConfig.getRequestRefName(), 1) != 1) { return true; } else { String origin = this.parseOrigin(request); String contextName = this.getContextName(request); ContextUtil.enter(contextName, origin); Entry entry = SphU.entry(resourceName, 1, EntryType.IN); request.setAttribute(this.baseWebMvcConfig.getRequestAttributeName(), entry); return true; } } catch (BlockException var12) { BlockException e = var12; try { this.handleBlockException(request, response, e); } finally { ContextUtil.exit(); } return false; }}
资源调用的流量类型，是入口流量（ EntryType.IN ）还是出口流量（ EntryType.OUT ），注意系统规则只对 IN 生效。
2 SphU.entry ǼdubboҲãǼɵspring cloudҲãնǵSphU.entryжϵģǴSphU.entryȥ˽ʵԭ
ǿΨһɻģҲؼһ SphU.entry(resource) ǴȥһԴԴǷǽӿڣôʲôأһҿɴ
public static Entry entry(String name) throws BlockException { return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);}public class Env { public static final Sph sph = new CtSph(); ......//ʡԲִ}
从 SphU.entry() 方法往下执行会进入到 Sph.entry() ，Sph 的默认实现类是 CtSph，而最终会进入 CtSph 的 entry 方法：
@Overridepublic Entry entry(String name, EntryType type, int count, Object... args) throws BlockException { //װһԴ StringResourceWrapper resource = new StringResourceWrapper(name, type); return entry(resource, count, args);}
ҪͨǸԴȥװһ StringResourceWrapper ȻԼط̶ entryWithPriority(resourceWrapper, count, false, args)
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count,boolean prioritized, Object... args) throws BlockException { //ȡĻ洢ThreadLocalУcontextл洢 Context context = ContextUtil.getContext(); // NullContextô˵ context name 2000 μ ContextUtil#trueEnter //ʱSentinel ٽܴµ context ãҲDzЩµĽӿڵͳơ۶ϵ if (context instanceof NullContext) { // The {@link NullContext} indicates that the amount of context has exceeded the threshold, // so here init the entry only. No rule checking will be done. return new CtEntry(resourceWrapper, null, context); } if (context == null) {//ʹĬcontext // ContextIJ context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME); } // Global switch is close, no rule checking will do. if (!Constants.ON) {//ȫǷѾرˣͲ return new CtEntry(resourceWrapper, null, context); } //ģʽеģʽ //һslot ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); // lookProcessChain ֪ resource Constants.MAX_SLOT_CHAIN_SIZE // Ҳ 6000 ʱSentinel ʼµôҪΪ Sentinel ܿ if (chain == null) { return new CtEntry(resourceWrapper, null, context); } //ʼɸentry Entry e = new CtEntry(resourceWrapper, chain, context); try { //ʼ chain.entry(context, resourceWrapper, null, count, prioritized, args); } catch (BlockException e1) { e.exit(count, args); //׳쳣 throw e1; } catch (Throwable e1) { // This should not happen, unless there are errors existing in Sentinel internal. RecordLog.info("Sentinel unexpected exception", e1); } return e;//Ľ}
Ĵǿ֪÷Ҫǻȡ˱ԴӦԴ lookProcessChain з֣ȥȡһȥִԴϴȻﴦĻ£ô϶ǶڵǰصĴԷΪ¼֣
protected static Context trueEnter(String name, String origin) { //ThreadLocalлȡһο϶null Context context = contextHolder.get(); if (context == null) { //ǸContextֻȡNode Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap; DefaultNode node = localCacheNameMap.get(name); if (node == null) { if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) { setNullContext(); return NULL_CONTEXT; } else { LOCK.lock(); try { node = contextNameNodeMap.get(name); if (node == null) { if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) { setNullContext(); return NULL_CONTEXT; } else { //EntranceNode node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null); //ȫֵĽڵ // Add entrance node. Constants.ROOT.addChild(node);//map Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1); newMap.putAll(contextNameNodeMap); newMap.put(name, node); contextNameNodeMap = newMap; } } } finally { LOCK.unlock(); } } } context = new Context(node, name); context.setOrigin(origin); //ThreadLocal contextHolder.set(context); } return context;}
DZȽϼ
ĿǰContext״̬ͼ2.2 slot һslot·Ϊ
DefaultProcessorSlotChain -> NodeSelectorSlot -> ClusterBuilderSlot -> LogSlot ->StatisticSlot -> AuthoritySlot -> SystemSlot -> ParamFlowSlot -> FlowSlot -> DegradeSlot
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) { //ԿchainԴΪkeyͬԴ϶Dzͬchain ProcessorSlotChain chain = chainMap.get(resourceWrapper); if (chain == null) {////spring(bean) dubbo(˫ؼ)һޣû synchronized (LOCK) { chain = chainMap.get(resourceWrapper); if (chain == null) { //chainMapСһֵҲentryСˣһchainӦһentry if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) { return null; } //һslot chain chain = SlotChainProvider.newSlotChain(); //ǣ½һMapСoldMap+1 Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>( chainMap.size() + 1); //ȻoldMapٷ½chain newMap.putAll(chainMap); newMap.put(resourceWrapper, chain); //ӵnewMap ӦǿDZƵ chainMap = newMap; } } } return chain;}
ĴԷ֣ȴӻлȡôһν϶ûеģ SlotChainProvider ȥ촦ɺ뻺Ա´ʹã
public static ProcessorSlotChain newSlotChain() { if (slotChainBuilder != null) { return slotChainBuilder.build(); } // ͨspiȥԼslotĻֻҪSPIʵSlotChainBuilderӿھͺ //SentinelĬϵsentinel-coreµMETA-INF.services slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class); if (slotChainBuilder == null) { // Should not go through here. RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default"); slotChainBuilder = new DefaultSlotChainBuilder(); } else { RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: " + slotChainBuilder.getClass().getCanonicalName()); } return slotChainBuilder.build();}
˶εУ飬ȷbuilder ΪգȻͨȥ
/**
 * Default {@code SlotChainBuilder}: creates a {@code DefaultProcessorSlotChain} and appends the
 * built-in slots to it in a fixed order.
 */
public class DefaultSlotChainBuilder implements SlotChainBuilder {

    /** Returns a new chain with the eight built-in slots appended via {@code addLast}. */
    @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain slotChain = new DefaultProcessorSlotChain();

        slotChain.addLast(new NodeSelectorSlot());
        slotChain.addLast(new ClusterBuilderSlot());
        slotChain.addLast(new LogSlot());
        slotChain.addLast(new StatisticSlot());
        slotChain.addLast(new SystemSlot());
        slotChain.addLast(new AuthoritySlot());
        slotChain.addLast(new FlowSlot());
        slotChain.addLast(new DegradeSlot());

        return slotChain;
    }
}
ڷҲж˵ϾSentinel㷨ʵָأǿһ¹Ľܣ
Sentinel 棬еԴӦһԴƣresourceNameÿԴöᴴһ Entry Entry ͨܵԶҲͨעķʽ SphU API ʽEntry ʱͬʱҲᴴһϵйۣܲslot chainЩвְͬ𡣾ְѾᵽˡ
ִ¡
NodeSelectorSlot：主要用于构建调用链。
ClusterBuilderSlot：用于集群限流、熔断。
LogSlot：用于记录日志。
StatisticSlot：用于实时收集实时消息。
AuthoritySlot：用于权限校验的。
SystemSlot：用于验证系统级别的规则。
FlowSlot：实现限流机制。
DegradeSlot：实现熔断机制。

2.3 Entry
Entry e = new CtEntry(resourceWrapper, chain, context);
/**
 * Creates an entry for the resource, remembers its slot chain and context, and links it into
 * the context's entry chain.
 */
CtEntry(ResourceWrapper resourceWrapper, ProcessorSlot<Object> chain, Context context) {
    super(resourceWrapper);
    this.chain = chain;
    this.context = context;

    setUpEntryFor(context);
}

/**
 * Makes this entry the context's current entry. The previous current entry, if any, becomes
 * this entry's parent and gets this entry as its child.
 */
private void setUpEntryFor(Context context) {
    // The entry should not be associated to NullContext.
    if (context instanceof NullContext) {
        return;
    }
    this.parent = context.getCurEntry();
    if (parent != null) {
        ((CtEntry) parent).child = this;
    }
    context.setCurEntry(this);
}
һEntryɵʱcontext.getCurEntryضNULLôֱִContext.setCurEntry
ȻContext״̬ͼ ִһµSphu.entryٴ½һEntryʱcurEntrynullôִ((CtEntry)parent).child = this;
ͼԿԭCtEntryƳContext½CtEntry;CtEntryͨڲparentchild
2.4 NodeSelectorSlot ҪڹҪһ£ںлȽϹؼ¡
@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable { //и棬contextֻnode DefaultNode node = map.get(context.getName()); //˫ؼ⣬̰߳ȫ if (node == null) { synchronized (this) { node = map.get(context.getName()); if (node == null) { //ɵDefaultNodeڵ node = new DefaultNode(resourceWrapper, null); //ЩǷmapΪmapȽϴ룬ܻһЩ HashMap<String, DefaultNode> cacheMap = new HashMap<String,DefaultNode>(map.size()); cacheMap.putAll(map); cacheMap.put(context.getName(), node); map = cacheMap; // ؼ⣬ĵĵط ((DefaultNode) context.getLastNode()).addChild(node); } } } //滻contextеcurEntryеcurNode context.setCurNode(node); fireEntry(context, resourceWrapper, node, count, prioritized, args);}
ѯǷnodeҲܼ
1contextʾģһ̶߳ӦһcontextаһЩ
StatisticSlot Sentinel ĺĹ֮ܲһͳʵʱĵݡ
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { try { // Ƚɺ&processorSlotȻݴͳ // Sentinelľʹ for ѭ ProcessorSlot ԭ fireEntry(context, resourceWrapper, node, count, prioritized, args); //ִеʾͨ飬 // Request passed, add thread count and pass count. node.increaseThreadNum(); //ǰڵ߳1 node.addPassRequest(count); //Բͬ͵node¼߳ͨͳơ if (context.getCurEntry().getOriginNode() != null) { // Add count for origin node. context.getCurEntry().getOriginNode().increaseThreadNum(); context.getCurEntry().getOriginNode().addPassRequest(count); } if (resourceWrapper.getEntryType() == EntryType.IN) { // Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseThreadNum(); Constants.ENTRY_NODE.addPassRequest(count); } //ɵ StatisticSlotCallbackRegistry#addEntryCallback ̬עProcessorSlotEntryCallback for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onPass(context, resourceWrapper, node, count, args); } //ȼȴ쳣FlowRuleл漰 } catch (PriorityWaitException ex) {//߳ͳ node.increaseThreadNum(); if (context.getCurEntry().getOriginNode() != null) { // Add count for origin node. context.getCurEntry().getOriginNode().increaseThreadNum(); } if (resourceWrapper.getEntryType() == EntryType.IN) { // Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseThreadNum(); } // Handle pass event with registered entry callback handlers. for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onPass(context, resourceWrapper, node, count, args); } } catch (BlockException e) { // Blocked, set block exception to current entry. context.getCurEntry().setBlockError(e); //쳣ǰentry // Add block count. 
node.increaseBlockQps(count); //ӱ //ݲͬNodeĴ if (context.getCurEntry().getOriginNode() != null) { context.getCurEntry().getOriginNode().increaseBlockQps(count); } if (resourceWrapper.getEntryType() == EntryType.IN) { // Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseBlockQps(count); } // Handle block event with registered entry callback handlers. for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onBlocked(e, context, resourceWrapper, node, count, args); } throw e; } catch (Throwable e) { // Unexpected internal error, set error to current entry. context.getCurEntry().setError(e); throw e; }}
ֳ֣һentry÷ȻᴥslotentrySystemSlotFlowSlotDegradeSlotȵĹͨͻ׳BlockExceptionnodeͳƱblock֮nodeͳͨ߳ϢڶexitУ˳Entryʱͳrtʱ䣬߳
ǿԿ node.addPassRequest() δfireEntryִִ֮еģζţǰͨsentinelصȹʱҪ¼Ҳִ node.addPassRequest()д룬Ǹȥ
2.5.1 addPassRequest @Overridepublic void addPassRequest(int count) { // øࣨStatisticNodeͳ super.addPassRequest(count); // clusterNode ͳƣҲǵøStatisticNode this.clusterNode.addPassRequest(count);} ֪nodeһ DefaultNode ʵڵһNodeSelectorSlot entryжԴ˷װװһDefaultNode
ڲʵʵõArrayMetricͳ
//ͳƣֳڣÿ500msͳQPSprivate transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);//շͳƣ60ڣÿ1000msprivate transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);public void addPassRequest(int count) { rollingCounterInSecond.addPass(count); rollingCounterInMinute.addPass(count);}
õǻڵķʽ¼ĴĹϵͼʵDZȽģArrayMetricʵһװ࣬ڲͨLeapArrayʵ־ͳLeapArrayά˶WindowWrapڣWindowWrapвMetricBucketָݵͳơ
ӴǿԿָ addPass ͨһ ArrayMetric ࣬ڽ ArrayMetric пһ¡Ĵʾ
private final LeapArray<MetricBucket> data;// SAMPLE_COUNT=2 INTERVAL=1000public ArrayMetric(int sampleCount, int intervalInMs) { //ʾڵĴС2ÿһڵʱ䵥λ500ms this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);}public void addPass(int count) { WindowWrap<MetricBucket> wrap = data.currentWindow(); wrap.value().addPass(count);}
ڳ뻬ô windowˣwindowǴͨ data ȡǰڡĴڴСΪ sampleCount=2.ǿԿͨ MetricBucket ָ꣬άһͳLongAdder[] counters 棬 WindowWrapǿԿÿһ WindowWrapɣ
public class WindowWrap<T> {// ʱ䴰ڵij private final long windowLengthInMs;// ʱ䴰ڵĿʼʱ䣬λǺ private long windowStart; //ʱ䴰ڵݣ WindowWrap ÷ͱʾֵģʵϾ MetricBucket private T value; //......ʡԲִ}
再来看 LeapArray 这个类：
public abstract class LeapArray<T> { // ʱ䴰ڵij protected int windowLength; // ڵĸ protected int sampleCount; // ԺΪλʱ protected int intervalInMs; // ʱ䴰 protected AtomicReferenceArray<WindowWrap<T>> array; /** * LeapArray * @param windowLength ʱ䴰ڵijȣλ * @param intervalInSec ͳƵļλ */ public LeapArray(int windowLength, int intervalInSec) { this.windowLength = windowLength; // ʱ䴰ڵIJĬΪ2 this.sampleCount = intervalInSec * 1000 / windowLength; this.intervalInMs = intervalInSec * 1000;//Ϊλʱ䴰Уʼȵ飺`AtomicReferenceArray<WindowWrap<T>>array`ʾڵĴСУÿڻռ500msʱ䡣 this.array = new AtomicReferenceArray<WindowWrap<T>>(sampleCount); }}
ԺĿ LeapArray дһ AtomicReferenceArray 飬ʱ䴰еͳֵвͨͳֵټƽֵҪյʵʱֵָˡԿĴͨעͣĬϲʱ䴰ڵĸ2ֵôõأǻһ LeapArray ͨ StatisticNode Уnewһ ArrayMetricȻһ·ϴݺģ
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,IntervalProperty.INTERVAL);
2.5.3 currentWindow Ǹȡǰڵķ data.currentWindow() У
@Overridepublic WindowWrap<Window> currentWindow(long time) { .....//ʡԲִ //㵱ǰʱڻе㷽ʽȽϼǰʱԵʱ䴰ڵʱ䳤ȣٴʱ䴰ڳȽȡģ int idx = calculateTimeIdx(timeMillis); //㵱ǰʱʱ䴰еĿʼʱ long windowStart = calculateWindowStart(timeMillis); // timeÿһwindowLengthijȣtimeIdͻ1ʱ䴰ھͻǰһ while (true) { // Ӳиȡʱ䴰 WindowWrap<Window> old = array.get(idx); // array鳤Ȳ˹oldܶ¶вˣͻᴴܶWindowWrap //Ϊգ˵˴δʼ if (old == null) { // ûлȡһµ WindowWrap<Window> window = new WindowWrap<Window>(windowLength, currentWindowStart, new Window()); // ͨCAS´õȥ if (array.compareAndSet(idx, null, window)) { // óɹôڷ return window; } else { // ǰ߳óʱƬȴ Thread.yield(); } // ǰڵĿʼʱoldĿʼʱȣֱӷold } else if (currentWindowStart == old.windowStart()) { return old; // ǰʱ䴰ڵĿʼʱѾoldڵĿʼʱ䣬old // timeΪµʱ䴰ڵĿʼʱ䣬ʱǰ } else if (currentWindowStart > old.windowStart()) { if (addLock.tryLock()) { try { // if (old is deprecated) then [LOCK] resetTo currentTime. return resetWindowTo(old, currentWindowStart); } finally { addLock.unlock(); } } else { Thread.yield(); } // ܴ } else if (currentWindowStart < old.windowStart()) { // Cannot go through here. return new WindowWrap<Window>(windowLength, currentWindowStart, new Window()); } }}
ֽܳ⣬ʵʿֳ¼
private int calculateTimeIdx(/*@Valid*/ long timeMillis) { // timeÿһwindowLengthijȣtimeIdͻ1ʱ䴰ھͻǰһ long timeId = timeMillis / windowLengthInMs; // idxֳ[0,arrayLength-1]еijһΪarrayе return (int)(timeId % array.length());}protected long calculateWindowStart(/*@Valid*/ long timeMillis) { return timeMillis - timeMillis % windowLengthInMs;}
ݵǰʱ windowLength õһ timeId(500msֵһµ),timeIdȡڵijȽһȡģôһ 01λõһȻݵǰʱǰڵӦöӦĿʼʱtimeڸոտʼʱ array ǿյģôȡoldӦnullôᴴһµʵͼһ³ʼ LeapArray
Ӧ currentWindow 4.1 (idx=0)ȡnull,ôʼʱarraysֻһ(ǵһ(idx=0)Ҳǵڶ(idx=1))ÿʱ䴰ڵij500msζֻҪǰʱʱ䴰ڵIJֵ500ms֮ڣʱ䴰ھͲǰ磬統ǰʱߵ300500ʱǰʱ䴰ȻͬǸ
Ӧ currentWindow 4.2 裺ʱǰߣ500msʱʱ䴰ھͻǰһʱͻµǰڵĿʼʱ,ʱǰߣֻҪ1000msǰڲᷢ仯дʵ resetWindowTo
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long time) { // Update the start time and reset value. // windowStart w.resetTo(time); MetricBucket borrowBucket = borrowArray.getWindowValue(time); if (borrowBucket != null) { w.value().reset(); w.value().addPass((int)borrowBucket.pass()); } else { w.value().reset(); } return w;}
Ӧ currentWindow 4.3 ʱǰߣǰʱ䳬1000msʱͻٴνһʱ䴰ڣʱarraysеĴڽһʧЧһµĴڽ滻Դʱţʱ䴰Ҳڷ仯ڵǰʱнᱻͳƵǰʱӦʱ䴰Уصaddpass У
/** Adds {@code count} passes to the value held by the current window of {@code data}. */
public void addPass(int count) {
    data.currentWindow().value().addPass(count);
}
ȡԺ뵽 wrap.value().addPass(count); QPSӡ wrap.value() õ֮ǰᵽ MetricBucket Sentinel QPSݵͳƽά LongAdder[] УָʵúõĹƥ䣬鿴ǷҲ StatisticSlotentry е fireEntry(context, resourceWrapper, node, count, prioritized, args); ҪȽ뵽 FlowSlotentryˣ
/**
 * Flow-control slot entry: runs {@code checkFlow} for the resource first, then passes the call
 * on with {@code fireEntry}.
 *
 * @throws Throwable whatever {@code checkFlow} or the following slots throw
 */
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    checkFlow(resourceWrapper, context, node, count, prioritized);

    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
ԿиҪķ checkFlow ȥ
/**
 * Checks the request against every flow rule the provider returns for the resource name.
 *
 * <p>Does nothing when the provider or the resource is {@code null}, or when the provider
 * returns {@code null} for the resource.
 *
 * @throws FlowException for the first rule whose {@code canPassCheck} fails
 */
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                      Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
    if (ruleProvider == null || resource == null) {
        return;
    }
    Collection<FlowRule> candidates = ruleProvider.apply(resource.getName());
    if (candidates == null) {
        return;
    }
    for (FlowRule candidate : candidates) {
        boolean admitted = canPassCheck(candidate, context, node, count, prioritized);
        if (!admitted) {
            throw new FlowException(candidate.getLimitApp(), candidate);
        }
    }
}
һжӦˣõõ FlowRule ѭƥԴˡSentinel ԭ
2.6 FlowRuleSlot slot ҪԤԴͳϢչ̶ĴЧһԴӦ߶ع´μ飬ֱȫͨһЧΪֹ:
/**
 * Flow-control slot entry: runs {@code checkFlow} for the resource first, then passes the call
 * on with {@code fireEntry}.
 *
 * @throws Throwable whatever {@code checkFlow} or the following slots throw
 */
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    checkFlow(resourceWrapper, context, node, count, prioritized);

    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
2.6.1 checkFlow

进入到 FlowRuleChecker.checkFlow 方法中。
/**
 * Checks the request against every flow rule the provider returns for the resource name.
 *
 * <p>Does nothing when the provider or the resource is {@code null}, or when the provider
 * returns {@code null} for the resource.
 *
 * @throws FlowException for the first rule whose {@code canPassCheck} fails
 */
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                      Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
    if (ruleProvider == null || resource == null) {
        return;
    }
    Collection<FlowRule> candidates = ruleProvider.apply(resource.getName());
    if (candidates == null) {
        return;
    }
    for (FlowRule candidate : candidates) {
        boolean admitted = canPassCheck(candidate, context, node, count, prioritized);
        if (!admitted) {
            throw new FlowException(candidate.getLimitApp(), candidate);
        }
    }
}
2.6.2 canPassCheck

判断是否是集群限流模式，如果是，则走 passClusterCheck，否则，调用 passLocalCheck 方法。
/**
 * Decides whether a request passes a single flow rule.
 *
 * <p>A rule whose limit app is {@code null} always passes. Otherwise the check is delegated
 * to {@code passClusterCheck} for cluster-mode rules and to {@code passLocalCheck} for all
 * others.
 */
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
                            boolean prioritized) {
    if (rule.getLimitApp() == null) {
        return true;
    }
    return rule.isClusterMode()
        ? passClusterCheck(rule, context, node, acquireCount, prioritized)
        : passLocalCheck(rule, context, node, acquireCount, prioritized);
}
2.6.3 passLocalCheck
/**
 * Local (non-cluster) check: selects the node to measure with
 * {@code selectNodeByRequesterAndStrategy} and asks the rule's rater whether it can pass.
 *
 * @return {@code true} when no node is selected, otherwise the rater's verdict
 */
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                      boolean prioritized) {
    Node target = selectNodeByRequesterAndStrategy(rule, context, node);
    return target == null || rule.getRater().canPass(target, acquireCount, prioritized);
}
2.6.4 DefaultController.canPass

通过默认的限流行为（直接拒绝），进行限流判断。
@Overridepublic boolean canPass(Node node, int acquireCount, boolean prioritized) { //ȸnodeȡԴǰʹqps߲صֵ int curCount = avgUsedTokens(node); //ǰʹõϱǷֵ if (curCount + acquireCount > count) {//Ϊtrue˵Ӧñ // һȼΪqpsʧܣȥռδʱ䴰ڣȵһʱ䴰ͨ if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) { // long currentTime; long waitInMs; currentTime = TimeUtil.currentTimeMillis(); waitInMs = node.tryOccupyNext(currentTime, acquireCount, count); if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) { node.addWaitingRequest(currentTime + waitInMs, acquireCount); node.addOccupiedPass(acquireCount); sleep(waitInMs); // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}. throw new PriorityWaitException(waitInMs); } } return false; } return true;}
一旦被拒绝，则抛出 FlowException 异常。
2.7 PriorityWait DefaultController.canPassУ´ȥĴ
node.addWaitingRequest(currentTime + waitInMs, acquireCount);node.addOccupiedPass(acquireCount);
addWaitingRequest -> ArrayMetric.addWaiting->OccupiableBucketLeapArray.addWaiting
borrowArrayһFutureBucketLeapArrayﶨδʱ䴰ڣȻδʱĴȥӼ
/**
 * Adds {@code acquireCount} to the {@code MetricEvent.PASS} counter of the
 * {@code borrowArray} window that covers {@code time}.
 */
@Override
public void addWaiting(long time, int acquireCount) {
    WindowWrap<MetricBucket> window = borrowArray.currentWindow(time);
    window.value().add(MetricEvent.PASS, acquireCount);
}
գStatisticSlot.entryУ쳣
ȼȽϸߵҵǰѾﵽֵ׳쳣ʵȥռδһʱ䴰ȥм׳쳣֮뵽StatisticSlotнвȻֱͨ
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { try{ //... } catch (PriorityWaitException ex) { node.increaseThreadNum(); if (context.getCurEntry().getOriginNode() != null) { // Add count for origin node. context.getCurEntry().getOriginNode().increaseThreadNum(); } if (resourceWrapper.getEntryType() == EntryType.IN) { // Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseThreadNum(); } // Handle pass event with registered entry callback handlers. for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onPass(context, resourceWrapper, node, count, args); } }}
https://lijunyi.xyz/docs/SpringCloud/SpringCloud.html#_2-2-x-%E5%88%86%E6%94%AF https://mp.weixin.qq.com/s/2jeovmj77O9Ux96v3A0NtA https://juejin.cn/post/6931922457741770760 https://github.com/D2C-Cai/herring http://c.biancheng.net/springcloud https://github.com/macrozheng/springcloud-learning