Declare the topic as a target state; CocoIndex produces the upserts and deletes, never the no-ops.
</p> <p align="center"> <strong>Star us ❤️ →</strong> <a href="https://github.com/cocoindex-io/cocoindex" title="Star CocoIndex on GitHub"><picture><source media="(prefers-color-scheme: dark)" srcset="https://cocoindex.io/blobs/github/homepage/star-btn-small-dark.svg"><source media="(prefers-color-scheme: light)" srcset="https://cocoindex.io/blobs/github/homepage/star-btn-small-light.svg"></picture></a> · <a href="https://cocoindex.io/docs/examples/csv-to-kafka/" title="Read the full walkthrough"><picture><source media="(prefers-color-scheme: dark)" srcset="https://cocoindex.io/blobs/github/homepage/docs-inline-dark.svg"><source media="(prefers-color-scheme: light)" srcset="https://cocoindex.io/blobs/github/homepage/docs-inline-light.svg"></picture></a> · <a href="https://discord.com/invite/zpA9S2DR7s" title="Join the CocoIndex Discord"><picture><source media="(prefers-color-scheme: dark)" srcset="https://cocoindex.io/blobs/github/homepage/discord-inline-dark.svg"><source media="(prefers-color-scheme: light)" srcset="https://cocoindex.io/blobs/github/homepage/discord-inline-light.svg"></picture></a> </p>

CSV is the format that shows up everywhere and gets respect nowhere: BI exports, vendor dumps, spreadsheets parked in a shared drive, dropped into a folder and edited at random with no schema contract. This pipeline turns a directory of them into a clean, row-keyed, diff-only Kafka stream. You declare the transformation in native Python (`target_state = transformation(source_state)`), and the Rust engine handles the incremental processing underneath: it tracks what each row last looked like and produces a message only for rows that actually changed. No producer loop, no dedup bookkeeping.
The Kafka topic is just a target you declare states on, the same way you'd declare them on a Postgres table. `process_csv` runs once per file: it parses rows with `csv.DictReader`, then declares each row as a target state, with the key taken from the first column and the value set to the JSON-encoded row. Read it in `main.py`:
```python
import csv
import io
import json

# coco, kafka, localfs, FileLike, PatternFilePathMatcher, KAFKA_PRODUCER and
# KAFKA_TOPIC are imported or defined elsewhere in main.py.


@coco.fn(memo=True)
async def process_csv(file: FileLike, topic_target: kafka.KafkaTopicTarget) -> None:
    reader = csv.DictReader(io.StringIO(await file.read_text()))
    headers = reader.fieldnames
    if not headers:
        return  # empty file (no header row): declare nothing
    first_col = headers[0]
    for row in reader:
        key_value = row.get(first_col)
        if key_value is not None:
            # One target state per row: key = first column, value = whole row as JSON
            topic_target.declare_target_state(key=key_value, value=json.dumps(row))


@coco.fn
async def app_main() -> None:
    topic_target = await kafka.mount_kafka_topic_target(KAFKA_PRODUCER, KAFKA_TOPIC)
    files = localfs.walk_dir(
        localfs.FilePath(path="./data"),
        path_matcher=PatternFilePathMatcher(included_patterns=["**/*.csv"]),
        live=True,  # watch for changes; pass -L to `cocoindex update` to run live
    )
    # Run process_csv once per matched file, all sharing the same topic target
    await coco.mount_each(process_csv, files.items(), topic_target)
```
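To see what a single file turns into without a broker or CocoIndex installed, here is a minimal stdlib-only sketch of the per-row logic in `process_csv`. The function name `rows_to_states` and the sample CSV are hypothetical; instead of calling `declare_target_state`, the sketch collects the `(key, value)` pairs it would declare into a list.

```python
import csv
import io
import json


def rows_to_states(csv_text: str) -> list[tuple[str, str]]:
    """Return the (key, value) pairs one CSV file would declare (hypothetical helper).

    Mirrors the per-row logic of process_csv without CocoIndex: the key is the
    row's first-column value and the value is the whole row encoded as JSON.
    """
    reader = csv.DictReader(io.StringIO(csv_text))
    headers = reader.fieldnames
    if not headers:
        return []  # empty file: no header row, so nothing is declared
    first_col = headers[0]
    states = []
    for row in reader:
        key_value = row.get(first_col)
        if key_value is not None:
            states.append((key_value, json.dumps(row)))
    return states


# Made-up sample data, not one of the example's data/ files
sample = "sku,name,price\nA1,Widget,9.99\nB2,Gadget,19.50\n"
for key, value in rows_to_states(sample):
    print(key, value)
# A1 {"sku": "A1", "name": "Widget", "price": "9.99"}
# B2 {"sku": "B2", "name": "Gadget", "price": "19.50"}
```

`csv.DictReader` yields every field as a string, so `price` lands in the JSON as `"9.99"` rather than a number; cast columns before `json.dumps` if consumers need typed values.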
The one line worth pausing on is `declare_target_state`, deliberately not `produce()`. You describe what the topic should contain as a function of the source, and CocoIndex turns the state transitions into wire messages:

- a new or changed key produces an upsert `(k, v)`;
- a key that is no longer declared produces a delete `(k, None)`, i.e. a Kafka tombstone;
- a key declared again with the same value sends nothing.
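The three rules above can be modeled as a diff between two snapshots of declared states, keyed by row key. This is a toy sketch under that assumption, not CocoIndex's engine, which tracks the previously declared states itself; `diff_states` and the sample values are hypothetical.

```python
from typing import Optional

Message = tuple[str, Optional[str]]


def diff_states(previous: dict[str, str], current: dict[str, str]) -> list[Message]:
    """Turn two snapshots of declared states into Kafka-style messages (toy model).

    new or changed key -> upsert (k, v); vanished key -> delete (k, None);
    unchanged key -> no message at all.
    """
    messages: list[Message] = []
    for key, value in current.items():
        if previous.get(key) != value:
            messages.append((key, value))  # new key, or its value changed
    for key in previous:
        if key not in current:
            messages.append((key, None))  # no longer declared: tombstone
    return messages


before = {"A1": '{"price": "9.99"}', "B2": '{"price": "19.50"}', "C3": '{"price": "5.00"}'}
after = {"A1": '{"price": "9.99"}', "B2": '{"price": "21.00"}', "D4": '{"price": "1.25"}'}
print(diff_states(before, after))
# [('B2', '{"price": "21.00"}'), ('D4', '{"price": "1.25"}'), ('C3', None)]
```

Calling it with two identical snapshots returns an empty list, which is the "never the no-ops" property from the top of this README.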
[Step-by-step walkthrough](https://cocoindex.io/docs/examples/csv-to-kafka/) covering the producer lifespan, the state-vs-messages model, live mode, and exactly which messages each change produces.
</p>

`live=True` on `walk_dir` and `-L` on the CLI are the entire difference between a catch-up run and a streaming one: `process_csv` and the target don't change, and there is no separate "streaming" code path.

This example needs a running Kafka broker. CocoIndex never creates topics; you create the one it produces into.
1. Start a broker and create the topic. A single-container Redpanda (Kafka-API compatible) is the quickest local broker:

   ```sh
   docker run -d --name redpanda -p 9092:9092 redpandadata/redpanda:latest \
     redpanda start --mode dev-container --smp 1 \
     --kafka-addr PLAINTEXT://0.0.0.0:9092 --advertise-kafka-addr PLAINTEXT://localhost:9092
   docker exec redpanda rpk topic create cocoindex-csv-rows
   ```
2. Configure and install:

   ```sh
   cp .env.example .env   # then set KAFKA_BOOTSTRAP_SERVERS / KAFKA_TOPIC (+ SASL creds for a managed broker)
   pip install -e .
   ```
3. Run the pipeline. The example ships a `data/` folder of sample CSVs. Choose a catch-up run (scan, sync, exit) or a live run (catch up, then keep watching `./data`):

   ```sh
   # Catch-up run: reconcile the topic up to now, then exit
   cocoindex update main.py

   # Live run: catch up, then produce on every change
   cocoindex update -L main.py
   ```
4. Look at the topic. Keys are each row's first-column value; values are the JSON-encoded rows:

   ```sh
   docker exec redpanda rpk topic consume cocoindex-csv-rows --num 10
   ```
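Edit a cell in `data/products.csv` while live mode runs, and a new message with that row's key appears within a second. Editing the first column changes the key itself, so you get a delete for the old key and an upsert for the new one. The consumer side, `kafka_to_lancedb`, reads these messages back off the topic and dispatches them into LanceDB tables.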
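On the consuming side, replaying these messages in order reconstructs the current set of rows: each upsert overwrites its key and each `None` value removes it. The sketch below shows that replay against an in-memory dict; `apply_messages` and the sample rows are hypothetical, and this is not how `kafka_to_lancedb` is implemented. A real Kafka client hands you keys and values as bytes, so you would decode them before this step.

```python
import json
from typing import Optional


def apply_messages(
    table: dict[str, dict], messages: list[tuple[str, Optional[str]]]
) -> dict[str, dict]:
    """Replay (key, value) messages into an in-memory table, one entry per key.

    A toy consumer: a JSON value upserts the decoded row, and a None value
    (tombstone) deletes the key.
    """
    for key, value in messages:
        if value is None:
            table.pop(key, None)  # delete; ignore keys we never saw
        else:
            table[key] = json.loads(value)  # upsert the decoded CSV row
    return table


rows: dict[str, dict] = {}
apply_messages(rows, [
    ("A1", '{"sku": "A1", "price": "9.99"}'),
    ("B2", '{"sku": "B2", "price": "19.50"}'),
])
# Second batch: B2's price changed and A1 was removed from the CSV
apply_messages(rows, [("B2", '{"sku": "B2", "price": "21.00"}'), ("A1", None)])
print(rows)
# {'B2': {'sku': 'B2', 'price': '21.00'}}
```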
<a href="https://cocoindex.io/docs">Docs</a> · <a href="https://cocoindex.io/docs/examples/csv-to-kafka/">Walkthrough</a> · <a href="https://discord.com/invite/zpA9S2DR7s">Discord</a> · <a href="https://github.com/cocoindex-io/cocoindex/tree/main/examples"><b>See all examples →</b></a>
</p>