PIP-483: Scalable Topic Auto Split/Merge

Sub-PIP of PIP-468: Scalable Topic Controller

Motivation

PIP-468 gives the Scalable Topic Controller the ability to split and merge segments, but only on explicit operator request via the admin API. An operator has to watch the topic, decide it is hot, and issue a split — and later notice it has gone cold and issue a merge. This is the same operational toil that partition-count management imposes on classic Pulsar topics, which scalable topics were meant to eliminate.

This PIP adds an auto-scaling policy to the controller: the controller leader observes per-segment load and per-subscription consumer pressure, and autonomously splits hot segments and merges cold ones, within hard caps that prevent runaway growth and split/merge flip-flopping.

The design is built around three principles that came out of the design discussion:

  1. Splits are fast; merges are lazy. A split protects throughput and latency under load, so it fires quickly — with only a short cooldown to coalesce bursts of near-simultaneous triggers (e.g. a group of consumers connecting in rapid succession). A merge is purely an efficiency reclaim, so it can wait, be rate-limited, and be skipped when in doubt.
  2. The controller reacts; it does not poll. New stream/checkpoint consumers register directly with the controller, so consumer-count changes are handled event-driven within seconds. Load data is pushed into the metadata store by each segment's owning broker (only when it changes materially) and read by the controller leader. The controller never fans out RPCs to segment owners.
  3. The decision is a pure function. Given a snapshot of load + layout + policy, the split/merge decision is deterministic and unit-testable in isolation from all I/O.

Goals

  • Automatically increase segment count when a topic is under ingest/dispatch load or has more stream/checkpoint consumers than segments.
  • Automatically decrease segment count when load subsides, reclaiming broker resources.
  • Bound growth (maxSegments) and bound split↔merge churn (maxDagDepth, asymmetric cooldown).
  • Default-on cluster-wide, with per-namespace and per-topic overrides, following Pulsar's existing policy-resolution conventions.

Non-goals

  • Broker placement / rebalancing. Which broker owns a segment's bundle is the load balancer's job; this PIP only changes how many segments exist.
  • Key-aware or non-midpoint splits. Splits use the existing midpoint-split mechanism from PIP-468.
  • Cross-topic global optimization. Each topic's controller decides independently.

Design

Overview

```
┌──────────────────────────────────────────────────────────────────┐
│ Segment-owning broker (per active segment)                       │
│   SegmentLoadReporter                                            │
│   - samples the segment topic's TopicStats                       │
│   - writes SegmentLoadStats to metadata ONLY on material change  │
└─────────────────────────────────┬────────────────────────────────┘
                                  │ (metadata store, push-on-change)
                                  ▼
┌──────────────────────────────────────────────────────────────────┐
│ Controller leader (per scalable topic)                           │
│                                                                  │
│  Event-driven (within seconds):                                  │
│    on STREAM/CHECKPOINT consumer register/unregister             │
│      (consumers already register with the controller; no poll)   │
│      → evaluate the consumer-count split rule immediately        │
│                                                                  │
│  Periodic AutoScaleTick (traffic, default 60s):                  │
│    1. read SegmentLoadStats for all active segments              │
│    2. AutoScalePolicyEvaluator.decide(...) → Split|Merge|None    │
│    3. dispatch to existing splitSegment / mergeSegments          │
└──────────────────────────────────────────────────────────────────┘
```

The two trigger sources reflect their different latency needs. A new consumer should get its own segment within seconds, so it is handled the instant the consumer registers with the controller. Traffic shifts up or down over a minute or more, so it is evaluated on a slower periodic tick. The only new metadata record is SegmentLoadStats, and the split/merge mechanics are reused entirely from PIP-468.

Load reporting: push-to-metadata, not pull-per-tick

Each segment's owning broker runs a SegmentLoadReporter for every ACTIVE segment:// topic it hosts. The broker writes SegmentLoadStats directly to the metadata store — it already has the rates in memory, so there is no REST round-trip and no controller-initiated pull. On a fixed sampling interval it compares the current rates to the last ones written and writes only when a rate changes by more than a significant threshold (default ±25%) since the last write. A steady-state segment writes once and then goes silent, keeping metadata write volume bounded regardless of traffic.

SegmentLoadStats (new metadata record)

Stored at /topics/{tenant}/{ns}/{topic}/segments/{segmentId}/load:

```json
{
  "msgRateIn": 12000.0,
  "bytesRateIn": 64000000.0,
  "msgRateOut": 48000.0,
  "bytesRateOut": 256000000.0
}
```

| Field | Source on the owning broker | Meaning for auto split/merge |
|---|---|---|
| msgRateIn / bytesRateIn | segment topic TopicStats (60s rolling) | ingest load |
| msgRateOut / bytesRateOut | segment topic TopicStats | dispatch/fanout load (high for topics with many subscriptions) |
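
The record can be mirrored as a small Java type for illustration. This is a sketch, not the PIP's implementation. The record name and fields match the JSON above. The nested RateLimits type and the exceedsAny/belowAll helpers are hypothetical, and "50MB" is read as 50,000,000 bytes/s. The two helpers encode the evaluator's conditions: a split needs ANY rate over its threshold, and a merge needs ALL rates under theirs.

```java
/**
 * Sketch: Java mirror of the SegmentLoadStats JSON record (all rates per second).
 * The nested RateLimits type and both helper methods are hypothetical.
 */
record SegmentLoadStats(double msgRateIn, double bytesRateIn,
                        double msgRateOut, double bytesRateOut) {

    /** Four per-metric limits; one instance holds split thresholds, another merge thresholds. */
    record RateLimits(double msgRateIn, double bytesRateIn,
                      double msgRateOut, double bytesRateOut) {}

    /** Split rule (b): true if ANY rate is strictly above its split threshold. */
    boolean exceedsAny(RateLimits split) {
        return msgRateIn > split.msgRateIn()
                || bytesRateIn > split.bytesRateIn()
                || msgRateOut > split.msgRateOut()
                || bytesRateOut > split.bytesRateOut();
    }

    /** Merge rule: true only if EVERY rate is strictly below its merge threshold. */
    boolean belowAll(RateLimits merge) {
        return msgRateIn < merge.msgRateIn()
                && bytesRateIn < merge.bytesRateIn()
                && msgRateOut < merge.msgRateOut()
                && bytesRateOut < merge.bytesRateOut();
    }
}
```

With the default thresholds, the example record above would be a split candidate. Its msgRateIn of 12000 exceeds 10000, and its bytesRateIn of 64 MB/s exceeds 50 MB/s.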

The record carries no timestamp of its own. The metadata store's Stat for the znode already exposes creation and last-modified timestamps, and the controller uses the last-modified timestamp for windowing. The reporter rewrites the record whenever any rate moves by more than the change threshold. So a record that still reads "cold" with an old modified time shows that the segment has stayed cold (within that threshold) for now − modifiedAt. Merge windows therefore derive from the store's Stat, with no per-tick history buffer and no extra field.
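
Below is a minimal sketch of that windowing check. It assumes the controller has the record's last-modified time (modifiedAtMs, in epoch milliseconds) from the store's Stat. The ColdWindow class and its methods are hypothetical names.

```java
import java.time.Duration;

/** Sketch: "how long has this segment been cold", derived from the store's modified time. */
final class ColdWindow {
    private ColdWindow() {}

    /** Milliseconds since the /load record was last rewritten, clamped at 0 for clock skew. */
    static long unchangedForMs(long modifiedAtMs, long nowMs) {
        return Math.max(0L, nowMs - modifiedAtMs);
    }

    /**
     * Durably cold: the current record is below all merge thresholds AND has not
     * been rewritten (no material rate change) for at least mergeWindow.
     */
    static boolean coldForAtLeast(boolean currentlyBelowMergeThresholds,
                                  long modifiedAtMs, long nowMs, Duration mergeWindow) {
        return currentlyBelowMergeThresholds
                && unchangedForMs(modifiedAtMs, nowMs) >= mergeWindow.toMillis();
    }
}
```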

Significant-change threshold

To avoid rewriting on every minor wobble, the reporter only writes when a rate has moved by more than scalableTopicLoadReportRateChangeThreshold (default 25%) relative to the last value written for that segment. Sampling cadence is scalableTopicLoadReportInterval (default 10s). Both are configurable via broker.conf.
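
The reporter's write decision can be sketched as follows. The sketch assumes:

- the threshold is passed as a fraction (0.25 for the 25% default);
- the four rates are compared in the order msgRateIn, bytesRateIn, msgRateOut, bytesRateOut;
- any move away from a zero rate counts as material. The PIP does not specify this case.

LoadChangeDetector is a hypothetical name.

```java
/** Sketch of the SegmentLoadReporter's "write only on material change" check. */
final class LoadChangeDetector {
    private LoadChangeDetector() {}

    /** True if current differs from lastWritten by more than threshold (relative to lastWritten). */
    static boolean changedMaterially(double lastWritten, double current, double threshold) {
        if (lastWritten == 0.0) {
            // Assumption: leaving an idle (zero) rate is always material.
            return current != 0.0;
        }
        return Math.abs(current - lastWritten) / Math.abs(lastWritten) > threshold;
    }

    /** A new SegmentLoadStats write is needed if ANY of the four rates moved materially. */
    static boolean shouldWrite(double[] lastWritten, double[] current, double threshold) {
        for (int i = 0; i < lastWritten.length; i++) {
            if (changedMaterially(lastWritten[i], current[i], threshold)) {
                return true;
            }
        }
        return false;
    }
}
```

With the 25% default, a rate last written as 1000 msg/s triggers a new write only once it rises above 1250 or falls below 750.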

Subscription types and what each load type drives

Recall from PIP-468 that scalable-topic subscriptions are STREAM (controller-managed, 1:1 segment↔consumer assignment; covers both StreamConsumer and CheckpointConsumer) or QUEUE (controller-bypassing; every consumer attaches to every segment and the broker round-robins).

| Trigger | STREAM subscriptions | QUEUE subscriptions |
|---|---|---|
| Consumer-count scale-up | Yes: more segments give more 1:1 parallelism | No: queue consumers share segments, so more segments don't add parallelism for them |
| Traffic (in/out, msg/bytes) | Yes | Yes: queue traffic still loads the segment's broker and counts toward the per-segment rate |

So a topic with only QUEUE subscriptions never splits on consumer count, but still splits when any segment's in/out rate crosses threshold.
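
Here is a sketch of how the consumer-count target could be computed from the controller's view of subscriptions. SubscriptionKind, SubscriptionView, and ConsumerDrivenTarget are hypothetical names. As in PIP-468, STREAM here covers both StreamConsumer and CheckpointConsumer subscriptions.

```java
import java.util.List;

enum SubscriptionKind { STREAM, QUEUE }

/** Hypothetical controller-side view of one subscription. */
record SubscriptionView(String name, SubscriptionKind kind, int consumerCount) {}

final class ConsumerDrivenTarget {
    private ConsumerDrivenTarget() {}

    /**
     * Segments needed for 1:1 assignment: the maximum consumer count over STREAM
     * subscriptions. QUEUE subscriptions are skipped because their consumers
     * attach to every segment, so extra segments add no parallelism for them.
     */
    static int requiredSegments(List<SubscriptionView> subs) {
        int required = 0;
        for (SubscriptionView s : subs) {
            if (s.kind() == SubscriptionKind.STREAM) {
                required = Math.max(required, s.consumerCount());
            }
        }
        return required;
    }
}
```

A topic whose only subscription is QUEUE gets a target of 0, so rule (a) never fires and only traffic can split it.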

The decision: AutoScalePolicyEvaluator

A pure function with no I/O:

```
decide(layout, loadBySegment, streamConsumerCountBySub, policy, now)
    → Split(segmentId) | Merge(segmentId1, segmentId2) | None
```

It runs in two passes — split first (short cooldown), then merge (long cooldown) — and emits at most one action per invocation.
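
One way to model the evaluator's three-way result in Java is a sealed interface. This shape is hypothetical: the PIP fixes only Split(segmentId), Merge(segmentId1, segmentId2) and None. The long segment IDs are an assumption.

```java
/** Hypothetical result type: the evaluator returns exactly one of these per invocation. */
sealed interface AutoScaleDecision
        permits AutoScaleDecision.Split, AutoScaleDecision.Merge, AutoScaleDecision.None {

    record Split(long segmentId) implements AutoScaleDecision {}

    record Merge(long segmentId1, long segmentId2) implements AutoScaleDecision {}

    /** Singleton "no action this time" result. */
    enum None implements AutoScaleDecision { INSTANCE }

    /** Example of exhaustive handling, e.g. for logging the decision. */
    static String describe(AutoScaleDecision d) {
        if (d instanceof Split s) {
            return "split " + s.segmentId();
        }
        if (d instanceof Merge m) {
            return "merge " + m.segmentId1() + "+" + m.segmentId2();
        }
        return "none";
    }
}
```

Because the type is sealed, code that dispatches on a decision only ever sees these three cases. That matches the evaluator emitting at most one action per invocation.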

Pass 1 — SPLIT (fast, lightly coalesced)

Splits fire as soon as conditions are met, bounded by maxSegments, an in-flight-operation guard, and a short splitCooldown (default 1 min). The cooldown is deliberately short. It exists only to coalesce a burst of near-simultaneous triggers (e.g. a group of consumers connecting in rapid succession should cause one split, not N), while still letting a genuinely growing topic split again a minute later.

```
if activeSegments >= maxSegments: skip split pass
if a structural operation is already in flight: skip split pass
if now - lastSplitAtMs < splitCooldown: skip split pass

(a) Consumer-driven:
      required = max over STREAM subscriptions of consumerCount   // per-subscription max
      if required > activeSegments:
          → Split(busiest active segment by msgRateIn); set lastSplitAtMs = now

(b) Load-driven (if (a) didn't fire):
      candidate segments = active segments where ANY of:
        - msgRateIn    > splitMsgRateInThreshold
        - bytesRateIn  > splitBytesRateInThreshold
        - msgRateOut   > splitMsgRateOutThreshold
        - bytesRateOut > splitBytesRateOutThreshold
      if candidates non-empty:
          → Split(most-overloaded candidate); set lastSplitAtMs = now
```

The consumer-driven rule (a) is what the event-driven path evaluates the moment a consumer registers, so a new consumer gets a segment within seconds (subject to splitCooldown). The load-driven rule (b) runs on the periodic tick. Because msgRateIn etc. are already 60-second rolling averages on the broker, a value over threshold already represents sustained load — no extra split window is needed to filter transient spikes.
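
Below is a minimal sketch of the split pass as a pure function. It assumes the caller supplies the active segments' latest rates, the consumer-count target from rule (a), and the cooldown state. SplitPass, Rates, and chooseSplit are hypothetical names. The PIP does not define "most-overloaded". This sketch assumes it means the largest rate-to-threshold ratio across the four metrics.

```java
import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;

/** Hypothetical pure-function sketch of Pass 1 (SPLIT). */
final class SplitPass {
    /** Latest per-second rates for one segment, as read from its SegmentLoadStats. */
    record Rates(double msgIn, double bytesIn, double msgOut, double bytesOut) {}

    private SplitPass() {}

    /**
     * Returns the segment to split, or empty if the pass is skipped or nothing qualifies.
     * On a non-empty result the caller dispatches splitSegment and sets lastSplitAtMs = nowMs.
     */
    static Optional<Long> chooseSplit(SortedMap<Long, Rates> active, Rates splitThresholds,
                                      int requiredByConsumers, int maxSegments,
                                      boolean operationInFlight, long lastSplitAtMs,
                                      long splitCooldownMs, long nowMs) {
        if (active.isEmpty() || active.size() >= maxSegments || operationInFlight
                || nowMs - lastSplitAtMs < splitCooldownMs) {
            return Optional.empty();
        }
        if (requiredByConsumers > active.size()) {
            return Optional.of(busiestByMsgRateIn(active));   // rule (a)
        }
        return mostOverloaded(active, splitThresholds);       // rule (b)
    }

    private static long busiestByMsgRateIn(SortedMap<Long, Rates> active) {
        long best = active.firstKey();
        for (Map.Entry<Long, Rates> e : active.entrySet()) {
            if (e.getValue().msgIn() > active.get(best).msgIn()) {
                best = e.getKey();
            }
        }
        return best;
    }

    /** Candidate = any metric over its threshold (ratio > 1); pick the largest ratio. */
    private static Optional<Long> mostOverloaded(SortedMap<Long, Rates> active, Rates t) {
        Long best = null;
        double bestRatio = 1.0;
        for (Map.Entry<Long, Rates> e : active.entrySet()) {
            Rates r = e.getValue();
            double ratio = Math.max(Math.max(r.msgIn() / t.msgIn(), r.bytesIn() / t.bytesIn()),
                                    Math.max(r.msgOut() / t.msgOut(), r.bytesOut() / t.bytesOut()));
            if (ratio > bestRatio) {
                best = e.getKey();
                bestRatio = ratio;
            }
        }
        return Optional.ofNullable(best);
    }
}
```

The sketch keys the input by segment id in a SortedMap, and strict comparisons keep the first match. Ties therefore resolve to the lowest id and the result is deterministic. This follows the principle that the decision is a pure, unit-testable function.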

Pass 2 — MERGE (lazy, rate-limited)

Merges run only if no split fired this tick, the topic is not within mergeCooldown of its last merge, and the result respects maxDagDepth.

```
if a split fired this tick: skip merge pass
if now - lastMergeAtMs < mergeCooldown: skip merge pass
if activeSegments <= minSegments: skip merge pass

candidate pairs = adjacent ACTIVE segment pairs where BOTH segments satisfy,
                  for at least mergeWindow (checked via the store Stat's modified time):
    - msgRateIn    < mergeMsgRateInThreshold
    - bytesRateIn  < mergeBytesRateInThreshold
    - msgRateOut   < mergeMsgRateOutThreshold
    - bytesRateOut < mergeBytesRateOutThreshold
  AND neither segment's lineage is already at maxDagDepth merges

if candidate pairs non-empty:
    → Merge(coldest pair by combined rate); set lastMergeAtMs = now
```

Adjacency is required because the existing mergeSegments API only merges hash-range-adjacent active segments.
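
The merge pass can be sketched the same way. This sketch rests on three assumptions, none of which the PIP specifies:

- Hash ranges are half-open integer intervals [rangeStart, rangeEnd), so two segments are adjacent when one's end equals the other's start.
- Each segment carries a precomputed combinedRate used to rank pairs.
- mergeDepth is already derived from the lineage DAG.

MergePass, Seg, and Pair are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Hypothetical pure-function sketch of Pass 2 (MERGE). */
final class MergePass {
    /** Per-segment inputs: hash range, ranking rate, cold flag, record mtime, merge depth. */
    record Seg(long id, int rangeStart, int rangeEnd, double combinedRate,
               boolean belowMergeThresholds, long loadModifiedAtMs, int mergeDepth) {}

    record Pair(long first, long second) {}

    private MergePass() {}

    static Optional<Pair> chooseMerge(List<Seg> active, boolean splitFiredThisTick,
                                      long lastMergeAtMs, long mergeCooldownMs,
                                      long mergeWindowMs, int minSegments,
                                      int maxDagDepth, long nowMs) {
        if (splitFiredThisTick || nowMs - lastMergeAtMs < mergeCooldownMs
                || active.size() <= minSegments) {
            return Optional.empty();
        }
        // Sort by hash range so only list neighbours can be range-adjacent.
        List<Seg> sorted = new ArrayList<>(active);
        sorted.sort(Comparator.comparingInt(Seg::rangeStart));
        Pair best = null;
        double bestRate = Double.POSITIVE_INFINITY;
        for (int i = 0; i + 1 < sorted.size(); i++) {
            Seg a = sorted.get(i);
            Seg b = sorted.get(i + 1);
            boolean eligible = a.rangeEnd() == b.rangeStart()
                    && durablyCold(a, nowMs, mergeWindowMs)
                    && durablyCold(b, nowMs, mergeWindowMs)
                    && a.mergeDepth() < maxDagDepth
                    && b.mergeDepth() < maxDagDepth;
            double rate = a.combinedRate() + b.combinedRate();
            if (eligible && rate < bestRate) {   // coldest eligible pair wins
                best = new Pair(a.id(), b.id());
                bestRate = rate;
            }
        }
        return Optional.ofNullable(best);
    }

    /** Below every merge threshold, and the record has not been rewritten for mergeWindow. */
    private static boolean durablyCold(Seg s, long nowMs, long mergeWindowMs) {
        return s.belowMergeThresholds() && nowMs - s.loadModifiedAtMs() >= mergeWindowMs;
    }
}
```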

Anti-flip-flop: three independent guards

  1. Threshold gap (hysteresis). Split thresholds are well above merge thresholds for every metric. The dead-band between them is what prevents a just-merged segment from immediately re-qualifying for a split.
  2. Asymmetric cooldown. Splits: a short splitCooldown (default 1 min) that only coalesces bursts. Merges: a longer mergeCooldown (default 5 min) plus a mergeWindow (default 5 min) during which the segment must have stayed cold. A pair must be durably cold to merge, but a segment can split again within a minute of getting hot.
  3. Max DAG depth on merges. maxDagDepth (default 10) caps how many merges a given lineage can accumulate. Once reached, that lineage stops being a merge candidate — but load-driven splits still fire. This bounds the number of split↔merge cycles a hash range can churn through while never blocking a split that throughput requires.
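
Guard 1 can be checked when a policy is resolved. Assume a merged segment carries roughly the sum of its two parents' rates. Two segments each just under a merge threshold then produce a segment just under twice that threshold. A split threshold of at least 2× the merge threshold therefore guarantees that a fresh merge cannot immediately re-qualify for a split on that metric. This 2× rule and the HysteresisCheck class are illustrative assumptions. The PIP itself only requires split thresholds to be well above merge thresholds.

```java
/** Hypothetical config sanity check for the split/merge dead-band. */
final class HysteresisCheck {
    private HysteresisCheck() {}

    /**
     * Assuming merged rates add up, a merge of two sub-threshold segments stays
     * below 2 * mergeThreshold, so it cannot exceed a splitThreshold >= 2 * mergeThreshold.
     */
    static boolean deadBandOk(double splitThreshold, double mergeThreshold) {
        return mergeThreshold >= 0 && splitThreshold >= 2 * mergeThreshold;
    }

    /** Throws if any metric's split threshold is too close to its merge threshold. */
    static void validate(double[] split, double[] merge, String[] names) {
        for (int i = 0; i < names.length; i++) {
            if (!deadBandOk(split[i], merge[i])) {
                throw new IllegalArgumentException(names[i] + ": split threshold " + split[i]
                        + " must be at least 2x merge threshold " + merge[i]);
            }
        }
    }
}
```

The defaults pass comfortably: every split threshold is 10× its merge threshold (for example, 10000 vs 1000 msg/s for ingest).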

Design note — direction of the depth cap. The cap restricts merges, not splits. Splits are needed for correctness/performance and must always be available; merges are the optional efficiency step and are the ones that, combined with splits, could oscillate. dagDepth therefore counts merges in a segment's lineage, derived from the existing parentIds/childIds DAG in ScalableTopicMetadata — splits do not consume depth budget.
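
Here is a sketch of deriving merge depth from the parentIds edges. It assumes a segment with two parents was produced by a merge, and a segment with one parent was produced by a split. It takes depth as the number of merges along the deepest ancestor path. The PIP does not say whether depth is a path maximum or a total count, so treat that choice as an assumption. MergeDepth is a hypothetical name.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical merge-depth computation over the segment lineage DAG (parentIds). */
final class MergeDepth {
    private MergeDepth() {}

    /** Convenience entry point with a fresh memo table. */
    static int of(long segmentId, Map<Long, List<Long>> parentIds) {
        return of(segmentId, parentIds, new HashMap<>());
    }

    static int of(long segmentId, Map<Long, List<Long>> parentIds, Map<Long, Integer> memo) {
        Integer cached = memo.get(segmentId);
        if (cached != null) {
            return cached;
        }
        List<Long> parents = parentIds.getOrDefault(segmentId, List.of());
        int deepestParent = 0;
        for (long parent : parents) {
            deepestParent = Math.max(deepestParent, of(parent, parentIds, memo));
        }
        // Two parents: produced by a merge (+1). One parent: a split (no cost). None: a root.
        int depth = deepestParent + (parents.size() >= 2 ? 1 : 0);
        memo.put(segmentId, depth);
        return depth;
    }
}
```

Consider the lineage split → merge → split → merge. The final segment has depth 2, while the segments produced by the intermediate split still have depth 1, because splits add nothing.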

Caps

| Cap | Default | Effect |
|---|---|---|
| maxSegments | 64 | Splits stop once activeSegments == maxSegments. |
| minSegments | 1 | Merges stop once activeSegments == minSegments. |
| maxDagDepth | 10 | Merges stop for a lineage at the cap; splits unaffected. |

Manual operations and cooldown

  • Manual admin.scalableTopics().splitSegment(...) sets lastSplitAtMs, so a manual split also starts the short auto-split cooldown.
  • Manual admin.scalableTopics().mergeSegments(...) sets lastMergeAtMs, so the operator's manual efficiency action also rate-limits the controller's automatic merges.
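
The sketch below shows per-topic cooldown state. The point is that the admin split/merge handlers and the automatic path call the same recordSplit/recordMerge methods, so either kind of operation starts the corresponding cooldown. CooldownState is a hypothetical name. Keeping the timestamps only in the leader's memory is a simplification of this sketch.

```java
/** Hypothetical per-topic cooldown state shared by the manual and automatic paths. */
final class CooldownState {
    private static final long NEVER = -1L;
    private long lastSplitAtMs = NEVER;
    private long lastMergeAtMs = NEVER;

    /** Called after BOTH automatic splits and admin splitSegment(...). */
    synchronized void recordSplit(long nowMs) {
        lastSplitAtMs = nowMs;
    }

    /** Called after BOTH automatic merges and admin mergeSegments(...). */
    synchronized void recordMerge(long nowMs) {
        lastMergeAtMs = nowMs;
    }

    synchronized boolean splitAllowed(long nowMs, long splitCooldownMs) {
        return lastSplitAtMs == NEVER || nowMs - lastSplitAtMs >= splitCooldownMs;
    }

    synchronized boolean mergeAllowed(long nowMs, long mergeCooldownMs) {
        return lastMergeAtMs == NEVER || nowMs - lastMergeAtMs >= mergeCooldownMs;
    }
}
```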

Evaluation triggers

The controller leader evaluates auto split/merge from two sources:

  • Event-driven (within seconds) — when a STREAM/CHECKPOINT consumer registers with or unregisters from the controller, it immediately evaluates the consumer-count split rule. No polling: consumer registration already flows through the controller (PIP-468).
  • Periodic tick — a scheduleAutoScaleTick (separate from the GC tick from PIP-468), default cadence scalableTopicAutoScaleInterval = 60s, evaluates the traffic-driven split rules and the merge pass. Per tick it does one metadata batch-read of the topic's segments/*/load records (or maintains a watch cache), evaluates, and dispatches.

Both sources call the same AutoScalePolicyEvaluator; the event-driven path only needs the consumer-count rule, so it is cheap. Both are cancelled on leadership loss / close.
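
Below is a sketch of wiring both trigger sources on the controller leader, using only java.util.concurrent. AutoScaleTriggers and its methods are hypothetical. The two Runnables stand in for "evaluate the consumer-count rule" and "run the traffic split rules plus the merge pass". Running both on one single-threaded executor is a design choice of this sketch: evaluations never overlap, so the evaluator always sees a consistent cooldown state.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical wiring of the event-driven and periodic trigger paths on the leader. */
final class AutoScaleTriggers implements AutoCloseable {
    // Daemon thread so a leftover executor never keeps the JVM alive.
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "scalable-topic-auto-scale");
                t.setDaemon(true);
                return t;
            });
    private final Runnable consumerCountRule;   // split rule (a) only
    private final Runnable periodicEvaluation;  // split rule (b) + merge pass
    private volatile boolean leader;
    private ScheduledFuture<?> tick;

    AutoScaleTriggers(Runnable consumerCountRule, Runnable periodicEvaluation) {
        this.consumerCountRule = consumerCountRule;
        this.periodicEvaluation = periodicEvaluation;
    }

    /** Start the periodic traffic tick when this controller becomes leader. */
    synchronized void onLeadershipAcquired(long intervalMs) {
        leader = true;
        if (tick == null) {
            tick = executor.scheduleWithFixedDelay(() -> {
                try {
                    if (leader) {
                        periodicEvaluation.run();
                    }
                } catch (RuntimeException e) {
                    // A real controller would log this; swallowing keeps future ticks alive.
                }
            }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
        }
    }

    /** Event-driven path: a STREAM/CHECKPOINT consumer registered or unregistered. */
    boolean onStreamConsumerChange() {
        if (!leader) {
            return false;
        }
        executor.execute(() -> {
            if (leader) {
                consumerCountRule.run();
            }
        });
        return true;
    }

    /** Cancel both paths: the tick is cancelled and queued events become no-ops. */
    synchronized void onLeadershipLost() {
        leader = false;
        if (tick != null) {
            tick.cancel(false);
            tick = null;
        }
    }

    synchronized boolean isTickScheduled() {
        return tick != null;
    }

    @Override
    public void close() {
        onLeadershipLost();
        executor.shutdownNow();
    }
}
```

The try/catch in the periodic task matters. If any run of a periodic task throws, ScheduledExecutorService suppresses all of its later runs, so one failed evaluation would otherwise stop auto-scaling for the topic until the next leadership change.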


Public-Facing Changes

Configuration (broker.conf)

Auto-scaling is default-on cluster-wide; these are the defaults applied to every scalable topic that does not override them.

| Property | Default | Description |
|---|---|---|
| scalableTopicAutoScaleEnabled | true | Master switch for auto split/merge. |
| scalableTopicAutoScaleInterval | 60s | Periodic (traffic) evaluation cadence. Consumer-count changes are handled event-driven, independent of this. |
| scalableTopicMaxSegments | 64 | Hard ceiling on active segments. |
| scalableTopicMinSegments | 1 | Hard floor on active segments. |
| scalableTopicMaxDagDepth | 10 | Max merges in a lineage before merges are disabled for it. |
| scalableTopicSplitCooldown | 1m | Minimum time between automatic splits on a topic (coalesces bursts). |
| scalableTopicMergeCooldown | 5m | Minimum time between automatic merges on a topic. |
| scalableTopicMergeWindow | 5m | Duration a pair must stay cold before merging. |
| scalableTopicSplitMsgRateInThreshold | 10000 | Ingest split trigger (msg/s). |
| scalableTopicSplitBytesRateInThreshold | 50MB | Ingest split trigger (bytes/s). |
| scalableTopicSplitMsgRateOutThreshold | 50000 | Dispatch split trigger (msg/s). |
| scalableTopicSplitBytesRateOutThreshold | 250MB | Dispatch split trigger (bytes/s). |
| scalableTopicMergeMsgRateInThreshold | 1000 | Ingest merge trigger (msg/s). |
| scalableTopicMergeBytesRateInThreshold | 5MB | Ingest merge trigger (bytes/s). |
| scalableTopicMergeMsgRateOutThreshold | 5000 | Dispatch merge trigger (msg/s). |
| scalableTopicMergeBytesRateOutThreshold | 25MB | Dispatch merge trigger (bytes/s). |
| scalableTopicLoadReportInterval | 10s | Segment owner sampling interval. |
| scalableTopicLoadReportRateChangeThreshold | 25% | Minimum rate change since the last write that triggers a new SegmentLoadStats write. |

Policy resolution (namespace + topic overrides)

Following the existing autoTopicCreationOverride pattern, an AutoScalePolicyOverride can be set at two levels; resolution is most-specific-wins, falling back to broker.conf:

  1. Per-topic — a new autoScalePolicy field on ScalableTopicMetadata, set via admin.scalableTopics().setAutoScalePolicy(topic, policy) / getAutoScalePolicy(topic).
  2. Per-namespace — a new scalableTopicAutoScalePolicy field on Policies, set via admin.namespaces().setScalableTopicAutoScalePolicy(ns, policy).

AutoScalePolicyOverride carries the same knobs as the broker config (all optional; unset fields fall through). Setting enabled = false opts a topic or namespace out entirely.
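
Because unset fields fall through, most-specific-wins resolution applies per field rather than per override object. The minimal sketch below uses a hypothetical three-knob subset of AutoScalePolicyOverride, where null means unset. The real type would carry every knob from the configuration table. EffectivePolicy and PolicyResolver are invented names.

```java
import java.util.function.Function;

/** Hypothetical three-knob subset of AutoScalePolicyOverride; null means "unset, fall through". */
record AutoScalePolicyOverride(Boolean enabled, Integer maxSegments, Long splitCooldownMs) {}

/** Fully resolved values; the broker.conf defaults are one instance of this. */
record EffectivePolicy(boolean enabled, int maxSegments, long splitCooldownMs) {}

final class PolicyResolver {
    private PolicyResolver() {}

    /** Per field: topic override, else namespace override, else broker.conf. */
    static EffectivePolicy resolve(AutoScalePolicyOverride topic, AutoScalePolicyOverride namespace,
                                   EffectivePolicy brokerDefaults) {
        return new EffectivePolicy(
                pick(topic, namespace, AutoScalePolicyOverride::enabled, brokerDefaults.enabled()),
                pick(topic, namespace, AutoScalePolicyOverride::maxSegments, brokerDefaults.maxSegments()),
                pick(topic, namespace, AutoScalePolicyOverride::splitCooldownMs,
                        brokerDefaults.splitCooldownMs()));
    }

    private static <T> T pick(AutoScalePolicyOverride topic, AutoScalePolicyOverride namespace,
                              Function<AutoScalePolicyOverride, T> field, T brokerDefault) {
        if (topic != null && field.apply(topic) != null) {
            return field.apply(topic);
        }
        if (namespace != null && field.apply(namespace) != null) {
            return field.apply(namespace);
        }
        return brokerDefault;
    }
}
```

For example, suppose the namespace sets only maxSegments = 16 and the topic sets only splitCooldownMs. The topic then resolves to enabled from broker.conf, maxSegments 16 from the namespace, and the topic's own split cooldown.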

Admin Client API

```java
interface ScalableTopics {
    // ... existing ...
    void setAutoScalePolicy(String topic, AutoScalePolicyOverride policy) throws PulsarAdminException;
    AutoScalePolicyOverride getAutoScalePolicy(String topic) throws PulsarAdminException;
    void removeAutoScalePolicy(String topic) throws PulsarAdminException;
}

interface Namespaces {
    // ... existing ...
    void setScalableTopicAutoScalePolicy(String namespace, AutoScalePolicyOverride policy) ...;
    AutoScalePolicyOverride getScalableTopicAutoScalePolicy(String namespace) ...;
    void removeScalableTopicAutoScalePolicy(String namespace) ...;
}
```

Metadata Store Paths

| Path | Content | Writer |
|---|---|---|
| /topics/{tenant}/{ns}/{topic}/segments/{segmentId}/load | SegmentLoadStats JSON | segment-owning broker |

(autoScalePolicy rides inside the existing ScalableTopicMetadata blob; the namespace override rides inside Policies. No other new paths.)

Observability

  • New per-topic metrics: pulsar_scalable_topic_active_segments, pulsar_scalable_topic_auto_splits_total, pulsar_scalable_topic_auto_merges_total, pulsar_scalable_topic_split_suppressed_max_segments_total, pulsar_scalable_topic_merge_suppressed_max_depth_total.
  • The existing ScalableTopicStats is extended with the most recent SegmentLoadStats per segment and the resolved effective policy, so operators can see why the controller did or did not act.

Operational Safety

The maxSegments, maxDagDepth, asymmetric cooldown, and threshold-gap guards together bound both the rate and the total amount of structural change a topic can undergo, so enabling auto split/merge cannot cause unbounded segment growth or split/merge storms.

Operators who want manual-only control set scalableTopicAutoScaleEnabled=false (cluster) or an enabled=false override (namespace/topic).

Compatibility: scalable topics are a new, as-yet-unreleased feature (PIP-460), so there is no backward/forward compatibility to consider — SegmentLoadStats, the policy fields, and the config knobs all ship together with the rest of the scalable-topic feature.


Security Considerations

setAutoScalePolicy, getAutoScalePolicy, and removeAutoScalePolicy (and their namespace variants) require the same admin permissions as the corresponding existing scalable-topic and namespace policy operations. SegmentLoadStats is written by brokers via their authenticated internal identity and is not client-writable.