PIP-466: New Java Client API (V5) with Scalable Topic Support


Background Knowledge

Apache Pulsar's Java client API (pulsar-client-api) has been the primary interface for Java applications since Pulsar's inception. The API surface has grown organically over the years to support partitioned topics, multiple subscription types, transactions, schema evolution, and more.

API versioning precedent. Pulsar already went through a breaking API redesign in the 2.0 release, where the old API was moved into a separate compatibility module (pulsar-client-1x-base) and the main module was replaced with the new API. This PIP takes a less disruptive approach: the existing pulsar-client-api and pulsar-client modules stay completely unchanged, and the new API is introduced in two additional modules, pulsar-client-api-v5 (interfaces) and pulsar-client-v5 (implementation). Existing applications are unaffected; new applications can opt in to the V5 API.

Partitioned topics are Pulsar's current mechanism for topic-level parallelism. A partitioned topic is a collection of N independent internal topics (partitions), each backed by a separate managed ledger. The client is responsible for routing messages to partitions (via MessageRouter) and is exposed to partition-level details through TopicMessageId, getPartitionsForTopic(), and partition-specific consumers.

Subscription types control how messages are distributed to consumers. Pulsar supports four types — Exclusive, Failover, Shared, and Key_Shared — all accessed through a single Consumer interface. The interface exposes all operations regardless of which subscription type is in use, even though some operations (e.g., acknowledgeCumulative() on Shared) throw at runtime.

Scalable topics (PIP-460) are a new server-side mechanism where a topic is composed of a DAG of hash-range segments that can be dynamically split and merged by the broker. Unlike partitioned topics, the number of segments is invisible to the client and can change at runtime without application awareness. The client receives segment layout updates via a dedicated protocol command (DAG watch session) and routes messages based on hash-range matching. This PIP defines the client API designed to work with scalable topics; the broker-side design is covered by PIP-460.
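
To make the hash-range idea concrete, here is a minimal sketch of a segment splitting into two children. The `Segment` record, the 16-bit hash space, and the midpoint split are assumptions made for illustration; the actual split policy belongs to the broker and is defined by PIP-460, not by this example.

```java
import java.util.List;

// Hypothetical model of a hash-range segment. The field names and the
// half-open range convention are assumptions, not taken from PIP-460.
record Segment(long id, int startInclusive, int endExclusive) {

    boolean contains(int hash) {
        return hash >= startInclusive && hash < endExclusive;
    }

    // Split at the midpoint into two children that together cover the parent.
    List<Segment> split(long leftId, long rightId) {
        int mid = startInclusive + (endExclusive - startInclusive) / 2;
        return List.of(
                new Segment(leftId, startInclusive, mid),
                new Segment(rightId, mid, endExclusive));
    }
}

class SegmentSplitDemo {
    public static void main(String[] args) {
        Segment root = new Segment(0, 0, 65536);
        int hash = 40000;
        // After the split, the hash still maps to exactly one segment.
        for (Segment child : root.split(1, 2)) {
            System.out.println(child + " contains " + hash + ": " + child.contains(hash));
        }
    }
}
```

The parent and its children form the edges of the DAG: a consumer that finishes the parent moves on to the children, while the application only ever sees the topic name.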

Motivation

1. Remove partitioning from the client API

The current API leaks partitioning everywhere: TopicMessageId carries partition indexes, getPartitionsForTopic() exposes the count, MessageRouter forces the application to make routing decisions, and consumers can be bound to specific partitions. This forces application code to deal with what is fundamentally a server-side scalability concern.

With scalable topics, parallelism is achieved via hash-range segments managed entirely by the broker. The client should treat topics as opaque endpoints — per-key ordering is guaranteed when a key is specified, but the underlying parallelism (how many segments, which broker owns each) is invisible and dynamic.

The current API cannot cleanly support this model because partitioning is baked into the type system (TopicMessageId), the builder API (MessageRouter, MessageRoutingMode), and the consumer model (partition-specific consumers, getPartitionsForTopic()).

2. Simplify an oversized API

After years of organic growth, the API surface has accumulated significant baggage:

  • Consumer has 60+ methods mixing unrelated concerns (ack, nack, seek, pause, unsubscribe, stats, get topic name, is connected, etc.)
  • ConsumerBuilder has 40+ configuration methods with overlapping semantics
  • Timeouts use (long, TimeUnit) in some places and long millis in others
  • Return conventions are inconsistent: some methods return null where others return an empty value
  • loadConf(Map), clone(), and Serializable on builders are rarely used and clutter the API
  • SPI via reflection hack (DefaultImplementation) instead of standard ServiceLoader

A new module can start with a clean, minimal surface using modern Java idioms.

3. Separate streaming vs queuing consumption

The current Consumer mixes all four subscription types behind a single interface:

  • acknowledgeCumulative() is available but throws at runtime for Shared subscriptions
  • negativeAcknowledge() semantics differ between modes
  • seek() behavior varies depending on subscription type
  • Dead-letter policy only applies to Shared/Key_Shared

This design means the compiler cannot help you — you discover misuse at runtime. Splitting into purpose-built consumer types where each exposes only the operations that make sense for its model improves both usability and correctness.

4. Native support for connector frameworks

Connector frameworks like Apache Flink and Apache Spark need to manage their own offsets across all segments of a topic, take atomic snapshots, and seek back to a checkpoint on recovery. The current API has no first-class support for this: connectors resort to the low-level Reader combined with manual partition tracking and brittle offset management.

A dedicated CheckpointConsumer with opaque, serializable Checkpoint objects provides a clean integration point.

Relationship to PIP-460 and long-term vision

This PIP is a companion to PIP-460: Scalable Topics, which defines the broker-side segment management, metadata storage, and admin APIs. The V5 client API is the primary interface for applications to use scalable topics — while the protocol commands and segment routing could theoretically be added to the v4 client, the V5 API was designed from the ground up to support the opaque, dynamically-segmented topic model that scalable topics provide.

The V5 API is designed to support all use cases currently supported by the existing API: producing messages, consuming with ordered/shared/key-shared semantics, transactions, schema evolution, and end-to-end encryption. It is not a subset — it is a full replacement API. It also works with existing partitioned and non-partitioned topics, so applications can adopt the new API without changing their topic infrastructure.

The long-term vision is for scalable topics and the V5 API to become the primary model, eventually deprecating partitioned/non-partitioned topics and the v4 API. However, this deprecation is explicitly not planned for the 5.0 release. The 5.0 release will ship both APIs side by side, with the V5 API recommended for new applications. A subsequent PIP will detail the migration path and deprecation timeline.

While this PIP covers the Java client, the same API model (purpose-built consumer types, opaque topics, checkpoint-based connector support) will also be introduced in non-Java client SDKs (Python, Go, C++, Node.js) with language-appropriate idioms. Each SDK will mirror the same concepts and follow the same approach of supporting both old and new topic types side by side. The non-Java SDKs will be covered by separate PIPs.

Goals

In Scope

  • A new pulsar-client-api-v5 module with new Java interfaces for Producer, StreamConsumer, QueueConsumer, CheckpointConsumer, and PulsarClient
  • A new pulsar-client-v5 implementation module that wraps the existing v4 client transport and adds scalable topic routing
  • Support for all use cases currently supported by the existing API (produce, consume with ordered/shared/key-shared semantics, transactions, schema, encryption)
  • Purpose-built consumer types that separate streaming (ordered, cumulative ack) from queuing (parallel, individual ack) from checkpoint (unmanaged, framework-driven)
  • Opaque topic model where partition/segment details are hidden from the application
  • Modern Java API conventions: Duration, Instant, Optional, records, ServiceLoader
  • First-class transaction support in the main package
  • DAG watch protocol integration for live segment layout updates

Out of Scope

  • Changes to the existing pulsar-client-api — it remains fully supported and unchanged
  • Changes to the wire protocol beyond what is needed for scalable topic DAG watch
  • Broker-side scalable topic management (split/merge algorithms, load balancing), which is covered by PIP-460 and by more specific follow-up PIPs
  • Migration path from v4 to v5 API — will be detailed in a subsequent PIP
  • Implementation details — this PIP focuses on the public API surface
  • Deprecation of the existing API or partitioned/non-partitioned topic types
  • TableView equivalent in v5 — may be added in a follow-up PIP

High Level Design

The V5 client API is shipped as two new modules alongside the existing client:

```
pulsar-client-api-v5    (interfaces and value types only, no implementation)
pulsar-client-v5        (implementation, depends on pulsar-client for transport)
```

The existing pulsar-client-api and pulsar-client modules are unchanged. Applications can use v4 and v5 in the same JVM.

Entry point

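```java
import org.apache.pulsar.client.api.v5.PulsarClient;

// Build a V5 client that connects to a broker on localhost
PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();
```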

The PulsarClient interface provides builder methods for all producer/consumer types:

```
PulsarClient
  .newProducer(schema)            -> ProducerBuilder           -> Producer<T>
  .newStreamConsumer(schema)      -> StreamConsumerBuilder     -> StreamConsumer<T>
  .newQueueConsumer(schema)       -> QueueConsumerBuilder      -> QueueConsumer<T>
  .newCheckpointConsumer(schema)  -> CheckpointConsumerBuilder -> CheckpointConsumer<T>
  .newTransaction()               -> Transaction
```

Consumer types

Instead of a single Consumer with a SubscriptionType enum, the V5 API provides three distinct consumer types:

```mermaid
graph TD
    A[PulsarClient] -->|newStreamConsumer| B[StreamConsumer]
    A -->|newQueueConsumer| C[QueueConsumer]
    A -->|newCheckpointConsumer| D[CheckpointConsumer]

    B -->|"Ordered, cumulative ack"| E[Event sourcing, CDC, ordered pipelines]
    C -->|"Parallel, individual ack"| F[Work queues, task processing]
    D -->|"Unmanaged, checkpoint/seek"| G[Flink, Spark connectors]
```

StreamConsumer — Ordered consumption with cumulative acknowledgment. Maps to Exclusive/Failover subscription semantics. Messages are delivered in order (per-key when keyed).

QueueConsumer — Unordered parallel consumption with individual acknowledgment. Maps to Shared/Key_Shared subscription semantics. Includes dead-letter policy, ack timeout, and redelivery backoff.

CheckpointConsumer — Unmanaged consumption for connector frameworks. No subscription, no ack — position tracking is entirely external. Provides checkpoint() for atomic position snapshots and seek(Checkpoint) for recovery.
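
The compile-time separation described above can be sketched with simplified stand-ins. The interface names with the `Sketch` suffix, the use of plain `long` ids in place of messages, and the in-memory implementations are all assumptions for illustration; the real V5 interfaces are generic in the message type and have more methods.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Stand-in for StreamConsumer: only cumulative acknowledgment is offered.
interface StreamConsumerSketch {
    long receive();                       // next message id, in order
    void acknowledgeCumulative(long id);  // acks everything up to and including id
}

// Stand-in for QueueConsumer: only individual ack and nack are offered.
interface QueueConsumerSketch {
    long receive();
    void acknowledge(long id);            // acks exactly one message
    void negativeAcknowledge(long id);    // asks for redelivery of one message
}

class InMemoryStream implements StreamConsumerSketch {
    private long next = 0;
    private long ackedUpTo = -1;

    public long receive() { return next++; }
    public void acknowledgeCumulative(long id) { ackedUpTo = Math.max(ackedUpTo, id); }
    long ackedUpTo() { return ackedUpTo; }
}

class InMemoryQueue implements QueueConsumerSketch {
    private final Deque<Long> backlog = new ArrayDeque<>();
    private final Set<Long> unacked = new HashSet<>();

    InMemoryQueue(List<Long> ids) { backlog.addAll(ids); }

    public long receive() {
        long id = backlog.removeFirst();
        unacked.add(id);
        return id;
    }
    public void acknowledge(long id) { unacked.remove(id); }
    public void negativeAcknowledge(long id) {
        // Only messages that are still outstanding go back to the backlog.
        if (unacked.remove(id)) {
            backlog.addLast(id);
        }
    }
    int unackedCount() { return unacked.size(); }
}
```

With these types, a call such as `stream.acknowledge(id)` or `queue.acknowledgeCumulative(id)` is a compile error rather than a runtime exception, which is the property the split is meant to provide.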

Scalable topic integration

When a V5 client connects to a topic:// domain topic, it establishes a DAG watch session with the broker. The broker sends the current segment layout (which segments exist, their hash ranges, and which broker owns each) and pushes updates when the layout changes (splits, merges).

```mermaid
sequenceDiagram
    participant Client as V5 Client
    participant Broker as Broker
    participant Meta as Metadata Store

    Client->>Broker: ScalableTopicLookup(topic)
    Broker->>Meta: Watch topic DAG
    Broker-->>Client: ScalableTopicUpdate(DAG)
    Note over Client: Create per-segment producers/consumers

    Meta-->>Broker: Segment split notification
    Broker-->>Client: ScalableTopicUpdate(new DAG)
    Note over Client: Add new segments, drain old
```

The Producer hashes message keys to determine which segment to send to, maintaining one internal producer per active segment. When segments split or merge, the client transparently creates new internal producers and drains old ones.
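
The routing and layout-update steps can be sketched as follows. This is not the SegmentRouter implementation: the class name, the 16-bit hash space, the use of `String.hashCode()` as the hash function, and the layout representation (range start mapped to segment id) are all assumptions chosen to keep the example small.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Result of applying a layout update: which per-segment producers to open
// and which to drain and close.
record LayoutChange(Set<Long> toCreate, Set<Long> toDrain) {}

class SegmentRouterSketch {
    static final int HASH_SPACE = 1 << 16; // assumed size of the hash space

    // Key: inclusive start of a hash range. Value: segment id.
    // Ranges are assumed contiguous and to start at 0, so the floor entry
    // for a hash identifies the segment that owns it.
    private TreeMap<Integer, Long> startToSegment = new TreeMap<>();

    // Placeholder hash function; the real client would use a fixed,
    // cross-language hash rather than String.hashCode().
    static int hashOf(String key) {
        return Math.floorMod(key.hashCode(), HASH_SPACE);
    }

    long route(String key) {
        return startToSegment.floorEntry(hashOf(key)).getValue();
    }

    // Swap in the new layout and report the difference from the old one.
    LayoutChange applyLayout(Map<Integer, Long> newLayout) {
        Set<Long> oldIds = new HashSet<>(startToSegment.values());
        Set<Long> newIds = new HashSet<>(newLayout.values());

        Set<Long> toCreate = new HashSet<>(newIds);
        toCreate.removeAll(oldIds);
        Set<Long> toDrain = new HashSet<>(oldIds);
        toDrain.removeAll(newIds);

        startToSegment = new TreeMap<>(newLayout);
        return new LayoutChange(toCreate, toDrain);
    }
}
```

Because the same key always hashes to the same value, all messages for a key land in one segment under a given layout, which is what preserves per-key ordering between layout changes.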

Sync/async model

All types are sync-first with an .async() accessor:

```
Producer<T>            -> producer.async()   -> AsyncProducer<T>
StreamConsumer<T>      -> consumer.async()   -> AsyncStreamConsumer<T>
QueueConsumer<T>       -> consumer.async()   -> AsyncQueueConsumer<T>
CheckpointConsumer<T>  -> consumer.async()   -> AsyncCheckpointConsumer<T>
Transaction            -> txn.async()        -> AsyncTransaction
```

Both views share the same underlying resources.
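
One way to picture two views over shared resources is a sync facade that blocks on the async path. The sketch below uses hypothetical `SyncSender` and `AsyncSender` interfaces and an in-memory counter in place of a broker connection; it illustrates the pattern and is not the V5 implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical async view: returns a future for each send.
interface AsyncSender {
    CompletableFuture<Long> send(String value);
}

// Hypothetical sync view: blocks until the send completes.
interface SyncSender {
    long send(String value);
    AsyncSender async();
}

class SenderSketch implements SyncSender {
    // State shared by both views; stands in for the underlying connection.
    private final AtomicLong nextId = new AtomicLong();

    private final AsyncSender asyncView =
            value -> CompletableFuture.supplyAsync(nextId::getAndIncrement);

    @Override
    public long send(String value) {
        // The sync call is the async call plus a blocking wait.
        return asyncView.send(value).join();
    }

    @Override
    public AsyncSender async() {
        return asyncView;
    }
}
```

Since both views draw ids from the same counter, interleaving `send(...)` and `async().send(...)` yields one sequence, mirroring how the two views of a real producer would share one set of connections and buffers.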

Detailed Design

Design & Implementation Details

Module structure

```
pulsar-client-api-v5/
  org.apache.pulsar.client.api.v5
  ├── PulsarClient, PulsarClientBuilder, PulsarClientException
  ├── Producer, ProducerBuilder
  ├── StreamConsumer, StreamConsumerBuilder
  ├── QueueConsumer, QueueConsumerBuilder
  ├── CheckpointConsumer, CheckpointConsumerBuilder, Checkpoint
  ├── Message, Messages, MessageId, MessageMetadata, MessageBuilder
  ├── Transaction
  ├── async/       (AsyncProducer, AsyncMessageBuilder, Async*Consumer, AsyncTransaction)
  ├── auth/        (Authentication, AuthenticationData, CryptoKeyReader, ...)
  ├── config/      (BatchingPolicy, CompressionPolicy, TlsPolicy, BackoffPolicy, ...)
  ├── schema/      (Schema, SchemaInfo, SchemaType)
  └── internal/    (PulsarClientProvider, ServiceLoader SPI)

pulsar-client-v5/
  org.apache.pulsar.client.impl.v5
  ├── PulsarClientV5, PulsarClientBuilderV5, PulsarClientProviderV5
  ├── ScalableTopicProducer, ProducerBuilderV5
  ├── ScalableStreamConsumer, StreamConsumerBuilderV5
  ├── ScalableQueueConsumer, QueueConsumerBuilderV5
  ├── ScalableCheckpointConsumer, CheckpointConsumerBuilderV5
  ├── DagWatchClient, ClientSegmentLayout, SegmentRouter
  ├── SchemaAdapter, AuthenticationAdapter, CryptoKeyReaderAdapter
  ├── MessageV5, MessageIdV5, MessagesV5, CheckpointV5
  └── Async*V5 wrappers
```

Key types

MessageMetadata<T, BUILDER> — A self-referential builder base shared between sync and async message sending:

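```java
import java.time.Duration;
import java.time.Instant;

// BUILDER is the concrete builder type, so each setter returns that type
// and chained calls keep access to the subtype's send() method.
interface MessageMetadata<T, BUILDER extends MessageMetadata<T, BUILDER>> {
    BUILDER value(T value);
    BUILDER key(String key);
    BUILDER property(String name, String value);
    BUILDER eventTime(Instant eventTime);
    BUILDER deliverAfter(Duration delay);
    BUILDER deliverAt(Instant timestamp);
    BUILDER transaction(Transaction txn);
}
```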

MessageBuilder<T> extends it with MessageId send(). AsyncMessageBuilder<T> extends it with CompletableFuture<MessageId> send().
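
The self-referential generic can be seen in action with a reduced version of the pattern. The interfaces below keep only two setters, use `long` in place of MessageId, and carry a `Sketch` suffix to mark them as hypothetical; the in-memory builder that appends to a list is likewise an assumption made so the example runs on its own.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Reduced form of the self-referential base: B is the concrete builder type.
interface MetadataSketch<T, B extends MetadataSketch<T, B>> {
    B value(T value);
    B key(String key);
}

// Sync flavor: setters return SyncBuilderSketch, so send() returns a plain id.
interface SyncBuilderSketch<T> extends MetadataSketch<T, SyncBuilderSketch<T>> {
    long send();
}

// Async flavor: same setters, but send() returns a future.
interface AsyncBuilderSketch<T> extends MetadataSketch<T, AsyncBuilderSketch<T>> {
    CompletableFuture<Long> send();
}

// In-memory sync builder that records "key=value" and returns its index.
class ListBackedBuilder implements SyncBuilderSketch<String> {
    private final List<String> log;
    private String value;
    private String key;

    ListBackedBuilder(List<String> log) {
        this.log = log;
    }

    @Override
    public SyncBuilderSketch<String> value(String value) {
        this.value = value;
        return this;
    }

    @Override
    public SyncBuilderSketch<String> key(String key) {
        this.key = key;
        return this;
    }

    @Override
    public long send() {
        log.add(key + "=" + value);
        return log.size() - 1;
    }
}
```

A chain such as `new ListBackedBuilder(log).key("k1").value("v1").send()` type-checks because every setter returns the sync builder type; with a non-generic base interface the chain would lose `send()` after the first setter.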

Checkpoint — Opaque, serializable position vector across all segments:

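```java
import java.time.Instant;

// API summary only: the static factory bodies are omitted here, so these
// declarations would need bodies before the interface compiles.
interface Checkpoint {
    byte[] toByteArray();      // serialized form for external storage
    Instant creationTime();

    static Checkpoint earliest();
    static Checkpoint latest();
    static Checkpoint atTimestamp(Instant timestamp);
    static Checkpoint fromByteArray(byte[] data);
}
```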

Internally, a Checkpoint stores a Map<Long, MessageId> mapping segment IDs to positions. The format is forward-compatible — checkpoints saved with fewer segments can be applied after splits/merges.
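
As an illustration of a serializable position vector, the sketch below round-trips a segment-to-position map through a byte array. The record name, the use of a plain `long` in place of MessageId, and the binary layout (entry count followed by id/position pairs) are assumptions; the PIP does not specify the actual encoding, and the handling of splits and merges is not modeled here.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical checkpoint: segment id -> position within that segment.
record CheckpointSketch(Map<Long, Long> positions) {

    byte[] toByteArray() {
        ByteBuffer buf = ByteBuffer.allocate(
                Integer.BYTES + positions.size() * 2 * Long.BYTES);
        buf.putInt(positions.size());
        // Iterate in sorted order so equal checkpoints encode identically.
        new TreeMap<>(positions).forEach((segment, position) -> {
            buf.putLong(segment);
            buf.putLong(position);
        });
        return buf.array();
    }

    static CheckpointSketch fromByteArray(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        int count = buf.getInt();
        Map<Long, Long> positions = new TreeMap<>();
        for (int i = 0; i < count; i++) {
            long segment = buf.getLong();
            long position = buf.getLong();
            positions.put(segment, position);
        }
        return new CheckpointSketch(positions);
    }
}
```

A connector would store the byte array alongside its own snapshot state and hand it back on recovery, without ever inspecting the per-segment entries.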

Configuration records — Immutable records with static factories:

| Record | Purpose | Example |
|---|---|---|
| BatchingPolicy | Batching config | BatchingPolicy.of(Duration.ofMillis(10), 5000, MemorySize.ofMB(1)) |
| CompressionPolicy | Compression codec | CompressionPolicy.of(CompressionType.ZSTD) |
| TlsPolicy | TLS/mTLS config | TlsPolicy.of("/path/to/ca.pem") |
| BackoffPolicy | Retry backoff | BackoffPolicy.exponential(Duration.ofMillis(100), Duration.ofSeconds(30)) |
| DeadLetterPolicy | Dead letter queue | DeadLetterPolicy.of(5) |
| EncryptionPolicy | E2E encryption | EncryptionPolicy.forProducer(keyReader, "mykey") |
| ChunkingPolicy | Large msg chunking | ChunkingPolicy.of(MemorySize.ofMB(10)) |
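
The "immutable record with a static factory" shape might look like the following for a backoff policy. The record name, its fields, the default multiplier of 2.0, and the `delayFor` helper are assumptions for illustration; only the `exponential(initial, max)` factory signature is taken from the table above.

```java
import java.time.Duration;

// Hypothetical shape of a configuration record.
record BackoffPolicySketch(Duration initial, Duration max, double multiplier) {

    // Compact constructor: validate once so every instance is consistent.
    BackoffPolicySketch {
        if (initial.isNegative() || initial.isZero()) {
            throw new IllegalArgumentException("initial must be positive");
        }
        if (max.compareTo(initial) < 0) {
            throw new IllegalArgumentException("max must be >= initial");
        }
        if (multiplier < 1.0) {
            throw new IllegalArgumentException("multiplier must be >= 1.0");
        }
    }

    // Static factory with an assumed default multiplier of 2.0.
    static BackoffPolicySketch exponential(Duration initial, Duration max) {
        return new BackoffPolicySketch(initial, max, 2.0);
    }

    // Delay before the given retry attempt (0-based), capped at max.
    Duration delayFor(int attempt) {
        double millis = Math.min(
                initial.toMillis() * Math.pow(multiplier, attempt),
                (double) max.toMillis());
        return Duration.ofMillis((long) millis);
    }
}
```

Records give value equality and immutability for free, so a policy object can be shared across builders without defensive copies.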

SPI discovery

Implementation is loaded via java.util.ServiceLoader:

```java
// In pulsar-client-api-v5
public interface PulsarClientProvider {
    PulsarClientBuilder newClientBuilder();
    <T> Schema<T> jsonSchema(Class<T> clazz);
    // ... factory methods for all SPI types
}

// In pulsar-client-v5
// META-INF/services/org.apache.pulsar.client.api.v5.internal.PulsarClientProvider
// -> org.apache.pulsar.client.impl.v5.PulsarClientProviderV5
```

This replaces the reflection-based DefaultImplementation approach used in the current API.
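
On the API side, the lookup could be as small as the sketch below. `ClientProviderSketch` and `ProviderLookup` are hypothetical stand-ins for the real SPI types; the `ServiceLoader` calls themselves are standard JDK API.

```java
import java.util.ServiceLoader;

// Stand-in for the SPI interface; the real PulsarClientProvider has
// factory methods for builders and schemas.
interface ClientProviderSketch {
    String name();
}

final class ProviderLookup {
    private ProviderLookup() {}

    // Return the first provider registered under META-INF/services, or fail
    // with a message that points at the likely cause.
    static ClientProviderSketch load() {
        return ServiceLoader.load(ClientProviderSketch.class)
                .findFirst()
                .orElseThrow(() -> new IllegalStateException(
                        "No ClientProviderSketch implementation found on the classpath"));
    }
}
```

If only the API module is on the classpath, the lookup fails fast with a clear message instead of a reflective ClassNotFoundException.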

Public-facing Changes

Public API

This PIP introduces a new public Java API. The existing pulsar-client-api is unchanged.

New modules:

  • pulsar-client-api-v5 — interfaces and value types (compile dependency for applications)
  • pulsar-client-v5 — implementation (runtime dependency)

New interfaces (summary):

| Interface | Methods | Description |
|---|---|---|
| PulsarClient | builder(), newProducer(), newStreamConsumer(), newQueueConsumer(), newCheckpointConsumer(), newTransaction(), close() | Entry point |
| Producer<T> | newMessage(), flush(), close(), async() | Send messages |
| StreamConsumer<T> | receive(), receive(Duration), acknowledgeCumulative(), close(), async() | Ordered consumption |
| QueueConsumer<T> | receive(), receive(Duration), acknowledge(), negativeAcknowledge(), close(), async() | Parallel consumption |
| CheckpointConsumer<T> | receive(), receive(Duration), checkpoint(), seek(), close(), async() | Framework consumption |

Configuration

No new broker configuration is introduced by this PIP. The V5 client reuses the existing ClientConfigurationData internally.

CLI

No new CLI commands specific to the V5 API.

Metrics

No new metrics are introduced by the V5 client API itself. The underlying v4 producers and consumers continue to emit their existing metrics.

Monitoring

The V5 client wraps v4 producers and consumers internally, so existing producer/consumer metrics (publish rate, latency, backlog, etc.) continue to work. Each internal segment producer/consumer appears as a separate instance in metrics, identified by the segment topic name.

Operators should monitor:

  • Per-segment publish rates to detect hot segments (candidates for splitting)
  • DAG watch session reconnections (indicates broker restarts or network issues)
  • Segment producer creation/closure events in client logs during split/merge operations

Security Considerations

The V5 client API does not introduce new security mechanisms. It delegates all authentication and authorization to the underlying v4 client:

  • Authentication is configured via PulsarClientBuilder.authentication() and delegated to the v4 client Authentication framework via AuthenticationAdapter
  • Topic-level authorization applies to the parent topic:// name — accessing the underlying segment:// topics uses the same tenant/namespace permissions
  • End-to-end encryption is supported via EncryptionPolicy on ProducerBuilder, delegated to the v4 CryptoKeyReader framework via CryptoKeyReaderAdapter
  • The new CommandScalableTopicLookup protocol command is sent only after the connection is authenticated and in the Connected state, consistent with other lookup commands

No new REST endpoints are introduced by the client API itself.

Backward & Forward Compatibility

Upgrade

The V5 API ships as new, additive modules. This follows the same approach as the 2.0 API redesign, where the old API was preserved in a separate compatibility module, except that this time there is no breaking change at all: the existing API is unchanged and existing applications require no modifications.

  • Applications using pulsar-client-api (v4) continue to work without modification
  • New applications can adopt the V5 API by depending on pulsar-client-v5
  • The V5 API works with all topic types: scalable topics, partitioned topics, and non-partitioned topics — applications can migrate to the new API without changing their topic infrastructure
  • Both APIs can coexist in the same JVM — the V5 implementation wraps the v4 transport internally
  • A detailed migration path from v4 to v5 will be provided in a subsequent PIP
  • A seamless migration path to convert existing partitioned and non-partitioned topics to scalable topics will also be provided, allowing applications to transition their topic infrastructure without data loss or downtime

To adopt the V5 API, applications add pulsar-client-v5 as a dependency and use PulsarClient.builder() from the org.apache.pulsar.client.api.v5 package.

Downgrade / Rollback

Since the V5 API is a separate module, rollback is simply removing the dependency and reverting to v4 API calls. No broker-side changes are required.

Applications using CheckpointConsumer should note that saved Checkpoint byte arrays are specific to the V5 implementation and cannot be used with v4 Reader/Consumer.

Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations

The V5 client API itself does not introduce geo-replication concerns — it connects to whichever cluster it is configured for. However, geo-replication of scalable topics has specific considerations (segment layout synchronization across clusters, cross-cluster split/merge coordination) that will be detailed in a subsequent PIP.

Alternatives

Extend the existing Consumer interface

We considered adding scalable topic support to the existing Consumer interface by adding new methods for checkpoint/seek and hiding segment details internally. This was rejected because:

  • The existing interface already has 60+ methods and is difficult to evolve
  • Adding checkpoint semantics alongside ack semantics would further confuse the API
  • The type system cannot prevent misuse (e.g., calling acknowledgeCumulative() on a Shared subscription)
  • Removing partition-related methods (TopicMessageId, MessageRouter) would break backward compatibility

Builder-per-subscription-type on existing API

We considered keeping a single Consumer type but using different builder types per subscription mode (e.g., StreamConsumerBuilder returning a Consumer with restricted methods). This was rejected because the returned Consumer would still expose all methods at the type level — the restriction would only be in documentation, not enforced by the compiler.

Separate module vs extending existing module

We chose a separate module (pulsar-client-api-v5) rather than adding new interfaces to pulsar-client-api because:

  • The v5 API uses different naming conventions (value() vs getValue()), different types (Duration vs (long, TimeUnit)), and different patterns (Optional vs nullable)
  • Having both conventions in the same package would be confusing
  • A clean module boundary makes it clear which API generation an application is using
  • The v4 API can eventually be deprecated without affecting v5 users
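
The convention differences listed above can be shown side by side. Both message types and the `Timeouts` helper are hypothetical and exist only to contrast the two styles; `TimeUnit.toChronoUnit()` and `Duration.of(...)` are standard JDK methods.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

// v4-style shape: get-prefixed accessors and a nullable key.
final class V4StyleMessage {
    private final String value;
    private final String key; // may be null

    V4StyleMessage(String value, String key) {
        this.value = value;
        this.key = key;
    }

    String getValue() { return value; }
    String getKey() { return key; }
}

// v5-style shape: record accessors without a prefix, Optional instead of null.
record V5StyleMessage(String value, Optional<String> key) {
    static V5StyleMessage from(V4StyleMessage message) {
        return new V5StyleMessage(
                message.getValue(), Optional.ofNullable(message.getKey()));
    }
}

final class Timeouts {
    private Timeouts() {}

    // Bridge a (long, TimeUnit) pair to the Duration type used by v5-style APIs.
    static Duration toDuration(long timeout, TimeUnit unit) {
        return Duration.of(timeout, unit.toChronoUnit());
    }
}
```

Mixing both shapes in one package would force callers to remember which accessor style and which null convention each type follows, which is the confusion the separate module avoids.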

General Notes

The V5 API targets Java 17, the same as the rest of Pulsar.

The implementation module (pulsar-client-v5) wraps the existing v4 pulsar-client for all transport-level operations. This means bug fixes and performance improvements to the v4 client automatically benefit V5 users, and the V5 module itself is relatively thin — primarily routing logic and API adaptation.

Links

  • Mailing List discussion thread:
  • Mailing List voting thread: