docs/open-source/features/async-memory.mdx
AsyncMemory gives you a non-blocking interface to Mem0’s storage layer so Python applications can add, search, and manage memories directly from async code. Use it when you embed Mem0 inside FastAPI services, background workers, or any workflow that relies on asyncio.
AsyncMemory talks to the same backends as the synchronous client but keeps everything in-process for lower latency.

- Familiar interface: every method (`add`, `search`, `get_all`, `delete`, etc.) mirrors the synchronous API, letting you reuse payload shapes.
- Concurrency-friendly: run independent operations together with `asyncio.gather`.
- Scoped memories: combine `user_id`, `agent_id`, and `run_id` to separate memories across sessions and agents.

import asyncio
from mem0 import AsyncMemory
# Default configuration — no arguments needed; built-in backend settings are used.
memory = AsyncMemory()
# Custom configuration — pass a MemoryConfig to override providers (e.g. vector_store, llm).
from mem0.configs.base import MemoryConfig
custom_config = MemoryConfig(
# Your custom configuration here
)
memory = AsyncMemory(config=custom_config)
import asyncio
from contextlib import asynccontextmanager
from mem0 import AsyncMemory
@asynccontextmanager
async def get_memory():
    """Yield a fresh AsyncMemory instance, guaranteeing the cleanup hook runs on exit."""
    instance = AsyncMemory()
    try:
        yield instance
    finally:
        # Placeholder: release backend resources here if your deployment needs it.
        pass
async def safe_memory_usage():
    """Run a sample search inside the managed-memory context."""
    async with get_memory() as mem:
        search_hits = await mem.search("test query", filters={"user_id": "alice"})
        return search_hits
async def batch_operations():
    """Fire five add() calls concurrently and report each task's outcome."""
    store = AsyncMemory()
    pending = []
    for idx in range(5):
        coro = store.add(
            messages=[{"role": "user", "content": f"Message {idx}"}],
            user_id=f"user_{idx}",
        )
        pending.append(coro)
    # return_exceptions=True keeps one failure from cancelling the rest.
    outcomes = await asyncio.gather(*pending, return_exceptions=True)
    for idx, outcome in enumerate(outcomes):
        if isinstance(outcome, Exception):
            print(f"Task {idx} failed: {outcome}")
        else:
            print(f"Task {idx} completed successfully")
import asyncio
from mem0 import AsyncMemory
async def with_timeout_and_retry(operation, max_retries=3, timeout=10.0):
    """Run *operation* with a per-attempt timeout and exponential backoff.

    Args:
        operation: Zero-argument callable returning an awaitable; invoked once per attempt.
        max_retries: Maximum number of attempts before giving up.
        timeout: Seconds allowed for each individual attempt.

    Returns:
        Whatever the awaited operation returns, on the first success.

    Raises:
        Exception: After all attempts fail; chained (``__cause__``) to the last
            underlying error so callers can inspect the real failure.
    """
    last_error = None
    for attempt in range(max_retries):
        try:
            return await asyncio.wait_for(operation(), timeout=timeout)
        except asyncio.TimeoutError as exc:
            last_error = exc
            print(f"Timeout on attempt {attempt + 1}")
        except Exception as exc:
            last_error = exc
            print(f"Error on attempt {attempt + 1}: {exc}")
        # Exponential backoff (1s, 2s, 4s, ...) before every retry except the last.
        if attempt < max_retries - 1:
            await asyncio.sleep(2 ** attempt)
    # Fix: preserve the root cause instead of discarding it.
    raise Exception(f"Operation failed after {max_retries} attempts") from last_error
async def robust_memory_search():
    """Search the memory backend with timeout/retry protection."""
    store = AsyncMemory()

    async def run_search():
        return await store.search("test query", filters={"user_id": "alice"})

    result = await with_timeout_and_retry(run_search)
    return result
# Create memories from a conversation transcript
result = await memory.add(
messages=[
{"role": "user", "content": "I'm travelling to SF"},
{"role": "assistant", "content": "That's great to hear!"}
],
user_id="alice"
)
# Search memories scoped to one user
results = await memory.search(
query="Where am I travelling?",
filters={"user_id": "alice"}
)
# List all memories for a user
all_memories = await memory.get_all(filters={"user_id": "alice"})
# Get a specific memory by its ID
specific_memory = await memory.get(memory_id="memory-id-here")
# Update the text of an existing memory
updated_memory = await memory.update(
memory_id="memory-id-here",
data="I'm travelling to Seattle"
)
# Delete a single memory
await memory.delete(memory_id="memory-id-here")
# Delete scoped memories — here, everything stored for "alice"
await memory.delete_all(user_id="alice")
# Scope a memory to a user, an agent, and a single run/session
await memory.add(
messages=[{"role": "user", "content": "I prefer vegetarian food"}],
user_id="alice",
agent_id="diet-assistant",
run_id="consultation-001"
)
# Retrieval filters can combine any subset of the scoping IDs
all_user_memories = await memory.get_all(filters={"user_id": "alice"})
agent_memories = await memory.get_all(filters={"user_id": "alice", "agent_id": "diet-assistant"})
session_memories = await memory.get_all(filters={"user_id": "alice", "run_id": "consultation-001"})
specific_memories = await memory.get_all(
filters={"user_id": "alice", "agent_id": "diet-assistant", "run_id": "consultation-001"}
)
# Inspect the change history of a single memory
history = await memory.history(memory_id="memory-id-here")
import asyncio
from openai import AsyncOpenAI
from mem0 import AsyncMemory
# Create the OpenAI client and the memory store once, then reuse them.
async_openai_client = AsyncOpenAI()
async_memory = AsyncMemory()
async def chat_with_memories(message: str, user_id: str = "default_user") -> str:
    """Answer *message* using the user's stored memories, then persist the exchange."""
    # Pull the three memories most relevant to the incoming message.
    found = await async_memory.search(query=message, filters={"user_id": user_id}, top_k=3)
    memory_lines = [f"- {entry['memory']}" for entry in found["results"]]
    memories_str = "\n".join(memory_lines)
    system_prompt = (
        "You are a helpful AI. Answer the question based on query and memories.\n"
        f"User Memories:\n{memories_str}"
    )
    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": message},
    ]
    completion = await async_openai_client.chat.completions.create(
        model="gpt-5-mini",
        messages=conversation,
    )
    reply = completion.choices[0].message.content
    # Store both sides of the exchange so future searches can retrieve it.
    conversation.append({"role": "assistant", "content": reply})
    await async_memory.add(conversation, user_id=user_id)
    return reply
