# Supermemory Pipecat SDK

Memory-enhanced conversational AI pipelines with Supermemory and Pipecat.

## Installation

```bash
pip install supermemory-pipecat
```

## Quick Start

```python
import os

from pipecat.pipeline.pipeline import Pipeline
from pipecat.services.openai import OpenAILLMService, OpenAIUserContextAggregator
from supermemory_pipecat import SupermemoryPipecatService

# Create the memory service
memory = SupermemoryPipecatService(
    api_key=os.getenv("SUPERMEMORY_API_KEY"),
    user_id="user-123",  # Required: used as container_tag
    session_id="conversation-456",  # Optional: groups memories by session
)

# Create a pipeline with memory. transport, stt, llm (an OpenAILLMService),
# and user_context (an OpenAIUserContextAggregator) are assumed to be
# configured elsewhere; see the full example below.
pipeline = Pipeline([
    transport.input(),
    stt,
    user_context,
    memory,  # Automatically retrieves and injects relevant memories
    llm,
    transport.output(),
])
```

## Configuration

### Parameters

| Parameter | Type | Required | Description |
|---|---|---|---|
| `user_id` | `str` | Yes | User identifier; used as `container_tag` for memory scoping |
| `session_id` | `str` | No | Session/conversation ID for grouping memories |
| `api_key` | `str` | No | Supermemory API key (or set the `SUPERMEMORY_API_KEY` env var) |
| `params` | `InputParams` | No | Advanced configuration (see below) |
| `base_url` | `str` | No | Custom API endpoint |
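
Because `api_key` falls back to the `SUPERMEMORY_API_KEY` environment variable, a minimal construction only needs `user_id`. A short sketch (the IDs below are placeholders):

```python
from supermemory_pipecat import SupermemoryPipecatService

# Assumes SUPERMEMORY_API_KEY is exported in the environment.
memory = SupermemoryPipecatService(
    user_id="user-123",      # required; becomes the container_tag
    session_id="conv-456",   # optional; groups this conversation's memories
)
```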

### Advanced Configuration

```python
from supermemory_pipecat import SupermemoryPipecatService

memory = SupermemoryPipecatService(
    user_id="user-123",
    session_id="conv-456",
    params=SupermemoryPipecatService.InputParams(
        search_limit=10,           # Max memories to retrieve
        search_threshold=0.1,      # Similarity threshold
        mode="full",               # "profile", "query", or "full"
        system_prompt="Based on previous conversations, I recall:\n\n",
    ),
)
```

## Memory Modes

| Mode | Static Profile | Dynamic Profile | Search Results |
|---|---|---|---|
| `"profile"` | Yes | Yes | No |
| `"query"` | No | No | Yes |
| `"full"` | Yes | Yes | Yes |

## How It Works

1. **Intercepts context frames** - listens for `LLMContextFrame` in the pipeline
2. **Tracks conversation** - maintains a clean conversation history (no injected memories)
3. **Retrieves memories** - queries the `/v4/profile` API with the user's message
4. **Injects memories** - formats them and adds them to the LLM context as a system message (sketched below)
5. **Stores messages** - sends the last user message to Supermemory (background, non-blocking)
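
As a rough illustration of step 4 (not the service's actual internals), the retrieved memories end up under the configured `system_prompt` prefix in a system message, while the tracked history stays clean. The memory strings below are invented for the example:

```python
# Hypothetical retrieval results, formatted the way step 4 describes.
retrieved = ["Prefers concise answers", "Is based in Berlin"]
system_prompt = "Based on previous conversations, I recall:\n\n"  # default prefix

memory_message = {
    "role": "system",
    "content": system_prompt + "\n".join(f"- {m}" for m in retrieved),
}

# Injected copy sent to the LLM; the clean history itself is never mutated.
llm_messages = [memory_message, {"role": "user", "content": "What's the weather like today?"}]
```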

## What Gets Stored

Only the last user message is sent to Supermemory:

```
User: What's the weather like today?
```

Stored as:

```json
{
  "content": "User: What's the weather like today?",
  "container_tags": ["user-123"],
  "custom_id": "conversation-456",
  "metadata": { "platform": "pipecat" }
}
```
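
In other words, `user_id` becomes the `container_tags` entry and `session_id` becomes the `custom_id`. A hedged sketch of that mapping (it mirrors the record above; it is not the SDK's internal code):

```python
def build_stored_record(user_message: str, user_id: str, session_id: str) -> dict:
    """Illustrative only: how the constructor arguments map onto the stored record."""
    return {
        "content": f"User: {user_message}",
        "container_tags": [user_id],         # from user_id
        "custom_id": session_id,             # from session_id
        "metadata": {"platform": "pipecat"},
    }
```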

## Full Example

```python
import os

from fastapi import FastAPI, WebSocket
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.runner import PipelineRunner
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.google.gemini_live.llm import GeminiLiveLLMService
from pipecat.transports.websocket.fastapi import (
    FastAPIWebsocketTransport,
    FastAPIWebsocketParams,
)
from supermemory_pipecat import SupermemoryPipecatService

app = FastAPI()

@app.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()

    transport = FastAPIWebsocketTransport(
        websocket=websocket,
        params=FastAPIWebsocketParams(audio_in_enabled=True, audio_out_enabled=True),
    )

    # Gemini Live for speech-to-speech
    llm = GeminiLiveLLMService(
        api_key=os.getenv("GEMINI_API_KEY"),
        model="models/gemini-2.5-flash-native-audio-preview-12-2025",
    )

    context = OpenAILLMContext([{"role": "system", "content": "You are a helpful assistant."}])
    context_aggregator = llm.create_context_aggregator(context)

    # Supermemory memory service
    memory = SupermemoryPipecatService(
        user_id="alice",
        session_id="session-123",
    )

    pipeline = Pipeline([
        transport.input(),
        context_aggregator.user(),
        memory,
        llm,
        transport.output(),
        context_aggregator.assistant(),
    ])

    runner = PipelineRunner()
    task = PipelineTask(pipeline)
    await runner.run(task)
```
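
To try it, run the server with uvicorn (assuming the file is saved as `main.py`): `uvicorn main:app`. Then connect a Pipecat-compatible WebSocket client to `ws://localhost:8000/chat`.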

## License

MIT