# Incremental OCC for read-then-write operations
Read-then-write operations (`DELETE`, `UPDATE`, `INSERT ... SELECT`) in Materialize currently rely on in-process pessimistic locking. The Coordinator acquires write locks before reading, holds them through the write, and releases them only after the write has been applied to the timestamp oracle. This ensures that no other write can interleave between the read and write phases of a single operation.
This approach is correct for a single environmentd process, but it does not
extend to multiple processes: the locks are in-memory mutexes that cannot be
shared across process boundaries. We need concurrent multi-process writes for:
- Zero-downtime upgrades: during the handover window, the old and new
  environmentd processes must both be able to serve writes.
- Horizontal scaling: multiple environmentd processes must be able to
  serve queries concurrently.
- Workload isolation: running separate frontends (environmentd) for different workloads.

This design doc proposes replacing the pessimistic locking approach with optimistic concurrency control (OCC) for read-then-write operations, backed by a subscribe that continually tracks the current state of the data. The approach requires no shared in-memory state, so it works across multiple environmentd processes.

The core idea is to replace the "lock, peek, write" sequence with a subscribe-based OCC loop:
1. Open a subscribe on the data to be read (the selection from the
   `ReadThenWrite` plan), starting at the timestamp determined by the oracle.
2. Receive diffs from the subscribe and, on each frontier advance,
   consolidate them into the current state of the selection.
3. Attempt a timestamped write at the subscribe's frontier. If it succeeds,
   the operation is done; if the timestamp has already been passed, keep
   receiving diffs and retry.

This approach is correct by construction: the subscribe always reflects the committed state of the data, and the timestamped write mechanism ensures that the write is applied at exactly the timestamp the diffs were computed for. If anything changes between the read and the write, the write fails and is retried with fresh data.
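As a minimal sketch, assuming a hypothetical `Subscribe` handle and a stand-in `attempt_timestamped_write` for the Coordinator round trip (neither is the real adapter API), the loop looks roughly like this:

```rust
use std::collections::BTreeMap;

type Timestamp = u64;
type Row = Vec<String>;
type Diff = i64;

/// Hypothetical subscribe handle: yields batches of diffs together with the
/// frontier up to which they are complete.
trait Subscribe {
    fn next_batch(&mut self) -> (Vec<(Row, Diff)>, Timestamp);
}

enum WriteResult {
    Success,
    /// The oracle already handed out a write timestamp at or beyond ours.
    TimestampPassed,
}

/// Stand-in for the Coordinator round trip that submits the diffs to group
/// commit, to be applied at exactly `ts`.
fn attempt_timestamped_write(_diffs: &BTreeMap<Row, Diff>, _ts: Timestamp) -> WriteResult {
    unimplemented!("Coordinator round trip elided in this sketch")
}

fn occ_loop(mut subscribe: impl Subscribe) {
    // Consolidated state of the selection: row -> accumulated diff.
    let mut state: BTreeMap<Row, Diff> = BTreeMap::new();
    loop {
        // Fold newly received updates into the consolidated diffs.
        let (batch, frontier) = subscribe.next_batch();
        for (row, diff) in batch {
            *state.entry(row).or_insert(0) += diff;
        }
        state.retain(|_, diff| *diff != 0);

        // Try to commit at the timestamp the diffs were computed for.
        match attempt_timestamped_write(&state, frontier) {
            WriteResult::Success => break,
            // Another write got in first; the subscribe will deliver its
            // updates, after which we retry with fresh diffs.
            WriteResult::TimestampPassed => continue,
        }
    }
}
```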
Below we describe the current approach, the proposed approach, correctness arguments, and performance implications.
## The current approach

The current `sequence_read_then_write` in the Coordinator works as follows:
1. Acquire write locks: for each target table, an `OwnedMutexGuard` is
   acquired. All locks are acquired atomically (all-or-nothing) to prevent
   deadlocks. If any lock is unavailable, the entire operation is deferred
   until the lock becomes available.
2. Read: a peek is issued with `QueryWhen::FreshestTableWrite`, reading the
   current state of the data.
3. Write: the computed diffs are submitted via `send_diffs`, which adds them
   to the pending group commit queue.

The correctness of this approach depends entirely on the in-process locks. If
the locks were removed or if a second environmentd process were to execute a
concurrent write, the read-then-write would be susceptible to lost updates.
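For reference, a condensed sketch of the lock-based sequence described above; the names are illustrative stand-ins (the real code uses tokio's `OwnedMutexGuard` and the adapter's peek machinery):

```rust
use std::sync::{Arc, Mutex, MutexGuard, TryLockError};

type TableId = u64;

/// Returned when a lock is busy: the operation is parked and retried later.
struct Deferred;

/// All-or-nothing acquisition: never hold a partial set of locks, which
/// could deadlock against another operation acquiring the complement.
fn try_lock_all<'a>(
    locks: &'a [(TableId, Arc<Mutex<()>>)],
) -> Result<Vec<MutexGuard<'a, ()>>, Deferred> {
    let mut guards = Vec::new();
    for (_, lock) in locks {
        match lock.try_lock() {
            Ok(guard) => guards.push(guard),
            Err(TryLockError::WouldBlock) => return Err(Deferred),
            Err(TryLockError::Poisoned(_)) => panic!("poisoned write lock"),
        }
    }
    Ok(guards)
}

fn sequence_read_then_write(locks: &[(TableId, Arc<Mutex<()>>)]) -> Result<(), Deferred> {
    // 1. Acquire every write lock up front, or defer the whole operation.
    let _guards = try_lock_all(locks)?;
    // 2. Peek at QueryWhen::FreshestTableWrite (elided).
    // 3. Compute diffs and enqueue them via send_diffs (elided).
    // The guards are held until the write has been applied, so no other
    // write can interleave between steps 2 and 3.
    Ok(())
}
```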
The complexity of this locking mechanism is significant:
- `WriteLocks` uses an all-or-nothing builder pattern to prevent deadlocks.
- `GroupCommitWriteLocks` merges compatible locks across concurrent blind
  writes.
- `group_commit()` has multiple branches for different lock states
  (pre-validated, no locks needed, missing locks).

## The proposed approach

The new approach moves read-then-write sequencing from the Coordinator to the session task (the per-connection async task), similar to how frontend peek sequencing already works. The session task does the planning, optimization, and OCC retry loop. It communicates with the Coordinator only for specific operations that require Coordinator state:
```
Session Task                              Coordinator
     |                                         |
     |-- plan & optimize MIR/LIR               |
     |                                         |
     |-- acquire OCC semaphore                 |
     |                                         |
     |-- CreateReadThenWriteSubscribe -------->|
     |<------------- subscribe channel --------|
     |                                         |
     |   +-- OCC Loop ------------------+      |
     |   | receive diffs from subscribe |      |
     |   | on frontier advance:         |      |
     |   |   consolidate diffs          |      |
     |   |   AttemptTimestampedWrite -> |----->|-- group_commit()
     |   | <-- Success/Failed ----------|<-----|
     |   | if Failed: continue loop     |      |
     |   | if Success: break            |      |
     |   +------------------------------+      |
     |                                         |
     |-- DropReadThenWriteSubscribe ---------->|
     |                                         |
```
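In sketch form, the session task's side of this protocol could be modeled with Command variants like the following; the variant names come from this design, but the field shapes are assumptions:

```rust
use std::collections::BTreeMap;

type Timestamp = u64;
type Row = Vec<String>;
type SubscribeId = u64;

enum Command {
    /// Install an internal subscribe on the selection and hand the session
    /// task a channel of diff batches (plan and channel fields elided).
    CreateReadThenWriteSubscribe { id: SubscribeId },
    /// Submit consolidated diffs to group commit, to be applied at exactly
    /// `write_ts`; answered with Success or a timestamp-passed failure.
    AttemptTimestampedWrite {
        write_ts: Timestamp,
        diffs: BTreeMap<Row, i64>,
    },
    /// Tear down the internal subscribe when the OCC loop finishes.
    DropReadThenWriteSubscribe { id: SubscribeId },
}
```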
A timestamped write is a write that must be committed at a specific timestamp. The group commit machinery has to be extended to support this: a write can carry a target timestamp, and group commit either applies it at exactly that timestamp or fails it back to the caller if the timestamp has already been passed.
Only one timestamped write is processed per group commit round. If multiple timestamped writes target the same timestamp, one is selected and the others fail with a timestamp-passed error. This is necessary because independently computed timestamped writes may be inconsistent with each other: each was computed from the state at its respective read timestamp, and they could conflict if applied together.
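A sketch of that selection step, with a hypothetical `TimestampedWrite` type standing in for whatever group commit actually queues:

```rust
type Timestamp = u64;

/// A write that must land at exactly `write_ts` (diffs and response
/// channel elided).
struct TimestampedWrite {
    write_ts: Timestamp,
}

enum Outcome {
    /// Admitted into this group commit round.
    Selected(TimestampedWrite),
    /// Failed back to its OCC loop with a timestamp-passed error.
    Failed(TimestampedWrite),
}

/// Admit at most one timestamped write per round; the losers retry their
/// OCC loops once they have seen the winner's updates.
fn admit_timestamped_writes(pending: Vec<TimestampedWrite>) -> Vec<Outcome> {
    let mut pending = pending.into_iter();
    let mut outcomes = Vec::new();
    if let Some(winner) = pending.next() {
        outcomes.push(Outcome::Selected(winner));
    }
    outcomes.extend(pending.map(Outcome::Failed));
    outcomes
}
```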
The subscribe needs to produce the right diffs directly, rather than raw rows that the adapter then transforms. We apply the mutation transformation at the MIR level:
- DELETE: the selection is wrapped in a `Negate`, producing (row, -1) diffs.
- UPDATE: the selection is wrapped in a `Let` binding to share the selection.
  The body unions a negated `Get` (old rows with diff -1) with a mapped `Get`
  (new rows with diff +1, applying the assignment expressions).
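A sketch of the two rewrites over a simplified expression enum; the real code builds MirRelationExpr nodes, so the constructors here are illustrative only:

```rust
/// A drastically simplified stand-in for MirRelationExpr.
enum Expr {
    /// The selection from the ReadThenWrite plan.
    Selection,
    /// Flip the sign of every diff.
    Negate(Box<Expr>),
    /// Bind a value so the body can reference it twice.
    Let { value: Box<Expr>, body: Box<Expr> },
    /// Reference the value bound by the enclosing Let.
    Get,
    /// Apply the SET assignment expressions to each row.
    MapAssignments(Box<Expr>),
    /// Merge the diffs of both inputs.
    Union(Box<Expr>, Box<Expr>),
}

/// DELETE: the subscribe should emit (row, -1) for every selected row.
fn delete_plan() -> Expr {
    Expr::Negate(Box::new(Expr::Selection))
}

/// UPDATE: share the selection, then union its negation (retract the old
/// rows) with the assignments mapped over it (insert the new rows).
fn update_plan() -> Expr {
    Expr::Let {
        value: Box::new(Expr::Selection),
        body: Box::new(Expr::Union(
            Box::new(Expr::Negate(Box::new(Expr::Get))),
            Box::new(Expr::MapAssignments(Box::new(Expr::Get))),
        )),
    }
}
```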
When multiple read-then-write operations run concurrently, each maintains a subscribe that continuously receives and processes updates. With N concurrent OCC loops, whenever one loop succeeds, the other N-1 loops must process the resulting updates and retry. This leads to O(N^2) total work.
To bound this, a semaphore limits the number of concurrent OCC operations (default: 4). Additional operations wait for a permit before starting their subscribe.
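This maps directly onto a tokio Semaphore; the sketch below shows how the permit discipline might look (the constant and function names are illustrative):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

/// Illustrative default from this design.
const MAX_CONCURRENT_OCC: usize = 4;

fn make_occ_semaphore() -> Arc<Semaphore> {
    Arc::new(Semaphore::new(MAX_CONCURRENT_OCC))
}

async fn run_read_then_write(occ_semaphore: Arc<Semaphore>) {
    // Wait here if MAX_CONCURRENT_OCC loops are already running; the
    // subscribe is only opened once a permit is held.
    let _permit = occ_semaphore
        .acquire_owned()
        .await
        .expect("semaphore is never closed");
    // ... open the subscribe and run the OCC loop (elided) ...
} // _permit drops here, admitting the next waiting operation
```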
The subscribes created for read-then-write are internal: they do not appear in
`mz_subscriptions` or other introspection tables, and they don't increment the
active subscribes metric. They are created and dropped via dedicated Command
variants (`CreateReadThenWriteSubscribe`, `DropReadThenWriteSubscribe`).
## Correctness

The correctness argument has two parts: (1) the OCC loop produces the right diffs, and (2) the timestamped write mechanism ensures they are applied at the right timestamp.
The subscribe starts at the oracle read timestamp and emits the current state of the selection expression as its initial snapshot. As other writes commit, the subscribe emits updates that reflect those writes. At any point, if we consolidate all diffs received so far, we get the current state of the expression.
The MIR transformations (Negate for DELETE, Let/Union for UPDATE) ensure that
the diffs represent the correct mutation. For example, after consolidation, a
DELETE subscribe contains (row, -1) for each row currently matching the
selection.
The write is submitted at the timestamp corresponding to the subscribe's frontier. The group commit machinery checks that this timestamp hasn't been passed by the oracle.
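In sketch form, assuming group commit can see the latest write timestamp the oracle has granted (the accessor shape is an assumption, not the real oracle API):

```rust
type Timestamp = u64;

enum WriteResult {
    Success,
    TimestampPassed,
}

/// Decide whether a timestamped write can still land at `requested_ts`.
fn check_timestamped_write(
    requested_ts: Timestamp,
    latest_granted_write_ts: Timestamp,
) -> WriteResult {
    if latest_granted_write_ts >= requested_ts {
        // Some other write already committed at or beyond this timestamp;
        // the diffs may be stale, so fail back to the OCC loop.
        WriteResult::TimestampPassed
    } else {
        // Claim exactly `requested_ts` from the oracle and append the diffs
        // (elided): the write lands at the timestamp it was computed for.
        WriteResult::Success
    }
}
```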
This ensures that the write is always based on the state of the data at exactly the write timestamp. There is no window for lost updates: either the write succeeds because nothing changed since the read, or it fails and retries with fresh data.
Semantically, a read-then-write is a SELECT followed by a write. Normally we
have to linearize reads, ensuring that the oracle read timestamp is at least
the timestamp chosen for a peek, so that results can't "go backwards". With the
subscribe-based OCC loop, we might observe data timestamped beyond the current
oracle read timestamp. However, actually applying the write bumps the oracle
read timestamp to at least the write timestamp, so at write time it holds that
write_ts <= oracle_read_ts. The linearization invariant is maintained.
Only one timestamped write is processed per group commit round. This is correct because:

- the selected write is applied at exactly the timestamp its diffs were
  computed for, so it cannot lose updates; and
- the failed writes return a timestamp-passed error to their OCC loops, which
  observe the winner's updates through their subscribes and retry with fresh
  diffs.
We have to be careful about bounding the lifetime of the OCC loop, both in wallclock time and in number of retries. With the old approach, a read-then-write could take arbitrarily long and block the rest of the system. With the new approach, the OCC loop might retry arbitrarily long without ever succeeding. It will not block the rest of the system, though, which is a big benefit.

As a safety net, we should bound the lifetime of the OCC loop with our existing statement timeout, and potentially add a hard upper limit on the number of attempts per loop.
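A sketch of that safety net, wrapping the loop in the statement timeout and a hypothetical attempt cap (`MAX_OCC_ATTEMPTS` and `try_once` are stand-ins):

```rust
use std::time::Duration;

/// Hypothetical hard upper limit on attempts per OCC loop.
const MAX_OCC_ATTEMPTS: u32 = 1_000;

enum OccError {
    StatementTimeout,
    TooManyAttempts,
}

/// Stand-in for one receive/consolidate/attempt-write iteration; true
/// means the write committed.
async fn try_once() -> bool {
    true
}

async fn bounded_occ_loop(statement_timeout: Duration) -> Result<(), OccError> {
    let attempts = async {
        for _ in 0..MAX_OCC_ATTEMPTS {
            if try_once().await {
                return Ok(());
            }
        }
        Err(OccError::TooManyAttempts)
    };
    tokio::time::timeout(statement_timeout, attempts)
        .await
        // The timeout elapsed before the loop finished.
        .unwrap_or(Err(OccError::StatementTimeout))
}
```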
In the old approach, correctness depends on:

- every read-then-write acquiring the right in-process write locks,
- the locks being held across the entire read-then-write sequence, and
- confirm_leadership-style validation guarding against other processes.

In the new approach, correctness depends on:

- the subscribe reflecting the committed state of the selection as of its
  frontier, and
- the group commit check that the write timestamp has not been passed by the
  oracle.
The new approach is arguably easier to reason about: there is no global lock state to consider, no deferred operations, no lock merging. The correctness argument is local to the OCC loop and the group commit mechanism.
## Performance

The goal is not to make writes faster, but to not regress significantly.
Benchmarking a PoC-level implementation of the OCC approach against `main` for
`UPDATE t SET x = x + 1` shows the following:
The benchmark varies concurrency (number of workers) on the x-axis and shows throughput (left) and latency (right). Key observations:
- Throughput and latency on the OCC path are competitive with `main`. This is
  because the OCC path begins preparing the write (opening the subscribe,
  receiving the snapshot) before the write timestamp is claimed, whereas the
  old path only starts the peek after acquiring the lock.

## Rollout

The new path is controlled by an `enable_adapter_frontend_occ_read_then_write`
dyncfg (default: disabled).
If we did a partial rollout where we check the dyncfg per read-then-write
operation, an OCC write could slip between an old-path reader's read and write
phases without the old path detecting it (since the OCC path doesn't acquire
write locks). We therefore must make the flag sticky per environmentd process
lifetime (check on bootstrap only) to avoid this, and keep the current
confirm_leadership checks.
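A sketch of the sticky-flag behavior; the config struct and accessor are illustrative, not the actual dyncfg plumbing:

```rust
/// Captured once during environmentd bootstrap and never re-read, so a
/// given process runs either all read-then-writes through the OCC path or
/// none of them.
pub struct FrontendFlags {
    pub occ_read_then_write: bool,
}

impl FrontendFlags {
    /// `dyncfg_value` is the value of the rollout dyncfg as observed at
    /// bootstrap; later changes only affect processes that boot afterwards.
    pub fn from_bootstrap(dyncfg_value: bool) -> Self {
        FrontendFlags {
            occ_read_then_write: dyncfg_value,
        }
    }
}
```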
## Removing the old path

Once the OCC path is fully rolled out and validated, we can remove:

- the old `sequence_read_then_write` code path,
- the write-lock machinery (`WriteLocks`, `WriteLockBuilder`,
  `GroupCommitWriteLocks`, deferred write operations), and
- the `confirm_leadership()`-style lock validation in group commit.

This removes a significant amount of complexity and uncertainty from the codebase.
## Alternative: distributed locking

Instead of OCC, we could extend the current pessimistic locking approach to work across processes by using a distributed locking service.

The flow would be:

1. Acquire a distributed lock for each table being written.
2. Peek with `QueryWhen::FreshestTableWrite`, as today.
3. Apply the write through group commit, then release the locks.

This preserves the familiar lock-based model but has significant drawbacks:

- it adds a locking service as a new dependency, with its own failure modes
  (crashed lock holders require leases or timeouts);
- every read-then-write pays for remote lock round trips, even when there is
  no contention; and
- it still needs fencing against zombie processes, along the lines of the
  existing `confirm_leadership()` check.

The OCC approach avoids all of these issues. Contention is handled by retrying, which is simple and local. The cost is paid only when there is actual contention, and the subscribe ensures that retries are based on fresh data.
## Alternative: running the OCC loop on clusterd

Instead of sending subscribe results back to environmentd and running the OCC
loop there, we could run the OCC loop right on the cluster. This should work
if we give clusterd access to the timestamp oracle. A benefit of this approach
is that it takes environmentd out of the processing path as much as possible,
and so we get better distribution of work.
Another school of thought holds that we want environmentd to be in the path,
because we can perhaps be smarter about how we commit data to persist: there
is a separation between the data layer, which comes up with the changes and
runs the dataflow, and the control layer, which takes pointers to the changes
and appends them durably, with perhaps some smarts in the middle.