pip/pip-469.md
Apache Pulsar introduced topic-level policies in PIP-39. A broker reads and writes these policies through
TopicPoliciesService. The default implementation,
SystemTopicBasedTopicPoliciesService, persists topic policy changes in each namespace's __change_events system topic
and keeps an in-memory cache on the brokers that own bundles for that namespace.
PIP-92 extended topic policies with the distinction between local and global policies. Any
TopicPoliciesService implementation therefore needs to handle two independent values for the same topic: the
cluster-local policy state and the globally visible policy state.
PIP-376 made TopicPoliciesService pluggable through the broker configuration
topicPoliciesServiceClassName. That change removed the hard coupling between topic policies and system topics, but the
backend choice is still broker-wide. During upgrade from the default system-topic backend, brokers still need a way to
recognize namespaces that already have topic-policies state in __change_events, so those namespaces do not silently
move to another backend.
The system-topic-based topic policies implementation works by appending topic policy changes to a __change_events
topic in each namespace. This works well once a broker has loaded the topic, because all topic-policies operations
then only access the in-memory cache. However, in cold-start scenarios, for example when the owner broker goes down
for a restart, the new owner broker has to create a reader on the __change_events topic and wait for it to catch
up before it can read any topic policies, and that read is required when loading any topic in the same namespace. This
adds significant latency to the topic load path, especially before the topic is compacted.
The problem gets worse when many __change_events topics move to a restarting broker. The new owner broker has to create
many readers and replay all messages on these topics. This puts heavy pressure on BookKeeper and can cause
"Too many requests on the same bookie" errors in GetLastMessageId RPCs.
A metadata-store-backed topic policies backend is attractive because it removes the extra lifecycle and operational
dependency of a dedicated __change_events topic. A metadata-cache-based implementation can still provide caching and
change notifications, while avoiding the cold-start latency of waiting for a system-topic reader to initialize and
catch up.
There is a second operational requirement: operators need a safe gradual rollout path. Existing namespaces that already
have topic-policies state in __change_events must stay on the system-topic backend, while newly created namespaces
should be able to use the broker-configured backend. This does not require a new namespace policy. For the upgrade case
from the default configuration, the existence of __change_events is already a conservative legacy marker.

- Provide a TopicPoliciesService implementation that does not depend on system topics.
- Keep using the system-topic backend for namespaces that already have __change_events.
- Use the backend configured by topicPoliciesServiceClassName for namespaces that do not have __change_events,
  including newly created namespaces.

When topic-level policies are enabled, the broker instantiates a LegacyAwareTopicPoliciesService instead of using the
configured implementation directly.
The wrapper always has access to two backends:

- SystemTopicBasedTopicPoliciesService
- the implementation configured by topicPoliciesServiceClassName

For each namespace, the wrapper checks whether the topic-policies system topic persistent://{tenant}/{namespace}/__change_events
already exists:

- If it exists, the wrapper routes to SystemTopicBasedTopicPoliciesService.
- If it does not exist, the wrapper routes to the implementation configured by topicPoliciesServiceClassName.

This rule is intentionally conservative. If __change_events exists, the broker assumes that namespace may already
contain topic-policies state in the system-topic backend and therefore must not be moved implicitly.
This proposal also introduces MetadataStoreTopicPoliciesService, a concrete TopicPoliciesService implementation
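that stores topic policies in dedicated metadata-store paths:

- /admin/topic-policies/global/{tenant}/{namespace}/{domain}/{encodedTopic} for global policies
- /admin/topic-policies/local/{tenant}/{namespace}/{domain}/{encodedTopic} for local policies

This keeps the storage scope aligned with the semantics introduced by PIP-92 and avoids writing topic policies through managed-ledger metadata side effects.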
PulsarService#initTopicPoliciesService() continues to respect topicLevelPoliciesEnabled. When topic-level policies
are disabled, behavior is unchanged and TopicPoliciesService.DISABLED is used.
When topic-level policies are enabled, the broker constructs:

    new LegacyAwareTopicPoliciesService(
            this,
            new SystemTopicBasedTopicPoliciesService(this),
            configuredTopicPoliciesService)

Broker startup validates both backends:

- SystemTopicBasedTopicPoliciesService must be instantiable.
- The implementation configured by topicPoliciesServiceClassName must be instantiable.

LegacyAwareTopicPoliciesService#start starts only the configured backend. It intentionally does not call
SystemTopicBasedTopicPoliciesService#start, because that start path registers a namespace-bundle ownership listener
whose only purpose is to eagerly create a reader on <namespace>/__change_events when a namespace bundle is loaded.
Under legacy-aware routing, that eager optimization would be counterproductive because it can create readers for
namespaces that do not have topic policies in __change_events. For legacy namespaces, the system-topic reader and
policy cache are initialized lazily by the routed system-topic backend operations.
If either backend cannot be instantiated, or if the configured backend cannot be started, broker startup fails. There is no per-request fallback from one backend to another.
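
The start-up behavior described above can be sketched as follows. This is a minimal illustration, not the actual broker code: PolicyBackend and LegacyAwareLifecycleSketch are hypothetical names, and a null argument stands in for a backend that could not be instantiated.

```java
import java.util.Objects;

// Hypothetical, heavily reduced stand-in for TopicPoliciesService.
interface PolicyBackend {
    void start();
}

// Sketch of the wrapper lifecycle: both backends must exist up front,
// but only the configured backend is started eagerly.
class LegacyAwareLifecycleSketch {
    private final PolicyBackend systemTopicBackend;
    private final PolicyBackend configuredBackend;

    LegacyAwareLifecycleSketch(PolicyBackend systemTopicBackend, PolicyBackend configuredBackend) {
        // Assumption: a missing backend fails construction, mirroring "broker startup fails".
        this.systemTopicBackend = Objects.requireNonNull(systemTopicBackend, "system-topic backend");
        this.configuredBackend = Objects.requireNonNull(configuredBackend, "configured backend");
    }

    void start() {
        // The system-topic backend is deliberately not started here, so no
        // __change_events reader is created eagerly when a bundle is loaded.
        configuredBackend.start();
    }

    PolicyBackend systemTopicBackend() {
        // Handed out only when a namespace is routed to the legacy backend.
        return systemTopicBackend;
    }
}
```

Any exception thrown by the configured backend's start method propagates to the caller, which matches the rule that there is no fallback from one backend to the other.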
LegacyAwareTopicPoliciesService is responsible for:

- Checking whether __change_events exists for the namespace by using
  NamespaceEventsSystemTopicFactory.checkSystemTopicExists(namespace, EventType.TOPIC_POLICY, pulsarService).
- Routing getTopicPoliciesAsync, updateTopicPoliciesAsync, deleteTopicPoliciesAsync, and listener operations to
  the system-topic backend when the system topic exists.

Listener registration is routed through TopicPoliciesService#registerListenerAsync. This lets the wrapper resolve the
namespace backend before registering the listener, and the listener is registered only on the selected backend instead
of being registered on both backends.
The system-topic existence check can be cached per namespace in memory, but the routing rule is defined by actual topic existence rather than by new namespace metadata.
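
A minimal sketch of per-namespace routing with an in-memory cache of the existence check is shown below. The class NamespaceBackendRouter and its existence-check function are assumptions made for illustration; the function stands in for NamespaceEventsSystemTopicFactory.checkSystemTopicExists, and the backend type is left generic.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical router: picks a backend per namespace from the existence of __change_events.
class NamespaceBackendRouter<B> {
    private final B systemTopicBackend;
    private final B configuredBackend;
    // Stand-in for the real asynchronous system-topic existence check.
    private final Function<String, CompletableFuture<Boolean>> changeEventsExists;
    private final ConcurrentHashMap<String, CompletableFuture<Boolean>> existenceCache =
            new ConcurrentHashMap<>();

    NamespaceBackendRouter(B systemTopicBackend, B configuredBackend,
                           Function<String, CompletableFuture<Boolean>> changeEventsExists) {
        this.systemTopicBackend = systemTopicBackend;
        this.configuredBackend = configuredBackend;
        this.changeEventsExists = changeEventsExists;
    }

    CompletableFuture<B> resolve(String namespace) {
        CompletableFuture<Boolean> exists = existenceCache.computeIfAbsent(namespace, changeEventsExists);
        // Do not keep a failed check in the cache; the next call retries it.
        exists.whenComplete((result, error) -> {
            if (error != null) {
                existenceCache.remove(namespace, exists);
            }
        });
        return exists.thenApply(found -> found ? systemTopicBackend : configuredBackend);
    }

    // Forget the cached answer, for example after __change_events was deleted.
    void invalidate(String namespace) {
        existenceCache.remove(namespace);
    }
}
```

Because the cache only memoizes the existence check, the routing rule itself stays tied to actual topic existence; when and how a real implementation invalidates entries is not specified here.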
This means:
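
- Namespaces that already have __change_events continue to use the system-topic backend.
- Namespaces without __change_events use the broker-configured backend.
- Newly created namespaces use the broker-configured backend, because __change_events does not exist yet.

If __change_events is later deleted, the namespace falls back to the broker-configured backend on subsequent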
resolution. This matches current system-topic behavior, which already treats a missing __change_events topic as
meaning the system-topic-backed topic-policies state is gone.
MetadataStoreTopicPoliciesService implements TopicPoliciesService with the following storage model:

- Global policies: /admin/topic-policies/global/{tenant}/{namespace}/{domain}/{encodedTopic}
- Local policies: /admin/topic-policies/local/{tenant}/{namespace}/{domain}/{encodedTopic}

To avoid conflicts with existing watchers, such as the listener that BrokerService#handleMetadataChanges registers on
the /admin/local-policies path, both paths share the root path /admin/topic-policies, which is not
used by any other component.
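
The path layout above can be illustrated with a small helper. TopicPolicyPaths is a hypothetical name, and the use of URL encoding for {encodedTopic} is an assumption; the proposal text does not state which encoding is applied.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that builds the dedicated metadata-store path for one topic and scope.
final class TopicPolicyPaths {
    static final String ROOT = "/admin/topic-policies";

    private TopicPolicyPaths() {
    }

    static String path(boolean global, String tenant, String namespace,
                       String domain, String localTopicName) {
        // Assumption: URL-encode the topic name so that a '/' inside it
        // cannot introduce additional path segments.
        String encodedTopic = URLEncoder.encode(localTopicName, StandardCharsets.UTF_8);
        return String.join("/", ROOT, global ? "global" : "local",
                tenant, namespace, domain, encodedTopic);
    }
}
```

For example, TopicPolicyPaths.path(true, "public", "default", "persistent", "my-topic") yields /admin/topic-policies/global/public/default/persistent/my-topic.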
Each node stores a serialized TopicPolicies document. The backend writes and reads the two scopes independently:

- Reads with GetType.GLOBAL_ONLY only touch the global path and return a TopicPolicies object whose isGlobal
  flag is true.
- Reads with GetType.LOCAL_ONLY only touch the local path and return a TopicPolicies object whose isGlobal flag
  is false.
- Updates with isGlobalPolicy=true only modify the global path.
- Updates with isGlobalPolicy=false only modify the local path.

Deletes remove the local record and, unless keepGlobalPoliciesAfterDeleting is set, also remove the global record.
This matches the existing TopicPoliciesService deletion contract.
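
The scope rules above can be sketched with two in-memory maps standing in for the global and local metadata nodes. ScopedPolicyStoreSketch and its nested Policies class are hypothetical simplifications; the real backend stores serialized TopicPolicies documents and works asynchronously.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory model of the two independent policy scopes.
class ScopedPolicyStoreSketch {
    // Simplified stand-in for a TopicPolicies document.
    static final class Policies {
        final boolean isGlobal;
        final String payload;

        Policies(boolean isGlobal, String payload) {
            this.isGlobal = isGlobal;
            this.payload = payload;
        }
    }

    private final Map<String, String> globalNodes = new ConcurrentHashMap<>();
    private final Map<String, String> localNodes = new ConcurrentHashMap<>();

    // A read touches exactly one scope and tags the result with that scope.
    Optional<Policies> get(String topic, boolean globalOnly) {
        String value = (globalOnly ? globalNodes : localNodes).get(topic);
        return Optional.ofNullable(value).map(v -> new Policies(globalOnly, v));
    }

    // An update modifies only the scope selected by isGlobalPolicy.
    void update(String topic, boolean isGlobalPolicy, String payload) {
        (isGlobalPolicy ? globalNodes : localNodes).put(topic, payload);
    }

    // A delete always removes the local record; the global record is kept on request.
    void delete(String topic, boolean keepGlobalPoliciesAfterDeleting) {
        localNodes.remove(topic);
        if (!keepGlobalPoliciesAfterDeleting) {
            globalNodes.remove(topic);
        }
    }
}
```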
This design intentionally uses dedicated metadata nodes instead of piggybacking on PartitionedTopicMetadata or
ManagedLedgerInfo. That keeps local/global visibility correct and avoids losing topic policies during normal
managed-ledger metadata updates.
TopicPoliciesService adds registerListenerAsync(TopicName, TopicPolicyListener) for listener registration. The
existing synchronous registerListener(TopicName, TopicPolicyListener) method is retained as a deprecated compatibility
hook for existing custom implementations, and the default async method delegates to it. Implementations that need async
routing or initialization, such as LegacyAwareTopicPoliciesService, override registerListenerAsync directly.
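
The compatibility bridge between the two registration methods might look like the sketch below. The interfaces are reduced stand-ins with String topic names instead of TopicName, and the choice to turn a thrown exception into a failed future is an assumption rather than something the proposal specifies.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified listener type.
interface PolicyListenerSketch {
    void onUpdate(String policies);
}

// Reduced stand-in for the listener-related part of TopicPoliciesService.
interface ListenerRegistrationSketch {
    // Retained synchronous hook that existing custom implementations already provide.
    @Deprecated
    boolean registerListener(String topicName, PolicyListenerSketch listener);

    // Default async method: delegates to the synchronous hook so that
    // implementations that only know registerListener keep working.
    default CompletableFuture<Boolean> registerListenerAsync(String topicName,
                                                             PolicyListenerSketch listener) {
        try {
            return CompletableFuture.completedFuture(registerListener(topicName, listener));
        } catch (RuntimeException e) {
            // Assumption: surface synchronous failures through the future.
            return CompletableFuture.failedFuture(e);
        }
    }
}
```

An implementation that needs asynchronous backend resolution would override registerListenerAsync and complete the future only after the backend for the namespace has been selected.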
The backend registers watchers on both metadata stores:

- Changes to the local path notify listeners with the latest local TopicPolicies, or
  null if the local node was removed.
- Changes to the global path notify listeners with the latest global TopicPolicies,
  or null if the global node was removed.

This preserves runtime updates for already loaded topics, including global topic policies. The backend does not add an append-only replay log; it relies on metadata-store notifications and read-after-notify refresh.
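
Read-after-notify refresh can be illustrated with a small in-memory model. ReadAfterNotifySketch is hypothetical: a map stands in for the metadata store, and the notification is assumed to carry only the changed path, so the handler re-reads the current value before calling listeners.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical model of notification handling without a replay log.
class ReadAfterNotifySketch {
    private final Map<String, String> store = new ConcurrentHashMap<>();
    private final Map<String, List<Consumer<String>>> listeners = new ConcurrentHashMap<>();

    void registerListener(String path, Consumer<String> listener) {
        listeners.computeIfAbsent(path, p -> new CopyOnWriteArrayList<>()).add(listener);
    }

    void put(String path, String serializedPolicies) {
        store.put(path, serializedPolicies);
        onNotification(path);
    }

    void delete(String path) {
        store.remove(path);
        onNotification(path);
    }

    // The notification names only the path; the latest value is read again
    // and is null when the node was removed.
    private void onNotification(String path) {
        String latest = store.get(path);
        for (Consumer<String> listener : listeners.getOrDefault(path, List.of())) {
            listener.accept(latest);
        }
    }
}
```

Because listeners always receive the value read after the notification, intermediate versions may be skipped, which is acceptable when only the latest policy state matters.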
The TopicPoliciesService extension point gains a default
CompletableFuture<Boolean> registerListenerAsync(TopicName, TopicPolicyListener) method. Existing implementations
remain compatible because registerListener(TopicName, TopicPolicyListener) is retained and used by the default async
implementation.
No new namespace policy field is introduced.
No new namespace admin REST endpoint or Java admin client method is introduced.
Changing the topic-policies backend for a namespace is not a public operation in this proposal. The routing rule is
derived from __change_events existence plus the broker-level configuration.
No binary protocol changes.
topicPoliciesServiceClassName

- Selects the broker-configured TopicPoliciesService implementation.
- Namespaces without __change_events use this backend.
- Namespaces with __change_events keep using SystemTopicBasedTopicPoliciesService regardless of
  this value.

No CLI change in this proposal.
No new metric is required.
The intended upgrade flow is:

- Set topicPoliciesServiceClassName to the alternate backend if newly created namespaces should use it.
- Namespaces that already have __change_events continue to use SystemTopicBasedTopicPoliciesService.
- Namespaces without __change_events, including newly created namespaces, use the configured backend.

No namespace metadata backfill is required.
This upgrade rule is intentionally conservative:

- If __change_events exists, the namespace stays on the system-topic backend.
- If __change_events does not exist, the namespace uses the configured backend.

This means some namespaces with an empty but already-created __change_events topic may continue using the
system-topic backend. That is acceptable because it avoids missing legacy state.
Existing custom TopicPoliciesService implementations that only implement the synchronous registerListener method
continue to work through the default registerListenerAsync bridge. Implementations can override
registerListenerAsync when registration itself needs asynchronous backend resolution or initialization.
Rolling back to a broker version that does not understand legacy-aware routing returns topic-policies backend selection to pure broker-wide behavior.
[...] __change_events.
This proposal does not introduce a new geo-replication protocol for topic policies.
The routing decision depends only on the existence of __change_events, which is already shared broker-visible topic
metadata.
Keeping backend selection purely broker-wide keeps the implementation simpler, but it does not solve the operational requirement to keep existing namespaces on their current backend while directing newly created namespaces to a different one.
Recording the backend in a new namespace-level policy or metadata field would also solve the upgrade problem, but it
introduces new namespace-scoped metadata changes that are not necessary for the default-system-topic upgrade path. The
proposal prefers to reuse the already existing __change_events artifact as the legacy marker.
A per-namespace backend override would provide more flexibility than needed, and it would reintroduce runtime switching, rollback ambiguity, and the risk of one namespace being served by different backends if brokers do not resolve the override identically. The proposal intentionally avoids this surface.
This proposal is a follow-up to PIP-376. It keeps backend selection pluggable, but handles upgrade from
the legacy system-topic backend by reusing __change_events as the compatibility marker instead of introducing a new
namespace-level policy or namespace-level metadata field.