PIP-469: Legacy-aware topic policies backend routing and metadata-store topic policies

Background knowledge

Apache Pulsar introduced topic-level policies in PIP-39. A broker reads and writes these policies through TopicPoliciesService. The default implementation, SystemTopicBasedTopicPoliciesService, persists topic policy changes in the namespace's __change_events system topic and keeps an in-memory cache on brokers that own bundles for that namespace.

PIP-92 extended topic policies with the distinction between local and global policies. Any TopicPoliciesService implementation therefore needs to handle two independent values for the same topic: the cluster-local policy state and the globally visible policy state.

PIP-376 made TopicPoliciesService pluggable through the broker configuration topicPoliciesServiceClassName. That change removed the hard coupling between topic policies and system topics, but the backend choice is still broker-wide. During upgrade from the default system-topic backend, brokers still need a way to recognize namespaces that already have topic-policies state in __change_events, so those namespaces do not silently move to another backend.

Motivation

The system-topic-based topic policies implementation works by appending topic policy changes to a __change_events topic in each namespace. This works well once a broker has loaded the topic, because all topic-policies operations are then served from the in-memory cache. In cold-start scenarios, however, for example when the owner broker goes down during a restart, the new owner broker has to create a reader on the __change_events topic and wait for it to catch up before it can read any topic policies. Because reading topic policies is on the path of loading any topic in the same namespace, this adds significant latency to topic loading, especially before the topic is compacted.

Things become worse when many __change_events topics move to a restarting broker. The new owner broker has to create many readers and replay all messages on these topics. This puts high pressure on BookKeeper and can cause "Too many requests on the same bookie" errors in GetLastMessageId RPCs.

A metadata-store-backed topic policies backend is attractive because it removes the extra lifecycle and operational dependency of a dedicated __change_events topic. A metadata-cache-based implementation can still provide caching and change notifications, while avoiding the cold-start latency of waiting for a system-topic reader to initialize and catch up.

There is a second operational requirement: operators need a safe gradual rollout path. Existing namespaces that already have topic-policies state in __change_events must stay on the system-topic backend, while newly created namespaces should be able to use the broker-configured backend. This does not require a new namespace policy. For the upgrade case from the default configuration, the existence of __change_events is already a conservative legacy marker.

Goals

In Scope

  • Add a metadata-store-backed TopicPoliciesService implementation that does not depend on system topics.
  • Add routing logic that forces the system-topic backend for namespaces that already have __change_events.
  • Keep using the broker-level topicPoliciesServiceClassName for namespaces that do not have __change_events, including newly created namespaces.

Out of Scope

  • Adding a migration framework that moves topic policies data between backends automatically.

High Level Design

When topic-level policies are enabled, the broker instantiates a LegacyAwareTopicPoliciesService instead of using the configured implementation directly.

The wrapper always has access to two backends:

  • SystemTopicBasedTopicPoliciesService
  • The broker-configured topicPoliciesServiceClassName

For each namespace, the wrapper checks whether the topic-policies system topic persistent://{tenant}/{namespace}/__change_events already exists:

  • If it exists, the namespace is treated as a legacy system-topic namespace and all topic-policies operations are routed to SystemTopicBasedTopicPoliciesService.
  • If it does not exist, the namespace uses the broker-configured topicPoliciesServiceClassName.

This rule is intentionally conservative. If __change_events exists, the broker assumes that namespace may already contain topic-policies state in the system-topic backend and therefore must not be moved implicitly.
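
The routing rule above can be sketched in a few lines. This is only an illustration under stated assumptions: `PolicyBackend`, `LegacyAwareRouter`, and the `changeEventsExists` probe are hypothetical stand-ins, not the real `TopicPoliciesService` types or the proposal's implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical stand-in for a topic-policies backend; not the real TopicPoliciesService. */
interface PolicyBackend {
    String name();
}

/** Sketch of the per-namespace rule: an existing __change_events topic pins the namespace. */
final class LegacyAwareRouter {
    private final PolicyBackend systemTopicBackend;
    private final PolicyBackend configuredBackend;
    // Assumed async probe that answers "does {tenant}/{namespace}/__change_events exist?".
    private final Function<String, CompletableFuture<Boolean>> changeEventsExists;

    LegacyAwareRouter(PolicyBackend systemTopicBackend,
                      PolicyBackend configuredBackend,
                      Function<String, CompletableFuture<Boolean>> changeEventsExists) {
        this.systemTopicBackend = systemTopicBackend;
        this.configuredBackend = configuredBackend;
        this.changeEventsExists = changeEventsExists;
    }

    /** Resolves the backend for a namespace; there is no fallback between backends. */
    CompletableFuture<PolicyBackend> resolve(String namespace) {
        return changeEventsExists.apply(namespace)
                .thenApply(exists -> exists ? systemTopicBackend : configuredBackend);
    }
}
```

Every topic-policies operation for a namespace would go through `resolve` first, so both reads and writes land on the same backend.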

This proposal also introduces MetadataStoreTopicPoliciesService, a concrete TopicPoliciesService implementation that stores topic policies in dedicated metadata-store paths:

  • Global topic policies are stored in the configuration metadata store.
  • Local topic policies are stored in the local metadata store.

This keeps the storage scope aligned with the semantics introduced by PIP-92 and avoids writing topic policies through managed-ledger metadata side effects.

Detailed Design

Design & Implementation Details

Startup and validation

PulsarService#initTopicPoliciesService() continues to respect topicLevelPoliciesEnabled. When topic-level policies are disabled, behavior is unchanged and TopicPoliciesService.DISABLED is used.

When topic-level policies are enabled, the broker constructs:

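```java
// Wrap the legacy system-topic backend and the broker-configured backend.
new LegacyAwareTopicPoliciesService(
        this,                                           // the PulsarService instance
        new SystemTopicBasedTopicPoliciesService(this), // used when __change_events exists
        configuredTopicPoliciesService);                // used for all other namespaces
```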

Broker startup validates both backends:

  • SystemTopicBasedTopicPoliciesService must be instantiable.
  • The configured topicPoliciesServiceClassName must be instantiable.

LegacyAwareTopicPoliciesService#start starts only the configured backend. It intentionally does not call SystemTopicBasedTopicPoliciesService#start, because that start path registers a namespace-bundle ownership listener whose only purpose is to eagerly create a reader on <namespace>/__change_events when a namespace bundle is loaded. Under legacy-aware routing, that eager optimization would be counterproductive because it can create readers for namespaces that do not have topic policies in __change_events. For legacy namespaces, the system-topic reader and policy cache are initialized lazily by the routed system-topic backend operations.

If either backend cannot be instantiated, or if the configured backend cannot be started, broker startup fails. There is no per-request fallback from one backend to another.
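
A minimal sketch of this lifecycle, assuming a simplified backend interface: both backends are instantiated eagerly so that a failure aborts startup, but only the configured backend is started. `StartableBackend` and `LegacyAwareLifecycle` are hypothetical names used for illustration only.

```java
import java.util.function.Supplier;

/** Hypothetical minimal backend lifecycle; the real TopicPoliciesService has a richer API. */
interface StartableBackend {
    void start();
}

/** Sketch of the wrapper lifecycle: instantiate both backends, start only the configured one. */
final class LegacyAwareLifecycle {
    private final StartableBackend systemTopicBackend;
    private final StartableBackend configuredBackend;

    LegacyAwareLifecycle(Supplier<StartableBackend> systemTopicFactory,
                         Supplier<StartableBackend> configuredFactory) {
        // Both factories run here, so an exception from either one propagates to the caller
        // and startup fails before any request can be served.
        this.systemTopicBackend = systemTopicFactory.get();
        this.configuredBackend = configuredFactory.get();
    }

    void start() {
        // The system-topic backend is deliberately not started; in the proposal its readers
        // and caches are initialized lazily by routed operations on legacy namespaces.
        configuredBackend.start();
    }

    StartableBackend systemTopicBackend() {
        return systemTopicBackend;
    }
}
```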

Namespace-scoped service routing

LegacyAwareTopicPoliciesService is responsible for:

  • Checking whether __change_events exists for the namespace by using NamespaceEventsSystemTopicFactory.checkSystemTopicExists(namespace, EventType.TOPIC_POLICY, pulsarService).
  • Routing getTopicPoliciesAsync, updateTopicPoliciesAsync, deleteTopicPoliciesAsync, and listener operations to the system-topic backend when the system topic exists.
  • Routing the same operations to the configured backend when the system topic does not exist.

Listener registration is routed through TopicPoliciesService#registerListenerAsync. This lets the wrapper resolve the namespace backend before registering the listener, and the listener is registered only on the selected backend instead of being registered on both backends.

The system-topic existence check can be cached per namespace in memory, but the routing rule is defined by actual topic existence rather than by new namespace metadata.

This means:

  • Existing namespaces that already materialized __change_events continue to use the system-topic backend.
  • Namespaces that never created __change_events use the broker-configured backend.
  • Newly created namespaces use the broker-configured backend because __change_events does not exist yet.

If __change_events is later deleted, the namespace falls back to the broker-configured backend on subsequent resolution. This matches current system-topic behavior, which already treats a missing __change_events topic as meaning the system-topic-backed topic-policies state is gone.
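
One way to cache the existence check per namespace is to memoize the probe result and drop the entry when the topic is known to have been deleted. The sketch below assumes a hypothetical `probe` function and a hypothetical `invalidate` hook; the proposal does not specify how or when the cache is refreshed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Sketch of an in-memory, per-namespace cache for the __change_events existence check. */
final class ChangeEventsExistenceCache {
    private final ConcurrentHashMap<String, CompletableFuture<Boolean>> cache =
            new ConcurrentHashMap<>();
    // Assumed async existence probe; in the broker this role is played by the system-topic check.
    private final Function<String, CompletableFuture<Boolean>> probe;

    ChangeEventsExistenceCache(Function<String, CompletableFuture<Boolean>> probe) {
        this.probe = probe;
    }

    CompletableFuture<Boolean> exists(String namespace) {
        CompletableFuture<Boolean> result = cache.computeIfAbsent(namespace, probe);
        // Failed probes are evicted so that a transient error is retried on the next resolution.
        result.whenComplete((exists, error) -> {
            if (error != null) {
                cache.remove(namespace, result);
            }
        });
        return result;
    }

    /** Hypothetical hook: forget the cached answer, for example after the topic is deleted. */
    void invalidate(String namespace) {
        cache.remove(namespace);
    }
}
```

Because the cached value only mirrors topic existence, evicting it never changes the routing rule itself; the next resolution simply re-derives the answer.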

Metadata-backed topic policies service

MetadataStoreTopicPoliciesService implements TopicPoliciesService with the following storage model:

  • Topic names are normalized to the partitioned topic name, so all partitions share the same topic-policies record.
  • Global policies are stored in the configuration metadata store path: /admin/topic-policies/global/{tenant}/{namespace}/{domain}/{encodedTopic}.
  • Local policies are stored in the local metadata store path: /admin/topic-policies/local/{tenant}/{namespace}/{domain}/{encodedTopic}.

These two paths share the root path /admin/topic-policies, which is not used by any other component. This avoids conflicts with existing metadata listeners, such as the one that BrokerService#handleMetadataChanges registers on the /admin/local-policies path.
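
The path layout can be illustrated with a small helper. This is a sketch under assumptions: `TopicPolicyPaths` is a hypothetical class, the `-partition-N` suffix handling is a simplified form of partition normalization, and URL-encoding with UTF-8 is assumed for `{encodedTopic}`; the actual encoding is whatever the implementation chooses.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/** Sketch of how the metadata-store paths for topic policies could be derived. */
final class TopicPolicyPaths {
    private static final String ROOT = "/admin/topic-policies";
    private static final String PARTITION_SUFFIX = "-partition-";

    /** Strips a trailing "-partition-N" so that all partitions share one record. */
    static String normalize(String localName) {
        int idx = localName.lastIndexOf(PARTITION_SUFFIX);
        if (idx < 0) {
            return localName;
        }
        String tail = localName.substring(idx + PARTITION_SUFFIX.length());
        boolean numeric = !tail.isEmpty() && tail.chars().allMatch(Character::isDigit);
        return numeric ? localName.substring(0, idx) : localName;
    }

    /** Builds {root}/{global|local}/{tenant}/{namespace}/{domain}/{encodedTopic}. */
    static String path(boolean global, String tenant, String namespace,
                       String domain, String localName) {
        // Assumption: the topic's local name is URL-encoded so it is safe as one path segment.
        String encoded = URLEncoder.encode(normalize(localName), StandardCharsets.UTF_8);
        return String.join("/", ROOT, global ? "global" : "local",
                tenant, namespace, domain, encoded);
    }
}
```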

Each node stores a serialized TopicPolicies document. The backend writes and reads the two scopes independently:

  • Reads with GetType.GLOBAL_ONLY only touch the global path and return a TopicPolicies object whose isGlobal flag is true.
  • Reads with GetType.LOCAL_ONLY only touch the local path and return a TopicPolicies object whose isGlobal flag is false.
  • Updates with isGlobalPolicy=true only modify the global path.
  • Updates with isGlobalPolicy=false only modify the local path.

Deletes remove the local record and, unless keepGlobalPoliciesAfterDeleting is set, also remove the global record. This matches the existing TopicPoliciesService deletion contract.
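
The scope rules above can be sketched with two in-memory maps standing in for the configuration and local metadata stores. `TwoScopePolicyStore` is a hypothetical illustration that stores opaque strings instead of serialized `TopicPolicies` documents.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch of independent global and local records for the same topic. */
final class TwoScopePolicyStore {
    private final Map<String, String> global = new ConcurrentHashMap<>(); // configuration store stand-in
    private final Map<String, String> local = new ConcurrentHashMap<>();  // local store stand-in

    /** An update touches exactly one scope, selected by isGlobalPolicy. */
    void update(String topic, boolean isGlobalPolicy, String serializedPolicies) {
        (isGlobalPolicy ? global : local).put(topic, serializedPolicies);
    }

    /** A read touches exactly one scope: globalOnly=true mirrors GLOBAL_ONLY, false mirrors LOCAL_ONLY. */
    Optional<String> get(String topic, boolean globalOnly) {
        return Optional.ofNullable((globalOnly ? global : local).get(topic));
    }

    /** A delete always removes the local record and removes the global one unless asked to keep it. */
    void delete(String topic, boolean keepGlobalPoliciesAfterDeleting) {
        local.remove(topic);
        if (!keepGlobalPoliciesAfterDeleting) {
            global.remove(topic);
        }
    }
}
```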

This design intentionally uses dedicated metadata nodes instead of piggybacking on PartitionedTopicMetadata or ManagedLedgerInfo. That keeps local/global visibility correct and avoids losing topic policies during normal managed-ledger metadata updates.

Listener behavior

TopicPoliciesService adds registerListenerAsync(TopicName, TopicPolicyListener) for listener registration. The existing synchronous registerListener(TopicName, TopicPolicyListener) method is retained as a deprecated compatibility hook for existing custom implementations, and the default async method delegates to it. Implementations that need async routing or initialization, such as LegacyAwareTopicPoliciesService, override registerListenerAsync directly.
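
The compatibility bridge can be pictured as a default interface method. The sketch below uses hypothetical simplified types (`PolicyListener`, `ListenerRegistry`, and plain strings for topic names) and assumes the synchronous method returns a boolean; it is not the actual `TopicPoliciesService` interface.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical simplified listener; stands in for TopicPolicyListener. */
interface PolicyListener {
    void onUpdate(String policies);
}

/** Sketch of the sync-to-async bridge for listener registration. */
interface ListenerRegistry {
    /** Legacy synchronous hook, retained so existing implementations keep compiling. */
    @Deprecated
    boolean registerListener(String topic, PolicyListener listener);

    /** Default async method: delegates to the synchronous hook and wraps its outcome. */
    default CompletableFuture<Boolean> registerListenerAsync(String topic, PolicyListener listener) {
        try {
            return CompletableFuture.completedFuture(registerListener(topic, listener));
        } catch (RuntimeException e) {
            // Surface synchronous failures as a failed future instead of throwing to the caller.
            return CompletableFuture.failedFuture(e);
        }
    }
}
```

An implementation that must resolve the namespace backend first would override `registerListenerAsync` and chain the registration onto that resolution.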

The backend registers watchers on both metadata stores:

  • A change on the local path re-reads the local node and notifies listeners with the latest local TopicPolicies or null if the local node was removed.
  • A change on the global path re-reads the global node and notifies listeners with the latest global TopicPolicies or null if the global node was removed.

This preserves runtime updates for already loaded topics, including global topic policies. The backend does not add an append-only replay log; it relies on metadata-store notifications and read-after-notify refresh.
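
The read-after-notify pattern can be sketched as follows, with a plain map standing in for one metadata store. `ReadAfterNotifyWatcher` and `onPathChanged` are hypothetical names; how the real backend subscribes to metadata-store notifications is not shown here.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Sketch of read-after-notify: a change event triggers a fresh read, then listener callbacks. */
final class ReadAfterNotifyWatcher {
    private final Map<String, String> store; // stand-in for one metadata store (local or global)
    private final Map<String, List<Consumer<String>>> listeners = new ConcurrentHashMap<>();

    ReadAfterNotifyWatcher(Map<String, String> store) {
        this.store = store;
    }

    void register(String path, Consumer<String> listener) {
        listeners.computeIfAbsent(path, p -> new CopyOnWriteArrayList<>()).add(listener);
    }

    /** Invoked when the store reports a change on the path; the event carries no payload. */
    void onPathChanged(String path) {
        // Re-read the node so listeners see the latest value; null means the node was removed.
        String latest = store.get(path);
        for (Consumer<String> listener : listeners.getOrDefault(path, List.of())) {
            listener.accept(latest);
        }
    }
}
```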

Public-facing Changes

Public API

The TopicPoliciesService extension point gains a default CompletableFuture<Boolean> registerListenerAsync(TopicName, TopicPolicyListener) method. Existing implementations remain compatible because registerListener(TopicName, TopicPolicyListener) is retained and used by the default async implementation.

No new namespace policy field is introduced.

No new namespace admin REST endpoint or Java admin client method is introduced.

Changing the topic-policies backend for a namespace is not a public operation in this proposal. The routing rule is derived from __change_events existence plus the broker-level configuration.

Binary protocol

No binary protocol changes.

Configuration

  • topicPoliciesServiceClassName
    • Continues to define the broker-configured TopicPoliciesService implementation.
    • Namespaces that do not have __change_events use this backend.
    • Namespaces that already have __change_events keep using SystemTopicBasedTopicPoliciesService regardless of this value.

CLI

No CLI change in this proposal.

Metrics

No new metric is required.

Backward & Forward Compatibility

Upgrade

The intended upgrade flow is:

  1. Upgrade brokers to a version that understands legacy-aware backend routing.
  2. Change topicPoliciesServiceClassName to the alternate backend if newly created namespaces should use it.
  3. Existing namespaces that already have __change_events continue to use SystemTopicBasedTopicPoliciesService.
  4. Namespaces that do not have __change_events, including newly created namespaces, use the configured backend.

No namespace metadata backfill is required.

This upgrade rule is intentionally conservative:

  • If __change_events exists, the namespace stays on the system-topic backend.
  • If __change_events does not exist, the namespace uses the configured backend.

This means some namespaces with an empty but already-created __change_events topic may continue using the system-topic backend. That is acceptable because it avoids missing legacy state.

Existing custom TopicPoliciesService implementations that only implement the synchronous registerListener method continue to work through the default registerListenerAsync bridge. Implementations can override registerListenerAsync when registration itself needs asynchronous backend resolution or initialization.

Downgrade / Rollback

Rolling back to a broker version that does not understand legacy-aware routing returns topic-policies backend selection to pure broker-wide behavior.

  • The older broker will no longer special-case namespaces that have __change_events.
  • Operators will need to choose one broker-wide backend for the rollback cluster, or migrate data before rollback if both legacy system-topic namespaces and metadata-store namespaces must coexist.

Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations

This proposal does not introduce a new geo-replication protocol for topic policies.

  • Global topic policies stay in the configuration metadata store and therefore keep global visibility semantics.
  • Local topic policies stay in the local metadata store and therefore keep cluster-local visibility semantics.
  • Legacy namespaces are recognized by the existence of __change_events, which is already shared broker-visible topic metadata.

Alternatives

Keep a single broker-wide topic policies backend

This keeps the implementation simpler, but it does not solve the operational requirement to keep existing namespaces on their current backend while directing newly created namespaces to a different one.

Persist an explicit namespace backend marker

This would also solve the upgrade problem, but it introduces new namespace-scoped metadata changes that are not necessary for the default-system-topic upgrade path. The proposal prefers to reuse the already existing __change_events artifact as the legacy marker.

Add a user-managed namespace override API

This provides more flexibility than needed. It also reintroduces runtime switching, rollback ambiguity, and the risk of one namespace being served by different backends if brokers do not resolve the override identically. The proposal intentionally avoids this surface.

General Notes

This proposal is a follow-up to PIP-376. It keeps backend selection pluggable, but handles upgrade from the legacy system-topic backend by reusing __change_events as the compatibility marker instead of introducing a new namespace-level policy or namespace-level metadata field.
