By default, calling app.update() runs in catch-up mode: it scans all sources, processes what changed since the last run (memoized components are skipped, so unchanged work is not redone), syncs target states, and returns. Targets are caught up to the moment the run started, and that's it — to pick up further changes, you call update() again.
So catch-up mode is already incremental — but each call still has to scan sources to discover what changed, and changes are only picked up when you trigger a new run.
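To make that cost concrete, here is a minimal plain-Python sketch of what a single catch-up pass conceptually does. It is not CocoIndex's implementation: `catch_up`, `memo`, `fingerprint`, and the uppercase "processing" step are hypothetical stand-ins chosen for illustration.

```python
import hashlib


def fingerprint(content: str) -> str:
    """Stable fingerprint used to decide whether an item changed."""
    return hashlib.sha256(content.encode("utf-8")).hexdigest()


def catch_up(source: dict[str, str], memo: dict[str, str], target: dict[str, str]) -> list[str]:
    """One catch-up pass: scan everything, process only what changed.

    `source` maps key -> content, `memo` maps key -> fingerprint from the
    previous run, and `target` holds the derived state. Returns the keys
    that were (re)processed. All names here are hypothetical.
    """
    processed = []
    for key, content in source.items():  # the scan still visits every item
        fp = fingerprint(content)
        if memo.get(key) == fp:
            continue  # unchanged since the last run: skip the real work
        target[key] = content.upper()  # stand-in for real processing
        memo[key] = fp
        processed.append(key)
    for key in list(target):  # drop targets whose source item disappeared
        if key not in source:
            del target[key]
            memo.pop(key, None)
    return processed
```

Calling `catch_up` twice on an unchanged source reprocesses nothing the second time, but the loop still walks every item, and nothing happens between calls.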
Live mode keeps the app running after catch-up finishes and lets components stream changes continuously from their sources (e.g. a file system watcher or a database change feed), applying them to target states with very low latency. This is useful when targets must reflect source changes promptly, or when you want to avoid rescanning sources on every update() call.

Two things are needed for live mode to work: the app must be enabled to stay running, and somewhere in the component tree a component must react to changes.
Pass live=True when updating the app:
app.update_blocking(live=True)
# Or async
handle = app.update(live=True)
await handle.result()
From the CLI:
cocoindex update --live my_app.py
# or
cocoindex update -L my_app.py
The live flag propagates top-down through the component tree — both coco.mount() and coco.use_mount() inherit live from the parent, so children are live when the app is live.
Without live=True on the app, the app runs in catch-up mode — everything completes after the initial scan, even if a source supports live watching.
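The top-down inheritance can be pictured with a small toy model. This is only a sketch of the rule stated above, assuming a tree in which the flag is set once at the root; the `Node` class and its `mount` method are hypothetical and are not CocoIndex classes.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Node:
    """Hypothetical component node used only to illustrate inheritance."""

    name: str
    parent: Optional["Node"] = None
    root_live: bool = False  # only meaningful on the root (the app)
    children: list["Node"] = field(default_factory=list)

    def mount(self, name: str) -> "Node":
        """Attach a child; note that no live flag is passed down explicitly."""
        child = Node(name, parent=self)
        self.children.append(child)
        return child

    @property
    def live(self) -> bool:
        # Children never set the flag themselves; they read it from above.
        return self.root_live if self.parent is None else self.parent.live
```

In this model a leaf mounted under a live root reports `live` as true, and the same leaf under a default root reports false, without any per-child configuration.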
Enabling live mode keeps the app running, but something in the component tree needs to actually watch for changes. That something is a LiveComponent — a component with a long-running process_live() method that delivers incremental updates.
You rarely need to write a LiveComponent manually. The two most common patterns are passing a live source (a LiveMapView or LiveMapFeed) to mount_each(), and wrapping a processor function in coco.auto_refresh.

Source connectors can provide live capabilities via two protocols:

LiveMapView: the source has scannable current state (e.g., a directory or database table). It does a full scan first, then watches for changes. Example: localfs.walk_dir(live=True).items().
LiveMapFeed: the source only streams changes, with no snapshot to scan (e.g., a Kafka consumer). All data arrives via the change stream. Example: kafka.topic_as_map().

When mount_each() receives either, it automatically creates a LiveComponent internally that processes any initial scan and then applies each change as the source reports it.
CocoIndex handles change detection, memoization, and target state reconciliation the same way as in catch-up mode.
Without live support on the source, mount_each() falls back to catch-up behavior — a one-time iteration over items and that's it.
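The difference between the two kinds of source can be sketched in plain Python with asyncio. This is an illustration under stated assumptions, not the actual protocol definitions: `run_live`, `changes_from`, and the convention that a `None` value means "deleted" are all hypothetical.

```python
import asyncio
from typing import AsyncIterator, Optional

# A change is (key, value); a value of None marks the key as deleted.
Change = tuple[str, Optional[str]]


async def changes_from(events: list[Change]) -> AsyncIterator[Change]:
    """Hypothetical change stream; a real source would wait on a watcher."""
    for event in events:
        await asyncio.sleep(0)  # yield control, as a real feed would
        yield event


async def run_live(
    snapshot: Optional[dict[str, str]],
    changes: AsyncIterator[Change],
    target: dict[str, str],
) -> None:
    """Apply an optional initial scan, then stream changes into `target`."""
    if snapshot is not None:  # view-style source: scannable current state
        target.update(snapshot)
    async for key, value in changes:  # feed-style: everything arrives here
        if value is None:
            target.pop(key, None)
        else:
            target[key] = value
```

Passing a snapshot models a view-style source (scan first, then watch); passing `None` models a feed-style source, where the change stream is the only input.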
coco.auto_refresh

When the source isn't change-aware but you still want fresh data (say, polling a REST endpoint or re-reading a database table that doesn't emit change events), wrap your processor function in coco.auto_refresh. It runs your function once, signals readiness, then re-runs it on a fixed delay:
import datetime

import cocoindex as coco


async def sync_users(db, target) -> None:
    # Each run declares the full current set of rows.
    rows = await db.fetch_all_users()
    for row in rows:
        target.declare_row(row=UserRow(...))


@coco.fn
async def app_main(db, target) -> None:
    # Re-run sync_users every 5 minutes while the app is live.
    await coco.mount(
        coco.auto_refresh(sync_users, interval=datetime.timedelta(minutes=5)),
        db, target,
    )


app = coco.App(coco.AppConfig(name="UserSync"), app_main, db=..., target=...)
app.update_blocking(live=True)
Catch-up compatibility: in catch-up mode (the default), auto_refresh runs sync_users once and exits — observationally identical to mounting sync_users directly. The interval is ignored. Same pipeline, choose catch-up or live at run time.
Handling deletes: each cycle's declarations are reconciled against the previous run. If a row disappears from the source table between polls, sync_users simply doesn't declare it that cycle — CocoIndex automatically deletes the corresponding target. You don't need to track deletions yourself.
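The "run once, signal readiness, re-run on a delay" cycle and the delete reconciliation described above can be sketched as follows. This is a simplified model, not how coco.auto_refresh is implemented: `reconcile`, `refresh_loop`, and the `cycles` parameter (added only so the sketch terminates) are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable


def reconcile(previous: dict[str, str], declared: dict[str, str]) -> tuple[set[str], set[str]]:
    """Return (keys to upsert, keys to delete) between two cycles."""
    upserts = {k for k, v in declared.items() if previous.get(k) != v}
    deletes = set(previous) - set(declared)
    return upserts, deletes


async def refresh_loop(
    fetch: Callable[[], Awaitable[dict[str, str]]],
    target: dict[str, str],
    ready: asyncio.Event,
    interval: float,
    cycles: int,
) -> None:
    """Run `fetch` once, signal readiness, then re-run after each delay."""
    for i in range(cycles):
        declared = await fetch()
        upserts, deletes = reconcile(target, declared)
        for key in upserts:
            target[key] = declared[key]
        for key in deletes:
            del target[key]  # row vanished from the source: drop its target
        ready.set()  # set after the first cycle; later calls are no-ops
        if i + 1 < cycles:
            await asyncio.sleep(interval)
```

The processor itself never mentions deletions. A key that was declared in the previous cycle and is absent from the current one falls out of the set difference in `reconcile`.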
localfs: file watching with LiveMapView

The localfs connector supports live mode via walk_dir(..., live=True), which watches for file system changes using watchfiles:
import pathlib

import cocoindex as coco
from cocoindex.connectors import localfs

@coco.fn
async def app_main(sourcedir: pathlib.Path, outdir: pathlib.Path) -> None:
    files = localfs.walk_dir(
        sourcedir, recursive=True,
        path_matcher=PatternFilePathMatcher(included_patterns=["**/*.md"]),
        live=True,  # items() returns a LiveMapView
    )
    # One component per file; mount_each() handles the live updates.
    await coco.mount_each(process_file, files.items(), outdir)

app = coco.App(coco.AppConfig(name="FilesTransform"), app_main, sourcedir=..., outdir=...)
app.update_blocking(live=True)
Catch-up compatibility: LiveMapView sources also work without live=True — they do the initial full scan and exit cleanly. You can write your pipeline once and choose catch-up or live at run time.
For a complete working example, see files_transform.
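One way to choose catch-up or live at run time is a small launcher script with its own flag. The sketch below assumes only the app.update_blocking(live=...) call shown on this page; the `parse_args` and `run` helpers and the script's `--live`/`-L` flag (modeled on the CLI's) are hypothetical.

```python
import argparse
from typing import Sequence


def parse_args(argv: Sequence[str]) -> argparse.Namespace:
    """Parse this script's own flags (hypothetical, modeled on the CLI's -L)."""
    parser = argparse.ArgumentParser(description="Run the pipeline once or live.")
    parser.add_argument(
        "-L", "--live", action="store_true",
        help="keep running and stream changes after the initial catch-up",
    )
    return parser.parse_args(argv)


def run(app, argv: Sequence[str]) -> None:
    """Update `app` in catch-up mode by default, or live when requested."""
    args = parse_args(argv)
    # update_blocking(live=...) is the call shown earlier on this page.
    app.update_blocking(live=args.live)
```

A script would typically end with `run(app, sys.argv[1:])`, so the same pipeline definition serves both a scheduled catch-up job and a long-running live process.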
kafka: consuming a topic with LiveMapFeed

The kafka connector treats a topic as a live keyed map: each message is an upsert or delete for a key. Since there's no snapshot to scan, it returns a LiveMapFeed:
from confluent_kafka.aio import AIOConsumer

import cocoindex as coco
from cocoindex.connectors import kafka

@coco.fn
async def app_main() -> None:
    consumer = AIOConsumer({
        "bootstrap.servers": "localhost:9092",
        "group.id": "my-group",
        "enable.auto.commit": "false",
    })
    # No snapshot to scan: every item arrives through the change stream.
    items = kafka.topic_as_map(consumer, ["my-topic"])
    await coco.mount_each(process_message, items)

app = coco.App(coco.AppConfig(name="KafkaConsumer"), app_main)
app.update_blocking(live=True)
The abstractions behind live mode, from most general to most specific:

LiveComponent: a component with a long-running process_live() method that delivers incremental updates.
LiveMapView / LiveMapFeed: the protocols a source exposes to provide live capabilities. mount_each() uses them to construct a LiveComponent automatically. Connector authors implement them to add live support.
coco.auto_refresh: wraps a regular processor function as a LiveComponent that re-runs on a fixed interval. Use it when there's no change-event source.
Connectors: return a LiveMapView (e.g., localfs) or LiveMapFeed (e.g., kafka) from their source APIs. Users just pass the result to mount_each().

For custom change feeds, fine-grained lifecycle control, or implementing live map protocols on your own connector, see Live Components.