docs/rfcs/2025-09-08-laminar-flow.md
This RFC proposes a redesign of the flow architecture where flownode becomes a lightweight in-memory state management node with an embedded frontend for direct computation. This approach optimizes resource utilization and improves scalability by eliminating network hops while maintaining clear separation between coordination and computation tasks.
The current flow architecture has several limitations:
The laminar Flow architecture addresses these issues by:
The laminar Flow architecture transforms flownode into a lightweight coordinator that maintains flow state with an embedded frontend for computation. The key components involved are:
```mermaid
graph TB
    subgraph "laminar Flow Architecture"
        subgraph Flownode["Flownode (State Manager + Embedded Frontend)"]
            StateMap["Flow State Map
Map<Timestamp, (Map<Key, Value>, Sequence)>"]
            Coordinator["Computation Coordinator"]
            subgraph EmbeddedFrontend["Embedded Frontend"]
                QueryEngine["Query Engine"]
                AggrState["__aggr_state Executor"]
            end
        end
        subgraph Datanode["Datanode"]
            Storage["Data Storage"]
            Results["Result Tables"]
        end
    end
    Coordinator -->|Internal Query| EmbeddedFrontend
    EmbeddedFrontend -->|Incremental States| Coordinator
    Flownode -->|Incremental Results| Datanode
    EmbeddedFrontend -.->|Read Data| Datanode
```
Flownode maintains a state map for each flow:
```
type FlowState = Map<Timestamp, (Map<Key, Value>, Sequence)>;
```
Where:

- `Timestamp` identifies the time window
- `Key` is built from the flow's grouping expressions (`group_exprs`)
- `Value` holds the partial aggregate states produced by the flow's aggregate expressions (`aggr_exprs`)
- `Sequence` records the last sequence number folded into that window's state

The computation process follows these steps:
1. **Trigger Evaluation**: Flownode determines when to trigger computation.
2. **Query Execution**: Flownode executes `__aggr_state` queries using its embedded frontend.
3. **State Update**: Flownode receives partial state results and updates its internal state.
4. **Result Materialization**: Flownode computes final results using `__aggr_merge` operations.
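The state map and update step above can be sketched as follows; the type aliases and the overwrite-style merge are illustrative simplifications, not the actual GreptimeDB implementation:

```rust
use std::collections::BTreeMap;

type Timestamp = i64;
type Sequence = u64;
type Key = Vec<String>; // evaluated group_exprs values
type Value = Vec<u8>;   // serialized partial aggregate state

// Map<Timestamp, (Map<Key, Value>, Sequence)> from the RFC
type FlowState = BTreeMap<Timestamp, (BTreeMap<Key, Value>, Sequence)>;

/// Fold a batch of partial states from an `__aggr_state` query into the
/// flow state and advance the per-window sequence marker.
fn update_state(
    state: &mut FlowState,
    window: Timestamp,
    partials: Vec<(Key, Value)>,
    new_seq: Sequence,
) {
    let entry = state.entry(window).or_insert_with(|| (BTreeMap::new(), 0));
    for (key, value) in partials {
        // A real implementation merges via the aggregate's merge function;
        // overwriting stands in for that here.
        entry.0.insert(key, value);
    }
    // Only advance the marker, never move it backwards.
    entry.1 = entry.1.max(new_seq);
}
```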
```sql
-- Example incremental state query executed by embedded frontend
SELECT
    __aggr_state(avg(value)) as state,
    time_window,
    group_key
FROM source_table
WHERE
    timestamp >= :window_start
    AND timestamp < :window_end
    AND __sequence >= :last_sequence
    AND __sequence < :current_sequence
    -- sequence range is actually written in grpc header, but shown here for clarity
GROUP BY time_window, group_key;
```
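Since the sequence range actually travels in the gRPC header rather than the SQL text, flownode's request construction might look like the following; the struct and function names are hypothetical, not the actual API:

```rust
use std::collections::HashMap;

type RegionId = u64;
type SequenceNumber = u64;

/// Illustrative request shape: the SQL carries only the time predicate,
/// while per-region sequence ranges ride in the gRPC header.
struct QueryRequest {
    sql: String,
    memtable_last_seq: HashMap<RegionId, SequenceNumber>,
    sst_last_seq: HashMap<RegionId, SequenceNumber>,
}

fn build_incremental_query(
    window_start: i64,
    window_end: i64,
    memtable_last_seq: HashMap<RegionId, SequenceNumber>,
    sst_last_seq: HashMap<RegionId, SequenceNumber>,
) -> QueryRequest {
    QueryRequest {
        sql: format!(
            "SELECT __aggr_state(avg(value)) as state, time_window, group_key \
             FROM source_table \
             WHERE timestamp >= {window_start} AND timestamp < {window_end} \
             GROUP BY time_window, group_key"
        ),
        memtable_last_seq,
        sst_last_seq,
    }
}
```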
```mermaid
sequenceDiagram
    participant F as Flownode (Coordinator)
    participant EF as Embedded Frontend (Lightweight)
    participant DN as Datanode (Heavy Computation)
    F->>F: Evaluate trigger conditions
    F->>EF: Execute __aggr_state query with sequence range
    EF->>DN: Send query to datanode (Heavy scan & aggregation)
    DN->>DN: Scan data and compute partial aggregation state (Heavy CPU/I/O)
    DN->>EF: Return aggregated state results
    EF->>F: Forward state results (Lightweight merge)
    F->>F: Merge with existing state
    F->>F: Update sequence markers (Lightweight)
    F->>EF: Compute incremental results with __aggr_merge
    EF->>DN: Write incremental results to datanode
```
Refill is implemented as a straightforward `__aggr_state` query with time and sequence constraints:
```sql
-- Refill query for flow state recovery
SELECT
    __aggr_state(aggregation_functions) as state,
    time_window,
    group_keys
FROM source_table
WHERE
    timestamp >= :refill_start_time
    AND timestamp < :refill_end_time
    AND __sequence >= :start_sequence
    AND __sequence < :end_sequence
    -- sequence range is actually written in grpc header, but shown here for clarity
GROUP BY time_window, group_keys;
```
Mirror writes are simplified to only transmit timestamps to flownode:
```rust
struct MirrorWrite {
    timestamps: Vec<Timestamp>,
    // Removed: actual data payload
}
```
This optimization:
Another optimization could be to send only the dirty time-window ranges for each flow directly to flownode, rather than sending timestamps one by one.
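For instance, collapsing mirrored timestamps into dirty windows could look like this; the fixed 5-minute window size is an assumption for illustration, since a real flow derives it from its plan:

```rust
use std::collections::BTreeSet;

/// Assumed fixed window size (5 minutes); illustrative only.
const WINDOW_SIZE_MS: i64 = 300_000;

/// Collapse mirrored timestamps into the set of dirty window starts,
/// so flownode re-queries only the windows that actually changed.
fn dirty_windows(timestamps: &[i64]) -> BTreeSet<i64> {
    timestamps
        .iter()
        .map(|ts| ts - ts.rem_euclid(WINDOW_SIZE_MS))
        .collect()
}
```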
The core optimization relies on sequence-constrained queries:
```sql
-- Optimized incremental query
SELECT __aggr_state(expr)
FROM table
WHERE time_range AND sequence_range
```
Benefits:
```mermaid
graph LR
    subgraph "Time Windows"
        W1["Window 1
09:00-09:05"]
        W2["Window 2
09:05-09:10"]
        W3["Window 3
09:10-09:15"]
    end
    subgraph "Processing Strategy"
        W1 --> Batch["Batch Mode
(Old Data)"]
        W2 --> Stream["Stream Mode
(Recent Data)"]
        W3 --> Stream2["Stream Mode
(Current Data)"]
    end
```
Flow maintains two critical sequences to track incremental query progress for each region:
- `memtable_last_seq`: Tracks the latest sequence number read from the memtable
- `sst_last_seq`: Tracks the latest sequence number read from SST files

These sequences enable precise incremental data processing by defining the exact range of data to query in subsequent iterations.
When executing incremental queries, flownode provides both sequence parameters to datanode:
```rust
struct GrpcHeader {
    ...
    // Sequence tracking for incremental reads
    memtable_last_seq: HashMap<RegionId, SequenceNumber>,
    sst_last_seqs: HashMap<RegionId, SequenceNumber>,
}
```
The datanode processes these parameters to return only the data within the specified sequence ranges, ensuring efficient incremental processing.
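Conceptually, the datanode-side filtering keeps only rows in the half-open sequence range, matching the `__sequence >= :last_sequence AND __sequence < :current_sequence` predicate shown earlier; the `Row` type here is illustrative:

```rust
/// Illustrative row shape; real storage attaches a sequence number
/// to each memtable entry and SST record.
struct Row {
    seq: u64,
    // data columns elided
}

/// Keep only rows in the half-open range [last_seq, current_seq),
/// so an incremental scan never reprocesses already-read data.
fn filter_incremental(rows: Vec<Row>, last_seq: u64, current_seq: u64) -> Vec<Row> {
    rows.into_iter()
        .filter(|r| r.seq >= last_seq && r.seq < current_seq)
        .collect()
}
```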
A critical challenge occurs when data referenced by memtable_last_seq gets flushed from memory to disk. Since SST files only maintain a single maximum sequence number for the entire file (rather than per-record sequence tracking), precise incremental queries become impossible for the affected time ranges.
Detection of Invalidation:
```rust
// When memtable_last_seq data has been flushed to SST
if memtable_last_seq_flushed_to_disk {
    // Incremental query is no longer feasible
    // Need to trigger refill for affected time ranges
}
```
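A fuller sketch of this check, assuming datanode reports the highest sequence that each flush moved into SST (all names here are hypothetical):

```rust
use std::collections::HashMap;

type RegionId = u64;
type SequenceNumber = u64;

/// Per-region markers kept by flownode (illustrative shape).
struct SeqTracker {
    memtable_last_seq: HashMap<RegionId, SequenceNumber>,
}

/// Returns true when a flush has moved data that flownode had not yet
/// read (sequences above its marker) into SST, where per-record
/// sequence precision is lost, so a refill is required.
fn needs_refill(
    tracker: &SeqTracker,
    region: RegionId,
    flushed_up_to: SequenceNumber,
) -> bool {
    match tracker.memtable_last_seq.get(&region) {
        Some(&last) => last < flushed_up_to,
        // Nothing tracked yet for this region: no incremental state to invalidate.
        None => false,
    }
}
```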
Refill Process:
- Re-run `__aggr_state` over the affected time ranges to rebuild state that depended on `memtable_last_seq` data
- Reset `memtable_last_seq` to the current latest sequence, while `sst_last_seq` continues normal incremental updates

```sql
-- Refill query when memtable data has been flushed
SELECT
    __aggr_state(aggregation_functions) as state,
    time_window,
    group_keys
FROM source_table
WHERE
    timestamp >= :affected_time_start
    AND timestamp < :affected_time_end
    -- Full scan required since sequence precision is lost in SST
GROUP BY time_window, group_keys;
```
Datanode must implement enhanced query processing capabilities to support sequence-based incremental reads:
Input Processing:
- Parse `memtable_last_seq` and `sst_last_seq` parameters from query requests

Output Enhancement:
```rust
struct OutputMeta {
    pub plan: Option<Arc<dyn ExecutionPlan>>,
    pub cost: OutputCost,
    // New field: sequence tracking per region involved in the query
    pub sequence_info: HashMap<RegionId, SequenceInfo>,
}

struct SequenceInfo {
    // Sequence tracking for next iteration
    max_memtable_seq: SequenceNumber, // Highest sequence from memtable in this result
    max_sst_seq: SequenceNumber,      // Highest sequence from SST in this result
}
```
Sequence Tracking Logic:
Datanode already implements `max_sst_seq` tracking for leader range reads; similar logic can be reused for `max_memtable_seq`.
Normal Incremental Updates:

- Update `memtable_last_seq` and `sst_last_seq` after successful query execution
- Use the returned `max_memtable_seq` and `max_sst_seq` values for the next iteration

Refill Scenario:

- Reset `memtable_last_seq` to the current maximum after refill completion
- `sst_last_seq` continues to update based on successful query responses

Sequence Range Optimization:
Memory Management:
This sequential-read implementation ensures reliable incremental processing while gracefully handling the complexities of the storage layer, maintaining both correctness and performance in the face of background compaction and flush operations.
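The marker-update rules above can be sketched like so; `SequenceInfo` mirrors the struct from this RFC, while the `SeqMarkers` wrapper and its method names are illustrative:

```rust
use std::collections::HashMap;

type RegionId = u64;
type SequenceNumber = u64;

/// Mirrors the per-region SequenceInfo returned in OutputMeta.
struct SequenceInfo {
    max_memtable_seq: SequenceNumber,
    max_sst_seq: SequenceNumber,
}

#[derive(Default)]
struct SeqMarkers {
    memtable_last_seq: HashMap<RegionId, SequenceNumber>,
    sst_last_seq: HashMap<RegionId, SequenceNumber>,
}

impl SeqMarkers {
    /// Normal incremental update: advance both markers from the
    /// SequenceInfo of a successful query response.
    fn on_query_success(&mut self, region: RegionId, info: &SequenceInfo) {
        self.memtable_last_seq.insert(region, info.max_memtable_seq);
        self.sst_last_seq.insert(region, info.max_sst_seq);
    }

    /// Refill scenario: reset the memtable marker to the current maximum;
    /// the SST marker keeps advancing through normal query responses.
    fn on_refill_complete(&mut self, region: RegionId, current_max: SequenceNumber) {
        self.memtable_last_seq.insert(region, current_max);
    }
}
```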
- `__aggr_state` query interface in the embedded frontend (already done in previous query pushdown optimizer work)

After phase 1, the system should support basic flow operations with incremental updates.
Keep computation in flownode but optimize through:
Pros:
Cons:
Embed lightweight computation engines within flownode:
Pros:
Cons:
The laminar Flow architecture represents a significant improvement over the current flow system by separating state management from computation execution. This design enables better resource utilization, improved scalability, and simplified maintenance while maintaining the core functionality of continuous aggregation.
The key benefits include:
While the architecture introduces some complexity in terms of distributed coordination and error handling, the benefits significantly outweigh the drawbacks, making it a compelling evolution of the flow system.