docs/integrations/pipecat.mdx
Mem0 seamlessly integrates with Pipecat, providing long-term memory capabilities for conversational AI agents. This integration allows your Pipecat-powered applications to remember past conversations and provide personalized responses based on user history.
To use Mem0 with Pipecat, install the required dependencies:
pip install "pipecat-ai[mem0]"
You'll also need to set up your Mem0 API key as an environment variable:
export MEM0_API_KEY=your_mem0_api_key
You can obtain a Mem0 API key by signing up at mem0.ai.
Mem0 integration is provided through the Mem0MemoryService class in Pipecat. Here's how to configure it:
from pipecat.services.mem0 import Mem0MemoryService
memory = Mem0MemoryService(
api_key=os.getenv("MEM0_API_KEY"), # Your Mem0 API key
user_id="unique_user_id", # Unique identifier for the end user
agent_id="my_agent", # Identifier for the agent using the memory
run_id="session_123", # Optional: specific conversation session ID
params={ # Optional: configuration parameters
"search_limit": 10, # Maximum memories to retrieve per query
"search_threshold": 0.1, # Relevance threshold (0.0 to 1.0)
"system_prompt": "Here are your past memories:", # Custom prefix for memories
"add_as_system_message": True, # Add memories as system (True) or user (False) message
"position": 1, # Position in context to insert memories
}
)
The Mem0MemoryService should be positioned between your context aggregator and LLM service in the Pipecat pipeline:
pipeline = Pipeline([
transport.input(),
stt, # Speech-to-text for audio input
user_context, # User context aggregator
memory, # Mem0 Memory service enhances context here
llm, # LLM for response generation
tts, # Optional: Text-to-speech
transport.output(),
assistant_context # Assistant context aggregator
])
Here's a complete example of a Pipecat voice agent with Mem0 memory integration:
import asyncio
import os
from fastapi import FastAPI, WebSocket
from pipecat.frames.frames import TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.runner import PipelineRunner
from pipecat.services.mem0 import Mem0MemoryService
from pipecat.services.openai import OpenAILLMService, OpenAIUserContextAggregator, OpenAIAssistantContextAggregator
from pipecat.transports.network.fastapi_websocket import (
FastAPIWebsocketTransport,
FastAPIWebsocketParams
)
from pipecat.serializers.protobuf import ProtobufFrameSerializer
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.services.whisper import WhisperSTTService
app = FastAPI()
@app.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
# Basic setup with minimal configuration
user_id = "alice"
# WebSocket transport
transport = FastAPIWebsocketTransport(
websocket=websocket,
params=FastAPIWebsocketParams(
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
serializer=ProtobufFrameSerializer(),
)
)
# Core services
user_context = OpenAIUserContextAggregator()
assistant_context = OpenAIAssistantContextAggregator()
stt = WhisperSTTService(api_key=os.getenv("OPENAI_API_KEY"))
# Memory service - the key component
memory = Mem0MemoryService(
api_key=os.getenv("MEM0_API_KEY"),
user_id=user_id,
agent_id="fastapi_memory_bot"
)
# LLM for response generation
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-3.5-turbo",
system_prompt="You are a helpful assistant that remembers past conversations."
)
# Simple pipeline
pipeline = Pipeline([
transport.input(),
stt, # Speech-to-text for audio input
user_context,
memory, # Memory service enhances context here
llm,
transport.output(),
assistant_context
])
# Run the pipeline
runner = PipelineRunner()
task = PipelineTask(pipeline)
# Event handlers for WebSocket connections
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
# Send welcome message when client connects
await task.queue_frame(TextFrame("Hello! I'm a memory bot. I'll remember our conversation."))
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
# Clean up when client disconnects
await task.cancel()
await runner.run(task)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
When integrated with Pipecat, Mem0 provides two key functionalities:
All conversation messages are automatically stored in Mem0 for future reference:
When a new user message is detected:
You can customize how memories are retrieved and used:
memory = Mem0MemoryService(
api_key=os.getenv("MEM0_API_KEY"),
user_id="user123",
params={
"search_limit": 5, # Retrieve up to 5 memories
"search_threshold": 0.2, # Higher threshold for more relevant matches
}
)
Control how memories are presented to the LLM:
memory = Mem0MemoryService(
api_key=os.getenv("MEM0_API_KEY"),
user_id="user123",
params={
"system_prompt": "Previous conversations with this user:",
"add_as_system_message": True, # Add as system message instead of user message
"position": 0, # Insert at the beginning of the context
}
)