pip/pip-483.md
Sub-PIP of PIP-468: Scalable Topic Controller
PIP-468 gives the Scalable Topic Controller the ability to split and merge segments, but only on explicit operator request via the admin API. An operator has to watch the topic, decide it is hot, and issue a split — and later notice it has gone cold and issue a merge. This is the same operational toil that partition-count management imposes on classic Pulsar topics, which scalable topics were meant to eliminate.
This PIP adds an auto-scaling policy to the controller: the controller leader observes per-segment load and per-subscription consumer pressure, and autonomously splits hot segments and merges cold ones, within hard caps that prevent runaway growth and split/merge flip-flopping.
The design is built around three principles that came out of the design discussion:
maxSegments) and bound split↔merge churn (maxDagDepth, asymmetric cooldown).

```
┌────────────────────────────────────────────────────────────────────┐
│ Segment-owning broker (per active segment)                         │
│   SegmentLoadReporter                                              │
│     - samples the segment topic's TopicStats                       │
│     - writes SegmentLoadStats to metadata ONLY on material change  │
└─────────────────────────────────┬──────────────────────────────────┘
                                  │ (metadata store, push-on-change)
                                  ▼
┌────────────────────────────────────────────────────────────────────┐
│ Controller leader (per scalable topic)                             │
│                                                                    │
│ Event-driven (within seconds):                                     │
│   on STREAM/CHECKPOINT consumer register/unregister                │
│   (consumers already register with the controller; no poll)        │
│   → evaluate the consumer-count split rule immediately             │
│                                                                    │
│ Periodic AutoScaleTick (traffic, default 60s):                     │
│   1. read SegmentLoadStats for all active segments                 │
│   2. AutoScalePolicyEvaluator.decide(...) → Split|Merge|None       │
│   3. dispatch to existing splitSegment / mergeSegments             │
└────────────────────────────────────────────────────────────────────┘
```
The two trigger sources reflect their different latency needs: a new consumer should get its own segment within seconds, so it is handled the instant the consumer registers with the controller; traffic shifts up or down over a minute or more, so they are evaluated on a slower periodic tick. The only new persistent state is SegmentLoadStats. The split/merge mechanics are entirely reused from PIP-468.
Each segment's owning broker runs a SegmentLoadReporter for every ACTIVE segment:// topic it hosts. The broker writes SegmentLoadStats directly to the metadata store — it already has the rates in memory, so there is no REST round-trip and no controller-initiated pull. On a fixed sampling interval it compares the current rates to the last ones written and writes only when a rate changes by more than a significant threshold (default ±25%) since the last write. A steady-state segment writes once and then goes silent, keeping metadata write volume bounded regardless of traffic.
SegmentLoadStats (new metadata record)

Stored at /topics/{tenant}/{ns}/{topic}/segments/{segmentId}/load:

```json
{
  "msgRateIn": 12000.0,
  "bytesRateIn": 64000000.0,
  "msgRateOut": 48000.0,
  "bytesRateOut": 256000000.0
}
```
| Field | Source on the owning broker | Meaning for auto split/merge |
|---|---|---|
| msgRateIn / bytesRateIn | segment topic TopicStats (60s rolling) | ingest load |
| msgRateOut / bytesRateOut | segment topic TopicStats | dispatch/fanout load (high for topics with many subscriptions) |
The record carries no timestamp of its own: the metadata store's Stat for the znode already exposes creation and last-modified timestamps, and the controller uses the modified timestamp for windowing. A record that still reads "cold" with an old modified time proves the segment has been cold for now − modifiedAt — so split/merge windows derive from the store's Stat with no per-tick history buffer and no extra field.
To avoid rewriting on every minor wobble, the reporter only writes when a rate has moved by more than scalableTopicLoadReportRateChangeThreshold (default 25%) relative to the last value written for that segment. Sampling cadence is scalableTopicLoadReportInterval (default 10s). Both are configurable via broker.conf.
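The write-on-change rule can be sketched as a small filter that remembers the last value written. This is a minimal illustration, not the actual SegmentLoadReporter: the `LoadSample` record and `LoadReportFilter` class are hypothetical names, and the handling of a zero baseline (any non-zero rate counts as a change) is an assumption the text does not specify.

```java
import java.util.Optional;

/** Hypothetical stand-in for the SegmentLoadStats record. */
record LoadSample(double msgRateIn, double bytesRateIn, double msgRateOut, double bytesRateOut) {}

/** Sketch of the write-on-change filter applied on every sampling interval. */
final class LoadReportFilter {
    private final double changeThreshold; // 0.25 for the default 25%
    private LoadSample lastWritten;       // null until the first write

    LoadReportFilter(double changeThreshold) {
        this.changeThreshold = changeThreshold;
    }

    /** Returns the sample to persist, or empty if no rate moved materially. */
    Optional<LoadSample> onSample(LoadSample current) {
        if (lastWritten == null || materiallyDifferent(lastWritten, current)) {
            lastWritten = current;
            return Optional.of(current);
        }
        return Optional.empty();
    }

    private boolean materiallyDifferent(LoadSample last, LoadSample now) {
        return moved(last.msgRateIn(), now.msgRateIn())
                || moved(last.bytesRateIn(), now.bytesRateIn())
                || moved(last.msgRateOut(), now.msgRateOut())
                || moved(last.bytesRateOut(), now.bytesRateOut());
    }

    /** Relative change versus the last WRITTEN value (assumption: zero baseline => any non-zero rate is a change). */
    private boolean moved(double last, double now) {
        if (last == 0.0) {
            return now != 0.0;
        }
        return Math.abs(now - last) / last > changeThreshold;
    }
}
```

Because the comparison is against the last written value rather than the previous sample, a slow drift is still reported once it accumulates past the threshold.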
Recall from PIP-468 that scalable-topic subscriptions are STREAM (controller-managed, 1:1 segment↔consumer assignment; covers both StreamConsumer and CheckpointConsumer) or QUEUE (controller-bypassing; every consumer attaches to every segment and the broker round-robins).
| Trigger | STREAM subscriptions | QUEUE subscriptions |
|---|---|---|
| Consumer-count scale-up | Yes — more segments give more 1:1 parallelism | No — queue consumers share segments; more segments don't add parallelism for them |
| Traffic (in/out, msg/bytes) | Yes | Yes — queue traffic still loads the segment's broker and counts toward the per-segment rate |
So a topic with only QUEUE subscriptions never splits on consumer count, but still splits when any segment's in/out rate crosses threshold.
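As an illustration of the table above, the consumer-count requirement can be computed by looking only at STREAM subscriptions. The `SubKind` enum, `SubscriptionInfo` record, and `ConsumerPressure` class are hypothetical names introduced for this sketch.

```java
import java.util.List;

/** Hypothetical subscription kinds, following the STREAM/QUEUE distinction above. */
enum SubKind { STREAM, QUEUE }

/** Hypothetical view of one subscription and its connected consumers. */
record SubscriptionInfo(String name, SubKind kind, int consumerCount) {}

final class ConsumerPressure {
    /**
     * Segments required by consumer count: the largest consumer count among
     * STREAM subscriptions. QUEUE subscriptions are ignored, so a topic with
     * only QUEUE subscriptions yields 0 and never splits on consumer count.
     */
    static int requiredSegments(List<SubscriptionInfo> subscriptions) {
        return subscriptions.stream()
                .filter(s -> s.kind() == SubKind.STREAM)
                .mapToInt(SubscriptionInfo::consumerCount)
                .max()
                .orElse(0);
    }
}
```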
AutoScalePolicyEvaluator

A pure function with no I/O:

```
decide(layout, loadBySegment, streamConsumerCountBySub, policy, now)
    → Split(segmentId) | Merge(segmentId1, segmentId2) | None
```
It runs in two passes — split first (short cooldown), then merge (long cooldown) — and emits at most one action per invocation.
Splits fire as soon as conditions are met, bounded by maxSegments, an in-flight-operation guard, and a short splitCooldown (default 1 min). The cooldown is deliberately short: it exists only to coalesce a burst of near-simultaneous triggers — e.g. a group of consumers connecting in rapid succession should cause one split, not N — while still letting a genuinely growing topic split again on the next minute.
```
if activeSegments >= maxSegments:          skip split pass
if a split or merge is already in flight:  skip split pass
if now - lastSplitAtMs < splitCooldown:    skip split pass

(a) Consumer-driven:
    required = max over STREAM subscriptions of consumerCount   // per-subscription max
    if required > activeSegments:
        → Split(busiest active segment by msgRateIn); set lastSplitAtMs = now

(b) Load-driven (if (a) didn't fire):
    candidate segments = active segments where ANY of:
        - msgRateIn    > splitMsgRateInThreshold
        - bytesRateIn  > splitBytesRateInThreshold
        - msgRateOut   > splitMsgRateOutThreshold
        - bytesRateOut > splitBytesRateOutThreshold
    if candidates non-empty:
        → Split(most-overloaded candidate); set lastSplitAtMs = now
```
The consumer-driven rule (a) is what the event-driven path evaluates the moment a consumer registers, so a new consumer gets a segment within seconds (subject to splitCooldown). The load-driven rule (b) runs on the periodic tick. Because msgRateIn etc. are already 60-second rolling averages on the broker, a value over threshold already represents sustained load — no extra split window is needed to filter transient spikes.
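A minimal Java sketch of the split pass follows, under stated assumptions. `SegmentLoad`, `SplitPolicy`, and `SplitPass` are hypothetical names; segment IDs are assumed to be `long`; every active segment is assumed to have an entry in `loadBySegment`; and "most-overloaded" is interpreted here as the largest rate-to-threshold ratio, which the text does not define. The caller is assumed to handle the in-flight guard and to record `lastSplitAtMs` when a split is dispatched.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;

record SegmentLoad(double msgRateIn, double bytesRateIn, double msgRateOut, double bytesRateOut) {}

/** Hypothetical policy view: caps, cooldown, and the four split thresholds. */
record SplitPolicy(int maxSegments, long splitCooldownMs,
                   double msgRateIn, double bytesRateIn, double msgRateOut, double bytesRateOut) {}

final class SplitPass {
    /** Returns the id of the segment to split, or empty if no split should fire. */
    static Optional<Long> decide(Map<Long, SegmentLoad> loadBySegment,
                                 Map<String, Integer> streamConsumerCountBySub,
                                 SplitPolicy p, long lastSplitAtMs, long nowMs) {
        int active = loadBySegment.size();
        if (active >= p.maxSegments() || nowMs - lastSplitAtMs < p.splitCooldownMs()) {
            return Optional.empty();
        }
        // (a) Consumer-driven: some STREAM subscription has more consumers than segments.
        int required = streamConsumerCountBySub.values().stream()
                .mapToInt(Integer::intValue).max().orElse(0);
        if (required > active) {
            return loadBySegment.entrySet().stream()
                    .max(Comparator.comparingDouble(
                            (Map.Entry<Long, SegmentLoad> e) -> e.getValue().msgRateIn()))
                    .map(Map.Entry::getKey);
        }
        // (b) Load-driven: any rate over its threshold makes the segment a candidate.
        return loadBySegment.entrySet().stream()
                .filter(e -> overload(e.getValue(), p) > 1.0)
                .max(Comparator.comparingDouble(
                        (Map.Entry<Long, SegmentLoad> e) -> overload(e.getValue(), p)))
                .map(Map.Entry::getKey);
    }

    /** Largest rate/threshold ratio; above 1.0 means at least one threshold is exceeded. */
    static double overload(SegmentLoad l, SplitPolicy p) {
        return Math.max(
                Math.max(l.msgRateIn() / p.msgRateIn(), l.bytesRateIn() / p.bytesRateIn()),
                Math.max(l.msgRateOut() / p.msgRateOut(), l.bytesRateOut() / p.bytesRateOut()));
    }
}
```

Keeping the function free of I/O, as the evaluator description requires, means the event-driven path and the periodic tick can both call it with whatever snapshot they hold.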
Merges run only if no split fired this tick, the topic is not within mergeCooldown of its last merge, and the result respects maxDagDepth.
```
if a split fired this tick:              skip merge pass
if now - lastMergeAtMs < mergeCooldown:  skip merge pass
if activeSegments <= minSegments:        skip merge pass

candidate pairs = adjacent ACTIVE segment pairs where BOTH segments satisfy,
                  for at least mergeWindow (checked via the store Stat's modified time):
    - msgRateIn    < mergeMsgRateInThreshold
    - bytesRateIn  < mergeBytesRateInThreshold
    - msgRateOut   < mergeMsgRateOutThreshold
    - bytesRateOut < mergeBytesRateOutThreshold
  AND neither segment's lineage is already at maxDagDepth merges

if candidate pairs non-empty:
    → Merge(coldest pair by combined rate); set lastMergeAtMs = now
```
Adjacency is required because the existing mergeSegments API only merges hash-range-adjacent active segments.
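The merge pass can be sketched in the same style. This is an illustration under assumptions, not the proposed implementation: `SegmentView`, `MergePolicy`, `MergePair`, and `MergePass` are hypothetical names; the input list is assumed to hold ACTIVE segments sorted by hash range so that list neighbours are adjacent; `mergeDepth` is assumed to be precomputed from the lineage; and "combined rate" is taken to be the sum of message rates in and out, which the text leaves open. The caller is assumed to skip this pass when a split fired in the same tick.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical per-segment view: load record, its Stat modified time, and lineage merge depth. */
record SegmentView(long id, double msgRateIn, double bytesRateIn, double msgRateOut,
                   double bytesRateOut, long loadModifiedAtMs, int mergeDepth) {}

record MergePolicy(int minSegments, int maxDagDepth, long mergeCooldownMs, long mergeWindowMs,
                   double msgRateIn, double bytesRateIn, double msgRateOut, double bytesRateOut) {}

record MergePair(long left, long right) {}

final class MergePass {
    /** Returns the coldest eligible adjacent pair, or empty if no merge should fire. */
    static Optional<MergePair> decide(List<SegmentView> byHashRange, MergePolicy p,
                                      long lastMergeAtMs, long nowMs) {
        if (byHashRange.size() <= p.minSegments()
                || nowMs - lastMergeAtMs < p.mergeCooldownMs()) {
            return Optional.empty();
        }
        MergePair best = null;
        double bestRate = Double.POSITIVE_INFINITY;
        for (int i = 0; i + 1 < byHashRange.size(); i++) {
            SegmentView a = byHashRange.get(i);
            SegmentView b = byHashRange.get(i + 1);
            if (!eligible(a, p, nowMs) || !eligible(b, p, nowMs)) {
                continue;
            }
            double combined = a.msgRateIn() + a.msgRateOut() + b.msgRateIn() + b.msgRateOut();
            if (combined < bestRate) {
                bestRate = combined;
                best = new MergePair(a.id(), b.id());
            }
        }
        return Optional.ofNullable(best);
    }

    /** Cold on all four rates, cold for at least mergeWindow, and below the depth cap. */
    static boolean eligible(SegmentView s, MergePolicy p, long nowMs) {
        boolean cold = s.msgRateIn() < p.msgRateIn()
                && s.bytesRateIn() < p.bytesRateIn()
                && s.msgRateOut() < p.msgRateOut()
                && s.bytesRateOut() < p.bytesRateOut();
        // The record's modified time is the start of the current cold reading.
        boolean coldLongEnough = nowMs - s.loadModifiedAtMs() >= p.mergeWindowMs();
        return cold && coldLongEnough && s.mergeDepth() < p.maxDagDepth();
    }
}
```

One consequence of deriving the window from the modified time: a rewrite that leaves the segment cold (for example a drop from one cold rate to a lower one) restarts the window, which errs on the side of delaying a merge.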
splitCooldown (default 1 min) that only coalesces bursts. Merges: a longer mergeCooldown (default 5 min) plus a mergeWindow (default 5 min) during which the segment must have stayed cold. A pair must be durably cold to merge, but a segment can split again within a minute of getting hot.

maxDagDepth (default 10) caps how many merges a given lineage can accumulate. Once reached, that lineage stops being a merge candidate, but load-driven splits still fire. This bounds the number of split↔merge cycles a hash range can churn through while never blocking a split that throughput requires.

Design note: direction of the depth cap. The cap restricts merges, not splits. Splits are needed for correctness/performance and must always be available; merges are the optional efficiency step and are the ones that, combined with splits, could oscillate. dagDepth therefore counts merges in a segment's lineage, derived from the existing parentIds/childIds DAG in ScalableTopicMetadata; splits do not consume depth budget.
| Cap | Default | Effect |
|---|---|---|
| maxSegments | 64 | Splits stop once activeSegments == maxSegments. |
| minSegments | 1 | Merges stop once activeSegments == minSegments. |
| maxDagDepth | 10 | Merges stop for a lineage at the cap; splits unaffected. |
- admin.scalableTopics().splitSegment(...) sets lastSplitAtMs, so a manual split also starts the short auto-split cooldown.
- admin.scalableTopics().mergeSegments(...) sets lastMergeAtMs, so the operator's manual efficiency action also rate-limits the controller's automatic merges.

The controller leader evaluates auto split/merge from two sources:

- scheduleAutoScaleTick (separate from the GC tick from PIP-468), default cadence scalableTopicAutoScaleInterval = 60s, evaluates the traffic-driven split rules and the merge pass. Per tick it does one metadata batch-read of the topic's segments/*/load records (or maintains a watch cache), evaluates, and dispatches.

Both sources call the same AutoScalePolicyEvaluator; the event-driven path only needs the consumer-count rule, so it is cheap. Both are cancelled on leadership loss / close.
broker.conf)

Auto-scaling is default-on cluster-wide; these are the defaults applied to every scalable topic that does not override them.
| Property | Default | Description |
|---|---|---|
| scalableTopicAutoScaleEnabled | true | Master switch for auto split/merge. |
| scalableTopicAutoScaleInterval | 60s | Periodic (traffic) evaluation cadence. Consumer-count changes are handled event-driven, independent of this. |
| scalableTopicMaxSegments | 64 | Hard ceiling on active segments. |
| scalableTopicMinSegments | 1 | Hard floor on active segments. |
| scalableTopicMaxDagDepth | 10 | Max merges in a lineage before merges are disabled for it. |
| scalableTopicSplitCooldown | 1m | Minimum time between automatic splits on a topic (coalesces bursts). |
| scalableTopicMergeCooldown | 5m | Minimum time between automatic merges on a topic. |
| scalableTopicMergeWindow | 5m | Duration a pair must stay cold before merging. |
| scalableTopicSplitMsgRateInThreshold | 10000 | msg/s ingest split trigger. |
| scalableTopicSplitBytesRateInThreshold | 50MB | bytes/s ingest split trigger. |
| scalableTopicSplitMsgRateOutThreshold | 50000 | msg/s dispatch split trigger. |
| scalableTopicSplitBytesRateOutThreshold | 250MB | bytes/s dispatch split trigger. |
| scalableTopicMergeMsgRateInThreshold | 1000 | msg/s ingest merge trigger. |
| scalableTopicMergeBytesRateInThreshold | 5MB | bytes/s ingest merge trigger. |
| scalableTopicMergeMsgRateOutThreshold | 5000 | msg/s dispatch merge trigger. |
| scalableTopicMergeBytesRateOutThreshold | 25MB | bytes/s dispatch merge trigger. |
| scalableTopicLoadReportInterval | 10s | Segment owner sampling interval. |
| scalableTopicLoadReportRateChangeThreshold | 25% | Minimum rate change since the last write that triggers a new SegmentLoadStats write. |
Following the existing autoTopicCreationOverride pattern, an AutoScalePolicyOverride can be set at two levels; resolution is most-specific-wins, falling back to broker.conf:
- autoScalePolicy field on ScalableTopicMetadata, set via admin.scalableTopics().setAutoScalePolicy(topic, policy) / getAutoScalePolicy(topic).
- scalableTopicAutoScalePolicy field on Policies, set via admin.namespaces().setScalableTopicAutoScalePolicy(ns, policy).

AutoScalePolicyOverride carries the same knobs as the broker config (all optional; unset fields fall through). Setting enabled = false opts a topic or namespace out entirely.
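Most-specific-wins resolution with per-field fall-through can be sketched as below. The types are hypothetical and deliberately reduced to two knobs (`enabled`, `maxSegments`); the real AutoScalePolicyOverride would carry all of them. A `null` field or a `null` override stands for "unset" in this sketch.

```java
/** Hypothetical two-knob override; null means "unset, fall through". */
record AutoScaleOverride(Boolean enabled, Integer maxSegments) {}

/** Hypothetical fully resolved policy, also used for the broker.conf defaults. */
record EffectivePolicy(boolean enabled, int maxSegments) {}

final class PolicyResolver {
    /** Resolves each field independently: topic override, then namespace override, then broker default. */
    static EffectivePolicy resolve(AutoScaleOverride topic, AutoScaleOverride namespace,
                                   EffectivePolicy brokerDefaults) {
        return new EffectivePolicy(
                firstSet(topic == null ? null : topic.enabled(),
                         namespace == null ? null : namespace.enabled(),
                         brokerDefaults.enabled()),
                firstSet(topic == null ? null : topic.maxSegments(),
                         namespace == null ? null : namespace.maxSegments(),
                         brokerDefaults.maxSegments()));
    }

    private static <T> T firstSet(T topicValue, T namespaceValue, T brokerValue) {
        if (topicValue != null) {
            return topicValue;
        }
        return namespaceValue != null ? namespaceValue : brokerValue;
    }
}
```

Resolving per field rather than per object means a topic can raise its own segment cap while still inheriting the namespace's opt-out, or the reverse.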
```java
interface ScalableTopics {
    // ... existing ...
    void setAutoScalePolicy(String topic, AutoScalePolicyOverride policy) throws PulsarAdminException;
    AutoScalePolicyOverride getAutoScalePolicy(String topic) throws PulsarAdminException;
    void removeAutoScalePolicy(String topic) throws PulsarAdminException;
}

interface Namespaces {
    // ... existing ...
    void setScalableTopicAutoScalePolicy(String namespace, AutoScalePolicyOverride policy) ...;
    AutoScalePolicyOverride getScalableTopicAutoScalePolicy(String namespace) ...;
    void removeScalableTopicAutoScalePolicy(String namespace) ...;
}
```
| Path | Content | Writer |
|---|---|---|
/topics/{tenant}/{ns}/{topic}/segments/{segmentId}/load | SegmentLoadStats JSON | segment-owning broker |
(autoScalePolicy rides inside the existing ScalableTopicMetadata blob; the namespace override rides inside Policies. No other new paths.)
pulsar_scalable_topic_active_segments, pulsar_scalable_topic_auto_splits_total, pulsar_scalable_topic_auto_merges_total, pulsar_scalable_topic_split_suppressed_max_segments_total, pulsar_scalable_topic_merge_suppressed_max_depth_total.

ScalableTopicStats is extended with the most recent SegmentLoadStats per segment and the resolved effective policy, so operators can see why the controller did or did not act.

The maxSegments, maxDagDepth, asymmetric cooldown, and threshold-gap guards together bound both the rate and the total amount of structural change a topic can undergo, so enabling auto split/merge cannot cause unbounded segment growth or split/merge storms.
Operators who want manual-only control set scalableTopicAutoScaleEnabled=false (cluster) or an enabled=false override (namespace/topic).
Compatibility: scalable topics are a new, as-yet-unreleased feature (PIP-460), so there is no backward/forward compatibility to consider: SegmentLoadStats, the policy fields, and the config knobs all ship together with the rest of the scalable-topic feature.
setAutoScalePolicy / getAutoScalePolicy (topic and namespace variants) require the same admin permissions as the corresponding existing scalable-topic and namespace policy operations. SegmentLoadStats is written by brokers via their authenticated internal identity and is not client-writable.