docs/projections-core-architecture.md
This document describes the internal architecture of the projections subsystem in KurrentDB, covering management endpoints, event processing flow, reader infrastructure, and event emission.
Defined in src/KurrentDB.Projections.Core/Services/Http/ProjectionsController.cs. All routes are registered in SubscribeCore():
| Method | Route | Operation |
|---|---|---|
| GET | /projections | List all projections |
| POST | /projections/restart | Restart projection subsystem |
| GET | /projections/any | List all projections (any mode) |
| GET | /projections/all-non-transient | List non-transient projections |
| GET | /projections/transient | List transient projections |
| GET | /projections/onetime | List one-time projections |
| GET | /projections/continuous | List continuous projections |
| POST | /projections/transient | Create transient projection |
| POST | /projections/onetime | Create one-time projection |
| POST | /projections/continuous | Create continuous projection |
| GET | /projection/{name}/query | Get projection query/definition |
| PUT | /projection/{name}/query | Update projection query |
| GET | /projection/{name} | Get projection status |
| DELETE | /projection/{name} | Delete projection |
| GET | /projection/{name}/statistics | Get projection statistics |
| GET | /projection/{name}/state | Get projection state (partition optional) |
| GET | /projection/{name}/result | Get projection result (partition optional) |
| POST | /projection/{name}/command/disable | Disable projection |
| POST | /projection/{name}/command/enable | Enable projection |
| POST | /projection/{name}/command/reset | Reset projection |
| POST | /projection/{name}/command/abort | Abort projection |
| POST | /projection/{name}/config | Update projection config |
| GET | /projection/{name}/config | Get projection config |
Defined across partial classes in src/KurrentDB.Projections.Core/Services/Grpc/ProjectionManagement.*.cs:
Create — Create a new projectionUpdate — Update projection query/configDelete — Delete a projectionEnable — Enable a projectionDisable — Disable a projectionReset — Reset a projectionRestartSubsystem — Restart the projection subsystemResult — Get projection resultStatistics — Get projection statisticsProto definition: src/Protos/Grpc/projections.proto. The Statistics RPC is server-streaming (stream StatisticsResp), all others are unary. The Disable RPC supports a write_checkpoint option.
The gRPC service (ProjectionManagement) publishes ProjectionManagementMessage.Command.* messages onto the internal bus, the same messages used by the HTTP controller.
All defined in src/KurrentDB.Projections.Core/Messages/ProjectionManagementMessage.cs:
Command messages (requests from API):
Command.Post — Create a new projection (with mode, name, handler type, query, emit settings)Command.PostBatch — Create multiple projections atomicallyCommand.UpdateQuery — Update query text and emit settingsCommand.Delete — Delete with options for checkpoint/state/emitted stream cleanupCommand.Enable / Command.Disable / Command.Abort / Command.ResetCommand.GetStatistics / Command.GetState / Command.GetResult / Command.GetQueryCommand.GetConfig / Command.UpdateConfigResponse messages:
Updated, Statistics, ProjectionState, ProjectionResult, ProjectionQuery, ProjectionConfigOperationFailed, NotFound, NotAuthorized, Conflict┌─────────────────────┐ ┌──────────────────────┐
│ HTTP Controller │ │ gRPC Service │
│ ProjectionsController│ │ ProjectionManagement │
└────────┬────────────┘ └────────┬─────────────┘
│ │
│ ProjectionManagementMessage.Command.*
▼ ▼
┌─────────────────────────────────────────────────┐
│ ProjectionManager │
│ (Services/Management/ProjectionManager.cs) │
│ │
│ - Handles all Command.* messages │
│ - Maintains Dictionary<string, ManagedProjection>│
│ - Persists state to $projections-$master stream │
│ - Routes to appropriate ManagedProjection │
└─────────────────────┬───────────────────────────┘
│
▼
┌─────────────────────────────────────────────────┐
│ ManagedProjection │
│ (Services/Management/ManagedProjection.cs) │
│ │
│ - State machine per projection │
│ - Persists config to $projections-{name} stream │
│ - Publishes CoreProjectionManagementMessage.* │
│ to ProjectionCoreService via worker queues │
└─────────────────────┬───────────────────────────┘
│
▼
┌─────────────────────────────────────────────────┐
│ ProjectionCoreService │
│ (Services/Processing/ProjectionCoreService.cs) │
│ │
│ - Creates/manages CoreProjection instances │
│ - Handles CreateAndPrepare, Start, Stop, Kill │
│ - Maintains Dictionary<Guid, CoreProjection> │
└─────────────────────────────────────────────────┘
States defined in ManagedProjectionState enum:
Creating → Loading → Loaded → Preparing → Prepared → Starting → Running
↓
Stopping → Stopped
Aborting → Aborted
→ Completed
→ Faulted
→ Deleting
LoadingStopped
Each state has a corresponding handler class in Services/Management/ManagedProjectionStates/:
CreatingLoadingLoadedState — Handles initial loading from persisted statePreparingState / PreparedState — Projection definition compiled, waiting to startStartingState — Sending start command to coreRunningState — Actively processing eventsStoppingState / StoppedState — Graceful shutdownAbortingState / AbortedState — Forced shutdownCompletedState — One-time/query projection finishedFaultedState — Error occurredDeletingState — Being removedConfigurable via Command.UpdateConfig or at creation time:
| Parameter | Description | Default |
|---|---|---|
EmitEnabled | Allow event emission | false |
TrackEmittedStreams | Record emitted stream names for cleanup | false |
CheckpointAfterMs | Min time between checkpoints (ms) | from ProjectionConsts |
CheckpointHandledThreshold | Checkpoint after N events handled | from ProjectionConsts |
CheckpointUnhandledBytesThreshold | Checkpoint after N bytes unhandled | from ProjectionConsts |
PendingEventsThreshold | Max pending events before backpressure | from ProjectionConsts |
MaxWriteBatchLength | Max events per write batch | from ProjectionConsts |
MaxAllowedWritesInFlight | Max concurrent write operations | from ProjectionConsts |
ProjectionExecutionTimeout | JS execution timeout (ms) | from config |
The ProjectionManager distributes projections across worker queues (_queues). Each worker queue runs its own ProjectionCoreService and EventReaderCoreService. The ProjectionManagerMessageDispatcher routes CoreProjectionStatusMessage.* responses back to the ProjectionManager from worker threads.
┌────────────────────┐
│ Event Store │
│ (TF / Streams) │
└────────┬───────────┘
│ Read operations
▼
┌────────────────────┐
│ EventReader │
│ (Stream/TF/Multi/ │
│ EventByType) │
└────────┬───────────┘
│ ReaderSubscriptionMessage
│ .CommittedEventDistributed
▼
┌────────────────────┐
│EventReaderCoreService│
│ routes to │
│ IReaderSubscription │
└────────┬───────────┘
│
▼
┌────────────────────┐
│ ReaderSubscription │
│ (filtering, tagging,│
│ checkpoint suggest)│
└────────┬───────────┘
│ EventReaderSubscriptionMessage
│ .CommittedEventReceived
▼
┌────────────────────────────┐
│ ReaderSubscriptionDispatcher│
│ (routes by subscriptionId) │
└────────┬───────────────────┘
│
▼
┌──────────────────────────────────┐
│EventSubscriptionBasedProjection │
│ProcessingPhase │
│(EventProcessingProjectionPhase) │
│ │
│ → Creates CommittedEventWorkItem │
│ → Enqueues to CoreProjectionQueue │
└──────────────┬───────────────────┘
│
▼
┌──────────────────────────────────┐
│ StagedProcessingQueue │
│ (multi-stage work items) │
│ │
│ Stage 0: RecordEventOrder │
│ Stage 1: GetStatePartition │
│ Stage 2: Load (partition state) │
│ Stage 3: ProcessEvent │
│ Stage 4: WriteOutput │
└──────────────┬───────────────────┘
│
▼
┌──────────────────────────────────┐
│ IProjectionStateHandler │
│ (JS runtime / native handler) │
│ │
│ → ProcessEvent() returns: │
│ - newState │
│ - emittedEvents[] │
│ - projectionResult │
└──────────────────────────────────┘
CoreProjection (Services/Processing/CoreProjection.cs) manages the lifecycle of a single running projection instance:
Initial → LoadStateRequested → StateLoaded → Subscribed → Running
│
┌─────────────┼──────────────┐
▼ ▼ ▼
Stopping FaultedStopping CompletingPhase
▼ ▼ ▼
Stopped Faulted PhaseCompleted
│
(next phase or Stop)
Key transitions:
CheckpointReader.BeginLoadState() reads from the checkpoint streamBeginPhase() initializes the processing phaseProcessingPhase.ProcessEvent() is called via tick mechanismDefined in Services/Processing/Phases/:
EventProcessingProjectionProcessingPhase — The main phase that processes events through the state handler. Handles CommittedEventReceived messages and creates CommittedEventWorkItem instances.
WriteQueryEofProjectionProcessingPhase — Writes final results when a query reaches EOF.
WriteQueryResultProjectionProcessingPhase — Writes intermediate/final results for queries.
Phase state enum (PhaseState): Unknown, Stopped, Starting, Running
Subscription state enum (PhaseSubscriptionState): Unknown, Unsubscribed, Subscribing, Subscribed, Failed
ProcessingStrategySelector (Services/Processing/Strategies/ProcessingStrategySelector.cs) creates the appropriate strategy:
QueryProcessingStrategy — For one-time/transient projections (StopOnEof = true). Creates two phases: event processing + write EOF results.ContinuousProjectionProcessingStrategy — For continuous projections. Creates a single EventProcessingProjectionProcessingPhase that runs indefinitely.Both extend EventReaderBasedProjectionProcessingStrategy → DefaultProjectionProcessingStrategy → ProjectionProcessingStrategy.
The CommittedEventWorkItem (Services/Processing/WorkItems/CommittedEventWorkItem.cs) processes through these stages in the StagedProcessingQueue:
StatePartitionSelector.GetStatePartition() to determine which partition this event belongs to (e.g., stream name for foreachStream)BeginGetPartitionStateAt()IProjectionStateHandler.ProcessEvent() (JS runtime) which returns new state and emitted eventsFinalizeEventProcessing() which writes results, accounts partitions, and emits events through the ResultWriterCoreProjection.EnsureTickPending() publishes a ProjectionCoreServiceMessage.CoreTick that triggers CoreProjection.Tick(). This calls ProcessingPhase.ProcessEvent() which drains the StagedProcessingQueue. The tick mechanism ensures cooperative multitasking — projections yield control between work items, preventing any single projection from starving others.
The CoreProjectionQueue implements backpressure between readers and processing:
PendingEventsThreshold, a ReaderSubscriptionManagement.Pause message is publishedEventReaderCoreService pauses the reader (or forks a new paused reader if currently on the heading reader)ReaderSubscriptionManagement.Resume restarts readingThe PartitionStateCache provides in-memory caching of partition states with locking:
CheckpointCompleted triggers PartitionStateCache.Unlock(tag), allowing eviction of states older than the checkpoint positionRequiresRootPartition = true maintain a root state (key "") that is serialized with each checkpoint and available to all partitionsIProjectionStateHandler.LoadShared() / InitializeShared()Failure sources:
EventReaderSubscriptionMessage.Failed → propagated to phase → CoreProjection.SetFaulted()SafeProcessEventByHandler() → SetFaulting() with detailed error messageWrongExpectedVersion → CoreProjectionProcessingMessage.RestartRequestedRestart sequence (on RestartRequested):
EnsureUnsubscribed() — Tears down reader subscriptionGoToState(Initial) — Reinitializes caches and trackersStart() — Reloads checkpoint and resumes from last saved positionFaulted vs FaultedStopping: SetFaulting() transitions to FaultedStopping (waits for checkpoint) then Faulted. SetFaulted() goes directly to Faulted.
EventReaderCoreService (Services/Processing/EventReaderCoreService.cs) manages the lifecycle of all event readers for a worker thread:
Dictionary<Guid, IReaderSubscription> (subscriptions)Dictionary<Guid, IEventReader> (readers)_subscriptionEventReaders / _eventReaderSubscriptionsHeadingEventReader for efficient live-tail readingKey message handlers:
ReaderSubscriptionManagement.Subscribe — Creates a subscription + reader pairReaderSubscriptionManagement.Unsubscribe — Disposes reader, removes subscriptionReaderSubscriptionManagement.Pause / Resume — Pauses/resumes readingReaderSubscriptionMessage.CommittedEventDistributed — Routes events from readers to subscriptionsReaderSubscriptionMessage.EventReaderEof — Handles end-of-streamReaderSubscriptionMessage.EventReaderNotAuthorized — Handles auth failuresAll readers implement IEventReader and are created by ReaderStrategy.CreatePausedEventReader():
Source: Services/Processing/TransactionFile/TransactionFileEventReader.cs
$all stream)fromAll() is specified with no event type filterClientMessage.ReadAllEventsForward internal messagesSource: Services/Processing/SingleStream/StreamEventReader.cs
fromStream('name') or single-category fromCategory('name') (reads $ce-{name})ClientMessage.ReadStreamEventsForward internal messages$maxAge/$maxCount trimmingEventReaderPartitionDeleted notifications when configuredSource: Services/Processing/MultiStream/MultiStreamEventReader.cs
fromStreams(['a', 'b', 'c'])Source: Services/Processing/EventByType/EventByTypeIndexEventReader.cs
$et-{eventType})fromAll().when({EventType: ...}) with specific event type filters$et- streams) and TfBased (falls back to TF when index is behind)$deleted event type for stream deletion notificationsSource: Services/Processing/TransactionFile/HeadingEventReader.cs
eventCacheSize)ReaderStrategy.Create() in Services/Processing/Strategies/ReaderStrategy.cs selects the reader type based on the projection source definition:
if (allStreams && specific eventTypes) → EventByTypeIndexEventReader
if (allStreams) → TransactionFileEventReader
if (single stream) → StreamEventReader
if (single category) → StreamEventReader (on $ce-{category})
if (multiple streams) → MultiStreamEventReader
Each reader strategy creates matching filter and tagger pairs:
| Source | EventFilter | PositionTagger |
|---|---|---|
fromAll() + event types | EventByTypeIndexEventFilter | EventByTypeIndexPositionTagger |
fromAll() | TransactionFileEventFilter | TransactionFilePositionTagger |
fromStream(s) | StreamEventFilter | StreamPositionTagger |
fromCategory(c) | CategoryEventFilter | StreamPositionTagger (on $ce-c) |
fromStreams([...]) | MultiStreamEventFilter | MultiStreamPositionTagger |
EventFilter decides which events pass through to the projection (by event type, stream, link resolution).
PositionTagger creates CheckpointTag values that track the reader's position for checkpointing and resumption.
CheckpointTag modes (determined by tagger type):
Position — TF commit/prepare position (for $all reading)Stream — Single stream sequence numberMultiStream — Per-stream sequence number dictionaryEventTypeIndex — TF position + per-event-type stream sequence numbersPreparePosition — Prepare position only (for event reordering with lag)Phase — Multi-phase projection phase numberReaderSubscription (Services/Processing/Subscriptions/ReaderSubscription.cs) sits between the reader and the projection phase:
CommittedEventDistributed from the readerPassesSource, Passes)EventReaderSubscriptionMessage.CommittedEventReceivedThe EventReorderingReaderSubscription variant adds event reordering with a configurable processing lag for multi-stream sources.
EventReader EventReaderCoreService ReaderSubscription
│ │ │
│──CommittedEventDistributed──────►│ │
│ │──CommittedEventDistributed──►│
│ │ │──filter + tag──┐
│ │ │◄───────────────┘
│ │ │
│ │ CommittedEventReceived │
│ │◄─────────────────────────────│
│ │ │
│ (via ReaderSubscriptionDispatcher) │
│ │ │
│ ▼ │
│ EventProcessingProjection │
│ ProcessingPhase │
Projections produce events through two mechanisms:
emit() / linkTo() / linkStreamTo() / copyTo() — Called from JavaScript projection handlers, producing EmittedEventEnvelope[] returned from IProjectionStateHandler.ProcessEvent()ResultWriter emits Result or ResultRemoved events to result streamsThe JintProjectionStateHandler (Services/Interpreted/JintProjectionStateHandler.cs) executes user-defined JavaScript projections using the Jint engine. It registers global functions available to projection code:
// Available in projection JavaScript:
emit(streamId, eventType, eventBody, metadata?) // → EmittedDataEvent
linkTo(streamId, event, metadata?) // → EmittedDataEvent with $> type
linkStreamTo(streamId, linkedStreamId, metadata?) // → EmittedDataEvent linking streams
copyTo(streamId, event, metadata?) // → EmittedDataEvent copying event
emit() creates an EmittedDataEvent with the provided data and adds it to the _emitted listlinkTo() creates an EmittedDataEvent with type $> and data "{sequenceNumber}@{streamId}" (the standard link format). Uses a two-phase callback: the data event's OnCommitted sets the link's target event numberProcessEvent() and returned as EmittedEventEnvelope[]The handler also supports ProcessPartitionCreated() which can emit events when a new partition is first seen.
Defined in Services/Processing/Emitting/EmittedEvents/:
EmittedDataEvent — A regular data event written to a target stream (from emit())EmittedLinkTo — A $> link event pointing to another event (from linkTo())EmittedLinkToWithRecategorization — A link with category rewriting (used by $by_category)EmittedEventEnvelope — Wraps an EmittedEvent with stream metadataEach emitted event carries:
StreamIdCausedByTag (CheckpointTag of the source event)SetTargetEventNumber callback for cross-referencingIProjectionStateHandler.ProcessEvent()
│
│ returns EmittedEventEnvelope[]
▼
CommittedEventWorkItem.WriteOutput()
│
│ calls FinalizeEventProcessing()
▼
┌──────────────────────────────────────┐
│ ResultWriter │
│ (Services/Processing/Strategies/ │
│ ResultWriter.cs) │
│ │
│ WriteRunningResult() ─── if state │
│ │ changed, emits Result events │
│ │ │
│ EventsEmitted() ─── user emit() │
│ │ calls, forwarded to checkpoint │
│ │ │
│ AccountPartition() ─── registers │
│ new partitions in catalog stream │
└───────────────┬──────────────────────┘
│
│ calls IEmittedEventWriter.EventsEmitted()
▼
┌──────────────────────────────────────┐
│ ICoreProjectionCheckpointManager │
│ (ProjectionCheckpoint as │
│ IEmittedEventWriter) │
│ │
│ ValidateOrderAndEmitEvents() │
│ │ │
│ Groups events by target stream │
│ Creates EmittedStream per target │
└───────────────┬──────────────────────┘
│
▼
┌──────────────────────────────────────┐
│ EmittedStream │
│ (per target stream instance) │
│ │
│ - Validates event ordering │
│ - Batches writes (MaxWriteBatchLength)│
│ - Manages expected version │
│ - Handles write retries │
│ - Tracks emitted stream names │
└───────────────┬──────────────────────┘
│
│ IODispatcher.WriteEvents()
▼
┌──────────────────────────────────────┐
│ EmittedStreamsWriter │
│ (IEmittedStreamsWriter) │
│ │
│ WriteEvents(streamId, expectedVersion│
│ events[], writeAs, callback) │
│ │
│ → Publishes ClientMessage.WriteEvents│
│ to the internal bus │
└──────────────────────────────────────┘
The ProjectionCheckpoint manages write concurrency:
_maximumAllowedWritesInFlight — Configurable limit on concurrent writes (default from ProjectionConsts.MaxAllowedWritesInFlight)EmittedStream is assigned to a write queue via round-robin (_emittedStreams.Count % _maximumAllowedWritesInFlight)QueuedEmittedStreamsWriter — Wraps EmittedStreamsWriter with a queue that limits in-flight writes per queue IDAllowedWritesInFlight.Unbounded — Special value that disables queuingResultEventEmitter (Services/Processing/Emitting/ResultEventEmitter.cs) creates events for projection results:
Result or ResultRemoved event to $projections-{name}-resultResult/ResultRemoved event to $projections-{name}-{partition}-result AND a $> link from $projections-{name}-result to the partition result streamThe checkpoint mechanism ensures exactly-once emission semantics:
EmittedStream instances within a ProjectionCheckpointProjectionCheckpoint.Prepare(position) tells all EmittedStream instances to flushEmittedStream calls Checkpoint() → flushes pending writes → signals completionOnCheckpointCompleted() notifies the CheckpointManagerCoreProjectionCheckpointWriter writes the checkpoint tag + state to $projections-{name}-checkpointEmittedStream uses expected versions to deduplicate any re-emitted eventsCheckpoint loading (CoreProjectionCheckpointReader): Reads the last 10 events backward from the checkpoint stream, finds the first $ProjectionCheckpoint event, extracts the CheckpointTag from metadata and state data from the event body, validates epoch/version compatibility, then publishes CheckpointLoaded.
Checkpoint writing (CoreProjectionCheckpointWriter): Writes a $ProjectionCheckpoint event to the checkpoint stream with the serialized CheckpointTag as metadata and root partition state as data. Retries with exponential backoff (up to 12 attempts). Warns if checkpoint exceeds 8 MB.
When a projection restarts, EmittedStream enters recovery mode to ensure exactly-once semantics:
CausedByTag and event typeOnCommitted callbacks and dequeues (event already written)InvalidEmittedEventSequenceExceptionEach emitted event carries metadata:
{
"$causedBy": "<source-event-guid>",
"$correlationId": "<correlation-id>",
"checkpoint_tag": { ... },
"projection_version": { ... }
}
Write retries use exponential backoff (up to 12 attempts, max 256 seconds).
EmittedStreamsTracker (Services/Processing/Emitting/EmittedStreamsTracker.cs) maintains a record of all streams a projection has written to. This is stored in a tracking stream ($projections-{name}-emittedstreams) and is used during projection deletion to clean up emitted streams when deleteEmittedStreams = true.
The built-in system projections use native handlers (not JavaScript) that produce events via the same pipeline:
$by_category — Reads $streams and emits $> links to $ce-{category} streams$by_event_type — Reads $all and emits $> links to $et-{eventType} streams$stream_by_category — Emits links grouping streams by category$streams — Emits a record for each new stream| Component | Path |
|---|---|
| HTTP endpoints | Services/Http/ProjectionsController.cs |
| gRPC endpoints | Services/Grpc/ProjectionManagement.*.cs |
| Management messages | Messages/ProjectionManagementMessage.cs |
| ProjectionManager | Services/Management/ProjectionManager.cs |
| ManagedProjection | Services/Management/ManagedProjection.cs |
| ManagedProjection states | Services/Management/ManagedProjectionStates/ |
| ProjectionCoreService | Services/Processing/ProjectionCoreService.cs |
| CoreProjection | Services/Processing/CoreProjection.cs |
| EventReaderCoreService | Services/Processing/EventReaderCoreService.cs |
| ReaderStrategy | Services/Processing/Strategies/ReaderStrategy.cs |
| ProcessingStrategySelector | Services/Processing/Strategies/ProcessingStrategySelector.cs |
| Processing phases | Services/Processing/Phases/ |
| Reader subscriptions | Services/Processing/Subscriptions/ |
| Event readers | Services/Processing/SingleStream/, MultiStream/, EventByType/, TransactionFile/ |
| Emitting pipeline | Services/Processing/Emitting/ |
| Emitted event types | Services/Processing/Emitting/EmittedEvents/ |
| Checkpoint management | Services/Processing/Checkpointing/ |
| Work items | Services/Processing/WorkItems/ |
| ResultWriter | Services/Processing/Strategies/ResultWriter.cs |
| StagedProcessingQueue | Services/Processing/StagedProcessingQueue.cs |
| ProjectionManagerNode (wiring) | ProjectionManagerNode.cs |
All paths relative to src/KurrentDB.Projections.Core/.