Observability for LangGraph with Opik

Opik integrates with LangGraph so that you can log and trace your LangGraph-based applications. With the OpikTracer callback, Opik automatically captures detailed information about each graph execution, in both development and production.

Account Setup

Comet provides a hosted version of the Opik platform. Simply create an account and grab your API key.

You can also run the Opik platform locally; see the installation guide for more information.

Getting Started

Installation

To use the OpikTracer with LangGraph, you'll need the opik and langgraph packages installed. The command below also installs langchain, which some of the examples on this page import. You can install them using pip:

bash
pip install opik langgraph langchain
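As a quick sanity check after installing, a short stdlib-only snippet such as the following can confirm that the packages are importable. The `required` list simply mirrors the pip command above; it is an illustration and not part of the Opik SDK.

```python
import importlib.util

# Top-level import names that correspond to the pip command above
required = ["opik", "langgraph", "langchain"]

# find_spec returns None when a top-level package cannot be found
missing = [name for name in required if importlib.util.find_spec(name) is None]

if missing:
    print("Missing packages:", ", ".join(missing))
else:
    print("All required packages are importable.")
```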

Configuring Opik

Configure the Opik Python SDK for your deployment type. See the Python SDK Configuration guide for detailed instructions on:

  • CLI configuration: opik configure
  • Code configuration: opik.configure()
  • Self-hosted vs Cloud vs Enterprise setup
  • Configuration files and environment variables
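For the environment-variable route, a minimal sketch might look like the following. It assumes the SDK reads `OPIK_API_KEY`, `OPIK_WORKSPACE`, and `OPIK_PROJECT_NAME` (and `OPIK_URL_OVERRIDE` for self-hosted deployments). All values shown are placeholders, so treat the Python SDK Configuration guide as the authoritative list.

```python
import os

# Placeholder values: replace them with your own credentials
opik_settings = {
    "OPIK_API_KEY": "<your-api-key>",
    "OPIK_WORKSPACE": "<your-workspace>",
    "OPIK_PROJECT_NAME": "langgraph-demo",
}

# For a self-hosted deployment (assumption), point the SDK at your own server:
# opik_settings["OPIK_URL_OVERRIDE"] = "<your-opik-url>/api"

# Set the variables before creating an OpikTracer so the SDK can pick them up
os.environ.update(opik_settings)
```

Setting the variables in the shell or in a `.env` file works just as well; the point is only that they are in place before the tracer is created.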

Using Opik with LangGraph

Opik provides two ways to track LangGraph applications. We recommend using the track_langgraph function for a simpler experience, but you can also use the OpikTracer callback directly if you need more control.

Option 1: Using track_langgraph (recommended)

The simplest way to track your LangGraph applications is the track_langgraph function. It wraps your compiled graph once, and all subsequent invocations are tracked automatically without passing callbacks:

python
from typing import List, Annotated
from pydantic import BaseModel
from opik.integrations.langchain import OpikTracer, track_langgraph
from langchain_core.messages import HumanMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

# create your LangGraph graph
class State(BaseModel):
    messages: Annotated[list, add_messages]

def chatbot(state):
    # Typically your LLM calls would be done here
    return {"messages": "Hello, how can I help you today?"}

graph = StateGraph(State)
graph.add_node("chatbot", chatbot)
graph.add_edge(START, "chatbot")
graph.add_edge("chatbot", END)
app = graph.compile()

# Create OpikTracer and track the graph once - no need to pass callbacks anymore!
# The graph visualization is automatically extracted by track_langgraph
opik_tracer = OpikTracer(
    tags=["production"],
    metadata={"version": "1.0"}
)
app = track_langgraph(app, opik_tracer)

# Now all invocations are automatically tracked
for s in app.stream({"messages": [HumanMessage(content = "How to use LangGraph ?")]}):
    print(s)

# No callbacks needed here either!
result = app.invoke({"messages": [HumanMessage(content = "How to use LangGraph ?")]})

This is similar to how other Opik integrations work (like OpenAI, Anthropic, etc.), where you wrap the client or object once and then use it normally.

Option 2: Using OpikTracer callback

If you need more fine-grained control or want to use different tracers for different invocations, you can use the OpikTracer callback directly:

python
from typing import List, Annotated
from pydantic import BaseModel
from opik.integrations.langchain import OpikTracer
from langchain_core.messages import HumanMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

# create your LangGraph graph
class State(BaseModel):
    messages: Annotated[list, add_messages]

def chatbot(state):
    # Typically your LLM calls would be done here
    return {"messages": "Hello, how can I help you today?"}

graph = StateGraph(State)
graph.add_node("chatbot", chatbot)
graph.add_edge(START, "chatbot")
graph.add_edge("chatbot", END)
app = graph.compile()

# Create the OpikTracer
opik_tracer = OpikTracer()

# Pass the OpikTracer callback to each invocation
for s in app.stream({"messages": [HumanMessage(content = "How to use LangGraph ?")]},
                      config={"callbacks": [opik_tracer]}):
    print(s)

result = app.invoke({"messages": [HumanMessage(content = "How to use LangGraph ?")]},
                      config={"callbacks": [opik_tracer]})

Viewing Traces in the UI

Once tracking is enabled using either method, traces start to appear in the Opik UI.

Practical Example: Classification Workflow

Let's walk through a real-world example of using LangGraph with Opik for a classification workflow. This example demonstrates how to create a graph with conditional routing and track its execution.

Setting up the Environment

First, let's set up our environment with the necessary dependencies:

python
import opik

# Configure Opik
opik.configure(use_local=False)

Creating the LangGraph Workflow

We'll create a LangGraph workflow with 3 nodes that demonstrates conditional routing:

python
from langgraph.graph import StateGraph, END
from typing import TypedDict, Optional

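# Define the graph state. TypedDict fields cannot declare default values,
# so total=False is used to mark every key as optional instead.
class GraphState(TypedDict, total=False):
    question: Optional[str]
    classification: Optional[str]
    response: Optional[str]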

# Create the node functions
def classify(question: str) -> str:
    return "greeting" if question.startswith("Hello") else "search"

def classify_input_node(state):
    question = state.get("question", "").strip()
    classification = classify(question)
    return {"classification": classification}

def handle_greeting_node(state):
    return {"response": "Hello! How can I help you today?"}

def handle_search_node(state):
    question = state.get("question", "").strip()
    search_result = f"Search result for '{question}'"
    return {"response": search_result}

# Create the workflow
workflow = StateGraph(GraphState)
workflow.add_node("classify_input", classify_input_node)
workflow.add_node("handle_greeting", handle_greeting_node)
workflow.add_node("handle_search", handle_search_node)

# Add conditional routing
def decide_next_node(state):
    return (
        "handle_greeting"
        if state.get("classification") == "greeting"
        else "handle_search"
    )

workflow.add_conditional_edges(
    "classify_input",
    decide_next_node,
    {"handle_greeting": "handle_greeting", "handle_search": "handle_search"},
)

workflow.set_entry_point("classify_input")
workflow.add_edge("handle_greeting", END)
workflow.add_edge("handle_search", END)

app = workflow.compile()

Executing with Opik Tracing

Now let's execute the workflow with Opik tracing enabled using track_langgraph:

python
from opik.integrations.langchain import OpikTracer, track_langgraph

# Create OpikTracer and track the graph once
# The graph visualization is automatically extracted by track_langgraph
opik_tracer = OpikTracer(
    project_name="classification-workflow"
)
app = track_langgraph(app, opik_tracer)

# Execute the workflow - no callbacks needed!
inputs = {"question": "Hello, how are you?"}
result = app.invoke(inputs)
print(result)

# Test with a different input - still tracked automatically
inputs = {"question": "What is machine learning?"}
result = app.invoke(inputs)
print(result)

The graph execution is now logged on the Opik platform and can be viewed in the UI. The trace will show the complete execution path through the graph, including the classification decision and the chosen response path.
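Before looking at traces, it can help to check which branch each input should take. The sketch below repeats the `classify` and `decide_next_node` functions from the workflow and runs them on their own, without LangGraph or Opik; `expected_route` is a hypothetical helper introduced only for this illustration.

```python
def classify(question: str) -> str:
    return "greeting" if question.startswith("Hello") else "search"


def decide_next_node(state: dict) -> str:
    return (
        "handle_greeting"
        if state.get("classification") == "greeting"
        else "handle_search"
    )


def expected_route(question: str) -> str:
    """Hypothetical helper: mirror classify_input_node followed by the router."""
    classification = classify(question.strip())
    return decide_next_node({"classification": classification})


questions = ["Hello, how are you?", "What is machine learning?"]
routes = {question: expected_route(question) for question in questions}

for question, node in routes.items():
    print(f"{question!r} -> {node}")
```

The first question should be routed to `handle_greeting` and the second to `handle_search`, which is the same path the trace in the Opik UI is expected to show.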

Compatibility with Opik tracing context

LangGraph tracing integrates with Opik's tracing context, so you can call @track-decorated functions (and most other native Opik integrations) from within your graph nodes and have them automatically attached to the trace tree.

Synchronous execution (invoke)

For synchronous graph execution using invoke(), everything works out of the box. You can access current spans/traces from LangGraph nodes and call tracked functions inside them:

python
from opik import opik_context
from opik import track
from opik.integrations.langchain import OpikTracer, track_langgraph
from langgraph.graph import StateGraph, START, END

@track
def process_data(value: int) -> int:
    """Custom tracked function that will be attached to the trace tree."""
    return value * 2

def my_node(state):
    current_trace_data = opik_context.get_current_trace_data()
    current_span_data = opik_context.get_current_span_data()  # will return the span for `my_node`, created by OpikTracer

    # This tracked function call will automatically be part of the trace tree
    result = process_data(state["value"])
    return {"value": result}

# Build and execute graph
graph = StateGraph(dict)
graph.add_node("processor", my_node)
graph.add_edge(START, "processor")
graph.add_edge("processor", END)

app = graph.compile()
opik_tracer = OpikTracer()
app = track_langgraph(app, opik_tracer)

# Synchronous execution - tracked functions work automatically
result = app.invoke({"value": 21})

Asynchronous execution (ainvoke)

For asynchronous graph execution using ainvoke(), you need to explicitly propagate the trace context to @track-decorated functions using the extract_current_langgraph_span_data helper:

<Accordion title="Why is this needed for async execution?"> LangChain does not automatically share the execution context between callbacks (like `OpikTracer`) and node code in async scenarios. Propagating the trace context explicitly via distributed trace headers keeps tracked calls attached to the correct trace across async boundaries. </Accordion>
python
from opik import track
from opik.integrations.langchain import OpikTracer, track_langgraph, extract_current_langgraph_span_data
from langgraph.graph import StateGraph, START, END

@track
def process_data(value: int) -> int:
    """Custom tracked function that needs distributed trace headers in async context."""
    return value * 2

async def my_async_node(state, config):
    # Extract current span data from LangGraph config. `opik_context` doesn't work here due to langgraph platform limitations related to context propagation.
    span_data = extract_current_langgraph_span_data(config)
    
    # Pass distributed trace headers to attach the tracked function to the trace tree
    result = process_data(
        state["value"],
        opik_distributed_trace_headers=span_data.get_distributed_trace_headers()  # all tracked functions implicitly support this parameter
    )
    return {"value": result}

# Build and execute graph
graph = StateGraph(dict)
graph.add_node("processor", my_async_node)
graph.add_edge(START, "processor")
graph.add_edge("processor", END)

app = graph.compile()
opik_tracer = OpikTracer()
app = track_langgraph(app, opik_tracer)

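# Asynchronous execution - requires explicit trace context propagation.
# Top-level `await` works in notebooks; in a script, wrap this call in an
# async function and run it with asyncio.run().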
result = await app.ainvoke({"value": 21})

Alternatively, if you don't want to use the @track decorator, you can use the opik.start_as_current_span context manager with distributed headers:

python
import opik
from opik.integrations.langchain import OpikTracer, track_langgraph, extract_current_langgraph_span_data
from langgraph.graph import StateGraph, START, END

async def my_async_node(state, config):
    span_data = extract_current_langgraph_span_data(config)
    
    # Use context manager with distributed headers
    with opik.start_as_current_span(
        name="custom_operation",
        input={"input": state["value"]},
        opik_distributed_trace_headers=span_data.get_distributed_trace_headers()
    ) as span:  # separate name, so the LangGraph span data above is not shadowed
        # Your custom logic here
        result = state["value"] * 2
        span.output = {"output": result}
    
    return {"value": result}

# Build and execute graph
graph = StateGraph(dict)
graph.add_node("processor", my_async_node)
graph.add_edge(START, "processor")
graph.add_edge("processor", END)

app = graph.compile()
opik_tracer = OpikTracer()
app = track_langgraph(app, opik_tracer)

result = await app.ainvoke({"value": 21})
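To build some intuition for why implicit context is not enough in async code, here is a stdlib-only illustration using `contextvars` and `asyncio`. It is a simplified analogy and not LangChain's or Opik's actual internals: `callback_on_node_start`, `node_implicit`, and `node_explicit` are hypothetical stand-ins for a tracer callback and two node functions.

```python
import asyncio
import contextvars

# Stand-in for "the current span" that a tracer would keep in context
current_span = contextvars.ContextVar("current_span", default=None)


async def callback_on_node_start() -> None:
    # Sets the span, but only inside the context copy owned by this task
    current_span.set("span-for-my_node")


async def node_implicit():
    # Relies on implicit context: sees only what its own task context holds
    return current_span.get()


async def node_explicit(span: str) -> str:
    # Receives the span explicitly, similar in spirit to passing trace headers
    return span


async def main():
    # Each task runs in its own copy of the context, so the value set by
    # the callback task is not visible to the tasks created afterwards.
    await asyncio.create_task(callback_on_node_start())
    implicit = await asyncio.create_task(node_implicit())
    explicit = await asyncio.create_task(node_explicit("span-for-my_node"))
    return implicit, explicit


implicit, explicit = asyncio.run(main())
print("implicit:", implicit)
print("explicit:", explicit)
```

The implicit lookup comes back empty while the explicitly passed value arrives intact, which is the reason the examples above pass the distributed trace headers by hand.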

Logging threads

When you run multi-turn conversations using LangGraph persistence, Opik uses LangGraph's thread_id as the Opik thread_id. For example:

python
import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver
from typing import Annotated
from pydantic import BaseModel
from opik.integrations.langchain import OpikTracer, track_langgraph
from langchain_core.messages import HumanMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain.chat_models import init_chat_model

llm = init_chat_model("openai:gpt-4.1")


# create your LangGraph graph
class State(BaseModel):
    messages: Annotated[list, add_messages]


def chatbot(state):
    # Typically your LLM calls would be done here
    return {"messages": [llm.invoke(state.messages)]}


graph = StateGraph(State)
graph.add_node("chatbot", chatbot)
graph.add_edge(START, "chatbot")
graph.add_edge("chatbot", END)

# Create a new SqliteSaver instance
# Note: check_same_thread=False is OK as the implementation uses a lock
# to ensure thread safety.
conn = sqlite3.connect("checkpoints.sqlite", check_same_thread=False)
memory = SqliteSaver(conn)

app = graph.compile(checkpointer=memory)

# Create the OpikTracer and track the graph
opik_tracer = OpikTracer()
app = track_langgraph(app, opik_tracer)

thread_id = "e424a45e-7763-443a-94ae-434b39b67b72"
config = {"configurable": {"thread_id": thread_id}}

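# Initialize the state: reuse the checkpointed values if this thread already
# has some, otherwise start from an empty message list.
saved_values = app.get_state(config).values
state = State(**saved_values) if saved_values else State(messages=[])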
print("STATE", state)

# Add the user message
state.messages.append(HumanMessage(content="Hello, my name is Bob, how are you doing ?"))
# state.messages.append(HumanMessage(content="What is my name ?"))

result = app.invoke(state, config=config)

print("Result", result)
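Since the thread_id decides which traces are grouped together, one common pattern is to generate a fresh ID per conversation and reuse the same config for every turn. The sketch below shows only that bookkeeping with the standard library; `new_thread_config` is a hypothetical helper, not a LangGraph or Opik function.

```python
import uuid


def new_thread_config() -> dict:
    """Hypothetical helper: build a config with a fresh thread_id."""
    return {"configurable": {"thread_id": str(uuid.uuid4())}}


# One config per conversation
conversation_a = new_thread_config()
conversation_b = new_thread_config()

# Pass the same config on every turn of a conversation, for example
# app.invoke(state, config=conversation_a), so that all of its turns
# share one thread_id.
print(conversation_a["configurable"]["thread_id"])
print(conversation_b["configurable"]["thread_id"])
```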

Updating logged traces

You can use the OpikTracer.created_traces method to access the traces created by the OpikTracer callback, including their IDs:

python
from opik.integrations.langchain import OpikTracer

opik_tracer = OpikTracer()

# Calling LangGraph stream or invoke functions

traces = opik_tracer.created_traces()
print([trace.id for trace in traces])

These can then be used with the Opik.log_traces_feedback_scores method to update the logged traces.
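A rough sketch of preparing such an update is shown below. It only builds the payload and uses a hypothetical `FakeTrace` stand-in so that it runs without a configured SDK. The dictionary keys (`id`, `name`, `value`, `reason`), the metric name `user_feedback`, and the call shape in the trailing comment are assumptions to verify against the SDK reference.

```python
from dataclasses import dataclass


@dataclass
class FakeTrace:
    """Hypothetical stand-in for the objects returned by created_traces()."""
    id: str


# In real code: traces = opik_tracer.created_traces()
traces = [FakeTrace(id="trace-1"), FakeTrace(id="trace-2")]

# One feedback score per trace (key names are assumptions, see above)
scores = [
    {"id": trace.id, "name": "user_feedback", "value": 1.0, "reason": "Thumbs up"}
    for trace in traces
]

# With a configured SDK, the payload could then be sent with (assumed call shape):
# import opik
# opik.Opik().log_traces_feedback_scores(scores=scores)
print(scores)
```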

Advanced usage

The OpikTracer object has a flush method that ensures all traces are logged to the Opik platform before you exit a script. The method returns once all traces have been logged or the timeout is reached, whichever comes first.

python
from opik.integrations.langchain import OpikTracer

opik_tracer = OpikTracer()
opik_tracer.flush()