apps/opik-documentation/documentation/docs/cookbook/google_adk_integration.ipynb
This notebook demonstrates how to integrate Google's Agent Development Kit (ADK) with Opik for comprehensive tracing and observability. We'll cover three key integration patterns:
You will need:
This example will use:
Install the required packages:
%pip install opik google-adk litellm --upgrade
import opik
# Configure the Opik SDK once, before any tracer is created in this notebook.
# NOTE(review): presumably this loads or prompts for Opik credentials — confirm against the Opik docs.
opik.configure()
Set up your OpenAI API key:
import os
import getpass
# Prompt when the key is missing *or* set to an empty string: an empty value
# would pass a plain membership check but is not a usable key.
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter your OpenAI API key: ")
import asyncio
import datetime
from zoneinfo import ZoneInfo
from google.adk.agents import LlmAgent
from google.adk.models.lite_llm import LiteLlm
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.genai import types
from opik.integrations.adk import OpikTracer, track_adk_agent_recursive
Here we create a single agent with Opik callbacks. The tracer will automatically capture all interactions:
def get_weather(city: str) -> dict:
    """Return a canned weather report for a supported city.

    The lookup is case-insensitive. Only "new york" and "london" are known;
    any other value yields an error payload instead of raising.

    Args:
        city: City name as supplied by the caller.

    Returns:
        ``{"status": "success", "report": ...}`` for a known city, otherwise
        ``{"status": "error", "error_message": ...}``.
    """
    canned_reports = {
        "new york": "The weather in New York is sunny with a temperature of 25 °C (77 °F).",
        "london": "The weather in London is cloudy with a temperature of 18 °C (64 °F).",
    }

    report = canned_reports.get(city.lower())
    if report is None:
        return {"status": "error", "error_message": f"Weather info for '{city}' is unavailable."}
    return {"status": "success", "report": report}
def get_current_time(city: str) -> dict:
    """Return the current local time for a supported city.

    The lookup is case-insensitive and covers "new york" and "london" only.

    Args:
        city: City name as supplied by the caller.

    Returns:
        ``{"status": "success", "report": ...}`` with a formatted timestamp, or
        ``{"status": "error", "error_message": ...}`` for an unknown city.
    """
    zone_names = {
        "new york": "America/New_York",
        "london": "Europe/London",
    }

    zone_name = zone_names.get(city.lower())
    if zone_name is None:
        return {"status": "error", "error_message": f"No timezone info for '{city}'."}

    local_now = datetime.datetime.now(ZoneInfo(zone_name))
    # The city is interpolated into the strftime pattern exactly as the caller
    # spelled it, so the report echoes the original casing.
    pattern = f"The current time in {city} is %Y-%m-%d %H:%M:%S %Z%z."
    return {"status": "success", "report": local_now.strftime(pattern)}
Set up the Opik tracer to capture all agent interactions:
# Labels and metadata passed to the tracer for the single-agent example,
# kept in named variables so they are easy to adjust in one place.
basic_trace_tags = ["basic", "weather", "time", "single-agent"]
basic_trace_metadata = dict(
    environment="development",
    model="gpt-4o",
    framework="google-adk",
    example="basic",
)

# Tracer whose callback methods are attached to the basic agent.
basic_tracer = OpikTracer(
    name="basic-weather-agent",
    tags=basic_trace_tags,
    metadata=basic_trace_metadata,
    project_name="adk-basic-demo",
)
Initialize the Google ADK agent with OpenAI's gpt-4o model and Opik tracing:
# LiteLLM wrapper that points ADK at OpenAI's gpt-4o
llm = LiteLlm(model="openai/gpt-4o")

# Each ADK callback hook is routed to the same-named method on the tracer.
basic_tracing_hooks = {
    "before_agent_callback": basic_tracer.before_agent_callback,
    "after_agent_callback": basic_tracer.after_agent_callback,
    "before_model_callback": basic_tracer.before_model_callback,
    "after_model_callback": basic_tracer.after_model_callback,
    "before_tool_callback": basic_tracer.before_tool_callback,
    "after_tool_callback": basic_tracer.after_tool_callback,
}

# Single agent exposing the weather and time tools, with Opik callbacks attached
basic_agent = LlmAgent(
    name="weather_time_agent",
    model=llm,
    description="Agent for answering time & weather questions",
    instruction="Answer questions about the time or weather in a city. Be helpful and provide clear information.",
    tools=[get_weather, get_current_time],
    **basic_tracing_hooks,
)

# In-memory session store plus the runner that drives the agent
basic_session_service = InMemorySessionService()
basic_runner = Runner(
    session_service=basic_session_service,
    app_name="basic_weather_app",
    agent=basic_agent,
)
async def setup_basic_session():
    """Create the basic example's session and return its id.

    The app name, user id and session id are fixed strings that match the
    values used by the basic runner and by ``call_basic_agent``.
    """
    session = await basic_session_service.create_session(
        session_id="session_basic_001",
        user_id="user_basic",
        app_name="basic_weather_app",
    )
    return session.id
async def call_basic_agent(user_msg: str, session_id: str):
    """Send one user message to the basic agent and print its final reply.

    Args:
        user_msg: Text of the user's message.
        session_id: Id of an existing session belonging to user "user_basic".

    Prints the user message, then the text of the first part of each final
    response event, followed by a blank line. Final events that carry no
    content or no parts are skipped instead of raising.
    """
    print(f"User: {user_msg}")
    content = types.Content(role="user", parts=[types.Part(text=user_msg)])

    async for event in basic_runner.run_async(
        user_id="user_basic", session_id=session_id, new_message=content
    ):
        if not event.is_final_response():
            continue
        # A final event is not guaranteed to carry text: guard against missing
        # content or an empty parts list before indexing, so the stream is not
        # aborted by an AttributeError/IndexError.
        parts = event.content.parts if event.content else None
        if parts:
            print(f"Assistant: {parts[0].text}")
    print()
Let's test our basic agent with some weather and time queries:
# Create a session for basic example
basic_session_id = await setup_basic_session()
print(f"Created basic session: {basic_session_id}")
print()
# Test weather query
await call_basic_agent("What's the weather like in New York?", basic_session_id)
# Test time query
await call_basic_agent("What time is it in London?", basic_session_id)
# Test combined query
await call_basic_agent("Can you tell me both the weather and time in New York?", basic_session_id)
The trace can now be viewed in the UI:
This example demonstrates a more complex multi-agent setup where we have specialized agents for different tasks. The key insight is that you only need to add Opik callbacks to the top-level agent - all child agent calls will be automatically traced in the same trace tree.
def get_detailed_weather(city: str) -> dict:
    """Return a canned detailed weather report (conditions plus forecast).

    The lookup is case-insensitive and covers New York, London and Tokyo.

    Args:
        city: City name as supplied by the caller; echoed back in the report.

    Returns:
        ``{"status": "success", "report": ...}`` for a known city, otherwise
        ``{"status": "error", "error_message": ...}``.
    """
    conditions_by_city = {
        "new york": {
            "current": "Sunny, 25°C (77°F)",
            "humidity": "65%",
            "wind": "10 km/h NW",
            "forecast": "Partly cloudy tomorrow, high of 27°C",
        },
        "london": {
            "current": "Cloudy, 18°C (64°F)",
            "humidity": "78%",
            "wind": "15 km/h SW",
            "forecast": "Light rain expected tomorrow, high of 16°C",
        },
        "tokyo": {
            "current": "Partly cloudy, 22°C (72°F)",
            "humidity": "70%",
            "wind": "8 km/h E",
            "forecast": "Sunny tomorrow, high of 25°C",
        },
    }

    entry = conditions_by_city.get(city.lower())
    if entry is None:
        return {"status": "error", "error_message": f"Detailed weather for '{city}' is unavailable."}

    current, humidity, wind, forecast = (
        entry[field] for field in ("current", "humidity", "wind", "forecast")
    )
    report = f"Weather in {city}: {current}. Humidity: {humidity}, Wind: {wind}. {forecast}"
    return {"status": "success", "report": report}
def get_world_time(city: str) -> dict:
    """Report the current local time in one of a few major world cities.

    Matching is case-insensitive against a fixed city-to-time-zone table.
    Unknown cities produce an error payload rather than an exception.

    Args:
        city: City name as supplied by the caller; echoed back in the report.

    Returns:
        ``{"status": "success", "report": ...}`` or
        ``{"status": "error", "error_message": ...}``.
    """
    zone_by_city = {
        "new york": "America/New_York",
        "london": "Europe/London",
        "tokyo": "Asia/Tokyo",
        "sydney": "Australia/Sydney",
        "paris": "Europe/Paris",
    }

    zone_name = zone_by_city.get(city.lower())
    if zone_name is None:
        return {"status": "error", "error_message": f"Time zone info for '{city}' is unavailable."}

    local_now = datetime.datetime.now(ZoneInfo(zone_name))
    # The city is interpolated into the strftime pattern as the caller spelled it.
    pattern = f"Current time in {city}: %A, %B %d, %Y at %I:%M %p %Z"
    return {"status": "success", "report": local_now.strftime(pattern)}
def get_travel_info(from_city: str, to_city: str) -> dict:
    """Return canned flight-time and time-difference info for a city pair.

    Routes are directional and matched case-insensitively; only pairs among
    New York, London and Tokyo are known.

    Args:
        from_city: Departure city, echoed back in the report.
        to_city: Destination city, echoed back in the report.

    Returns:
        ``{"status": "success", "report": ...}`` for a known route, otherwise
        ``{"status": "error", "error_message": ...}``.
    """
    # (origin, destination) -> (flight time, time difference)
    known_routes = {
        ("new york", "london"): ("7 hours", "+5 hours"),
        ("london", "new york"): ("8 hours", "-5 hours"),
        ("new york", "tokyo"): ("14 hours", "+14 hours"),
        ("tokyo", "new york"): ("13 hours", "-14 hours"),
        ("london", "tokyo"): ("12 hours", "+9 hours"),
        ("tokyo", "london"): ("11 hours", "-9 hours"),
    }

    leg = known_routes.get((from_city.lower(), to_city.lower()))
    if leg is None:
        return {"status": "error", "error_message": f"Travel info for '{from_city}' to '{to_city}' is unavailable."}

    flight_time, time_diff = leg
    return {
        "status": "success",
        "report": f"Travel from {from_city} to {to_city}: Approximately {flight_time} flight time. Time difference: {time_diff}",
    }
Now we'll create specialized agents for different domains. Notice that these specialist agents are created without any Opik callbacks; tracing is attached afterwards, starting from the coordinator agent:
def _build_specialist(name, description, instruction, tool):
    """Create a single-tool LlmAgent on the shared `llm`, with no Opik callbacks set."""
    return LlmAgent(
        name=name,
        model=llm,
        description=description,
        instruction=instruction,
        tools=[tool],
    )


# Weather specialist agent (no Opik callbacks needed)
weather_agent = _build_specialist(
    "weather_specialist",
    "Specialized agent for detailed weather information",
    "Provide comprehensive weather information including current conditions and forecasts. Be detailed and informative.",
    get_detailed_weather,
)

# Time specialist agent (no Opik callbacks needed)
time_agent = _build_specialist(
    "time_specialist",
    "Specialized agent for world time information",
    "Provide accurate time information for cities around the world. Include day of week and full date.",
    get_world_time,
)

# Travel specialist agent (no Opik callbacks needed)
travel_agent = _build_specialist(
    "travel_specialist",
    "Specialized agent for travel information",
    "Provide helpful travel information including flight times and time zone differences.",
    get_travel_info,
)
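The coordinator agent orchestrates the specialized agents. Opik tracing only needs to be attached here, at the top level: `track_adk_agent_recursive` instruments the coordinator and all of its sub-agents, so every child agent call is traced automatically: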
# Configure Opik tracer for multi-agent example
multi_agent_tracer = OpikTracer(
    name="multi-agent-coordinator",
    tags=["multi-agent", "coordinator", "weather", "time", "travel"],
    metadata={
        "environment": "development",
        "model": "gpt-4o",
        "framework": "google-adk",
        "example": "multi-agent",
        "agent_count": 4,  # coordinator + three specialists
    },
    project_name="adk-multi-agent-demo",
)

# Coordinator agent with sub-agents. It has no tools of its own and hands work
# to the three specialists listed in `sub_agents`.
coordinator_agent = LlmAgent(
    name="travel_coordinator",
    model=llm,
    description="Coordinator agent that delegates to specialized agents for weather, time, and travel information",
    instruction="""You are a travel coordinator that helps users with weather, time, and travel information.
You have access to three specialized agents:
- weather_specialist: For detailed weather information
- time_specialist: For world time information
- travel_specialist: For travel planning information
Delegate appropriate queries to the right specialist agents and compile comprehensive responses for the user.""",
    tools=[],  # No direct tools, delegates to sub-agents
    sub_agents=[weather_agent, time_agent, travel_agent],
)

# Use the experimental recursive tracking feature to instrument all agents at once.
# `track_adk_agent_recursive` is already imported with the other Opik names at
# the top of the notebook, so it is not re-imported here.
track_adk_agent_recursive(coordinator_agent, multi_agent_tracer)

# In-memory session store plus the runner that drives the coordinator
multi_session_service = InMemorySessionService()
multi_runner = Runner(
    agent=coordinator_agent,
    app_name="multi_agent_travel_app",
    session_service=multi_session_service,
)
async def setup_multi_session(session_id):
    """Create a multi-agent example session under the given id and return its id.

    Args:
        session_id: Identifier to register the new session under.
    """
    session = await multi_session_service.create_session(
        session_id=session_id,
        user_id="user_multi",
        app_name="multi_agent_travel_app",
    )
    return session.id
async def call_multi_agent(user_msg: str, session_id: str):
    """Send one user message to the coordinator agent and print its final reply.

    Args:
        user_msg: Text of the user's message.
        session_id: Id of an existing session belonging to user "user_multi".

    Prints the user message, then the text of the first part of each final
    response event, followed by a blank line. Final events that carry no
    content or no parts are skipped instead of raising.
    """
    print(f"User: {user_msg}")
    content = types.Content(role="user", parts=[types.Part(text=user_msg)])

    async for event in multi_runner.run_async(
        user_id="user_multi", session_id=session_id, new_message=content
    ):
        if not event.is_final_response():
            continue
        # A final event is not guaranteed to carry text: guard against missing
        # content or an empty parts list before indexing, so the stream is not
        # aborted by an AttributeError/IndexError.
        parts = event.content.parts if event.content else None
        if parts:
            print(f"Coordinator: {parts[0].text}")
    print()
Let's test our multi-agent setup with complex queries that require coordination between agents:
# Demo: Each question in a separate session to show individual traces
print("=== Creating separate sessions for individual traces ===")
# Question 1: Weather (separate session)
session_1 = await setup_multi_session(session_id="session_multi_001") # Pass unique session_id
await call_multi_agent("I need detailed weather information for Tokyo", session_1)
# Question 2: Time (separate session)
session_2 = await setup_multi_session(session_id="session_multi_002") # Pass unique session_id
await call_multi_agent("What time is it in Paris right now?", session_2)
# Question 3: Travel (separate session)
session_3 = await setup_multi_session(session_id="session_multi_003") # Pass unique session_id
await call_multi_agent("I'm planning to travel from London to New York. Can you help with travel time and time zones?", session_3)
# Question 4: Complex multi-agent (separate session)
session_4 = await setup_multi_session(session_id="session_multi_004") # Pass unique session_id
await call_multi_agent("I'm traveling from New York to Tokyo tomorrow. Can you give me the weather in both cities, current times, and travel information?", session_4)
The trace can now be viewed in the UI:
This advanced example shows how to combine Opik's @opik.track decorator with ADK's callback system. This is powerful when you have complex multi-step tools that perform their own internal operations that you want to trace separately, while still maintaining the overall agent trace context.
These tools use the @opik.track decorator to trace their internal operations, while still being called within the ADK agent's trace context:
@opik.track(name="weather_data_processing", tags=["data-processing", "weather"])
def process_weather_data(raw_data: dict) -> dict:
    """Derive display-ready weather values from a raw reading.

    Args:
        raw_data: Mapping that may contain "temp_c" (defaults to 0 when
            absent) and "condition" (defaults to "unknown" when absent).

    Returns:
        A dict with the Celsius and Fahrenheit temperatures, the conditions
        text, and a comfort label ("comfortable" for 18-25 °C inclusive).
    """
    # Read the temperature once; every derived value is based on it
    temp_c = raw_data.get("temp_c", 0)
    temp_f = temp_c * 9/5 + 32
    comfort = "comfortable" if 18 <= temp_c <= 25 else "less comfortable"

    return {
        "temperature_celsius": temp_c,
        "temperature_fahrenheit": temp_f,
        "conditions": raw_data.get("condition", "unknown"),
        "comfort_index": comfort,
    }
@opik.track(name="location_validation", tags=["validation", "location"])
def validate_location(city: str) -> dict:
    """Validate and normalize a city name.

    Matching ignores case and surrounding whitespace. Known aliases (e.g.
    "nyc") map to their canonical city; any other input is title-cased.

    Args:
        city: City name as supplied by the caller.

    Returns:
        A dict with:
            "original": the input, unchanged;
            "validated": the canonical or title-cased name, without
                surrounding whitespace;
            "is_valid": True when the input is a supported city or alias.
    """
    supported_cities = ("new york", "london", "tokyo")
    normalized_cities = {
        "nyc": "New York",
        "ny": "New York",
        "new york city": "New York",
        "london uk": "London",
        "london england": "London",
        "tokyo japan": "Tokyo",
    }

    stripped = city.strip()
    city_lower = stripped.lower()
    # Title-case the *stripped* name. Previously the fallback used the raw
    # input, so " london " was reported valid but normalized to " London ",
    # which then missed every downstream lookup keyed on "London".
    validated_city = normalized_cities.get(city_lower, stripped.title())

    return {
        "original": city,
        "validated": validated_city,
        "is_valid": city_lower in supported_cities or city_lower in normalized_cities,
    }
@opik.track(name="advanced_weather_lookup", tags=["weather", "api-simulation"])
def get_advanced_weather(city: str) -> dict:
    """Look up weather for a city via separately traced validation and processing steps.

    Args:
        city: City name or alias as supplied by the caller.

    Returns:
        On success, a dict with "status", the canonical "city", a
        human-readable "report" and the "raw_humidity" value. On failure,
        ``{"status": "error", "error_message": ...}``.
    """
    # Step 1: validate the location (traced by @opik.track); reject unknown input early
    location = validate_location(city)
    if not location["is_valid"]:
        return {"status": "error", "error_message": f"Invalid location: {city}"}
    validated_city = location["validated"]

    # Step 2: fetch the simulated raw reading for the canonical city name
    simulated_readings = {
        "New York": {"temp_c": 25, "condition": "sunny", "humidity": 65},
        "London": {"temp_c": 18, "condition": "cloudy", "humidity": 78},
        "Tokyo": {"temp_c": 22, "condition": "partly cloudy", "humidity": 70},
    }
    raw_data = simulated_readings.get(validated_city)
    if raw_data is None:
        return {"status": "error", "error_message": f"Weather data unavailable for {validated_city}"}

    # Step 3: process the reading (traced by @opik.track)
    processed = process_weather_data(raw_data)
    conditions = processed["conditions"]
    celsius = processed["temperature_celsius"]
    fahrenheit = processed["temperature_fahrenheit"]
    comfort = processed["comfort_index"]

    return {
        "status": "success",
        "city": validated_city,
        "report": f"Weather in {validated_city}: {conditions}, {celsius}°C ({fahrenheit:.1f}°F). Comfort level: {comfort}.",
        "raw_humidity": raw_data["humidity"],
    }
@opik.track(name="time_calculation", tags=["time", "calculation"])
def calculate_time_info(timezone_name: str) -> dict:
    """Derive clock and calendar facts for the current moment in a time zone.

    Args:
        timezone_name: Zone key handed to ``ZoneInfo`` (e.g. "Europe/London").

    Returns:
        A dict with the aware "current_time", "hour_24", "is_business_hours",
        "day_of_week", "week_number" (ISO) and "is_weekend". If anything
        raises, ``{"error": <message>}`` is returned instead.
    """
    try:
        local_now = datetime.datetime.now(ZoneInfo(timezone_name))
        hour = local_now.hour
        iso_week = local_now.isocalendar()[1]
        return {
            "current_time": local_now,
            "hour_24": hour,
            # NOTE(review): both bounds are inclusive, so 17:00-17:59 counts
            # as business hours — confirm this is intended.
            "is_business_hours": 9 <= hour <= 17,
            "day_of_week": local_now.strftime("%A"),
            "week_number": iso_week,
            # weekday() is 5 for Saturday and 6 for Sunday
            "is_weekend": local_now.weekday() >= 5,
        }
    except Exception as exc:
        # Report the failure as data rather than raising
        return {"error": str(exc)}
@opik.track(name="advanced_time_lookup", tags=["time", "timezone"])
def get_advanced_time(city: str) -> dict:
    """Report the current time in a city, with business-hours and weekend context.

    Args:
        city: City name or alias as supplied by the caller.

    Returns:
        On success, a dict with "status", the canonical "city", a
        human-readable "report" and a "metadata" dict (24h hour, ISO week
        number, business-hours flag). On failure,
        ``{"status": "error", "error_message": ...}``.
    """
    # Step 1: validate the location; reject unknown input early
    location = validate_location(city)
    if not location["is_valid"]:
        return {"status": "error", "error_message": f"Invalid location for time lookup: {city}"}
    validated_city = location["validated"]

    # Step 2: map the canonical city to its time zone
    zone_name = {
        "New York": "America/New_York",
        "London": "Europe/London",
        "Tokyo": "Asia/Tokyo",
    }.get(validated_city)
    if zone_name is None:
        return {"status": "error", "error_message": f"Timezone mapping unavailable for {validated_city}"}

    # Step 3: calculate time info (traced by @opik.track); pass its error through
    details = calculate_time_info(zone_name)
    if "error" in details:
        return {"status": "error", "error_message": details["error"]}

    stamp = details["current_time"].strftime('%A, %B %d, %Y at %I:%M %p %Z')
    if details["is_business_hours"]:
        business_status = "during business hours"
    else:
        business_status = "outside business hours"
    if details["is_weekend"]:
        weekend_status = "on a weekend"
    else:
        weekend_status = "on a weekday"

    return {
        "status": "success",
        "city": validated_city,
        "report": f"Current time in {validated_city}: {stamp} ({business_status}, {weekend_status})",
        "metadata": {
            key: details[key] for key in ("hour_24", "week_number", "is_business_hours")
        },
    }
This agent uses tools that have internal Opik tracing via decorators, while the agent itself uses ADK callbacks:
# Opik tracer for the hybrid (decorators + callbacks) example
hybrid_tracer = OpikTracer(
    name="hybrid-tracing-agent",
    project_name="adk-hybrid-demo",
    tags=["hybrid", "decorators", "callbacks", "advanced"],
    metadata=dict(
        environment="development",
        model="gpt-4o",
        framework="google-adk",
        example="hybrid-tracing",
        tracing_methods=["decorators", "callbacks"],
    ),
)

hybrid_instruction = """You are an advanced weather and time agent that provides detailed information with comprehensive internal processing.
Your tools perform multi-step operations that are individually traced, giving detailed visibility into the processing pipeline.
Use the advanced weather and time tools to provide thorough, well-processed information to users."""

# ADK callbacks for agent-level tracing: each hook goes to the same-named tracer method
hybrid_tracing_hooks = {
    "before_agent_callback": hybrid_tracer.before_agent_callback,
    "after_agent_callback": hybrid_tracer.after_agent_callback,
    "before_model_callback": hybrid_tracer.before_model_callback,
    "after_model_callback": hybrid_tracer.after_model_callback,
    "before_tool_callback": hybrid_tracer.before_tool_callback,
    "after_tool_callback": hybrid_tracer.after_tool_callback,
}

# Hybrid agent: its tools are traced by @opik.track, the agent itself by the callbacks
hybrid_agent = LlmAgent(
    name="advanced_weather_time_agent",
    model=llm,
    description="Advanced agent with hybrid Opik tracing using both decorators and callbacks",
    instruction=hybrid_instruction,
    tools=[get_advanced_weather, get_advanced_time],
    **hybrid_tracing_hooks,
)

# In-memory session store plus the runner that drives the hybrid agent
hybrid_session_service = InMemorySessionService()
hybrid_runner = Runner(
    session_service=hybrid_session_service,
    app_name="hybrid_tracing_app",
    agent=hybrid_agent,
)
async def setup_hybrid_session():
    """Create the hybrid tracing example's session and return its id.

    The app name, user id and session id are fixed strings that match the
    values used by the hybrid runner and by ``call_hybrid_agent``.
    """
    session = await hybrid_session_service.create_session(
        session_id="session_hybrid_001",
        user_id="user_hybrid",
        app_name="hybrid_tracing_app",
    )
    return session.id
async def call_hybrid_agent(user_msg: str, session_id: str):
    """Send one user message to the hybrid agent and print its final reply.

    Args:
        user_msg: Text of the user's message.
        session_id: Id of an existing session belonging to user "user_hybrid".

    Prints the user message, then the text of the first part of each final
    response event, followed by a blank line. Final events that carry no
    content or no parts are skipped instead of raising.
    """
    print(f"User: {user_msg}")
    content = types.Content(role="user", parts=[types.Part(text=user_msg)])

    async for event in hybrid_runner.run_async(
        user_id="user_hybrid", session_id=session_id, new_message=content
    ):
        if not event.is_final_response():
            continue
        # A final event is not guaranteed to carry text: guard against missing
        # content or an empty parts list before indexing, so the stream is not
        # aborted by an AttributeError/IndexError.
        parts = event.content.parts if event.content else None
        if parts:
            print(f"Advanced Assistant: {parts[0].text}")
    print()
This demo showcases how both decorator-traced tool operations and ADK callback-traced agent operations appear in the same unified trace:
# Create a session for hybrid example
hybrid_session_id = await setup_hybrid_session()
print(f"Created hybrid tracing session: {hybrid_session_id}")
print()
# Test weather with internal processing steps
print("=== Testing Advanced Weather Lookup ===")
await call_hybrid_agent("What's the weather like in NYC?", hybrid_session_id)
# Test time with internal calculations
print("=== Testing Advanced Time Lookup ===")
await call_hybrid_agent("What time is it in London right now? Include business hours info.", hybrid_session_id)
# Test invalid location to see validation tracing
print("=== Testing Location Validation ===")
await call_hybrid_agent("What's the weather in InvalidCity?", hybrid_session_id)
# Test complex query that triggers multiple internal operations
print("=== Testing Complex Multi-Step Query ===")
await call_hybrid_agent("I need detailed weather and time information for Tokyo, including whether it's business hours", hybrid_session_id)
The trace can now be viewed in the UI:
After running all three examples, you can view the traces in your Opik dashboard. Here's what you'll see:
In the hybrid example, the internal operations traced with `@opik.track` decorators (`validate_location`, `process_weather_data`) appear as child spans within the tool call spans.

This notebook demonstrated three powerful integration patterns. You can extend this by:
For more information: