LLM.md
Mem0 ("mem-zero") is an intelligent memory layer that enhances AI assistants and agents with persistent, personalized memory capabilities. It enables AI systems to remember user preferences, adapt to individual needs, and continuously learn over time—making it ideal for customer support chatbots, AI assistants, and autonomous systems.
Key Benefits:
- Persistent, personalized memory across sessions
- Adaptation to individual user preferences and needs
- Continuous learning from interactions over time
# Python
pip install mem0ai
# TypeScript/JavaScript
npm install mem0ai
from mem0 import Memory
# Initialize memory
memory = Memory()
# Add memories
memory.add([
{"role": "user", "content": "I love pizza and hate broccoli"},
{"role": "assistant", "content": "I'll remember your food preferences!"}
], user_id="user123")
# Search memories
results = memory.search("food preferences", user_id="user123")
print(results)
# Get all memories
all_memories = memory.get_all(user_id="user123")
from mem0 import MemoryClient
# Initialize client
client = MemoryClient(api_key="your-api-key")
# Add memories
client.add([
{"role": "user", "content": "My name is John and I'm a developer"}
], user_id="john")
# Search memories
results = client.search("What do you know about me?", user_id="john")
import { MemoryClient } from 'mem0ai';
const client = new MemoryClient({ apiKey: 'your-api-key' });
// Add memory
const memories = await client.add([
{ role: 'user', content: 'My name is John' }
], { user_id: 'john' });
// Search memories
const results = await client.search('What is my name?', { user_id: 'john' });
import { Memory } from 'mem0ai/oss';
const memory = new Memory({
embedder: { provider: 'openai', config: { apiKey: 'key' } },
vectorStore: { provider: 'memory', config: { dimension: 1536 } },
llm: { provider: 'openai', config: { apiKey: 'key' } }
});
const result = await memory.add('My name is John', { userId: 'john' });
Import: from mem0 import Memory, AsyncMemory
from mem0 import Memory
from mem0.configs.base import MemoryConfig
# Basic initialization
memory = Memory()
# With custom configuration
config = MemoryConfig(
vector_store={"provider": "qdrant", "config": {"host": "localhost"}},
llm={"provider": "openai", "config": {"model": "gpt-4.1-nano-2025-04-14"}},
embedder={"provider": "openai", "config": {"model": "text-embedding-3-small"}}
)
memory = Memory(config)
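The same import line also exposes AsyncMemory for asyncio applications. A minimal sketch, assuming its methods mirror the synchronous Memory API as awaitables:

```python
import asyncio
from mem0 import AsyncMemory

async def main():
    # Assumed to accept the same optional MemoryConfig as Memory
    memory = AsyncMemory()
    await memory.add(
        [{"role": "user", "content": "I prefer aisle seats"}],
        user_id="user123",
    )
    results = await memory.search("seat preference", user_id="user123")
    print(results)

asyncio.run(main())
```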
add(messages, *, user_id=None, agent_id=None, run_id=None, metadata=None, infer=True, memory_type=None, prompt=None)
- messages: str, dict, or list of message dicts
- user_id/agent_id/run_id: Session identifiers (at least one required)
- metadata: Additional metadata to store
- infer: Whether to use LLM for fact extraction (default: True)
- memory_type: "procedural_memory" for procedural memories
- prompt: Custom prompt for memory creation

search(query, *, user_id=None, agent_id=None, run_id=None, limit=100, filters=None, threshold=None)
- query: Search query string
- user_id/agent_id/run_id: Session filters (at least one required)
- limit: Maximum results (default: 100)
- filters: Additional search filters
- threshold: Minimum similarity score

get(memory_id)
get_all(*, user_id=None, agent_id=None, run_id=None, filters=None, limit=100)
update(memory_id, data)
delete(memory_id)
delete_all(user_id=None, agent_id=None, run_id=None)
history(memory_id)
reset()
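A minimal sketch tying together the methods listed above, assuming a default configuration (an OpenAI key in the environment) and the v1.1 `{"results": [...]}` response format used elsewhere in this document:

```python
from mem0 import Memory

memory = Memory()

# add() with a keyword-only session identifier and metadata
result = memory.add(
    [{"role": "user", "content": "I work remotely and prefer morning meetings"}],
    user_id="user123",
    metadata={"source": "onboarding"},
)
# Assumes the LLM extracted at least one fact from the message
memory_id = result["results"][0]["id"]

# search() with a similarity threshold, then inspect, update, and clean up
hits = memory.search("meeting preferences", user_id="user123", limit=5, threshold=0.3)
print(memory.get(memory_id))
memory.update(memory_id, "Prefers morning meetings, remote-first")
print(memory.history(memory_id))   # change history for this memory
memory.delete(memory_id)
memory.delete_all(user_id="user123")
```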
Import: from mem0 import MemoryClient, AsyncMemoryClient
client = MemoryClient(
api_key="your-api-key", # or set MEM0_API_KEY env var
host="https://api.mem0.ai", # optional
org_id="your-org-id", # optional
project_id="your-project-id" # optional
)
add(messages, **kwargs)
search(query, version="v1", **kwargs)
get(memory_id)
get_all(version="v1", **kwargs)
update(memory_id, text=None, metadata=None)
delete(memory_id)
delete_all(**kwargs)
batch_update(memories)
batch_delete(memories)
users()
delete_users(user_id=None, agent_id=None, app_id=None, run_id=None)
reset()
history(memory_id)
feedback(memory_id, feedback, **kwargs)
create_memory_export(schema, **kwargs)
get_memory_export(**kwargs)
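A hedged sketch exercising several of the client methods listed above (AsyncMemoryClient exposes the same surface as awaitables). The payload shapes for batch_update and batch_delete are assumptions modeled on the TypeScript batch examples later in this document:

```python
from mem0 import MemoryClient

client = MemoryClient(api_key="your-api-key")

client.add(
    [{"role": "user", "content": "I am allergic to peanuts"}],
    user_id="user123",
    metadata={"category": "health"},
)

memories = client.get_all(user_id="user123", limit=20)
users = client.users()  # entities (users/agents) that have stored memories

# Assumed shape: one dict per memory with its id and replacement text
client.batch_update([{"memory_id": "mem-id-1", "text": "Severe peanut allergy"}])
client.batch_delete([{"memory_id": "mem-id-2"}])

client.delete_all(user_id="user123")
```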
from mem0.configs.base import MemoryConfig
config = MemoryConfig(
vector_store=VectorStoreConfig(provider="qdrant", config={...}),
llm=LlmConfig(provider="openai", config={...}),
embedder=EmbedderConfig(provider="openai", config={...}),
graph_store=GraphStoreConfig(provider="neo4j", config={...}), # optional
history_db_path="~/.mem0/history.db",
version="v1.1",
custom_instructions="Custom prompt...",
custom_update_memory_prompt="Custom prompt..."
)
config = MemoryConfig(
llm={
"provider": "openai",
"config": {
"model": "gpt-4.1-nano-2025-04-14",
"temperature": 0.1,
"max_tokens": 1000
}
},
embedder={
"provider": "openai",
"config": {
"model": "text-embedding-3-small"
}
}
)
config = MemoryConfig(
llm={
"provider": "ollama",
"config": {
"model": "llama3.1:8b",
"ollama_base_url": "http://localhost:11434"
}
},
embedder={
"provider": "ollama",
"config": {
"model": "nomic-embed-text"
}
},
vector_store={
"provider": "chroma",
"config": {
"collection_name": "my_memories",
"path": "./chroma_db"
}
}
)
config = MemoryConfig(
graph_store={
"provider": "neo4j",
"config": {
"url": "bolt://localhost:7687",
"username": "neo4j",
"password": "password",
"database": "neo4j"
}
}
)
config = MemoryConfig(
llm={
"provider": "azure_openai",
"config": {
"model": "gpt-4",
"azure_endpoint": "https://your-resource.openai.azure.com/",
"api_key": "your-api-key",
"api_version": "2024-02-01"
}
},
vector_store={
"provider": "pinecone",
"config": {
"api_key": "your-pinecone-key",
"index_name": "mem0-index",
"dimension": 1536
}
}
)
import { MemoryClient } from 'mem0ai';
const client = new MemoryClient({
apiKey: 'your-api-key',
host: 'https://api.mem0.ai', // optional
organizationId: 'org-id', // optional
projectId: 'project-id' // optional
});
// Core operations
const memories = await client.add([
{ role: 'user', content: 'I love pizza' }
], { user_id: 'user123' });
const results = await client.search('food preferences', { user_id: 'user123' });
const memory = await client.get('memory-id');
const allMemories = await client.getAll({ user_id: 'user123' });
// Management operations
await client.update('memory-id', 'Updated content');
await client.delete('memory-id');
await client.deleteAll({ user_id: 'user123' });
// Batch operations
await client.batchUpdate([{ id: 'mem1', text: 'new text' }]);
await client.batchDelete(['mem1', 'mem2']);
// User management
const users = await client.users();
await client.deleteUsers({ user_ids: ['user1', 'user2'] });
// Webhooks
const webhooks = await client.getWebhooks();
await client.createWebhook({
url: 'https://your-webhook.com',
name: 'My Webhook',
eventTypes: ['memory.created', 'memory.updated']
});
import { Memory } from 'mem0ai/oss';
const memory = new Memory({
embedder: {
provider: 'openai',
config: { apiKey: 'your-key' }
},
vectorStore: {
provider: 'qdrant',
config: { host: 'localhost', port: 6333 }
},
llm: {
provider: 'openai',
config: { model: 'gpt-4.1-nano' }
}
});
// Core operations
const result = await memory.add('I love pizza', { userId: 'user123' });
const searchResult = await memory.search('food preferences', { userId: 'user123' });
const memoryItem = await memory.get('memory-id');
const allMemories = await memory.getAll({ userId: 'user123' });
// Management
await memory.update('memory-id', 'Updated content');
await memory.delete('memory-id');
await memory.deleteAll({ userId: 'user123' });
// History and reset
const history = await memory.history('memory-id');
await memory.reset();
interface Message {
role: 'user' | 'assistant';
content: string | MultiModalMessages;
}
interface Memory {
id: string;
memory?: string;
user_id?: string;
categories?: string[];
created_at?: Date;
updated_at?: Date;
metadata?: any;
score?: number;
}
interface MemoryOptions {
user_id?: string;
agent_id?: string;
app_id?: string;
run_id?: string;
metadata?: Record<string, any>;
filters?: Record<string, any>;
api_version?: 'v1' | 'v2';
infer?: boolean;
enable_graph?: boolean;
}
interface SearchResult {
results: Memory[];
relations?: any[];
}
Graph memory enables relationship tracking between entities mentioned in conversations.
# Enable graph memory
config = MemoryConfig(
graph_store={
"provider": "neo4j",
"config": {
"url": "bolt://localhost:7687",
"username": "neo4j",
"password": "password"
}
}
)
memory = Memory(config)
# Add memory with relationship extraction
result = memory.add(
"John works at OpenAI and is friends with Sarah",
user_id="user123"
)
# Result includes both memories and relationships
print(result["results"]) # Memory entries
print(result["relations"]) # Graph relationships
Supported Graph Databases: Neo4j (shown above) and Memgraph, both configured through the graph_store provider.
Store and retrieve memories from text, images, and PDFs.
# Text + Image
messages = [
{"role": "user", "content": "This is my travel setup"},
{
"role": "user",
"content": {
"type": "image_url",
"image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}
}
}
]
client.add(messages, user_id="user123")
# PDF processing
pdf_message = {
"role": "user",
"content": {
"type": "pdf_url",
"pdf_url": {"url": "https://example.com/document.pdf"}
}
}
client.add([pdf_message], user_id="user123")
Store step-by-step procedures and workflows.
# Add procedural memory
result = memory.add(
"To deploy the app: 1. Run tests 2. Build Docker image 3. Push to registry 4. Update k8s manifests",
user_id="developer123",
memory_type="procedural_memory"
)
# Search for procedures
procedures = memory.search(
"How to deploy?",
user_id="developer123"
)
custom_extraction_prompt = """
Extract key facts from the conversation focusing on:
1. Personal preferences
2. Technical skills
3. Project requirements
4. Important dates and deadlines
Conversation: {messages}
"""
config = MemoryConfig(
custom_instructions=custom_extraction_prompt
)
memory = Memory(config)
class PersonalAssistant:
def __init__(self):
self.memory = Memory()
self.llm = OpenAI() # Your LLM client
def chat(self, user_input: str, user_id: str) -> str:
# Retrieve relevant memories
memories = self.memory.search(user_input, user_id=user_id, limit=5)
# Build context from memories
context = "\n".join([f"- {m['memory']}" for m in memories['results']])
# Generate response with context
prompt = f"""
Context from previous conversations:
{context}
User: {user_input}
Assistant:
"""
response = self.llm.generate(prompt)
# Store the conversation
self.memory.add([
{"role": "user", "content": user_input},
{"role": "assistant", "content": response}
], user_id=user_id)
return response
class SupportBot:
def __init__(self):
self.memory = MemoryClient(api_key="your-key")
def handle_ticket(self, customer_id: str, issue: str) -> str:
# Get customer history
history = self.memory.search(
issue,
user_id=customer_id,
limit=10
)
# Check for similar past issues
similar_issues = [m for m in history if m['score'] > 0.8]
if similar_issues:
context = f"Previous similar issues: {similar_issues[0]['memory']}"
else:
context = "No previous similar issues found."
# Generate response
response = self.generate_support_response(issue, context)
# Store interaction
self.memory.add([
{"role": "user", "content": f"Issue: {issue}"},
{"role": "assistant", "content": response}
], user_id=customer_id, metadata={
"category": "support_ticket",
"timestamp": datetime.now().isoformat()
})
return response
class StudyBuddy:
def __init__(self):
self.memory = Memory()
def study_session(self, student_id: str, topic: str, content: str):
# Store study material
self.memory.add(
f"Studied {topic}: {content}",
user_id=student_id,
metadata={
"topic": topic,
"session_date": datetime.now().isoformat(),
"type": "study_session"
}
)
def quiz_student(self, student_id: str, topic: str) -> list:
# Get relevant study materials
materials = self.memory.search(
f"topic:{topic}",
user_id=student_id,
filters={"metadata.type": "study_session"}
)
# Generate quiz questions based on materials
questions = self.generate_quiz_questions(materials)
return questions
def track_progress(self, student_id: str) -> dict:
# Get all study sessions
sessions = self.memory.get_all(
user_id=student_id,
filters={"metadata.type": "study_session"}
)
# Analyze progress
topics_studied = {}
for session in sessions['results']:
topic = session['metadata']['topic']
topics_studied[topic] = topics_studied.get(topic, 0) + 1
return {
"total_sessions": len(sessions['results']),
"topics_covered": len(topics_studied),
"topic_frequency": topics_studied
}
class MultiAgentSystem:
def __init__(self):
self.shared_memory = Memory()
self.agents = {
"researcher": ResearchAgent(),
"writer": WriterAgent(),
"reviewer": ReviewAgent()
}
def collaborative_task(self, task: str, session_id: str):
# Research phase
research_results = self.agents["researcher"].research(task)
self.shared_memory.add(
f"Research findings: {research_results}",
agent_id="researcher",
run_id=session_id,
metadata={"phase": "research"}
)
# Writing phase
research_context = self.shared_memory.search(
"research findings",
run_id=session_id
)
draft = self.agents["writer"].write(task, research_context)
self.shared_memory.add(
f"Draft content: {draft}",
agent_id="writer",
run_id=session_id,
metadata={"phase": "writing"}
)
# Review phase
all_context = self.shared_memory.get_all(run_id=session_id)
final_output = self.agents["reviewer"].review(draft, all_context)
return final_output
import speech_recognition as sr
from gtts import gTTS
import pygame
class VoiceAssistant:
def __init__(self):
self.memory = Memory()
self.recognizer = sr.Recognizer()
self.microphone = sr.Microphone()
def listen_and_respond(self, user_id: str):
# Listen to user
with self.microphone as source:
audio = self.recognizer.listen(source)
try:
# Convert speech to text
user_input = self.recognizer.recognize_google(audio)
print(f"User said: {user_input}")
# Get relevant memories
memories = self.memory.search(user_input, user_id=user_id)
context = "\n".join([m['memory'] for m in memories['results'][:3]])
# Generate response
response = self.generate_response(user_input, context)
# Store conversation
self.memory.add([
{"role": "user", "content": user_input},
{"role": "assistant", "content": response}
], user_id=user_id)
# Convert response to speech
tts = gTTS(text=response, lang='en')
tts.save("response.mp3")
# Play response
pygame.mixer.init()
pygame.mixer.music.load("response.mp3")
pygame.mixer.music.play()
return response
except sr.UnknownValueError:
return "Sorry, I didn't understand that."
# Use consistent user/agent/session IDs
user_id = f"user_{user_email.replace('@', '_')}"
agent_id = f"agent_{agent_name}"
run_id = f"session_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
# Add meaningful metadata
metadata = {
"category": "customer_support",
"priority": "high",
"department": "technical",
"timestamp": datetime.now().isoformat(),
"source": "chat_widget"
}
# Use descriptive memory content
memory.add(
"Customer John Smith reported login issues with 2FA on mobile app. Resolved by clearing app cache.",
user_id=customer_id,
metadata=metadata
)
# Use specific search queries
results = memory.search(
"login issues mobile app", # Specific keywords
user_id=customer_id,
limit=5, # Reasonable limit
threshold=0.7 # Filter low-relevance results
)
# Combine multiple searches for comprehensive results
technical_issues = memory.search("technical problems", user_id=user_id)
recent_conversations = memory.get_all(
user_id=user_id,
filters={"metadata.timestamp": {"$gte": last_week}},
limit=10
)
# Regular cleanup of old memories
def cleanup_old_memories(memory_client, days_old=90):
cutoff_date = datetime.now() - timedelta(days=days_old)
all_memories = memory_client.get_all()
for mem in all_memories:
if datetime.fromisoformat(mem['created_at']) < cutoff_date:
memory_client.delete(mem['id'])
# Archive important memories
def archive_memory(memory_client, memory_id):
memory = memory_client.get(memory_id)
memory_client.update(memory_id, metadata={
**memory.get('metadata', {}),
'archived': True,
'archive_date': datetime.now().isoformat()
})
def safe_memory_operation(memory_client, operation, *args, **kwargs):
try:
return operation(*args, **kwargs)
except Exception as e:
logger.error(f"Memory operation failed: {e}")
# Fallback to basic response without memory
return {"results": [], "message": "Memory temporarily unavailable"}
# Usage
results = safe_memory_operation(
memory_client,
memory_client.search,
query,
user_id=user_id
)
# Batch operations when possible: pass related messages to a single add() call
# instead of calling add() once per message
messages_to_add = [
    {"role": "user", "content": msg1},
    {"role": "user", "content": msg2},
    {"role": "user", "content": msg3}
]
memory.add(messages_to_add, user_id=user_id)
# Cache frequently accessed memories
from functools import lru_cache
@lru_cache(maxsize=100)
def get_user_preferences(user_id: str):
return memory.search("preferences settings", user_id=user_id, limit=5)
from cookbooks.helper.mem0_teachability import Mem0Teachability
from mem0 import Memory
# Add memory capability to AutoGen agents
memory = Memory()
teachability = Mem0Teachability(
verbosity=1,
reset_db=False,
recall_threshold=1.5,
memory_client=memory
)
# Apply to agent
teachability.add_to_agent(your_autogen_agent)
from typing import Any

from langchain.memory import ConversationBufferMemory
from mem0 import Memory

class Mem0LangChainMemory(ConversationBufferMemory):
    # Declared as model fields so the pydantic-based LangChain memory class accepts them
    mem0: Any = None
    user_id: str = ""

    def __init__(self, user_id: str, **kwargs):
        super().__init__(**kwargs)
        self.mem0 = Memory()
        self.user_id = user_id
def save_context(self, inputs, outputs):
# Save to both LangChain and Mem0
super().save_context(inputs, outputs)
# Store in Mem0 for long-term memory
self.mem0.add([
{"role": "user", "content": str(inputs)},
{"role": "assistant", "content": str(outputs)}
], user_id=self.user_id)
def load_memory_variables(self, inputs):
# Load from LangChain buffer
variables = super().load_memory_variables(inputs)
# Enhance with relevant long-term memories
relevant_memories = self.mem0.search(
str(inputs),
user_id=self.user_id,
limit=3
)
if relevant_memories['results']:
long_term_context = "\n".join([
f"- {m['memory']}" for m in relevant_memories['results']
])
variables['history'] += f"\n\nRelevant past context:\n{long_term_context}"
return variables
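A minimal usage sketch for the wrapper above, assuming the langchain-openai package is installed; ConversationChain and ChatOpenAI are standard LangChain components, not part of Mem0:

```python
from langchain.chains import ConversationChain
from langchain_openai import ChatOpenAI

chain = ConversationChain(
    llm=ChatOpenAI(model="gpt-4o-mini"),
    memory=Mem0LangChainMemory(user_id="user123"),
)

# Short-term context comes from the buffer; relevant long-term memories
# are appended by load_memory_variables() before each call
print(chain.predict(input="What do you remember about my food preferences?"))
```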
import streamlit as st
from mem0 import Memory
# Initialize memory
if 'memory' not in st.session_state:
st.session_state.memory = Memory()
# User input
user_id = st.text_input("User ID", value="user123")
user_message = st.text_input("Your message")
if st.button("Send"):
# Get relevant memories
memories = st.session_state.memory.search(
user_message,
user_id=user_id,
limit=5
)
# Display memories
if memories['results']:
st.subheader("Relevant Memories:")
for memory in memories['results']:
st.write(f"- {memory['memory']} (Score: {memory['score']:.2f})")
# Generate and display response
response = generate_response(user_message, memories)
st.write(f"Assistant: {response}")
# Store conversation
st.session_state.memory.add([
{"role": "user", "content": user_message},
{"role": "assistant", "content": response}
], user_id=user_id)
# Display all memories
if st.button("Show All Memories"):
all_memories = st.session_state.memory.get_all(user_id=user_id)
for memory in all_memories['results']:
st.write(f"- {memory['memory']}")
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from mem0 import MemoryClient
from typing import List, Optional
app = FastAPI()
memory_client = MemoryClient(api_key="your-api-key")
class ChatMessage(BaseModel):
role: str
content: str
class ChatRequest(BaseModel):
messages: List[ChatMessage]
user_id: str
metadata: Optional[dict] = None
class SearchRequest(BaseModel):
query: str
user_id: str
limit: int = 10
@app.post("/chat")
async def chat(request: ChatRequest):
try:
# Add messages to memory
result = memory_client.add(
[msg.dict() for msg in request.messages],
user_id=request.user_id,
metadata=request.metadata
)
return {"status": "success", "result": result}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/search")
async def search_memories(request: SearchRequest):
try:
results = memory_client.search(
request.query,
user_id=request.user_id,
limit=request.limit
)
return {"results": results}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/memories/{user_id}")
async def get_user_memories(user_id: str, limit: int = 50):
try:
memories = memory_client.get_all(user_id=user_id, limit=limit)
return {"memories": memories}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.delete("/memories/{memory_id}")
async def delete_memory(memory_id: str):
try:
result = memory_client.delete(memory_id)
return {"status": "deleted", "result": result}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
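A quick way to exercise the endpoints above once the app is running (the host and port are assumptions, e.g. uvicorn serving on localhost:8000):

```python
import requests

BASE = "http://localhost:8000"

# Store a short conversation for a user
resp = requests.post(f"{BASE}/chat", json={
    "messages": [{"role": "user", "content": "I prefer window seats"}],
    "user_id": "user123",
})
print(resp.json())

# Search that user's memories
resp = requests.post(f"{BASE}/search", json={
    "query": "seat preference",
    "user_id": "user123",
    "limit": 5,
})
print(resp.json())

# List everything stored for the user
print(requests.get(f"{BASE}/memories/user123").json())
```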
Memory Not Found
# Check if memory exists before operations
memory = memory_client.get(memory_id)
if not memory:
print(f"Memory {memory_id} not found")
Search Returns No Results
# Lower the similarity threshold
results = memory.search(
query,
user_id=user_id,
threshold=0.5 # Lower threshold
)
# Check if memories exist for user
all_memories = memory.get_all(user_id=user_id)
if not all_memories['results']:
print("No memories found for user")
Configuration Issues
# Validate configuration
try:
memory = Memory(config)
# Test with a simple operation
memory.add("Test memory", user_id="test")
print("Configuration valid")
except Exception as e:
print(f"Configuration error: {e}")
API Rate Limits
import time
from functools import wraps
def rate_limit_retry(max_retries=3, delay=1):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except Exception as e:
if "rate limit" in str(e).lower() and attempt < max_retries - 1:
time.sleep(delay * (2 ** attempt)) # Exponential backoff
continue
raise e
return wrapper
return decorator
@rate_limit_retry()
def safe_memory_add(memory, content, user_id):
return memory.add(content, user_id=user_id)
Optimize Vector Store Configuration
# For Qdrant
config = MemoryConfig(
vector_store={
"provider": "qdrant",
"config": {
"host": "localhost",
"port": 6333,
"collection_name": "memories",
"embedding_model_dims": 1536,
"distance": "cosine"
}
}
)
Batch Processing
# Process multiple memories efficiently
def batch_add_memories(memory_client, conversations, user_id, batch_size=10):
for i in range(0, len(conversations), batch_size):
batch = conversations[i:i+batch_size]
for conv in batch:
memory_client.add(conv, user_id=user_id)
time.sleep(0.1) # Small delay between batches
Memory Cleanup
# Regular cleanup to maintain performance
def cleanup_memories(memory_client, user_id, max_memories=1000):
all_memories = memory_client.get_all(user_id=user_id)
if len(all_memories) > max_memories:
# Keep most recent memories
sorted_memories = sorted(
all_memories,
key=lambda x: x['created_at'],
reverse=True
)
# Delete oldest memories
for memory in sorted_memories[max_memories:]:
memory_client.delete(memory['id'])
Mem0 is available under the Apache 2.0 License. See the LICENSE file for more details.