pip/pip-466.md
Apache Pulsar's Java client API (pulsar-client-api) has been the primary interface for Java
applications since Pulsar's inception. The API surface has grown organically over the years to
support partitioned topics, multiple subscription types, transactions, schema evolution, and more.
API versioning precedent. Pulsar already went through a breaking API redesign in the 2.0
release, where the old API was moved into a separate compatibility module
(pulsar-client-1x-base) and the main module was replaced with the new API. This PIP takes
a less disruptive approach: the existing pulsar-client-api and pulsar-client modules stay
completely unchanged, and the new API is introduced in an additional pulsar-client-v5 module.
Existing applications are unaffected; new applications can opt in to the V5 API.
Partitioned topics are Pulsar's current mechanism for topic-level parallelism. A partitioned
topic is a collection of N independent internal topics (partitions), each backed by a separate
managed ledger. The client is responsible for routing messages to partitions (via MessageRouter)
and is exposed to partition-level details through TopicMessageId, getPartitionsForTopic(),
and partition-specific consumers.
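The key-to-partition mapping described above can be sketched as follows. This is a minimal illustration, not Pulsar's MessageRouter. It assumes String.hashCode() as the hash; Pulsar's routers use a configurable hashing scheme. The class name PartitionRoutingSketch is hypothetical.

```java
/**
 * Hypothetical sketch of v4-style key routing: hash the key, then take the
 * result modulo the partition count. String.hashCode() is an assumed stand-in
 * for Pulsar's configurable hashing scheme.
 */
final class PartitionRoutingSketch {
    private PartitionRoutingSketch() {}

    /** Returns a partition index in [0, numPartitions) for the given key. */
    static int choosePartition(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Clear the sign bit so the remainder is never negative.
        int hash = key.hashCode() & Integer.MAX_VALUE;
        return hash % numPartitions;
    }
}
```

The result depends on numPartitions, so the partition count is part of the routing contract. Changing it generally moves keys to different partitions, which is one reason the client cannot treat the topic as opaque.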
Subscription types control how messages are distributed to consumers. Pulsar supports four
types — Exclusive, Failover, Shared, and Key_Shared — all accessed through a single Consumer
interface. The interface exposes all operations regardless of which subscription type is in use,
even though some operations (e.g., acknowledgeCumulative() on Shared) throw at runtime.
Scalable topics (PIP-460) are a new server-side mechanism where a topic is composed of a DAG of hash-range segments that can be dynamically split and merged by the broker. Unlike partitioned topics, the number of segments is invisible to the client and can change at runtime without application awareness. The client receives segment layout updates via a dedicated protocol command (DAG watch session) and routes messages based on hash-range matching. This PIP defines the client API designed to work with scalable topics; the broker-side design is covered by PIP-460.
The current API leaks partitioning everywhere: TopicMessageId carries partition indexes,
getPartitionsForTopic() exposes the count, MessageRouter forces the application to make
routing decisions, and consumers can be bound to specific partitions. This forces application
code to deal with what is fundamentally a server-side scalability concern.
With scalable topics, parallelism is achieved via hash-range segments managed entirely by the broker. The client should treat topics as opaque endpoints — per-key ordering is guaranteed when a key is specified, but the underlying parallelism (how many segments, which broker owns each) is invisible and dynamic.
The current API cannot cleanly support this model because partitioning is baked into the type
system (TopicMessageId), the builder API (MessageRouter, MessageRoutingMode), and the
consumer model (partition-specific consumers, getPartitionsForTopic()).
After years of organic growth, the API surface has accumulated significant baggage:

- Consumer has 60+ methods mixing unrelated concerns (ack, nack, seek, pause, unsubscribe, stats, get topic name, is connected, etc.)
- ConsumerBuilder has 40+ configuration methods with overlapping semantics
- Inconsistent time parameters: (long, TimeUnit) in some places and long millis in others
- loadConf(Map), clone(), and Serializable on builders: rarely used, and they clutter the API
- Reflection-based implementation loading (DefaultImplementation) instead of standard ServiceLoader

A new module can start with a clean, minimal surface using modern Java idioms.
The current Consumer mixes all four subscription types behind a single interface:

- acknowledgeCumulative() is available but throws at runtime for Shared subscriptions
- negativeAcknowledge() semantics differ between modes
- seek() behavior varies depending on subscription type

This design means the compiler cannot help you: misuse is discovered only at runtime. Splitting into purpose-built consumer types, each exposing only the operations that make sense for its model, improves both usability and correctness.
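The difference between runtime and compile-time enforcement can be sketched with two deliberately tiny, hypothetical type families (none of these names are Pulsar interfaces). The unified consumer accepts every call and fails only when the subscription type is wrong. The split interfaces simply do not declare the unsupported operation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified types for illustration; not the actual Pulsar interfaces.
enum SubscriptionKindSketch { EXCLUSIVE, SHARED }

/** v4 style: one type for every mode, checked at runtime. */
final class UnifiedConsumerSketch {
    private final SubscriptionKindSketch kind;
    final List<Long> acked = new ArrayList<>();

    UnifiedConsumerSketch(SubscriptionKindSketch kind) { this.kind = kind; }

    void acknowledgeCumulative(long position) {
        // Misuse is only detected when this line actually runs.
        if (kind == SubscriptionKindSketch.SHARED) {
            throw new UnsupportedOperationException("cumulative ack not supported on Shared");
        }
        acked.add(position);
    }
}

/** V5 style: an ordered consumer declares cumulative ack... */
interface OrderedConsumerSketch {
    void acknowledgeCumulative(long position);
}

/** ...and a parallel consumer does not, so calling it fails to compile. */
interface ParallelConsumerSketch {
    void acknowledge(long position);
}
```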
Connector frameworks like Apache Flink and Apache Spark need to manage their own offsets across
all segments of a topic, take atomic snapshots, and seek back to a checkpoint on recovery. The
current API has no first-class support for this — connectors resort to low-level Reader plus
manual partition tracking plus brittle offset management.
A dedicated CheckpointConsumer with opaque, serializable Checkpoint objects provides a clean
integration point.
This PIP is a companion to PIP-460: Scalable Topics, which defines the broker-side segment management, metadata storage, and admin APIs. The V5 client API is the primary interface for applications to use scalable topics — while the protocol commands and segment routing could theoretically be added to the v4 client, the V5 API was designed from the ground up to support the opaque, dynamically-segmented topic model that scalable topics provide.
The V5 API is designed to support all use cases currently supported by the existing API: producing messages, consuming with ordered/shared/key-shared semantics, transactions, schema evolution, and end-to-end encryption. It is not a subset — it is a full replacement API. It also works with existing partitioned and non-partitioned topics, so applications can adopt the new API without changing their topic infrastructure.
The long-term vision is for scalable topics and the V5 API to become the primary model, eventually deprecating partitioned/non-partitioned topics and the v4 API. However, this deprecation is explicitly not planned for the 5.0 release. The 5.0 release will ship both APIs side by side, with the V5 API recommended for new applications. A subsequent PIP will detail the migration path and deprecation timeline.
While this PIP covers the Java client, the same API model (purpose-built consumer types, opaque topics, checkpoint-based connector support) will also be introduced in non-Java client SDKs (Python, Go, C++, Node.js) with language-appropriate idioms. Each SDK will mirror the same concepts and follow the same approach of supporting both old and new topic types side by side. The non-Java SDKs will be covered by separate PIPs.
- pulsar-client-api-v5 module with new Java interfaces for Producer, StreamConsumer, QueueConsumer, CheckpointConsumer, and PulsarClient
- pulsar-client-v5 implementation module that wraps the existing v4 client transport and adds scalable topic routing
- Modern Java idioms: Duration, Instant, Optional, records, ServiceLoader
- No changes to pulsar-client-api: it remains fully supported and unchanged

The V5 client API is shipped as two new modules alongside the existing client:

- pulsar-client-api-v5 (interfaces and value types only, no implementation)
- pulsar-client-v5 (implementation, depends on pulsar-client for transport)

The existing pulsar-client-api and pulsar-client modules are unchanged. Applications
can use v4 and v5 in the same JVM.
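```java
// PulsarClient here is org.apache.pulsar.client.api.v5.PulsarClient, not the v4 interface
PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();
```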
The PulsarClient interface provides builder methods for all producer/consumer types:
```
PulsarClient
  .newProducer(schema)           -> ProducerBuilder           -> Producer<T>
  .newStreamConsumer(schema)     -> StreamConsumerBuilder     -> StreamConsumer<T>
  .newQueueConsumer(schema)      -> QueueConsumerBuilder      -> QueueConsumer<T>
  .newCheckpointConsumer(schema) -> CheckpointConsumerBuilder -> CheckpointConsumer<T>
  .newTransaction()              -> Transaction
```
Instead of a single Consumer with a SubscriptionType enum, the V5 API provides three
distinct consumer types:
```mermaid
graph TD
    A[PulsarClient] -->|newStreamConsumer| B[StreamConsumer]
    A -->|newQueueConsumer| C[QueueConsumer]
    A -->|newCheckpointConsumer| D[CheckpointConsumer]
    B -->|"Ordered, cumulative ack"| E[Event sourcing, CDC, ordered pipelines]
    C -->|"Parallel, individual ack"| F[Work queues, task processing]
    D -->|"Unmanaged, checkpoint/seek"| G[Flink, Spark connectors]
```
- StreamConsumer: ordered consumption with cumulative acknowledgment. Maps to Exclusive/Failover subscription semantics. Messages are delivered in order (per-key when keyed).
- QueueConsumer: unordered parallel consumption with individual acknowledgment. Maps to Shared/Key_Shared subscription semantics. Includes dead-letter policy, ack timeout, and redelivery backoff.
- CheckpointConsumer: unmanaged consumption for connector frameworks. No subscription and no ack; position tracking is entirely external. Provides checkpoint() for atomic position snapshots and seek(Checkpoint) for recovery.
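The two acknowledgment models can be sketched on a single segment with positions modeled as plain longs. The bookkeeping below is loosely modeled on how a subscription cursor keeps a mark-delete position plus a set of individually acknowledged positions; the class and its methods are hypothetical and show both ack styles side by side only for comparison.

```java
import java.util.TreeSet;

/** Hypothetical per-segment ack bookkeeping; positions are modeled as longs. */
final class AckStateSketch {
    // Every position <= markDelete is acknowledged (StreamConsumer-style cumulative ack).
    private long markDelete = -1;
    // Positions above markDelete acknowledged one by one (QueueConsumer-style individual ack).
    private final TreeSet<Long> individuallyAcked = new TreeSet<>();

    void acknowledgeCumulative(long position) {
        markDelete = Math.max(markDelete, position);
        // Individual acks at or below the new mark are now redundant.
        individuallyAcked.headSet(markDelete, true).clear();
    }

    void acknowledge(long position) {
        if (position > markDelete) {
            individuallyAcked.add(position);
        }
    }

    boolean isAcked(long position) {
        return position <= markDelete || individuallyAcked.contains(position);
    }
}
```

One cumulative ack covers an unbounded prefix of the segment with a single number, which is why it only makes sense when delivery is ordered.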
When a V5 client connects to a topic:// domain topic, it establishes a DAG watch session
with the broker. The broker sends the current segment layout (which segments exist, their
hash ranges, and which broker owns each) and pushes updates when the layout changes (splits,
merges).
```mermaid
sequenceDiagram
    participant Client as V5 Client
    participant Broker as Broker
    participant Meta as Metadata Store
    Client->>Broker: ScalableTopicLookup(topic)
    Broker->>Meta: Watch topic DAG
    Broker-->>Client: ScalableTopicUpdate(DAG)
    Note over Client: Create per-segment producers/consumers
    Meta-->>Broker: Segment split notification
    Broker-->>Client: ScalableTopicUpdate(new DAG)
    Note over Client: Add new segments, drain old
```
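On each ScalableTopicUpdate the client has to work out which internal producers/consumers to open and which to drain. A minimal sketch of that step, assuming the layout can be reduced to a set of active segment IDs (the record LayoutDiff is hypothetical, not part of the V5 API):

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical diff between two segment layouts, keyed by active segment ID. */
record LayoutDiff(Set<Long> added, Set<Long> removed) {

    static LayoutDiff between(Set<Long> oldActive, Set<Long> newActive) {
        Set<Long> added = new HashSet<>(newActive);
        added.removeAll(oldActive);      // new segments: open internal producers/consumers
        Set<Long> removed = new HashSet<>(oldActive);
        removed.removeAll(newActive);    // sealed by a split or merge: drain, then close
        return new LayoutDiff(Set.copyOf(added), Set.copyOf(removed));
    }
}
```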
The Producer hashes message keys to determine which segment to send to, maintaining one
internal producer per active segment. When segments split or merge, the client transparently
creates new internal producers and drains old ones.
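Hash-range matching can be sketched with a sorted map from each range's start to its segment ID: the owning segment is the entry with the greatest start not exceeding the key's hash. The PIP does not specify the hash function here, so String.hashCode() read as an unsigned 32-bit value is an assumption, and HashRangeRouterSketch is a hypothetical name.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical hash-range router. Ranges are assumed to be contiguous,
 * non-overlapping, and to cover [0, 2^32), so each is identified by its start.
 */
final class HashRangeRouterSketch {
    // Range start (inclusive) -> segment ID; a range ends where the next one starts.
    private final TreeMap<Long, Long> segmentByRangeStart = new TreeMap<>();

    /** Assumed hash: String.hashCode() reinterpreted as unsigned, in [0, 2^32). */
    static long hash(String key) {
        return Integer.toUnsignedLong(key.hashCode());
    }

    void addSegment(long segmentId, long rangeStart) {
        segmentByRangeStart.put(rangeStart, segmentId);
    }

    /** Replaces the segment starting at rangeStart with two children split at midpoint. */
    void split(long rangeStart, long midpoint, long leftId, long rightId) {
        segmentByRangeStart.put(rangeStart, leftId);   // overwrites the parent's entry
        segmentByRangeStart.put(midpoint, rightId);
    }

    /** Returns the ID of the segment whose range contains the key's hash. */
    long route(String key) {
        Map.Entry<Long, Long> owner = segmentByRangeStart.floorEntry(hash(key));
        if (owner == null) {
            throw new IllegalStateException("no segment covers hash " + hash(key));
        }
        return owner.getValue();
    }
}
```

Unlike modulo routing, splitting one segment only moves keys whose hash falls inside that segment's range; keys owned by other segments keep their routing.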
All types are sync-first with an .async() accessor:
```
Producer<T>           -> producer.async() -> AsyncProducer<T>
StreamConsumer<T>     -> consumer.async() -> AsyncStreamConsumer<T>
QueueConsumer<T>      -> consumer.async() -> AsyncQueueConsumer<T>
CheckpointConsumer<T> -> consumer.async() -> AsyncCheckpointConsumer<T>
Transaction           -> txn.async()      -> AsyncTransaction
```
Both views share the same underlying resources.
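A minimal sketch of the sync-first pattern with a shared async view, using hypothetical names (CounterProducer, AsyncCounterProducer) and a fake "send" that only assigns increasing IDs. The point is that async() returns a view over the same state rather than a second producer, and the blocking call waits on the same future the async path produces.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical async view interface. */
interface AsyncCounterProducer {
    CompletableFuture<Long> send(String value);
}

/** Hypothetical sync-first producer; IDs stand in for message IDs. */
final class CounterProducer {
    private final AtomicLong nextId = new AtomicLong();          // state shared by both views
    private final AsyncCounterProducer asyncView = this::sendAsyncInternal;

    private CompletableFuture<Long> sendAsyncInternal(String value) {
        // A real client would hand the message to the network layer here.
        return CompletableFuture.completedFuture(nextId.getAndIncrement());
    }

    /** Blocking send: waits on the same future the async view would return. */
    long send(String value) {
        return sendAsyncInternal(value).join();
    }

    /** Returns a view backed by this producer's resources; nothing new is opened. */
    AsyncCounterProducer async() {
        return asyncView;
    }
}
```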
```
pulsar-client-api-v5/
  org.apache.pulsar.client.api.v5
  ├── PulsarClient, PulsarClientBuilder, PulsarClientException
  ├── Producer, ProducerBuilder
  ├── StreamConsumer, StreamConsumerBuilder
  ├── QueueConsumer, QueueConsumerBuilder
  ├── CheckpointConsumer, CheckpointConsumerBuilder, Checkpoint
  ├── Message, Messages, MessageId, MessageMetadata, MessageBuilder
  ├── Transaction
  ├── async/      (AsyncProducer, AsyncMessageBuilder, Async*Consumer, AsyncTransaction)
  ├── auth/       (Authentication, AuthenticationData, CryptoKeyReader, ...)
  ├── config/     (BatchingPolicy, CompressionPolicy, TlsPolicy, BackoffPolicy, ...)
  ├── schema/     (Schema, SchemaInfo, SchemaType)
  └── internal/   (PulsarClientProvider, the ServiceLoader SPI)

pulsar-client-v5/
  org.apache.pulsar.client.impl.v5
  ├── PulsarClientV5, PulsarClientBuilderV5, PulsarClientProviderV5
  ├── ScalableTopicProducer, ProducerBuilderV5
  ├── ScalableStreamConsumer, StreamConsumerBuilderV5
  ├── ScalableQueueConsumer, QueueConsumerBuilderV5
  ├── ScalableCheckpointConsumer, CheckpointConsumerBuilderV5
  ├── DagWatchClient, ClientSegmentLayout, SegmentRouter
  ├── SchemaAdapter, AuthenticationAdapter, CryptoKeyReaderAdapter
  ├── MessageV5, MessageIdV5, MessagesV5, CheckpointV5
  └── Async*V5 wrappers
```
MessageMetadata<T, BUILDER> — A self-referential builder base shared between sync and
async message sending:
```java
import java.time.Duration;
import java.time.Instant;

interface MessageMetadata<T, BUILDER extends MessageMetadata<T, BUILDER>> {
    BUILDER value(T value);
    BUILDER key(String key);
    BUILDER property(String name, String value);
    BUILDER eventTime(Instant eventTime);
    BUILDER deliverAfter(Duration delay);
    BUILDER deliverAt(Instant timestamp);
    BUILDER transaction(Transaction txn);
}
```
MessageBuilder<T> extends it with MessageId send().
AsyncMessageBuilder<T> extends it with CompletableFuture<MessageId> send().
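The self-referential (CRTP-style) bound is what lets shared setters return the concrete builder type, so send() keeps its sync or async return type after a chain of calls. A trimmed-down, hypothetical version of the pattern (String stands in for MessageId; the class names are not part of the API):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical, trimmed-down version of the MessageMetadata<T, BUILDER> pattern.
abstract class MetadataSketch<T, B extends MetadataSketch<T, B>> {
    protected T value;
    protected String key;
    protected final Map<String, String> properties = new LinkedHashMap<>();

    @SuppressWarnings("unchecked")
    private B self() { return (B) this; }   // safe as long as subclasses pass themselves as B

    B value(T value) { this.value = value; return self(); }
    B key(String key) { this.key = key; return self(); }
    B property(String name, String v) { properties.put(name, v); return self(); }
}

final class SyncBuilderSketch<T> extends MetadataSketch<T, SyncBuilderSketch<T>> {
    String send() { return "sent " + value + " key=" + key + " props=" + properties; }
}

final class AsyncBuilderSketch<T> extends MetadataSketch<T, AsyncBuilderSketch<T>> {
    CompletableFuture<String> send() {
        return CompletableFuture.completedFuture("sent " + value + " key=" + key);
    }
}
```

Without the BUILDER parameter, key("k") would return the base type and the caller would lose access to the subclass-specific send().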
Checkpoint — Opaque, serializable position vector across all segments:
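```java
import java.time.Instant;

interface Checkpoint {
    byte[] toByteArray();
    Instant creationTime();

    // Static factories; bodies are elided in this listing.
    static Checkpoint earliest() { ... }
    static Checkpoint latest() { ... }
    static Checkpoint atTimestamp(Instant timestamp) { ... }
    static Checkpoint fromByteArray(byte[] data) { ... }
}
```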
Internally, a Checkpoint stores a Map<Long, MessageId> mapping segment IDs to positions.
The format is forward-compatible — checkpoints saved with fewer segments can be applied after
splits/merges.
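A possible byte encoding for such a segment-ID-to-position map is sketched below. The PIP does not define the wire format; the version tag, the SegmentPosition(ledgerId, entryId) record standing in for MessageId, and the class names are all assumptions. How positions are resolved for segments created after the checkpoint was taken is not modeled here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical position within one segment (stand-in for MessageId). */
record SegmentPosition(long ledgerId, long entryId) {}

/** Hypothetical encoding of a checkpoint as segmentId -> position. */
final class CheckpointCodecSketch {
    private static final int VERSION = 1;   // lets a future format be detected and rejected

    static byte[] encode(Map<Long, SegmentPosition> positions) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(VERSION);
            out.writeInt(positions.size());
            for (Map.Entry<Long, SegmentPosition> e : positions.entrySet()) {
                out.writeLong(e.getKey());                 // segment ID
                out.writeLong(e.getValue().ledgerId());
                out.writeLong(e.getValue().entryId());
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Map<Long, SegmentPosition> decode(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int version = in.readInt();
            if (version != VERSION) {
                throw new IllegalArgumentException("unsupported checkpoint version " + version);
            }
            int count = in.readInt();
            Map<Long, SegmentPosition> positions = new TreeMap<>();
            for (int i = 0; i < count; i++) {
                long segmentId = in.readLong();
                positions.put(segmentId, new SegmentPosition(in.readLong(), in.readLong()));
            }
            return positions;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```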
Configuration records — Immutable records with static factories:
| Record | Purpose | Example |
|---|---|---|
| BatchingPolicy | Batching config | BatchingPolicy.of(Duration.ofMillis(10), 5000, MemorySize.ofMB(1)) |
| CompressionPolicy | Compression codec | CompressionPolicy.of(CompressionType.ZSTD) |
| TlsPolicy | TLS/mTLS config | TlsPolicy.of("/path/to/ca.pem") |
| BackoffPolicy | Retry backoff | BackoffPolicy.exponential(Duration.ofMillis(100), Duration.ofSeconds(30)) |
| DeadLetterPolicy | Dead letter queue | DeadLetterPolicy.of(5) |
| EncryptionPolicy | E2E encryption | EncryptionPolicy.forProducer(keyReader, "mykey") |
| ChunkingPolicy | Large message chunking | ChunkingPolicy.of(MemorySize.ofMB(10)) |
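
The record-plus-static-factory style can be sketched with a hypothetical BackoffPolicySketch. Its fields, the fixed multiplier of 2, and the delayFor() helper are assumptions for illustration, not the real BackoffPolicy. Validation happens once in the compact constructor, after which the value is immutable.

```java
import java.time.Duration;

/** Hypothetical immutable policy record; field names and doubling rule are assumptions. */
record BackoffPolicySketch(Duration initial, Duration max, double multiplier) {

    BackoffPolicySketch {
        // Compact constructor: validate once; record fields are final afterwards.
        if (initial.isNegative() || initial.isZero()) {
            throw new IllegalArgumentException("initial must be > 0");
        }
        if (max.compareTo(initial) < 0) {
            throw new IllegalArgumentException("max must be >= initial");
        }
        if (multiplier < 1.0) {
            throw new IllegalArgumentException("multiplier must be >= 1");
        }
    }

    /** Static factory in the style of the table above. */
    static BackoffPolicySketch exponential(Duration initial, Duration max) {
        return new BackoffPolicySketch(initial, max, 2.0);
    }

    /** Delay before retry number {@code attempt} (0-based), capped at {@code max}. */
    Duration delayFor(int attempt) {
        double millis = initial.toMillis() * Math.pow(multiplier, attempt);
        return millis >= max.toMillis() ? max : Duration.ofMillis((long) millis);
    }
}
```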
Implementation is loaded via java.util.ServiceLoader:
```java
// In pulsar-client-api-v5
public interface PulsarClientProvider {
    PulsarClientBuilder newClientBuilder();
    <T> Schema<T> jsonSchema(Class<T> clazz);
    // ... factory methods for all SPI types
}

// In pulsar-client-v5
// META-INF/services/org.apache.pulsar.client.api.v5.internal.PulsarClientProvider
//   -> org.apache.pulsar.client.impl.v5.PulsarClientProviderV5
```
This replaces the reflection-based DefaultImplementation approach used in the current API.
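Provider discovery with java.util.ServiceLoader can be sketched as follows, using a hypothetical SPI (ClientProviderSketch) in place of PulsarClientProvider. ServiceLoader.findFirst() (Java 9+) returns the first implementation listed under META-INF/services for the interface, or an empty Optional when the implementation module is missing from the classpath. Failing with a clear message in that case is an assumed design choice.

```java
import java.util.ServiceLoader;

/** Hypothetical SPI, standing in for PulsarClientProvider. */
interface ClientProviderSketch {
    String name();
}

final class ProviderLookupSketch {
    private ProviderLookupSketch() {}

    /**
     * Returns the first implementation registered in a
     * META-INF/services/ClientProviderSketch file on the classpath.
     */
    static ClientProviderSketch load() {
        return ServiceLoader.load(ClientProviderSketch.class)
                .findFirst()
                .orElseThrow(() -> new IllegalStateException(
                        "No ClientProviderSketch implementation found on the classpath"));
    }
}
```

Compared with reflection on a hard-coded class name, the API module only names the interface, and any implementation that registers itself can be picked up.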
This PIP introduces a new public Java API. The existing pulsar-client-api is unchanged.
New modules:

- pulsar-client-api-v5: interfaces and value types (compile dependency for applications)
- pulsar-client-v5: implementation (runtime dependency)

New interfaces (summary):
| Interface | Methods | Description |
|---|---|---|
| PulsarClient | builder(), newProducer(), newStreamConsumer(), newQueueConsumer(), newCheckpointConsumer(), newTransaction(), close() | Entry point |
| Producer<T> | newMessage(), flush(), close(), async() | Send messages |
| StreamConsumer<T> | receive(), receive(Duration), acknowledgeCumulative(), close(), async() | Ordered consumption |
| QueueConsumer<T> | receive(), receive(Duration), acknowledge(), negativeAcknowledge(), close(), async() | Parallel consumption |
| CheckpointConsumer<T> | receive(), receive(Duration), checkpoint(), seek(), close(), async() | Framework consumption |
No new broker configuration is introduced by this PIP. The V5 client reuses the existing
ClientConfigurationData internally.
No new CLI commands specific to the V5 API.
No new metrics are introduced by the V5 client API itself. Because the V5 client wraps v4 producers and consumers internally, the existing producer/consumer metrics (publish rate, latency, backlog, etc.) continue to work. Each internal segment producer/consumer appears as a separate instance in metrics, identified by the segment topic name.
Operators should monitor:
The V5 client API does not introduce new security mechanisms. It delegates all authentication and authorization to the underlying v4 client:

- Authentication is configured via PulsarClientBuilder.authentication() and delegated to the v4 AuthenticationProvider framework via AuthenticationAdapter
- Authorization is checked against the topic:// name; accessing the underlying segment:// topics uses the same tenant/namespace permissions
- End-to-end encryption is configured via EncryptionPolicy on ProducerBuilder and delegated to the v4 CryptoKeyReader framework via CryptoKeyReaderAdapter
- The new CommandScalableTopicLookup protocol command is sent only after the connection is authenticated and in the Connected state, consistent with other lookup commands

No new REST endpoints are introduced by the client API itself.
The V5 API is a new, additive module. This follows the same approach as the 2.0 API redesign, where the old API was preserved in a separate compatibility module, except that this time there is no breaking change at all:

- Existing applications using pulsar-client-api (v4) continue to work without modification
- New applications opt in to the V5 API by depending on pulsar-client-v5

To adopt the V5 API, applications add pulsar-client-v5 as a dependency and use PulsarClient.builder() from the org.apache.pulsar.client.api.v5 package.
Since the V5 API is a separate module, rollback is simply removing the dependency and reverting to v4 API calls. No broker-side changes are required.
Applications using CheckpointConsumer should note that saved Checkpoint byte arrays are
specific to the V5 implementation and cannot be used with v4 Reader/Consumer.
The V5 client API itself does not introduce geo-replication concerns — it connects to whichever cluster it is configured for. However, geo-replication of scalable topics has specific considerations (segment layout synchronization across clusters, cross-cluster split/merge coordination) that will be detailed in a subsequent PIP.
We considered adding scalable topic support to the existing Consumer interface by adding new methods for checkpoint/seek and hiding segment details internally. This was rejected because:

- The single Consumer interface would still expose operations that fail at runtime (e.g., acknowledgeCumulative() on a Shared subscription)
- Removing the partition-specific types (TopicMessageId, MessageRouter) would break backward compatibility

We considered keeping a single Consumer type but using different builder types per subscription mode (e.g., StreamConsumerBuilder returning a Consumer with restricted methods). This was rejected because the returned Consumer would still expose all methods at the type level: the restriction would exist only in documentation and would not be enforced by the compiler.
We chose a separate module (pulsar-client-api-v5) rather than adding new interfaces to pulsar-client-api because:

- The V5 API uses different naming conventions (value() vs getValue()), different types (Duration vs (long, TimeUnit)), and different patterns (Optional vs nullable)

The V5 API targets Java 17, the same as the rest of Pulsar.
The implementation module (pulsar-client-v5) wraps the existing v4 pulsar-client for all
transport-level operations. This means bug fixes and performance improvements to the v4 client
automatically benefit V5 users, and the V5 module itself is relatively thin — primarily routing
logic and API adaptation.
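The "API adaptation" part of the wrapper can be sketched as a thin adapter that maps a V5-style signature onto a v4-style one. Both interfaces below are hypothetical simplifications: the legacy side mirrors the v4 shape receive(int, TimeUnit) returning null on timeout. Returning Optional from the V5 side is an assumption; the PIP does not state the exact return type of receive(Duration).

```java
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

/** Hypothetical v4-style call: (int, TimeUnit) timeout, null when nothing arrives. */
interface LegacyReceiverSketch {
    String receive(int timeout, TimeUnit unit);
}

/** Hypothetical thin V5-style adapter: Duration in, Optional out. */
final class ModernReceiverSketch {
    private final LegacyReceiverSketch delegate;

    ModernReceiverSketch(LegacyReceiverSketch delegate) { this.delegate = delegate; }

    Optional<String> receive(Duration timeout) {
        // Clamp to int because the legacy signature takes an int timeout.
        long millis = Math.min(timeout.toMillis(), Integer.MAX_VALUE);
        return Optional.ofNullable(delegate.receive((int) millis, TimeUnit.MILLISECONDS));
    }
}
```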