from mem0 import AsyncMemory
from mem0.configs.base import MemoryConfig
async def handle_initialization_errors():
    """Demonstrate catching configuration and connectivity failures at startup."""
    try:
        cfg = MemoryConfig(
            vector_store={"provider": "chroma", "config": {"path": "./chroma_db"}},
            llm={"provider": "openai", "config": {"model": "gpt-5-mini"}},
        )
        AsyncMemory(config=cfg)
        print("AsyncMemory initialized successfully")
    except ValueError as err:
        print(f"Configuration error: {err}")
    except ConnectionError as err:
        print(f"Connection error: {err}")
async def handle_memory_operation_errors():
    """Demonstrate handling bad IDs and empty queries at call time."""
    store = AsyncMemory()

    # A lookup with an unknown ID surfaces a ValueError.
    try:
        await store.get(memory_id="non-existent-id")
    except ValueError as err:
        print(f"Invalid memory ID: {err}")

    # An empty query string is rejected the same way.
    try:
        await store.search(query="", filters={"user_id": "alice"})
    except ValueError as err:
        print(f"Invalid search query: {err}")
from fastapi import FastAPI, HTTPException
from mem0 import AsyncMemory
app = FastAPI()
# One shared AsyncMemory instance serves all request handlers.
memory = AsyncMemory()
@app.post("/memories/")
async def add_memory(messages: list, user_id: str):
    """Persist *messages* for *user_id*; any backend failure becomes a 500."""
    try:
        payload = await memory.add(messages=messages, user_id=user_id)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return {"status": "success", "data": payload}
@app.get("/memories/search")
async def search_memories(query: str, user_id: str, limit: int = 10):
    """Search one user's memories, capping results at *limit*; failures become a 500."""
    try:
        hits = await memory.search(query=query, filters={"user_id": user_id}, top_k=limit)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return {"status": "success", "data": hits}
import logging
import time
from functools import wraps
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def log_async_operation(operation_name):
    """Decorator factory: log start, duration, and failure of an async callable.

    Args:
        operation_name: Human-readable label used in every log record.

    Returns:
        A decorator that wraps an async function with timing and logging;
        exceptions are logged and re-raised unchanged.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # perf_counter is monotonic, so durations survive wall-clock adjustments
            # (time.time() can jump backwards/forwards with NTP or DST changes).
            start_time = time.perf_counter()
            # Lazy %-style args avoid formatting cost when the level is disabled.
            logger.info("Starting %s", operation_name)
            try:
                result = await func(*args, **kwargs)
            except Exception as exc:
                duration = time.perf_counter() - start_time
                logger.error("%s failed after %.2fs: %s", operation_name, duration, exc)
                raise
            duration = time.perf_counter() - start_time
            logger.info("%s completed in %.2fs", operation_name, duration)
            return result
        return wrapper
    return decorator
@log_async_operation("Memory Add")
async def logged_memory_add(memory, messages, user_id):
    """Add *messages* under *user_id*, with timing logged by the decorator."""
    add_result = await memory.add(messages=messages, user_id=user_id)
    return add_result
- Forgetting `await` is the fastest way to miss writes—lint for it or add helper wrappers.
- Always scope `delete_all` with `user_id`, `agent_id`, or `run_id` to avoid purging too much data.
- Use `asyncio.gather` for throughput but cap concurrency based on backend capacity.
- Instantiate `AsyncMemory` once per worker to avoid repeated backend handshakes.

| Issue | Possible causes | Fix |
|---|---|---|
| Initialization fails | Missing dependencies, invalid config | Validate MemoryConfig settings and environment variables. |
| Slow operations | Large datasets, network latency | Cache heavy queries and tune vector store parameters. |
| Memory not found | Invalid ID or deleted record | Check ID source and handle soft-deleted states. |
| Connection timeouts | Network issues, overloaded backend | Apply retries/backoff and inspect infrastructure health. |
| Out-of-memory errors | Oversized batches | Reduce concurrency or chunk operations into smaller sets. |