examples/oci_object_storage_embedding/README.md
It's Semantic Search 101 with the source swapped for a bucket — in plain async Python.
</p> <p align="center"> <strong>Star us ❤️ →</strong> <a href="https://github.com/cocoindex-io/cocoindex" title="Star CocoIndex on GitHub"><picture><source media="(prefers-color-scheme: dark)" srcset="https://cocoindex.io/blobs/github/homepage/star-btn-small-dark.svg"><source media="(prefers-color-scheme: light)" srcset="https://cocoindex.io/blobs/github/homepage/star-btn-small-light.svg"></picture></a> · <a href="https://cocoindex.io/docs/examples/oci-object-storage-embedding/" title="Read the full walkthrough"><picture><source media="(prefers-color-scheme: dark)" srcset="https://cocoindex.io/blobs/github/homepage/docs-inline-dark.svg"><source media="(prefers-color-scheme: light)" srcset="https://cocoindex.io/blobs/github/homepage/docs-inline-light.svg"></picture></a> · <a href="https://discord.com/invite/zpA9S2DR7s" title="Join the CocoIndex Discord"><picture><source media="(prefers-color-scheme: dark)" srcset="https://cocoindex.io/blobs/github/homepage/discord-inline-dark.svg"><source media="(prefers-color-scheme: light)" srcset="https://cocoindex.io/blobs/github/homepage/discord-inline-light.svg"></picture></a> </p> <div align="center"> </div>

Most documents already live in object storage, not on your laptop. This pipeline lists Markdown objects from an OCI Object Storage bucket, splits each into overlapping chunks, embeds them with sentence-transformers, and stores the vectors in Postgres with pgvector. You declare the transformation in native Python with your own types, as `target_state = transformation(source_state)`, and the heavy lifting (incremental processing, change tracking, managed targets) runs in a Rust engine underneath, so adding one object embeds one object, not the whole bucket.
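To make "overlapping chunks" concrete, here is a minimal sketch of a fixed-size character splitter with overlap. It is an illustration only and not necessarily the splitter this example uses; `split_with_overlap`, `chunk_size`, and `overlap` are hypothetical names.

```python
def split_with_overlap(text: str, chunk_size: int = 20, overlap: int = 5) -> list[str]:
    """Split text into chunks of at most chunk_size characters.

    Consecutive chunks share `overlap` characters, so a sentence that
    straddles a boundary still appears whole in at least one chunk.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be in [0, chunk_size)")

    step = chunk_size - overlap  # how far the window advances each time
    chunks: list[str] = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the window already reached the end of the text
    return chunks


example_chunks = split_with_overlap("abcdefghij", chunk_size=4, overlap=1)
```

Each chunk would then be embedded and stored as its own row, which is why a small edit to one object only affects that object's rows.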
The chunk → embed → store half is identical to Semantic Search 101; the part that differs is the source. The OCI SDK is synchronous and you create the client yourself, so the example builds one from a config-file profile, hands it to the context, and lists objects with `oci_object_storage.list_objects`, the OCI analogue of `localfs.walk_dir`. Live mode is opt-in: when the four `OCI_STREAMING_*` env vars are set, the example builds a Kafka-protocol `AIOConsumer` against OCI Streaming and passes it through as a `LiveStream[bytes]`. Read it in `main.py`:
```python
# Excerpt from main.py; imports and the upper-case constants are defined there.
@coco.fn
async def app_main() -> None:
    target_table = await postgres.mount_table_target(
        PG_DB, table_name=TABLE_NAME,
        table_schema=await postgres.TableSchema.from_class(DocEmbedding, primary_key=["id"]),
        pg_schema_name=PG_SCHEMA_NAME,
    )
    client = coco.use_context(OCI_CLIENT)

    # Live mode is opt-in: build a LiveStream[bytes] from OCI Streaming if configured.
    consumer = _build_streaming_consumer()
    live_stream = None
    if consumer is not None and OCI_STREAMING_TOPIC is not None:
        live_stream = kafka.topic_as_stream(consumer, [OCI_STREAMING_TOPIC]).payloads()

    files = oci_object_storage.list_objects(
        client, OCI_NAMESPACE, OCI_BUCKET, prefix=OCI_PREFIX,
        path_matcher=PatternFilePathMatcher(included_patterns=["**/*.md"]),
        live_stream=live_stream,
    )
    await coco.mount_each(process_file, files.items(), target_table)
```
With `live_stream=None` (the default), `list_objects` does a one-shot catch-up scan. Pass a stream and the connector keeps watching, re-reading each post-cutoff object to apply an authoritative update or delete. `mount_each` runs one processing component per object so the engine tracks each independently.
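The "re-read, then update or delete" behaviour can be pictured with a small sketch. This is not the connector's code: `reconcile`, `ObjectNotFound`, and `fake_head` are hypothetical stand-ins, and the in-memory dict plays the role of the bucket. The point is that the decision comes from the object's current state, not from whatever the event claimed.

```python
from typing import Callable


class ObjectNotFound(Exception):
    """Hypothetical stand-in for a 404 returned by a head-object call."""


def reconcile(key: str, head: Callable[[str], dict]) -> tuple[str, str]:
    """Pick an action for `key` by checking whether the object exists right now."""
    try:
        head(key)
    except ObjectNotFound:
        return ("delete", key)  # object is gone: remove its rows
    return ("update", key)      # object is present: re-process it


# In-memory stand-in for the bucket, for illustration only.
bucket = {"docs/a.md": {"etag": "1"}}


def fake_head(key: str) -> dict:
    if key not in bucket:
        raise ObjectNotFound(key)
    return bucket[key]


actions = [reconcile(k, fake_head) for k in ("docs/a.md", "docs/gone.md")]
```

Because the action is derived from current state, a stale or duplicated event still converges to the right result.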
See the step-by-step walkthrough (https://cocoindex.io/docs/examples/oci-object-storage-embedding/), which covers the OCI client, the source call, the row schema, and exactly how live mode rides OCI Streaming.
</p>

- `process_file` takes an `oci_object_storage.OCIFile` and reads it with `await file.read_text()`, just like a local `FileLike`.
- In live mode, the connector re-reads each affected object (`head_object`) to determine current state, then issues an update (present) or delete (404); the event type is never trusted as the dispatch signal.
- `@coco.fn(memo=True)` skips an object whose content and code are unchanged; `mount_table_target` upserts only changed rows and deletes rows whose source is gone.
- Swap `EMBED_MODEL` for any of the 12k+ models on Hugging Face.

1. Start Postgres + pgvector (the repo ships a compose file):
```sh
docker compose -f ../../dev/postgres.yaml up -d
```
2. Configure & install — point at a bucket with Markdown objects and an OCI config file (default ~/.oci/config):
```sh
cp .env.example .env   # set POSTGRES_URL, OCI_NAMESPACE, OCI_BUCKET (optional OCI_PREFIX)
pip install -e .
```
For live mode, also set `OCI_STREAMING_BOOTSTRAP_SERVERS`, `OCI_STREAMING_TOPIC`, `OCI_STREAMING_USERNAME`, and `OCI_STREAMING_AUTH_TOKEN` in `.env` (the `.env.example` documents each format). With those unset, the connector skips the subscription and just does the catch-up scan.
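If you want to check your environment before running, a small helper along these lines can report which of the four variables are missing. It is a sketch only: `missing_streaming_vars` and `live_mode_configured` are hypothetical names, and the example's own `_build_streaming_consumer()` may apply a different check.

```python
import os
from typing import Mapping

# The four variables named above.
REQUIRED_STREAMING_VARS = (
    "OCI_STREAMING_BOOTSTRAP_SERVERS",
    "OCI_STREAMING_TOPIC",
    "OCI_STREAMING_USERNAME",
    "OCI_STREAMING_AUTH_TOKEN",
)


def missing_streaming_vars(env: Mapping[str, str] = os.environ) -> list[str]:
    """Return the streaming variables that are unset or empty."""
    return [name for name in REQUIRED_STREAMING_VARS if not env.get(name)]


def live_mode_configured(env: Mapping[str, str] = os.environ) -> bool:
    """True only when all four streaming variables are set (assumed rule)."""
    return not missing_streaming_vars(env)
```

Calling `missing_streaming_vars()` with no arguments inspects the current process environment; pass a dict to test a configuration without exporting anything.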
3. Build the index — catch-up (scan, sync, exit) or live (catch up, then keep watching the topic):
```sh
cocoindex update main      # catch-up
cocoindex update -L main   # live
```
4. Search from the command line:
```sh
python main.py "what is self-attention?"
```
This example keeps it minimal and doesn't declare a vector index, so queries do a sequential scan, which is fine for a few objects. For a larger corpus, add `target_table.declare_vector_index(column="embedding")` exactly as Semantic Search 101 does.
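To see why a sequential scan stops being fine as the corpus grows, here is a rough pure-Python picture of what it amounts to: every query is compared against every stored vector. This is an illustration, not how the example queries Postgres; `cosine_similarity`, `sequential_scan`, and the toy two-dimensional vectors are all made up for the sketch.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def sequential_scan(query: list[float], rows: list[tuple[str, list[float]]], top_k: int = 3):
    """Score every row against the query (O(n) work per query) and keep the best top_k."""
    scored = [(cosine_similarity(query, vec), text) for text, vec in rows]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return scored[:top_k]


# Toy data: (chunk text, embedding) pairs.
rows = [
    ("attention", [1.0, 0.0]),
    ("pooling", [0.0, 1.0]),
    ("mixed", [1.0, 1.0]),
]
top = sequential_scan([1.0, 0.1], rows, top_k=2)
```

A vector index avoids visiting every row, typically trading a little recall for much lower latency on large tables.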
<a href="https://cocoindex.io/docs">Docs</a> · <a href="https://cocoindex.io/docs/examples/oci-object-storage-embedding/">Walkthrough</a> · <a href="https://discord.com/invite/zpA9S2DR7s">Discord</a> · <a href="https://github.com/cocoindex-io/cocoindex/tree/main/examples"><b>See all examples →</b></a>
</p>