docs/integrations/google-ai-adk.mdx
Integrate Mem0 with Google ADK (Agent Development Kit), an open-source framework for building multi-agent workflows. This integration enables agents to access persistent memory across conversations, enhancing context retention and personalization.
In this guide, we'll create a Google ADK agent that:
MemoryService interface to connect Mem0load_memory toolInstall the necessary libraries:
pip install google-adk mem0ai python-dotenv
Set up your API keys:
<Note>Remember to get your API key from <a href="https://app.mem0.ai" rel="nofollow">Mem0 Platform</a> and set up a Google AI Studio API Key.</Note>
import os
from dotenv import load_dotenv
load_dotenv()
# os.environ["GOOGLE_API_KEY"] = "your-google-api-key"
# os.environ["MEM0_API_KEY"] = "your-mem0-api-key"
Create a custom MemoryService by implementing ADK's BaseMemoryService. Save the following as mem0_memory_service.py:
import asyncio
import os
from typing import Optional
from typing_extensions import override
from google.adk.memory.base_memory_service import BaseMemoryService, SearchMemoryResponse
from google.adk.memory.memory_entry import MemoryEntry
from google.adk.sessions import Session
from google.genai.types import Content, Part
from mem0 import MemoryClient
class Mem0MemoryService(BaseMemoryService):
"""MemoryService implementation backed by the Mem0 Platform."""
def __init__(self, api_key: Optional[str] = None):
super().__init__()
api_key = api_key or os.environ.get("MEM0_API_KEY")
self._client: Optional[MemoryClient] = MemoryClient(api_key=api_key) if api_key else None
@override
async def search_memory(
self, *, app_name: str, user_id: str, query: str
) -> SearchMemoryResponse:
"""Search for memories relevant to the current user and query."""
if not self._client:
return SearchMemoryResponse(memories=[])
try:
results = await asyncio.to_thread(
self._client.search,
query,
filters={"AND": [{"user_id": user_id}, {"app_id": app_name}]},
top_k=5,
)
entries = []
for mem in results.get("results", []):
text = mem.get("memory", "")
if not text:
continue
raw_ts = mem.get("created_at") or mem.get("updated_at")
entries.append(
MemoryEntry(
content=Content(parts=[Part(text=text)]),
author=mem.get("metadata", {}).get("author", "user"),
timestamp=str(raw_ts) if raw_ts else None,
)
)
return SearchMemoryResponse(memories=entries)
except Exception as e:
print(f"[Mem0MemoryService] search_memory error: {e}")
return SearchMemoryResponse(memories=[])
@override
async def add_session_to_memory(self, session: Session) -> None:
"""Persist a completed ADK session into Mem0."""
if not self._client:
return
user_id = session.user_id
if not user_id:
return
app_name = getattr(session, "app_name", None)
try:
messages = []
for event in session.events:
if not (event.content and event.content.parts):
continue
role = getattr(event.content, "role", None) or "user"
if role == "model":
role = "assistant"
elif role not in ("user", "assistant"):
continue
text_parts = [
p.text for p in event.content.parts if hasattr(p, "text") and p.text
]
if text_parts:
messages.append({"role": role, "content": " ".join(text_parts)})
if messages:
metadata = {"app_id": app_name} if app_name else {}
await asyncio.to_thread(
self._client.add, messages, user_id=user_id, metadata=metadata
)
except Exception as e:
print(f"[Mem0MemoryService] add_session_to_memory error: {e}")
This after-agent callback fires at the end of every turn and saves the session to Mem0. Save as memory_callbacks.py:
async def save_session_to_memory(callback_context) -> None:
"""Persist the completed session to Mem0 after each agent turn."""
try:
await callback_context.add_session_to_memory()
except ValueError:
pass
except Exception as e:
print(f"[save_session_to_memory] error: {e}")
The following example demonstrates creating an ADK agent with automatic Mem0 memory:
import asyncio
from google.adk.agents import LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools import load_memory
from google.genai.types import Content, Part
from mem0_memory_service import Mem0MemoryService
from memory_callbacks import save_session_to_memory
memory_service = Mem0MemoryService()
session_service = InMemorySessionService()
agent = LlmAgent(
name="personal_assistant",
model="gemini-2.0-flash",
instruction="""You are a helpful personal assistant.
Relevant memories from past conversations are provided to you automatically.
Use them to personalize your responses.""",
description="A personal assistant that remembers user preferences and past interactions",
tools=[load_memory],
after_agent_callback=save_session_to_memory,
)
runner = Runner(
agent=agent,
session_service=session_service,
memory_service=memory_service,
app_name="memory_assistant",
)
async def chat(user_input: str, user_id: str) -> str:
session = await session_service.create_session(
app_name="memory_assistant",
user_id=user_id,
)
content = Content(role="user", parts=[Part(text=user_input)])
async for event in runner.run_async(user_id=user_id, session_id=session.id, new_message=content):
if event.is_final_response() and event.content and event.content.parts:
return event.content.parts[0].text
return "No response generated"
if __name__ == "__main__":
print(asyncio.run(chat(
"I love Italian food and I'm planning a trip to Rome next month",
user_id="alice",
)))
print(asyncio.run(chat(
"Any food recommendations for my trip?",
user_id="alice",
)))
Because memory_service is passed to the Runner, every agent in the hierarchy shares the same memory automatically. Only the root coordinator needs the auto-save callback: ADK fires it once when the full turn completes:
import asyncio
from google.adk.agents import LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools.agent_tool import AgentTool
from google.adk.tools import load_memory
from google.genai.types import Content, Part
from mem0_memory_service import Mem0MemoryService
from memory_callbacks import save_session_to_memory
memory_service = Mem0MemoryService()
session_service = InMemorySessionService()
travel_agent = LlmAgent(
name="travel_specialist",
model="gemini-2.0-flash",
instruction="""You are a travel planning specialist.
Relevant memories about the user's travel preferences are provided automatically.
Use them to make personalized recommendations.""",
description="Specialist in travel planning and recommendations",
tools=[load_memory],
)
health_agent = LlmAgent(
name="health_advisor",
model="gemini-2.0-flash",
instruction="""You are a health and wellness advisor.
Relevant memories about the user's health goals are provided automatically.
Use them to give personalized advice.""",
description="Specialist in health and wellness advice",
tools=[load_memory],
)
coordinator = LlmAgent(
name="coordinator",
model="gemini-2.0-flash",
instruction="""You are a coordinator that delegates requests to specialist agents.
For travel-related questions, delegate to the travel specialist.
For health-related questions, delegate to the health advisor.
Relevant memories about the user are provided automatically.""",
description="Coordinates requests between specialist agents",
tools=[
load_memory,
AgentTool(agent=travel_agent, skip_summarization=False),
AgentTool(agent=health_agent, skip_summarization=False),
],
after_agent_callback=save_session_to_memory,
)
runner = Runner(
agent=coordinator,
session_service=session_service,
memory_service=memory_service,
app_name="specialist_system",
)
async def chat_with_specialists(user_input: str, user_id: str) -> str:
session = await session_service.create_session(
app_name="specialist_system",
user_id=user_id,
)
content = Content(role="user", parts=[Part(text=user_input)])
async for event in runner.run_async(user_id=user_id, session_id=session.id, new_message=content):
if event.is_final_response() and event.content and event.content.parts:
return event.content.parts[0].text
return "No response generated"
if __name__ == "__main__":
response = asyncio.run(chat_with_specialists("Plan a healthy meal for my Italy trip", user_id="alice"))
print(response)
load_memory tool searches Mem0 at the start of each turn and injects relevant memories directly into the agent context. No prompt instructions are needed.save_session_to_memory callback persists every completed turn to Mem0 without any manual calls.Mem0MemoryService implements ADK's BaseMemoryService and integrates via the Runner. It works natively across the entire agent hierarchy.user_id is passed automatically from the ADK session context, ensuring memories are always scoped to the correct user.Mem0MemoryService instance shared through the Runner gives all agents, coordinators and specialists, access to the same user memory.To use Google Cloud Vertex AI instead of AI Studio, set the following environment variables before creating agents:
import os
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "True"
os.environ["GOOGLE_CLOUD_PROJECT"] = "your-project-id"
os.environ["GOOGLE_CLOUD_LOCATION"] = "us-central1"
You can customize how memories are searched by modifying Mem0MemoryService.search_memory. For example, to filter by category:
results = await asyncio.to_thread(
self._client.search,
query,
filters={
"AND": [
{"user_id": user_id},
{"app_id": app_name},
{"categories": {"contains": "travel"}}
]
},
top_k=10,
)
<Note>InMemorySessionService stores sessions in memory and is intended for prototyping. For production, use a persistent session service and clean up sessions when they are no longer needed.</Note>
By implementing Mem0MemoryService as an ADK BaseMemoryService, you get persistent, user-scoped memory across single agents and complex multi-agent hierarchies with minimal code. Memory injection and session saving happen automatically, keeping your agent prompts clean and your token usage efficient.