# usecases/export/ARCHITECTURE.md
The export feature produces point-in-time snapshots of Weaviate collections as Parquet files on an external storage backend (S3, GCS, Azure). It uses a 2PC-like protocol (Prepare/Commit, but without the atomicity guarantees of a true two-phase commit) across the cluster so that every participating node anchors its snapshot in the same coordinated window, then scans and uploads independently.
```
                  +-----------+
                  |  Client   |
                  +-----+-----+
                        |
             POST /v1/export/{backend}
                        |
                  +-----v-----+
                  | Scheduler |  (coordinator node)
                  +-----+-----+
                        |
            Phase 1: Prepare (all nodes)
                        |
            Phase 2: Commit (all nodes)
                        |
          +-------------+-------------+
          |                           |
+---------+---------+       +---------+---------+
|    Participant    |       |    Participant    |
|      node-1       |       |      node-2       |
|                   |       |                   |
| Prepare:          |       | Prepare:          |
|  snapshot shards  |       |  snapshot shards  |
| Commit:           |       | Commit:           |
|  scan -> parquet  |       |  scan -> parquet  |
|  upload           |       |  upload           |
+-------------------+       +-------------------+
```
The protocol borrows the Prepare/Commit shape of two-phase commit but intentionally does not provide true 2PC atomicity. The goal of the two phases is narrower: strongly guarantee that only one export runs at a time across the cluster (each node reserves a single-export slot during Prepare via CAS, and Commit only proceeds once all nodes have confirmed their reservation). Exports are idempotent and produce no side effects on failure, so best-effort abort is sufficient: if a node fails after Commit, the remaining nodes cancel and the user re-runs the export.
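The single-export slot can be sketched with a CAS on an atomic flag. This is an illustrative model, not the actual implementation; the names (participant, errBusy) are made up:

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// participant models the one-export-at-a-time slot: Prepare reserves it
// with a CAS, and it is cleared again on release/abort.
type participant struct {
	active atomic.Bool // the single-export slot
}

var errBusy = errors.New("another export is already running")

func (p *participant) prepare() error {
	if !p.active.CompareAndSwap(false, true) {
		return errBusy // lost the race: a concurrent export holds the slot
	}
	return nil
}

func (p *participant) release() { p.active.Store(false) }

func main() {
	p := &participant{}
	fmt.Println(p.prepare())        // first Prepare wins the slot
	fmt.Println(p.prepare())        // second Prepare is rejected
	p.release()
	fmt.Println(p.prepare() == nil) // slot is free again after release
}
```

Because the reservation is a single CAS, concurrent Prepare requests from two coordinators cannot both succeed on the same node, which is what gives the cluster-wide one-export guarantee once all nodes confirm.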
The coordinator (Scheduler, on the node that receives the HTTP request) drives the protocol. All persistent state lives on the backend (metadata JSON, per-node status files, Parquet files). The coordinator is stateless after the initial response.
Phase 1 -- Prepare. The coordinator validates the request, resolves classes, and determines shard ownership (one replica per shard, least-loaded strategy). It then sends Prepare to all nodes concurrently. Each participant reserves an export slot (one active export at a time via CAS), starts snapshotting all assigned shards in a background goroutine, and arms a 30 s auto-abort timer. Prepare returns immediately; the snapshot may still be in flight.
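The auto-abort timer can be sketched with time.AfterFunc. Names are illustrative and the delay is shortened for demonstration; the real timer is 30 s and Commit disarms it:

```go
package main

import (
	"fmt"
	"time"
)

// reservation is a hypothetical stand-in for the export slot armed during
// Prepare; released is closed when the auto-abort fires.
type reservation struct{ released chan struct{} }

// prepareWithAutoAbort reserves the slot and arms a timer that frees it if
// Commit never arrives (e.g. the coordinator crashed between phases).
func prepareWithAutoAbort(ms int) *reservation {
	r := &reservation{released: make(chan struct{})}
	time.AfterFunc(time.Duration(ms)*time.Millisecond, func() {
		close(r.released) // abort path: free the single-export slot
	})
	return r
}

// waitReleased blocks until the auto-abort fires. A real Commit would call
// timer.Stop() instead and keep the reservation alive.
func (r *reservation) waitReleased() bool {
	<-r.released
	return true
}

func main() {
	r := prepareWithAutoAbort(10)
	if r.waitReleased() {
		fmt.Println("reservation released by auto-abort")
	}
}
```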
Phase 2 -- Commit. Sent to all nodes concurrently. Each participant waits
for its snapshot goroutine (<-pending.done), initializes the backend (e.g.
S3 client verification), then launches the scan+upload in a background
goroutine. The Export() HTTP response is sent only after Commit has
succeeded on every node; scanning continues in background goroutines.
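The Prepare-to-Commit handoff via pending.done reduces to a closed-channel signal. A minimal sketch (pendingSnapshot and the function names are illustrative):

```go
package main

import "fmt"

// pendingSnapshot carries the snapshot goroutine's result to Commit.
type pendingSnapshot struct {
	done chan struct{}
	err  error
}

// prepare starts the snapshot in a background goroutine and returns
// immediately; done is closed when the snapshot finishes.
func prepare() *pendingSnapshot {
	p := &pendingSnapshot{done: make(chan struct{})}
	go func() {
		defer close(p.done) // signal Commit regardless of outcome
		// ... snapshot shards here; record failure in p.err ...
	}()
	return p
}

// commit blocks on the handoff channel before moving on to scan+upload.
func commit(p *pendingSnapshot) error {
	<-p.done // wait for the snapshot goroutine
	if p.err != nil {
		return p.err
	}
	// ... init backend, launch scan+upload goroutine ...
	return nil
}

func main() {
	fmt.Println(commit(prepare())) // nil: snapshot completed, Commit proceeds
}
```

Closing the channel (rather than sending on it) means any number of waiters observe completion, and the close happens-before the read of p.err on the Commit side.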
Abort. Any failure during Prepare or Commit triggers abortAll(): a
best-effort sweep using a fresh 15 s context. User-initiated cancel
(DELETE /v1/export/{id}) follows the same path and writes CANCELED
metadata. The 30 s auto-abort timer prevents leaked reservations if the
coordinator crashes between phases.
Snapshots use hard-linked LSM segment files so the long-running scan never
holds locks and concurrent writes continue unblocked. WAL files are the
exception: they are copied (not hard-linked) because they are mutable —
if the shard loads after the snapshot is taken, the original WAL could be
modified, which would corrupt a hard-linked snapshot. The copy is atomic
(temporary file + rename) so a crash never leaves a partial WAL. See the
godoc in adapters/repos/db/lsmkv/bucket_snapshot.go for the full mechanism
(flush cycle pausing, hard-link safety, snapshot lifecycle, and cleanup).
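The two file-handling strategies can be sketched as follows, assuming POSIX link/rename semantics; snapshotSegment, snapshotWAL, and the demo driver are hypothetical names, not the actual helpers:

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// snapshotSegment hard-links an immutable file (segment, bloom filter):
// zero-copy and safe because the source never changes.
func snapshotSegment(src, dst string) error {
	return os.Link(src, dst)
}

// snapshotWAL copies a mutable WAL atomically: write to a temp file, then
// rename. Rename is atomic on POSIX, so a crash never leaves a partial WAL.
func snapshotWAL(src, dst string) error {
	data, err := os.ReadFile(src)
	if err != nil {
		return err
	}
	tmp := dst + ".tmp"
	if err := os.WriteFile(tmp, data, 0o644); err != nil {
		return err
	}
	return os.Rename(tmp, dst)
}

// demoSnapshotWAL exercises the copy path in a temp directory.
func demoSnapshotWAL() (string, error) {
	dir, err := os.MkdirTemp("", "snap")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)
	src := filepath.Join(dir, "shard.wal")
	if err := os.WriteFile(src, []byte("wal-entries"), 0o644); err != nil {
		return "", err
	}
	dst := filepath.Join(dir, "snapshot.wal")
	if err := snapshotWAL(src, dst); err != nil {
		return "", err
	}
	copied, err := os.ReadFile(dst)
	return string(copied), err
}

func main() {
	content, err := demoSnapshotWAL()
	if err != nil {
		panic(err)
	}
	fmt.Println(content) // wal-entries
}
```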
Unloaded shards (cold tenants) are snapshotted directly from disk: immutable
files (segments, bloom filters, count-net-additions) are hard-linked, and WAL
files are copied. Offloaded/frozen tenants are skipped with a SkipReason.
Each shard snapshot is split into key ranges scanned by a worker pool
(GOMAXPROCS * 2 workers), producing one Parquet file per range.
Range computation: computeRanges uses QuantileKeys to split the key
space. Range count is bounded between count / maxObjectsPerRange and
count / minObjectsPerRange (50K--500K objects, targeting ~2--3 GB files after
Zstd compression).
Per-range pipeline: Each scanJob creates an io.Pipe connecting a
ParquetWriter (10K-row buffer, Zstd, 8 MB page buffer) to a backend upload
goroutine. The scan seeks to its start key with a bucket cursor, deserializes
each object via storobj.ExportFieldsFromBinary, and writes a ParquetRow.
Cleanup goroutines shut down snapshot buckets and remove directories after all
ranges of a shard complete.
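The io.Pipe pattern in the per-range pipeline can be sketched like this, with plain bytes standing in for Parquet rows and io.Discard standing in for the backend upload; streamRange is an illustrative name:

```go
package main

import (
	"fmt"
	"io"
)

// streamRange connects a writer (stand-in for the ParquetWriter) to an
// upload goroutine (stand-in for the backend client) through an io.Pipe,
// so data streams to storage without buffering the whole file in memory.
// It returns the number of bytes the uploader consumed.
func streamRange(rows int) int {
	pr, pw := io.Pipe()

	uploaded := make(chan int64)
	go func() { // upload goroutine: consumes the pipe's read end
		n, _ := io.Copy(io.Discard, pr) // a real backend would PUT this stream
		uploaded <- n
	}()

	for i := 0; i < rows; i++ { // scan side: write "rows" into the pipe
		fmt.Fprintf(pw, "row-%d\n", i)
	}
	pw.Close() // EOF tells the uploader the range is complete
	return int(<-uploaded)
}

func main() {
	fmt.Println(streamRange(3)) // 18 bytes streamed through the pipe
}
```

The pipe applies backpressure automatically: a slow upload blocks the writer, so memory stays bounded regardless of range size.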
File naming: {className}_{shardName}_{rangeIndex:04d}.parquet. Collection
and tenant names are stored as file-level Parquet metadata, not as row columns.
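A concrete illustration of the naming scheme (class, shard, and index values are made up):

```go
package main

import "fmt"

// rangeFileName renders {className}_{shardName}_{rangeIndex:04d}.parquet.
func rangeFileName(className, shardName string, rangeIndex int) string {
	return fmt.Sprintf("%s_%s_%04d.parquet", className, shardName, rangeIndex)
}

func main() {
	fmt.Println(rangeFileName("Article", "shard-a", 7)) // Article_shard-a_0007.parquet
}
```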
```
{homeDir}/
  export_metadata.json                              # source of truth
  node_{nodeName}_status.json                       # per node, written every 10 s
  {className}_{shardName}_{rangeIndex:04d}.parquet
```
Export-level: STARTED → TRANSFERRING (live only, never persisted) →
SUCCESS / FAILED / CANCELED.
Shard-level: TRANSFERRING, SUCCESS, FAILED, SKIPPED.
Lock-free: ParquetWriter.Flush → onFlush callback →
ShardProgress.objectsWritten.Add (atomic). The status writer goroutine
(every 10 s) calls SyncAndSnapshot() to copy atomics into the JSON-visible
field and writes the result to the backend.
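The lock-free counter plus periodic snapshot can be sketched as below. The struct mirrors the description, but the code is illustrative (runWorkers simulates the scan workers):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// shardProgress pairs a lock-free counter (bumped by the onFlush callback)
// with a plain field that the JSON encoder sees.
type shardProgress struct {
	objectsWritten atomic.Int64 // updated by scan workers, no locks
	ObjectsWritten int64        `json:"objectsWritten"` // snapshot for status JSON
}

// syncAndSnapshot copies the atomic into the JSON-visible field, mirroring
// the SyncAndSnapshot step above.
func (p *shardProgress) syncAndSnapshot() { p.ObjectsWritten = p.objectsWritten.Load() }

// runWorkers simulates concurrent scan workers flushing rows.
func runWorkers(p *shardProgress, workers, flushesEach int) {
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < flushesEach; i++ {
				p.objectsWritten.Add(1)
			}
		}()
	}
	wg.Wait()
}

func main() {
	p := &shardProgress{}
	runWorkers(p, 4, 1000)
	p.syncAndSnapshot()
	fmt.Println(p.ObjectsWritten) // 4000
}
```

The snapshot copy is only read by the status writer goroutine, so no lock is needed anywhere on the hot path.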
Scheduler.Status() and Participant.tryPromoteMetadata() both read per-node
status files, check liveness, and promote metadata to a terminal state if all
nodes are done. Both use the same logic so the race is benign -- last writer
produces the same result.
| Primitive | Purpose |
|---|---|
| mu (Mutex) | Guards prepared state, abort timer, cancel func, pending snapshot, active slot |
| activeExport | One-export-at-a-time slot (CAS in Prepare, cleared in clearAndRelease) |
| pending.done (channel) | Snapshot goroutine → Commit handoff |
| exportWg (WaitGroup) | Graceful shutdown waits for in-flight exports |
| ShardProgress.objectsWritten (atomic) | Lock-free progress counter between scan workers and status writer |
Context hierarchy:
shutdownCtx → snapshotCtx (per Prepare) and shutdownCtx → exportCtx
(per Commit, canceled on Abort or sibling failure).
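The hierarchy means a single cancel at the root reaches every in-flight export. A minimal sketch (shutdownCancelsChildren is an illustrative driver, not real API):

```go
package main

import (
	"context"
	"fmt"
)

// shutdownCancelsChildren derives both per-export contexts from shutdownCtx
// and verifies that canceling the root (StartShutdown) cancels them both.
func shutdownCancelsChildren() bool {
	shutdownCtx, shutdown := context.WithCancel(context.Background())
	snapshotCtx, cancelSnap := context.WithCancel(shutdownCtx) // per Prepare
	defer cancelSnap()
	exportCtx, cancelExport := context.WithCancel(shutdownCtx) // per Commit
	defer cancelExport()

	shutdown() // Participant.StartShutdown
	<-snapshotCtx.Done()
	<-exportCtx.Done()
	return snapshotCtx.Err() == context.Canceled &&
		exportCtx.Err() == context.Canceled
}

func main() {
	fmt.Println(shutdownCancelsChildren()) // true
}
```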
Sibling health: Two background goroutines per export -- status flush
(every 10 s) and sibling check (every 60 s, reads status files with
IsRunning RPC fallback). On sibling failure: cancel local export, best-effort
abort to remaining siblings.
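The sibling-check loop reduces to a ticker that cancels the local export on the first failed check. A sketch with shortened intervals (the real loop ticks every 60 s); watchSiblings and siblingHealthy are illustrative stand-ins for the status-file read with IsRunning RPC fallback:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// watchSiblings polls sibling health and cancels the local export when a
// sibling is reported unhealthy.
func watchSiblings(ctx context.Context, cancel context.CancelFunc,
	interval time.Duration, siblingHealthy func() bool) {
	t := time.NewTicker(interval)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			if !siblingHealthy() {
				cancel() // sibling failed: cancel the local export too
				return
			}
		}
	}
}

// simulateSiblingFailure drives the loop until a simulated sibling failure
// cancels the export context; it returns once cancellation is observed.
func simulateSiblingFailure() bool {
	ctx, cancel := context.WithCancel(context.Background())
	checks := 0
	go watchSiblings(ctx, cancel, time.Millisecond, func() bool {
		checks++
		return checks < 3 // third check reports a failed sibling
	})
	<-ctx.Done()
	return true
}

func main() {
	if simulateSiblingFailure() {
		fmt.Println("local export canceled after sibling failure")
	}
}
```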
Startup (configure_api.go): Create Participant → register cluster API
routes → create Scheduler → register REST handlers.
Shutdown: Scheduler.StartShutdown rejects new exports, then
Participant.StartShutdown cancels shutdownCtx, then Participant.Shutdown
waits on exportWg. In-flight exports detect cancellation on the next cursor
step and exit; metadata promotion still runs before the goroutine returns.
| File | Role |
|---|---|
| usecases/export/scheduler.go | 2PC-like coordinator, metadata I/O, status assembly |
| usecases/export/participant.go | Per-node: slot, snapshot, scan orchestration, sibling monitoring |
| usecases/export/parallel_scan.go | Key ranges, scan jobs, range writer pipeline |
| usecases/export/parquet_writer.go | Batched Parquet writing with Zstd |
| usecases/export/types.go | Request/response types, Selector and BackendProvider interfaces |
| usecases/export/transport.go | ExportClient and NodeResolver interfaces |
| entities/export/status.go | Status and shard-status enums |
| adapters/repos/db/export.go | DB.SnapshotShards, shard ownership, snapshot dispatch |
| adapters/repos/db/lsmkv/bucket_snapshot.go | CreateSnapshot, NewSnapshotBucket, hard-link logic |