docs/design/2024-01-09-pipelined-DML.md
Authors: @ekexium, @you06
Tracking issue: https://github.com/pingcap/tidb/issues/50215
In the context of this document, we assume the users are primarily TiDB users. However, it is also applicable to pure TiKV users.
The 2PC protocol of TiDB buffers all writes of a transaction in TiDB memory before committing. A large transaction can easily cause OOM. In bulk data processing, it's common to have a single statement processing millions of rows, which cannot be satisfied by normal transactions. We developed features to work around this for such scenarios: the deprecated, unsafe batch-dml and non-transactional DML. However, they are either unsafe or have too many constraints to satisfy all users.
The doc proposes a refinement of the original 2PC protocol, called Pipelined DML, introducing a new mode of transaction. Writes of a transaction are continuously flushed to TiKV, rather than buffered in memory until commit. This approach allows the size of TiDB transactions to be largely unrestricted by memory limitations. By asynchronously flushing the buffer, the execution phase and the prewrite phase form a pipeline, thus decreasing transaction latencies.
This design presents a large transaction solution that effectively removes size limitations while significantly enhancing execution speed.
Terms Used
Execution phase: The operations in the parser, planner, and executor, including those writing to the membuffer.
Batch: The content in the immutable MemDB is defined as a 'batch.' Each call to flush() creates a new batch.
MemDB: a special data structure based on red-black tree and value logs in TiDB. Currently all mutations of a transaction are buffered in its MemDB before committing.
SPK: Sub-primary key. A special type of key used for large transactions. Its value contains many secondary keys. It helps track all secondary keys belonging to a transaction.
For normal transactions, mutations are buffered in memory before committing. Larger transactions consume a larger volume of memory. There are 2 mechanisms that limit transaction sizes to avoid OOM.
- txn-total-size-limit: if it is set to a value other than its default 100MB, it takes control. It limits the total size of keys and values of a memdb to be smaller than this value. Note that the actual memory usage will be greater than this size due to various kinds of amplification.
- If txn-total-size-limit is set to its default value, this setting is ignored and the memory tracker takes control. The memory tracker controls the memory used by the session, with a limit set by the system variable tidb_mem_quota_query.

To bypass the limit, there was a batch-dml feature. It has been deprecated and is not suggested for use because it is unsafe and highly prone to corrupting data. It splits a statement into multiple transactions and commits them. It is enabled and controlled by the system variables tidb_enable_batch_dml, tidb_batch_dml_insert, tidb_batch_dml_delete, and tidb_dml_batch_size.
To provide a safe method for splitting transactions for a statement, we developed non-transactional DML. It essentially acts like a user script, splitting a single statement into multiple statements for execution, rather than splitting the transaction itself. It's free from data corruption. However, the complexity of splitting and concatenating SQL statements imposes many constraints on this feature.
The need for large transactions remains.
TiDB cannot support super large transactions (> tens of GBs) due to its design, which stores the KV mutations in its memory before committing. There are two drawbacks:
Some users suffer from the above drawbacks.
Non-requirements for current state:
We've brainstormed and discussed various options. By evaluating the pros and cons, we were able to eliminate most of them.
There are two designs worth consideration. They have much in common and can be discussed together. Both designs periodically flush the content of the mem buffer to TiKV via Flush (Prewrite) requests. A read (get) request should read from both the membuffer and TiKV. When committing, the prewrite phase is no longer needed once all content of the membuffer has been flushed.
Their differences are marked in different colors. The two solutions vary fundamentally in their approach to committing:
We conducted a series of experiments to verify the feasibility of the proposed design.
General result of experiments:
| Workload | Memory (RES) | Duration of Vanilla TiDB | Duration of This Proposal | Diff in Duration |
|---|---|---|---|---|
| Sysbench 10m rows | 6.64GB | 3m36s | 2m10s | -40% |
| Sysbench Nonclustered 10m rows | 8.73GB | 5m1s | 3m47s | -25% |
Note: * marks a calculated value (could not be tested due to environment limitations).
Experiments show that the proposed solution has
We need a variable to control the feature and the flush concurrency; call it tidb_xx_concurrency for now.
- tidb_xx_concurrency == 0: the feature is off.
- tidb_xx_concurrency > 0: the feature is on.
- It can also be set per statement via the SET_VAR hint.

The proposed solution only affects the TiKV client and the scheduler part of TiKV, plus necessary controlling code in TiDB. The enhancement shall be transparent to upper layers like optimizers and executors, and to lower storage layers.
A user(TiDB, in this context) of the transaction interface provided by client-go, shall be able to switch to the large transaction mode without changes, except when it's using the staging API which is discussed in Unresolved Questions below.
To eliminate the memory limitation, we cannot let memdb hold all mutations. KV mutations are written into stores in the same way as prewrite records, thus atomicity is guaranteed by the transaction protocol.
Because large transactions and common transactions share the same code for execution, to avoid too much modification of the TiDB code, we choose to modify the memdb.
We need to provide a wrapper for the current memdb.
We can mitigate the performance degradation caused by the O(N log N) complexity of the memdb.
At any moment we allow at most 1 memdb to be in the process of flushing. Consider the following operation sequence. It may not occur in TiDB today, but we don't want to add more dependence on how TiDB uses the interface. The final result is expected to be (x, 2). But if there are multiple memdbs flushing concurrently, the handling of the mutations may be totally reversed (put(x, 2) -> del(x) -> put(x, 1)). The final result becomes (x, 1).
put(x, 1)
del(x)
put(x, 2)
To avoid this anomaly, we should avoid multiple requests writing a key in parallel, so we limit it such that only 1 memdb can be flushing at the same time.
Eliminating concurrent flushing is not enough. Consider the following operation sequence:
write(x, 1)
flush (retry happened)
TiKV write (x, 1)
clientReceiveResponse (x, 1)
write(x, 2)
flush
TiKV write (x, 2)
clientReceiveResponse (x, 2)
retrying request (x, 1) arrived late
TiKV write (x, 1)
The final result in TiKV becomes (x, 1), while we expect (x, 2).
To make the system linearizable, we attach a monotonically increasing number to each flush. We call it the generation. Each prewrite request in a flush carries its generation to TiKV. Locks in the LOCK CF store the generation and update it whenever the lock value is updated. A prewrite request checks the generations, and requests that must be stale are rejected. A Quint spec verifies the design.
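The following is a minimal sketch of the generation check, assuming a simplified in-memory lock table; the lockEntry type and handleFlush function are illustrative names, not the actual TiKV implementation.

```go
package main

import "fmt"

// lockEntry models the lock a flush leaves in the LOCK CF for one key.
type lockEntry struct {
	value      []byte
	generation uint64 // generation of the flush that last wrote this lock
}

type lockTable map[string]lockEntry

// handleFlush applies a flush (prewrite) mutation only if its generation is
// not older than the generation already recorded for the key, so a retried
// request from an earlier flush cannot overwrite a newer value.
func (t lockTable) handleFlush(key string, value []byte, generation uint64) error {
	if cur, ok := t[key]; ok && generation < cur.generation {
		return fmt.Errorf("stale flush rejected: generation %d < %d", generation, cur.generation)
	}
	t[key] = lockEntry{value: value, generation: generation}
	return nil
}

func main() {
	locks := lockTable{}
	_ = locks.handleFlush("x", []byte("1"), 1) // first flush
	_ = locks.handleFlush("x", []byte("2"), 2) // second flush
	// A late retry from the first flush arrives after the second one.
	if err := locks.handleFlush("x", []byte("1"), 1); err != nil {
		fmt.Println(err) // the stale write is rejected, (x, 2) is kept
	}
}
```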
There are 3 types of keys in a transaction: the primary key (PK), secondary keys (SKs), and sub-primary keys (SPKs).
An SPK is in the spk_<start_ts>_<spk_id> format. It is a representative of a group of secondary keys. The secondary keys in the group are encoded in its value.
Memory usage is then reduced to a fraction of its original size, down to a few thousandths.
The diagram shows the overall difference in process between normal transactions and our proposed large transactions.
The staging interface is tightly coupled with the implementation of MemDB.
AddRecord/UpdateRecord/DeleteRecord/updateRecord
Iter() always returns an empty iterator.

We choose option 2 for its simplicity and more predictable delivery.
The interface remains unchanged. Users of the new mem buffer see no difference between normal and large transactions.
After every write operation to the memdb, trigger a check to decide whether to start a flush. The check condition can be based on the number of keys in the current memdb, or the current total size of the memdb.
If the mutable memdb becomes too large and the last flush has not finished, the write is blocked until the last flush finishes.
In addition to values, write operations can write flags. Flags are stored in memdbs, but not in TiKV. These flags usually control key-specific behavior. In large transactions, flags are handled differently.
- PresumeKNE, AssertExist, AssertNotExist.
- KeyLocked and NeedConstraintCheckInPrewrite.

if there is no ongoing flush {
mark current active mutable memdb as immutable
go withRetry(flush(immutable memdb)())
new a memdb as current active mutable memdb (or reuse the other one)
}
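To make the pseudocode above concrete, here is a minimal sketch of the flush trigger, assuming a simplified map-backed memDB; the names pipelinedBuffer and maybeFlush and the flushThreshold value are illustrative, not the actual client-go API.

```go
package main

import "sync"

// memDB is a stand-in for the real memdb.
type memDB struct {
	data map[string][]byte
	size int
}

func newMemDB() *memDB { return &memDB{data: map[string][]byte{}} }

type pipelinedBuffer struct {
	mu       sync.Mutex
	mutable  *memDB         // receives new writes
	inFlight bool           // at most one immutable memdb is flushing
	flushing sync.WaitGroup // lets callers wait for the ongoing flush
}

const flushThreshold = 16 << 20 // e.g. flush once 16 MiB of mutations is buffered

func (b *pipelinedBuffer) Set(key string, value []byte) {
	b.mu.Lock()
	b.mutable.data[key] = value
	b.mutable.size += len(key) + len(value)
	b.mu.Unlock()
	b.maybeFlush()
	// The real implementation also blocks here if the mutable memdb keeps
	// growing while the previous flush has not finished yet.
}

// maybeFlush swaps the mutable memdb for a fresh one and flushes the old one
// asynchronously, but only when the threshold is reached and no other flush is
// in progress, so writes to the same key are never reordered across flushes.
func (b *pipelinedBuffer) maybeFlush() {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.mutable.size < flushThreshold || b.inFlight {
		return
	}
	immutable := b.mutable
	b.mutable = newMemDB()
	b.inFlight = true
	b.flushing.Add(1)
	go func() {
		defer b.flushing.Done()
		flushWithRetry(immutable) // send flush (prewrite) requests to TiKV
		b.mu.Lock()
		b.inFlight = false
		b.mu.Unlock()
	}()
}

func flushWithRetry(db *memDB) { /* omitted: build and send flush requests */ }

func main() {
	b := &pipelinedBuffer{mutable: newMemDB()}
	b.Set("x", []byte("1"))
	b.flushing.Wait() // this tiny write does not reach the threshold; returns immediately
}
```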
Divide the batch into several groups. For each group, make a new entry with key = spk_<start_ts(fixed length)>_<spk_id> and value = encode(keys in the group)
Store the key (without its value) in memory. We call it a SPK (sub-primary key). Flush it together with the group.
A group cannot be arbitrarily large, as the value of the SPK (a key prefixed with "spk_") must contain all keys in the group.
By introducing a new mutation type TxnMeta (in addition to Put, Delete, Lock, Rollback) for SPKs, or by just reusing the Lock mutation type, we can make them transparent to normal transaction reads.

For Get(key) and BatchGet(keys), read in the following order. Resort to the next tier only if the key cannot be found in the previous tiers.
Mutable memdb -> immutable memdb -> TiKV
For Iterator and Scan, similar to the existing UnionIter, a 3-tier UnionIter is used.
Mutable memdb -> immutable memdb -> TiKV
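A minimal sketch of the tiered Get, assuming map-backed memdbs and a kvGetter interface standing in for a TiKV snapshot; all names here are illustrative.

```go
package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("key not found")

// kvGetter abstracts one read tier (a memdb or a TiKV snapshot).
type kvGetter interface {
	Get(key string) ([]byte, error)
}

type mapDB map[string][]byte

func (m mapDB) Get(key string) ([]byte, error) {
	if v, ok := m[key]; ok {
		return v, nil
	}
	return nil, errNotFound
}

// tieredGet resolves a key from the mutable memdb first, then the immutable
// (flushing) memdb, and finally TiKV, matching the read order described above.
func tieredGet(key string, mutable, immutable, tikv kvGetter) ([]byte, error) {
	for _, tier := range []kvGetter{mutable, immutable, tikv} {
		v, err := tier.Get(key)
		if err == nil {
			return v, nil
		}
		if !errors.Is(err, errNotFound) {
			return nil, err
		}
	}
	return nil, errNotFound
}

func main() {
	mutable := mapDB{"a": []byte("1")}
	immutable := mapDB{"b": []byte("2")}
	tikv := mapDB{"c": []byte("3")}
	v, _ := tieredGet("b", mutable, immutable, tikv)
	fmt.Println(string(v)) // "2": found in the immutable memdb
}
```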
Statement snapshot: A statement shall not read its own mutations using Iterator. Get() can read its own mutations.
For a single-statement transaction, simply return an empty iter for Iter-like interfaces. Mutations that have been flushed to TiKV exist as locks, which will not be read by the txn itself during execution.
In the commit phase, we need to turn the prewrite records into readable commit records. The key question here is how to find all keys that need to be committed.
The ideal way is to let each store scan the locks located on it and commit them, but it is complex to handle Raft leader transfers, e.g. a region that has not been scanned yet is moved to another store where it falls into the already-scanned key space, so it would be missed. Though missing the commit of secondaries does not affect correctness, the cost of resolving those locks is unacceptable. So it's better to commit by regions.
The client will record the lower and upper bounds of all keys in the prewrite phase. In the commit phase, TiDB will iterate over the related regions and commit them, with concurrency limited per store.
It may reuse the resolve-lock command. When the batch size reaches the limit (256 keys or 32 KB), it stops scanning locks, does the async write, and spawns a new task waiting to be executed in the next round.
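A minimal sketch of the batched lock-scan-and-commit loop, assuming a lockScanner abstraction over a region's LOCK CF; the batch limits mirror the 256-key / 32 KB thresholds above, everything else (the names, the async callback) is illustrative.

```go
package main

import "fmt"

const (
	maxBatchKeys = 256
	maxBatchSize = 32 * 1024 // bytes
)

type lock struct {
	key   []byte
	value []byte
}

// lockScanner yields the locks of the transaction found in one region.
type lockScanner interface {
	Next() *lock // returns nil when there are no more locks
}

// commitRegion scans the locks of the transaction in one region and commits
// them batch by batch, cutting a batch once it reaches the key or size limit
// so a single write never grows unbounded.
func commitRegion(scanner lockScanner, commitTS uint64, asyncCommit func(batch []*lock, commitTS uint64)) {
	var batch []*lock
	batchSize := 0
	for l := scanner.Next(); l != nil; l = scanner.Next() {
		batch = append(batch, l)
		batchSize += len(l.key) + len(l.value)
		if len(batch) >= maxBatchKeys || batchSize >= maxBatchSize {
			asyncCommit(batch, commitTS) // turn these locks into commit records
			batch, batchSize = nil, 0    // the rest is handled in the next round
		}
	}
	if len(batch) > 0 {
		asyncCommit(batch, commitTS)
	}
}

// sliceScanner is a trivial in-memory lockScanner for the example below.
type sliceScanner struct{ locks []*lock }

func (s *sliceScanner) Next() *lock {
	if len(s.locks) == 0 {
		return nil
	}
	l := s.locks[0]
	s.locks = s.locks[1:]
	return l
}

func main() {
	scanner := &sliceScanner{locks: []*lock{{key: []byte("a")}, {key: []byte("b")}}}
	commitRegion(scanner, 449348000000000002, func(batch []*lock, ts uint64) {
		fmt.Printf("commit %d keys at ts %d\n", len(batch), ts)
	})
}
```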
The client:
1. For the remaining secondary keys in the membuffer, make a group and flush. Wait until the flush finishes.
2. Prewrite (update) the PK, and get minCommitTS from the PK.
3. Get the commit ts.
4. Commit the PK. The txn is considered committed now.
5. For each SPK in the membuffer:
   1. Commit the SPK to TiKV.
   2. Get the SKs in this group and commit them.
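The steps above can be summarized as the following sketch, assuming a hypothetical txnClient interface whose methods stand in for the corresponding client-go operations.

```go
package commit

// txnClient is a hypothetical interface standing in for client-go operations.
type txnClient interface {
	FlushRemaining() error // flush the last group of secondary keys and wait for it
	PrewritePrimary() (minCommitTS uint64, err error)
	GetCommitTS(minCommitTS uint64) (uint64, error)
	CommitPrimary(commitTS uint64) error
	SPKs() []string                                // SPKs kept in the membuffer
	CommitSPK(spk string, commitTS uint64) error
	CommitGroup(spk string, commitTS uint64) error // commit the SKs encoded in the SPK value
}

// commitPipelined mirrors the commit steps listed above.
func commitPipelined(c txnClient) error {
	if err := c.FlushRemaining(); err != nil {
		return err
	}
	minCommitTS, err := c.PrewritePrimary()
	if err != nil {
		return err
	}
	commitTS, err := c.GetCommitTS(minCommitTS)
	if err != nil {
		return err
	}
	if err := c.CommitPrimary(commitTS); err != nil {
		return err
	}
	// The transaction is considered committed once the PK is committed; the
	// remaining work only turns locks into readable commit records.
	for _, spk := range c.SPKs() {
		if err := c.CommitSPK(spk, commitTS); err != nil {
			return err
		}
		if err := c.CommitGroup(spk, commitTS); err != nil {
			return err
		}
	}
	return nil
}
```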
In typical scenarios that require large transactions, we anticipate minimal contention. Consequently, large transactions adopt an extended optimistic transaction model, which avoids pessimistic locks and performs existence checks during the prewrite phase.
The unique checking should follow the lazy-check-key-not-exist strategy, as in optimistic transactions, to reduce latency during execution. The lazy check setting is required to be true.
With this strategy, when writing rows, only the memdb is checked; the remaining unique checks are done by the flush request in TiKV. Once a flush request returns a duplicated-key error, the statement should be aborted.
LockKeys is forbidden, as a large transaction uses optimistic concurrency control.
A large transaction will execute for a long time, so we must keep renewing the TTL of the primary key from the time the first mutation is written into the store.
After checking the transaction status from the PK, TiDB will resolve locks for the region. If the coordinator is down, the remaining locks will be handled by GC.
A variant worth consideration is to select another special key as PK, or utilize the secondaries field that was for async commit, and let PK track all SPKs. In this way, we will be able to find all keys of a transaction from any one of the keys. ResolveLocks can then proactively resolve all locks the transaction left. We leave this as a potential optimization rather than a mandatory requirement in our initial version.
A large transaction can leave many unresolved locks. Fortunately we have the Transaction Status Cache, which caches the status of some transactions. Once a lock belonging to a large transaction has been checked, subsequent read requests that meet other locks of the same transaction in the same TiKV can quickly ignore them by checking the transaction status cache.
GC will block until the large transaction is done.
SPKs need to be GCed some time after the large transaction finishes, whether it is committed or rolled back. After resolving locks, the GC worker could safely scan the range to find useless SPKs [spk_, spk_<safepoint(fixed length)>) and delete them.
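A minimal sketch of the SPK GC range check, assuming the fixed-width start_ts encoding shown earlier; the key values and helper names are illustrative.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// spkGCRange returns the key range [spk_, spk_<safepoint>) covering SPKs of
// transactions that started before the GC safepoint; the fixed-width start_ts
// in the SPK key makes the bound a simple lexicographic comparison.
func spkGCRange(safepoint uint64) (start, end string) {
	return "spk_", fmt.Sprintf("spk_%020d", safepoint)
}

func main() {
	keys := []string{
		fmt.Sprintf("spk_%020d_0", uint64(100)), // belongs to a long-finished txn
		fmt.Sprintf("spk_%020d_0", uint64(900)), // still protected by the safepoint
		"t_1_r_1",                               // ordinary data key, untouched
	}
	sort.Strings(keys)

	start, end := spkGCRange(500)
	for _, k := range keys {
		if strings.HasPrefix(k, "spk_") && k >= start && k < end {
			fmt.Println("GC deletes", k)
		}
	}
}
```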
Alternative: Instead of exposing the transaction implementation detail (the SPKs) to the upper-level GC worker, we can add a new mutation type that can be safely deleted in GC, or simply store the value in a new field in the LOCK type and support reading it. However, it introduces extra complexity in implementation.
Large-write x normal-write
Whoever acquires the lock (in the LOCK CF or an in-memory pessimistic lock) wins. The other gets a KeyIsLocked error as usual. If a flush meets a KeyIsLocked, it backs off and retries.
Large-write x large-write
Same as large-write x normal-write. It is bad practice to have multiple large transactions that can conflict with each other running in parallel.
Large-write x read
A read request meets a lock written by the large txn.
check_txn_status on the PK, trying to push its min_commit_ts so that min_commit_ts > read_ts of the read request. If min_commit_ts is successfully pushed, the read request can ignore the lock.
If min_commit_ts is continuously pushed, the large txn may have difficulty committing. It is also bad practice to have read-write conflicts.

Large-read x normal-write
Same as normal read
Using min_commit_ts to maintain resolved_ts instead of start_ts: there have been discussions to free CDC from being blocked by long transactions.

TiKV data keys are in the form {encode(user_key)}{ts}. If we allow some keys to be {encode(user_key)}{additional_suffixes}{ts}, where the additional_suffixes can be a mark that tells that the key is an SPK of some transaction, it can then be distinguished from any other data keys. This sounds simple. However, it would be a breaking change to the Key type in TiKV and anything related to it. Also it's hard to tell how high the risk would be for other components (such as TiFlash, brie tools, etc.).

There are some requirements for large transactions:
We control concurrency, batch size and priority of flush and commit tasks for flow control.
If resource control is enabled in TiKV, there is a priority-based scheduler pool with 3 priorities (high/normal/low). Large transactions can set the default priority to low to reduce the impact on other queries. If there is a resource control group, large transactions also inherit the resource limitations set by the group.
Cache min_commit_ts in the transaction status cache. When read requests are frequent, it can save a lot of check_txn_status requests.

Run large transactions of sizes from 10 GB to 500 GB. The memory usage of the client (TiDB) should be lower than 1% of the transaction size. The latency should be lower than or comparable to the vanilla version.
Large transactions satisfy simulated workload of real customer needs.
Establish the expectation of memory, latency, and resource usage for various workloads.
There is a set of Staging()/Release()/Cleanup() methods in the current membuffer implementation. They are used to partially rollback modifications inside a transaction.
For example, INSERT INTO t SELECT * FROM t.

In the vanilla MemDB, getSnapshot() returns the snapshot at stage[0], namely the snapshot of the starting point of the current statement.

AddRecord, UpdateRecord, DeleteRecord.

Cleanup() rolls back the changes as if the statement has never been executed. In our proposed solution, modifications can exist in both the membuffer and TiKV, making the rollback hard to implement.

During a UnionIter's lifetime, if a write operation happens and triggers a flush, the underlying memdbs and TiKV could change, which may affect the UnionIter depending on how it's implemented.
For single-statement transactions, the memdb iterator always returns empty and a statement never reads its own writes, so there is no problem.
For multi-statement transactions, it depends on how the staging interface issue is solved.