Google ADK + Opik Integration Cookbook

apps/opik-documentation/documentation/docs/cookbook/google_adk_integration.ipynb

This notebook demonstrates how to integrate Google's Agent Development Kit (ADK) with Opik for comprehensive tracing and observability. We'll cover three key integration patterns:

  1. Basic Agent Example - Simple single-agent setup with Opik tracing
  2. Multi-Agent Example - Complex multi-agent workflow showing hierarchical tracing
  3. Hybrid Tracing - Combining Opik decorators with ADK callbacks for comprehensive observability

You will need:

  1. A Comet account, for seeing Opik visualizations (free!) - comet.com
  2. An OpenAI account, to use the gpt-4o model - platform.openai.com/settings/organization/api-keys
  3. Google ADK installed and configured

This example will use:

  • google-adk for agent development
  • opik for tracing and observability
  • OpenAI's gpt-4o model through LiteLLM

Setup

Install the required packages:

python
%pip install opik google-adk litellm --upgrade

Configure Opik for your session:

python
import opik
opik.configure()

Set up your OpenAI API key:

python
import os
import getpass
if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter your OpenAI API key: ")

Import Required Libraries

python
import asyncio
import datetime
from zoneinfo import ZoneInfo

from google.adk.agents import LlmAgent
from google.adk.models.lite_llm import LiteLlm
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.genai import types
from opik.integrations.adk import OpikTracer, track_adk_agent_recursive

Create Basic Agent

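Start by defining the two tools the agent will call. Each tool returns a dict with a status key plus either a report or an error_message. The Opik callbacks are attached when the agent itself is created, and the tracer then captures agent, model, and tool interactions: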

python
def get_weather(city: str) -> dict:
    """Get weather information for a city."""
    if city.lower() == "new york":
        return {
            "status": "success",
            "report": "The weather in New York is sunny with a temperature of 25 °C (77 °F).",
        }
    elif city.lower() == "london":
        return {
            "status": "success",
            "report": "The weather in London is cloudy with a temperature of 18 °C (64 °F).",
        }
    return {"status": "error", "error_message": f"Weather info for '{city}' is unavailable."}

def get_current_time(city: str) -> dict:
    """Get current time for a city."""
    if city.lower() == "new york":
        tz = ZoneInfo("America/New_York")
        now = datetime.datetime.now(tz)
        # Format the timestamp separately so text in `city` is never parsed as a strftime directive
        return {
            "status": "success",
            "report": f"The current time in {city} is {now.strftime('%Y-%m-%d %H:%M:%S %Z%z')}.",
        }
    elif city.lower() == "london":
        tz = ZoneInfo("Europe/London")
        now = datetime.datetime.now(tz)
        # Format the timestamp separately so text in `city` is never parsed as a strftime directive
        return {
            "status": "success",
            "report": f"The current time in {city} is {now.strftime('%Y-%m-%d %H:%M:%S %Z%z')}.",
        }
    return {"status": "error", "error_message": f"No timezone info for '{city}'."}
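
Both tools follow the same convention: a status key plus either report or error_message. A quick offline check of that contract can catch mistakes before any model call is made. The sketch below is illustrative only: check_tool_result is a hypothetical helper, not part of google-adk or opik, and the sample dicts merely mirror the shapes returned above.

```python
def check_tool_result(result: dict) -> bool:
    """Return True if `result` follows the status/report convention used above.

    Hypothetical helper for illustration; not part of google-adk or opik.
    """
    status = result.get("status")
    if status == "success":
        return isinstance(result.get("report"), str)
    if status == "error":
        return isinstance(result.get("error_message"), str)
    return False


# Sample payloads shaped like the tool outputs in this notebook
ok = {"status": "success", "report": "The weather in London is cloudy."}
err = {"status": "error", "error_message": "Weather info for 'Oslo' is unavailable."}
bad = {"status": "success"}  # missing "report"

print(check_tool_result(ok), check_tool_result(err), check_tool_result(bad))
```

In the notebook, the same check could be applied to get_weather("London") or get_current_time("Oslo") directly.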

Configure Opik Tracing

Set up the Opik tracer to capture all agent interactions:

python
basic_tracer = OpikTracer(
    name="basic-weather-agent",
    tags=["basic", "weather", "time", "single-agent"],
    metadata={
        "environment": "development",
        "model": "gpt-4o",
        "framework": "google-adk",
        "example": "basic"
    },
    project_name="adk-basic-demo"
)
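
This notebook configures three tracers with nearly identical metadata. One way to keep them consistent is to build the keyword arguments in a small function. The sketch below assumes only the OpikTracer parameters already used here (name, tags, metadata, project_name); tracer_kwargs and its naming scheme are hypothetical.

```python
def tracer_kwargs(example: str, tags: list[str], model: str = "gpt-4o") -> dict:
    """Build keyword arguments for OpikTracer with consistent metadata.

    Hypothetical helper; it only sets keys already used in this notebook.
    """
    return {
        "name": f"{example}-agent",
        "tags": list(tags),
        "metadata": {
            "environment": "development",
            "model": model,
            "framework": "google-adk",
            "example": example,
        },
        "project_name": f"adk-{example}-demo",
    }


kwargs = tracer_kwargs("basic", ["basic", "weather"])
print(kwargs["project_name"])
# In the notebook this could be passed as: OpikTracer(**kwargs)
```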

Create the LLM Agent

Initialize the Google ADK agent with OpenAI's gpt-4o model and Opik tracing:

python
# Initialize LiteLLM with OpenAI gpt-4o
llm = LiteLlm(model="openai/gpt-4o")

# Create the basic agent with Opik callbacks
basic_agent = LlmAgent(
    name="weather_time_agent",
    model=llm,
    description="Agent for answering time & weather questions",
    instruction="Answer questions about the time or weather in a city. Be helpful and provide clear information.",
    tools=[get_weather, get_current_time],
    before_agent_callback=basic_tracer.before_agent_callback,
    after_agent_callback=basic_tracer.after_agent_callback,
    before_model_callback=basic_tracer.before_model_callback,
    after_model_callback=basic_tracer.after_model_callback,
    before_tool_callback=basic_tracer.before_tool_callback,
    after_tool_callback=basic_tracer.after_tool_callback,
)
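
Passing six callbacks by hand is repetitive and easy to get wrong. Since each ADK keyword above matches the tracer method of the same name, the mapping can be built programmatically. This is a minimal sketch: opik_callbacks is a hypothetical helper, and a stand-in object replaces the real tracer so the block runs without opik installed.

```python
from types import SimpleNamespace

CALLBACK_NAMES = (
    "before_agent_callback",
    "after_agent_callback",
    "before_model_callback",
    "after_model_callback",
    "before_tool_callback",
    "after_tool_callback",
)


def opik_callbacks(tracer) -> dict:
    """Map each ADK callback keyword to the tracer method of the same name."""
    return {name: getattr(tracer, name) for name in CALLBACK_NAMES}


# Stand-in tracer (assumption for illustration) exposing the six callback attributes
fake_tracer = SimpleNamespace(**{name: (lambda *a, **k: None) for name in CALLBACK_NAMES})
callbacks = opik_callbacks(fake_tracer)
print(sorted(callbacks))
# In the notebook: LlmAgent(name=..., model=llm, tools=[...], **opik_callbacks(basic_tracer))
```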

Setup Session and Runner for Basic Example

python
basic_session_service = InMemorySessionService()
basic_runner = Runner(
    agent=basic_agent,
    app_name="basic_weather_app",
    session_service=basic_session_service,
)

Helper Functions

python
async def setup_basic_session():
    """Create a new session for the basic example."""
    sess = await basic_session_service.create_session(
        app_name="basic_weather_app",
        user_id="user_basic",
        session_id="session_basic_001"
    )
    return sess.id

async def call_basic_agent(user_msg: str, session_id: str):
    """Send a message to the basic agent and get the response."""
    print(f"User: {user_msg}")
    content = types.Content(role="user", parts=[types.Part(text=user_msg)])
    async for event in basic_runner.run_async(user_id="user_basic", session_id=session_id, new_message=content):
        if event.is_final_response():
            print(f"Assistant: {event.content.parts[0].text}")
            print()

Demo: Basic Agent Interactions

Let's test our basic agent with some weather and time queries:

python
# Create a session for basic example
basic_session_id = await setup_basic_session()
print(f"Created basic session: {basic_session_id}")
print()

# Test weather query
await call_basic_agent("What's the weather like in New York?", basic_session_id)

# Test time query  
await call_basic_agent("What time is it in London?", basic_session_id)

# Test combined query
await call_basic_agent("Can you tell me both the weather and time in New York?", basic_session_id)
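
The cell above relies on top-level await, which works in Jupyter but not in a plain Python script. Outside a notebook, the same calls would need to be wrapped in a coroutine and started with asyncio.run. The sketch below shows that shape; fake_call_agent is a stand-in for call_basic_agent so the block runs on its own without a model call.

```python
import asyncio


async def fake_call_agent(user_msg: str, session_id: str) -> str:
    """Stand-in for call_basic_agent; returns a string instead of a model reply."""
    await asyncio.sleep(0)  # yield control, as a real network call would
    return f"[{session_id}] {user_msg}"


async def main() -> list[str]:
    session_id = "session_basic_001"
    questions = [
        "What's the weather like in New York?",
        "What time is it in London?",
    ]
    # Sequential awaits keep the turns ordered within one session
    return [await fake_call_agent(q, session_id) for q in questions]


results = asyncio.run(main())
print(results)
```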

The trace can now be viewed in the Opik UI.

Example 2: Multi-Agent Setup with Hierarchical Tracing

This example demonstrates a more complex multi-agent setup with specialized agents for different tasks. The key point is that you do not have to wire Opik callbacks into every agent by hand: one call to track_adk_agent_recursive on the top-level agent instruments it and its sub-agents, so all child agent calls appear in the same trace tree.

python
def get_detailed_weather(city: str) -> dict:
    """Get detailed weather information including forecast."""
    weather_data = {
        "new york": {
            "current": "Sunny, 25°C (77°F)",
            "humidity": "65%",
            "wind": "10 km/h NW",
            "forecast": "Partly cloudy tomorrow, high of 27°C"
        },
        "london": {
            "current": "Cloudy, 18°C (64°F)", 
            "humidity": "78%",
            "wind": "15 km/h SW",
            "forecast": "Light rain expected tomorrow, high of 16°C"
        },
        "tokyo": {
            "current": "Partly cloudy, 22°C (72°F)",
            "humidity": "70%", 
            "wind": "8 km/h E",
            "forecast": "Sunny tomorrow, high of 25°C"
        }
    }
    
    city_lower = city.lower()
    if city_lower in weather_data:
        data = weather_data[city_lower]
        return {
            "status": "success",
            "report": f"Weather in {city}: {data['current']}. Humidity: {data['humidity']}, Wind: {data['wind']}. {data['forecast']}"
        }
    return {"status": "error", "error_message": f"Detailed weather for '{city}' is unavailable."}

def get_world_time(city: str) -> dict:
    """Get time information for major world cities."""
    timezones = {
        "new york": "America/New_York",
        "london": "Europe/London", 
        "tokyo": "Asia/Tokyo",
        "sydney": "Australia/Sydney",
        "paris": "Europe/Paris"
    }
    
    city_lower = city.lower()
    if city_lower in timezones:
        tz = ZoneInfo(timezones[city_lower])
        now = datetime.datetime.now(tz)
        # Format the timestamp separately so text in `city` is never parsed as a strftime directive
        return {
            "status": "success",
            "report": f"Current time in {city}: {now.strftime('%A, %B %d, %Y at %I:%M %p %Z')}"
        }
    return {"status": "error", "error_message": f"Time zone info for '{city}' is unavailable."}

def get_travel_info(from_city: str, to_city: str) -> dict:
    """Get basic travel information between cities."""
    travel_data = {
        ("new york", "london"): {"flight_time": "7 hours", "time_diff": "+5 hours"},
        ("london", "new york"): {"flight_time": "8 hours", "time_diff": "-5 hours"},
        ("new york", "tokyo"): {"flight_time": "14 hours", "time_diff": "+14 hours"},
        ("tokyo", "new york"): {"flight_time": "13 hours", "time_diff": "-14 hours"},
        ("london", "tokyo"): {"flight_time": "12 hours", "time_diff": "+9 hours"},
        ("tokyo", "london"): {"flight_time": "11 hours", "time_diff": "-9 hours"},
    }
    
    route = (from_city.lower(), to_city.lower())
    if route in travel_data:
        data = travel_data[route]
        return {
            "status": "success",
            "report": f"Travel from {from_city} to {to_city}: Approximately {data['flight_time']} flight time. Time difference: {data['time_diff']}"
        }
    return {"status": "error", "error_message": f"Travel info for '{from_city}' to '{to_city}' is unavailable."}

Create Specialized Agents

Now we'll create specialized agents for different domains. Notice that none of them sets Opik callbacks directly; tracing is added in one step once the coordinator is defined:

python
# Weather specialist agent (no Opik callbacks needed)
weather_agent = LlmAgent(
    name="weather_specialist",
    model=llm,
    description="Specialized agent for detailed weather information",
    instruction="Provide comprehensive weather information including current conditions and forecasts. Be detailed and informative.",
    tools=[get_detailed_weather]
)

# Time specialist agent (no Opik callbacks needed)  
time_agent = LlmAgent(
    name="time_specialist",
    model=llm,
    description="Specialized agent for world time information",
    instruction="Provide accurate time information for cities around the world. Include day of week and full date.",
    tools=[get_world_time]
)

# Travel specialist agent (no Opik callbacks needed)
travel_agent = LlmAgent(
    name="travel_specialist", 
    model=llm,
    description="Specialized agent for travel information",
    instruction="Provide helpful travel information including flight times and time zone differences.",
    tools=[get_travel_info]
)

Create Coordinator Agent with Opik Tracing

The coordinator agent orchestrates the specialized agents. Rather than setting callbacks on each agent by hand, a single call to track_adk_agent_recursive instruments the coordinator and all of its sub-agents:

python
# Configure Opik tracer for multi-agent example
multi_agent_tracer = OpikTracer(
    name="multi-agent-coordinator",
    tags=["multi-agent", "coordinator", "weather", "time", "travel"],
    metadata={
        "environment": "development",
        "model": "gpt-4o",
        "framework": "google-adk", 
        "example": "multi-agent",
        "agent_count": 4
    },
    project_name="adk-multi-agent-demo"
)

# Coordinator agent with sub-agents
coordinator_agent = LlmAgent(
    name="travel_coordinator",
    model=llm,
    description="Coordinator agent that delegates to specialized agents for weather, time, and travel information",
    instruction="""You are a travel coordinator that helps users with weather, time, and travel information.
    
    You have access to three specialized agents:
    - weather_specialist: For detailed weather information
    - time_specialist: For world time information  
    - travel_specialist: For travel planning information
    
    Delegate appropriate queries to the right specialist agents and compile comprehensive responses for the user.""",
    tools=[],  # No direct tools, delegates to sub-agents
    sub_agents=[weather_agent, time_agent, travel_agent],
)

# Use the experimental recursive tracking feature to instrument all agents at once
# (track_adk_agent_recursive was already imported with OpikTracer above)
track_adk_agent_recursive(coordinator_agent, multi_agent_tracer)
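
Conceptually, recursive instrumentation amounts to walking the agent tree and attaching the tracer's callbacks to every node. The toy sketch below illustrates that idea with a stand-in ToyAgent class and a single callback slot. It is not Opik's implementation, and instrument_recursive is a hypothetical name.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class ToyAgent:
    """Minimal stand-in for an ADK agent: a name, one callback slot, children."""
    name: str
    before_agent_callback: Optional[Callable] = None
    sub_agents: list["ToyAgent"] = field(default_factory=list)


def instrument_recursive(agent: ToyAgent, callback: Callable) -> int:
    """Attach `callback` to `agent` and every descendant; return agents touched."""
    agent.before_agent_callback = callback
    return 1 + sum(instrument_recursive(child, callback) for child in agent.sub_agents)


tree = ToyAgent("travel_coordinator", sub_agents=[
    ToyAgent("weather_specialist"),
    ToyAgent("time_specialist"),
    ToyAgent("travel_specialist"),
])
count = instrument_recursive(tree, lambda *args, **kwargs: None)
print(count)  # 4: the coordinator plus three specialists
```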

Setup Multi-Agent Session and Runner

python
multi_session_service = InMemorySessionService()
multi_runner = Runner(
    agent=coordinator_agent,
    app_name="multi_agent_travel_app",
    session_service=multi_session_service,
)

Multi-Agent Helper Functions

python
async def setup_multi_session(session_id): 
    """Create a new session for the multi-agent example."""
    sess = await multi_session_service.create_session(
        app_name="multi_agent_travel_app",
        user_id="user_multi",
        session_id=session_id
    )
    return sess.id

async def call_multi_agent(user_msg: str, session_id: str):
    """Send a message to the coordinator agent and get the response."""
    print(f"User: {user_msg}")
    content = types.Content(role="user", parts=[types.Part(text=user_msg)])
    async for event in multi_runner.run_async(user_id="user_multi", session_id=session_id, new_message=content):
        if event.is_final_response():
            print(f"Coordinator: {event.content.parts[0].text}")
            print()

Demo: Multi-Agent Interactions

Let's test our multi-agent setup with complex queries that require coordination between agents:

python
# Demo: Each question in a separate session to show individual traces
print("=== Creating separate sessions for individual traces ===")

# Question 1: Weather (separate session)
session_1 = await setup_multi_session(session_id="session_multi_001") # Pass unique session_id
await call_multi_agent("I need detailed weather information for Tokyo", session_1)

# Question 2: Time (separate session)
session_2 = await setup_multi_session(session_id="session_multi_002") # Pass unique session_id
await call_multi_agent("What time is it in Paris right now?", session_2)

# Question 3: Travel (separate session)
session_3 = await setup_multi_session(session_id="session_multi_003") # Pass unique session_id
await call_multi_agent("I'm planning to travel from London to New York. Can you help with travel time and time zones?", session_3)

# Question 4: Complex multi-agent (separate session)
session_4 = await setup_multi_session(session_id="session_multi_004") # Pass unique session_id
await call_multi_agent("I'm traveling from New York to Tokyo tomorrow. Can you give me the weather in both cities, current times, and travel information?", session_4)
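
The session IDs above are hard-coded, so re-running the cell against the same session service reuses them. Depending on the ADK version, that may fail or mix conversation histories. Generating a fresh ID per call is one way to avoid this. The helper below is a hypothetical sketch using only the standard library.

```python
import uuid


def new_session_id(prefix: str = "session_multi") -> str:
    """Return a session id that differs on every call (hypothetical helper)."""
    return f"{prefix}_{uuid.uuid4().hex[:8]}"


ids = [new_session_id() for _ in range(4)]
print(ids)
# In the notebook: session_1 = await setup_multi_session(session_id=new_session_id())
```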

The trace can now be viewed in the Opik UI.

Example 3: Hybrid Tracing - Combining Opik Decorators with ADK Callbacks

This advanced example shows how to combine Opik's @opik.track decorator with ADK's callback system. This is useful when a tool performs several internal steps that you want to trace separately while keeping them inside the overall agent trace.

Define Advanced Tools with Opik Decorators

These tools use the @opik.track decorator to trace their internal operations, while still being called within the ADK agent's trace context:

python
@opik.track(name="weather_data_processing", tags=["data-processing", "weather"])
def process_weather_data(raw_data: dict) -> dict:
    """Process raw weather data with additional computations."""
    # Simulate some data processing steps that we want to trace separately
    processed = {
        "temperature_celsius": raw_data.get("temp_c", 0),
        "temperature_fahrenheit": raw_data.get("temp_c", 0) * 9/5 + 32,
        "conditions": raw_data.get("condition", "unknown"),
        "comfort_index": "comfortable" if 18 <= raw_data.get("temp_c", 0) <= 25 else "less comfortable"
    }
    return processed

@opik.track(name="location_validation", tags=["validation", "location"])  
def validate_location(city: str) -> dict:
    """Validate and normalize city names."""
    # Simulate location validation logic that we want to trace
    normalized_cities = {
        "nyc": "New York",
        "ny": "New York", 
        "new york city": "New York",
        "london uk": "London",
        "london england": "London",
        "tokyo japan": "Tokyo"
    }
    
    city_lower = city.lower().strip()
    validated_city = normalized_cities.get(city_lower, city.title())
    
    return {
        "original": city,
        "validated": validated_city,
        "is_valid": city_lower in ["new york", "london", "tokyo"] or city_lower in normalized_cities
    }

@opik.track(name="advanced_weather_lookup", tags=["weather", "api-simulation"])
def get_advanced_weather(city: str) -> dict:
    """Get weather with internal processing steps tracked by Opik decorators."""
    
    # Step 1: Validate location (traced by @opik.track)
    location_result = validate_location(city)
    
    if not location_result["is_valid"]:
        return {
            "status": "error", 
            "error_message": f"Invalid location: {city}"
        }
    
    validated_city = location_result["validated"]
    
    # Step 2: Get raw weather data (simulated)
    raw_weather_data = {
        "New York": {"temp_c": 25, "condition": "sunny", "humidity": 65},
        "London": {"temp_c": 18, "condition": "cloudy", "humidity": 78}, 
        "Tokyo": {"temp_c": 22, "condition": "partly cloudy", "humidity": 70}
    }
    
    if validated_city not in raw_weather_data:
        return {
            "status": "error",
            "error_message": f"Weather data unavailable for {validated_city}"
        }
    
    raw_data = raw_weather_data[validated_city]
    
    # Step 3: Process the data (traced by @opik.track)
    processed_data = process_weather_data(raw_data)
    
    return {
        "status": "success",
        "city": validated_city,
        "report": f"Weather in {validated_city}: {processed_data['conditions']}, {processed_data['temperature_celsius']}°C ({processed_data['temperature_fahrenheit']:.1f}°F). Comfort level: {processed_data['comfort_index']}.",
        "raw_humidity": raw_data["humidity"]
    }

@opik.track(name="time_calculation", tags=["time", "calculation"])
def calculate_time_info(timezone_name: str) -> dict:
    """Calculate detailed time information with internal steps."""
    try:
        tz = ZoneInfo(timezone_name)
        now = datetime.datetime.now(tz)
        
        # Calculate additional time info
        time_info = {
            "current_time": now,
            "hour_24": now.hour,
            # 09:00 up to (but not including) 17:00 local time
            "is_business_hours": 9 <= now.hour < 17,
            "day_of_week": now.strftime("%A"),
            "week_number": now.isocalendar()[1],
            "is_weekend": now.weekday() >= 5
        }
        
        return time_info
    except Exception as e:
        return {"error": str(e)}

@opik.track(name="advanced_time_lookup", tags=["time", "timezone"])  
def get_advanced_time(city: str) -> dict:
    """Get time information with advanced calculations."""
    
    # Step 1: Validate location
    location_result = validate_location(city)
    
    if not location_result["is_valid"]:
        return {
            "status": "error",
            "error_message": f"Invalid location for time lookup: {city}"
        }
    
    validated_city = location_result["validated"]
    
    # Step 2: Map to timezone
    timezone_map = {
        "New York": "America/New_York",
        "London": "Europe/London",
        "Tokyo": "Asia/Tokyo"
    }
    
    if validated_city not in timezone_map:
        return {
            "status": "error", 
            "error_message": f"Timezone mapping unavailable for {validated_city}"
        }
    
    # Step 3: Calculate time info (traced by @opik.track)
    time_info = calculate_time_info(timezone_map[validated_city])
    
    if "error" in time_info:
        return {"status": "error", "error_message": time_info["error"]}
    
    current_time = time_info["current_time"]
    business_status = "during business hours" if time_info["is_business_hours"] else "outside business hours"
    weekend_status = "on a weekend" if time_info["is_weekend"] else "on a weekday"
    
    return {
        "status": "success",
        "city": validated_city,
        "report": f"Current time in {validated_city}: {current_time.strftime('%A, %B %d, %Y at %I:%M %p %Z')} ({business_status}, {weekend_status})",
        "metadata": {
            "hour_24": time_info["hour_24"],
            "week_number": time_info["week_number"],
            "is_business_hours": time_info["is_business_hours"]
        }
    }

Create Hybrid Agent with Both Decorator and Callback Tracing

This agent uses tools that have internal Opik tracing via decorators, while the agent itself uses ADK callbacks:

python
# Configure Opik tracer for hybrid example
hybrid_tracer = OpikTracer(
    name="hybrid-tracing-agent",
    tags=["hybrid", "decorators", "callbacks", "advanced"],
    metadata={
        "environment": "development",
        "model": "gpt-4o",
        "framework": "google-adk",
        "example": "hybrid-tracing",
        "tracing_methods": ["decorators", "callbacks"]
    },
    project_name="adk-hybrid-demo"
)

# Create hybrid agent that combines both tracing approaches
hybrid_agent = LlmAgent(
    name="advanced_weather_time_agent",
    model=llm,
    description="Advanced agent with hybrid Opik tracing using both decorators and callbacks",
    instruction="""You are an advanced weather and time agent that provides detailed information with comprehensive internal processing.
    
    Your tools perform multi-step operations that are individually traced, giving detailed visibility into the processing pipeline.
    Use the advanced weather and time tools to provide thorough, well-processed information to users.""",
    tools=[get_advanced_weather, get_advanced_time],
    # ADK callbacks for agent-level tracing
    before_agent_callback=hybrid_tracer.before_agent_callback,
    after_agent_callback=hybrid_tracer.after_agent_callback,
    before_model_callback=hybrid_tracer.before_model_callback,
    after_model_callback=hybrid_tracer.after_model_callback,
    before_tool_callback=hybrid_tracer.before_tool_callback,
    after_tool_callback=hybrid_tracer.after_tool_callback,
)

Setup Hybrid Session and Runner

python
hybrid_session_service = InMemorySessionService()
hybrid_runner = Runner(
    agent=hybrid_agent,
    app_name="hybrid_tracing_app",
    session_service=hybrid_session_service,
)

Hybrid Example Helper Functions

python
async def setup_hybrid_session():
    """Create a new session for the hybrid tracing example."""
    sess = await hybrid_session_service.create_session(
        app_name="hybrid_tracing_app",
        user_id="user_hybrid",
        session_id="session_hybrid_001"
    )
    return sess.id

async def call_hybrid_agent(user_msg: str, session_id: str):
    """Send a message to the hybrid agent and get the response."""
    print(f"User: {user_msg}")
    content = types.Content(role="user", parts=[types.Part(text=user_msg)])
    async for event in hybrid_runner.run_async(user_id="user_hybrid", session_id=session_id, new_message=content):
        if event.is_final_response():
            print(f"Advanced Assistant: {event.content.parts[0].text}")
            print()

Demo: Hybrid Tracing in Action

This demo showcases how both decorator-traced tool operations and ADK callback-traced agent operations appear in the same unified trace:

python
# Create a session for hybrid example
hybrid_session_id = await setup_hybrid_session()
print(f"Created hybrid tracing session: {hybrid_session_id}")
print()

# Test weather with internal processing steps
print("=== Testing Advanced Weather Lookup ===")
await call_hybrid_agent("What's the weather like in NYC?", hybrid_session_id)

# Test time with internal calculations
print("=== Testing Advanced Time Lookup ===") 
await call_hybrid_agent("What time is it in London right now? Include business hours info.", hybrid_session_id)

# Test invalid location to see validation tracing
print("=== Testing Location Validation ===")
await call_hybrid_agent("What's the weather in InvalidCity?", hybrid_session_id)

# Test complex query that triggers multiple internal operations
print("=== Testing Complex Multi-Step Query ===")
await call_hybrid_agent("I need detailed weather and time information for Tokyo, including whether it's business hours", hybrid_session_id)

The trace can now be viewed in the Opik UI.

Understanding the Tracing Output

After running all three examples, you can view the traces in your Opik dashboard. Here's what you'll see:

Basic Example Traces

  • Simple linear trace showing: User Input → Agent Processing → Tool Calls → Model Response
  • Clear visibility into tool execution and model interactions

Multi-Agent Example Traces

  • Hierarchical trace showing: Coordinator Agent → Sub-Agent Delegation → Specialized Tool Calls
  • Key insight: A single track_adk_agent_recursive call on the coordinator was enough for all sub-agent operations to be traced
  • Shows the complete decision tree of which agents were involved

Hybrid Example Traces

  • Most comprehensive: Shows both ADK callback traces AND decorator traces in the same trace tree
  • Tool calls contain nested spans from @opik.track decorators
  • Demonstrates how decorator-traced functions (like validate_location, process_weather_data) appear as child spans within the tool call spans
  • Perfect for debugging complex multi-step operations

Key Benefits of This Integration

  1. Automatic Tracing: Once the Opik callbacks are attached, agent, model, and tool calls are traced without further instrumentation
  2. Hierarchical Visibility: Multi-agent setups automatically create nested trace structures
  3. Flexible Granularity: Combine coarse-grained agent tracing with fine-grained function tracing
  4. Unified Context: All traces (callbacks + decorators) appear in the same trace tree
  5. Production Ready: Comprehensive observability for debugging and optimization

Next Steps

This notebook demonstrated three powerful integration patterns. You can extend this by:

  • Adding custom evaluation metrics using Opik's evaluation framework
  • Implementing real-time monitoring and alerting based on trace data
  • Using different LLM models and comparing their performance
  • Adding more sophisticated multi-agent workflows
  • Implementing custom tracing strategies for specific business logic
  • Building evaluation datasets from traced conversations

For more information: