Conversation to Knowledge — Design

examples/conversation_to_knowledge/design.md

Overview

Convert podcast sessions (from YouTube) into a structured knowledge graph stored in SurrealDB, using CocoIndex for declarative pipeline orchestration with incremental updates.

Technology Choices

| Concern | Choice | Rationale |
| --- | --- | --- |
| Audio download | yt-dlp | Standard, reliable YouTube downloader |
| Transcription + diarization | AssemblyAI | Single API call gives a speaker-labeled transcript with utterances; no file size / duration limits; no GPU needed |
| LLM (extraction) | instructor + litellm, gpt-5.4 (configurable via LLM_MODEL in .env) | Strong model for structured extraction from transcripts |
| LLM (entity resolution) | instructor + litellm, gpt-5-mini (configurable via RESOLUTION_LLM_MODEL in .env) | Lighter model sufficient for simple entity deduplication decisions |
| Embedding (entity resolution) | sentence-transformers/all-MiniLM-L6-v2 | Fast, good for short entity name similarity |
| In-memory vector search | faiss-cpu (IndexFlatIP) | SIMD-optimized exact search with incremental add(); no GPU needed |
| Target database | SurrealDB | Graph DB with relations, per spec |
| Data models | Pydantic | Per spec |
| Pipeline | CocoIndex | Per spec |

Input Format

A directory of plain text files, each containing YouTube URLs (one per line):

input/
├── ai_podcasts.txt
├── tech_interviews.txt
└── ...

Each file:

https://www.youtube.com/watch?v=dQw4w9WgXcQ
https://youtu.be/abc123def45
https://www.youtube.com/watch?v=xyz789xyz78

Empty lines and lines starting with # are ignored. Video IDs are extracted from URLs via regex:

python
_YOUTUBE_URL_RE = re.compile(
    r"(?:youtube\.com/watch\?v=|youtu\.be/|youtube\.com/embed/)([a-zA-Z0-9_-]{11})"
)

def extract_video_id(url: str) -> str:
    m = _YOUTUBE_URL_RE.search(url)
    if m is None:
        raise ValueError(f"Cannot extract YouTube video ID from: {url}")
    return m.group(1)

We read these with localfs.walk_dir() matching **/*.txt.
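As a quick self-contained check of these parsing rules (the regex and helper are repeated here so the snippet runs on its own):

```python
import re

_YOUTUBE_URL_RE = re.compile(
    r"(?:youtube\.com/watch\?v=|youtu\.be/|youtube\.com/embed/)([a-zA-Z0-9_-]{11})"
)

def extract_video_id(url: str) -> str:
    m = _YOUTUBE_URL_RE.search(url)
    if m is None:
        raise ValueError(f"Cannot extract YouTube video ID from: {url}")
    return m.group(1)

raw_lines = [
    "# AI podcasts",
    "",
    "https://www.youtube.com/watch?v=dQw4w9WgXcQ",
    "https://youtu.be/jNQXAC9IVRw",
]
# Skip comments and blanks, then extract the 11-character video IDs
video_ids = [
    extract_video_id(line)
    for line in (l.strip() for l in raw_lines)
    if line and not line.startswith("#")
]
# video_ids == ["dQw4w9WgXcQ", "jNQXAC9IVRw"]
```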

Entity Type Configuration

All named entity types (person, tech, org, ...) are defined in a single global list, making it easy to extend:

python
PERSON_ENTITY_NAME = "person"

@dataclass
class EntityTypeConfig:
    name: str            # Also the SurrealDB table name
    llm_description: str # Injected into the extraction prompt
    llm_examples: list[str]

ENTITY_TYPES: list[EntityTypeConfig] = [
    EntityTypeConfig(name="person", llm_description="...", llm_examples=["Lex Fridman", ...]),
    EntityTypeConfig(name="tech",   llm_description="...", llm_examples=["Python (programming language)", ...]),
    EntityTypeConfig(name="org",    llm_description="...", llm_examples=["OpenAI", ...]),
]

The only Person-specific logic (session participation, statement attribution via speakers) is referenced directly using PERSON_ENTITY_NAME rather than encoded in the config.
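For illustration, a hypothetical helper (not part of the design's API; descriptions below are placeholders) shows how these configs might be folded into the extraction prompt:

```python
from dataclasses import dataclass

@dataclass
class EntityTypeConfig:
    name: str            # Also the SurrealDB table name
    llm_description: str # Injected into the extraction prompt
    llm_examples: list[str]

# Abbreviated configs with placeholder descriptions
ENTITY_TYPES = [
    EntityTypeConfig("person", "A named human being.", ["Lex Fridman"]),
    EntityTypeConfig("tech", "A technology or product.", ["Python (programming language)"]),
    EntityTypeConfig("org", "A company or institution.", ["OpenAI"]),
]

def entity_prompt_section(types: list[EntityTypeConfig]) -> str:
    # One bullet per entity type: name, description, examples
    return "\n".join(
        f"- {t.name}: {t.llm_description} Examples: {', '.join(t.llm_examples)}"
        for t in types
    )
```

Adding a new entity type then only requires appending one EntityTypeConfig entry.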

Data Models (Pydantic)

Entity Models (for SurrealDB)

python
@dataclass
class SessionTranscript:
    """Carries raw transcript and yt-dlp metadata for extraction."""
    transcript: str
    yt_channel: str              # YouTube channel name
    yt_title: str
    yt_description: str | None   # Video description
    yt_upload_date: str | None

@dataclass
class Session:
    id: int           # Generated by IdGenerator
    youtube_id: str   # YouTube video ID
    name: str         # Extracted by LLM from transcript context
    description: str | None  # Extracted by LLM
    transcript: str   # Full transcript with speaker labels
    date: str | None  # Extracted by LLM if mentioned, else from yt-dlp metadata

@dataclass
class Entity:
    """Generic named entity node shared by all entity types (person, tech, org)."""
    id: str           # Canonical name used directly as ID
    name: str

@dataclass
class Statement:
    id: int           # Generated by IdGenerator
    statement: str    # The statement text (one thematic claim)

Relationship Models (SurrealDB Relations)

Relations have no additional edge fields — just id (auto-derived from from_id + to_id).

LLM Extraction Models (Pydantic, for instructor)

Extraction happens in two LLM calls per session:

Step 1 — Metadata + speaker identification:

python
class SpeakerIdentification(pydantic.BaseModel):
    """Maps a diarization label to a real person name."""
    label: str          # e.g. "A", "B"
    name: str | None    # Real name, or None if unrecognizable

class SessionMetadata(pydantic.BaseModel):
    """LLM output from Step 1: session metadata + speaker mapping."""
    name: str                                # Session/episode name
    description: str | None                  # Brief description
    date: str | None                         # Date if mentioned (ISO format)
    speakers: list[SpeakerIdentification]    # Speaker label → real name

Step 2 — Statement extraction (on reformatted transcript with real names):

python
class RawStatement(pydantic.BaseModel):
    """A thematic claim or statement made during the session."""
    statement: str
    speakers: list[str]          # Names of persons who made the statement
    mentioned_person: list[str]  # Person names involved
    mentioned_tech: list[str]    # Tech names involved
    mentioned_org: list[str]     # Org names involved

class StatementExtraction(pydantic.BaseModel):
    """LLM output from Step 2: statements with involved entities."""
    statements: list[RawStatement]

All entity names must be self-contained — no anaphoric references (pronouns, labels like "Speaker A", "the host", etc.). Every name must stand on its own as a clear identifier.
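As a sanity check, a well-formed Step 2 payload validates against these models (assuming pydantic v2's model_validate; the models are repeated so this runs standalone):

```python
import pydantic

class RawStatement(pydantic.BaseModel):
    statement: str
    speakers: list[str]
    mentioned_person: list[str]
    mentioned_tech: list[str]
    mentioned_org: list[str]

class StatementExtraction(pydantic.BaseModel):
    statements: list[RawStatement]

# The JSON shape instructor expects the LLM to produce in Step 2
payload = {
    "statements": [{
        "statement": "PyTorch made research iteration dramatically faster.",
        "speakers": ["Lex Fridman"],
        "mentioned_person": [],
        "mentioned_tech": ["PyTorch"],
        "mentioned_org": [],
    }]
}
parsed = StatementExtraction.model_validate(payload)
```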

SurrealDB Schema

Node Tables

| Table | Fields | Notes |
| --- | --- | --- |
| session | id, name, description?, transcript, date? | SCHEMAFULL |
| person | id, name | SCHEMAFULL — schema derived from Entity dataclass |
| tech | id, name | SCHEMAFULL — schema derived from Entity dataclass |
| org | id, name | SCHEMAFULL — schema derived from Entity dataclass |
| statement | id, statement | SCHEMAFULL |

Relation Tables (Edges)

| Relation | FROM → TO | Edge Fields |
| --- | --- | --- |
| person_session | person → session | (none) |
| session_statement | session → statement | (none) |
| person_statement | person → statement | (none) |
| statement_mentions | statement → person / tech / org | (none, polymorphic TO) |

statement_mentions is polymorphic — the TO side can be person, tech, or org. The SurrealDB connector supports this via listing multiple targets.

Processing Pipeline

Phase 1: Per-Session Processing (mounted as components, memoized)

input .txt files
  └─ parse URLs → list of video IDs
       └─ for each session (use_mount per video ID):
            ├─ [1] Fetch transcript + metadata  (memo=True)
            │       → speaker-labeled transcript + yt metadata (channel, title, description, date)
            ├─ [2a] Reformat transcript (empty speaker map → all labels as "(Speaker X)")
            ├─ [2b] LLM Step 1: extract metadata + identify speakers  (memo=True)
            │       → SessionMetadata (name, description, date, speaker_label→person mapping)
            ├─ [2c] Reformat transcript (with speaker map → real names where known)
            ├─ [2d] LLM Step 2: extract statements + involved entities  (memo=True)
            │       → StatementExtraction (statements with self-contained entity names)
            ├─ [3] Declare Session node → SurrealDB
            ├─ [4] Declare Statement nodes + session_statement edges → SurrealDB
            └─ [5] Return raw entities (persons, techs, orgs) + statement linkages
                   for entity resolution in Phase 2

Fetch Transcript

python
@coco.fn(memo=True)
async def fetch_transcript(youtube_id: str) -> SessionTranscript:
    """Download audio via yt-dlp, transcribe with speaker diarization via AssemblyAI."""
    # 1. yt-dlp: download audio to temp file + fetch metadata (channel, title, description, upload_date)
    # 2. AssemblyAI: transcribe with speaker_labels=True
    #    - Uploads file, returns utterances with speaker labels
    # 3. Format transcript as "Speaker A: ...\nSpeaker B: ..." text
    # 4. Return SessionTranscript(transcript=..., yt_channel=..., yt_title=...,
    #                             yt_description=..., yt_upload_date=...)
    ...

SessionTranscript carries the raw transcript text and all available yt-dlp metadata (channel name, title, description, upload_date). The metadata is essential for Step 1 speaker identification.
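Step 3 of the sketch above (joining diarized utterances into labeled text) is straightforward; a stand-in utterance type is used here since the real objects come from AssemblyAI's SDK:

```python
from dataclasses import dataclass

@dataclass
class Utterance:
    """Minimal stand-in for an AssemblyAI utterance (speaker label + text)."""
    speaker: str
    text: str

def format_transcript(utterances: list[Utterance]) -> str:
    # Produces the "Speaker A: ...\nSpeaker B: ..." format fetch_transcript returns
    return "\n".join(f"Speaker {u.speaker}: {u.text}" for u in utterances)

utts = [Utterance("A", "Welcome back to the show."), Utterance("B", "Glad to be here.")]
# format_transcript(utts) ==
# "Speaker A: Welcome back to the show.\nSpeaker B: Glad to be here."
```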

Reformat Transcript (shared utility)

python
def reformat_transcript(
    transcript: str, speaker_map: dict[str, str | None]
) -> str:
    """Replace speaker labels with real names; keep label for unrecognized speakers.

    Used by both extraction steps:
    - Step 1: pass empty dict → all speakers shown as "(Speaker A)", "(Speaker B)", ...
    - Step 2: pass mapping from Step 1 → recognized speakers get real names,
              unrecognized stay as "(Speaker X)"
    """
    # "Speaker A: ..." → "Lex Fridman: ..." or "(Speaker A): ..."
    ...

LLM Step 1: Extract Metadata + Identify Speakers

python
@coco.fn(memo=True)
async def extract_metadata(reformatted_transcript: str, transcript: SessionTranscript) -> SessionMetadata:
    """Give LLM the reformatted transcript + all YouTube metadata to identify speakers."""
    client = instructor.from_litellm(litellm.acompletion, mode=instructor.Mode.JSON)
    return await client.chat.completions.create(
        model=coco.use_context(LLM_MODEL),
        response_model=SessionMetadata,
        messages=[
            {"role": "system", "content": METADATA_PROMPT},
            {"role": "user", "content": f"YouTube channel: {transcript.yt_channel}\n"
                                        f"Video title: {transcript.yt_title}\n"
                                        f"Description: {transcript.yt_description or 'N/A'}\n"
                                        f"Upload date: {transcript.yt_upload_date or 'unknown'}\n\n"
                                        f"Transcript:\n{reformatted_transcript}"},
        ],
    )

The prompt instructs the LLM to:

  • Name the session clearly (use video title/channel as hints)
  • Write a brief description
  • Identify the date if mentioned in conversation
  • Map each speaker label (A, B, ...) to a real person name using metadata + conversation content as clues. Names must be Wikipedia-style canonical names (e.g. "Lex Fridman", "Sam Altman"), following the same convention as all other entities. Leave name as None for any speaker it cannot confidently identify.

The identified speakers (those with non-None names) form the person_session relationship.

LLM Step 2: Extract Statements

python
@coco.fn(memo=True)
async def extract_statements(reformatted_transcript: str) -> StatementExtraction:
    """Extract statements and involved entities from the reformatted transcript."""
    ...

The Step 2 prompt instructs the LLM to:

  • Extract thematic statements with speaker attributions and involved entities
  • Use Wikipedia-style canonical names (e.g. "Franklin D. Roosevelt", "Python (programming language)")
  • All entity names must be self-contained — no anaphoric references (pronouns, "Speaker A", "the host", etc.)
  • Statements from unrecognized speakers (marked as (Speaker X)) are still extracted with their involved entities, but are not attributed to any person

Declare + Return

python
@coco.fn(memo=True)
async def process_session(
    youtube_id: str,
    session_table: surrealdb.TableTarget,
    statement_table: surrealdb.TableTarget,
    session_statement_rel: surrealdb.RelationTarget,
) -> SessionRawEntities:
    transcript = await fetch_transcript(youtube_id)

    # Step 1: reformat with empty map (no names known yet), then extract metadata
    step1_transcript = reformat_transcript(transcript.transcript, {})
    metadata = await extract_metadata(step1_transcript, transcript)

    # Step 2: reformat with real names, then extract statements
    speaker_map = {s.label: s.name for s in metadata.speakers}
    step2_transcript = reformat_transcript(transcript.transcript, speaker_map)
    stmt_extraction = await extract_statements(step2_transcript)

    id_gen = IdGenerator(youtube_id)

    # Declare session node (use LLM-extracted metadata, fallback to yt-dlp)
    session_id = await id_gen.next_id()
    session = Session(
        id=session_id,
        youtube_id=youtube_id,
        name=metadata.name or transcript.yt_title,
        description=metadata.description,
        transcript=transcript.transcript,
        date=metadata.date or transcript.yt_upload_date,
    )
    session_table.declare_record(row=session)

    # Declare statements + session_statement edges
    identified_stmts: list[IdentifiedStatement] = []
    for stmt in stmt_extraction.statements:
        stmt_id = await id_gen.next_id(stmt.statement)
        statement_table.declare_record(row=Statement(id=stmt_id, statement=stmt.statement))
        session_statement_rel.declare_relation(from_id=session_id, to_id=stmt_id)
        identified_stmts.append(IdentifiedStatement(id=stmt_id, raw=stmt))

    # Only identified speakers form person_session
    identified_persons = [s.name for s in metadata.speakers if s.name is not None]

    return SessionRawEntities(
        session_id=session_id,
        raw_entities={PERSON_ENTITY_NAME: identified_persons},
        statements=identified_stmts,
    )

Phase 2: Entity Resolution

Uses faiss (faiss-cpu) for in-memory vector search. As entities are processed one by one ("bubble sort"), each embedding is added to a faiss.IndexFlatIP index, so nearest-neighbor queries only search among already-processed entities. IndexFlatIP operates on L2-normalized vectors, making inner product equivalent to cosine similarity.
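The equivalence can be checked with plain arithmetic (no faiss needed): after L2 normalization, the inner product of two vectors equals their cosine similarity.

```python
import math

def dot(u, v):
    return sum(x * y for x, y in zip(u, v))

def norm(u):
    return math.sqrt(dot(u, u))

a, b = [3.0, 4.0], [4.0, 3.0]

cosine = dot(a, b) / (norm(a) * norm(b))  # classic cosine similarity
a_n = [x / norm(a) for x in a]            # L2-normalize both vectors
b_n = [x / norm(b) for x in b]
inner = dot(a_n, b_n)                     # what IndexFlatIP computes on normalized rows
# cosine == inner == 0.96
```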

For each entity type (Person, Tech, Org) independently:

1. Collect all raw entities from all sessions → all_raw_entities: set[str]

2. Initialize faiss index:
   index = faiss.IndexFlatIP(embedding_dim)
   index_names: list[str] = []   # maps faiss row index → entity name

3. Build deduplication_dict via "bubble sort" approach:
   For each entity in all_raw_entities:
     a. Compute embedding (memoized) via SentenceTransformerEmbedder
     b. L2-normalize the embedding
     c. Query index for top N nearest neighbors with similarity > (1 - MAX_DISTANCE)
        (if neighbor is a dup in deduplication_dict, collect its canonical instead)
     d. If candidates exist (excluding self):
        - LLM call (memoized): "Are any of these the same entity as '{entity}'?
          Pick by number, or 'none'."
        - LLM picks canonical name (or declares new canonical)
     e. Update deduplication_dict:
        - entity → None (canonical) or entity → canonical_name
        - If another existing canonical is identified as dup of current,
          update that entry too
     f. Add embedding to faiss index, append name to index_names

4. Output: deduplication_dict: dict[str, str | None]
   e.g. {"Apple Inc.": None, "Apple": "Apple Inc.", "AAPL": "Apple Inc."}

Threshold defaults:

  • MAX_DISTANCE (max cosine distance): 0.3, i.e. candidates need similarity ≥ 0.7
  • N (top candidates): 5

python
import faiss
from numpy.typing import NDArray

N = 5               # top candidates per query
MAX_DISTANCE = 0.3  # max cosine distance (similarity >= 0.7)

@coco.fn(memo=True)
async def compute_entity_embedding(name: str) -> NDArray:
    embedder = coco.use_context(EMBEDDER)
    return await embedder.embed(name)

async def resolve_entities(all_raw_entities: set[str]) -> dict[str, str | None]:
    dim = 384  # all-MiniLM-L6-v2 dimension
    index = faiss.IndexFlatIP(dim)
    index_names: list[str] = []
    dedup: dict[str, str | None] = {}

    for entity in all_raw_entities:
        embedding = await compute_entity_embedding(entity)
        vec = embedding.reshape(1, -1)
        faiss.normalize_L2(vec)  # in place; inner product now == cosine similarity

        candidates = []
        if index.ntotal > 0:
            sims, idxs = index.search(vec, k=min(N, index.ntotal))
            for sim, idx in zip(sims[0], idxs[0]):
                if sim >= 1.0 - MAX_DISTANCE and idx >= 0:
                    cand = index_names[idx]
                    canonical = resolve_canonical(cand, dedup)
                    if canonical != entity:
                        candidates.append(canonical)

        if candidates:
            # Sort for a deterministic candidate order (stable memoization)
            match = await resolve_entity_pair(entity, sorted(set(candidates)))
            dedup[entity] = match  # None if entity is canonical, else canonical name
        else:
            dedup[entity] = None  # new canonical

        index.add(vec)
        index_names.append(entity)

    return dedup

@coco.fn(memo=True)
async def resolve_entity_pair(entity: str, candidates: list[str]) -> str | None:
    """LLM decides if entity matches any candidate; returns canonical or None."""
    client = instructor.from_litellm(litellm.acompletion)
    ...

Phase 3: Knowledge Base Creation

With the deduplication dicts resolved, declare all remaining nodes and edges. Person/Tech/Org use their canonical name directly as the ID. Session and statement IDs are carried from Phase 1 via IdentifiedStatement.

python
@coco.fn
async def create_knowledge_base(
    all_session_raw: list[SessionRawEntities],
    entity_dedup: dict[str, dict[str, str | None]],
    entity_tables: dict[str, surrealdb.TableTarget],
    person_session_rel: surrealdb.RelationTarget,
    person_statement_rel: surrealdb.RelationTarget,
    statement_mentions_rel: surrealdb.RelationTarget,
):
    # Declare canonical nodes for each entity type (name IS the id)
    for cfg in ENTITY_TYPES:
        dedup = entity_dedup[cfg.name]
        table = entity_tables[cfg.name]
        for name, upstream in dedup.items():
            if upstream is None:
                table.declare_record(row=Entity(id=name, name=name))

    person_dedup = entity_dedup[PERSON_ENTITY_NAME]

    # Declare relationships using canonical names
    for session_raw in all_session_raw:
        for person_name in session_raw.raw_entities.get(PERSON_ENTITY_NAME, []):
            canonical = resolve_canonical(person_name, person_dedup)
            person_session_rel.declare_relation(
                from_id=canonical, to_id=session_raw.session_id)

        for identified in session_raw.statements:
            stmt = identified.raw
            stmt_id = identified.id
            for speaker in stmt.speakers:
                canonical = resolve_canonical(speaker, person_dedup)
                person_statement_rel.declare_relation(
                    from_id=canonical, to_id=stmt_id)
            for cfg in ENTITY_TYPES:
                dedup = entity_dedup[cfg.name]
                table = entity_tables[cfg.name]
                for canonical in {
                    resolve_canonical(e, dedup)
                    for e in getattr(stmt, f"mentioned_{cfg.name}")
                }:
                    statement_mentions_rel.declare_relation(
                        from_id=stmt_id, to_id=canonical, to_table=table)

Helper to chase dedup chains:

python
def resolve_canonical(name: str, dedup: dict[str, str | None]) -> str:
    while dedup.get(name) is not None:
        name = dedup[name]
    return name

App Structure

python
@coco.fn
async def app_main(input_dir: pathlib.Path) -> None:
    # --- Setup targets ---
    session_table = await surrealdb.mount_table_target(DB, "session", session_schema)
    statement_table = await surrealdb.mount_table_target(DB, "statement", statement_schema)
    entity_schema = await surrealdb.TableSchema.from_class(Entity)
    entity_tables = {
        cfg.name: await surrealdb.mount_table_target(DB, cfg.name, entity_schema)
        for cfg in ENTITY_TYPES
    }

    person_table = entity_tables[PERSON_ENTITY_NAME]
    session_statement_rel = await surrealdb.mount_relation_target(
        DB, "session_statement", session_table, statement_table)
    person_session_rel = await surrealdb.mount_relation_target(
        DB, "person_session", person_table, session_table)
    person_statement_rel = await surrealdb.mount_relation_target(
        DB, "person_statement", person_table, statement_table)
    statement_mentions_rel = await surrealdb.mount_relation_target(
        DB, "statement_mentions", statement_table,
        [entity_tables[cfg.name] for cfg in ENTITY_TYPES])  # polymorphic TO

    # --- Phase 1: Per-session processing ---
    files = localfs.walk_dir(input_dir, path_matcher=PatternFilePathMatcher(
        included_patterns=["**/*.txt"]))

    all_session_raw: list[SessionRawEntities] = []
    for key, file in files.items():
        text = await file.read_text()
        for line in text.strip().splitlines():
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            youtube_id = extract_video_id(line)
            raw = await coco.use_mount(
                coco.component_subpath("session", youtube_id),
                process_session, youtube_id,
                session_table, statement_table, session_statement_rel,
            )
            all_session_raw.append(raw)

    # --- Phase 2: Entity resolution (one mount per entity type) ---
    entity_dedup = dict(zip(
        [cfg.name for cfg in ENTITY_TYPES],
        await asyncio.gather(*(
            coco.use_mount(
                coco.component_subpath("resolve", cfg.name),
                resolve_entities,
                collect_all_raw(all_session_raw, cfg.name),
            )
            for cfg in ENTITY_TYPES
        )),
    ))

    # --- Phase 3: Declare knowledge base ---
    await coco.mount(
        coco.component_subpath("knowledge_base"),
        create_knowledge_base,
        all_session_raw=all_session_raw,
        entity_dedup=entity_dedup,
        entity_tables=entity_tables,
        person_session_rel=person_session_rel,
        person_statement_rel=person_statement_rel,
        statement_mentions_rel=statement_mentions_rel,
    )

app = coco.App(
    coco.AppConfig(name="ConversationToKnowledge"),
    app_main,
    input_dir=pathlib.Path("./input"),
)

Lifespan & Context

python
SURREAL_DB = coco.ContextKey[surrealdb.ConnectionFactory]("surreal_db")
EMBEDDER = coco.ContextKey[SentenceTransformerEmbedder]("embedder", detect_change=True)
LLM_MODEL = coco.ContextKey[str]("llm_model", detect_change=True)
RESOLUTION_LLM_MODEL = coco.ContextKey[str]("resolution_llm_model", detect_change=True)

@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    builder.provide(SURREAL_DB, surrealdb.ConnectionFactory(
        url=os.environ["SURREALDB_URL"],
        namespace="cocoindex",
        database="knowledge",
        credentials={
            "username": os.environ.get("SURREALDB_USER", "root"),
            "password": os.environ.get("SURREALDB_PASS", "root"),
        },
    ))
    builder.provide(EMBEDDER, SentenceTransformerEmbedder(
        "sentence-transformers/all-MiniLM-L6-v2"))
    builder.provide(LLM_MODEL, os.environ.get("LLM_MODEL", "openai/gpt-5.4"))
    builder.provide(RESOLUTION_LLM_MODEL, os.environ.get("RESOLUTION_LLM_MODEL", "openai/gpt-5-mini"))
    yield

Environment Variables (.env)

env
# Required
OPENAI_API_KEY=sk-...
SURREALDB_URL=ws://localhost:8000/rpc

# Optional (with defaults)
SURREALDB_USER=root
SURREALDB_PASS=root
LLM_MODEL=openai/gpt-5.4
RESOLUTION_LLM_MODEL=openai/gpt-5-mini

ID Generation

  • Person, Tech, Org: The canonical name is used directly as the ID (readable, no hashing).
  • Session, Statement: Use CocoIndex's IdGenerator.next_id() to generate stable sequential int IDs. The IdGenerator is created inside process_session with the YouTube video ID as the dependency, ensuring IDs are stable across runs.
python
from cocoindex.resources.id import IdGenerator

# Inside process_session:
id_gen = IdGenerator(youtube_id)
session_id = await id_gen.next_id()           # int
stmt_id = await id_gen.next_id(stmt.statement)  # int, distinct per statement

Relation IDs are auto-derived from from_id + to_id by the SurrealDB connector.

File Structure

examples/conversation_to_knowledge/
├── spec.md              # Requirements (existing)
├── design.md            # This file
├── conv_knowledge/      # Python package
│   ├── __init__.py
│   ├── app.py           # App entry point, lifespan, app_main
│   ├── models.py        # Pydantic + dataclass models
│   ├── fetch.py         # YouTube download + AssemblyAI diarized transcription
│   ├── extract.py       # LLM entity/metadata extraction (instructor + litellm)
│   └── resolve.py       # Entity resolution (embedding + LLM)
├── pyproject.toml       # Dependencies
└── input/               # Sample input files
    └── sample.txt

Dependencies

toml
[project]
dependencies = [
    "cocoindex>=1.0.0",
    "yt-dlp",
    "openai",
    "instructor",
    "litellm",
    "sentence-transformers",
    "faiss-cpu",
    "numpy",
    "pydantic",
    "surrealdb",
]