sdks/python/design/API_AND_DATA_FLOW.md
The Opik Python SDK provides lightweight, non-blocking tracing for LLM applications. The architecture prioritizes minimal performance impact on user code through asynchronous message processing, intelligent batching, and background workers.
opik.Opik
The Opik class is the central entry point and factory for all SDK operations.
Location: opik/api_objects/opik_client.py
import opik
# Initialize client
client = opik.Opik(
project_name="my_project", # Optional: defaults to "Default Project"
workspace="my_workspace", # Optional: defaults to "default"
host="https://api.opik.com", # Optional: custom backend URL
api_key="your_api_key" # Optional: for cloud deployments
)
Configuration is resolved in this order (highest to lowest):
1. Opik() constructor arguments
2. Environment variables (OPIK_PROJECT_NAME, OPIK_WORKSPACE, etc.)
3. Configuration file (~/.opik.config)
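For example, a minimal environment-based setup (a usage sketch; only the variables named above are assumed):
import os
import opik

# Constructor arguments would take precedence over these; with no arguments,
# the client falls back to environment variables, then the config file.
os.environ["OPIK_PROJECT_NAME"] = "my_project"
os.environ["OPIK_WORKSPACE"] = "my_workspace"

client = opik.Opik()  # resolves project and workspace from the environment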
# Create a trace
trace = client.trace(
name="my_operation",
input={"query": "What is AI?"},
metadata={"version": "1.0"},
tags=["production"]
)
# Create a span (must be within trace context or provide trace_id)
span = client.span(
name="llm_call",
trace_id=trace.id, # Required if no trace context
parent_span_id=None, # Optional: for nested spans
input={"prompt": "..."},
type="llm", # Types: "llm", "tool", "general"
model="gpt-4",
provider="openai"
)
# Update trace/span
client.trace(
id=trace.id,
output={"answer": "..."},
metadata={"tokens": 150}
)
# Add feedback scores
client.log_traces_feedback_scores(
scores=[{
"id": trace.id,
"name": "accuracy",
"value": 0.95,
"reason": "Accurate response"
}]
)
# Ensure all data is sent
client.flush(timeout=30) # Wait up to 30 seconds
import opik
# Simplest usage
@opik.track
def my_function(input: str) -> str:
# Automatically creates trace and span
# Captures input and output
return process(input)
# With options
@opik.track(
name="custom_name", # Override function name
project_name="my_project", # Set project
capture_input=True, # Capture inputs (default: True)
capture_output=True, # Capture outputs (default: True)
tags=["production"], # Add tags
metadata={"version": "1.0"}, # Add metadata
type="llm" # Set span type
)
def llm_call(prompt: str) -> str:
return call_llm(prompt)
# Nested functions create nested spans
@opik.track
def outer_function(data):
preprocessed = preprocess(data) # Creates nested span
result = process(preprocessed) # Creates nested span
return result
@opik.track
def preprocess(data):
return {"cleaned": data}
@opik.track
def process(data):
return {"result": data}
import opik
# Access current context
trace_data = opik.get_current_trace_data()
span_data = opik.get_current_span_data()
# Update current trace/span
opik.update_current_span(
metadata={"key": "value"},
tags=["important"],
usage={"completion_tokens": 100, "prompt_tokens": 50, "total_tokens": 150}
)
opik.update_current_trace(
output={"result": "success"}
)
# Context managers for manual control
with opik.start_as_current_trace(name="my_trace", input={"data": "test"}) as trace:
# Trace context active
with opik.start_as_current_span(name="step1", type="tool") as span:
# Span context active
do_work()
# Span auto-closed
# Trace auto-closed
# Distributed tracing
headers = opik.get_distributed_trace_headers()
# Pass headers to remote service
# Remote service continues trace: opik.track(distributed_headers=headers)
# Create and manage datasets
dataset = client.create_dataset(
name="my_dataset",
description="Test dataset"
)
dataset.insert([
{"input": "query1", "expected": "answer1"},
{"input": "query2", "expected": "answer2"}
])
# Create experiments
experiment = client.create_experiment(
name="exp_v1",
dataset_name="my_dataset"
)
# Manage prompts
prompt = client.create_prompt(
name="my_prompt",
prompt="Answer this question: {{question}}",
type="text"
)
# Create new version
prompt_v2 = prompt.create_version(
prompt="Enhanced: {{question}}\nContext: {{context}}"
)
The SDK is organized into 3 layers:
┌──────────────────────────────────────────────────────────────┐
│ Layer 1: Public API │
│ │
│ opik.Opik, @opik.track, opik_context │
│ - User-facing interface │
│ - Input validation │
│ - Context management │
└─────────────┬────────────────────────────┬───────────────────┘
│ │
│ Observability │ Resource Management
│ (trace, span, feedback) │ (dataset, experiment,
│ │ prompt, search, etc.)
│ │
▼ ▼
┌──────────────────────────────┐ ┌──────────────────────────┐
│ Layer 2: Message Processing │ │ API Object Clients │
│ (Observability operations) │ │ (Resource operations) │
│ │ │ │
│ Streamer │ │ Dataset, Experiment, │
│ ↓ │ │ Prompt, Attachment, │
│ Queue │ │ Threads clients │
│ ↓ │ │ │
│ Consumers │ │ - Manage state │
│ ↓ │ │ - Handle complex logic │
│ MessageProcessor │ │ - Wrap REST calls │
│ │ │ │
│ - Background async │ │ Delegates to ↓ │
│ - Batching │ └──────────────┬───────────┘
│ - Retry logic │ │
│ │ │
│ Delegates to ↓ │ │
└───────────────┬──────────────┘ │
│ │
└────────────────┬───────────────┘
▼
┌───────────────────────────────────────────────┐
│ Layer 3: REST API Client │
│ │
│ OpikApi (auto-generated from OpenAPI) │
│ - HTTP client │
│ - Request/response serialization │
│ - Connection pooling │
│ │
│ ═══════════════════════════════════════ │
│ ║ HTTP requests to Opik Backend ║ │
│ ║ (External service, not part of SDK)║ │
│ ═══════════════════════════════════════ │
└───────────────────────────────────────────────┘
Key Points:
Users interact only with the public API layer (opik.Opik, @opik.track); the lower layers are internal.

Two Execution Paths:
Observability operations (trace/span/feedback):
opik.Opik → Message Processing (Layer 2) → REST API Client → Backend

Resource management operations (dataset/experiment/prompt):
opik.Opik → API Object Clients → REST API Client → Backend

API Object Clients (opik/api_objects/):
For complex resource types, intermediate client classes provide:
- Resource-specific, stateful methods such as dataset.insert(), experiment.get_items(), prompt.format()

Examples:
- Dataset (dataset/dataset.py) - Manages dataset items, handles insertion/deletion
- Experiment (experiment/experiment.py) - Tracks experiment items, links to dataset
- Prompt (prompt/prompt.py) - Manages prompt versions and templating
- AttachmentClient (attachment/client.py) - Handles file attachments
- ThreadsClient (threads/threads_client.py) - Manages conversational threads

For simple operations (search, get), opik.Opik calls the REST client directly, without an intermediate client.
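A minimal usage sketch of the two paths (illustrative; method names and return shapes follow the examples earlier in this document):
import opik

client = opik.Opik(project_name="my_project")

# Observability path: queued for background delivery, returns immediately
trace = client.trace(name="checkout", input={"cart_id": "abc"})

# Resource path: synchronous call that returns a Dataset client object
dataset = client.get_dataset(name="my_dataset")
dataset.insert([{"input": "query1", "expected": "answer1"}])

client.flush(timeout=30)  # make sure queued observability messages are delivered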
The Opik client provides two types of operations with different execution paths:
Observability operations that use background processing:
| Operation | Purpose | Returns |
|---|---|---|
| trace() | Create/update trace | None (fire-and-forget) |
| span() | Create/update span | None (fire-and-forget) |
| log_traces_feedback_scores() | Add feedback to traces | None |
| log_spans_feedback_scores() | Add feedback to spans | None |
| experiment.insert() | Create experiment items | None |
| Attachment uploads | Upload files to S3 | None |
Flow: API → Message → Streamer → Queue → Consumer → REST Client → Backend
Characteristics:
- Non-blocking: the call returns immediately while background workers deliver the data
- Call flush() before app exit to make sure queued messages are sent

Resource management and query operations that bypass message processing:
| Category | Operations | Uses |
|---|---|---|
| Dataset | create_dataset(), get_dataset(), delete_dataset() | Dataset client |
| Experiment | create_experiment(), get_experiment_by_id() | Experiment client |
| Prompt | create_prompt(), get_prompt(), update_prompt() | Prompt client |
| Search | search_traces(), search_spans() | Direct REST |
| Retrieval | get_trace_content(), get_span_content() | Direct REST |
| Delete | delete_traces(), delete_*_feedback_score() | Direct REST |
Flow (with API Object Client):
client.create_dataset(name) → Dataset.__init__() → REST Client → Backend
↓
Returns Dataset object with methods
Flow (direct REST):
client.search_traces() → REST Client → Backend → Returns List[TracePublic]
Characteristics:
Async path (observability): non-blocking and fire-and-forget; messages are batched and delivered by background threads, so durability requires flush() before exit.
Sync path (resources): blocking calls that return results or client objects (Dataset, Experiment, Prompt) as soon as the backend responds.
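Because the async path is fire-and-forget, one way to guarantee delivery in short-lived processes is to register a flush at exit (a usage sketch):
import atexit
import opik

client = opik.Opik(project_name="my_project")
atexit.register(client.flush, timeout=30)  # best-effort flush at interpreter shutdown

client.trace(name="batch_job", input={"rows": 1000})
# ... work ...
# flush() runs automatically when the process exits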
OpikContextStorage (opik/context_storage.py)
Manages trace and span context using Python's contextvars for proper isolation.
class OpikContextStorage:
def __init__(self):
# Context variables for isolation
self._current_trace_data_context: ContextVar[Optional[TraceData]]
self._spans_data_stack_context: ContextVar[Tuple[SpanData, ...]]
def set_trace_data(self, trace_data: TraceData) -> None:
"""Set current trace in context"""
def add_span_data(self, span_data: SpanData) -> None:
"""Push span onto stack"""
def pop_span_data(self) -> Optional[SpanData]:
"""Pop span from stack"""
def top_span_data(self) -> Optional[SpanData]:
"""Get current span without removing"""
Why contextvars? Unlike module-level globals or plain thread-locals, contextvars give each thread and each asyncio task its own view of the current trace and span stack, so concurrent requests cannot corrupt one another's context.
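A standalone sketch (not SDK code) of the isolation property that motivates contextvars: each asyncio task sees its own value, so concurrent traces cannot clobber one another.
import asyncio
import contextvars

current_trace_id: contextvars.ContextVar[str] = contextvars.ContextVar("current_trace_id", default="<none>")

async def handle_request(trace_id: str) -> str:
    current_trace_id.set(trace_id)   # visible only within this task's context
    await asyncio.sleep(0)           # yield to the other tasks
    return current_trace_id.get()    # still this task's own value

async def main() -> None:
    results = await asyncio.gather(*(handle_request(f"trace-{i}") for i in range(3)))
    print(results)  # ['trace-0', 'trace-1', 'trace-2'] - no cross-task leakage

asyncio.run(main())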
Streamer (opik/message_processing/streamer.py)
Routes messages to the appropriate handler (queue, batch, or upload).
class Streamer:
def __init__(
self,
queue: MessageQueue,
queue_consumers: List[QueueConsumer],
batch_manager: Optional[BatchManager],
file_upload_manager: FileUploadManager
):
self._message_queue = queue
self._queue_consumers = queue_consumers
self._batch_manager = batch_manager
self._file_upload_manager = file_upload_manager
def put(self, message: BaseMessage) -> None:
"""Route message based on type"""
if self._batch_manager and message.supports_batching:
self._batch_manager.process_message(message)
elif message.supports_upload:
self._file_upload_manager.upload(message)
else:
self._message_queue.put(message)
def flush(self, timeout: Optional[float]) -> bool:
"""Wait for all messages to be processed"""
def close(self, timeout: Optional[int]) -> bool:
"""Stop processing and cleanup"""
MessageQueue (opik/message_processing/message_queue.py)
Thread-safe FIFO queue with backpressure handling.
class MessageQueue(Generic[T]):
def __init__(self, max_length: Optional[int] = None):
self._queue: queue.Queue[T] = queue.Queue()
self._max_length = max_length
def put(self, item: T) -> None:
"""Add message, discard oldest if full"""
if self._max_length and self._queue.qsize() >= self._max_length:
# Remove oldest message
try:
self._queue.get_nowait()
except queue.Empty:
pass
self._queue.put(item)
def get(self, timeout: float) -> Optional[T]:
    """Get next message, or None if nothing arrives within the timeout"""
    try:
        return self._queue.get(timeout=timeout)
    except queue.Empty:
        return None
def empty(self) -> bool:
"""Check if queue is empty"""
return self._queue.empty()
QueueConsumer (opik/message_processing/queue_consumer.py)
Worker thread that processes messages from the queue.
class QueueConsumer(threading.Thread):
def __init__(
self,
queue: MessageQueue,
message_processor: MessageProcessor,
name: Optional[str] = None
):
super().__init__(daemon=True, name=name)
self._message_queue = queue
self._message_processor = message_processor
self.next_message_time = 0.0 # For rate limiting
def run(self) -> None:
"""Main worker loop"""
while not self._processing_stopped:
self._loop()
def _loop(self) -> None:
"""Process one message"""
# Check rate limiting
if time.monotonic() < self.next_message_time:
time.sleep(SLEEP_INTERVAL)
return
# Get and process message
try:
message = self._message_queue.get(timeout=SLEEP_INTERVAL)
if message and message.delivery_time <= time.monotonic():
self._message_processor.process(message)
except RateLimitError as e:
# Re-queue message with delay
self.next_message_time = time.monotonic() + e.retry_after
self._message_queue.put(message)
OpikMessageProcessor (opik/message_processing/message_processors.py)
Maps message types to REST API handlers.
class OpikMessageProcessor(BaseMessageProcessor):
def __init__(self, rest_client: OpikApi):
self._rest_client = rest_client
# Map message types to handlers
self._handlers: Dict[Type, MessageHandler] = {
CreateTraceMessage: self._process_create_trace_message,
CreateSpanMessage: self._process_create_span_message,
UpdateTraceMessage: self._process_update_trace_message,
UpdateSpanMessage: self._process_update_span_message,
AddTraceFeedbackScoresBatchMessage: self._process_feedback_scores,
CreateSpansBatchMessage: self._process_create_spans_batch,
CreateTraceBatchMessage: self._process_create_traces_batch,
# ... more handlers
}
def process(self, message: BaseMessage) -> None:
"""Process message by calling appropriate handler"""
handler = self._handlers.get(type(message))
if handler:
try:
handler(message)
except ApiError as e:
# Handle specific error cases
if e.status_code == 409:
return # Duplicate, ignore
elif e.status_code == 429:
raise RateLimitError(e)
else:
LOGGER.error(f"Failed to process message: {e}")
Let's follow a trace creation from user code to backend in detail.
import opik
client = opik.Opik(project_name="my_project")
trace = client.trace(
name="my_trace",
input={"query": "test"},
metadata={"version": "1.0"}
)
# In opik_client.py: Opik.trace()
def trace(self, name: str, input: dict, metadata: dict, **kwargs):
# 1. Generate ID if not provided
trace_id = kwargs.get("id") or id_helpers.generate_id()
# 2. Validate inputs
validation.validate_trace_parameters(name, input, metadata)
# 3. Create TraceData
trace_data = TraceData(
id=trace_id,
name=name,
input=input,
metadata=metadata,
project_name=self._project_name,
start_time=datetime_helpers.now()
)
# 4. Create message
message = CreateTraceMessage(
trace_id=trace_data.id,
name=trace_data.name,
input=trace_data.input,
metadata=trace_data.metadata,
project_name=trace_data.project_name,
start_time=trace_data.start_time,
# ... other fields
)
# 5. Send to streamer (non-blocking!)
self._streamer.put(message)
# 6. Return immediately
return trace_id
Key Point: User code continues immediately. Message processing happens asynchronously.
# In streamer.py: Streamer.put()
def put(self, message: BaseMessage) -> None:
with self._lock:
# Check if draining
if self._drain:
return
# do embedded attachments pre-processing first (MUST ALWAYS BE DONE FIRST)
preprocessed_message = self._attachments_preprocessor.preprocess(
message
)
# do batching pre-processing next
preprocessed_message = self._batch_preprocessor.preprocess(
preprocessed_message
)
# Route to queue
if not self._message_queue.accept_put_without_discarding():
LOGGER.warning("Queue full, discarding oldest message")
self._message_queue.put(preprocessed_message)
Decision Tree:
Message arrives
│
├─► Supports batching? ──Yes──► BatchManager
├─► Has file upload? ───Yes──► FileUploadManager
└─► Default ─────────────────► MessageQueue
If batching is enabled, certain message types accumulate before sending.
Messages that support batching (current implementation):
- CreateSpanMessage → batched into CreateSpansBatchMessage
- CreateTraceMessage → batched into CreateTraceBatchMessage
- AddTraceFeedbackScoresBatchMessage → already a batch message
- AddSpanFeedbackScoresBatchMessage → already a batch message
- CreateExperimentItemsBatchMessage → already a batch message

Messages that don't support batching (sent individually):
- UpdateSpanMessage - updates are sent immediately
- UpdateTraceMessage - updates are sent immediately

# In batch_manager.py
class BatchManager:
def process_message(self, message: BaseMessage) -> None:
# Find or create batcher for this message type
batcher = self._get_or_create_batcher(type(message))
# Add to batch
batcher.add(message)
# Check if should flush
if batcher.should_flush():
self._flush_batcher(batcher)
def _flush_batcher(self, batcher: BaseBatcher) -> None:
# Create batch message
batch_message = batcher.create_batch_message()
# Send to queue
self._message_queue.put(batch_message)
# Clear batcher
batcher.clear()
Flush Triggers:
- Batch size limit reached (e.g., 100 messages)
- Memory limit reached (e.g., 50 MB)
- Periodic timer (every flush interval, e.g., 1 second)
- Explicit flush() call
- Client shutdown

# Queue consumer pulls message
message = self._message_queue.get(timeout=0.1)
# Check delivery time (for rate limiting)
if message.delivery_time > time.monotonic():
# Re-queue for later
self._message_queue.put(message)
return
# Process message
self._message_processor.process(message)
# In message_processors.py
def _process_create_trace_message(
self, message: CreateTraceMessage
) -> None:
# Map message to REST request
trace_write_request = trace_write.TraceWrite(
id=message.trace_id,
name=message.name,
input=message.input,
metadata=message.metadata,
project_name=message.project_name,
start_time=message.start_time.isoformat(),
# ... more fields
)
# Make REST API call
self._rest_client.traces.create_trace(
request=trace_write_request
)
# In rest_api (auto-generated)
def create_trace(self, request: TraceWrite) -> TracePublic:
# Serialize request
json_data = request.dict(exclude_none=True)
# Make HTTP request
response = self._client.post(
"/v1/traces",
json=json_data,
headers={"Authorization": f"Bearer {self._api_key}"}
)
# Handle response
if response.status_code == 201:
return TracePublic.parse_obj(response.json())
else:
raise ApiError(response)
The backend receives the request and stores the trace in its database.
The @opik.track decorator provides automatic tracing. Here's the complete flow:
@opik.track
def my_function(x: int) -> int:
return x * 2
# Behind the scenes
_decorator = OpikTrackDecorator()
my_function = _decorator.track(my_function)
# User calls function
result = my_function(5)
Step-by-Step Execution:
1. Decorator intercepts call
↓
2. Check if tracing is active
↓
3. Extract function inputs
│
├─► Arguments: (5,)
├─► Keyword arguments: {}
└─► Combined: {"x": 5}
↓
4. Check for existing trace in context
│
├─► No trace? Create TraceData
│ │
│ ├─► Generate trace_id
│ ├─► Set start_time
│ ├─► Store in context: context_storage.set_trace_data()
│ │
│ Context state: trace_stack = [TraceData]
│
└─► Trace exists? Reuse
↓
5. Create SpanData
│
├─► Generate span_id
├─► Get trace_id from context
├─► Check for parent span
│ │
│ ├─► Parent exists? Set parent_span_id
│ └─► No parent? parent_span_id = None
│
├─► Capture input: {"x": 5}
├─► Set start_time
├─► Set type, name, metadata, tags
│
Context state: span_stack = [..., SpanData]
↓
6. Push span to context
context_storage.add_span_data(span_data)
↓
7. Execute wrapped function
│
├─► try:
│ result = my_function(5) # Original function
│ except Exception as e:
│ error_info = collect_error_info(e)
│ span_data.error_info = error_info
│ raise # Re-raise to user
│
└─► Returns: 10
↓
8. Capture output
│
├─► If capture_output=True:
│ │
│ ├─► Output is dict? Use as-is
│ └─► Not dict? Wrap: {"output": 10}
│
└─► span_data.output = {"output": 10}
↓
9. Set end_time
span_data.end_time = datetime_helpers.now()
↓
10. Pop span from context
span_data = context_storage.pop_span_data()
Context state: span_stack = [...]
↓
11. Send span to backend
│
├─► Create CreateSpanMessage from span_data
├─► streamer.put(message)
│
[Async processing begins]
↓
12. Check if top-level function
│
├─► span_stack is empty?
│ │
│ ├─► Yes: Also send trace
│ │ │
│ │ ├─► trace_data = context_storage.pop_trace_data()
│ │ ├─► Set trace end_time
│ │ ├─► Create CreateTraceMessage
│ │ └─► streamer.put(message)
│ │
│ Context state: trace_stack = [], span_stack = []
│
└─► No: Leave trace in context (more spans coming)
↓
13. Return result to user
return 10
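The same flow, condensed into an illustrative decorator (a simplified sketch, not the real OpikTrackDecorator: it uses globals instead of the SDK's contextvar-backed storage and a print in place of streamer.put; all names are hypothetical):
import functools
import uuid
from datetime import datetime, timezone

_trace = None      # stand-in for the context-storage trace slot
_span_stack = []   # stand-in for the context-storage span stack

def _emit(kind, data):
    print(f"emit {kind}: {data['name']}")   # stand-in for streamer.put(message)

def track_sketch(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        global _trace
        created_trace = _trace is None
        if created_trace:                                    # step 4: create trace if none active
            _trace = {"id": str(uuid.uuid4()), "name": func.__name__}
        span = {                                             # step 5: create span
            "id": str(uuid.uuid4()),
            "trace_id": _trace["id"],
            "parent_span_id": _span_stack[-1]["id"] if _span_stack else None,
            "name": func.__name__,
            "input": {"args": args, "kwargs": kwargs},
            "start_time": datetime.now(timezone.utc),
        }
        _span_stack.append(span)                             # step 6: push onto stack
        try:
            result = func(*args, **kwargs)                   # step 7: run wrapped function
            span["output"] = result if isinstance(result, dict) else {"output": result}
            return result                                     # step 13: return to caller
        finally:
            span["end_time"] = datetime.now(timezone.utc)    # step 9
            _span_stack.pop()                                 # step 10
            _emit("span", span)                               # step 11
            if created_trace and not _span_stack:             # step 12: top-level exit
                _emit("trace", _trace)
                _trace = None
    return wrapper

@track_sketch
def double(x):
    return x * 2

double(5)  # emits one span, then the enclosing trace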
@opik.track
def outer(x):
result = inner(x)
return result * 2
@opik.track
def inner(x):
return x + 1
# Call
outer(5)
Context State Timeline:
Time │ Action │ Trace Stack │ Span Stack
──────┼─────────────────────────┼────────────────┼─────────────────
T0 │ outer() called │ [] │ []
T1 │ Create trace │ [Trace-A] │ []
T2 │ Create span-outer │ [Trace-A] │ [Span-1]
T3 │ inner() called │ [Trace-A] │ [Span-1]
T4 │ Reuse trace │ [Trace-A] │ [Span-1]
T5 │ Create span-inner │ [Trace-A] │ [Span-1, Span-2]
│ (parent=Span-1) │ │
T6 │ inner() executes │ [Trace-A] │ [Span-1, Span-2]
T7 │ inner() returns │ [Trace-A] │ [Span-1, Span-2]
T8 │ Pop span-inner │ [Trace-A] │ [Span-1]
T9 │ Send Span-2 message │ [Trace-A] │ [Span-1]
T10 │ outer() continues │ [Trace-A] │ [Span-1]
T11 │ outer() returns │ [Trace-A] │ [Span-1]
T12 │ Pop span-outer │ [Trace-A] │ []
T13 │ Send Span-1 message │ [Trace-A] │ []
T14 │ Stack empty! Send trace │ [] │ []
T15 │ Pop trace, send message │ [] │ []
Resulting Tree:
Trace-A
└─ Span-1 (outer)
└─ Span-2 (inner)
All messages inherit from BaseMessage:
class BaseMessage:
delivery_time: float = 0.0 # For rate limiting
def dict(self) -> Dict[str, Any]:
"""Convert to dictionary"""
# Core message types
class CreateTraceMessage(BaseMessage):
trace_id: str
name: Optional[str]
input: Optional[Dict]
output: Optional[Dict]
metadata: Optional[Dict]
tags: Optional[List[str]]
start_time: datetime
end_time: Optional[datetime]
# ... more fields
class CreateSpanMessage(BaseMessage):
span_id: str
trace_id: str
parent_span_id: Optional[str]
name: Optional[str]
type: str
input: Optional[Dict]
output: Optional[Dict]
usage: Optional[Dict]
model: Optional[str]
provider: Optional[str]
# ... more fields
class UpdateSpanMessage(BaseMessage):
span_id: str
# Only fields to update
class UpdateTraceMessage(BaseMessage):
trace_id: str
# Only fields to update
# Batch message types
class CreateSpansBatchMessage(BaseMessage):
spans: List[CreateSpanMessage]
class CreateTraceBatchMessage(BaseMessage):
traces: List[CreateTraceMessage]
# Feedback messages
class AddTraceFeedbackScoresBatchMessage(BaseMessage):
feedback_scores: List[FeedbackScoreDict]
class AddSpanFeedbackScoresBatchMessage(BaseMessage):
feedback_scores: List[FeedbackScoreDict]
# Experiment item messages
class ExperimentItemMessage(BaseMessage):
id: str
experiment_id: str
trace_id: str
dataset_item_id: str
class CreateExperimentItemsBatchMessage(BaseMessage):
batch: List[ExperimentItemMessage]
def put(self, message: BaseMessage) -> None:
"""Route message to appropriate handler"""
# 1. Check if draining
if self._drain:
return # Drop message (shutdown in progress)
# 2. Check batching support
if (
self._batch_manager is not None
and self._batch_manager.message_supports_batching(message)
):
# Messages that support batching:
# - CreateSpanMessage
# - CreateTraceMessage
# - Feedback score messages (always batched)
self._batch_manager.process_message(message)
return
# 3. Check file upload support
if base_upload_manager.message_supports_upload(message):
# Messages with attachments
# - Uploaded to S3 first
# - Then regular message sent with S3 URLs
self._upload_preprocessor.upload(message)
return
# 4. Default: Add to queue
if not self._message_queue.accept_put_without_discarding():
# Queue is full
LOGGER.warning("Queue full, discarding oldest message")
self._message_queue.put(message)
Before messages reach the queue or batch manager, the SDK can optionally preprocess them to extract embedded base64-encoded attachments (images, PDFs, etc.) from trace/span input, output, and metadata fields.
# In streamer.py: Streamer.__init__()
def __init__(self, ...):
# Create preprocessing pipeline
self._message_preprocessors = []
# 1. Attachments preprocessor (conditionally wraps messages)
attachments_preprocessor = AttachmentsPreprocessor(enabled=True)
self._message_preprocessors.append(attachments_preprocessor)
# 2. Batching preprocessor (groups batchable messages)
batching_preprocessor = BatchingPreprocessor(...)
self._message_preprocessors.append(batching_preprocessor)
# Messages flow through preprocessors before routing
def put(self, message: BaseMessage) -> None:
# Apply preprocessors in order
for preprocessor in self._message_preprocessors:
message = preprocessor.preprocess(message)
# Then route to queue/batch/upload
self._route_message(message)
The AttachmentsPreprocessor decides which messages need attachment extraction:
class AttachmentsPreprocessor(MessagePreprocessor):
def preprocess(self, message: BaseMessage) -> BaseMessage:
"""
Wraps messages that need attachment extraction in AttachmentSupportingMessage.
Only wraps if:
1. Update messages (UpdateSpanMessage, UpdateTraceMessage) - always process
2. Create messages with end_time set - final data, ready to extract
Does NOT wrap:
- Create messages without end_time - in-progress operations
"""
if _has_potential_content_with_attachments(message):
return AttachmentSupportingMessage(message)
return message
def _has_potential_content_with_attachments(message: BaseMessage) -> bool:
# Check if it's an Update message - always process these
if isinstance(message, (UpdateSpanMessage, UpdateTraceMessage)):
return _message_has_field_of_interest_set(message)
# Check if it's a Create message with end_time set - only process these
if isinstance(message, (CreateSpanMessage, CreateTraceMessage)):
if message.end_time is not None:
return _message_has_field_of_interest_set(message)
return False
return False
def _message_has_field_of_interest_set(message) -> bool:
"""Check if message has input, output, or metadata fields set"""
return (
message.input is not None
or message.output is not None
or message.metadata is not None
)
Key Design Decision: Why skip Create messages without end_time?
Trace/Span Lifecycle:
│
├─► Create (no end_time) ──► In-progress operation
│ │ - May be updated multiple times
│ │ - Extracting attachments now is wasteful
│ │ - Will extract on final update anyway
│ │
│ └─► Update (sets end_time) ──► Completed operation
│ │ - Contains final data
│ │ - Extract attachments now ✓
│ │
│ └─► Backend storage
│
└─► Create (with end_time) ──► Synchronous/completed operation
│ - Contains final data upfront
│ - Extract attachments now ✓
│
└─► Backend storage
Performance Impact: in-progress Create messages are skipped, so large payloads are scanned and uploaded only once, when the final data is available (on the closing Update, or on a Create that already carries end_time).
class AttachmentSupportingMessage(BaseMessage):
"""
Wrapper that signals a message needs attachment extraction.
The wrapped message is processed by AttachmentsExtractionProcessor
before being sent to backend.
"""
original_message: Union[
CreateSpanMessage,
UpdateSpanMessage,
CreateTraceMessage,
UpdateTraceMessage
]
1. Message arrives at Streamer
│
▼
2. AttachmentsPreprocessor.preprocess()
│
├─► Should extract? (Update or Create with end_time)
│ │
│ ├─► Yes: Wrap in AttachmentSupportingMessage
│ │ │
│ │ └─► Route to AttachmentsExtractionProcessor
│ │ │
│ │ ├─► Extract base64 attachments from input/output/metadata
│ │ │ - Handles nested dictionaries and lists
│ │ │ - Supports PNG, JPEG, PDF, GIF, WebP, SVG, JSON
│ │ │ - Replaces base64 with placeholder: [filename.png]
│ │ │
│ │ ├─► Upload attachments to S3
│ │ │
│ │ └─► Forward original message (now sanitized) to queue
│ │
│ └─► No: Pass through unchanged
│ │
│ └─► Route directly to queue/batch/upload
│
▼
3. Message continues through normal pipeline
The extractor recursively processes nested data structures:
class AttachmentsExtractor:
def extract_and_replace(
self,
data: Dict[str, Any],
entity_type: Literal["span", "trace"],
entity_id: str,
project_name: str,
context: Literal["input", "output", "metadata"],
) -> List[AttachmentWithContext]:
"""
Extract attachments from data and replace with placeholders.
Handles:
- Simple strings: {"image": "data:image/png;base64,..."}
- Nested dicts: {"user": {"avatar": "data:image/png;base64,..."}}
- Lists: {"images": ["data:image/png;base64,...", "data:image/jpeg;base64,..."]}
- Mixed: {"messages": [{"role": "user", "content": [{"image": "data:..."}]}]}
"""
attachments = []
for key, value in data.items():
result = self._try_extract_attachments(value, context)
if result.attachments:
data[key] = result.sanitized_data
attachments.extend(...)
return attachments
def _try_extract_attachments(self, data: Any, context: str) -> ExtractionResult:
"""Recursively extract from any data type"""
if isinstance(data, str):
return self._extract_from_string(data, context)
elif isinstance(data, dict):
return self._extract_from_dict(data, context)
elif isinstance(data, list):
return self._extract_from_list(data, context)
else:
# int, bool, None, etc. - return as-is
return ExtractionResult(attachments=[], sanitized_data=data)
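As an aside, a sketch of the string-level detection step (helper names are hypothetical, not the SDK's): a base64 data URI is recognized by its prefix, then decoded so the bytes can be written to a temporary file and uploaded.
import base64
import re

_DATA_URI_RE = re.compile(r"^data:(?P<mime>[\w.+-]+/[\w.+-]+);base64,(?P<payload>.+)$", re.DOTALL)

def try_decode_data_uri(value: str):
    """Return (mime_type, raw_bytes) if value looks like a base64 data URI, else None."""
    match = _DATA_URI_RE.match(value)
    if match is None:
        return None
    return match.group("mime"), base64.b64decode(match.group("payload"))

print(try_decode_data_uri("data:image/png;base64,aGVsbG8="))  # ('image/png', b'hello')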
Example: Nested structure extraction
# Input
trace_input = {
"messages": [
{
"role": "user",
"content": [
{"type": "text", "text": "What's in this image?"},
{"type": "image_url", "image_url": {"url": "data:image/png;base64,iVBORw0K..."}}
]
}
]
}
# After extraction
trace_input = {
"messages": [
{
"role": "user",
"content": [
{"type": "text", "text": "What's in this image?"},
{"type": "image_url", "image_url": {"url": "[input-attachment-abc123.png]"}}
]
}
]
}
# Extracted attachment uploaded to S3
# Attachment: {file_name: "input-attachment-abc123.png", content_type: "image/png", ...}
Supported Formats:
Attachments are embedded as base64 data URIs of the form data:<mime-type>;base64,<base64-data>; supported types include PNG, JPEG, PDF, GIF, WebP, SVG, and JSON.

User Code
│
▼
CreateSpanMessage(
input={"image": "data:image/png;base64,..."},
end_time=now() # ← Key: end_time is set
)
│
▼
Streamer.put()
│
├─► AttachmentsPreprocessor
│ │
│ └─► Has end_time? YES → Wrap in AttachmentSupportingMessage
│
▼
Route to AttachmentsExtractionProcessor
│
├─► Extract attachments
│ - Find base64 data in input
│ - Decode and identify type (PNG)
│ - Save to temporary file
│ - Replace with placeholder
│
├─► Upload to S3
│ - CreateAttachmentMessage → S3
│
└─► Forward sanitized CreateSpanMessage
- input={"image": "[input-attachment-123.png]"}
│
▼
BatchManager (or Queue)
│
▼
Backend storage
class QueueConsumer(threading.Thread):
def run(self) -> None:
"""Main worker loop"""
while not self._processing_stopped:
self._loop()
def _loop(self) -> None:
"""Process one message"""
# 1. Check rate limiting
now = time.monotonic()
if now < self.next_message_time:
self.idling = False
time.sleep(SLEEP_BETWEEN_LOOP_ITERATIONS)
return
# 2. Get message from queue
try:
self.idling = True
message = self._message_queue.get(
timeout=SLEEP_BETWEEN_LOOP_ITERATIONS
)
self.idling = False
if message is None:
return
# 3. Check delivery time
if message.delivery_time <= now:
# Ready to process
self._message_processor.process(message)
else:
# Not ready yet, re-queue
self._push_message_back(message)
except Empty:
time.sleep(SLEEP_BETWEEN_LOOP_ITERATIONS)
except OpikCloudRequestsRateLimited as e:
# 4. Handle rate limiting
LOGGER.info(
"Rate limited, retrying in %s seconds",
e.retry_after
)
# Update next processing time
self.next_message_time = now + e.retry_after
# Re-queue message with delay
if message is not None:
message.delivery_time = self.next_message_time
self._push_message_back(message)
except Exception as ex:
LOGGER.error("Unexpected error: %s", ex, exc_info=ex)
def process(self, message: BaseMessage) -> None:
"""Process message with comprehensive error handling"""
message_type = type(message)
handler = self._handlers.get(message_type)
if handler is None:
LOGGER.debug("Unknown message type: %s", message_type.__name__)
return
try:
# Execute handler
handler(message)
except ApiError as exception:
# 1. Handle duplicate requests
if exception.status_code == 409:
# Retry mechanism sent duplicate, ignore
return
# 2. Handle rate limiting
elif exception.status_code == 429:
# Extract retry-after from headers
if exception.headers is not None:
rate_limiter = rate_limit.parse_rate_limit(exception.headers)
if rate_limiter is not None:
raise OpikCloudRequestsRateLimited(
headers=exception.headers,
retry_after=rate_limiter.retry_after()
)
# 3. Other API errors
error_info = _generate_error_tracking_extra(exception, message)
LOGGER.error(
"Failed to process %s: %s",
message_type.__name__,
str(exception),
extra={"error_tracking_extra": error_info}
)
except RetryError as retry_error:
# 4. Retry exhausted
cause = retry_error.last_attempt.exception()
LOGGER.error(
"Retries exhausted for %s: %s",
message_type.__name__,
cause
)
LOGGER.warning("Check Opik configuration")
except ValidationError as validation_error:
# 5. Data validation failed
LOGGER.error(
"Validation failed for %s: %s",
message_type.__name__,
validation_error
)
Batching reduces overhead by combining many individual create operations into a single HTTP request, cutting round trips, connection churn, and per-request serialization cost.
class BatchManager:
def __init__(self, message_queue: MessageQueue):
self._message_queue = message_queue
self._batchers: Dict[Type, BaseBatcher] = {}
self._lock = threading.RLock()
# Timer for periodic flushing
self._timer: Optional[threading.Timer] = None
self._flush_interval = 1.0 # seconds
def start(self) -> None:
"""Start periodic flushing"""
self._schedule_flush()
def _schedule_flush(self) -> None:
"""Schedule next flush"""
self._timer = threading.Timer(
self._flush_interval,
self._periodic_flush
)
self._timer.daemon = True
self._timer.start()
def _periodic_flush(self) -> None:
"""Flush all batchers periodically"""
self.flush()
if not self._stopped:
self._schedule_flush()
class SpansBatcher(BaseBatcher):
def __init__(self, max_batch_size: int = 100, max_memory_mb: int = 50):
self._messages: List[CreateSpanMessage] = []
self._max_batch_size = max_batch_size
self._max_memory_bytes = max_memory_mb * 1024 * 1024
self._current_memory = 0
def add(self, message: CreateSpanMessage) -> None:
"""Add span to batch"""
self._messages.append(message)
self._current_memory += self._estimate_size(message)
def should_flush(self) -> bool:
"""Check if batch should be flushed"""
return (
len(self._messages) >= self._max_batch_size
or self._current_memory >= self._max_memory_bytes
)
def create_batch_message(self) -> CreateSpansBatchMessage:
"""Create batch message from accumulated spans"""
return CreateSpansBatchMessage(
spans=self._messages.copy()
)
def clear(self) -> None:
"""Clear batch"""
self._messages.clear()
self._current_memory = 0
def _estimate_size(self, message: CreateSpanMessage) -> int:
"""Estimate message size in bytes"""
# Rough estimation based on serialized size
data = message.dict()
return len(json.dumps(data).encode('utf-8'))
Individual messages arrive
│
▼
┌────────────────────────────────────┐
│ BatchManager.process_message() │
│ │
│ 1. Find batcher for message type │
│ 2. Add to batch │
│ 3. Check flush conditions │
└────────┬───────────────────────────┘
│
▼
Should flush?
│
┌────┴────┐
│ │
No Yes
│ │
│ ▼
│ ┌──────────────────────────┐
│ │ Create batch message │
│ │ (e.g., 100 spans) │
│ └───┬──────────────────────┘
│ │
│ ▼
│ ┌──────────────────────────┐
│ │ Add to MessageQueue │
│ └───┬──────────────────────┘
│ │
│ ▼
│ ┌──────────────────────────┐
│ │ Clear batcher │
│ └──────────────────────────┘
│
└──► Continue accumulating
def _periodic_flush(self) -> None:
"""Called every flush_interval seconds"""
with self._lock:
for batcher in self._batchers.values():
if not batcher.is_empty():
batch_message = batcher.create_batch_message()
self._message_queue.put(batch_message)
batcher.clear()
Example: Every 1 second, flush all non-empty batchers.
def process_message(self, message: BaseMessage) -> None:
"""Add message to batch, flush if size limit reached"""
batcher = self._get_or_create_batcher(type(message))
batcher.add(message)
if batcher.should_flush():
batch_message = batcher.create_batch_message()
self._message_queue.put(batch_message)
batcher.clear()
Example: Batch reaches 100 messages.
def should_flush(self) -> bool:
"""Check memory threshold"""
return (
len(self._messages) >= self._max_batch_size
or self._current_memory >= self._max_memory_bytes
)
Example: Batch reaches 50MB.
def flush(self) -> None:
"""User-triggered flush"""
client.flush(timeout=30)
# Internally:
# 1. Flush all batchers
batch_manager.flush()
# 2. Wait for queue to empty
while not message_queue.empty() and not timeout:
time.sleep(0.1)
def stop(self) -> None:
"""Flush on shutdown"""
self._stopped = True
# Cancel timer
if self._timer:
self._timer.cancel()
# Flush all remaining messages
self.flush()
def _process_create_spans_batch_message(
self, message: CreateSpansBatchMessage
) -> None:
"""Process batch of spans"""
# Split into chunks if too large
chunks = sequence_splitter.split_into_chunks(
message.spans,
max_chunk_size=self._batch_memory_limit_mb
)
for chunk in chunks:
# Convert to REST request format
span_writes = [
span_write.SpanWrite.from_message(span_msg)
for span_msg in chunk
]
# Make single API call for all spans
self._rest_client.spans.create_spans_batch(
request=span_writes
)
Benefits: an entire chunk of spans travels in one API call, oversized batches are split to respect payload limits, and network overhead drops sharply compared with per-span requests.
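Back-of-the-envelope illustration (the only number taken from above is max_batch_size=100; the span count is made up):
spans_created = 2_500
max_batch_size = 100

requests_without_batching = spans_created                      # one POST per span
requests_with_batching = -(-spans_created // max_batch_size)   # ceiling division = 25

print(requests_without_batching, requests_with_batching)       # 2500 vs 25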
The SDK uses Python's standard logging with structured extra data.
import logging
# Module-level loggers
LOGGER = logging.getLogger(__name__)
# Logging hierarchy
opik # Root logger
├── opik.api_objects # API objects
├── opik.decorator # Decorator logic
├── opik.message_processing # Message processing
│ ├── streamer
│ ├── message_processors
│ └── batching
└── opik.evaluation # Evaluation
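Applications can tune SDK verbosity through the standard logging module using the logger names above, for example:
import logging

logging.basicConfig(level=logging.WARNING)                            # application default
logging.getLogger("opik").setLevel(logging.INFO)                      # whole SDK
logging.getLogger("opik.message_processing").setLevel(logging.DEBUG)  # inspect the pipeline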
# Queue full warning
LOGGER.warning(
"Queue size limit reached. Message added, oldest discarded."
)
# Rate limiting info
LOGGER.info(
"Rate limited, retrying in %s seconds, queue size: %d",
retry_after,
queue_size
)
# Processing error
LOGGER.error(
"Failed to process %s: %s",
message_type.__name__,
str(exception),
extra={"error_tracking_extra": error_info}
)
# Configuration warning
LOGGER.warning(
"Opik may not be configured correctly. "
"Run 'opik configure' to set up."
)
The SDK integrates with Sentry for error tracking (opt-in, randomized).
# Only track specific errors
class ErrorLevelCountFilter:
"""Only send ERROR and CRITICAL level events"""
def __call__(self, event, hint):
return event['level'] in ['error', 'fatal']
class ResponseStatusCodeFilter:
"""Filter out expected HTTP errors"""
def __call__(self, event, hint):
# Don't track 409 (Conflict - duplicate)
# Don't track 429 (Rate Limit - expected)
status_code = extract_status_code(event)
return status_code not in [409, 429]
def _generate_error_tracking_extra(
exception: Exception,
message: BaseMessage
) -> Dict[str, Any]:
"""Generate structured error context"""
return {
"message_type": type(message).__name__,
"exception_type": type(exception).__name__,
"status_code": getattr(exception, 'status_code', None),
"message_data": message.dict(),
"timestamp": datetime.now().isoformat()
}
The SDK tracks internal performance:
# Message queue metrics
queue_size = message_queue.qsize()
queue_full_events = metric_counter["queue_full"]
# Batch metrics
batch_size = len(batcher._messages)
batch_memory = batcher._current_memory
batches_flushed = metric_counter["batches_flushed"]
# Consumer metrics
consumer_idle = consumer.idling
messages_processed = metric_counter["messages_processed"]
processing_errors = metric_counter["processing_errors"]
# Flush status
success = client.flush(timeout=30)
if not success:
LOGGER.warning("Flush timeout, some messages may not be sent")
# Queue info (logged automatically)
LOGGER.info("Queue size: %d messages", queue_size)
from opik.healthcheck import check_health
# Check SDK health
result = check_health()
# Returns:
{
"backend_reachable": True,
"authentication_valid": True,
"project_exists": True,
"version": "0.1.0",
"configuration": {...}
}
# Unlimited (default)
client = opik.Opik() # No queue limit
# Limited (for memory-constrained environments)
# Set via config, not directly exposed
Thread usage: N queue-consumer threads + M file-upload worker threads + 1 batch-flush timer thread. Total: N + M + 1 background threads.
# Non-blocking call
trace = client.trace(...) # Returns trace object immediately (~1μs)
# Decorator overhead
@opik.track
def my_function():
pass # Overhead: ~10-100μs per call
# Async processing
# User code continues immediately
# Backend calls happen in background
# Flush latency
client.flush(timeout=30) # Waits for all messages
# Check message queue size
# If growing unbounded:
# 1. Check backend connectivity
# 2. Check rate limiting
# 3. Reduce tracing frequency
# If flush() takes too long:
# 1. Check queue size (too many pending messages)
# 2. Check network latency
# 3. Increase timeout
client.flush(timeout=60) # Increase timeout
# If messages not appearing:
# 1. Check backend connectivity
# 2. Verify authentication
# 3. Check for ERROR logs
# 4. Call flush() before exit
The Opik Python SDK provides:
- Non-blocking observability: traces, spans, and feedback scores are queued, batched, and delivered by background workers
- Automatic instrumentation via the @opik.track decorator and opik_context helpers
- Resource management clients for datasets, experiments, prompts, attachments, and threads
- Resilient delivery with batching, rate-limit handling, retries, and explicit flush()
For more information, see: