pip/pip-446.md
OpenTelemetry is a vendor-neutral observability framework that provides APIs, SDKs, and tools for collecting distributed traces, metrics, and logs. It has become the industry standard for observability, adopted by major cloud providers and APM vendors.
Distributed tracing tracks requests as they flow through distributed systems. A trace represents the entire journey of a request, composed of multiple spans. Each span represents a single operation (e.g., sending a message, processing a request). Spans form parent-child relationships, creating a trace tree that visualizes request flow across services.
The W3C Trace Context specification defines a standard way to propagate trace context across service boundaries using HTTP headers or message properties:
- traceparent: Contains trace ID, span ID, and trace flags
- tracestate: Contains vendor-specific trace information

Pulsar client interceptors allow users to intercept and modify messages before sending (producer) or after receiving (consumer). They provide hooks for cross-cutting concerns like tracing, metrics, and security.
In Pulsar, cumulative acknowledgment allows consumers to acknowledge all messages up to a specific message ID in one operation. This is only available for Failover and Exclusive subscription types where message order is guaranteed. When a message is cumulatively acknowledged, all previous messages on that partition are implicitly acknowledged.
Currently, the Pulsar Java client lacks native support for distributed tracing with OpenTelemetry. While the OpenTelemetry Java Agent can automatically instrument Pulsar clients, there are several limitations:
Native OpenTelemetry support would:
- ClientBuilder API

The implementation adds native OpenTelemetry tracing to the Pulsar Java client through:
Two new interfaces enable attaching tracing spans to messages and message IDs:
- TraceableMessage: Allows messages to carry OpenTelemetry spans
- TraceableMessageId: Allows message IDs to carry OpenTelemetry spans

Two interceptors implement the tracing logic:
- OpenTelemetryProducerInterceptor: Creates producer spans and injects trace context
- OpenTelemetryConsumerInterceptor: Creates consumer spans and extracts trace context

Users can enable tracing through the ClientBuilder:
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .openTelemetry(openTelemetry)
    .enableTracing(true)
    .build();
When enabled, the client automatically adds tracing interceptors to all producers and consumers.
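The implementation uses the W3C Trace Context format to propagate trace context:
- The producer interceptor injects traceparent and tracestate into message properties
- The consumer interceptor extracts the trace context from message properties

This enables end-to-end tracing across services that communicate via Pulsar.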
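The propagation step can be illustrated without any Pulsar or OpenTelemetry dependency. The following is a minimal sketch, assuming a plain Map<String, String> stands in for message properties; the class and method names (TraceParentSketch, inject, extract) are hypothetical and are not part of the proposed API. It only handles version 00 of the traceparent format and ignores tracestate.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Stdlib-only model of W3C traceparent propagation through message properties. */
final class TraceParentSketch {
    // version "00" - 32 hex trace ID - 16 hex parent span ID - 2 hex trace flags
    private static final Pattern TRACEPARENT =
            Pattern.compile("^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$");

    /** Producer side: writes the context into the (hypothetical) properties map. */
    static void inject(Map<String, String> properties, String traceId, String spanId, boolean sampled) {
        properties.put("traceparent", "00-" + traceId + "-" + spanId + "-" + (sampled ? "01" : "00"));
    }

    /** Consumer side: returns {traceId, parentSpanId, flags}, or null if absent or malformed. */
    static String[] extract(Map<String, String> properties) {
        String value = properties.get("traceparent");
        if (value == null) {
            return null;
        }
        Matcher m = TRACEPARENT.matcher(value);
        if (!m.matches()) {
            return null;
        }
        // The specification treats all-zero trace IDs and span IDs as invalid.
        if (m.group(1).matches("0+") || m.group(2).matches("0+")) {
            return null;
        }
        return new String[] {m.group(1), m.group(2), m.group(3)};
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        inject(props, "4bf92f3577b34da6a3ce929d0e0e4736", "00f067aa0ba902b7", true);
        String[] ctx = extract(props);
        System.out.println("trace=" + ctx[0] + " parent=" + ctx[1] + " flags=" + ctx[2]);
    }
}
```

In the proposed implementation this work is delegated to OpenTelemetry's TextMapPropagator rather than hand-written parsing; the sketch only shows what ends up in the message properties.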
The implementation carefully manages span lifecycle:
For multi-topic consumers, the implementation maintains separate span maps per topic partition to correctly handle cumulative acknowledgments across multiple topics.
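The per-partition bookkeeping can be sketched with standard collections only. This is a simplified model under stated assumptions, not the interceptor's code: a Long stands in for the message ID, a String stands in for the span, and the names CumulativeAckSketch, track, ackCumulative, and pendingCount are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Stdlib-only model: a Long stands in for the message ID, a String for the span. */
final class CumulativeAckSketch {
    // topic partition -> (message position -> pending span), ordered by position
    private final Map<String, ConcurrentSkipListMap<Long, String>> pending = new ConcurrentHashMap<>();

    /** Called when a message is received and its consumer span is started. */
    void track(String topicPartition, long position, String span) {
        pending.computeIfAbsent(topicPartition, k -> new ConcurrentSkipListMap<>()).put(position, span);
    }

    /** Ends (here: removes and returns) every span at or before the acknowledged position. */
    List<String> ackCumulative(String topicPartition, long position) {
        List<String> ended = new ArrayList<>();
        ConcurrentSkipListMap<Long, String> spans = pending.get(topicPartition);
        if (spans == null) {
            return ended;
        }
        // headMap(position, true) is a view of all entries with key <= position
        ConcurrentNavigableMap<Long, String> upTo = spans.headMap(position, true);
        for (Long key : upTo.keySet()) {
            String span = spans.remove(key);
            if (span != null) {
                ended.add(span); // a real interceptor would end the span here
            }
        }
        return ended;
    }

    /** Number of spans still waiting for an acknowledgment on the given partition. */
    int pendingCount(String topicPartition) {
        ConcurrentSkipListMap<Long, String> spans = pending.get(topicPartition);
        return spans == null ? 0 : spans.size();
    }

    public static void main(String[] args) {
        CumulativeAckSketch tracker = new CumulativeAckSketch();
        tracker.track("my-topic-partition-0", 1L, "span-1");
        tracker.track("my-topic-partition-0", 2L, "span-2");
        tracker.track("my-topic-partition-0", 3L, "span-3");
        tracker.track("my-topic-partition-1", 1L, "other-1");
        System.out.println(tracker.ackCumulative("my-topic-partition-0", 2L));
        System.out.println(tracker.pendingCount("my-topic-partition-1"));
    }
}
```

Keeping one ordered map per partition means a cumulative acknowledgment on one partition never ends spans that belong to another, which is the property the multi-topic case depends on.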
TraceableMessage interface (pulsar-client-api):
public interface TraceableMessage {
    void setTracingSpan(io.opentelemetry.api.trace.Span span);
    io.opentelemetry.api.trace.Span getTracingSpan();
}
TraceableMessageId interface (pulsar-client-api):
public interface TraceableMessageId {
    void setTracingSpan(io.opentelemetry.api.trace.Span span);
    io.opentelemetry.api.trace.Span getTracingSpan();
}
Both MessageImpl and MessageIdImpl implement these interfaces by adding a transient field to store the span without affecting serialization.
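The effect of a transient field can be shown with standard Java serialization. The sketch below assumes a hypothetical stand-in class (TransientSpanSketch), not the real MessageIdImpl, and uses Object in place of io.opentelemetry.api.trace.Span so that it runs without extra dependencies.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for a serializable message ID; not the real MessageIdImpl. */
final class TransientSpanSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    final long ledgerId;
    final long entryId;
    // Excluded from Java serialization; Object stands in for the OpenTelemetry Span type.
    private transient Object tracingSpan;

    TransientSpanSketch(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    void setTracingSpan(Object span) {
        this.tracingSpan = span;
    }

    Object getTracingSpan() {
        return tracingSpan;
    }

    /** Serializes the ID and reads it back using standard Java serialization. */
    static TransientSpanSketch roundTrip(TransientSpanSketch id) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(id);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (TransientSpanSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        TransientSpanSketch id = new TransientSpanSketch(12L, 34L);
        id.setTracingSpan("span-placeholder");
        TransientSpanSketch copy = roundTrip(id);
        System.out.println("ids kept: " + (copy.ledgerId == 12L && copy.entryId == 34L));
        System.out.println("span dropped: " + (copy.getTracingSpan() == null));
    }
}
```

The identifying fields survive the round trip while the span reference does not, so the serialized form is the same whether or not a span was attached.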
OpenTelemetryProducerInterceptor is located in pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/
Key methods:
- beforeSend(): Creates a producer span and injects trace context into message properties
- onSendAcknowledgement(): Ends the span successfully and records the message ID
- onPartitionsChange(): No-op (not needed for producer)

Span creation:
- Uses TracingContext.createProducerSpan() to create a PRODUCER span
- Span name: send {topic}
- Attributes: messaging.system, messaging.destination.name, messaging.operation.name
- Adds messaging.message.id when the broker acknowledges

Trace context injection:
- Uses TextMapPropagator to inject context into message properties
- Propagates the traceparent and tracestate headers

OpenTelemetryConsumerInterceptor is located in pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/
Key methods:
- beforeConsume(): Extracts trace context and creates a consumer span
- onAcknowledge(): Ends the span for individual ack with OK status
- onAcknowledgeCumulative(): Ends all spans up to the acknowledged position with OK status
- onNegativeAcksSend(): Ends the span with OK status and adds an event (not an error)
- onAckTimeoutSend(): Ends the span with OK status and adds an event (not an error)

Span creation:
- Uses TracingContext.createConsumerSpan() to create a CONSUMER span
- Span name: process {topic}
- Attributes: messaging.system, messaging.destination.name, messaging.operation.name, messaging.message.id

Cumulative acknowledgment handling:
- Maintains a Map<String, ConcurrentSkipListMap<MessageIdAdv, Span>> for Failover/Exclusive subscriptions
- The outer map is keyed by topic partition (from TopicMessageId.getOwnerTopic())

Multi-topic support:
Acknowledgment type tracking:
Each consumer span carries a messaging.pulsar.acknowledgment.type attribute indicating how it was completed:
- "acknowledge": Normal individual acknowledgment
- "cumulative_acknowledge": Cumulative acknowledgment
- "negative_acknowledge": Message negatively acknowledged (will be redelivered)
- "ack_timeout": Acknowledgment timeout (will be redelivered)

TracingContext provides helper methods for span creation and management:
- createProducerSpan(): Creates a producer span with correct attributes
- createConsumerSpan(): Creates a consumer span with trace context extraction
- endSpan(): Safely ends a span
- endSpan(span, exception): Ends a span with error status
- isValid(): Checks if a span is valid and recording

TracingProducerBuilder is a helper for manual trace context injection (advanced use cases):
- injectContext(): Injects trace context into message properties
- extractFromHeaders(): Extracts trace context from HTTP headers

New API methods (ClientBuilder):
ClientBuilder enableTracing(boolean tracingEnabled);
Implementation (ClientBuilderImpl):
- enableTracing() stores the tracingEnabled flag in ClientConfigurationData
- PulsarClientImpl stores and provides the OpenTelemetry configuration

Automatic interceptor addition (ConsumerBuilderImpl, ProducerBuilderImpl):
Enhanced to provide OpenTelemetry instance:
public OpenTelemetry getOpenTelemetry();
Falls back to GlobalOpenTelemetry.get() if not explicitly configured.
Modified classes:
- MessageImpl: Implements TraceableMessage
- MessageIdImpl: Implements TraceableMessageId
- TopicMessageImpl: Delegates TraceableMessage methods to wrapped message
- TopicMessageIdImpl: Delegates TraceableMessageId methods to wrapped message ID
- ConsumerBase: Provides getSubscriptionType() for interceptors
- ConsumerBuilderImpl: Auto-adds consumer interceptor when enabled
- ProducerBuilderImpl: Auto-adds producer interceptor when enabled
- PulsarClientImpl: Stores and provides OpenTelemetry configuration
- ClientConfigurationData: Stores OpenTelemetry and enableTracing settings

Following OpenTelemetry messaging semantic conventions:
Producer spans:
- messaging.system: "pulsar"
- messaging.destination.name: Topic name
- messaging.operation.name: "send"
- messaging.message.id: Message ID (added on ack)

Consumer spans:
- messaging.system: "pulsar"
- messaging.destination.name: Topic name
- messaging.destination.subscription.name: Subscription name
- messaging.operation.name: "process"
- messaging.message.id: Message ID
- messaging.pulsar.acknowledgment.type: Custom attribute indicating how the message was acknowledged:
  - "acknowledge": Individual acknowledgment
  - "cumulative_acknowledge": Cumulative acknowledgment
  - "negative_acknowledge": Negative acknowledgment (message will be redelivered)
  - "ack_timeout": Acknowledgment timeout (message will be redelivered)

Rationale for messaging.pulsar.acknowledgment.type attribute:
New interfaces (org.apache.pulsar.client.api):
public interface TraceableMessage {
    void setTracingSpan(io.opentelemetry.api.trace.Span span);
    io.opentelemetry.api.trace.Span getTracingSpan();
}
public interface TraceableMessageId {
    void setTracingSpan(io.opentelemetry.api.trace.Span span);
    io.opentelemetry.api.trace.Span getTracingSpan();
}
ClientBuilder new methods:
/**
 * Enable or disable automatic tracing.
 * When enabled, uses GlobalOpenTelemetry.get() if no OpenTelemetry instance is set.
 * Tracing interceptors are automatically added to all producers and consumers.
 */
ClientBuilder enableTracing(boolean tracingEnabled);
New public classes (org.apache.pulsar.client.impl.tracing):
- OpenTelemetryProducerInterceptor: Producer interceptor for tracing
- OpenTelemetryConsumerInterceptor<T>: Consumer interceptor for tracing
- TracingContext: Utility methods for span creation
- TracingProducerBuilder: Helper for manual trace context injection

Modified classes:
- Message interface: Implementations (MessageImpl, TopicMessageImpl) now also implement TraceableMessage
- MessageId interface: Implementations (MessageIdImpl, TopicMessageIdImpl) now also implement TraceableMessageId

No changes to the binary protocol. Trace context is propagated via the existing message properties mechanism.
New ClientBuilder options:
- enableTracing(boolean): Enable automatic tracing

Example configuration:
// Option 1: Explicit OpenTelemetry instance with tracing enabled
OpenTelemetry otel = OpenTelemetrySdk.builder()
    .setTracerProvider(tracerProvider)
    .build();
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .openTelemetry(otel)
    .enableTracing(true)
    .build();
// Option 2: Use GlobalOpenTelemetry
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .enableTracing(true) // Uses GlobalOpenTelemetry
    .build();
// Option 3: Manual interceptor (advanced)
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .intercept(new OpenTelemetryProducerInterceptor())
    .create();
No CLI changes. This is a client library feature.
This PIP focuses on distributed tracing, not metrics. No new metrics are added.
Users can monitor tracing effectiveness through their OpenTelemetry backend (e.g., Jaeger, Zipkin, Grafana Tempo):
Producer Performance:
- send span durations by topic

Consumer Performance:
- process span durations by topic

End-to-End Latency:
This feature does not introduce new security concerns:
Trace context in properties: Trace context (traceparent, tracestate) is stored in message properties, which are part of the existing message structure. No additional authentication or authorization is needed.
No sensitive data: Trace context only contains trace IDs, span IDs, and trace flags. No user data or sensitive information is included.
OpenTelemetry instance: The OpenTelemetry instance is provided by the application and follows the same security model as other client configuration.
Multi-tenancy: Tracing respects existing Pulsar multi-tenancy boundaries. Trace context is scoped to individual messages and does not leak across tenants.
No new endpoints: This feature does not add new HTTP endpoints or protocol commands.
Fully backward compatible. No breaking changes:
- The tracing span is stored in transient fields that don't affect serialization.

Upgrade steps:
- Enable tracing with ClientBuilder.enableTracing(true)

Fully compatible with downgrade:
Rollback steps:
No impact on geo-replication:
Considerations:
Approach: Only support tracing via OpenTelemetry Java Agent automatic instrumentation.
Rejected because:
Approach: Design a custom Pulsar-specific tracing API instead of using OpenTelemetry.
Rejected because:
Approach: Store spans directly in message properties instead of using separate TraceableMessage interface.
Rejected because:
Approach: Only support individual acknowledgment, not cumulative acknowledgment.
Rejected because:
The implementation is designed for minimal overhead:
- Uses ConcurrentSkipListMap for O(log n) range operations

Comprehensive testing includes:
- OpenTelemetryTracingTest verifies span creation, attributes, and context propagation
- TracingExampleTest demonstrates usage patterns

User documentation provided in:
- pulsar-client/TRACING.md: Comprehensive tracing guide