design/backup_v2_partitioned_logs.md
Github tracking issue: https://github.com/apple/foundationdb/issues/1003
The purpose of this document is to capture functional requirements as well as to propose a high-level design for the implementation of the new backup system in FoundationDB. The intended audience for this document includes:
As an essential component of a database system, backup and restore is a commonly used technique for disaster recovery, reliability, audit, and compliance purposes. The current FDB backup system consumes about half of the cluster’s write bandwidth, causes write skew among storage servers, increases storage space usage, and interferes with data balancing. The new backup system aims to double the cluster’s write bandwidth for HA clusters (old DR clusters still need the old-style backup system).
The FDB backup system continuously scans the database’s key-value space and saves key-value pairs and mutations at versions into range files and log files in blob storage. Specifically, mutation logs are generated at the CommitProxy and are written to transaction logs along with regular mutations. In production clusters such as CK clusters, the backup system is always on, which means each mutation is written twice to transaction logs, consuming about half of the write bandwidth and about 40% of CommitProxy CPU time.
The design of old backup system is here, and the data format of range files and mutations files is here. The technical overview of FDB is here. The FDB recovery is described in this doc.
Tag: a short address for a mutation’s destination, consisting of a locality (int8_t, representing the data center ID, where a negative number denotes a special system locality) and an ID (int16_t). The idea is that the tag is a small data structure that consumes fewer bytes than using IP addresses or storage servers’ UIDs (16 bytes each), since tags are associated with each mutation and are stored both in memory and on disk.
Log Router Tag: a special system tag, e.g., -2:0, where locality -2 means log router tag and 0 is the ID. If attached to a mutation, this tag originally means the mutation should be sent to a remote log router. In the new backup system, we reuse this tag for backup workers to receive all mutations in a number of partitioned streams.
Restorable Version: a version v is a restorable version if the entire key-space and mutations in versions [v1, v) are recorded in backup files.
Feature priorities: Features 1, 2, 3, 4, and 5 are must-have; Feature 6 is nice to have.
The restored database state at version v must match the original state at version v.
Security: The backup system’s components are assumed to be trusted components, because they are running on the nodes in a FDB cluster. The transmission from cluster to blob store is through SSL connections. Blob credentials are passed in from the “fdbserver” command line.
Privacy: Backup data are stored in blob store with appropriate access control. Data retention policy can be set with “fdbbackup” tool to delete older backup data.
This section discusses changes that may need to be identified or accounted for on the back-end in order to support the feature from a monitoring or management perspective.
A workflow is needed for DBAs to start, pause, resume, and abort the new type of backups. The only difference from starting the old type of backups should be a flag change. The FDB cluster then generates backups as specified by the flag.
A command line tool, fdbconvert, has been written to convert new backup logs into the format of old backup logs. Thus, if the new restore system has issues, we can still restore a new backup with the existing old restore system.
Deployment instructions for tooling development
A new stateless role, “Backup Worker” (or “BW” for short), is introduced in a FDB cluster. The number of BW processes is based on the number of log routers (usually they are the same). If there are no log routers, the number of transaction logs is used. Note that occasionally the cluster may recruit more backup workers for version ranges in old epochs. Since these version ranges are small, the resource requirements for these short-lived backup workers are very small.
As in the old backup system, backup agents need to be started to save snapshot files to blob storage. In contrast, backup workers in the new backup system, running in the primary DC, are responsible for saving mutation logs to blob storage.
A backup worker’s memory should be large enough to hold tens of seconds’ worth of mutation data from TLogs. The memory requirement can be calculated as: WriteThroughput * BufferPeriod / partitions + SafetyMargin, where WriteThroughput is the aggregated TLog write bandwidth and partitions is the number of log router tags.
A new process class “backup” is defined for backup workers.
How to start a backup of the new type, e.g.:
fdbbackup start -C fdb.cluster --partitioned-log-experimental -d blob_url
The solution must provide at least the following KPIs:
The feature does not require any specific customer care awareness or interaction.
The feature must follow the usual roll-out process. It needs to coexist with the existing backup system, and we will periodically restore clusters to test its correctness. Only after we gain enough confidence will we deprecate the existing backup system.
Note the new backup system is designed for HA clusters. Existing DR clusters still use the old backup system. Thus, the rollout of the new backup system is only for HA clusters.
This feature requires blob storage for saving all log files. The blob storage must have enough:
One sentence summary: the new backup system introduces a new role, backup worker, to pull mutations from transaction logs and save them, thus removing the burden of saving mutation logs into the database.
The old backup system writes the mutation log to the database itself, thus doubling the write bandwidth usage. Backup agents later fetch mutation logs from the database, upload them to blob storage, and then remove the mutation logs from the database.
This project saves the mutation log to blob storage directly from the FDB cluster, which should almost double the database's write bandwidth when backup is enabled. In FDB, every mutation already has exactly one log router tag, so the idea of the new system is to backup data for each log router tag individually (i.e., saving mutation logs into multiple partitioned logs). During restore time, these partitioned mutation logs are combined together to form a continuous mutation log stream.
Design question 1: Should backup workers be recruited as part of log system or not? There are two design alternatives:
Decision: We choose the second approach for the simplicity of the recruiting process and handling of mapping of LogRouter tags to backup workers.
Design question 2: Place of backup workers on the primary or remote Data Center (DC)? Placing backup workers on the primary side has the advantage of supporting any deployment configurations (single DC, multi DC).
Placing them on the remote side is desirable to reduce the workload on the primary DC’s transaction logs. Since log routers on the remote side are already pulling mutations from the primary DC, backup workers can simply pull from these log routers.
Decision: We choose to recruit backup workers on the primary DC, because not all clusters are configured with multiple DCs and the backup system needs to support all types of deployment.
The design proposed below is based upon the following assumptions:
The requirement of the new backup system raises several design challenges:
Backup Worker: This is a new role introduced in the new backup system. A backup worker is a fdbserver process running inside a FDB cluster, responsible for pulling mutations from transaction logs and saving the mutations to blob storage.
Cluster Controller (CC): The CC is responsible for coordinating the transition of the FDB transaction sub-system from one generation to the next. In particular, the CC recruits backup workers during the recovery.
Transaction Logs (TLogs): The transaction logs make mutations durable to disk for fast commit latencies. The logs receive commits from the commit proxy in version order, and only respond to the commit proxy once the data has been written and fsync'ed to an append only mutation log on disk. Storage servers retrieve mutations from TLogs. Once the storage servers have persisted mutations, storage servers then pop the mutations from the TLogs.
CommitProxy: The commit proxies are responsible for committing transactions and tracking the storage servers responsible for each range of keys. In the old backup system, commit proxies are responsible for grouping mutations into backup mutations and writing them to the database.
GrvProxy: The GRV proxies are responsible for providing read versions.
From an end-to-end perspective, the new backup system works in the following steps:
1. Operators issue a new backup request via the fdbbackup command line tool;
2. The FDB cluster receives the request and registers it in the database (internally via TaskBucket and system keys);
3. Backup workers monitor the system keys, notice the new request, and start saving mutation logs to blob storage;
4. Periodically, the backup agent (scheduled by TaskBucket) starts taking snapshots of key ranges in the database;
The new backup has five major components: 1) backup workers; 2) recruitment of backup workers; 3) extension of the tag partitioned log system to support pseudo tags; 4) integration with the existing TaskBucket based backup command interface; and 5) integration with the existing TaskBucket based restore system to read partitioned mutation logs.
Backup worker is a new role introduced in the new backup system. A backup worker is responsible for pulling mutations from transaction logs and saving the mutations to blob storage. Internally, a backup worker maintains a message buffer, which keeps mutations pulled from transaction logs that have not yet been saved to blob storage. Periodically, the backup worker parses mutations in the message buffer, extracts those mutations that are within user-specified key ranges, and then uploads the mutation data to blob storage. After the data is saved, the backup worker removes these messages from its internal buffer and saves its progress in the database, so that after a failure, a new backup worker can start from the previously saved version.
A backup worker has two modes of operation: no-op mode and working mode. When there is no active backup in the cluster, the backup worker operates in no-op mode, in which it simply obtains the recently committed version from Proxies and then pops mutations from transaction logs. After operators submit a new backup request to the cluster, backup workers transition into working mode, pulling mutations from transaction logs and saving the mutation data to blob storage.
In the working mode, the popping by backup workers needs to follow a strictly increasing version order. For the same tag, there can be multiple backup workers, each responsible for a different epoch. These backup workers must coordinate their popping order; otherwise, the backup can miss some mutation data. This coordination is achieved by deferring the popping of later epochs and only allowing the oldest epoch to pop first. After the oldest epoch has finished, its backup workers notify the CC, which then advances the oldest backup epoch so that the next epoch can proceed with popping.
A subtle issue for a displaced backup worker (i.e., one displaced because a new epoch begins) is that its last pop can cause missing version ranges in mutation logs. This is because the transaction for saving the progress may be delayed during recovery. As a result, the CC could have already recruited a new backup worker for the old epoch starting at the previously saved progress version. Then the saving transaction succeeds, and the displaced worker pops mutations that the new backup worker is supposed to save, resulting in missing data in the new backup worker’s log. The solution to this problem can be: 1) the old backup worker aborts immediately after learning it is displaced, thus not trying to save its progress; or 2) the old backup worker skips its last pop, since the next epoch will pop versions larger than its progress. Because the second approach avoids duplicated work in the new epoch, we choose the second approach.
Finally, multiple concurrent backups are supported. Each backup worker keeps track of current backup jobs and saves mutations to corresponding backup containers for the same batch of mutations.
Backup workers are recruited during cluster recovery as part of the log system. The CC recruits a fixed number of backup workers, one for each log router tag. During the recruiting process, the CC sends a backup worker initialization request as:
struct InitializeBackupRequest {
UID reqId;
LogEpoch epoch; // epoch this worker is recruited
LogEpoch backupEpoch; // epoch that this worker actually works on
Tag routerTag;
Version startVersion;
Optional<Version> endVersion; // Only present for unfinished old epoch
ReplyPromise<struct BackupInterface> reply;
… // additional methods elided
};
Note we need two epochs here: one for the recruited epoch and one for the epoch being backed up. The recruited epoch is the epoch of the log system, which a backup worker uses to detect that it has been displaced by a newer epoch; if so, the worker should save its progress and immediately exit. The backupEpoch is used for saving progress. The backupEpoch is usually the same as the epoch in which the worker is recruited. However, it can be an earlier epoch than the recruiting epoch, signifying that the worker is responsible for data in that earlier epoch. In this case, when the worker is done and exits, the CC should not treat its departure as a trigger of recovery. This is solved by the following protocol:
1. The backup worker finishes its work for the old epoch and sends a BackupWorkerDoneRequest to the CC;
2. The CC receives the request, removes the worker from the log system, and updates the oldestBackupEpoch;
3. Backup workers check whether their backupEpoch is equal to oldestBackupEpoch; if so, the worker may start popping from TLogs.
Note oldestBackupEpoch is introduced to prevent a backup worker for a newer epoch from popping while there are still backup workers for older epochs. Otherwise, these older backup workers may lose data.
The tag partitioned log system is modeled like a FIFO queue, where Proxies push mutations to the queue and Storage Servers or Log Routers pop mutations from the queue. Specifically, consumers of the tag partitioned log system use two operations, peek and pop, to read mutations for a given tag and to pop mutations from the queue. Because Proxies assign each mutation a unique log router tag, the backup system reuses this tag to obtain the whole mutation stream. As a result, each log router tag now has two consumers, a log router and a backup worker.
To support multiple consumers of the log router tag, the peek and pop operations have been extended to support pseudo tags. In other words, each log router tag can be mapped to multiple pseudo tags. Log routers and backup workers still peek mutations with the log router tag, but pop with different pseudo tags. Only after both pseudo tags are popped can TLogs pop the mutations from their internal queues.
Note the introduction of pseudo tags opens the possibility for more usage scenarios. For instance, a change stream can be implemented with a pseudo tag, where the new consumer can look at each mutation and emit mutations on specified key ranges.
TaskBucket based backup command interface
We strive to keep the operational interface the same as the old backup system. That is, the new backup is initiated by the client as before, with an additional flag. The FDB cluster receives the backup request, sees the flag being set, and uses the new system for generating mutation logs.
By default, backup workers are not enabled in the system. When operators submit a new backup request for the first time, the database performs a configuration change (backup_worker_enabled:=1) that enables backup workers.
The operator’s backup request can indicate whether an old backup or a new backup is used. This is a command line option (i.e., --partitioned-log-experimental) in the fdbbackup command. A backup request of the new type is started in the following steps:
1. The operator issues a new backup request via the fdbbackup tool, which writes the backup range to a system key, i.e., \xff\x02/backupStarted.
2. All backup workers monitor the key \xff\x02/backupStarted, see the change, and start logging mutations.
3. After all backup workers have started, the fdbbackup tool initiates the backup of all or specified key ranges by issuing a transaction Ts.
Compared to the old backup system, steps 1 and 2 above are new and are only triggered if the client requests the new type of backup. The purpose is to allow backup workers to function as no-ops if there are no ongoing backups. However, the backup workers should still continuously pop their corresponding tags; otherwise, mutations will be kept in the TLogs. In order to know the version up to which they can pop, backup workers can obtain the read version from any GRV proxy. Because the read version must be a committed version, popping to this version is safe.
Backup Submission Protocol
Protocol for submitBackup() to ensure that all backup workers of the current epoch have started logging mutations:
1. After the submitBackup() call, the task bucket (i.e., StartFullBackupTaskFunc) starts by creating a BackupConfig object in the system key space.
2. Each backup worker monitors the \xff\x02/backupStarted key and notices the new backup job. Then the backup worker inserts the new job into its internal queue and writes to the startedBackupWorkers key in the BackupConfig object if the worker’s backupEpoch is the current epoch. Among these workers, the worker with Log Router Tag -2:0 monitors the startedBackupWorkers key, and sets the allWorkerStarted key after all workers have updated the startedBackupWorkers key.
3. The task bucket watches the startedBackupWorkers key and declares the job submission successful.
This protocol replaced another, abandoned protocol: set the startedBackupWorkers key after all backup workers have saved logs with versions larger than the version of the submitBackup() call. That protocol fails if there is already a backup job and there is a backup worker that doesn’t notice the change to the \xff\x02/backupStarted key. As a result, the worker saves versions larger than the new job’s start version, but into the old backup container. Thus the new container misses some mutations.
Protocol for Determining A Backup is Restorable
1. The backup worker with Log Router Tag -2:0 of the current epoch monitors all workers’ progress. If the oldest backup epoch is the current epoch (i.e., there are no backup workers for any old epochs, thus no version ranges missing before this epoch), this worker updates the latestBackupWorkerSavedVersion key in the BackupConfig object with the minimum saved version among workers.
2. The client calls describeBackup(), which eventually calls getLatestRestorableVersion to read the value from the latestBackupWorkerSavedVersion key. If this version is larger than the first snapshot’s end version, then the backup is restorable.
Pause and Resume Backups
The command line for pause or resume backups remains the same, but the implementation for the new backup system is different from the old one. This is because in the old backup system, both mutation logs and range logs are handled by TaskBucket, an asynchronous task scheduling framework that stores states in the FDB database. Thus, the old backup system simply pauses or resumes the TaskBucket. In the new backup system, mutation logs are generated by backup workers, thus the pause or resume command needs to tell all backup workers to pause or resume pulling mutations from TLogs. Specifically,
1. The operator’s pause or resume request updates both the TaskBucket and the \xff\x02/backupPaused key.
2. Each backup worker monitors the \xff\x02/backupPaused key and notices the change. Then the backup worker pauses or resumes pulling from TLogs.
Backup Container Changes
Partitioned mutation logs are stored in plogs/XXXX/XXXX directory and their names are in the format of log,[startVersion],[endVersion],[UID],[N-of-M],[blockSize], where M is total partition number, N can be any number from 0 to M - 1. In contrast, old mutation logs are stored in logs/XXXX/XXXX directory and are named differently.
To restore a version range, all partitioned logs for the range need to be available. The restore process should read all partitioned logs and combine mutations from different logs into one mutation stream, ordered by the (commit_version, subsequence) pair. It is guaranteed that all mutations form a total order. Note in the old backup files, there is no subsequence number, as each version’s mutations are serialized in order in one file.
PR #11901 adds RestoreDispatchPartitionedTaskFunc to support restoring from partitioned-log-format backups. At a high level, backup agents read partitioned mutation logs from the blob store, convert them into the old mutation log format in memory, and write mutations to the target database. All the rest of the restore process is the same as before.
Specifically, StartFullRestoreTaskFunc kicks off the process by setting up some variables and starting the first RestoreDispatchPartitionedTaskFunc. Each RestoreDispatchPartitionedTaskFunc with an input range [v0, v1) orchestrates the restore of both log and range files in that range, then starts the next RestoreDispatchPartitionedTaskFunc with an input range [v1, v2), or finishes the process if v1 > targetVersion.
There are many RestoreRangeTaskFuncs for a single RestoreDispatchPartitionedTaskFunc, each responsible for restoring a block of a range file, and one RestoreLogDataPartitionedTaskFunc for a single RestoreDispatchPartitionedTaskFunc. RestoreRangeTaskFunc restores keys to their original form, while RestoreLogDataPartitionedTaskFunc restores keys to an intermediate state with a certain prefix (alog), which the commit proxy later recognizes and converts back from this intermediate format into their original mutations. This makes sure that mutations at smaller versions do not override range-file keys at later versions.
The Performant Restore System is deprecated and will be removed in the future, since the system is not fault tolerant during the restore. We have made changes to the current restore system to support the new backup file structure and format. This is kept in the code so we have additional coverage for simulation tests of the new backup system.
As discussed above, the new backup system splits mutation logs into multiple partitions. Thus, the restore process must verify that the backup files are continuous for all partitions within the restore’s version range. This is possible because each log file name carries its partition number and the total number of partitions.
Once the restore system verifies that the version range is continuous, it needs to filter out duplicated version ranges among different log files (both the log continuity analysis and the dedup logic are implemented in the BackupContainer abstraction). A given version range may be stored in multiple mutation log files. This can happen because a recruited backup worker can upload mutation files successfully but fail to save its progress before another recovery happens. As a result, the new epoch tries to back up this version range again, producing the same version ranges (though the file names are different).
Finally, the restore system loads the same version’s mutations from all partitions, and then merges these mutations in the order of their subsequence numbers before they are applied on the restore cluster. Note the mutations in the old backup system lack subsequence numbers. As a result, restoring old backups needs to assign subsequence numbers to mutations.
The backup system must generate log files such that the restore system can apply all the mutations on the restored cluster in the same order exactly once.
Ordering guarantee. To maintain the ordering of mutations, each mutation is stored with its commit version and a subsequence number, both of which are assigned by Proxies during commit. The restore system can load all mutations and derive a total order among all the mutations.
Completeness guarantee. All mutations should be saved in log files. We cannot allow any mutations to be missing from the backup. This is guaranteed by the fault tolerance discussed below. Essentially, all backup workers checkpoint their progress in the database. After a recovery, the new CC reads the previous checkpoints and recruits new backup workers for any missing version ranges.
The old backup file format is documented here. We can’t use this file format, because our backup files are created per log router tag. When there is more than one log router (almost always the case), the mutations in one transaction can be given different log router tags. As a result, for the same version, mutations are distributed across many files. Another subtle issue is that there can be two mutations (e.g., a = 1 and a = 2 in one transaction) that are given two different tags. We have to preserve the order of these two mutations in the restore process. Even though the order is saved in the subsequence number within a version, we still need to merge mutations from multiple files and apply them in the correct order.
In the new backup system, a mutation log file is named log,[startVersion],[endVersion],[UID],[N-of-M],[blockSize], where startVersion is inclusive and endVersion is exclusive, e.g., log,332850851,332938927,7be23c0a3e80df8ab1530fa76fa66980,1-of-4,1048576. With the information from all file names, the restore process can find all files for a version range, i.e., versions intersecting the range and all log router tags. “M” is the total number of tags, and “N” ranges from 0 to M - 1. Note a tag ID is not required in the old backup filenames, since all mutations for a version are included in one file.
Each file’s content is a list of fixed-size blocks. Each block contains a sequence of mutations, where each mutation consists of a serialized Version, a subsequence number (int32_t), and a mutation length (int32_t), all three numbers in big endian, followed by the Mutation itself. A Mutation is of the format type|kLen|vLen|Key|Value, where type is the mutation type (e.g., Set or Clear), and kLen and vLen are the lengths of the key and value in the mutation, respectively. Key and Value are the serialized values of the Key and Value in the mutation. The padding at the end of the block consists of 0xFF bytes.
`<BlockHeader>`
`<Version_1><Subseq_1><Mutation1_len><Mutation1>`
`<Version_2><Subseq_2><Mutation2_len><Mutation2>`
`…`
`<Padding>`
Note the big endianness for the version is required, as 0xFF is used as padding to indicate the end of a block. A little-endian number could easily be mistaken for the end. In contrast, a big-endian version almost guarantees the first byte is not 0xFF (it should always be 0x00).
Add a metadata file that describes the backup file:
This information can be used to optimize the restore process. For instance, the number of mutations can be used to make better load balancing decisions; if there are no atomic operations, the restore can apply mutations in a backward fashion -- skipping mutations with earlier versions.
Failures of a backup worker will trigger a cluster recovery. After the recovery, the new CC recruits a new set of backup workers. Among them, a new backup worker shall continue the work of the failed backup worker from the previous epoch.
The interesting part is the handling of old epochs, since the backup workers for an old epoch are in the “displaced” state and should exit. So the basic idea is that we need a set of backup workers for the data left in the old epochs. To figure out the set of data not yet backed up, the CC first loads the saved backup progress data <Worker_UID, LogEpoch, SavedVersion, Tag, TotalTags> from the database, and then computes, for each epoch, which version ranges have not been backed up. For each such version range and tag, the CC recruits a worker to resume the backup for that version range and tag. Note that this worker has a different worker UID from the worker in the original epoch. As a result, for a given epoch and a tag, there might be multiple progress records, as these workers are recruited at different epochs.
The backup system emits the following metrics:
- fdbbackup can show the status of a backup, including the size of mutation logs (LogBytes written) and snapshots (RangeBytes written). By taking two consecutive backup statuses, the backup speed can be estimated as (2nd_LogBytes - 1st_LogBytes) / interval.
- Each backup worker emits BackupWorkerMetrics trace events every 5 seconds, which include SavedVersion, MinKnownCommittedVersion, and MsgQ. The backup delay can be estimated as (MinKnownCommittedVersion - SavedVersion) / 1,000,000 seconds, i.e., the difference between a worker’s saved version and the current committed version, divided by 1M versions per second. MsgQ is the size of the backup worker’s in-memory message buffer.
System operators can control the following backup properties:
- Upload delay (knob BACKUP_UPLOAD_DELAY) for saving mutation logs to blob storage.
The feature will be tested both in simulation and in real clusters: