docs/src/content/example-posts/oci-object-storage-embedding.md
This is the Semantic Search 101 example with exactly one thing changed: instead of reading Markdown from a local folder, it lists Markdown objects from an Oracle Cloud (OCI) Object Storage bucket. The chunk → embed → store-in-pgvector half is identical, so the base walkthrough covers the embedding model, the DocEmbedding row, and the query. Here we focus on the part that differs: the OCI client, the source call, and how the same flow goes live off OCI Streaming.
From a high level, these are the steps:
You declare the transformation logic with native Python, without worrying about how updates propagate. Think: target_state = transformation(source_state).
The OCI SDK is synchronous and you create the client yourself, so we build one from a config-file profile and hand it to the context alongside the Postgres pool and the embedder. The connector wraps every SDK call in asyncio.to_thread, so passing the sync client through is fine.
def _build_oci_client() -> ObjectStorageClient:
config = oci.config.from_file(
file_location=os.path.expanduser(OCI_CONFIG_FILE),
profile_name=OCI_PROFILE,
)
return ObjectStorageClient(config)
OCI_CLIENT = coco.ContextKey[ObjectStorageClient]("oci_object_storage_client")
@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
async with asyncpg.create_pool(DATABASE_URL) as pool:
builder.provide(PG_DB, pool)
builder.provide(EMBEDDER, SentenceTransformerEmbedder(EMBED_MODEL))
builder.provide(OCI_CLIENT, _build_oci_client())
yield
Everything downstream of this — DocEmbedding, process_file, process_chunk — is the same chunk-and-embed code as the base example, so we won't repeat it here. The one small difference is the source type: process_file takes an oci_object_storage.OCIFile and reads its text with await file.read_text(), just like the local FileLike.
app_main mounts the Postgres table, then walks the bucket with list_objects. It is the OCI analogue of localfs.walk_dir — give it the client, the namespace, the bucket, an optional prefix, and a path matcher, and it yields one OCIFile per matching object.
@coco.fn
async def app_main() -> None:
target_table = await postgres.mount_table_target(
PG_DB,
table_name=TABLE_NAME,
table_schema=await postgres.TableSchema.from_class(
DocEmbedding, primary_key=["id"],
),
pg_schema_name=PG_SCHEMA_NAME,
)
client = coco.use_context(OCI_CLIENT)
# Live mode is opt-in: build a LiveStream[bytes] from OCI Streaming if configured.
consumer = _build_streaming_consumer()
live_stream = None
if consumer is not None and OCI_STREAMING_TOPIC is not None:
live_stream = kafka.topic_as_stream(consumer, [OCI_STREAMING_TOPIC]).payloads()
files = oci_object_storage.list_objects(
client,
OCI_NAMESPACE,
OCI_BUCKET,
prefix=OCI_PREFIX,
path_matcher=PatternFilePathMatcher(included_patterns=["**/*.md"]),
live_stream=live_stream,
)
await coco.mount_each(process_file, files.items(), target_table)
mount_each runs one processing component per object so the engine tracks and updates each independently. With live_stream=None (the default), list_objects does a one-shot catch-up scan. Pass a stream and it keeps watching — that's the next section.
OCI Streaming is Kafka-protocol-compatible, so live updates ride the Kafka connector. When the four OCI_STREAMING_* env vars are set, we build a confluent_kafka.aio.AIOConsumer with SASL_SSL + PLAIN auth, wrap it with kafka.topic_as_stream(...).payloads() to get a LiveStream[bytes], and pass it to list_objects. The connector snapshots a cutoff before the scan, runs the scan and stream concurrently, and for each post-cutoff event re-reads the object to apply an authoritative update or delete — see the OCI Object Storage connector docs for the details.
def _build_streaming_consumer() -> AIOConsumer | None:
if not (
OCI_STREAMING_BOOTSTRAP_SERVERS and OCI_STREAMING_TOPIC
and OCI_STREAMING_USERNAME and OCI_STREAMING_AUTH_TOKEN
):
return None
return AIOConsumer({
"bootstrap.servers": OCI_STREAMING_BOOTSTRAP_SERVERS,
"security.protocol": "SASL_SSL",
"sasl.mechanism": "PLAIN",
"sasl.username": OCI_STREAMING_USERNAME,
"sasl.password": OCI_STREAMING_AUTH_TOKEN,
"group.id": OCI_STREAMING_GROUP_ID,
"auto.offset.reset": "earliest",
"enable.auto.commit": False,
})
A running Postgres with the pgvector extension:
export POSTGRES_URL="postgres://cocoindex:cocoindex@localhost/cocoindex"
An OCI Object Storage bucket with Markdown objects, and an OCI config file (default ~/.oci/config) with API-key auth. Copy .env.example to .env and fill in OCI_NAMESPACE, OCI_BUCKET, and an optional OCI_PREFIX.
Install CocoIndex with the OCI, Kafka, Postgres, and embedding extras:
pip install -U "cocoindex[oci,kafka,postgres,sentence_transformers]" asyncpg pgvector numpy python-dotenv
Build the index — catch-up (scan, sync, exit) or live (catch up, then keep watching the OCI Streaming topic):
# Catch-up run
cocoindex update main
# Live run: keep watching OCI Streaming for change events
cocoindex update -L main
Then search straight from the command line:
python main.py "what is self-attention?"
This example keeps it minimal and doesn't declare a vector index, so queries do a sequential scan — fine for a few objects. For a larger corpus, add target_table.declare_vector_index(column="embedding") exactly as Semantic Search 101 does.
Incrementality works the same as the base example: @coco.fn(memo=True) skips an object whose content and code are unchanged, and mount_table_target upserts only the rows that actually changed and deletes rows whose source is gone.
id and embedding, new chunks are embedded and inserted, and vanished chunks are deleted.In catch-up mode CocoIndex discovers these by re-scanning the bucket; in live mode the OCI Streaming events drive the same updates with low latency, no full re-scan needed.
The full, runnable example is in the CocoIndex repo: examples/oci_object_storage_embedding. If you're starting from a local folder instead of a bucket, Semantic Search 101 is the same flow on the local filesystem.
If this helped, give CocoIndex a star on GitHub and come say hi on Discord.