examples/rust/csv_to_kafka/README.md
Rust port of the Python csv_to_kafka example.
It reads local CSV files, converts each row to a JSON object (using the header
row as keys), and publishes one Kafka message per row through CocoIndex's
declarative KafkaTopicTarget. Each message is keyed by the first CSV column,
matching the Python example.
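
The row-to-message mapping described above can be sketched with the standard library alone. This is an illustration, not the code in this example: `csv_to_messages` and `json_escape` are hypothetical names, every value is emitted as a JSON string, and the naive comma split assumes no quoted fields or embedded commas (a real CSV parser handles those).

```rust
/// Escape a string so it can be embedded in a JSON string literal.
fn json_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

/// Turn CSV text into (key, json_value) pairs, one per data row.
/// The header row supplies the JSON keys; the first column is the message key.
/// Assumption: fields contain no quotes or embedded commas.
fn csv_to_messages(csv: &str) -> Vec<(String, String)> {
    let mut lines = csv.lines().filter(|l| !l.trim().is_empty());
    let header: Vec<&str> = match lines.next() {
        Some(h) => h.split(',').map(str::trim).collect(),
        None => return Vec::new(),
    };
    lines
        .map(|line| {
            let fields: Vec<&str> = line.split(',').map(str::trim).collect();
            // Pair each header name with the field in the same position.
            let body = header
                .iter()
                .zip(fields.iter())
                .map(|(k, v)| format!("\"{}\":\"{}\"", json_escape(k), json_escape(v)))
                .collect::<Vec<_>>()
                .join(",");
            let key = fields.first().copied().unwrap_or("").to_string();
            (key, format!("{{{}}}", body))
        })
        .collect()
}
```

Calling `csv_to_messages("id,name\n1,apple\n")` yields one pair whose key is `1` and whose value is a JSON object with `id` and `name` fields.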
| Concern | Python | Rust (this example) |
|---|---|---|
| Target | kafka.mount_kafka_topic_target(...) | kafka::mount_kafka_topic_target(...) |
| Per-file compute | @coco.fn(memo=True) process_csv | #[cocoindex::function(memo)] process_csv |
| Declare a message | topic_target.declare_target_state(key, val) | target.declare_message(ctx, key, val) |
| Kafka client | confluent_kafka (librdkafka) | rskafka (pure Rust, no C deps) |
process_csv is memoized: a CSV file whose content is unchanged is not
re-parsed. KafkaTopicTarget only produces a message when its value
changed since the last run, and produces a tombstone (a record with a null
value) for a row that disappeared from the source.

Both the topic and the partitioning are user-managed: like the Python connector,
CocoIndex never creates or drops topics during reconciliation (this example
calls ensure_topic explicitly), and messages are produced to partition 0.
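
The produce-on-change and tombstone-on-delete behavior can be pictured as a diff between the state recorded on the previous run and the rows seen on this run. The sketch below is a conceptual model only and is not how CocoIndex implements reconciliation; `Action` and `plan_actions` are hypothetical names introduced for illustration.

```rust
use std::collections::BTreeMap;

/// What the target would send for one key (hypothetical type).
#[derive(Debug, PartialEq)]
enum Action {
    /// Produce a record carrying the new value.
    Produce { key: String, value: String },
    /// Produce a record with a null value for a key that disappeared.
    Tombstone { key: String },
}

/// Compare the previous run's key -> value state with the current one.
/// Unchanged keys yield no action at all.
fn plan_actions(
    previous: &BTreeMap<String, String>,
    current: &BTreeMap<String, String>,
) -> Vec<Action> {
    let mut actions = Vec::new();
    // New or changed rows are (re-)produced.
    for (key, value) in current {
        if previous.get(key) != Some(value) {
            actions.push(Action::Produce {
                key: key.clone(),
                value: value.clone(),
            });
        }
    }
    // Rows that existed last time but are gone now get a tombstone.
    for key in previous.keys() {
        if !current.contains_key(key) {
            actions.push(Action::Tombstone { key: key.clone() });
        }
    }
    actions
}
```

With this model, re-running against an unchanged CSV produces an empty action list, which matches the "nothing is re-produced" behavior described above.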
Start a Kafka or Redpanda broker on localhost:9092, then:
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092 # default
export KAFKA_TOPIC=cocoindex-csv-rows # default
# Read ./data/*.csv and produce a message per row (incremental on re-run)
cargo run -- index
# Print all messages currently on the topic
cargo run -- consume
Edit a row in data/products.csv and re-run cargo run -- index: only the
changed row is re-produced. Delete a row and re-run: a tombstone is produced for
it.
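
For reference, the two environment variables and their defaults shown above could be read with a small helper like the one below. This README does not show how the example itself loads them, so `env_or`, `KafkaSettings`, and `load_settings` are hypothetical names; only the variable names and default values come from the text.

```rust
use std::env;

/// Return the value of `name`, or `default` when it is unset or not valid Unicode.
fn env_or(name: &str, default: &str) -> String {
    env::var(name).unwrap_or_else(|_| default.to_string())
}

/// Connection settings for the example (hypothetical struct).
struct KafkaSettings {
    bootstrap_servers: String,
    topic: String,
}

/// Load settings, falling back to the defaults documented in this README.
fn load_settings() -> KafkaSettings {
    KafkaSettings {
        bootstrap_servers: env_or("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
        topic: env_or("KAFKA_TOPIC", "cocoindex-csv-rows"),
    }
}
```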
Note: unlike the Python example (live=True continuous watch), this runs a single pass per index invocation; the Rust SDK's fs::walk is one-shot.