sdks/python/design/INTEGRATIONS.md
The SDK provides automatic tracking for 12+ LLM frameworks through three architectural patterns. Integrations are designed to be lightweight, extensible, and framework-native.
| Integration | Pattern | Location | Key Features |
|---|---|---|---|
| OpenAI | Method Patching | integrations/openai/ | Multiple APIs, streaming, function calling |
| Anthropic | Method Patching | integrations/anthropic/ | Messages API, delta accumulation |
| Bedrock | Method Patching | integrations/bedrock/ | Multi-format aggregators, extensible |
| Google GenAI | Method Patching | integrations/genai/ | Multi-modal support |
| AISuite | Method Patching | integrations/aisuite/ | Unified interface |
| LangChain | Callback | integrations/langchain/ | BaseTracer, provider extractors, external context support |
| LlamaIndex | Callback | integrations/llama_index/ | Event parsing, dedicated client |
| DSPy | Callback | integrations/dspy/ | Isolated context, graph visualization |
| Haystack | Callback | integrations/haystack/ | Component-based |
| ADK | Hybrid | integrations/adk/ | OpenTelemetry interception + callbacks |
| CrewAI | Hybrid | integrations/crewai/ | Method wrapping + LiteLLM delegation |
Library Architecture Analysis:
```
Does library provide callbacks/hooks?
│
├─► Yes ─► Callbacks reliable and in-context?
│          │
│          ├─► Yes ─► Pure Callback
│          │          (LangChain, LlamaIndex, DSPy, Haystack)
│          │
│          └─► No ─► Hybrid (Callback + Patching)
│                    (ADK, CrewAI)
│
└─► No ─► Method Patching
          (OpenAI, Anthropic, Bedrock, GenAI, AISuite)
```
Why callbacks alone may be insufficient: some execution paths never reach the library's callbacks. ADK routes its internal tracing through OpenTelemetry rather than agent callbacks, and CrewAI delegates LLM calls to external dependencies (LiteLLM or provider clients) that its callbacks do not observe.
Solution: Add patching alongside callbacks - OpenTelemetry interception for ADK, external dependency tracking for CrewAI.
Method patching wraps client methods to intercept calls:
```
track_library(client) → wraps methods → client.method() intercepted
                  ↓
         BaseTrackDecorator
                  ↓
  _start_span_inputs_preprocessor
    (extract input, create span)
                  ↓
        Call original method
                  ↓
          _streams_handler
    (check if output is stream)
                  ↓
         ┌────────┴────────┐
         │                 │
      Stream?          Not stream
         │                 │
   Patch stream            │
 Defer finalization        │
   Return patched          │
         │                 │
         └────────┬────────┘
                  ↓
   _end_span_inputs_preprocessor
(extract output, usage, finalize span)
(called immediately for non-streaming,
   or in finally block for streaming)
```
All method patching integrations are idempotent: an `opik_tracked` marker prevents double-wrapping, as sketched below.
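A minimal sketch of this guard (hypothetical `track_client` and `method` names; the real marker attribute is `opik_tracked`):
```python
from typing import Any

OPIK_TRACKED_MARKER = "opik_tracked"  # marker attribute set on patched clients

def track_client(client: Any) -> Any:
    """Wrap a client's method once; repeated calls are no-ops (sketch)."""
    if getattr(client, OPIK_TRACKED_MARKER, False):
        return client  # already wrapped - do not double-wrap

    original = client.method

    def wrapper(*args: Any, **kwargs: Any) -> Any:
        # the _start_span_inputs_preprocessor equivalent would run here
        result = original(*args, **kwargs)
        # the _end_span_inputs_preprocessor equivalent would run here
        return result

    client.method = wrapper
    setattr(client, OPIK_TRACKED_MARKER, True)
    return client
```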
### OpenAI
Files:
- opik_tracker.py - Main entry point, wraps client methods
- openai_chat_completions_decorator.py - Chat completions decorator
- openai_responses_decorator.py - Responses API decorator
- stream_patchers.py - Stream iteration patching
- chat_completion_chunks_aggregator.py - Chunk aggregation
- response_events_aggregator.py - Response events aggregation

Wrapped Methods:
- chat.completions.create() - Standard chat API
- beta.chat.completions.parse() - Structured outputs
- responses.create() - Responses API

Streaming Support: Handles openai.Stream, openai.AsyncStream, and ChatCompletionStreamManager.
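For reference, the typical entry point looks like this (assuming the public `track_openai` wrapper exposed by `opik_tracker.py`):
```python
from openai import OpenAI
from opik.integrations.openai import track_openai

client = track_openai(OpenAI())  # idempotent: calling twice will not double-wrap

# Calls are now intercepted and logged as Opik spans.
response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Hello"}],
)
```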
### Anthropic
Files:
- opik_tracker.py - Main entry point
- messages_create_decorator.py - Messages decorator
- stream_patchers.py - Stream/context manager patching

Wrapped Methods:
- messages.create() - Both standard and streaming
- messages.stream() - Context manager pattern

Key Implementation Detail: Delta Accumulation
Anthropic streams delta events (not complete chunks) that must be accumulated. Event accumulator builds complete message by merging deltas progressively.
Location: stream_patchers.py - See accumulation logic
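A simplified sketch of that accumulation (real event types from the Anthropic SDK, simplified handling; the production logic lives in `stream_patchers.py`):
```python
from typing import Any, Dict, Iterable

def accumulate_message(events: Iterable[Any]) -> Dict[str, Any]:
    """Merge Anthropic stream delta events into one message dict (sketch)."""
    message: Dict[str, Any] = {"content": "", "usage": {}}
    for event in events:
        event_type = getattr(event, "type", None)
        if event_type == "message_start":
            message["usage"].update(event.message.usage.model_dump())
        elif event_type == "content_block_delta":
            # Each delta carries only a fragment of the final text.
            message["content"] += getattr(event.delta, "text", "")
        elif event_type == "message_delta":
            message["usage"].update(event.usage.model_dump())
    return message
```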
### Bedrock
Files:
- opik_tracker.py - Main entry point
- converse/converse_decorator.py - Converse API
- invoke_model/invoke_model_decorator.py - Legacy InvokeModel API
- invoke_model/chunks_aggregator/ - Extensible aggregator system

Wrapped Methods:
- client.converse() - Unified Converse API
- client.invoke_model() - Legacy API (multiple formats)
- client.invoke_agent() - Agent invocations

Key Implementation Detail: Extensible Multi-Format Aggregator
Problem: Bedrock supports multiple model formats (Claude, Nova, Llama, Mistral) with different streaming structures.
Solution: Registry pattern with pluggable aggregators.
Architecture (invoke_model/chunks_aggregator/):
- base.py - ChunkAggregator protocol
- format_detector.py - Detection registry + aggregator registry
- claude.py, nova.py, llama.py, mistral.py - Format-specific aggregators
- api.py - Public interface: detect_format() + aggregate_chunks_to_dataclass()

Extensibility: Add a new format by creating a module and registering it in format_detector.py. Zero changes to existing code.
Benefits: Open/Closed Principle, isolated testing, clear separation of concerns.
Documentation: See EXTENDING.md and README.md in chunks_aggregator/ directory.
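A minimal sketch of this registry pattern (hypothetical helper names, except `detect_format()`, which the public interface exposes):
```python
from typing import Any, Callable, Dict, List

# Registries: detectors decide whether chunks match a format,
# aggregators rebuild a full response from that format's chunks.
_DETECTORS: Dict[str, Callable[[List[Any]], bool]] = {}
_AGGREGATORS: Dict[str, Callable[[List[Any]], Any]] = {}

def register_format(
    name: str,
    detector: Callable[[List[Any]], bool],
    aggregator: Callable[[List[Any]], Any],
) -> None:
    """Plug in a new model format without touching existing code."""
    _DETECTORS[name] = detector
    _AGGREGATORS[name] = aggregator

def detect_format(chunks: List[Any]) -> str:
    for name, detector in _DETECTORS.items():
        if detector(chunks):
            return name
    raise ValueError("Unknown chunk format")

def aggregate_chunks(chunks: List[Any]) -> Any:
    return _AGGREGATORS[detect_format(chunks)](chunks)
```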
### Google GenAI
Files:
- opik_tracker.py - Main entry point
- generate_content_decorator.py - Content generation decorator
- stream_wrappers.py - Stream handling
- generations_aggregators.py - Chunk aggregation

Features: Multi-modal support (text, images), streaming responses.
### AISuite
Files:
- opik_tracker.py - Main entry point
- aisuite_decorator.py - Decorator implementation

Pattern: Similar to OpenAI (unified interface across providers).
Callback integrations implement the framework's callback interface:
```
Framework execution → fires events → callback methods
                                          ↓
                        on_start() - Create span/trace
                        on_end()   - Update and send
                        on_error() - Capture error, finalize
```
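In outline, every callback integration follows the same skeleton (a generic sketch, not any particular framework's interface):
```python
from typing import Any, Dict, Optional
from uuid import UUID

class SpanData:
    """Placeholder for Opik's span data structure (sketch)."""
    def __init__(self, name: str) -> None:
        self.name = name
        self.output: Optional[Dict[str, Any]] = None
        self.error: Optional[str] = None

class GenericOpikCallback:
    """Maps a framework's run lifecycle onto Opik spans (sketch)."""

    def __init__(self) -> None:
        self._span_map: Dict[UUID, SpanData] = {}

    def on_start(self, run_id: UUID, name: str) -> None:
        self._span_map[run_id] = SpanData(name)  # create span

    def on_end(self, run_id: UUID, output: Dict[str, Any]) -> None:
        span = self._span_map.pop(run_id)
        span.output = output  # update and send to backend

    def on_error(self, run_id: UUID, error: Exception) -> None:
        span = self._span_map.pop(run_id)
        span.error = str(error)  # capture error, finalize
```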
### LangChain
Files:
- opik_tracer.py - Implements BaseTracer
- langgraph_tracer_injector.py - Graph configuration injection for LangGraph
- langgraph_async_context_bridge.py - Context propagation for async LangGraph nodes
- provider_usage_extractors/ - Provider-specific usage extraction
- helpers.py - Utility functions
- base_llm_patcher.py - Adds base_url to LLM dict (for provider ID)

Pattern: Pure callback (extends langchain_core.tracers.BaseTracer)
Key Feature: Supports parent-child relations with external Opik spans/traces
When used within @track decorated functions or an existing Opik trace context, the tracer reads the current span/trace from context_storage and attaches LangChain runs as children. Example:
```python
@opik.track  # Opik trace + span
def my_function():
    chain.invoke(..., callbacks=[OpikTracer()])  # LangChain spans as children
```
State Management:
- _span_data_map: Dict[UUID, SpanData] - Maps LangChain run_id to Opik span
- _created_traces_data_map: Dict[UUID, TraceData] - Maps run_id to trace
- _externally_created_traces_ids: Set[str] - Tracks external traces

Callback Methods (implements full BaseTracer interface):
Chain callbacks:
- _on_chain_start(run) → Check for existing trace, create span as child if exists
- _on_chain_end(run) → Finalize span, send to backend
- _on_chain_error(run) → Capture error info, finalize span

LLM callbacks:
- on_chat_model_start(...) → Special handling for chat models
- _on_chat_model_start(run) → Internal processing
- _on_llm_start(run) → Create LLM span (type="llm"), extract provider
- _on_llm_end(run) → Extract usage via provider extractors, send span
- _on_llm_error(run) → Capture error, finalize span

Tool callbacks:
- _on_tool_start(run) → Create tool span (type="tool")
- _on_tool_end(run) → Finalize tool span
- _on_tool_error(run) → Capture error, finalize span

Error callbacks ensure spans are finalized even when LangChain operations fail.
Key Implementation Detail: Provider-Specific Usage Extractors
Location: provider_usage_extractors/
Challenge: Each LangChain provider stores usage in different locations/formats within the Run object.
Solution: Registry pattern with provider-specific extractors.
Extractors:
- OpenAIUsageExtractor - Extracts from run.outputs.llm_output.token_usage
- AnthropicUsageExtractor - Handles Anthropic format
- BedrockUsageExtractor - Handles Bedrock format
- GoogleUsageExtractor - Handles Google format
- See usage_extractor.py for the full registry

Each extractor knows where to find usage in that provider's Run structure.
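A condensed sketch of the extractor registry (hypothetical signatures; see provider_usage_extractors/ for the real implementations):
```python
from typing import Callable, Dict, Optional

UsageDict = Dict[str, int]

class Run:
    """Stand-in for langchain_core.tracers.schemas.Run (sketch)."""
    def __init__(self, outputs: dict) -> None:
        self.outputs = outputs

def _extract_openai_usage(run: Run) -> Optional[UsageDict]:
    # OpenAI stores usage under outputs.llm_output.token_usage.
    return (run.outputs.get("llm_output") or {}).get("token_usage")

_EXTRACTORS: Dict[str, Callable[[Run], Optional[UsageDict]]] = {
    "openai": _extract_openai_usage,
    # "anthropic": _extract_anthropic_usage, etc.
}

def extract_usage(provider: str, run: Run) -> Optional[UsageDict]:
    extractor = _EXTRACTORS.get(provider)
    return extractor(run) if extractor else None
```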
LangGraph Support:
The integration provides enhanced support for LangGraph through:
track_langgraph() Function: High-level wrapper that injects OpikTracer into the graph's default configuration, eliminating the need to pass config={"callbacks": [opik_tracer]} on every invocation.
Automatic Graph Visualization: Extracts and stores Mermaid graph structure in trace metadata via OpikTracer.set_graph() method.
Async Context Bridge: extract_current_langgraph_span_data() helper for propagating trace context to @track-decorated functions in async LangGraph nodes.
Usage Pattern:
```python
from opik.integrations.langchain import OpikTracer, track_langgraph
from langgraph.graph import StateGraph, START, END

# Build and compile graph
builder = StateGraph(State)
builder.add_node("my_node", my_node_function)
builder.add_edge(START, "my_node")
builder.add_edge("my_node", END)
app = builder.compile()

# Track once
opik_tracer = OpikTracer(tags=["production"])
app = track_langgraph(app, opik_tracer)

# All invocations automatically tracked
result = app.invoke({"message": "Hello"})
```
Implementation Details:
- langgraph_tracer_injector.py - Injects OpikTracer into graph's default config
- langgraph_async_context_bridge.py - Extracts span data from LangGraph config for async context propagation
- OpikTracer.set_graph() - Stores graph visualization in _trace_default_metadata["_opik_graph_definition"]

### LlamaIndex
Files:
- callback.py - Implements BaseCallbackHandler
- event_parsing_utils.py - Parses LlamaIndex event payloads

Event Handling:
- on_event_start(event_type, payload, event_id, parent_id) → Parse payload, create span
- on_event_end(event_type, payload, event_id) → Parse output/usage, send span

Event Parser (event_parsing_utils.py): Extracts data from payloads based on event_type (EMBEDDING, QUERY, LLM, etc.).
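A sketch of type-keyed payload parsing (hypothetical payload keys; LlamaIndex's actual payload schema varies per event type):
```python
from enum import Enum
from typing import Any, Dict, Optional

class EventType(str, Enum):  # mirrors llama_index CBEventType values
    LLM = "llm"
    QUERY = "query"
    EMBEDDING = "embedding"

def parse_input(event_type: EventType, payload: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Pick span input out of an event payload, keyed by event type (sketch)."""
    if payload is None:
        return {}
    if event_type is EventType.LLM:
        return {"messages": payload.get("messages")}
    if event_type is EventType.QUERY:
        return {"query": payload.get("query_str")}
    if event_type is EventType.EMBEDDING:
        return {"chunks": payload.get("chunks")}
    return dict(payload)
```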
### DSPy
Files:
- callback.py - Implements dspy.utils.callback.BaseCallback
- graph.py - Mermaid graph builder for DSPy programs

Callbacks:
- on_module_start/end() - DSPy module execution
- on_lm_start/end() - LM calls (extracts provider/model from "provider/model" format)
- on_tool_start/end() - Tool executions

Key Implementation Detail: Global Context Storage with Safe Operations
Uses global OpikContextStorage instance, enabling opik.opik_context API access to spans/traces created by DSPy callbacks. This allows users to:
- opik_context.get_current_span_data() / opik_context.get_current_trace_data()
- opik_context.update_current_span() / opik_context.update_current_trace()

Context Safety: Uses the ensure_id parameter for all context pop operations (pop_span_data(ensure_id=...), pop_trace_data(ensure_id=...)) to prevent context corruption in concurrent scenarios or when DSPy callbacks coexist with @track decorated functions.
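A sketch of why ensure_id prevents corruption (hypothetical ContextStorage; the real class is OpikContextStorage):
```python
from typing import List, Optional

class SpanData:
    def __init__(self, id: str) -> None:
        self.id = id

class ContextStorage:
    """Stack-based span context with guarded pops (sketch)."""

    def __init__(self) -> None:
        self._stack: List[SpanData] = []

    def add_span_data(self, span: SpanData) -> None:
        self._stack.append(span)

    def pop_span_data(self, ensure_id: Optional[str] = None) -> Optional[SpanData]:
        if not self._stack:
            return None
        # Only pop if the top of the stack is the span we expect;
        # otherwise another producer (e.g. @track) owns it - leave it intact.
        if ensure_id is not None and self._stack[-1].id != ensure_id:
            return None
        return self._stack.pop()
```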
Graph Visualization: Builds Mermaid diagram of DSPy program structure (graph.py).
### Haystack
Files:
- opik_connector.py - Component added to pipeline
- opik_tracer.py - Tracer for pipeline execution
- converters.py - Convert Haystack objects to Opik format

Pattern: Component-based (added to pipeline, observes without modifying data flow).
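Typical wiring, assuming the OpikConnector component exported by this integration and Haystack's standard add_component API:
```python
from haystack import Pipeline
from opik.integrations.haystack import OpikConnector

pipeline = Pipeline()
# The connector observes pipeline execution; it does not alter data flow.
pipeline.add_component("tracer", OpikConnector("My Pipeline"))
# ... add generator/retriever components and connections as usual ...
```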
### ADK
Files:
- opik_tracer.py - Agent callbacks
- patchers/adk_otel_tracer/opik_adk_otel_tracer.py - OpenTelemetry tracer
- recursive_callback_injector.py - Recursive callback injection
- graph/mermaid_graph_builder.py - Agent graph visualization
- patchers/patchers.py - Global patches

Why Hybrid: ADK uses OpenTelemetry for internal tracing + provides agent callbacks.
Dual Approach:
1. OpenTelemetry Patching (patchers/adk_otel_tracer/opik_adk_otel_tracer.py):
- Intercepts start_span() calls from ADK
- Returns INVALID_SPAN (no-op for OpenTelemetry) for internal span names in _ADK_INTERNAL_SPAN_NAME_SKIP_LIST

2. Agent Callbacks (opik_tracer.py):
- before/after_agent_callback
- before/after_model_callback
- before/after_tool_callback
- Injected recursively into sub-agents (recursive_callback_injector.py)

Key Implementation Details:
OpenTelemetry Interception: Instead of dual tracing (OTel + Opik), the integration intercepts the OTel tracer so that only Opik spans are created - a single tracing backend with no OpenTelemetry overhead. Callbacks are used only to update spans and traces; the patched OTel tracer is responsible for creating them and managing context (this design benefits from the reliability of the OTel context manager).
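A sketch of the interception (hypothetical function and skip-list entries; INVALID_SPAN is a real no-op constant from opentelemetry.trace):
```python
from opentelemetry.trace import INVALID_SPAN

# Illustrative names; the real list is _ADK_INTERNAL_SPAN_NAME_SKIP_LIST.
_SKIP_LIST = frozenset({"adk_internal_setup", "adk_internal_teardown"})

def intercepted_start_span(name: str, create_opik_span, **kwargs):
    """Stand-in for the patched OTel tracer's start_span (sketch).

    Internal ADK spans become no-ops for OpenTelemetry; everything else
    is routed to Opik, so only one tracing backend is active.
    """
    if name in _SKIP_LIST:
        return INVALID_SPAN  # real OTel constant: a valid, non-recording span
    return create_opik_span(name, **kwargs)  # Opik span replaces the OTel span
```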
Graph Visualization (graph/mermaid_graph_builder.py): Generates a Mermaid diagram of the agent structure and stores it in trace metadata under _opik_graph_definition.

### CrewAI
Files:
- opik_tracker.py - Main tracking setup
- crewai_decorator.py - Decorator for CrewAI methods
- flow_patchers.py - Flow class patching

Why Hybrid: CrewAI methods are wrapped, LiteLLM handles LLM tracking for v0.x, and provider clients are patched directly for v1.0.0+.
Approach:
- Wraps Crew.kickoff, Agent.execute_task, Task.execute_sync
- Delegates LLM tracking to track_litellm() (CrewAI uses LiteLLM internally for v0.x)
- Patches Flow.__init__ to auto-wrap dynamically registered methods (v1.0.0+ only)
- Patches provider LLM clients directly when the crew argument is provided

Key Implementation Details:
LiteLLM Delegation: Reuses existing LiteLLM integration instead of duplicating LLM tracking logic.
Flow Patching (flow_patchers.py): Patches constructor to wrap methods registered via @start, @listen decorators. Gracefully handles missing Flow class (not available in CrewAI < v1.0.0).
Graceful Degradation: Handles missing provider libraries gracefully: if a provider module (e.g., crewai.llms.providers.openai.completion) is not installed, the integration logs a debug message and continues.
Usage:
```python
# For CrewAI v0.x (LiteLLM-based)
track_crewai(project_name="my-project")

# For CrewAI v1.0.0+ (direct provider clients)
crew = Crew(agents=[...], tasks=[...])
track_crewai(project_name="my-project", crew=crew)  # crew argument enables LLM client tracking
```
### Stream Class Patching
Used by: OpenAI (openai.Stream), Anthropic (anthropic.Stream)
Files: stream_patchers.py in each integration
Approach:
- Capture the original __iter__ from the class
- Replace it with a wrapper: Stream.__iter__ = wrapper
- Mark patched streams: stream.opik_tracked_instance = True
Key Pattern - Context Pop Before Streaming:
Before returning the stream, pop the span/trace from context:
```python
def _streams_handler(self, output, ...):
    if is_stream(output):
        # Pop BEFORE returning (stream consumed later)
        span_to_end, trace_to_end = base_track_decorator.pop_end_candidates()
        return patch_stream(output, span_to_end, trace_to_end, ...)
```
Why: Stream consumption happens after decorator returns. Popping prevents nested calls from seeing stale context.
Key Pattern - Finalization Guarantee:
All stream wrappers use finally:
```python
def wrapper(self):
    try:
        accumulated = []
        for item in original(self):
            accumulated.append(item)
            yield item
    finally:
        # ALWAYS runs - even if stream not fully consumed
        finalize_span(aggregator(accumulated), ...)
```
Why: the user might break out early or an exception may occur; the span must still finalize.
### Context Manager Patching
Used by: Anthropic (MessageStreamManager)
Files: stream_patchers.py
Approach:
- Patches __enter__ and __exit__ of the stream manager
- Finalizes the span in __exit__

Suitable for stream managers that follow the with statement pattern.
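A sketch of the technique (hypothetical patch_stream_manager helper and accumulation attribute):
```python
import functools

def patch_stream_manager(manager_cls, finalize_span):
    """Wrap __enter__/__exit__ so the span finalizes when the with-block ends (sketch)."""
    original_enter = manager_cls.__enter__
    original_exit = manager_cls.__exit__

    @functools.wraps(original_enter)
    def patched_enter(self):
        self._opik_accumulated = []  # collect events for aggregation
        return original_enter(self)

    @functools.wraps(original_exit)
    def patched_exit(self, exc_type, exc_val, exc_tb):
        # Finalize even if the with-body raised an exception.
        finalize_span(getattr(self, "_opik_accumulated", []))
        return original_exit(self, exc_type, exc_val, exc_tb)

    manager_cls.__enter__ = patched_enter
    manager_cls.__exit__ = patched_exit
    return manager_cls
```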
### Generator Wrapping
Used by: Some Bedrock/GenAI cases
Location: opik/decorator/generator_wrappers.py
Approach: Wraps the generator without modifying library classes. Returns a custom proxy that finalizes in __del__ or on explicit close.
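A sketch of such a proxy (hypothetical class; the real wrappers live in opik/decorator/generator_wrappers.py):
```python
from typing import Any, Callable, Iterator, List

class FinalizingGeneratorProxy:
    """Iterates a wrapped generator and guarantees finalization (sketch)."""

    def __init__(self, generator: Iterator[Any], finalize: Callable[[List[Any]], None]) -> None:
        self._generator = generator
        self._finalize = finalize
        self._accumulated: List[Any] = []
        self._finalized = False

    def __iter__(self) -> Iterator[Any]:
        try:
            for item in self._generator:
                self._accumulated.append(item)
                yield item
        finally:
            self.close()

    def close(self) -> None:
        if not self._finalized:
            self._finalized = True
            self._finalize(self._accumulated)

    def __del__(self) -> None:
        # Last-resort finalization if the proxy is garbage collected.
        self.close()
```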
### Normalized Usage Format
Location: opik/llm_usage/opik_usage.py
All providers map to a standardized format:
```python
class OpikUsage(pydantic.BaseModel):
    completion_tokens: Optional[int]
    prompt_tokens: Optional[int]
    total_tokens: Optional[int]
    provider_usage: Optional[BaseOriginalProviderUsage]  # Original preserved
```
### Usage Factory
Location: opik/llm_usage/opik_usage_factory.py
Registry with builder functions per provider:
```python
_PROVIDER_TO_OPIK_USAGE_BUILDERS: Dict[Provider, List[Callable]] = {
    LLMProvider.OPENAI: [
        OpikUsage.from_openai_completions_dict,
        OpikUsage.from_openai_responses_dict,  # Multiple formats supported
    ],
    LLMProvider.ANTHROPIC: [OpikUsage.from_anthropic_dict],
    LLMProvider.BEDROCK: [OpikUsage.from_bedrock_dict],
    # ...
}
```
Process: build_opik_usage(provider, usage_dict) tries the registered builders for that provider in order until one constructs an OpikUsage.
Extensibility: Add a new provider by (see the sketch after this list):
- Creating a MyProviderUsage class
- Adding from_myprovider_dict() to OpikUsage
- Registering the builder in _PROVIDER_TO_OPIK_USAGE_BUILDERS
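A sketch of these steps for a hypothetical provider (stand-in class names; the real builders are classmethods on OpikUsage):
```python
from typing import Any, Dict, Optional
import pydantic

class MyProviderUsage(pydantic.BaseModel):
    """Original usage payload, preserved verbatim (hypothetical provider)."""
    input_tokens: Optional[int] = None
    output_tokens: Optional[int] = None

class OpikUsageSketch(pydantic.BaseModel):  # stand-in for OpikUsage
    completion_tokens: Optional[int] = None
    prompt_tokens: Optional[int] = None
    total_tokens: Optional[int] = None
    provider_usage: Optional[MyProviderUsage] = None

    @classmethod
    def from_myprovider_dict(cls, usage: Dict[str, Any]) -> "OpikUsageSketch":
        original = MyProviderUsage(**usage)
        return cls(
            prompt_tokens=original.input_tokens,
            completion_tokens=original.output_tokens,
            total_tokens=(original.input_tokens or 0) + (original.output_tokens or 0),
            provider_usage=original,  # original preserved alongside normalized fields
        )
```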
### Cost Tracking
Location: opik/types.py
Supported providers for cost tracking:
- OPENAI, ANTHROPIC, BEDROCK
- GOOGLE_VERTEXAI, GOOGLE_AI
- COHERE, GROQ
- See types.py for the complete list

SDK Responsibility: Provide data
- model: Model name (e.g., "gpt-4")
- provider: Provider enum
- usage: Token counts (OpikUsage)
- total_cost: Optional override

Backend Responsibility: Calculate cost
Note: Integrations do not calculate cost - only provide data for backend.
Integration Patterns:
- Method Patching - OpenAI, Anthropic, Bedrock, Google GenAI, AISuite
- Callback - LangChain, LlamaIndex, DSPy, Haystack
- Hybrid - ADK, CrewAI

Streaming Strategies:
- Stream class patching (OpenAI, Anthropic)
- Context manager patching (Anthropic)
- Generator wrapping (some Bedrock/GenAI cases)

Key Patterns:
- opik_tracked marker prevents double-wrapping
- finally blocks ensure span completion

Notable Implementations:
- LangChain: parent-child relations with external Opik spans (@track)
- DSPy: global context storage (opik_context API access)

For implementation details, see source code in:
- opik/integrations/ - All integration implementations
- opik/llm_usage/ - Usage tracking and conversion
- opik/decorator/ - Base decorator and streaming utilities

For more information, see: