examples/conversation_to_knowledge/design.md
Convert podcast sessions (from YouTube) into a structured knowledge graph stored in SurrealDB, using CocoIndex for declarative pipeline orchestration with incremental updates.
| Concern | Choice | Rationale |
|---|---|---|
| Audio download | yt-dlp | Standard, reliable YouTube downloader |
| Transcription + diarization | AssemblyAI | Single API call gives speaker-labeled transcript with utterances. No file size / duration limits. No GPU needed |
| LLM (extraction) | instructor + litellm → gpt-5.4 (configurable via LLM_MODEL in .env) | Strong model for structured extraction from transcripts |
| LLM (entity resolution) | instructor + litellm → gpt-5-mini (configurable via RESOLUTION_LLM_MODEL in .env) | Lighter model sufficient for simple entity deduplication decisions |
| Embedding (entity resolution) | sentence-transformers/all-MiniLM-L6-v2 | Fast, good for short entity name similarity |
| In-memory vector search | faiss-cpu (IndexFlatIP) | SIMD-optimized exact search with incremental add(); no GPU needed |
| Target database | SurrealDB | Graph DB with relations, per spec |
| Data models | Pydantic | Per spec |
| Pipeline | CocoIndex | Per spec |
A directory of plain text files, each containing YouTube URLs (one per line):
input/
├── ai_podcasts.txt
├── tech_interviews.txt
└── ...
Each file:
https://www.youtube.com/watch?v=dQw4w9WgXcQ
https://youtu.be/abc123
https://www.youtube.com/watch?v=xyz789
Empty lines and lines starting with # are ignored. Video IDs are extracted from URLs via regex:
_YOUTUBE_URL_RE = re.compile(
r"(?:youtube\.com/watch\?v=|youtu\.be/|youtube\.com/embed/)([a-zA-Z0-9_-]{11})"
)
def extract_video_id(url: str) -> str:
m = _YOUTUBE_URL_RE.search(url)
if m is None:
raise ValueError(f"Cannot extract YouTube video ID from: {url}")
return m.group(1)
We read these with localfs.walk_dir() matching **/*.txt.
All named entity types (person, tech, org, ...) are defined in a single global list, making it easy to extend:
PERSON_ENTITY_NAME = "person"
@dataclass
class EntityTypeConfig:
name: str # Also the SurrealDB table name
llm_description: str # Injected into the extraction prompt
llm_examples: list[str]
ENTITY_TYPES: list[EntityTypeConfig] = [
EntityTypeConfig(name="person", llm_description="...", llm_examples=["Lex Fridman", ...]),
EntityTypeConfig(name="tech", llm_description="...", llm_examples=["Python (programming language)", ...]),
EntityTypeConfig(name="org", llm_description="...", llm_examples=["OpenAI", ...]),
]
The only Person-specific logic (session participation, statement attribution via speakers) is referenced directly using PERSON_ENTITY_NAME rather than encoded in the config.
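Since this list drives the extraction prompt (each entry's llm_description and llm_examples are injected into it), the per-type guidance can be assembled mechanically. A hypothetical helper, purely illustrative and not part of the spec:

def entity_type_prompt_section() -> str:
    """Render one prompt bullet per configured entity type."""
    lines = []
    for cfg in ENTITY_TYPES:
        examples = ", ".join(cfg.llm_examples)
        lines.append(f"- {cfg.name}: {cfg.llm_description} Examples: {examples}.")
    return "\n".join(lines)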
@dataclass
class SessionTranscript:
"""Carries raw transcript and yt-dlp metadata for extraction."""
transcript: str
yt_channel: str # YouTube channel name
yt_title: str
yt_description: str | None # Video description
yt_upload_date: str | None
@dataclass
class Session:
id: int # Generated by IdGenerator
youtube_id: str # YouTube video ID
name: str # Extracted by LLM from transcript context
description: str | None # Extracted by LLM
transcript: str # Full transcript with speaker labels
date: str | None # Extracted by LLM if mentioned, else from yt-dlp metadata
@dataclass
class Entity:
"""Generic named entity node shared by all entity types (person, tech, org)."""
id: str # Canonical name used directly as ID
name: str
@dataclass
class Statement:
id: int # Generated by IdGenerator
statement: str # The statement text (one thematic claim)
Relations have no additional edge fields — just id (auto-derived from from_id + to_id).
Extraction happens in two LLM calls per session:
Step 1 — Metadata + speaker identification:
class SpeakerIdentification(pydantic.BaseModel):
"""Maps a diarization label to a real person name."""
label: str # e.g. "A", "B"
name: str | None # Real name, or None if unrecognizable
class SessionMetadata(pydantic.BaseModel):
"""LLM output from Step 1: session metadata + speaker mapping."""
name: str # Session/episode name
description: str | None # Brief description
date: str | None # Date if mentioned (ISO format)
speakers: list[SpeakerIdentification] # Speaker label → real name
Step 2 — Statement extraction (on reformatted transcript with real names):
class RawStatement(pydantic.BaseModel):
"""A thematic claim or statement made during the session."""
statement: str
speakers: list[str] # Names of persons who made the statement
mentioned_person: list[str] # Person names involved
mentioned_tech: list[str] # Tech names involved
mentioned_org: list[str] # Org names involved
class StatementExtraction(pydantic.BaseModel):
"""LLM output from Step 2: statements with involved entities."""
statements: list[RawStatement]
All entity names must be self-contained — no anaphoric references (pronouns, labels like "Speaker A", "the host", etc.). Every name must stand on its own as a clear identifier.
| Table | Fields | Notes |
|---|---|---|
| session | id, youtube_id, name, description?, transcript, date? | SCHEMAFULL |
| person | id, name | SCHEMAFULL — schema derived from Entity dataclass |
| tech | id, name | SCHEMAFULL — schema derived from Entity dataclass |
| org | id, name | SCHEMAFULL — schema derived from Entity dataclass |
| statement | id, statement | SCHEMAFULL |
| Relation | FROM → TO | Edge Fields |
|---|---|---|
| person_session | person → session | (none) |
| session_statement | session → statement | (none) |
| person_statement | person → statement | (none) |
| statement_mentions | statement → person / tech / org | (none, polymorphic TO) |
statement_mentions is polymorphic — the TO side can be person, tech, or org. The SurrealDB connector supports this via listing multiple targets.
input .txt files
└─ parse URLs → list of video IDs
└─ for each session (use_mount per video ID):
├─ [1] Fetch transcript + metadata (memo=True)
│ → speaker-labeled transcript + yt metadata (channel, title, description, date)
├─ [2a] Reformat transcript (empty speaker map → all labels as "(Speaker X)")
├─ [2b] LLM Step 1: extract metadata + identify speakers (memo=True)
│ → SessionMetadata (name, description, date, speaker_label→person mapping)
├─ [2c] Reformat transcript (with speaker map → real names where known)
├─ [2d] LLM Step 2: extract statements + involved entities (memo=True)
│ → StatementExtraction (statements with self-contained entity names)
├─ [3] Declare Session node → SurrealDB
├─ [4] Declare Statement nodes + session_statement edges → SurrealDB
└─ [5] Return raw entities (persons, techs, orgs) + statement linkages
for entity resolution in Phase 2
@coco.fn(memo=True)
async def fetch_transcript(youtube_id: str) -> SessionTranscript:
"""Download audio via yt-dlp, transcribe with speaker diarization via AssemblyAI."""
# 1. yt-dlp: download audio to temp file + fetch metadata (channel, title, description, upload_date)
# 2. AssemblyAI: transcribe with speaker_labels=True
# - Uploads file, returns utterances with speaker labels
# 3. Format transcript as "Speaker A: ...\nSpeaker B: ..." text
# 4. Return SessionTranscript(transcript=..., yt_channel=..., yt_title=...,
# yt_description=..., yt_upload_date=...)
...
SessionTranscript carries the raw transcript text and all available yt-dlp metadata (channel name, title, description, upload_date). The metadata is essential for Step 1 speaker identification.
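A minimal sketch of this function, assuming the yt_dlp Python API and the assemblyai SDK (the SDK reads its key from the ASSEMBLYAI_API_KEY environment variable, which the .env section below does not list); temp-file cleanup and error handling are omitted:

import asyncio
import tempfile
import yt_dlp
import assemblyai as aai

def _fetch_transcript_sync(youtube_id: str) -> SessionTranscript:
    url = f"https://www.youtube.com/watch?v={youtube_id}"
    tmpdir = tempfile.mkdtemp(prefix="conv_knowledge_")
    ydl_opts = {
        "format": "bestaudio/best",
        "outtmpl": f"{tmpdir}/%(id)s.%(ext)s",
        "quiet": True,
    }
    # 1. Download audio and collect metadata in one yt-dlp call
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        audio_path = ydl.prepare_filename(info)
    # 2. Transcribe with speaker diarization via AssemblyAI
    transcriber = aai.Transcriber(config=aai.TranscriptionConfig(speaker_labels=True))
    result = transcriber.transcribe(audio_path)
    # 3. Format utterances as "Speaker A: ..." lines
    transcript_text = "\n".join(
        f"Speaker {u.speaker}: {u.text}" for u in result.utterances
    )
    return SessionTranscript(
        transcript=transcript_text,
        yt_channel=info.get("channel") or info.get("uploader") or "",
        yt_title=info.get("title") or "",
        yt_description=info.get("description"),
        yt_upload_date=info.get("upload_date"),
    )

@coco.fn(memo=True)
async def fetch_transcript(youtube_id: str) -> SessionTranscript:
    # Both yt-dlp and the AssemblyAI client are blocking; run off the event loop.
    return await asyncio.to_thread(_fetch_transcript_sync, youtube_id)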
def reformat_transcript(
transcript: str, speaker_map: dict[str, str | None]
) -> str:
"""Replace speaker labels with real names; keep label for unrecognized speakers.
Used by both extraction steps:
- Step 1: pass empty dict → all speakers shown as "(Speaker A)", "(Speaker B)", ...
- Step 2: pass mapping from Step 1 → recognized speakers get real names,
unrecognized stay as "(Speaker X)"
"""
# "Speaker A: ..." → "Lex Fridman: ..." or "(Speaker A): ..."
...
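A minimal implementation sketch, assuming utterances are formatted as "Speaker A: ..." lines (as produced by fetch_transcript above):

import re

_SPEAKER_PREFIX_RE = re.compile(r"^Speaker (\w+):", flags=re.MULTILINE)

def reformat_transcript(
    transcript: str, speaker_map: dict[str, str | None]
) -> str:
    """Replace speaker labels with real names; keep label for unrecognized speakers."""
    def replace(m: re.Match) -> str:
        label = m.group(1)
        name = speaker_map.get(label)
        # Known speaker → "Lex Fridman:", unknown → "(Speaker A):"
        return f"{name}:" if name else f"(Speaker {label}):"
    return _SPEAKER_PREFIX_RE.sub(replace, transcript)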
@coco.fn(memo=True)
async def extract_metadata(reformatted_transcript: str, transcript: SessionTranscript) -> SessionMetadata:
"""Give LLM the reformatted transcript + all YouTube metadata to identify speakers."""
client = instructor.from_litellm(litellm.acompletion, mode=instructor.Mode.JSON)
return await client.chat.completions.create(
model=coco.use_context(LLM_MODEL),
response_model=SessionMetadata,
messages=[
{"role": "system", "content": METADATA_PROMPT},
{"role": "user", "content": f"YouTube channel: {transcript.yt_channel}\n"
f"Video title: {transcript.yt_title}\n"
f"Description: {transcript.yt_description or 'N/A'}\n"
f"Upload date: {transcript.yt_upload_date or 'unknown'}\n\n"
f"Transcript:\n{reformatted_transcript}"},
],
)
The prompt instructs the LLM to:
- extract the session/episode name, a brief description, and the date (ISO format) if mentioned,
- map each diarization label ("Speaker A", "Speaker B", ...) to a real person name, using the transcript and the YouTube metadata, and
- return name as None for any speaker it cannot confidently identify.

The identified speakers (those with non-None names) form the person_session relationship.
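The exact prompt text lives in extract.py; an illustrative (assumed, not normative) METADATA_PROMPT could read:

METADATA_PROMPT = """\
You are given a diarized podcast transcript plus YouTube metadata (channel, title, description, upload date).
1. Extract the session/episode name, a brief description, and the date in ISO format if it is mentioned.
2. Map every speaker label (A, B, ...) to the real person's name, using both the transcript and the metadata.
3. Set name to null for any speaker you cannot confidently identify.
"""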
@coco.fn(memo=True)
async def extract_statements(reformatted_transcript: str) -> StatementExtraction:
"""Extract statements and involved entities from the reformatted transcript."""
...
The Step 2 prompt instructs the LLM to:
- split the transcript into self-contained thematic statements,
- list the speakers who made each statement plus the persons, techs, and orgs it mentions, and
- use only self-contained entity names (no pronouns, no "(Speaker X)" labels).

Statements from unidentified speakers (still labeled "(Speaker X)") are still extracted with their involved entities, but are not attributed to any person.

@coco.fn(memo=True)
async def process_session(
youtube_id: str,
session_table: surrealdb.TableTarget,
statement_table: surrealdb.TableTarget,
session_statement_rel: surrealdb.RelationTarget,
) -> SessionRawEntities:
transcript = await fetch_transcript(youtube_id)
# Step 1: reformat with empty map (no names known yet), then extract metadata
step1_transcript = reformat_transcript(transcript.transcript, {})
metadata = await extract_metadata(step1_transcript, transcript)
# Step 2: reformat with real names, then extract statements
speaker_map = {s.label: s.name for s in metadata.speakers}
step2_transcript = reformat_transcript(transcript.transcript, speaker_map)
stmt_extraction = await extract_statements(step2_transcript)
id_gen = IdGenerator(youtube_id)
# Declare session node (use LLM-extracted metadata, fallback to yt-dlp)
session_id = await id_gen.next_id()
session = Session(
id=session_id,
youtube_id=youtube_id,
name=metadata.name or transcript.yt_title,
description=metadata.description,
transcript=transcript.transcript,
date=metadata.date or transcript.yt_upload_date,
)
session_table.declare_record(row=session)
# Declare statements + session_statement edges
identified_stmts: list[IdentifiedStatement] = []
for stmt in stmt_extraction.statements:
stmt_id = await id_gen.next_id(stmt.statement)
statement_table.declare_record(row=Statement(id=stmt_id, statement=stmt.statement))
session_statement_rel.declare_relation(from_id=session_id, to_id=stmt_id)
identified_stmts.append(IdentifiedStatement(id=stmt_id, raw=stmt))
# Only identified speakers form person_session
identified_persons = [s.name for s in metadata.speakers if s.name is not None]
return SessionRawEntities(
session_id=session_id,
raw_entities={PERSON_ENTITY_NAME: identified_persons},
statements=identified_stmts,
)
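SessionRawEntities and IdentifiedStatement are the carrier types passed from Phase 1 to Phases 2–3; they are not spelled out elsewhere in this document, so here is a minimal sketch consistent with the usage above:

@dataclass
class IdentifiedStatement:
    id: int            # stable statement ID from IdGenerator
    raw: RawStatement  # LLM output: speakers + mentioned_person/tech/org

@dataclass
class SessionRawEntities:
    session_id: int
    raw_entities: dict[str, list[str]]     # entity type name → raw entity names
    statements: list[IdentifiedStatement]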
Uses faiss (faiss-cpu) for in-memory vector search. As entities are processed one by one ("bubble sort"), each embedding is added to a faiss.IndexFlatIP index, so nearest-neighbor queries only search among already-processed entities. Embeddings are L2-normalized before being added or queried, so the inner product computed by IndexFlatIP is equivalent to cosine similarity.
For each entity type (Person, Tech, Org) independently:
1. Collect all raw entities from all sessions → all_raw_entities: set[str]
2. Initialize faiss index:
index = faiss.IndexFlatIP(embedding_dim)
index_names: list[str] = [] # maps faiss row index → entity name
3. Build deduplication_dict via "bubble sort" approach:
For each entity in all_raw_entities:
a. Compute embedding (memoized) via SentenceTransformerEmbedder
b. L2-normalize the embedding
c. Query index for top N nearest neighbors with similarity > (1 - MAX_DISTANCE)
(if neighbor is a dup in deduplication_dict, collect its canonical instead)
d. If candidates exist (excluding self):
- LLM call (memoized): "Are any of these the same entity as '{entity}'?
Pick by number, or 'none'."
- LLM picks canonical name (or declares new canonical)
e. Update deduplication_dict:
- entity → None (canonical) or entity → canonical_name
- If another existing canonical is identified as dup of current,
update that entry too
f. Add embedding to faiss index, append name to index_names
4. Output: deduplication_dict: dict[str, str | None]
e.g. {"Apple Inc.": None, "Apple": "Apple Inc.", "AAPL": "Apple Inc."}
Threshold defaults:
- MAX_DISTANCE_FOR_RESOLUTION: 0.3 (cosine distance, i.e. similarity > 0.7)
- N (top candidates): 5

import faiss
@coco.fn(memo=True)
async def compute_entity_embedding(name: str) -> NDArray:
embedder = coco.use_context(EMBEDDER)
return await embedder.embed(name)
async def resolve_entities(all_raw_entities: set[str]) -> dict[str, str | None]:
dim = 384 # all-MiniLM-L6-v2 dimension
index = faiss.IndexFlatIP(dim)
index_names: list[str] = []
dedup: dict[str, str | None] = {}
for entity in all_raw_entities:
embedding = await compute_entity_embedding(entity)
faiss.normalize_L2(embedding.reshape(1, -1))
candidates = []
if index.ntotal > 0:
sims, idxs = index.search(embedding.reshape(1, -1), k=min(N, index.ntotal))
for sim, idx in zip(sims[0], idxs[0]):
if sim >= 1.0 - MAX_DISTANCE and idx >= 0:
cand = index_names[idx]
canonical = resolve_canonical(cand, dedup)
if canonical != entity:
candidates.append(canonical)
if candidates:
match = await resolve_entity_pair(entity, list(set(candidates)))
dedup[entity] = match # None if entity is canonical, else canonical name
else:
dedup[entity] = None # new canonical
index.add(embedding.reshape(1, -1))
index_names.append(entity)
return dedup
@coco.fn(memo=True)
async def resolve_entity_pair(entity: str, candidates: list[str]) -> str | None:
"""LLM decides if entity matches any candidate; returns canonical or None."""
client = instructor.from_litellm(litellm.acompletion)
...
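A fuller sketch of the body, assuming a small Pydantic response model in which the LLM picks a candidate by number (EntityMatch is illustrative, not part of the spec):

class EntityMatch(pydantic.BaseModel):
    """LLM verdict: 1-based number of the matching candidate, or None if no match."""
    match_number: int | None

@coco.fn(memo=True)
async def resolve_entity_pair(entity: str, candidates: list[str]) -> str | None:
    """LLM decides if entity matches any candidate; returns canonical or None."""
    client = instructor.from_litellm(litellm.acompletion, mode=instructor.Mode.JSON)
    numbered = "\n".join(f"{i + 1}. {c}" for i, c in enumerate(candidates))
    result = await client.chat.completions.create(
        model=coco.use_context(RESOLUTION_LLM_MODEL),
        response_model=EntityMatch,
        messages=[
            {"role": "system", "content": (
                "Decide whether the entity refers to the same real-world thing as "
                "one of the numbered candidates. If yes, return that candidate's "
                "number; otherwise return null.")},
            {"role": "user", "content": f"Entity: {entity}\nCandidates:\n{numbered}"},
        ],
    )
    if result.match_number is None or not (1 <= result.match_number <= len(candidates)):
        return None  # entity becomes a new canonical
    return candidates[result.match_number - 1]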
With the deduplication dicts resolved, declare all remaining nodes and edges.
Person/Tech/Org use their canonical name directly as the ID. Session and statement IDs
are carried from Phase 1 via SessionRawEntities and IdentifiedStatement.
@coco.fn
async def create_knowledge_base(
all_session_raw: list[SessionRawEntities],
entity_dedup: dict[str, dict[str, str | None]],
entity_tables: dict[str, surrealdb.TableTarget],
person_session_rel: surrealdb.RelationTarget,
person_statement_rel: surrealdb.RelationTarget,
statement_mentions_rel: surrealdb.RelationTarget,
):
# Declare canonical nodes for each entity type (name IS the id)
for cfg in ENTITY_TYPES:
dedup = entity_dedup[cfg.name]
table = entity_tables[cfg.name]
for name, upstream in dedup.items():
if upstream is None:
table.declare_record(row=Entity(id=name, name=name))
person_dedup = entity_dedup[PERSON_ENTITY_NAME]
# Declare relationships using canonical names
for session_raw in all_session_raw:
for person_name in session_raw.raw_entities.get(PERSON_ENTITY_NAME, []):
canonical = resolve_canonical(person_name, person_dedup)
person_session_rel.declare_relation(
from_id=canonical, to_id=session_raw.session_id)
for identified in session_raw.statements:
stmt = identified.raw
stmt_id = identified.id
for speaker in stmt.speakers:
canonical = resolve_canonical(speaker, person_dedup)
person_statement_rel.declare_relation(
from_id=canonical, to_id=stmt_id)
for cfg in ENTITY_TYPES:
dedup = entity_dedup[cfg.name]
table = entity_tables[cfg.name]
for canonical in {
resolve_canonical(e, dedup)
for e in getattr(stmt, f"mentioned_{cfg.name}")
}:
statement_mentions_rel.declare_relation(
from_id=stmt_id, to_id=canonical, to_table=table)
Helper to chase dedup chains:
def resolve_canonical(name: str, dedup: dict[str, str | None]) -> str:
while dedup.get(name) is not None:
name = dedup[name]
return name
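app_main (below) calls collect_all_raw to gather the Phase 2 input per entity type; a sketch consistent with SessionRawEntities as described above (the helper itself is an assumption):

def collect_all_raw(
    all_session_raw: list[SessionRawEntities], entity_type: str
) -> set[str]:
    """Every raw name of one entity type across all sessions (speakers + mentions)."""
    names: set[str] = set()
    for session_raw in all_session_raw:
        names.update(session_raw.raw_entities.get(entity_type, []))
        for identified in session_raw.statements:
            names.update(getattr(identified.raw, f"mentioned_{entity_type}"))
    return names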
@coco.fn
async def app_main(input_dir: pathlib.Path) -> None:
# --- Setup targets ---
session_table = await surrealdb.mount_table_target(DB, "session", session_schema)
statement_table = await surrealdb.mount_table_target(DB, "statement", statement_schema)
entity_schema = await surrealdb.TableSchema.from_class(Entity)
entity_tables = {
cfg.name: await surrealdb.mount_table_target(DB, cfg.name, entity_schema)
for cfg in ENTITY_TYPES
}
person_table = entity_tables[PERSON_ENTITY_NAME]
session_statement_rel = await surrealdb.mount_relation_target(
DB, "session_statement", session_table, statement_table)
person_session_rel = await surrealdb.mount_relation_target(
DB, "person_session", person_table, session_table)
person_statement_rel = await surrealdb.mount_relation_target(
DB, "person_statement", person_table, statement_table)
statement_mentions_rel = await surrealdb.mount_relation_target(
DB, "statement_mentions", statement_table,
[entity_tables[cfg.name] for cfg in ENTITY_TYPES]) # polymorphic TO
# --- Phase 1: Per-session processing ---
files = localfs.walk_dir(input_dir, path_matcher=PatternFilePathMatcher(
included_patterns=["**/*.txt"]))
all_session_raw: list[SessionRawEntities] = []
for key, file in files.items():
text = await file.read_text()
for line in text.strip().splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
youtube_id = extract_video_id(line)
raw = await coco.use_mount(
coco.component_subpath("session", youtube_id),
process_session, youtube_id,
session_table, statement_table, session_statement_rel,
)
all_session_raw.append(raw)
# --- Phase 2: Entity resolution (one mount per entity type) ---
entity_dedup = dict(zip(
[cfg.name for cfg in ENTITY_TYPES],
await asyncio.gather(*(
coco.use_mount(
coco.component_subpath("resolve", cfg.name),
resolve_entities,
collect_all_raw(all_session_raw, cfg.name),
)
for cfg in ENTITY_TYPES
)),
))
# --- Phase 3: Declare knowledge base ---
await coco.mount(
coco.component_subpath("knowledge_base"),
create_knowledge_base,
all_session_raw=all_session_raw,
entity_dedup=entity_dedup,
entity_tables=entity_tables,
person_session_rel=person_session_rel,
person_statement_rel=person_statement_rel,
statement_mentions_rel=statement_mentions_rel,
)
app = coco.App(
coco.AppConfig(name="ConversationToKnowledge"),
app_main,
input_dir=pathlib.Path("./input"),
)
SURREAL_DB = coco.ContextKey[surrealdb.ConnectionFactory]("surreal_db")
EMBEDDER = coco.ContextKey[SentenceTransformerEmbedder]("embedder", detect_change=True)
LLM_MODEL = coco.ContextKey[str]("llm_model", detect_change=True)
RESOLUTION_LLM_MODEL = coco.ContextKey[str]("resolution_llm_model", detect_change=True)
@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
builder.provide(SURREAL_DB, surrealdb.ConnectionFactory(
url=os.environ["SURREALDB_URL"],
namespace="cocoindex",
database="knowledge",
credentials={
"username": os.environ.get("SURREALDB_USER", "root"),
"password": os.environ.get("SURREALDB_PASS", "root"),
},
))
builder.provide(EMBEDDER, SentenceTransformerEmbedder(
"sentence-transformers/all-MiniLM-L6-v2"))
builder.provide(LLM_MODEL, os.environ.get("LLM_MODEL", "openai/gpt-5.4"))
builder.provide(RESOLUTION_LLM_MODEL, os.environ.get("RESOLUTION_LLM_MODEL", "openai/gpt-5-mini"))
yield
Environment variables (.env):
# Required
OPENAI_API_KEY=sk-...
SURREALDB_URL=ws://localhost:8000/rpc
# Optional (with defaults)
SURREALDB_USER=root
SURREALDB_PASS=root
LLM_MODEL=gpt-5.4
RESOLUTION_LLM_MODEL=gpt-5-mini
Session and statement IDs come from IdGenerator.next_id(), which generates stable sequential int IDs. The IdGenerator is created inside process_session with the YouTube video ID as the dependency, ensuring IDs are stable across runs.

from cocoindex.resources.id import IdGenerator
# Inside process_session:
id_gen = IdGenerator(youtube_id)
session_id = await id_gen.next_id() # int
stmt_id = await id_gen.next_id(stmt.statement) # int, distinct per statement
Relation IDs are auto-derived from from_id + to_id by the SurrealDB connector.
examples/conversation_to_knowledge/
├── spec.md # Requirements (existing)
├── design.md # This file
├── conv_knowledge/ # Python package
│ ├── __init__.py
│ ├── app.py # App entry point, lifespan, app_main
│ ├── models.py # Pydantic + dataclass models
│ ├── fetch.py # YouTube download + AssemblyAI diarized transcription
│ ├── extract.py # LLM entity/metadata extraction (instructor + litellm)
│ └── resolve.py # Entity resolution (embedding + LLM)
├── pyproject.toml # Dependencies
└── input/ # Sample input files
└── sample.txt
[project]
dependencies = [
"cocoindex>=1.0.0",
"yt-dlp",
"openai",
"instructor",
"litellm",
"sentence-transformers",
"faiss-cpu",
"numpy",
"pydantic",
"surrealdb",
]