examples/oci_object_storage_embedding/README.md
This example embeds markdown files from an Oracle Cloud Infrastructure (OCI) Object Storage bucket, stores the chunks + embeddings in Postgres (pgvector), and provides a simple semantic-search query demo.
It optionally subscribes to OCI Streaming (Kafka-protocol-compatible) for live updates: when configured, object create/update/delete events flowing through the stream trigger incremental re-processing without re-scanning the whole bucket.
An OCI config file (~/.oci/config) with API-key auth set up; see the OCI SDK config docs.
Copy .env.example to .env and fill in your values:
cp .env.example .env
Install deps:
pip install -e .
Build/update the index in catch-up mode (writes rows into Postgres and exits):
cocoindex update main.py
Run in live mode — performs an initial scan, then keeps watching the OCI Streaming topic and applies incremental updates:
cocoindex update -L main.py
Live mode requires OCI_STREAMING_BOOTSTRAP_SERVERS, OCI_STREAMING_TOPIC, OCI_STREAMING_USERNAME, and OCI_STREAMING_AUTH_TOKEN to be set in .env. With those unset, the connector skips live-stream subscription and just performs the catch-up scan.
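The all-or-nothing check described above can be sketched as follows. This is a minimal illustration, not the example's actual code: `live_consumer_config` and the default `group_id` are hypothetical names, and the returned dictionary assumes the SASL_SSL + PLAIN settings this README mentions, expressed with standard librdkafka configuration keys.

```python
import os
from typing import Mapping, Optional

# The four variables this README lists as required for live mode.
REQUIRED_VARS = (
    "OCI_STREAMING_BOOTSTRAP_SERVERS",
    "OCI_STREAMING_TOPIC",
    "OCI_STREAMING_USERNAME",
    "OCI_STREAMING_AUTH_TOKEN",
)


def live_consumer_config(
    env: Mapping[str, str],
    group_id: str = "cocoindex-example",  # hypothetical consumer group name
) -> Optional[dict]:
    """Return a Kafka client config, or None when live mode is not configured."""
    # A missing or empty variable means: skip the live subscription entirely.
    if any(not env.get(name) for name in REQUIRED_VARS):
        return None
    return {
        "bootstrap.servers": env["OCI_STREAMING_BOOTSTRAP_SERVERS"],
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": env["OCI_STREAMING_USERNAME"],
        "sasl.password": env["OCI_STREAMING_AUTH_TOKEN"],
        "group.id": group_id,
    }


if __name__ == "__main__":
    config = live_consumer_config(os.environ)
    print("live mode" if config else "catch-up scan only")
```

The topic name is deliberately left out of the dictionary, since Kafka clients take it at subscribe time rather than as a connection setting.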
OCI_STREAMING_USERNAME format
The value has the form <tenancy-name>/<username>/<stream-pool-ocid>. Note that the first two segments are plain names, not OCIDs:
- tenancy-name: e.g. acme-corp (NOT ocid1.tenancy.oc1..…). If your tenancy uses Identity Domains, the format is <tenancy-name>/<domain-name>/<username>/<stream-pool-ocid>.
- username: the user's login name, typically an email address (NOT ocid1.user.oc1..…). For federated users, prefix it with the IDP, e.g. oracleidentitycloudservice/<username>.
- stream-pool-ocid: this one is an OCID, e.g. ocid1.streampool.oc1.iad.aaaa….
The OCI Console pre-formats this exact string for you under Streaming → Stream Pools → <your pool> → Kafka Connection Settings → "SASL Connection String". Just copy that value verbatim.
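If you prefer to assemble the value yourself rather than copy it from the Console, the rules above can be sketched as a small helper. `streaming_username` is a hypothetical function written for illustration, not part of the example or any SDK. Combining an IDP prefix with an Identity Domain segment is an assumption here, as the text above describes each case only separately.

```python
from typing import Optional


def streaming_username(
    tenancy_name: str,
    username: str,
    stream_pool_ocid: str,
    domain_name: Optional[str] = None,  # only for tenancies using Identity Domains
    idp: Optional[str] = None,          # only for federated users
) -> str:
    """Join the segments into the SASL username, rejecting common mix-ups."""
    # The first two segments must be plain names, never OCIDs.
    for label, value in (("tenancy_name", tenancy_name), ("username", username)):
        if value.startswith("ocid1."):
            raise ValueError(f"{label} must be a plain name, not an OCID: {value!r}")
    # The last segment, by contrast, must be a stream pool OCID.
    if not stream_pool_ocid.startswith("ocid1.streampool."):
        raise ValueError(f"expected a stream pool OCID, got {stream_pool_ocid!r}")

    user = f"{idp}/{username}" if idp else username
    parts = [tenancy_name]
    if domain_name:
        parts.append(domain_name)
    parts.extend([user, stream_pool_ocid])
    return "/".join(parts)
```

For example, `streaming_username("acme-corp", "alice", "ocid1.streampool.oc1.iad.example")` joins the three segments with slashes, where `alice` and the OCID suffix are placeholder values.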
OCI_STREAMING_AUTH_TOKEN
This is a per-user "Auth Token", a credential distinct from your Console password and from API signing keys. Generate it via Console → Profile → My profile → Auth tokens → Generate token. Copy it immediately: the Console shows it only once, and it cannot be viewed again.
Query:
python main.py query "what is self-attention?"
Note: this example does not create a vector index; queries will do a sequential scan.
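To make the cost of a sequential scan concrete, here is a pure-Python sketch of what exact nearest-neighbour search amounts to: every stored embedding is compared against the query. This only illustrates the idea and is not how the example or pgvector is implemented. Cosine distance is an assumption, since the metric is not stated here, and `top_k` is a hypothetical helper.

```python
import math
from typing import List, Sequence, Tuple


def cosine_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """Return 1 - cosine similarity for two non-zero vectors of equal length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / norm


def top_k(
    query: Sequence[float],
    rows: Sequence[Tuple[str, Sequence[float]]],
    k: int = 3,
) -> List[Tuple[float, str]]:
    """Score every row against the query: work grows linearly with row count."""
    scored = [(cosine_distance(query, embedding), text) for text, embedding in rows]
    scored.sort(key=lambda pair: pair[0])  # smallest distance first
    return scored[:k]
```

Results are exact, but query time grows with the number of stored chunks, which is acceptable for a small demo bucket.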
OCI Streaming exposes a Kafka-compatible interface. The example builds a confluent_kafka.aio.AIOConsumer configured with SASL_SSL + PLAIN auth pointing at the streaming endpoint, wraps it via cocoindex.connectors.kafka.topic_as_stream(...).payloads() to get a LiveStream[bytes], and passes that to oci_object_storage.list_objects(..., live_stream=...). The connector then:
1. Records a cutoff timestamp (now - 5s) before the scan starts.
2. Drops events whose eventTime predates the cutoff (the scan covers them).
3. For each remaining event, calls head_object to determine current state, then issues an authoritative update (object present) or delete (404); the event type is not trusted as a dispatch signal.
See python/cocoindex/connectors/oci_object_storage/_source.py for details.
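The cutoff filter and the head-based dispatch described above can be sketched roughly as follows. This is a simplified model under stated assumptions, not the connector's code: `scan_cutoff`, `reconcile`, and the `head` callback are hypothetical names, events are reduced to `(object_key, event_time)` pairs, and `head` is assumed to return `None` where a real `head_object` call would report a 404.

```python
from datetime import datetime, timedelta, timezone
from typing import Callable, Iterable, Iterator, Optional, Tuple

CUTOFF_SLACK = timedelta(seconds=5)  # the "now - 5s" margin


def scan_cutoff(now: datetime) -> datetime:
    """Compute the cutoff; call this before the bucket scan starts."""
    return now - CUTOFF_SLACK


def reconcile(
    events: Iterable[Tuple[str, datetime]],
    cutoff: datetime,
    head: Callable[[str], Optional[dict]],
) -> Iterator[Tuple[str, str]]:
    """Yield ("update" | "delete", key) based on current object state."""
    for key, event_time in events:
        # Events older than the cutoff are already covered by the scan.
        if event_time < cutoff:
            continue
        # The event type is ignored; only the object's current state decides.
        metadata = head(key)
        yield ("update" if metadata is not None else "delete", key)


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    store = {"docs/a.md": {"etag": "1"}}  # stand-in for the bucket
    events = [("docs/old.md", now - timedelta(seconds=30)),
              ("docs/a.md", now), ("docs/gone.md", now)]
    print(list(reconcile(events, scan_cutoff(now), store.get)))
```

Deciding from current state rather than from the event type keeps the result correct when events arrive late, out of order, or more than once.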