examples/postgres_source/README.md
Your structured data, searchable by meaning — in plain async Python.
</p> <p align="center"> <strong>Star us ❤️ →</strong> <a href="https://github.com/cocoindex-io/cocoindex" title="Star CocoIndex on GitHub"><picture><source media="(prefers-color-scheme: dark)" srcset="https://cocoindex.io/blobs/github/homepage/star-btn-small-dark.svg"><source media="(prefers-color-scheme: light)" srcset="https://cocoindex.io/blobs/github/homepage/star-btn-small-light.svg"></picture></a> · <a href="https://cocoindex.io/docs/examples/postgres-source/" title="Read the full walkthrough"><picture><source media="(prefers-color-scheme: dark)" srcset="https://cocoindex.io/blobs/github/homepage/docs-inline-dark.svg"><source media="(prefers-color-scheme: light)" srcset="https://cocoindex.io/blobs/github/homepage/docs-inline-light.svg"></picture></a> · <a href="https://discord.com/invite/zpA9S2DR7s" title="Join the CocoIndex Discord"><picture><source media="(prefers-color-scheme: dark)" srcset="https://cocoindex.io/blobs/github/homepage/discord-inline-dark.svg"><source media="(prefers-color-scheme: light)" srcset="https://cocoindex.io/blobs/github/homepage/discord-inline-light.svg"></picture></a> </p> <div align="center"> </div>Most data already lives in a database. This example takes an existing Postgres table of products, reads it row by row, derives a couple of fields, embeds each row, and writes the result — including the vector — back into Postgres with pgvector. You declare the transformation in native Python and your own types — target_state = transformation(source_state) — and the heavy lifting (incremental processing, change tracking, managed targets) runs in a Rust engine underneath, so only the rows that changed get re-embedded and re-upserted.
app_main wires the source to the target: it mounts the Postgres target table, opens the source table with PgTableSource, and mounts one processing component per source row. Passing row_type=SourceProduct maps each row straight into the dataclass; items(...) tags each one with its (product_category, product_name) composite key. Read it in main.py:
@coco.fn(memo=True)
async def process_product(product: SourceProduct, table: postgres.TableTarget[OutputProduct]) -> None:
full_description = f"Category: {product.product_category}\nName: {product.product_name}\n\n{product.description}"
total_value = product.price * product.amount
embedding = await coco.use_context(EMBEDDER).embed(full_description)
table.declare_row(row=OutputProduct(..., total_value=total_value, embedding=embedding))
@coco.fn
async def app_main() -> None:
target_table = await postgres.mount_table_target(
PG_DB, table_name=TABLE_NAME,
table_schema=await postgres.TableSchema.from_class(
OutputProduct, primary_key=["product_category", "product_name"]),
pg_schema_name=PG_SCHEMA_NAME,
)
source = postgres.PgTableSource(
coco.use_context(SOURCE_POOL), table_name="source_products", row_type=SourceProduct)
await coco.mount_each(
process_product,
source.fetch_rows().items(lambda p: (p.product_category, p.product_name)),
target_table,
)
We embed the composed description — category and name included — so a search for "wireless audio" matches even when the body never says it. embedding: Annotated[NDArray, EMBEDDER] ties the vector column to the embedder, so its dimensions are inferred automatically.
Step-by-step walkthrough with the source/target row shapes, the derived fields, the embedder wiring, and the SQL query.
</p>PgTableSource reads an existing table directly — point it at any table and you have a semantic index over your structured data, no export step.SOURCE_DATABASE_URL to read from a separate database. mount_table_target creates and manages the target table — schema, idempotent upserts, orphan cleanup.full_description carries the category and name into the vector, so meaning-based search works even when the query words never appear in the body.@coco.fn(memo=True) skips a row whose content and code are unchanged; the output's primary key is derived from the source row, so only changed rows are re-embedded and upserted and vanished rows are deleted.all-MiniLM-L6-v2 embedder, no API key; swap EMBED_MODEL for any of the 12k+ sentence-transformer models on Hugging Face.1. Start Postgres + pgvector (the repo ships a compose file):
docker compose -f ../../dev/postgres.yaml up -d
2. Configure & install:
cp .env.example .env # set POSTGRES_URL and SOURCE_DATABASE_URL (can be the same instance)
pip install -e .
3. Seed the source table — create source_products with the sample rows:
psql "$SOURCE_DATABASE_URL" -f ./prepare_source_data.sql
4. Build the index — the Postgres source runs as a one-shot catch-up (scan the source table, sync the target, exit):
cocoindex update main
5. Search from the command line:
python main.py "wireless headphones"
The most semantically similar products come back ranked — even when they share none of the words in your query. That's the whole point of a vector index.
<a href="https://cocoindex.io/docs">Docs</a> · <a href="https://cocoindex.io/docs/examples/postgres-source/">Walkthrough</a> · <a href="https://discord.com/invite/zpA9S2DR7s">Discord</a> · <a href="https://github.com/cocoindex-io/cocoindex/tree/main/examples"><b>See all examples →</b></a>
</p>