docs/plans/2026-03-07-projections-v2-engine-design.md
Date: 2026-03-07
Status: Proposed
A new projections processing engine that replaces the current complex pipeline with simpler, modern infrastructure already available in KurrentDB: filtered read-all enumerators, secondary indexes (DuckDB), and multi-stream atomic appends.
The new engine eliminates multi-stream readers, event reordering, per-stream emitted-stream tracking, and complex idempotency checks. It introduces partitioned parallel processing with Chandy-Lamport checkpointing and atomic output writes.
The user-facing programming model is unchanged (IProjectionStateHandler / Jint).

Three read strategies replace the current reader hierarchy (TransactionFileEventReader, MultiStreamEventReader, EventByTypeIndexEventReader, StreamEventReader):
Strategy A: Used by fromAll(), fromCategory(), fromEventType().
Uses the existing Enumerator.AllSubscriptionFiltered with the appropriate IEventFilter:
- fromAll() → DefaultAllFilter or user-specified filter
- fromCategory(name) → StreamName.Prefixes("name-")
- fromEventType(type) → EventType.Prefixes("type")

Checkpoints by log position (TFPos). Delivers events in log order with catch-up and live tailing phases.
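As an illustration of this mapping, here is a minimal sketch that uses a plain predicate as a stand-in for IEventFilter; the names SourceFilters and EventFilter are hypothetical, and the real prefix-filter types in KurrentDB may behave differently:

```csharp
using System;

// Illustrative stand-in for IEventFilter: a predicate over stream id and event type.
// Only the mapping from source declarations to filters is shown here.
public delegate bool EventFilter(string streamId, string eventType);

public static class SourceFilters
{
    private static readonly EventFilter AcceptAll = (_, _) => true;

    // fromAll(): everything passes unless the user supplied a filter.
    public static EventFilter FromAll(EventFilter? userFilter = null) => userFilter ?? AcceptAll;

    // fromCategory("name"): streams whose name starts with "name-".
    public static EventFilter FromCategory(string category) =>
        (stream, _) => stream.StartsWith(category + "-", StringComparison.Ordinal);

    // fromEventType("type"): events whose type starts with the given prefix.
    public static EventFilter FromEventType(string prefix) =>
        (_, type) => type.StartsWith(prefix, StringComparison.Ordinal);
}
```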
Strategy B: Used by fromStream('stream-name').
Reads from the primary index via direct stream read — no log scan needed. This is the most efficient path for single-stream projections.
Enhancement required: Stream reads currently don't support starting from a log position. This needs to be added so that checkpointing remains uniform across all strategies. The DuckDB default index ($idx-all) can resolve between stream event number and log position.
Checkpoints by log position.
Strategy C: Used by fromStreams(['stream-a', 'stream-b', ...]).
Queries the DuckDB default index with:
```sql
SELECT * FROM "$idx-all"
WHERE stream IN ('stream-a', 'stream-b', ...)
  AND log_position > @checkpoint
ORDER BY log_position
```
Returns events in log order naturally. Checkpoints by log position.
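A sketch of how Strategy C might assemble that query, written against plain ADO.NET (DbConnection/DbCommand) rather than a specific DuckDB client; the checkpoint is simplified to a single long, and the helper name is hypothetical:

```csharp
using System.Collections.Generic;
using System.Data.Common;
using System.Text;

public static class StreamSetQuery
{
    // Builds the parameterized stream-set query against the "$idx-all" index.
    // Column names follow the example above.
    public static DbCommand Build(DbConnection connection, IReadOnlyList<string> streams, long checkpoint)
    {
        var cmd = connection.CreateCommand();
        var placeholders = new StringBuilder();

        for (var i = 0; i < streams.Count; i++)
        {
            if (i > 0) placeholders.Append(", ");
            placeholders.Append("@s").Append(i);

            var p = cmd.CreateParameter();
            p.ParameterName = "@s" + i;
            p.Value = streams[i];
            cmd.Parameters.Add(p);
        }

        var cp = cmd.CreateParameter();
        cp.ParameterName = "@checkpoint";
        cp.Value = checkpoint;
        cmd.Parameters.Add(cp);

        cmd.CommandText =
            "SELECT * FROM \"$idx-all\" " +
            $"WHERE stream IN ({placeholders}) " +
            "AND log_position > @checkpoint " +
            "ORDER BY log_position";
        return cmd;
    }
}
```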
Enhancement required: A new IEventFilter implementation for stream-name-set matching, backed by DuckDB queries.
All three strategies produce events ordered by log position and checkpoint using log position. This gives a single downstream interface regardless of source configuration.
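One possible shape for that shared downstream contract, sketched with illustrative names rather than the actual KurrentDB types:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Log position in the transaction file (TFPos-like commit/prepare pair).
public readonly record struct LogPosition(long Commit, long Prepare);

// What every read strategy delivers downstream: events in log-position order,
// each carrying the position used for checkpointing.
public sealed record OrderedEvent(
    LogPosition Position,
    string StreamId,
    string EventType,
    ReadOnlyMemory<byte> Data,
    ReadOnlyMemory<byte> Metadata);

public interface IReadStrategy
{
    // Resumes from the given checkpoint and yields events in log order,
    // covering both the catch-up and live-tailing phases.
    IAsyncEnumerable<OrderedEvent> ReadAsync(LogPosition checkpoint, CancellationToken ct);
}
```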
```
Reader (Strategy A/B/C)
   │
   │  events in log-position order
   ▼
Dispatcher (consistent hash of partitionBy key)
   │
   ├──► Partition Channel 0   ──► Process ──► Output Buffer 0
   ├──► Partition Channel 1   ──► Process ──► Output Buffer 1
   ├──► Partition Channel 2   ──► Process ──► Output Buffer 2
   ...
   └──► Partition Channel N-1 ──► Process ──► Output Buffer N-1
                                                    │
                                                    ▼
                              Checkpoint Coordinator (Chandy-Lamport markers)
                                                    │
                                                    ▼
                        Multi-Stream Atomic Append (checkpoint + emitted events + state)
```
The dispatcher receives events from the reader and routes them to partition channels:
- Calls partitionBy(event) to get the partition key (or uses the stream name as the default)
- Computes hash(partitionKey) % N to select the target partition channel
- Writes the event to the selected channel (a bounded System.Threading.Channels.Channel<T>)

Events for the same partition key always go to the same partition, preserving per-key ordering.
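A minimal dispatcher sketch along those lines, reusing the OrderedEvent record from the read-strategy sketch above; the FNV-1a hash stands in for whatever stable hash the real implementation would use:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public sealed class Dispatcher
{
    private readonly Channel<OrderedEvent>[] _partitions;
    private readonly Func<OrderedEvent, string> _partitionBy;

    public Dispatcher(int partitionCount, int channelCapacity, Func<OrderedEvent, string>? partitionBy = null)
    {
        _partitions = new Channel<OrderedEvent>[partitionCount];
        for (var i = 0; i < partitionCount; i++)
            _partitions[i] = Channel.CreateBounded<OrderedEvent>(channelCapacity); // bounded => natural backpressure
        _partitionBy = partitionBy ?? new Func<OrderedEvent, string>(e => e.StreamId); // stream name is the default key
    }

    public ChannelReader<OrderedEvent> Partition(int index) => _partitions[index].Reader;

    public async Task DispatchAsync(IAsyncEnumerable<OrderedEvent> events, CancellationToken ct)
    {
        await foreach (var e in events.WithCancellation(ct))
        {
            // Same key => same partition index, preserving per-key ordering.
            var index = (int)(StableHash(_partitionBy(e)) % (uint)_partitions.Length);
            // Blocks when that partition's channel is full, which pauses the reader upstream.
            await _partitions[index].Writer.WriteAsync(e, ct);
        }
        foreach (var p in _partitions)
            p.Writer.Complete();
    }

    // FNV-1a: a simple hash that is stable across processes (string.GetHashCode is not).
    private static uint StableHash(string key)
    {
        var hash = 2166136261u;
        foreach (var c in key)
            hash = unchecked((hash ^ c) * 16777619u);
        return hash;
    }
}
```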
Each partition runs a dedicated async task that:
- Reads events from its partition channel
- Resolves the partition key and loads the corresponding partition state
- Calls IProjectionStateHandler.ProcessEvent() (existing Jint runtime, unchanged)
- Buffers emitted events and updated state in its output buffer

The partition count is configurable per projection (default TBD, e.g., 4). The existing IProjectionStateHandler and Jint JavaScript runtime are reused without modification.
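A sketch of that per-partition loop, with a simplified stand-in for IProjectionStateHandler.ProcessEvent and the OrderedEvent record from earlier; checkpoint markers are omitted here and covered in the checkpoint section below:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Simplified stand-in for IProjectionStateHandler.ProcessEvent: takes an event and its
// partition key, returns the updated state plus any emit()/linkTo() outputs.
public interface IPartitionHandler
{
    (object NewState, IReadOnlyList<object> Emitted) ProcessEvent(string partitionKey, OrderedEvent e);
}

public sealed class PartitionWorker
{
    private readonly ChannelReader<OrderedEvent> _input;
    private readonly IPartitionHandler _handler;
    private readonly Func<OrderedEvent, string> _partitionBy;
    private readonly List<object> _outputBuffer = new(); // drained at each checkpoint

    public PartitionWorker(ChannelReader<OrderedEvent> input, IPartitionHandler handler, Func<OrderedEvent, string> partitionBy)
        => (_input, _handler, _partitionBy) = (input, handler, partitionBy);

    public async Task RunAsync(CancellationToken ct)
    {
        await foreach (var e in _input.ReadAllAsync(ct))
        {
            var key = _partitionBy(e);
            var (state, emitted) = _handler.ProcessEvent(key, e);

            // Outputs stay buffered until the checkpoint coordinator collects them.
            // (In the full design, checkpoint markers arrive through this same channel;
            // they are omitted from this sketch and shown in the checkpoint section.)
            _outputBuffer.Add(state);
            _outputBuffer.AddRange(emitted);
        }
    }
}
```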
Bounded channel capacity on partition channels provides natural backpressure. If partitions are slow, the dispatcher blocks on channel.Writer.WriteAsync(), which in turn pauses the reader. No explicit backpressure signaling needed.
The filtered $all enumerator delivers events in log order. Within a partition, events for the same key are processed in order. Cross-partition ordering is not required because partitions process independent keys. This eliminates the current StagedProcessingQueue and its complex ordering logic.
The processing topology is a simple single-producer (reader/dispatcher) to N-consumer (partitions) fan-out. The Chandy-Lamport algorithm provides consistent distributed snapshots without stalling the pipeline.
Trigger: Checkpoint interval reached (configurable by byte threshold, event count, or time interval).
Inject markers: The dispatcher injects a CheckpointMarker(sequenceNumber, logPosition) into each of the N partition channels. The logPosition is the position of the last event dispatched before the marker.
Partition snapshot: When a partition processes the marker, it freezes its output buffer and current partition state and reports them to the coordinator, tagged with the marker's sequence number.
Coordinator collects: Once all N partitions have reported for the same marker sequence number, the coordinator has a consistent snapshot.
Atomic write: The coordinator issues a single multi-stream append (ClientMessage.WriteEvents with multiple EventStreamIds) containing:
- A checkpoint event to the $projections-{name}-checkpoint stream (with the log position)
- Buffered emitted events to their target streams
- Updated partition state to the $projections-{name}-{partitionKey}-result streams

Completion: On successful write, the frozen buffers are released.
The alternative — pausing all partitions, collecting state, writing, then resuming — creates unnecessary pipeline stalls. Chandy-Lamport markers flow through the same channels as events, so each partition reaches the snapshot point naturally without coordinated pauses. The simple topology (single source → N channels) makes the algorithm straightforward: only forward-channel markers are needed, no cross-partition marker propagation.
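A compact sketch of the marker flow under those assumptions (types are illustrative, and the coordinator would need synchronization since partitions report from separate tasks):

```csharp
using System.Collections.Generic;

// The marker injected into every partition channel after the last dispatched event.
public sealed record CheckpointMarker(long Sequence, LogPosition Position);

// What a partition reports when it dequeues the marker: its frozen output buffer
// and current state, tagged with the marker's sequence number.
public sealed record PartitionSnapshot(
    long Sequence,
    int Partition,
    IReadOnlyList<object> FrozenOutput,
    object State);

public sealed class CheckpointCoordinator
{
    private readonly int _partitionCount;
    private readonly Dictionary<long, List<PartitionSnapshot>> _pending = new();

    public CheckpointCoordinator(int partitionCount) => _partitionCount = partitionCount;

    // Called by each partition as it reaches the marker. Returns the full snapshot set
    // once all N partitions have reported for the same sequence number.
    public IReadOnlyList<PartitionSnapshot>? Report(PartitionSnapshot snapshot)
    {
        if (!_pending.TryGetValue(snapshot.Sequence, out var reported))
            _pending[snapshot.Sequence] = reported = new List<PartitionSnapshot>();

        reported.Add(snapshot);
        if (reported.Count < _partitionCount)
            return null; // keep waiting; partitions that already reported stay frozen

        _pending.Remove(snapshot.Sequence);
        return reported; // consistent snapshot: write it with one multi-stream append
    }
}
```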
Projection output falls into three kinds:

- emit(streamName, eventType, data) → writes events to target streams
- linkTo(streamName, event) → writes link events to target streams
- Partition state → written to $projections-{name}-{partitionKey}-result streams as Result and ResultRemoved event types

Two current output streams are eliminated:

- $projections-{name}-partitions, previously appended to for every new partition discovered. Eliminated.
- $projections-{name}-result, previously populated with links to every result event. Eliminated.

Only an explicit linkTo() creates link events. This significantly reduces write amplification. If partition discovery is needed, it can be served by querying the DuckDB category index on result streams.
All output from a checkpoint interval is written in a single multi-stream append:
```
WriteEvents {
  EventStreamIds: [
    "$projections-{name}-checkpoint",
    "target-stream-1",
    "target-stream-2",
    "$projections-{name}-partitionA-result",
    "$projections-{name}-partitionB-result",
    ...
  ],
  ExpectedVersions: [...],
  Events: [checkpointEvent, emittedEvent1, emittedEvent2, stateA, stateB, ...],
  EventStreamIndexes: [0, 1, 1, 2, 3, ...]
}
```
This guarantees all-or-nothing semantics: either the entire checkpoint (with all emitted events and state) is committed, or nothing is.
Each partition maintains an LRU cache of partition states for its assigned keys. Cache sizing is configurable. On cache miss, state is loaded from the partition's result stream.
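A minimal LRU sketch for that cache; the class name and the object-typed state are illustrative, and the miss path (loading from the result stream) is left to the caller:

```csharp
using System.Collections.Generic;

// Least-recently-used cache of partition states; on a miss the caller loads the
// state from the partition's result stream and calls Put.
public sealed class PartitionStateCache
{
    private readonly int _capacity;
    private readonly Dictionary<string, LinkedListNode<(string Key, object State)>> _map = new();
    private readonly LinkedList<(string Key, object State)> _order = new();

    public PartitionStateCache(int capacity) => _capacity = capacity;

    public bool TryGet(string key, out object? state)
    {
        if (_map.TryGetValue(key, out var node))
        {
            _order.Remove(node);
            _order.AddFirst(node); // most recently used moves to the front
            state = node.Value.State;
            return true;
        }
        state = null;
        return false;
    }

    public void Put(string key, object state)
    {
        if (_map.TryGetValue(key, out var existing))
            _order.Remove(existing);

        var node = new LinkedListNode<(string Key, object State)>((key, state));
        _order.AddFirst(node);
        _map[key] = node;

        if (_map.Count > _capacity)
        {
            var evicted = _order.Last!; // least recently used
            _order.RemoveLast();
            _map.Remove(evicted.Value.Key);
        }
    }
}
```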
Dirty partition states are included in the atomic checkpoint write. State is written to $projections-{name}-{partitionKey}-result streams as Result events.
On startup or after crash:
- Read the last event of the $projections-{name}-checkpoint stream to get the log position
- Resume the read strategy from that log position
- Reload partition state on demand from the result streams (the normal cache-miss path)

The atomic checkpoint write guarantees that if the checkpoint exists, all associated emitted events and state updates also exist. If the write was interrupted, the checkpoint doesn't exist, and we replay from the previous one.
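Putting the recovery steps together, a small sketch assuming the IReadStrategy and LogPosition types from earlier and a hypothetical helper that reads the last checkpoint event:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public interface ICheckpointReader
{
    // Hypothetical helper: reads the last event of $projections-{name}-checkpoint,
    // or returns null if no checkpoint has been written yet.
    Task<LogPosition?> ReadLastCheckpointAsync(CancellationToken ct);
}

public static class Recovery
{
    public static async Task ResumeAsync(
        ICheckpointReader checkpoints,
        IReadStrategy reader,
        Func<OrderedEvent, Task> process,
        CancellationToken ct)
    {
        // 1. The last committed checkpoint (if any) gives the resume position.
        var checkpoint = await checkpoints.ReadLastCheckpointAsync(ct);
        var resumeFrom = checkpoint ?? default(LogPosition); // no checkpoint yet: start of the log

        // 2. Replay from that log position; partition state is reloaded lazily
        //    from the result streams as keys are encountered (cache-miss path).
        await foreach (var e in reader.ReadAsync(resumeFrom, ct))
            await process(e);
    }
}
```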
```
ProjectionManager (existing, unchanged)
   │
   ├──► ManagedProjection (existing state machine)
   │       │
   │       ├──► [engine=v1] ──► CoreProjection (existing pipeline)
   │       │
   │       └──► [engine=v2] ──► ProjectionEngineV2 (new)
   │
   └──► Management HTTP/gRPC endpoints (unchanged)
```
- A per-projection setting (engine: v1 | v2) controls which engine processes the projection
- New projections default to v2
- Existing projections stay on v1 until explicitly migrated
- The ManagedProjection state machine is reused with minimal changes (routing logic added)
- ProjectionManager and management endpoints remain unchanged

ProjectionEngineV2 is a new service class that:
- Is created and driven by ManagedProjection
- Reports its status (running, faulted, stopped, etc.) back to ManagedProjection

System projections ($by_category, $by_event_type, $stream_by_category, $streams) are not handled by the new engine. DuckDB secondary indexes provide equivalent functionality and are the intended replacement. System projections remain on the v1 engine during the transition period and will be deprecated.
A JavaScript error in any partition faults the entire projection. The projection enters the Faulted state with the error details. The user can inspect the error and reset/restart the projection. This matches current behavior.
The atomic checkpoint guarantees consistency. On crash, the projection recovers by replaying from the last committed checkpoint, as described above.
No WAL, no replay log, no idempotency checks needed.
Read operations use the existing IExpiryStrategy (default 7-second timeout with retry).

The following current-engine components are not needed in v2:
| Current Component | Replacement |
|---|---|
| EventReaderCoreService | Read strategies using existing enumerators |
| TransactionFileEventReader | Enumerator.AllSubscriptionFiltered |
| MultiStreamEventReader | DuckDB stream-name-set filter |
| EventByTypeIndexEventReader | Enumerator.AllSubscriptionFiltered with event type filter |
| StreamEventReader | Direct stream read (enhanced with log position start) |
| HeadingEventReader | Not needed (enumerators handle live tailing) |
| StagedProcessingQueue | Partitioned channels (no reordering needed) |
| CommittedEventWorkItem (5-stage pipeline) | Simple sequential per-partition processing |
| ProjectionCheckpoint (per-stream tracking) | Atomic multi-stream append |
| EmittedStream / EmittedStreamsWriter | Output buffers + multi-stream append |
| ResultEventEmitter (link events) | Direct result events only |
| ReaderSubscriptionBase | Not needed (no subscription-to-reader indirection) |
| EventProcessingProjectionProcessingPhase | ProjectionEngineV2 processing loop |
Stream read from log position: Enable ReadStreamEventsForward to accept a starting log position instead of only a stream event number. Use the DuckDB index for resolution.
Stream-name-set EventFilter: New IEventFilter implementation that matches events whose stream name is in a specified set. Backed by DuckDB query on $idx-all.
ManagedProjection routing: Add engine version config flag and routing logic to delegate to v1 (CoreProjection) or v2 (ProjectionEngineV2).
ProjectionEngineV2: Build the new engine alongside the existing one. New projections default to v2. Existing projections stay on v1.