PIP-446: Support Native OpenTelemetry Tracing in Pulsar Java Client

Background knowledge

OpenTelemetry

OpenTelemetry is a vendor-neutral observability framework that provides APIs, SDKs, and tools for collecting distributed traces, metrics, and logs. It has become the industry standard for observability, adopted by major cloud providers and APM vendors.

Distributed Tracing

Distributed tracing tracks requests as they flow through distributed systems. A trace represents the entire journey of a request, composed of multiple spans. Each span represents a single operation (e.g., sending a message, processing a request). Spans form parent-child relationships, creating a trace tree that visualizes request flow across services.

W3C Trace Context

The W3C Trace Context specification defines a standard way to propagate trace context across service boundaries using HTTP headers or message properties:

  • traceparent: Contains the version, trace ID, parent span ID, and trace flags
  • tracestate: Contains vendor-specific trace information
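
To make the header layout concrete, the following is a minimal sketch that parses a version-00 traceparent value of the form version-traceid-parentid-flags. The TraceParent class and its parse method are hypothetical names used only for illustration; they are not part of Pulsar or OpenTelemetry, and a real client would leave this work to the OpenTelemetry propagator.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration; not part of Pulsar or OpenTelemetry. */
final class TraceParent {
    // Version "00", 32 hex digits of trace ID, 16 of parent span ID, 2 of flags.
    private static final Pattern FORMAT =
            Pattern.compile("00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})");

    final String traceId;
    final String spanId;
    final boolean sampled;

    private TraceParent(String traceId, String spanId, boolean sampled) {
        this.traceId = traceId;
        this.spanId = spanId;
        this.sampled = sampled;
    }

    /** Returns the parsed header, or null if it is malformed or uses an all-zero ID. */
    static TraceParent parse(String header) {
        if (header == null) {
            return null;
        }
        Matcher m = FORMAT.matcher(header);
        if (!m.matches()) {
            return null;
        }
        String traceId = m.group(1);
        String spanId = m.group(2);
        // The specification treats all-zero trace and parent IDs as invalid.
        if (traceId.equals("0".repeat(32)) || spanId.equals("0".repeat(16))) {
            return null;
        }
        int flags = Integer.parseInt(m.group(3), 16);
        return new TraceParent(traceId, spanId, (flags & 0x01) != 0); // bit 0 = sampled
    }

    public static void main(String[] args) {
        TraceParent tp = parse("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01");
        System.out.println(tp.traceId + " / " + tp.spanId + " / sampled=" + tp.sampled);
    }
}
```

The sketch rejects anything that is not exactly version 00, which is stricter than a production parser needs to be.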

Pulsar Interceptors

Pulsar client interceptors allow users to intercept and modify messages before sending (producer) or after receiving (consumer). They provide hooks for cross-cutting concerns like tracing, metrics, and security.

Cumulative Acknowledgment

In Pulsar, cumulative acknowledgment allows consumers to acknowledge all messages up to a specific message ID in one operation. This is only available for Failover and Exclusive subscription types where message order is guaranteed. When a message is cumulatively acknowledged, all previous messages on that partition are implicitly acknowledged.

Motivation

Currently, the Pulsar Java client lacks native support for distributed tracing with OpenTelemetry. While the OpenTelemetry Java Agent can automatically instrument Pulsar clients, there are several limitations:

  1. Agent-only approach: Users must use the Java Agent, which may not be suitable for all deployment scenarios (e.g., serverless, embedded applications)
  2. Limited control: Users cannot easily customize tracing behavior or selectively enable tracing for specific producers/consumers
  3. Missing first-class support: Other Apache projects (Kafka, Camel) provide native OpenTelemetry support, making Pulsar less competitive
  4. Complex setup: Users must understand agent configuration and classpath setup

Native OpenTelemetry support would:

  • Provide a programmatic API for tracing configuration
  • Enable selective tracing without agent overhead
  • Improve observability in production systems
  • Align Pulsar with modern observability practices
  • Make it easier to diagnose performance issues and message flow

Goals

In Scope

  1. Producer tracing: Create spans for message send operations with automatic trace context injection
  2. Consumer tracing: Create spans for message receive/process operations with automatic trace context extraction
  3. Trace context propagation: Inject and extract W3C Trace Context via message properties
  4. Programmatic API: Enable tracing via ClientBuilder API
  5. Interceptor-based design: Implement using Pulsar's existing interceptor mechanism
  6. Cumulative acknowledgment support: Properly handle span lifecycle for cumulative acks
  7. Multi-topic consumer support: Track spans across multiple topic partitions
  8. Agent compatibility: Ensure compatibility with OpenTelemetry Java Agent
  9. Semantic conventions: Follow OpenTelemetry messaging semantic conventions
  10. Zero overhead when disabled: No performance impact when tracing is not enabled

Out of Scope

  1. Broker-side tracing: This PIP focuses on client-side tracing only
  2. Metrics collection: Only distributed tracing, not OpenTelemetry metrics
  3. Log correlation: Only tracing integration, not log integration
  4. Custom propagators: Only W3C Trace Context format supported initially
  5. Transaction tracing: Tracing for Pulsar transactions (future enhancement)
  6. Schema registry tracing: Tracing for schema operations
  7. Admin API tracing: Tracing for admin operations

High Level Design

The implementation adds native OpenTelemetry tracing to the Pulsar Java client through:

1. New Interfaces

Two new interfaces enable attaching tracing spans to messages and message IDs:

  • TraceableMessage: Allows messages to carry OpenTelemetry spans
  • TraceableMessageId: Allows message IDs to carry OpenTelemetry spans

2. Interceptors

Two interceptors implement the tracing logic:

  • OpenTelemetryProducerInterceptor: Creates producer spans and injects trace context
  • OpenTelemetryConsumerInterceptor: Creates consumer spans and extracts trace context

3. Configuration API

Users can enable tracing through the ClientBuilder:

```java
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .openTelemetry(openTelemetry)
    .enableTracing(true)
    .build();
```

When enabled, the client automatically adds tracing interceptors to all producers and consumers.

4. Trace Context Propagation

The implementation uses W3C Trace Context format to propagate trace context:

  • Producer: Injects traceparent and tracestate into message properties
  • Consumer: Extracts trace context from message properties

This enables end-to-end tracing across services that communicate via Pulsar.

5. Span Lifecycle Management

The implementation carefully manages span lifecycle:

  • Producer spans: Start on send, end on broker acknowledgment (or error)
  • Consumer spans: Start on receive, end on acknowledgment, negative ack, or ack timeout
  • Cumulative ack: Ends all spans for messages up to the acknowledged position

6. Multi-Topic Support

For multi-topic consumers, the implementation maintains separate span maps per topic partition to correctly handle cumulative acknowledgments across multiple topics.

Detailed Design

Design & Implementation Details

1. Traceable Interfaces

TraceableMessage interface (pulsar-client-api):

```java
public interface TraceableMessage {
    void setTracingSpan(io.opentelemetry.api.trace.Span span);
    io.opentelemetry.api.trace.Span getTracingSpan();
}
```

TraceableMessageId interface (pulsar-client-api):

```java
public interface TraceableMessageId {
    void setTracingSpan(io.opentelemetry.api.trace.Span span);
    io.opentelemetry.api.trace.Span getTracingSpan();
}
```

Both MessageImpl and MessageIdImpl implement these interfaces by adding a transient field to store the span without affecting serialization.
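
The effect of a transient field can be sketched with plain Java serialization. This assumes Java serialization is the mechanism in question, which the proposal does not spell out. TracedMessageId and FakeSpan are hypothetical stand-ins for MessageIdImpl and the OpenTelemetry Span.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class TransientSpanDemo {
    /** Stand-in for an OpenTelemetry span; deliberately not Serializable. */
    static final class FakeSpan { }

    /** Stand-in for a message ID that carries a span in a transient field. */
    static final class TracedMessageId implements Serializable {
        private static final long serialVersionUID = 1L;
        final long ledgerId;
        final long entryId;
        private transient FakeSpan tracingSpan; // skipped by Java serialization

        TracedMessageId(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        void setTracingSpan(FakeSpan span) { this.tracingSpan = span; }
        FakeSpan getTracingSpan() { return tracingSpan; }
    }

    /** Serializes and deserializes the ID in memory. */
    static TracedMessageId roundTrip(TracedMessageId id) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(id);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (TracedMessageId) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        TracedMessageId id = new TracedMessageId(7L, 42L);
        id.setTracingSpan(new FakeSpan());
        TracedMessageId copy = roundTrip(id);
        System.out.println("ids kept: " + (copy.ledgerId == 7L && copy.entryId == 42L));
        System.out.println("span dropped: " + (copy.getTracingSpan() == null));
    }
}
```

Without the transient modifier, writeObject would fail in this sketch because FakeSpan is not Serializable.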

2. OpenTelemetryProducerInterceptor

Located in pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/

Key methods:

  • beforeSend(): Creates a producer span and injects trace context into message properties
  • onSendAcknowledgement(): Ends the span successfully and records message ID
  • onPartitionsChange(): No-op (not needed for producer)

Span creation:

  • Uses TracingContext.createProducerSpan() to create a PRODUCER span
  • Span name: send {topic}
  • Attributes: messaging.system, messaging.destination.name, messaging.operation.name
  • Records messaging.message.id when broker acknowledges

Trace context injection:

  • Uses OpenTelemetry TextMapPropagator to inject context into message properties
  • Injects traceparent and tracestate headers
  • Only injects if not already present (allows compatibility with Java Agent)
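
The "only if not already present" rule can be sketched with a plain map standing in for the message properties. The class and method names below are hypothetical, and the real interceptor would obtain the header values from the OpenTelemetry TextMapPropagator rather than take them as arguments.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of inject-if-absent; a Map stands in for message properties. */
final class TraceContextInjection {
    static final String TRACEPARENT = "traceparent";
    static final String TRACESTATE = "tracestate";

    /** Returns true if the context was written, false if an existing one was kept. */
    static boolean injectIfAbsent(Map<String, String> properties,
                                  String traceparent, String tracestate) {
        if (properties.containsKey(TRACEPARENT)) {
            return false; // e.g. already injected by the OpenTelemetry Java Agent
        }
        properties.put(TRACEPARENT, traceparent);
        if (tracestate != null && !tracestate.isEmpty()) {
            properties.put(TRACESTATE, tracestate);
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        String first = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
        String second = "00-4bf92f3577b34da6a3ce929d0e0e4736-b7ad6b7169203331-01";
        System.out.println(injectIfAbsent(properties, first, "vendor=value"));
        System.out.println(injectIfAbsent(properties, second, null));
        System.out.println(properties.get(TRACEPARENT).equals(first));
    }
}
```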

3. OpenTelemetryConsumerInterceptor

Located in pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/

Key methods:

  • beforeConsume(): Extracts trace context and creates a consumer span
  • onAcknowledge(): Ends the span for individual ack with OK status
  • onAcknowledgeCumulative(): Ends all spans up to the acknowledged position with OK status
  • onNegativeAcksSend(): Ends the span with OK status and adds an event (not an error)
  • onAckTimeoutSend(): Ends the span with OK status and adds an event (not an error)

Span creation:

  • Uses TracingContext.createConsumerSpan() to create a CONSUMER span
  • Span name: process {topic}
  • Attributes: messaging.system, messaging.destination.name, messaging.operation.name, messaging.message.id
  • Links to producer span via extracted trace context

Cumulative acknowledgment handling:

  • Maintains Map<String, ConcurrentSkipListMap<MessageIdAdv, Span>> for Failover/Exclusive subscriptions
  • Outer map key: topic partition (from TopicMessageId.getOwnerTopic())
  • Inner map: sorted message IDs to spans for efficient range operations
  • When cumulative ack occurs, removes and ends all spans up to the acknowledged position
  • No span-map overhead for Shared/Key_Shared subscriptions, which do not support cumulative acknowledgment (map is null)

Multi-topic support:

  • Nested map structure handles messages from multiple topic partitions
  • Each topic partition maintains independent sorted span map
  • Cumulative ack only affects spans from the same topic partition
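
The nested map and the cumulative-ack sweep can be sketched as follows. This is an illustration under simplifying assumptions, not the interceptor's actual code: message IDs are reduced to a single long, FakeSpan stands in for the OpenTelemetry Span, and all class and method names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical sketch of per-partition span tracking for cumulative acks. */
final class CumulativeAckSpans {
    /** Stand-in for an OpenTelemetry span. */
    static final class FakeSpan {
        volatile boolean ended;
        void end() { ended = true; }
    }

    // Outer key: topic partition. Inner map: message IDs in sorted order.
    private final Map<String, ConcurrentSkipListMap<Long, FakeSpan>> spansByPartition =
            new ConcurrentHashMap<>();

    /** Registers a span for a received message. */
    FakeSpan onReceive(String partition, long messageId) {
        FakeSpan span = new FakeSpan();
        spansByPartition
                .computeIfAbsent(partition, p -> new ConcurrentSkipListMap<>())
                .put(messageId, span);
        return span;
    }

    /** Ends and removes every span up to and including messageId; returns the count. */
    int onAcknowledgeCumulative(String partition, long messageId) {
        ConcurrentSkipListMap<Long, FakeSpan> spans = spansByPartition.get(partition);
        if (spans == null) {
            return 0;
        }
        // headMap is a live view, so polling from it removes from the backing map.
        ConcurrentNavigableMap<Long, FakeSpan> acked = spans.headMap(messageId, true);
        int ended = 0;
        Map.Entry<Long, FakeSpan> entry;
        while ((entry = acked.pollFirstEntry()) != null) {
            entry.getValue().end();
            ended++;
        }
        return ended;
    }

    public static void main(String[] args) {
        CumulativeAckSpans tracker = new CumulativeAckSpans();
        tracker.onReceive("topic-partition-0", 1);
        tracker.onReceive("topic-partition-0", 2);
        FakeSpan later = tracker.onReceive("topic-partition-0", 3);
        FakeSpan other = tracker.onReceive("topic-partition-1", 1);
        System.out.println(tracker.onAcknowledgeCumulative("topic-partition-0", 2));
        System.out.println(later.ended + " " + other.ended);
    }
}
```

Keeping one sorted map per partition means a cumulative ack on one partition never touches spans that belong to another.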

Acknowledgment type tracking:

  • Every consumer span includes a messaging.pulsar.acknowledgment.type attribute indicating how it was completed:
    • "acknowledge": Normal individual acknowledgment
    • "cumulative_acknowledge": Cumulative acknowledgment
    • "negative_acknowledge": Message negatively acknowledged (will be redelivered)
    • "ack_timeout": Acknowledgment timeout (will be redelivered)
  • Negative ack and ack timeout end spans with OK status (not ERROR) because they are normal Pulsar message flow
  • This design separates messaging operations (which succeed) from application logic failures (which should be tracked in separate child spans)
  • When a message is redelivered, a new consumer span is created for the new delivery attempt
  • The attribute allows users to query and analyze retry patterns, timeout issues, and acknowledgment types in their tracing backend
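
The mapping from completion path to attribute value and span status can be summarized in a small sketch. The enum and helper names are hypothetical; only the attribute key and its four values come from the list above.

```java
/** Hypothetical sketch of how a consumer span is completed for each ack path. */
final class ConsumerSpanCompletion {
    static final String ACK_TYPE_KEY = "messaging.pulsar.acknowledgment.type";

    enum Status { OK, ERROR }

    enum AckType {
        ACKNOWLEDGE("acknowledge", false),
        CUMULATIVE_ACKNOWLEDGE("cumulative_acknowledge", false),
        NEGATIVE_ACKNOWLEDGE("negative_acknowledge", true),
        ACK_TIMEOUT("ack_timeout", true);

        final String attributeValue;
        final boolean willBeRedelivered;

        AckType(String attributeValue, boolean willBeRedelivered) {
            this.attributeValue = attributeValue;
            this.willBeRedelivered = willBeRedelivered;
        }
    }

    /** Every ack path ends the span with OK; redelivery is normal message flow. */
    static Status statusFor(AckType type) {
        return Status.OK;
    }

    /** Renders the attribute and status that would be recorded on the span. */
    static String describe(AckType type) {
        return ACK_TYPE_KEY + "=" + type.attributeValue + " status=" + statusFor(type);
    }

    public static void main(String[] args) {
        for (AckType type : AckType.values()) {
            System.out.println(describe(type) + " redelivered=" + type.willBeRedelivered);
        }
    }
}
```

Application failures would be reported on separate child spans, so ERROR never appears in this mapping.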

4. TracingContext Utility

Provides helper methods for span creation and management:

  • createProducerSpan(): Creates a producer span with correct attributes
  • createConsumerSpan(): Creates a consumer span with trace context extraction
  • endSpan(): Safely ends a span
  • endSpan(span, exception): Ends a span with error status
  • isValid(): Checks if a span is valid and recording

5. TracingProducerBuilder

Helper for manual trace context injection (advanced use cases):

  • injectContext(): Injects trace context into message properties
  • extractFromHeaders(): Extracts trace context from HTTP headers

6. ClientBuilder Integration

New API method (ClientBuilder):

```java
ClientBuilder enableTracing(boolean tracingEnabled);
```

Implementation (ClientBuilderImpl):

  • enableTracing() stores tracingEnabled flag in ClientConfigurationData
  • Passes configuration to PulsarClientImpl

Automatic interceptor addition (ConsumerBuilderImpl, ProducerBuilderImpl):

  • Checks if tracing is enabled in client configuration
  • Automatically adds appropriate interceptor if enabled
  • User-provided interceptors are preserved and combined
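
One way the builders could combine the tracing interceptor with user-provided ones is sketched below. The types are stand-ins, and two details are assumptions rather than statements from this proposal: the tracing interceptor is placed first, and it is not added again when the user has already registered one manually.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of assembling the interceptor chain in a builder. */
final class InterceptorAssembly {
    /** Stand-in for Pulsar's producer or consumer interceptor interface. */
    interface Interceptor { }

    /** Stand-in for the OpenTelemetry tracing interceptor. */
    static final class TracingInterceptor implements Interceptor { }

    /** Returns the user interceptors, plus a tracing interceptor when enabled. */
    static List<Interceptor> effectiveInterceptors(boolean tracingEnabled,
                                                   List<Interceptor> userInterceptors) {
        List<Interceptor> result = new ArrayList<>(userInterceptors);
        boolean alreadyPresent = false;
        for (Interceptor interceptor : result) {
            if (interceptor instanceof TracingInterceptor) {
                alreadyPresent = true; // assumption: avoid duplicate spans
            }
        }
        if (tracingEnabled && !alreadyPresent) {
            result.add(0, new TracingInterceptor()); // assumption: tracing runs first
        }
        return result;
    }

    public static void main(String[] args) {
        Interceptor user = new Interceptor() { };
        System.out.println(effectiveInterceptors(true, List.of(user)).size());
        System.out.println(effectiveInterceptors(false, List.of(user)).size());
    }
}
```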

7. InstrumentProvider Enhancement

Enhanced to provide OpenTelemetry instance:

```java
public OpenTelemetry getOpenTelemetry();
```

Falls back to GlobalOpenTelemetry.get() if not explicitly configured.

8. Implementation Classes

Modified classes:

  • MessageImpl: Implements TraceableMessage
  • MessageIdImpl: Implements TraceableMessageId
  • TopicMessageImpl: Delegates TraceableMessage methods to wrapped message
  • TopicMessageIdImpl: Delegates TraceableMessageId methods to wrapped message ID
  • ConsumerBase: Provides getSubscriptionType() for interceptors
  • ConsumerBuilderImpl: Auto-adds consumer interceptor when enabled
  • ProducerBuilderImpl: Auto-adds producer interceptor when enabled
  • PulsarClientImpl: Stores and provides OpenTelemetry configuration
  • ClientConfigurationData: Stores OpenTelemetry and enableTracing settings

9. Span Attributes

Following OpenTelemetry messaging semantic conventions:

Producer spans:

  • messaging.system: "pulsar"
  • messaging.destination.name: Topic name
  • messaging.operation.name: "send"
  • messaging.message.id: Message ID (added on ack)

Consumer spans:

  • messaging.system: "pulsar"
  • messaging.destination.name: Topic name
  • messaging.destination.subscription.name: Subscription name
  • messaging.operation.name: "process"
  • messaging.message.id: Message ID
  • messaging.pulsar.acknowledgment.type: Custom attribute indicating how the message was acknowledged:
    • "acknowledge": Individual acknowledgment
    • "cumulative_acknowledge": Cumulative acknowledgment
    • "negative_acknowledge": Negative acknowledgment (message will be redelivered)
    • "ack_timeout": Acknowledgment timeout (message will be redelivered)

Rationale for messaging.pulsar.acknowledgment.type attribute:

  • Provides visibility into message acknowledgment patterns
  • Enables querying for retry scenarios (negative ack, timeout)
  • Helps identify timeout configuration issues
  • Allows analysis of cumulative vs. individual acknowledgment usage
  • Uses attribute (not event) for better queryability in tracing backends

Public-facing Changes

Public API

New interfaces (org.apache.pulsar.client.api):

```java
public interface TraceableMessage {
    void setTracingSpan(io.opentelemetry.api.trace.Span span);
    io.opentelemetry.api.trace.Span getTracingSpan();
}

public interface TraceableMessageId {
    void setTracingSpan(io.opentelemetry.api.trace.Span span);
    io.opentelemetry.api.trace.Span getTracingSpan();
}
```

New ClientBuilder method:

```java
/**
 * Enable or disable automatic tracing.
 * When enabled, uses GlobalOpenTelemetry.get() if no OpenTelemetry instance is set.
 * Tracing interceptors are automatically added to all producers and consumers.
 */
ClientBuilder enableTracing(boolean tracingEnabled);
```

New public classes (org.apache.pulsar.client.impl.tracing):

  • OpenTelemetryProducerInterceptor: Producer interceptor for tracing
  • OpenTelemetryConsumerInterceptor<T>: Consumer interceptor for tracing
  • TracingContext: Utility methods for span creation
  • TracingProducerBuilder: Helper for manual trace context injection

Modified classes:

  • Message: Implementations (MessageImpl, TopicMessageImpl) now implement TraceableMessage
  • MessageId: Implementations (MessageIdImpl, TopicMessageIdImpl) now implement TraceableMessageId

Binary protocol

No changes to binary protocol. Trace context is propagated via existing message properties mechanism.

Configuration

New ClientBuilder option:

  • enableTracing(boolean): Enable automatic tracing

Example configuration:

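```java
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.tracing.OpenTelemetryProducerInterceptor;

// Option 1: Explicit OpenTelemetry instance with tracing enabled.
// tracerProvider is an SdkTracerProvider configured elsewhere.
OpenTelemetry otel = OpenTelemetrySdk.builder()
    .setTracerProvider(tracerProvider)
    .build();

PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .openTelemetry(otel)
    .enableTracing(true)
    .build();

// Option 2: Use GlobalOpenTelemetry (alternative to Option 1)
PulsarClient globalClient = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .enableTracing(true)  // Uses GlobalOpenTelemetry
    .build();

// Option 3: Manual interceptor (advanced), attached to a single producer
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .intercept(new OpenTelemetryProducerInterceptor())
    .create();
```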

CLI

No CLI changes. This is a client library feature.

Metrics

This PIP focuses on distributed tracing, not metrics. No new metrics are added.

Monitoring

Users can monitor tracing effectiveness through their OpenTelemetry backend (e.g., Jaeger, Zipkin, Grafana Tempo).

Key Monitoring Aspects

  1. Span creation rate: Monitor the rate of producer and consumer spans to ensure tracing is active
  2. Trace completeness: Verify traces show complete paths from producer to consumer
  3. Error spans: Monitor spans with ERROR status to identify failures
  4. Span duration: Analyze span durations to identify performance bottlenecks:
    • Long producer spans may indicate slow broker acknowledgments
    • Long consumer spans may indicate slow message processing

These aspects can be tracked per area:

  1. Producer Performance:
    • Track send span durations by topic
    • Alert on high error rates
    • Monitor throughput (spans per second)
  2. Consumer Performance:
    • Track process span durations by topic
    • Monitor acknowledgment latency
    • Alert on negative acknowledgment rates
  3. End-to-End Latency:
    • Visualize complete traces from producer to consumer
    • Identify bottlenecks in the message flow
    • Track latency percentiles (p50, p95, p99)
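
Tracing backends normally compute percentiles themselves. For readers who export span durations and want to check the numbers, the following is a minimal sketch of the nearest-rank method; the class name and the millisecond unit are assumptions made for illustration.

```java
import java.util.Arrays;

/** Hypothetical sketch: nearest-rank percentiles over span durations. */
final class LatencyPercentiles {
    /**
     * Returns the smallest duration such that at least {@code percent} percent
     * of the samples are less than or equal to it.
     */
    static long percentile(long[] durationsMillis, int percent) {
        if (durationsMillis.length == 0) {
            throw new IllegalArgumentException("no samples");
        }
        if (percent < 1 || percent > 100) {
            throw new IllegalArgumentException("percent must be in 1..100");
        }
        long[] sorted = durationsMillis.clone();
        Arrays.sort(sorted);
        // Integer form of ceil(percent / 100 * n), which avoids rounding issues.
        int rank = (int) (((long) percent * sorted.length + 99) / 100);
        return sorted[rank - 1];
    }

    public static void main(String[] args) {
        long[] durations = {30, 10, 20, 40};
        System.out.println("p50=" + percentile(durations, 50));
        System.out.println("p95=" + percentile(durations, 95));
        System.out.println("p99=" + percentile(durations, 99));
    }
}
```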

Security Considerations

This feature does not introduce new security concerns:

  1. Trace context in properties: Trace context (traceparent, tracestate) is stored in message properties, which are part of the existing message structure. No additional authentication or authorization is needed.

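  2. No sensitive data: The traceparent value contains only a version, trace ID, span ID, and trace flags, and tracestate carries vendor-specific tracing key-value pairs. The interceptors do not add message payload or user data to either property.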

  3. OpenTelemetry instance: The OpenTelemetry instance is provided by the application and follows the same security model as other client configuration.

  4. Multi-tenancy: Tracing respects existing Pulsar multi-tenancy boundaries. Trace context is scoped to individual messages and does not leak across tenants.

  5. No new endpoints: This feature does not add new HTTP endpoints or protocol commands.

Backward & Forward Compatibility

Upgrade

Fully backward compatible. No breaking changes:

  1. Default behavior unchanged: Tracing is disabled by default. Existing applications work without modification.
  2. Serialization compatible: New interfaces use transient fields that don't affect serialization.
  3. Message format unchanged: Trace context uses existing message properties mechanism.
  4. Interceptor compatible: Works alongside existing user interceptors.

Upgrade steps:

  1. Upgrade client library to version containing this feature
  2. Optionally enable tracing via ClientBuilder.enableTracing(true)
  3. Configure OpenTelemetry SDK and exporters if using programmatic configuration

Downgrade / Rollback

Fully compatible with downgrade:

  1. Message compatibility: Messages sent with trace context can be received by older clients (they ignore unknown properties)
  2. No schema changes: No changes to message schema or protocol
  3. Graceful degradation: Older clients simply don't create spans but can still process messages

Rollback steps:

  1. Downgrade client library to previous version
  2. Tracing will stop, but message flow continues normally
  3. Existing traces may be incomplete if some clients are downgraded

Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations

No impact on geo-replication:

  1. Trace context preserved: Message properties (including trace context) are replicated across clusters
  2. Mixed versions: Clients connected to different clusters can run different versions. Trace context propagates through older brokers without issues.
  3. No broker changes: This is a client-only feature. Broker version doesn't matter.

Considerations:

  • Traces may span multiple clusters, providing visibility into geo-replication latency
  • If some clusters use tracing and others don't, traces will have gaps but remain functional
  • Trace context continues across cluster boundaries via message properties

Alternatives

Alternative 1: OpenTelemetry Java Agent Only

Approach: Only support tracing via OpenTelemetry Java Agent automatic instrumentation.

Rejected because:

  • Requires agent deployment, not suitable for all environments
  • No programmatic control over tracing behavior
  • Harder to debug and customize
  • Not aligned with other Apache projects that provide native support

Alternative 2: Custom Tracing API

Approach: Design a custom Pulsar-specific tracing API instead of using OpenTelemetry.

Rejected because:

  • OpenTelemetry is the industry standard
  • Custom API would require additional exporters and integrations
  • Would not work with existing OpenTelemetry ecosystem
  • Increases maintenance burden

Alternative 3: Span Storage in Message Properties

Approach: Store spans directly in message properties instead of using separate TraceableMessage interface.

Rejected because:

  • Spans are not serializable
  • Would require serializing span context for every message (overhead)
  • Less clean API design
  • Harder to integrate with cumulative acknowledgment

Alternative 4: Per-Message Acknowledgment Only

Approach: Only support individual acknowledgment, not cumulative acknowledgment.

Rejected because:

  • Cumulative acknowledgment is a key Pulsar feature
  • Would leave spans unclosed until timeout
  • Poor user experience for Failover/Exclusive subscriptions
  • Incomplete tracing for common use cases

General Notes

Performance Considerations

The implementation is designed for minimal overhead:

  1. Zero overhead when disabled: Tracing interceptors are added only when tracing is enabled, so the send and receive paths are otherwise unchanged
  2. Lazy initialization: Span maps only created for Failover/Exclusive subscriptions
  3. Efficient data structures: ConcurrentSkipListMap locates the acknowledged position in O(log n) expected time, so a cumulative ack only visits the spans it ends
  4. Transient fields: Spans not serialized with messages
  5. Batching: OpenTelemetry SDK batches span exports by default

Testing

Comprehensive testing includes:

  1. Unit tests: OpenTelemetryTracingTest verifies span creation, attributes, and context propagation
  2. Example tests: TracingExampleTest demonstrates usage patterns
  3. Integration tests: Manual testing with Jaeger backend
  4. Compatibility tests: Verified with OpenTelemetry Java Agent

Documentation

User documentation provided in:

  • pulsar-client/TRACING.md: Comprehensive tracing guide
  • Javadoc comments on all public APIs
  • Code examples in test classes

Links