Integrate Mem0 with Google ADK (Agent Development Kit), an open-source framework for building multi-agent workflows. This integration enables agents to access persistent memory across conversations, enhancing context retention and personalization.
In this guide, we'll create a Google ADK agent that:

- Implements ADK's `MemoryService` interface to connect Mem0
- Retrieves stored memories through the `load_memory` tool

Install the necessary libraries:

```bash
pip install google-adk mem0ai python-dotenv
```

Set up your API keys:
<Note>Remember to get your API key from <a href="https://app.mem0.ai" rel="nofollow">Mem0 Platform</a> and set up a Google AI Studio API Key.</Note>
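
```python
import os

from dotenv import load_dotenv

# Load GOOGLE_API_KEY and MEM0_API_KEY from a local .env file.
load_dotenv()

# Alternatively, set the keys directly in code:
# os.environ["GOOGLE_API_KEY"] = "your-google-api-key"
# os.environ["MEM0_API_KEY"] = "your-mem0-api-key"
```
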
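A missing key tends to surface later as an authentication error, or as silently empty memory results, so it can help to check the environment up front. The sketch below is one way to do that; `missing_keys` and `REQUIRED_KEYS` are hypothetical helper names, not part of ADK or Mem0.

```python
import os

# The two variables this guide relies on.
REQUIRED_KEYS = ("GOOGLE_API_KEY", "MEM0_API_KEY")


def missing_keys(env=None, required=REQUIRED_KEYS):
    """Return the names of required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name)]
```

Calling `missing_keys()` after `load_dotenv()` and stopping when the returned list is non-empty gives a clearer failure than a rejected API call.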
Create a custom MemoryService by implementing ADK's BaseMemoryService. Save the following as mem0_memory_service.py:

```python
import asyncio
import os
from typing import Optional

from typing_extensions import override

from google.adk.memory.base_memory_service import BaseMemoryService, SearchMemoryResponse
from google.adk.memory.memory_entry import MemoryEntry
from google.adk.sessions import Session
from google.genai.types import Content, Part
from mem0 import MemoryClient


class Mem0MemoryService(BaseMemoryService):
    """MemoryService implementation backed by the Mem0 Platform."""

    def __init__(self, api_key: Optional[str] = None):
        super().__init__()
        api_key = api_key or os.environ.get("MEM0_API_KEY")
        # Without an API key the service acts as a no-op instead of raising.
        self._client: Optional[MemoryClient] = MemoryClient(api_key=api_key) if api_key else None

    @override
    async def search_memory(
        self, *, app_name: str, user_id: str, query: str
    ) -> SearchMemoryResponse:
        """Search for memories relevant to the current user and query."""
        if not self._client:
            return SearchMemoryResponse(memories=[])
        try:
            # MemoryClient is synchronous, so run the call in a worker thread.
            results = await asyncio.to_thread(
                self._client.search,
                query,
                filters={"AND": [{"user_id": user_id}, {"app_id": app_name}]},
                top_k=5,
            )
            # Accept either a {"results": [...]} payload or a bare list.
            items = results.get("results", []) if isinstance(results, dict) else (results or [])
            entries = []
            for mem in items:
                text = mem.get("memory", "")
                if not text:
                    continue
                raw_ts = mem.get("created_at") or mem.get("updated_at")
                # "metadata" can be present but null, so fall back to an empty dict.
                metadata = mem.get("metadata") or {}
                entries.append(
                    MemoryEntry(
                        content=Content(parts=[Part(text=text)]),
                        author=metadata.get("author", "user"),
                        timestamp=str(raw_ts) if raw_ts else None,
                    )
                )
            return SearchMemoryResponse(memories=entries)
        except Exception as e:
            print(f"[Mem0MemoryService] search_memory error: {e}")
            return SearchMemoryResponse(memories=[])

    @override
    async def add_session_to_memory(self, session: Session) -> None:
        """Persist a completed ADK session into Mem0."""
        if not self._client:
            return
        user_id = session.user_id
        if not user_id:
            return
        app_name = getattr(session, "app_name", None)
        try:
            messages = []
            for event in session.events:
                if not (event.content and event.content.parts):
                    continue
                # Map ADK roles onto the user/assistant roles Mem0 expects.
                role = getattr(event.content, "role", None) or "user"
                if role == "model":
                    role = "assistant"
                elif role not in ("user", "assistant"):
                    continue
                text_parts = [
                    p.text for p in event.content.parts if hasattr(p, "text") and p.text
                ]
                if text_parts:
                    messages.append({"role": role, "content": " ".join(text_parts)})
            if messages:
                metadata = {"app_id": app_name} if app_name else {}
                await asyncio.to_thread(
                    self._client.add, messages, user_id=user_id, metadata=metadata
                )
        except Exception as e:
            print(f"[Mem0MemoryService] add_session_to_memory error: {e}")
```

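The role mapping used when saving a session can be exercised without API keys or a running agent. The sketch below pulls that logic into a standalone function and feeds it stand-in objects. `events_to_messages` and `fake_event` are hypothetical names introduced for illustration, and the stand-ins only mimic the `content.role`, `content.parts`, and `part.text` attributes that the service reads.

```python
from types import SimpleNamespace


def events_to_messages(events):
    """Convert ADK-style events into Mem0 chat messages (role/content dicts)."""
    messages = []
    for event in events:
        content = getattr(event, "content", None)
        if not (content and content.parts):
            continue
        role = getattr(content, "role", None) or "user"
        if role == "model":
            role = "assistant"  # Mem0 uses "assistant" where ADK uses "model"
        elif role not in ("user", "assistant"):
            continue  # skip any other role
        text = " ".join(p.text for p in content.parts if getattr(p, "text", None))
        if text:
            messages.append({"role": role, "content": text})
    return messages


def fake_event(role, *texts):
    """Hypothetical stand-in for an ADK event, for local testing only."""
    parts = [SimpleNamespace(text=t) for t in texts]
    return SimpleNamespace(content=SimpleNamespace(role=role, parts=parts))
```

Events with an unrecognized role or without any text are dropped, so only user and assistant messages reach Mem0.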
This after-agent callback fires at the end of every turn and saves the session to Mem0. Save as memory_callbacks.py:

```python
async def save_session_to_memory(callback_context) -> None:
    """Persist the completed session to Mem0 after each agent turn."""
    try:
        await callback_context.add_session_to_memory()
    except ValueError:
        pass
    except Exception as e:
        print(f"[save_session_to_memory] error: {e}")
```

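For anything beyond a demo, routing failures through `logging` instead of `print` makes them easier to filter and collect. The following is a minimal sketch of that variant with a stub for checking its error handling. `save_session_logged` and `StubContext` are hypothetical names, and treating `ValueError` as "no memory service available" is an assumption carried over from the callback above.

```python
import asyncio
import logging

logger = logging.getLogger("memory_callbacks")


async def save_session_logged(callback_context) -> None:
    """Variant of the save callback that logs failures instead of printing."""
    try:
        await callback_context.add_session_to_memory()
    except ValueError:
        # Assumed to mean that no memory service is available; nothing to save.
        logger.debug("memory service unavailable; skipping save")
    except Exception:
        # Never let a memory failure break the agent's reply.
        logger.exception("failed to save session to memory")


class StubContext:
    """Hypothetical stand-in for the ADK callback context, for local tests."""

    def __init__(self, error=None):
        self.error = error
        self.calls = 0

    async def add_session_to_memory(self):
        self.calls += 1
        if self.error:
            raise self.error
```

Running `asyncio.run(save_session_logged(StubContext(RuntimeError("boom"))))` should log the error and return normally, which is the behavior an after-agent callback needs.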
The following example demonstrates creating an ADK agent with automatic Mem0 memory:

```python
import asyncio

from google.adk.agents import LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools import load_memory
from google.genai.types import Content, Part

from mem0_memory_service import Mem0MemoryService
from memory_callbacks import save_session_to_memory

memory_service = Mem0MemoryService()
session_service = InMemorySessionService()

agent = LlmAgent(
    name="personal_assistant",
    model="gemini-2.0-flash",
    instruction="""You are a helpful personal assistant.
Relevant memories from past conversations are provided to you automatically.
Use them to personalize your responses.""",
    description="A personal assistant that remembers user preferences and past interactions",
    tools=[load_memory],
    after_agent_callback=save_session_to_memory,
)

runner = Runner(
    agent=agent,
    session_service=session_service,
    memory_service=memory_service,
    app_name="memory_assistant",
)


async def chat(user_input: str, user_id: str) -> str:
    session = await session_service.create_session(
        app_name="memory_assistant",
        user_id=user_id,
    )
    content = Content(role="user", parts=[Part(text=user_input)])
    final_text = "No response generated"
    # Consume the whole event stream instead of returning at the first final
    # response, so the run can finish and the after-agent callback can fire.
    async for event in runner.run_async(
        user_id=user_id, session_id=session.id, new_message=content
    ):
        if event.is_final_response() and event.content and event.content.parts:
            final_text = event.content.parts[0].text or final_text
    return final_text


if __name__ == "__main__":
    print(asyncio.run(chat(
        "I love Italian food and I'm planning a trip to Rome next month",
        user_id="alice",
    )))
    print(asyncio.run(chat(
        "Any food recommendations for my trip?",
        user_id="alice",
    )))
```

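One detail matters when consuming `runner.run_async`: an async generator runs the code after its last `yield` only if the consumer keeps iterating. The sketch below shows this with a stand-in generator. `fake_run`, `consume_early`, and `consume_fully` are hypothetical names, not ADK APIs, and the sketch assumes ADK runs after-agent callbacks once the agent's own events have been consumed, which is worth verifying against your ADK version.

```python
import asyncio


async def fake_run(log):
    """Stand-in for an event stream that runs a hook after its last event."""
    yield {"final": False, "text": "thinking"}
    yield {"final": True, "text": "Try cacio e pepe in Rome."}
    # Reached only if the consumer asks for another item after the final event.
    log.append("after-turn hook ran")


async def consume_early(log):
    """Return at the final event; the generator never reaches its hook."""
    async for event in fake_run(log):
        if event["final"]:
            return event["text"]
    return "No response generated"


async def consume_fully(log):
    """Remember the final text but keep iterating until the stream ends."""
    final_text = "No response generated"
    async for event in fake_run(log):
        if event["final"]:
            final_text = event["text"]
    return final_text
```

Both consumers return the same text, but only `consume_fully` lets the hook run. That is the reason to drain the stream when an after-agent callback is responsible for saving memory.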
Because memory_service is passed to the Runner, every agent in the hierarchy shares the same memory automatically. Only the root coordinator needs the auto-save callback — ADK fires it once when the full turn completes:

```python
import asyncio

from google.adk.agents import LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools import load_memory
from google.adk.tools.agent_tool import AgentTool
from google.genai.types import Content, Part

from mem0_memory_service import Mem0MemoryService
from memory_callbacks import save_session_to_memory

memory_service = Mem0MemoryService()
session_service = InMemorySessionService()

travel_agent = LlmAgent(
    name="travel_specialist",
    model="gemini-2.0-flash",
    instruction="""You are a travel planning specialist.
Relevant memories about the user's travel preferences are provided automatically.
Use them to make personalized recommendations.""",
    description="Specialist in travel planning and recommendations",
    tools=[load_memory],
)

health_agent = LlmAgent(
    name="health_advisor",
    model="gemini-2.0-flash",
    instruction="""You are a health and wellness advisor.
Relevant memories about the user's health goals are provided automatically.
Use them to give personalized advice.""",
    description="Specialist in health and wellness advice",
    tools=[load_memory],
)

coordinator = LlmAgent(
    name="coordinator",
    model="gemini-2.0-flash",
    instruction="""You are a coordinator that delegates requests to specialist agents.
For travel-related questions, delegate to the travel specialist.
For health-related questions, delegate to the health advisor.
Relevant memories about the user are provided automatically.""",
    description="Coordinates requests between specialist agents",
    tools=[
        load_memory,
        AgentTool(agent=travel_agent, skip_summarization=False),
        AgentTool(agent=health_agent, skip_summarization=False),
    ],
    # Only the root agent saves the session, once per completed turn.
    after_agent_callback=save_session_to_memory,
)

runner = Runner(
    agent=coordinator,
    session_service=session_service,
    memory_service=memory_service,
    app_name="specialist_system",
)


async def chat_with_specialists(user_input: str, user_id: str) -> str:
    session = await session_service.create_session(
        app_name="specialist_system",
        user_id=user_id,
    )
    content = Content(role="user", parts=[Part(text=user_input)])
    final_text = "No response generated"
    # Consume the whole event stream so the after-agent callback can fire.
    async for event in runner.run_async(
        user_id=user_id, session_id=session.id, new_message=content
    ):
        if event.is_final_response() and event.content and event.content.parts:
            final_text = event.content.parts[0].text or final_text
    return final_text


if __name__ == "__main__":
    response = asyncio.run(
        chat_with_specialists("Plan a healthy meal for my Italy trip", user_id="alice")
    )
    print(response)
```


- The `load_memory` tool searches Mem0 and adds relevant memories to the agent context whenever the model calls it, so memories do not have to be pasted into prompts by hand.
- The `save_session_to_memory` callback persists every completed turn to Mem0 without any manual calls.
- `Mem0MemoryService` implements ADK's `BaseMemoryService` and integrates via the `Runner`, so it works across the entire agent hierarchy.
- The `user_id` is passed automatically from the ADK session context, so memories are always scoped to the correct user.
- A single `Mem0MemoryService` instance shared through the `Runner` gives all agents, coordinators and specialists alike, access to the same user memory.

To use Google Cloud Vertex AI instead of AI Studio, set the following environment variables before creating agents:

```python
import os

os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "True"
os.environ["GOOGLE_CLOUD_PROJECT"] = "your-project-id"
os.environ["GOOGLE_CLOUD_LOCATION"] = "us-central1"
```

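If the same script runs both locally and in a deployed environment, it can be useful to apply these settings only where they are not already defined. The sketch below shows one way to do that. `vertex_env` and `apply_env` are hypothetical helpers, and the variable names are the ones listed above.

```python
import os


def vertex_env(project_id, location="us-central1"):
    """Build the environment variables that point ADK at Vertex AI."""
    return {
        "GOOGLE_GENAI_USE_VERTEXAI": "True",
        "GOOGLE_CLOUD_PROJECT": project_id,
        "GOOGLE_CLOUD_LOCATION": location,
    }


def apply_env(values, env=None):
    """Set each variable only if it is not already defined; return those set."""
    env = os.environ if env is None else env
    applied = []
    for name, value in values.items():
        if name not in env:
            env[name] = value
            applied.append(name)
    return applied
```

Calling `apply_env(vertex_env("your-project-id"))` before creating agents leaves any values already exported by the deployment untouched.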
You can customize how memories are searched by modifying Mem0MemoryService.search_memory. For example, to filter by category:

```python
# Inside Mem0MemoryService.search_memory:
results = await asyncio.to_thread(
    self._client.search,
    query,
    filters={
        "AND": [
            {"user_id": user_id},
            {"app_id": app_name},
            {"categories": {"contains": "travel"}},
        ]
    },
    top_k=10,
)
```

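Rather than hard-coding the category, the filter can be assembled by a small helper so that callers opt in to category filtering. The sketch below only builds the dictionary in the shape shown above. `build_search_filters` is a hypothetical name, and the `categories` clause simply mirrors the example in this guide.

```python
def build_search_filters(user_id, app_name, category=None):
    """Build a Mem0 search filter scoped to one user and app.

    When `category` is given, results are further restricted to memories
    whose categories contain it.
    """
    clauses = [{"user_id": user_id}, {"app_id": app_name}]
    if category:
        clauses.append({"categories": {"contains": category}})
    return {"AND": clauses}
```

Inside `search_memory`, the result would be passed as `filters=build_search_filters(user_id, app_name, category="travel")`.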
<Note>InMemorySessionService stores sessions in memory and is intended for prototyping. For production, use a persistent session service and clean up sessions when they are no longer needed.</Note>
By implementing Mem0MemoryService as an ADK BaseMemoryService, you get persistent, user-scoped memory across single agents and complex multi-agent hierarchies with minimal code. Memory injection and session saving happen automatically, keeping your agent prompts clean and your token usage efficient.