docs/design/2021-12-09-TiDB-log-based-incremental-backup.md
This document introduces a novel solution for backing up incremental transactional data within the TiKV server.
The ability to handle emergencies and accidents is a basic requirement for a commercial database. When a disaster occurs, we must restore the database with minimal data loss and recovery time, while also considering factors such as drill cost, recovery time, operability, and ease of operations and maintenance.
A new subcommand is added to BR to control incremental backup tasks. (A TiDB HTTP API is provided as well; see the spec.)
```
$ br stream start [task_name] -u 10.0.0.1:2379 -s 's3://bucket/path' [--start-ts 123456789] ...
Starting task <task_name> from ts=123456789...
Store 1: ok (next_backup_ts = 222222222).
Store 2: ok (next_backup_ts = 333333333).
Store 3: ok (next_backup_ts = 444444444).
Store 8: ok (next_backup_ts = 123456789).
Started!

$ br stream status task_name -u 10.0.0.1:2379
Checking status of <task_name>...
Store 1: error, task completed or not found.
Store 2: ok (next_backup_ts = 900000000).
Store 3: ok (next_backup_ts = 911111111).
Store 8: ok (next_backup_ts = 922222222).

$ br stream stop task_name -u 10.0.0.1:2379
Stopping task <task_name>...
Store 1: error, task completed or not found.
Store 2: ok (next_backup_ts = 987654321).
Store 3: ok (next_backup_ts = 989898989).
Store 8: ok (next_backup_ts = 987987987).
Stopped at --lastbackupts=987654321.

$ br stream stop task_name -u 10.0.0.1:2379
Stopping task <task_name>...
Store 1: error, task completed or not found.
Store 2: error, task completed or not found.
Store 3: error, task completed or not found.
Store 8: error, task completed or not found.
Failed to stop the task, maybe it is already stopped. Check the TiKV logs for details.
```
We will reuse the `CmdObserver<E>` trait. We may either embed the logic entirely inside `cdc::Delegate`, or create a new `StreamBackupObserver`. We prefer the latter for separation of concerns.
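As an illustration only, here is a minimal sketch of what a dedicated observer could look like. The trait below is a simplified, hypothetical stand-in: the real `CmdObserver<E>` lives in raftstore's coprocessor module and has a different signature, and `StreamBackupObserver`, `KvWrite`, and the channel here are assumptions made for the sketch, not the actual implementation.

```rust
use std::sync::mpsc::Sender;

// Hypothetical, simplified stand-in for raftstore's `CmdObserver<E>`; the real
// trait's methods and types differ. This only illustrates keeping stream backup
// separate from `cdc::Delegate`.
trait ApplyObserver {
    /// Called after a batch of write commands has been applied to a region.
    fn on_applied(&self, region_id: u64, writes: &[KvWrite]);
}

/// A single applied write, already decoded into (cf, key, value).
#[derive(Clone)]
struct KvWrite {
    cf: String,
    key: Vec<u8>,
    value: Option<Vec<u8>>, // `None` represents a delete
}

/// Observer dedicated to stream backup, kept out of the CDC delegate.
struct StreamBackupObserver {
    // Channel to the component that batches writes and flushes log files.
    sink: Sender<(u64, Vec<KvWrite>)>,
}

impl ApplyObserver for StreamBackupObserver {
    fn on_applied(&self, region_id: u64, writes: &[KvWrite]) {
        // Filtering by the task's key ranges would happen here before sending.
        let _ = self.sink.send((region_id, writes.to_vec()));
    }
}
```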
Support table filtering
We choose to enable table filtering during backup.
The problem with table filtering is that during log backup we only have a <Table ID → Table Name> map, read transactionally from the meta keyspace in `meta.go` (e.g. `mDB1_Table20 -> []byte(tableInfo)`), and some DDL operations change that map. For example, a RENAME followed by a TRUNCATE can produce a sequence like the one sketched below.
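To make the issue concrete, here is a small, self-contained illustration; the table IDs, names, and timestamps are made up for the example, not taken from the design.

```rust
use std::collections::HashMap;

fn main() {
    // Hypothetical <Table ID -> Table Name> map as observed from the meta keys.
    let mut id_to_name: HashMap<i64, &str> = HashMap::new();

    // ts=100: CREATE TABLE db1.t1 (assigned table ID 20).
    id_to_name.insert(20, "db1.t1");

    // ts=200: RENAME TABLE db1.t1 TO db1.t2 (same ID, new name).
    id_to_name.insert(20, "db1.t2");

    // ts=300: TRUNCATE TABLE db1.t2 (a fresh table ID 21 replaces 20).
    id_to_name.remove(&20);
    id_to_name.insert(21, "db1.t2");

    // A filter such as `-f 'db1.t1'` evaluated against only the latest map
    // would miss rows written to table ID 20 before ts=200, even though they
    // belonged to db1.t1 when they were written.
    assert_eq!(id_to_name.get(&20), None);
    assert_eq!(id_to_name.get(&21), Some(&"db1.t2"));
}
```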
To solve this problem, we need TiKV to watch all meta key changes during a task.
1. BR starts a task.
2. Every TiKV node watches the task and the key range space in the task.
Meta Key
Handle TiDB DDL
Memory control
Rollback
Stopping a task causes all collected events not yet flushed to be lost.
Pausing a task will make it stop observing the TiKV changes, record the ResolvedTS of every involved region, and immediately flush the log files into external storage. Then it sleeps.
Resuming a task is like re-subscribing to a CDC stream, similar to running `br stream start «TaskName» --backupts «PrevRTS»`. All KVs between PrevRTS and the current CommitTS will be scanned out and logged as a single batch.
Put(cf="default", key="zt1\_r1«StartTS»", value="encoded\_value")Put(cf="write", key="zt1\_r1«CommitTS»", value="P«StartTS»")Delete(cf="default", key="zt1\_r1«StartTS»")Put(cf="write", key="zt1\_r1«CommitTS»", value="D«StartTS»")Prewrite(start\_ts=«StartTS», op=Put(key="t1\_r1", value="encoded\_value"))Commit(start\_ts=«StartTS», commit\_ts=«CommitTS», key="t1\_r1")Prewrite(start\_ts=«StartTS», op=Delete(key="t1\_r1"))Commit(start\_ts=«StartTS», commit\_ts=«CommitTS», key="t1\_r1")enum FileType {
Delete = 0;
Put = 1;
}
message DataFileInfo {
// SHA256 of the file.
bytes sha_256 = 1;
// Path of the file.
string path = 2;
int64 number_of_entries = 3;
/// Below are extra information of the file, for better filtering files.
// The min ts of the keys in the file.
uint64 min_ts = 4;
// The max ts of the keys in the file.
uint64 max_ts = 5;
// The resolved ts of the region when saving the file.
uint64 resolved_ts = 6;
// The region of the file.
int64 region_id = 7;
// The key range of the file.
// Encoded.
bytes start_key = 8;
bytes end_key = 9;
// The column family of the file.
string cf = 10;
// The operation type of the file.
FileType type = 11;
// Whether the data file contains meta keys(m prefixed keys) only.
bool is_meta = 12;
// The table ID of the file contains, when `is_meta` is true, would be ignored.
int64 table_id = 13;
// It may support encrypting at future.
reserved "iv";
}
```
$ br restore full -s 's3://bucket/snapshot-path' -u 10.0.5.1:2379

$ br stream restore -u 10.0.5.1:2379 \
    -s 's3://bucket/path' \
    [--startts 123456789] \
    [--endts 987654321] \
    [--checksum] \
    [-f 'db3.*']

$ br stream merge \
    -s 's3://bucket/path/' \
    [--read-snapshot-storage 's3://bucket/snapshot-path'] \
    --write-snapshot-storage 's3://bucket/snapshot-path-new' \
    [--startts 123456789] \
    [--endts 987654321]
```
The "merge" operation performs an offline compaction. It applies the KV events on top of an existing snapshot, and produces a new set of SST files.
If BR (or the TiDB API) directly sends the tasks to TiKV services (i.e. a "push-based interface"), then when new TiKV stores are added they will not know there is a backup task, causing information loss.
I think this suggests that either the pushing side has to track store membership changes, or the task has to live somewhere every TiKV can pull from and watch.
I very much prefer the stateless, pull-based approach, but as far as I know TiKV has never introduced any etcdv3 dependency before (TiDB, on the other hand, uses etcdv3 extensively, especially in DDL sync).
(Since TiKV itself is also a KV store, we could as well use RawKV in a non-watched keyspace to share the task list. But it seems CDC is TiKV's Watch 🤔)
After a TiKV store has successfully uploaded content to external storage, it should report to the master / etcdv3 / S3 something like "in task T, in store 8, in region 432, we have backed up until TS=123456789".
In case of a crash, the new leader of region 432 should initialize itself from this progress report, and start scanning from TS=123456789 rather than TS=0 or Now().
This also means there is a communication cost of fetching the initial TS.
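For example (the function and parameter names here are illustrative, not from the design), the new leader's catch-up scan might pick its lower bound like this:

```rust
// Where a newly elected leader starts its catch-up scan for a region:
// resume from the progress reported for (task, store, region) if there is one,
// otherwise fall back to the task's start ts, but never to 0 or to `now`.
fn scan_start_ts(reported_next_backup_ts: Option<u64>, task_start_ts: u64) -> u64 {
    reported_next_backup_ts.unwrap_or(task_start_ts)
}
```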
Because we need to ensure everything in the TS range 123456789..Now() is intact, we would have to extend the GC lifetime to include 123456789. Setting a GC safepoint has a global effect, however, whereas we only want to prevent GC from happening in this particular region. Therefore, we'd like to instead change the `gc_worker.rs` implementation in TiKV: a key range can indicate whether it is ready to be GC'ed. If a leader has been elected for less than 5 minutes, we mark the region as not GC-ready.
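A minimal sketch of that per-region gate, assuming a hypothetical `RegionGcGate` type (the actual `gc_worker.rs` types and hooks are different):

```rust
use std::time::{Duration, Instant};

// Minimal sketch of the per-region GC gate described above. Names are
// hypothetical and do not correspond to the actual gc_worker.rs code.
struct RegionGcGate {
    leader_elected_at: Instant,
}

impl RegionGcGate {
    /// A region whose leader was elected less than 5 minutes ago is reported
    /// as not ready for GC, so a freshly elected leader has time to catch up
    /// the backup before old MVCC versions are collected.
    fn is_gc_ready(&self) -> bool {
        self.leader_elected_at.elapsed() >= Duration::from_secs(5 * 60)
    }
}
```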
Consider split-brain, where a region's peers are split into two groups (say 1,2,3 / 3,4,5, with node 3 as the bridge) and both sides have a leader (1 and 5), and both leaders back up (inconsistent) data to S3. Raft should be able to avoid this: the bridge node 3, being aware of both sides, prevents any modification (Put/Delete) on one side from taking place, and thus the backups can never diverge. So this is equivalent to a normal partition where the minority side is considered dead (1,2,3 alive / 4,5 dead).
As stated above we need a place reachable from all TiKVs to store the task status.
Starting task
BR pushes these keys:
TiKV, on initialization, scans for every KV in the keyspace TaskInfo: to learn what backup tasks have been scheduled. After it is done, TiKV watches the keyspace for any new tasks.
When a new task is seen by TiKV, and the task's end_ts > current_ts, it will do:

```rust
// Pseudocode: record the initial per-region progress for the new task.
for region in self.regions {
    if self.is_leader(region.id) {
        put(
            format!("NextBackupTS:({}, {}, {})", task.name, self.store_id, region.id),
            task.start_ts,
        );
    }
}
```
Stop the task if end_ts <= current_ts.
Stopping task
Delete the TaskInfo key. The watchers should then be able to do the cleanup.
Configuring, resuming and pausing a task
Update the TaskInfo key; the KV API is equivalent to starting a task. Pausing changes the --start-ts to the last key's CommitTS and sets "paused" to true. Resuming restores "paused" to false.
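For illustration, the TaskInfo value could carry roughly the following fields; this is an assumed shape for the sketch, not the actual definition.

```rust
/// Assumed shape of the TaskInfo value in the shared keyspace; field names
/// and types here are illustrative only.
struct TaskInfo {
    name: String,
    storage_url: String,       // e.g. "s3://bucket/path"
    start_ts: u64,             // moved forward to the last key's CommitTS on pause
    end_ts: u64,
    table_filter: Vec<String>, // e.g. ["db3.*"]
    paused: bool,              // set to true on pause, restored to false on resume
}
```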
Initialization, leader change and region split/merge
For every new region, we assume all keys between NextBackupTS:* and the current TS are not yet backed up, so we initiate a full scan on the region for CommitTS between these two numbers.
Flushing
For every region which this store is still a leader of, update NextBackupTS:* to the ResolvedTS of the region.
Risks
Scanning RaftDB
Since all modifications must go through Raft consensus, we can capture the changes on RaftDB instead of TiKVDB for the incremental changes. We can even physically copy the WAL file to minimize computation. Another advantage of scanning RaftDB is that, if we want to batch copy events between two timestamps (e.g. after changing leader), we don't need to scan the entire region key space, but just those entries indexed by TS.
In fact, TiCDC also considered this approach. It was eventually abandoned, however, due to the difficulty of actually utilizing the raft log. Given the previous failure, we will not attempt to touch the raft log unless the TxnEntry scan proves to have a really bad effect on performance.
Scanning the raft log directly requires adjusting the current raftstore implementation, for example raft log GC. In addition, we must ensure the scan is performed within the leader's lifetime, otherwise we may not see the complete data.
We also need to pay attention to the region epoch check, and to the fact that a raft log entry being committed does not mean it has been applied successfully.
To support interpreting the raft log, I think we would need to re-implement most of raftstore's logic, especially operations such as split/merge/conf change. Because of these operations, the raft log, which is totally ordered within one region, becomes interleaved across regions, and that is not easy to resolve. This is also the reason why CDC did not use the raft log or the WAL at the time.