Back to Javatutorial

SpringCloudSentinel源码分析

docs/Spring全家桶/SpringCloudAlibaba源码分析/SpringCloudSentinel源码分析.md

1.0.030.8 KB
Original Source

| һߣJava Դ |ͷ

学习目标

  • SentinelĹԭ 1 ԭ SentinelУеԴӦһԴԼһEntryÿһentryԱʾһ󡣶SentinelУԵǰڹжʵصĿƣԭͼʾ

ͼΪ˼չʾͼ Slot ˳Ѻ° Sentinel Slot Chain ˳һ һⲿ֮󣬻ᴴһEntryEntryͬʱҲᴴһϵеslot һÿslotвͬĹְ

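  • NodeSelectorSlot 负责收集资源的路径，并将这些资源的调用路径，以树状结构存储起来，用于根据调用路径来限流降级；
  • ClusterBuilderSlot 则用于存储资源的统计信息以及调用者信息，例如该资源的 RT, QPS, thread count 等等，这些信息将用作为多维度限流，降级的依据；
  • StatisticSlot 则用于记录、统计不同纬度的 runtime 指标监控信息；
  • FlowSlot 则用于根据预设的限流规则以及前面 slot 统计的状态，来进行流量控制；
  • AuthoritySlot 则根据配置的黑白名单和调用来源信息，来做黑白名单控制；
  • DegradeSlot 则通过统计信息以及预设的规则，来做熔断降级；
  • SystemSlot 则通过系统的状态，例如 load1 等，来控制总的入口流量；
  • LogSlot 则用于在出现限流、熔断、系统保护时记录日志；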
  • ... Sentinel 将 ProcessorSlot 作为 SPI 接口进行扩展（1.7.2 版本以前 SlotChainBuilder 作为 SPI），使得 Slot Chain 具备了扩展的能力。您可以自行加入自定义的 slot 并编排 slot 间的顺序，从而可以给 Sentinel 添加自定义的功能。

Spring Cloud Sentinelԭ

Spring Cloud мSentinelǻʵ֣ʵ·¡

SentinelWebAutoConfiguration>addInterceptors>SentinelWebInterceptor->AbstractSentinelInterceptor

public boolean preHandle(HttpServletRequest request, HttpServletResponseresponse, Object handler) throws Exception {  try {    String resourceName = this.getResourceName(request);    if (StringUtil.isEmpty(resourceName)) {      return true;   } else if (this.increaseReferece(request,this.baseWebMvcConfig.getRequestRefName(), 1) != 1) {      return true;   } else {      String origin = this.parseOrigin(request);      String contextName = this.getContextName(request);      ContextUtil.enter(contextName, origin);      Entry entry = SphU.entry(resourceName, 1, EntryType.IN);     request.setAttribute(this.baseWebMvcConfig.getRequestAttributeName(), entry);      return true;   } } catch (BlockException var12) {    BlockException e = var12;    try {      this.handleBlockException(request, response, e);   } finally {      ContextUtil.exit();   }    return false; }}

资源调用的流量类型，是入口流量（EntryType.IN）还是出口流量（EntryType.OUT），注意系统规则只对 IN 生效

2 SphU.entry ǼdubboҲãǼɵspring cloudҲãնǵSphU.entryжϵģǴSphU.entryȥ˽ʵԭ

ǿΨһɻģҲؼһ SphU.entry(resource) ǴȥһԴԴǷǽӿڣôʲôأһҿɴ

public static Entry entry(String name) throws BlockException {    return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);}public class Env {    public static final Sph sph = new CtSph();    ......//ʡԲִ}

SphU.entry() ִл뵽 Sph.entry() SphĬʵ CtSph,ջCtSph entry

@Overridepublic Entry entry(String name, EntryType type, int count, Object... args) throws BlockException { //װһԴ    StringResourceWrapper resource = new StringResourceWrapper(name, type);    return entry(resource, count, args);}

ҪͨǸԴȥװһ StringResourceWrapper ȻԼط̶ entryWithPriority(resourceWrapper, count, false, args)

  • ResourceWrapper ʾsentinelԴ˷װ
  • countʾռõIJĬ1
  • prioritizedȼ
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count,boolean prioritized, Object... args)    throws BlockException {    //ȡĻ洢ThreadLocalУcontextл洢    Context context = ContextUtil.getContext();    // NullContextô˵ context name  2000 μ ContextUtil#trueEnter    //ʱSentinel ٽܴµ context ãҲDzЩµĽӿڵͳơ۶ϵ    if (context instanceof NullContext) {        // The {@link NullContext} indicates that the amount of context has exceeded the threshold,        // so here init the entry only. No rule checking will be done.        return new CtEntry(resourceWrapper, null, context);    }    if (context == null) {//ʹĬcontext        // ContextIJ        context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);    }    // Global switch is close, no rule checking will do.    if (!Constants.ON) {//ȫǷѾرˣͲ        return new CtEntry(resourceWrapper, null, context);    }    //ģʽеģʽ    //һslot    ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);    // lookProcessChain ֪ resource  Constants.MAX_SLOT_CHAIN_SIZE    // Ҳ 6000 ʱSentinel ʼµôҪΪ Sentinel ܿ    if (chain == null) {        return new CtEntry(resourceWrapper, null, context);    }    //ʼɸentry    Entry e = new CtEntry(resourceWrapper, chain, context);    try {        //ʼ        chain.entry(context, resourceWrapper, null, count, prioritized, args);    } catch (BlockException e1) {        e.exit(count, args); //׳쳣        throw e1;    } catch (Throwable e1) {        // This should not happen, unless there are errors existing in Sentinel internal.        RecordLog.info("Sentinel unexpected exception", e1);    }    return e;//Ľ}

Ĵǿ֪÷Ҫǻȡ˱ԴӦԴ lookProcessChain з֣ȥȡһȥִԴϴȻﴦĻ£ô϶ǶڵǰصĴԷΪ¼֣

  • Բȫ⣬ҪֱӷһCtEntry󣬲ٽк⣬ļ̡ݰװԴȡӦSlotChain
  • ִSlotChainentrySlotChainentry׳BlockException򽫸쳣׳SlotChainentryִˣὫentry󷵻
  • ϲ㷽BlockException˵ˣִ 2.1 Context InternalContextUtil.internalEnter--->trueEnter
protected static Context trueEnter(String name, String origin) {    //ThreadLocalлȡһο϶null    Context context = contextHolder.get();    if (context == null) {        //ǸContextֻȡNode        Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;        DefaultNode node = localCacheNameMap.get(name);        if (node == null) {            if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {                setNullContext();                return NULL_CONTEXT;            } else {                LOCK.lock();                try {                    node = contextNameNodeMap.get(name);                    if (node == null) {                        if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {                            setNullContext();                            return NULL_CONTEXT;                        } else {                            //EntranceNode                            node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);                            //ȫֵĽڵ                            // Add entrance node.                            Constants.ROOT.addChild(node);//map                            Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);                            newMap.putAll(contextNameNodeMap);                            newMap.put(name, node);                            contextNameNodeMap = newMap;                        }                    }                } finally {                    LOCK.unlock();                }            }        }        context = new Context(node, name);        context.setOrigin(origin);        //ThreadLocal        contextHolder.set(context);    }    return context;}

߼DZȽϼ򵥵

  • ThreadLocalȡȡʹȻͷ
  • ȻMapиContextNameһNode
  • ûҵNodeͼķʽһEntranceNodeȻMap
  • ContextnodenameoriginٷThreadLocal Contextʹ

ĿǰContext״̬ͼ2.2 slot һslot·Ϊ

DefaultProcessorSlotChain -> NodeSelectorSlot -> ClusterBuilderSlot -> LogSlot ->StatisticSlot -> AuthoritySlot -> SystemSlot -> ParamFlowSlot -> FlowSlot -> DegradeSlot

ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {    //ԿchainԴΪkeyͬԴ϶Dzͬchain    ProcessorSlotChain chain = chainMap.get(resourceWrapper);    if (chain == null) {////spring(bean) dubbo(˫ؼ)һޣû        synchronized (LOCK) {            chain = chainMap.get(resourceWrapper);            if (chain == null) {                //chainMapСһֵҲentryСˣһchainӦһentry                if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {                    return null;                }                //һslot chain                chain = SlotChainProvider.newSlotChain();                //߼ǣ½һMapСoldMap+1                Map<ResourceWrapper, ProcessorSlotChain> newMap = new                    HashMap<ResourceWrapper, ProcessorSlotChain>(                    chainMap.size() + 1);                //ȻoldMapٷ½chain                newMap.putAll(chainMap);                newMap.put(resourceWrapper, chain); //ӵnewMap ӦǿDZƵ                chainMap = newMap;            }        }    }    return chain;}

ĴԷ֣ȴӻлȡôһν϶ûеģ SlotChainProvider ȥ촦ɺ뻺Ա´ʹã

public static ProcessorSlotChain newSlotChain() {    if (slotChainBuilder != null) {        return slotChainBuilder.build();    }    // ͨspiȥԼslotĻֻҪSPIʵSlotChainBuilderӿھͺ    //SentinelĬϵsentinel-coreµMETA-INF.services    slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);    if (slotChainBuilder == null) {        // Should not go through here.        RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");        slotChainBuilder = new DefaultSlotChainBuilder();    } else {        RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: "                       + slotChainBuilder.getClass().getCanonicalName());    }    return slotChainBuilder.build();}

˶εУ飬ȷbuilder ΪգȻͨȥ

/**
 * {@code SlotChainBuilder} that assembles a {@code DefaultProcessorSlotChain} from a fixed,
 * ordered list of slots.
 */
public class DefaultSlotChainBuilder implements SlotChainBuilder {

    /**
     * Creates a new chain and appends the slots in the order they are listed below.
     *
     * @return the assembled chain
     */
    @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain slotChain = new DefaultProcessorSlotChain();

        slotChain.addLast(new NodeSelectorSlot());
        slotChain.addLast(new ClusterBuilderSlot());
        slotChain.addLast(new LogSlot());
        slotChain.addLast(new StatisticSlot());
        slotChain.addLast(new SystemSlot());
        slotChain.addLast(new AuthoritySlot());
        slotChain.addLast(new FlowSlot());
        slotChain.addLast(new DegradeSlot());

        return slotChain;
    }
}

ڷҲж˵ϾSentinel㷨ʵָأǿһ¹Ľܣ

在 Sentinel 里面，所有的资源都对应一个资源名称（resourceName），每次资源调用都会创建一个 Entry 对象。Entry 可以通过对主流框架的适配自动创建，也可以通过注解的方式或调用 SphU API 显式创建。Entry 创建的时候，同时也会创建一系列功能插槽（slot chain），这些插槽有不同的职责。具体职责在上面已经提到了。


ִ¡

  • NodeSelectorSlotҪڹ

  • ClusterBuilderSlotڼȺ۶ϡ

  • LogSlotڼ¼־

  • StatisticSlotʵʱռʵʱϢ

  • AuthoritySlotȨУġ

  • SystemSlot֤ϵͳĹ

  • FlowSlotʵơ

  • DegradeSlotʵ۶ϻơ 2.3 Entry

    Entry e = new CtEntry(resourceWrapper, chain, context);

/**
 * Creates an entry bound to the given slot chain and context and links it into the
 * context's entry chain.
 *
 * @param resourceWrapper the resource this entry guards
 * @param chain           slot chain for the resource
 * @param context         context the entry belongs to
 */
CtEntry(ResourceWrapper resourceWrapper, ProcessorSlot<Object> chain, Context context) {
    super(resourceWrapper);
    this.chain = chain;
    this.context = context;

    setUpEntryFor(context);
}

/**
 * Makes this entry the context's current entry, linking it as the child of the previous
 * current entry (if any).
 *
 * @param context context to attach to; nothing is done for a {@code NullContext}
 */
private void setUpEntryFor(Context context) {
    // The entry should not be associated to NullContext.
    if (context instanceof NullContext) {
        return;
    }
    this.parent = context.getCurEntry();
    if (parent != null) {
        ((CtEntry) parent).child = this;
    }
    context.setCurEntry(this);
}

һEntryɵʱcontext.getCurEntryضNULLôֱִContext.setCurEntry

ȻContext״̬ͼ ִһµSphu.entryٴ½һEntryʱcurEntrynullôִ((CtEntry)parent).child = this;

ͼԿԭCtEntryƳContext½CtEntry;CtEntryͨڲparentchild

2.4 NodeSelectorSlot ҪڹҪһ£ںлȽϹؼ¡

@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, Object obj,                  int count, boolean prioritized, Object... args)    throws Throwable {    //и棬contextֻnode    DefaultNode node = map.get(context.getName());    //˫ؼ⣬̰߳ȫ    if (node == null) {        synchronized (this) {            node = map.get(context.getName());            if (node == null) {                //ɵDefaultNodeڵ                node = new DefaultNode(resourceWrapper, null);                //Щ߼Ƿmap߼ΪmapȽϴ룬ܻһЩ                HashMap<String, DefaultNode> cacheMap = new HashMap<String,DefaultNode>(map.size());                cacheMap.putAll(map);                cacheMap.put(context.getName(), node);                map = cacheMap;                // ؼ⣬޸ĵĵط                ((DefaultNode) context.getLastNode()).addChild(node);            }        }    }    //滻contextеcurEntryеcurNode    context.setCurNode(node);    fireEntry(context, resourceWrapper, node, count, prioritized, args);}

ѯǷnode߼Ҳܼ

  • ContextNameѯǷNode
  • ûоDefaultNode뻺棬Ȼ
  • ContextcurEntryеcurnodeΪnode мҪ˵

1contextʾģһ̶߳ӦһcontextаһЩ

  • name
  • entranceNode
  • curEntryǰentry
  • originԴ
  • async첽 2Node ʾһڵ㣬ڵᱣijԴĸʵʱͳݣͨijڵ㣬ͿԻöӦԴʵʱ״̬Ϣͽмֽڵ
  • StatisticNodeʵNodeӿڣװ˻ͳƺͻȡ
  • DefaultNodeĬϽڵ㣬NodeSelectorSlotдľڵ㣻ͬԴڲͬиԵ
  • ClusterNodeȺڵ㣬ͬԴڲͬ
  • EntranceNodeýڵʾһõڽڵ㣬ͨԻȡеӽڵ㣻ÿĶһڽڵ㣬ͳƵǰĵ
  • OriginNodeһStatisticNode͵Ľڵ㣬ͬԴԴ 2.5 StatisticSlot slot·УȽҪģͳԼslotһStatisticSlot

StatisticSlot 是 Sentinel 的核心功能插槽之一，用于统计实时的调用数据。

  • clusterNode：资源唯一标识的 ClusterNode 的 runtime 统计
  • origin：根据来自不同调用者的统计信息
  • defaultnode: 根据上下文条目名称和资源 ID 的 runtime 统计
  • 入口的统计
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,                  boolean prioritized, Object... args) throws Throwable {    try {        // Ƚɺ&processorSlotȻݴͳ        // Sentinelľʹ for ѭ ProcessorSlot ԭ        fireEntry(context, resourceWrapper, node, count, prioritized, args);        //ִеʾͨ飬        // Request passed, add thread count and pass count.        node.increaseThreadNum(); //ǰڵ߳1        node.addPassRequest(count);        //Բͬ͵node¼߳ͨͳơ        if (context.getCurEntry().getOriginNode() != null) {            // Add count for origin node.            context.getCurEntry().getOriginNode().increaseThreadNum();            context.getCurEntry().getOriginNode().addPassRequest(count);        }        if (resourceWrapper.getEntryType() == EntryType.IN) {            // Add count for global inbound entry node for global statistics.            Constants.ENTRY_NODE.increaseThreadNum();            Constants.ENTRY_NODE.addPassRequest(count);        }        //ɵ StatisticSlotCallbackRegistry#addEntryCallback ̬עProcessorSlotEntryCallback        for (ProcessorSlotEntryCallback<DefaultNode> handler :             StatisticSlotCallbackRegistry.getEntryCallbacks()) {            handler.onPass(context, resourceWrapper, node, count, args);        }        //ȼȴ쳣FlowRuleл漰    } catch (PriorityWaitException ex) {//߳ͳ        node.increaseThreadNum();        if (context.getCurEntry().getOriginNode() != null) {            // Add count for origin node.            context.getCurEntry().getOriginNode().increaseThreadNum();        }        if (resourceWrapper.getEntryType() == EntryType.IN) {            // Add count for global inbound entry node for global statistics.            Constants.ENTRY_NODE.increaseThreadNum();        }        // Handle pass event with registered entry callback handlers.        
for (ProcessorSlotEntryCallback<DefaultNode> handler :             StatisticSlotCallbackRegistry.getEntryCallbacks()) {            handler.onPass(context, resourceWrapper, node, count, args);        }    } catch (BlockException e) {        // Blocked, set block exception to current entry.        context.getCurEntry().setBlockError(e); //쳣ǰentry        // Add block count.        node.increaseBlockQps(count); //ӱ        //ݲͬNodeĴ        if (context.getCurEntry().getOriginNode() != null) {            context.getCurEntry().getOriginNode().increaseBlockQps(count);        }        if (resourceWrapper.getEntryType() == EntryType.IN) {            // Add count for global inbound entry node for global statistics.            Constants.ENTRY_NODE.increaseBlockQps(count);        }        // Handle block event with registered entry callback handlers.        for (ProcessorSlotEntryCallback<DefaultNode> handler :             StatisticSlotCallbackRegistry.getEntryCallbacks()) {            handler.onBlocked(e, context, resourceWrapper, node, count, args);        }        throw e;    } catch (Throwable e) {        // Unexpected internal error, set error to current entry.        context.getCurEntry().setError(e);        throw e;    }}

ֳ֣һentry÷ȻᴥslotentrySystemSlotFlowSlotDegradeSlotȵĹͨͻ׳BlockExceptionnodeͳƱblock֮nodeͳͨ߳ϢڶexitУ˳Entryʱͳrtʱ䣬߳

ǿԿ node.addPassRequest() δfireEntryִִ֮еģζţǰͨsentinelصȹ򣬴ʱҪ¼Ҳִ node.addPassRequest()д룬Ǹȥ

2.5.1 addPassRequest @Overridepublic void addPassRequest(int count) { // øࣨStatisticNodeͳ super.addPassRequest(count); // clusterNode ͳƣҲǵøStatisticNode this.clusterNode.addPassRequest(count);} ֪nodeһ DefaultNode ʵڵһNodeSelectorSlot entryжԴ˷װװһDefaultNode

  • DefaultNodeijresourceijcontextеʵʱָ꣬ÿDefaultNodeָһClusterNode
  • ClusterNodeijresourceеcontextʵʱָܺͣͬresourceṲͬһClusterNodeĸcontext ֱʱ䴰

ڲʵʵõArrayMetricͳ

//ͳƣֳڣÿ500msͳQPSprivate transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,        IntervalProperty.INTERVAL);//շͳƣ60ڣÿ1000msprivate transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);public void addPassRequest(int count) {    rollingCounterInSecond.addPass(count);    rollingCounterInMinute.addPass(count);}

õǻڵķʽ¼ĴĹϵͼʵDZȽģArrayMetricʵһװ࣬ڲͨLeapArrayʵ־ͳ߼LeapArrayά˶WindowWrapڣWindowWrapвMetricBucketָݵͳơ

  • Metric: ָռĽӿڣ廬гɹ쳣TPSӦʱ
  • ArrayMetric ںʵ
  • LeapArray
  • WindowWrap ÿһڵİװ࣬ڲݽṹMetricBucket
  • MetricBucket ʾָͰ쳣ɹӦʱ
  • MetricEvent ָͣͨ쳣ɹ 2.5.2 ArrayMetric.addPass Ŵ¿뵽ArrayMetric.addPass
  • LeapArrayиݵǰʱõӦĴ
  • MetricBucketеaddPassӵǰеͳƴ

ӴǿԿָ addPass ͨһ ArrayMetric ࣬ڽ ArrayMetric пһ¡Ĵʾ

private final LeapArray<MetricBucket> data;// SAMPLE_COUNT=2  INTERVAL=1000public ArrayMetric(int sampleCount, int intervalInMs) {  //ʾڵĴС2ÿһڵʱ䵥λ500ms    this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);}public void addPass(int count) {    WindowWrap<MetricBucket> wrap = data.currentWindow();    wrap.value().addPass(count);}

ڳ뻬ô windowˣwindowǴͨ data ȡǰڡĴڴСΪ sampleCount=2.ǿԿͨ MetricBucket ָ꣬άһͳLongAdder[] counters 棬 WindowWrapǿԿÿһ WindowWrapɣ

public class WindowWrap<T> {// ʱ䴰ڵij    private final long windowLengthInMs;// ʱ䴰ڵĿʼʱ䣬λǺ    private long windowStart; //ʱ䴰ڵݣ WindowWrap ÷ͱʾֵģʵϾ MetricBucket     private T value;    //......ʡԲִ}

ٿ LeapArray ࣺ

public abstract class LeapArray<T> {    // ʱ䴰ڵij    protected int windowLength;    // ڵĸ    protected int sampleCount;    // ԺΪλʱ    protected int intervalInMs;    // ʱ䴰    protected AtomicReferenceArray<WindowWrap<T>> array;    /**     * LeapArray     * @param windowLength ʱ䴰ڵijȣλ     * @param intervalInSec ͳƵļλ     */    public LeapArray(int windowLength, int intervalInSec) {        this.windowLength = windowLength;        // ʱ䴰ڵIJĬΪ2        this.sampleCount = intervalInSec * 1000 / windowLength;        this.intervalInMs = intervalInSec * 1000;//Ϊλʱ䴰Уʼȵ飺`AtomicReferenceArray<WindowWrap<T>>array`ʾڵĴСУÿڻռ500msʱ䡣        this.array = new AtomicReferenceArray<WindowWrap<T>>(sampleCount);    }}

ԺĿ LeapArray дһ AtomicReferenceArray 飬ʱ䴰еͳֵвͨͳֵټƽֵҪյʵʱֵָˡԿĴͨעͣĬϲʱ䴰ڵĸ2ֵôõأǻһ LeapArray 󴴽ͨ StatisticNode Уnewһ ArrayMetricȻ󽫲һ·ϴݺ󴴽ģ

private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,IntervalProperty.INTERVAL);

2.5.3 currentWindow Ǹȡǰڵķ data.currentWindow() У

@Overridepublic WindowWrap<Window> currentWindow(long time) {    .....//ʡԲִ    //㵱ǰʱڻе㷽ʽȽϼ򵥣ǰʱԵʱ䴰ڵʱ䳤ȣٴʱ䴰ڳȽȡģ int idx = calculateTimeIdx(timeMillis);    //㵱ǰʱʱ䴰еĿʼʱ    long windowStart = calculateWindowStart(timeMillis);    // timeÿһwindowLengthijȣtimeIdͻ1ʱ䴰ھͻǰһ        while (true) {        // Ӳиȡʱ䴰        WindowWrap<Window> old = array.get(idx);        // array鳤Ȳ˹󣬷oldܶ¶вˣͻᴴܶWindowWrap        //Ϊգ˵˴δʼ        if (old == null) {            // ûлȡ򴴽һµ            WindowWrap<Window> window = new WindowWrap<Window>(windowLength, currentWindowStart, new Window());            // ͨCAS´õȥ            if (array.compareAndSet(idx, null, window)) {                // óɹ򽫸ôڷ                return window;            } else {                // ǰ߳óʱƬȴ                Thread.yield();            }        // ǰڵĿʼʱoldĿʼʱȣֱӷold        } else if (currentWindowStart == old.windowStart()) {            return old;        // ǰʱ䴰ڵĿʼʱѾoldڵĿʼʱ䣬old        // timeΪµʱ䴰ڵĿʼʱ䣬ʱǰ        } else if (currentWindowStart > old.windowStart()) {            if (addLock.tryLock()) {                try {                    // if (old is deprecated) then [LOCK] resetTo currentTime.                    return resetWindowTo(old, currentWindowStart);                } finally {                    addLock.unlock();                }            } else {                Thread.yield();            }        // ܴ        } else if (currentWindowStart < old.windowStart()) {            // Cannot go through here.            return new WindowWrap<Window>(windowLength, currentWindowStart, new Window());        }    }}

ܳ𲽽ֽ⣬ʵʿ԰ֳ¼

  1. ݵǰʱ䣬ʱtimeIdtimeIdǰڲеidx
  2. ݵǰʱǰڵӦöӦĿʼʱtimeԺΪλ
  3. idxڲȡһʱ䴰ڡ
  4. ѭжֱȡһǰʱ䴰 old
  • oldΪգ򴴽һʱ䴰ڣ뵽arrayĵidxλãarrayѾˣһ AtomicReferenceArray
  • ǰڵĿʼʱtimeoldĿʼʱȣô˵oldǵǰʱ䴰ڣֱӷold
  • ǰڵĿʼʱtimeoldĿʼʱ䣬˵oldѾʱˣoldĿʼʱΪֵtimeһεѭжϵǰڵĿʼʱtimeoldĿʼʱȵʱ򷵻ء
  • ǰڵĿʼʱtimeСoldĿʼʱ䣬ʵDzܴڵģΪtimeǵǰʱ䣬oldǹȥһʱ䡣 timeIdǻʱӣǰʱÿһwindowLengthijȣtimeIdͼ1idxֻ01֮任Ϊarrayij2ֻʱ䴰ڡΪʲôĬֻڣ˾ΪsentinelDZȽĿܡʱ䴰бźܶͳݣʱ䴰ڹĻһռùڴ棬һʱ䴰ڹζʱ䴰ڵijȻСʱ䴰ڳȱСͻᵼʱ䴰ڹƵĻһеĵһڶ
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {    // timeÿһwindowLengthijȣtimeIdͻ1ʱ䴰ھͻǰһ    long timeId = timeMillis / windowLengthInMs;     // idxֳ[0,arrayLength-1]еijһΪarrayе    return (int)(timeId % array.length());}protected long calculateWindowStart(/*@Valid*/ long timeMillis) {    return timeMillis - timeMillis % windowLengthInMs;}

ݵǰʱ windowLength õһ timeId(500msֵһµ),timeIdȡڵijȽһȡģôһ 01λõһȻݵǰʱǰڵӦöӦĿʼʱtimeڸոտʼʱ array ǿյģôȡoldӦnullôᴴһµʵͼһ³ʼ LeapArray

Ӧ currentWindow 4.1 (idx=0)ȡnull,ôʼʱarraysֻһ(ǵһ(idx=0)Ҳǵڶ(idx=1))ÿʱ䴰ڵij500msζֻҪǰʱʱ䴰ڵIJֵ500ms֮ڣʱ䴰ھͲǰ磬統ǰʱߵ300500ʱǰʱ䴰ȻͬǸ

Ӧ currentWindow 4.2 裺ʱǰߣ500msʱʱ䴰ھͻǰһʱͻµǰڵĿʼʱ,ʱǰߣֻҪ1000msǰڲᷢ仯дʵ resetWindowTo

protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long time) {    // Update the start time and reset value.    // windowStart    w.resetTo(time);    MetricBucket borrowBucket = borrowArray.getWindowValue(time);    if (borrowBucket != null) {        w.value().reset();        w.value().addPass((int)borrowBucket.pass());    } else {        w.value().reset();    }    return w;}

Ӧ currentWindow 4.3 ʱǰߣǰʱ䳬1000msʱͻٴνһʱ䴰ڣʱarraysеĴڽһʧЧһµĴڽ滻Դʱţʱ䴰Ҳڷ仯ڵǰʱн󣬻ᱻͳƵǰʱӦʱ䴰Уصaddpass У

/**
 * Adds the pass count to the bucket held by the current window of {@code data}.
 *
 * @param count number of passed requests to record
 */
public void addPass(int count) {
    data.currentWindow().value().addPass(count);
}

ȡԺ뵽 wrap.value().addPass(count); QPSӡ wrap.value() õ֮ǰᵽ MetricBucket Sentinel QPSݵͳƽά LongAdder[] УָʵúõĹƥ䣬鿴ǷҲ StatisticSlotentry е fireEntry(context, resourceWrapper, node, count, prioritized, args); ҪȽ뵽 FlowSlotentryˣ

/**
 * Runs the flow check for the resource and, when it does not throw, hands the request to
 * the next slot.
 *
 * @throws Throwable propagated from the flow check or from downstream slots
 */
public void entry(
    Context context,
    ResourceWrapper resourceWrapper,
    DefaultNode node,
    int count,
    boolean prioritized,
    Object... args
) throws Throwable {
    // The flow check comes first.
    checkFlow(resourceWrapper, context, node, count, prioritized);

    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

ԿиҪķ checkFlow ȥ

/**
 * Checks the request against every flow rule registered for the resource name.
 *
 * @param ruleProvider supplies the rules for a resource name; nothing is checked when null
 * @param resource     the resource being entered; nothing is checked when null
 * @throws BlockException a {@code FlowException} for the first rule that does not pass
 */
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                      Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
    if (ruleProvider == null || resource == null) {
        return;
    }

    Collection<FlowRule> resourceRules = ruleProvider.apply(resource.getName());
    if (resourceRules == null) {
        return;
    }

    for (FlowRule candidate : resourceRules) {
        boolean passed = canPassCheck(candidate, context, node, count, prioritized);
        if (!passed) {
            throw new FlowException(candidate.getLimitApp(), candidate);
        }
    }
}

һжӦˣõõ FlowRule ѭƥԴˡSentinel ԭ

2.6 FlowRuleSlot 这个 slot 主要根据预设的资源的统计信息，按照固定的次序，依次生效。如果一个资源对应两条或者多条流控规则，则会根据如下次序依次检验，直到全部通过或者有一个规则生效为止:

  • 指定应用生效的规则，即针对调用方限流的；
  • 调用方为 other 的规则；
  • 调用方为 default 的规则。
/**
 * Runs the flow check for the resource, then fires the next slot.
 *
 * @param context         current context
 * @param resourceWrapper the resource being entered
 * @param node            node passed to the flow check and to downstream slots
 * @param count           number of tokens requested
 * @param prioritized     whether the request is prioritized
 * @param args            extra arguments forwarded to the next slot
 * @throws Throwable propagated from the flow check or from downstream slots
 */
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    checkFlow(resourceWrapper, context, node, count, prioritized);

    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

2.6.1 checkFlow 뵽FlowRuleChecker.checkFlowС

  • Դƣҵб
  • Ϊգ򣬵canPassCheckУ顣
/**
 * Looks up the flow rules for the resource name and verifies each one in iteration order.
 *
 * @param ruleProvider supplies the rules for a resource name; the check is skipped when null
 * @param resource     the resource being entered; the check is skipped when null
 * @throws BlockException a {@code FlowException} as soon as one rule rejects the request
 */
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                      Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
    boolean nothingToCheck = ruleProvider == null || resource == null;
    if (nothingToCheck) {
        return;
    }

    Collection<FlowRule> matchedRules = ruleProvider.apply(resource.getName());
    if (matchedRules == null) {
        return;
    }

    for (FlowRule flowRule : matchedRules) {
        if (canPassCheck(flowRule, context, node, count, prioritized)) {
            continue;
        }
        throw new FlowException(flowRule.getLimitApp(), flowRule);
    }
}

2.6.2 canPassCheck жǷǼȺģʽǣpassClusterCheck򣬵passLocalCheck

/**
 * Decides whether the request passes the rule, dispatching to the cluster check or the
 * local check depending on {@code rule.isClusterMode()}.
 *
 * @return {@code true} when the rule has no limit app, otherwise the result of the chosen check
 */
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
                            boolean prioritized) {
    // A rule without a limit app never blocks.
    if (rule.getLimitApp() == null) {
        return true;
    }

    return rule.isClusterMode()
        ? passClusterCheck(rule, context, node, acquireCount, prioritized)
        : passLocalCheck(rule, context, node, acquireCount, prioritized);
}

2.6.3 passLocalCheck

  • selectNodeByRequesterAndStrategyͲNode
  • rule.getRater(), ݲͬΪcanPassУ顣
/**
 * Local (non-cluster) check: selects the node to evaluate and asks the rule's rater
 * whether the request can pass.
 *
 * @return {@code true} when no node is selected, otherwise the rater's decision
 */
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                      boolean prioritized) {
    Node target = selectNodeByRequesterAndStrategy(rule, context, node);
    // With no node to evaluate the request passes; otherwise defer to the rater.
    return target == null || rule.getRater().canPass(target, acquireCount, prioritized);
}

2.6.4 DefaultController.canPass ͨĬϵΪֱӾܾжϡ

@Overridepublic boolean canPass(Node node, int acquireCount, boolean prioritized) {    //ȸnodeȡԴǰʹqps߲صֵ    int curCount = avgUsedTokens(node);    //ǰʹõϱǷֵ    if (curCount + acquireCount > count) {//Ϊtrue˵Ӧñ        // һȼ󣬲Ϊqps򲻻ʧܣȥռδʱ䴰ڣȵһʱ䴰ͨ        if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) { //            long currentTime;            long waitInMs;            currentTime = TimeUtil.currentTimeMillis();            waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);            if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {                node.addWaitingRequest(currentTime + waitInMs, acquireCount);                node.addOccupiedPass(acquireCount);                sleep(waitInMs);                // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.                throw new PriorityWaitException(waitInMs);            }        }        return false;    }    return true;}

һܾ׳ FlowException 쳣

2.7 PriorityWait DefaultController.canPassУ´ȥĴ

node.addWaitingRequest(currentTime + waitInMs, acquireCount);node.addOccupiedPass(acquireCount);

addWaitingRequest -> ArrayMetric.addWaiting->OccupiableBucketLeapArray.addWaiting

borrowArrayһFutureBucketLeapArrayﶨδʱ䴰ڣȻδʱĴȥӼ

/**
 * Records waiting tokens as PASS events in the window of {@code borrowArray} that covers
 * the given time.
 *
 * @param time         timestamp (ms) used to pick the window
 * @param acquireCount number of tokens to add
 */
@Override
public void addWaiting(long time, int acquireCount) {
    WindowWrap<MetricBucket> window = borrowArray.currentWindow(time);
    window.value().add(MetricEvent.PASS, acquireCount);
}

գStatisticSlot.entryУ쳣

ȼȽϸߵ񣬲ҵǰѾﵽֵ׳쳣ʵȥռδһʱ䴰ȥм׳쳣֮󣬻뵽StatisticSlotнвȻֱͨ

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,                  boolean prioritized, Object... args) throws Throwable {    try{        //...    } catch (PriorityWaitException ex) {        node.increaseThreadNum();        if (context.getCurEntry().getOriginNode() != null) {            // Add count for origin node.            context.getCurEntry().getOriginNode().increaseThreadNum();        }        if (resourceWrapper.getEntryType() == EntryType.IN) {            // Add count for global inbound entry node for global statistics.            Constants.ENTRY_NODE.increaseThreadNum();        }        // Handle pass event with registered entry callback handlers.        for (ProcessorSlotEntryCallback<DefaultNode> handler :             StatisticSlotCallbackRegistry.getEntryCallbacks()) {            handler.onPass(context, resourceWrapper, node, count, args);        }    }}

ο

https://lijunyi.xyz/docs/SpringCloud/SpringCloud.html#_2-2-x-%E5%88%86%E6%94%AF https://mp.weixin.qq.com/s/2jeovmj77O9Ux96v3A0NtA https://juejin.cn/post/6931922457741770760 https://github.com/D2C-Cai/herring http://c.biancheng.net/springcloud https://github.com/macrozheng/springcloud-learning