managed/api-examples/python-simple/disaster_recovery.ipynb
In this notebook, we go over the disaster recovery APIs in YBA, with examples and the steps required to make those API calls. We assume that you already have two universes deployed in your YBA and that you know their universe UUIDs.
All disaster recovery YBA APIs are restricted to authenticated users, so to use them through the API, you first need to get an API token using the following code:
import requests
import os
from pprint import pprint
yba_url = os.getenv("YBA_URL", "http://192.168.56.102:9000")
yba_user = {
    "email": os.getenv("YBA_USER_EMAIL", "admin"),
    "password": os.getenv("YBA_USER_PASSWORD", "admin"),
}
route = f"{yba_url}/api/v1/api_login"
payload = {
    "email": yba_user["email"],
    "password": yba_user["password"],
}
response = requests.post(url=route, json=payload).json()
pprint(response)
customer_uuid = response["customerUUID"]
yba_api_token = response["apiToken"]
headers = {"X-AUTH-YW-API-TOKEN": yba_api_token}
Then you can use customer_uuid as a url parameter and pass the yba_api_token in the request header with name X-AUTH-YW-API-TOKEN to show that the user is authenticated.
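Every route in this notebook follows the same customer-scoped pattern, so it can be handy to build the URLs in one place. The following is a minimal sketch, not part of the YBA API: `customer_route` is a hypothetical helper name, and the host and UUIDs in the example are placeholders. It only assembles the URL string; boolean query parameters are rendered in lowercase, as the routes in this notebook do with `str(...).lower()`.

```python
from urllib.parse import urlencode


def customer_route(base_url, customer_uuid, *segments, **query):
    """Build a customer-scoped YBA URL such as .../customers/<uuid>/dr_configs."""
    path = "/".join(str(segment).strip("/") for segment in segments)
    url = f"{base_url.rstrip('/')}/api/v1/customers/{customer_uuid}/{path}"
    if query:
        # Render booleans as "true"/"false" instead of Python's "True"/"False".
        params = {
            key: str(value).lower() if isinstance(value, bool) else value
            for key, value in query.items()
        }
        url = f"{url}?{urlencode(params)}"
    return url


# Placeholder values, for illustration only.
print(customer_route("http://yba.example:9000", "customer-1", "dr_configs"))
print(customer_route("http://yba.example:9000", "customer-1", "dr_configs", "dr-1", isForceDelete=False))
```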
Disaster recovery uses backup/restore to replicate the existing data on the source universe, so a storage config is required to store the backup and restore from it. To get the storage config UUID, use the following code:
route = f"{yba_url}/api/v1/customers/{customer_uuid}/configs"
response = requests.get(url=route, headers=headers).json()
storage_configs = list(filter(lambda config: config["type"] == "STORAGE", response))
if len(storage_configs) < 1:
    print("No storage config found")
    exit(-1)
storage_config_uuid = storage_configs[0]["configUUID"]
print(storage_config_uuid)
Disaster recovery is only available for YSQL tables and can be set up at the database granularity. You need to pass the list of databases you would like to replicate to the disaster recovery create API. The following is how you can get the list of the databases for a universe:
source_universe_uuid = os.getenv(
    "YBA_SOURCE_UNIVERSE_UUID", "0194df05-362d-4b73-b9b9-e0e5b3ad02b5"
)
route = f"{yba_url}/api/v1/customers/{customer_uuid}/universes/{source_universe_uuid}/namespaces"
response = requests.get(url=route, headers=headers).json()
ysql_database_list = [
    db
    for db in response
    if db["tableType"] == "PGSQL_TABLE_TYPE" and db["name"] != "yugabyte"
]
ysql_database_name_list = [db["name"] for db in ysql_database_list]
pprint(ysql_database_name_list)
ysql_database_uuid_list = [db["namespaceUUID"] for db in ysql_database_list]
pprint(ysql_database_uuid_list)
To change the list of tables in replication as part of the disaster recovery config, you need the list of table UUIDs in the source universe. Please note that although the API to change the table list in a disaster recovery config works at table granularity, YBA only supports bootstrapping of YSQL tables at database granularity. So if you would like to add tables that require bootstrapping to the disaster recovery config, you need to pass all the table UUIDs in their database. The following is how you can get the list of the tables for a universe:
route = (
    f"{yba_url}/api/v1/customers/{customer_uuid}/universes/{source_universe_uuid}/tables"
    f"?includeParentTableInfo={str(False).lower()}&onlySupportedForXCluster={str(True).lower()}"
)
response = requests.get(url=route, headers=headers).json()
ysql_tables = [
    table
    for table in response
    if table["tableType"] == "PGSQL_TABLE_TYPE"
    and table["keySpace"] in ysql_database_name_list
]
pprint(ysql_tables)
ysql_table_uuid_list = [table["tableUUID"] for table in ysql_tables]
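Because bootstrapping works at database granularity, it can help to widen a table selection to whole databases before sending it. The following is a minimal sketch of one way to do that. `expand_to_whole_databases` is a hypothetical helper, not a YBA API, and it assumes each table entry carries the `tableUUID` and `keySpace` fields used above. The sample data is made up.

```python
def expand_to_whole_databases(tables, wanted_table_uuids):
    """Return the UUIDs of every table in each database touched by the selection."""
    wanted = set(wanted_table_uuids)
    # Databases (keyspaces) that contain at least one of the wanted tables.
    touched_dbs = {t["keySpace"] for t in tables if t["tableUUID"] in wanted}
    # Every table in those databases, in the original listing order.
    return [t["tableUUID"] for t in tables if t["keySpace"] in touched_dbs]


# Made-up sample data shaped like the entries of the tables API response.
sample_tables = [
    {"tableUUID": "t1", "keySpace": "db1"},
    {"tableUUID": "t2", "keySpace": "db1"},
    {"tableUUID": "t3", "keySpace": "db2"},
]
print(expand_to_whole_databases(sample_tables, ["t1"]))
```

With the real response, you would pass `ysql_tables` as the first argument and the table UUIDs you intend to add as the second.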
Each disaster recovery API creates a task in the backend and returns a task UUID, which you can use to follow the progress and status of the task. You can use the following function to wait for a task:
import time


def waitForTask(task_uuid):
    route = f"{yba_url}/api/v1/customers/{customer_uuid}/tasks/{task_uuid}"
    while True:
        response = requests.get(url=route, headers=headers).json()
        status = response["status"]
        if status == "Failure":
            route = f"{yba_url}/api/customers/{customer_uuid}/tasks/{task_uuid}/failed"
            response = requests.get(url=route, headers=headers)
            if response is not None:
                response = response.json()
                if "failedSubTasks" in response:
                    errors = [
                        subtask["errorString"] for subtask in response["failedSubTasks"]
                    ]
                    print(f"Task {task_uuid} failed with the following errors:")
                    print("\n".join(errors))
                else:
                    pprint(response)
            else:
                print(
                    f"Task {task_uuid} failed, but could not get the failure messages"
                )
            exit(-1)
        elif status == "Success":
            print(f"Task {task_uuid} finished successfully")
            break
        print(f"Waiting for task {task_uuid}...")
        time.sleep(20)
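The function above polls until the task reaches a terminal status, with no upper bound on the wait. If you prefer a bounded wait, the following sketch shows the same polling idea with a timeout. `wait_for_status` and its parameters are hypothetical names. It assumes that "Success" and "Failure" are the only terminal statuses, as in the function above, and the "Running" status in the example is made up. The status lookup is passed in as a function so the sketch runs without a YBA instance.

```python
import time


def wait_for_status(fetch_status, timeout_s=3600, poll_interval_s=20,
                    sleep=time.sleep, clock=time.monotonic):
    """Poll fetch_status() until it returns "Success" or "Failure", or time out."""
    deadline = clock() + timeout_s
    while True:
        status = fetch_status()
        if status in ("Success", "Failure"):
            return status
        if clock() >= deadline:
            raise TimeoutError(
                f"Task still in state {status!r} after {timeout_s} seconds"
            )
        sleep(poll_interval_s)


# Fake task that succeeds on the third poll; no real API calls are made.
fake_statuses = iter(["Running", "Running", "Success"])
print(wait_for_status(lambda: next(fake_statuses), sleep=lambda _: None))
```

In the notebook, `fetch_status` could be a small function that performs the same GET request on the task route as `waitForTask` and returns `response["status"]`.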
Now we have all the required information to successfully create a disaster recovery config from universe source_universe_uuid to target_universe_uuid.
target_universe_uuid = os.getenv(
    "YBA_TARGET_UNIVERSE_UUID", "dc7940d0-8130-4f04-a004-918dd4f4ff95"
)
route = f"{yba_url}/api/v1/customers/{customer_uuid}/dr_configs"
payload = {
    "sourceUniverseUUID": source_universe_uuid,
    "targetUniverseUUID": target_universe_uuid,
    "name": "my-dr",
    "dbs": ysql_database_uuid_list,
    "bootstrapParams": {
        "backupRequestParams": {"storageConfigUUID": storage_config_uuid},
    }
}
response = requests.post(url=route, json=payload, headers=headers).json()
pprint(response)
if "taskUUID" not in response:
    print(f"Failed to create the task: {response}")
    exit(-1)
waitForTask(response["taskUUID"])
dr_config_uuid = response["resourceUUID"]
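The check for `taskUUID` in the response is repeated after every call in this notebook. As a sketch of how it could be factored out, the hypothetical helper below (`require_task_uuid` is not a YBA API) returns the task UUID or raises an exception instead of calling `exit`. The response dictionaries in the example are made up.

```python
def require_task_uuid(response):
    """Return response["taskUUID"], or raise if the API did not create a task."""
    if not isinstance(response, dict) or "taskUUID" not in response:
        raise RuntimeError(f"Failed to create the task: {response}")
    return response["taskUUID"]


# Made-up responses, for illustration only.
print(require_task_uuid({"taskUUID": "task-1", "resourceUUID": "dr-1"}))
try:
    require_task_uuid({"error": "something went wrong"})
except RuntimeError as err:
    print(err)
```

Raising an exception rather than exiting leaves the caller free to decide whether to stop the notebook or handle the error.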
You can get the disaster recovery config using its uuid. See the following example.
route = f"{yba_url}/api/v1/customers/{customer_uuid}/dr_configs/{dr_config_uuid}"
response = requests.get(url=route, headers=headers).json()
pprint(response)
You can add tables to, or remove tables from, an existing disaster recovery config. This is useful when you would like to add new tables to your database after the disaster recovery config is set up, or when you would like to drop a table from your database. Please note that to drop a table from your database, you first need to remove that table from the disaster recovery config. To modify the tables in replication in a disaster recovery config, you pass the complete list of the tables that you would like to be in replication. In other words, you leave out the table UUIDs that you no longer want to be replicated, and include the new table UUIDs that you want to replicate. See the following example.
# Remove tables.
route = f"{yba_url}/api/v1/customers/{customer_uuid}/dr_configs/{dr_config_uuid}/set_tables"
payload = {
    "tables": ysql_table_uuid_list[:-1]
}
response = requests.post(url=route, json=payload, headers=headers).json()
pprint(response)
if "taskUUID" not in response:
    print(f"Failed to create the task: {response}")
    exit(-1)
waitForTask(response["taskUUID"])
# Add tables.
route = f"{yba_url}/api/v1/customers/{customer_uuid}/dr_configs/{dr_config_uuid}/set_tables"
payload = {
    "tables": ysql_table_uuid_list,
    "bootstrapParams": {
        "backupRequestParams": {"storageConfigUUID": storage_config_uuid},
    },
}
response = requests.post(url=route, json=payload, headers=headers).json()
pprint(response)
if "taskUUID" not in response:
    print(f"Failed to create the task: {response}")
    exit(-1)
waitForTask(response["taskUUID"])
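Since `set_tables` takes the complete list of tables that should be in replication, rather than a delta, it can be convenient to compute that list from the current one. The following is a minimal sketch. `desired_table_list` is a hypothetical helper, and the table identifiers are placeholders rather than real UUIDs.

```python
def desired_table_list(current, add=(), remove=()):
    """Return the full table list to send: current tables minus `remove`, plus `add`."""
    to_remove = set(remove)
    result = [table for table in current if table not in to_remove]
    seen = set(result)
    for table in add:
        # Skip tables that are already in the list to avoid duplicates.
        if table not in seen:
            result.append(table)
            seen.add(table)
    return result


# Placeholder identifiers, for illustration only.
print(desired_table_list(["t1", "t2", "t3"], add=["t4"], remove=["t2"]))
```

The returned list is what would go in the `tables` field of the payload.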
Sometimes, it is required to make changes to the replication group using yb-admin. In these cases, the corresponding disaster recovery config in YBA will not be automatically updated to learn about the yb-admin changes, and a manual synchronization call is required as follows.
Please note that for a disaster recovery config named <dr-name>, the corresponding replication group name will be <source-universe-uuid>_--DR-CONFIG-<dr-name>-0.
route = f"{yba_url}/api/v1/customers/{customer_uuid}/dr_configs/{dr_config_uuid}/sync"
response = requests.post(url=route, headers=headers).json()
if "taskUUID" not in response:
    print(f"Failed to create the task: {response}")
    exit(-1)
waitForTask(response["taskUUID"])
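When working with yb-admin, you need the replication group name that corresponds to the disaster recovery config. The sketch below simply applies the naming pattern from the note above. `replication_group_name` is a hypothetical helper, and the pattern is taken from this notebook, so check it against your YBA version before relying on it.

```python
def replication_group_name(source_universe_uuid, dr_name):
    """Build <source-universe-uuid>_--DR-CONFIG-<dr-name>-0 as described in the note above."""
    return f"{source_universe_uuid}_--DR-CONFIG-{dr_name}-0"


# Uses the default source universe UUID and config name from this notebook.
print(replication_group_name("0194df05-362d-4b73-b9b9-e0e5b3ad02b5", "my-dr"))
```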
The replication between two universes can break for various reasons, including temporary network partitions. In these cases, after the issue is resolved, you can restart replication. You may also include index tables in the replication by restarting the replication for their main tables.
route = f"{yba_url}/api/v1/customers/{customer_uuid}/dr_configs/{dr_config_uuid}/restart"
payload = {
    "dbs": ysql_database_uuid_list,
    "bootstrapParams": {
        "backupRequestParams": {"storageConfigUUID": storage_config_uuid},
    },
}
response = requests.post(url=route, json=payload, headers=headers).json()
pprint(response)
if "taskUUID" not in response:
    print(f"Failed to create the task: {response}")
    exit(-1)
waitForTask(response["taskUUID"])
You can switch the roles of the primary and dr replica universes with zero RPO and then route your application writes to the old dr replica, for example as a drill for a failover operation. The primaryUniverseUuid field in the payload is the UUID of the new primary universe, and the drReplicaUniverseUuid field is the UUID of the new dr replica universe.
route = f"{yba_url}/api/v1/customers/{customer_uuid}/dr_configs/{dr_config_uuid}/switchover"
payload = {
    "primaryUniverseUuid": target_universe_uuid,
    "drReplicaUniverseUuid": source_universe_uuid,
}
response = requests.post(url=route, json=payload, headers=headers).json()
pprint(response)
if "taskUUID" not in response:
    print(f"Failed to create the task: {response}")
    exit(-1)
waitForTask(response["taskUUID"])
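Because the payload names the roles as they will be after the operation, it is easy to mix up the two UUIDs once the roles have been swapped. The following sketch builds the payload from the current roles instead. `switched_roles_payload` is a hypothetical helper, and the UUID strings are placeholders.

```python
def switched_roles_payload(current_primary_uuid, current_replica_uuid):
    """Build a payload in which the current dr replica becomes the new primary."""
    return {
        "primaryUniverseUuid": current_replica_uuid,
        "drReplicaUniverseUuid": current_primary_uuid,
    }


# Placeholder UUIDs, for illustration only.
payload_example = switched_roles_payload("universe-a", "universe-b")
print(payload_example)
# Swapping again returns the roles to where they started.
print(
    switched_roles_payload(
        payload_example["primaryUniverseUuid"],
        payload_example["drReplicaUniverseUuid"],
    )
)
```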
If the current primary universe becomes unavailable, you can do a failover operation to promote the current dr replica to primary and then route your application traffic to the new primary universe. In a failover operation, some data can be lost. To get an estimate of the amount of data that could be lost, get the current safetime using the following API call.
route = f"{yba_url}/api/v1/customers/{customer_uuid}/dr_configs/{dr_config_uuid}/safetime"
response = requests.get(url=route, headers=headers).json()
pprint(response)
namespace_id_to_safetime_epoch_micros_dict = {safetime["namespaceId"]: safetime["safetimeEpochUs"] for safetime in response["safetimes"]}
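The safetime values are epoch timestamps in microseconds, which are hard to judge by eye. The following sketch converts them to UTC datetimes and computes how far a safetime lags behind a given moment. `safetime_to_datetime` and `safetime_lag_ms` are hypothetical helpers, the sample value is made up, and the sketch assumes that `safetimeEpochUs` counts microseconds since the Unix epoch, as the field name suggests.

```python
from datetime import datetime, timedelta, timezone

UNIX_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def safetime_to_datetime(safetime_epoch_us):
    """Convert microseconds since the Unix epoch to a timezone-aware UTC datetime."""
    return UNIX_EPOCH + timedelta(microseconds=safetime_epoch_us)


def safetime_lag_ms(safetime_epoch_us, now=None):
    """Return how many milliseconds the safetime is behind `now` (default: current time)."""
    if now is None:
        now = datetime.now(timezone.utc)
    return (now - safetime_to_datetime(safetime_epoch_us)) / timedelta(milliseconds=1)


# Made-up sample shaped like namespace_id_to_safetime_epoch_micros_dict.
sample_safetimes = {"namespace-1": 1_700_000_000_000_000}
for namespace_id, epoch_us in sample_safetimes.items():
    print(namespace_id, safetime_to_datetime(epoch_us).isoformat())
```

This lag is only a client-side estimate based on your local clock; the response from the API remains the authoritative source.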
If the estimatedDataLossMs and the current safetime on the current dr replica look acceptable, you may call the following API to do a failover operation (please note that in the previous section we switched the source and target universes).
route = f"{yba_url}/api/v1/customers/{customer_uuid}/dr_configs/{dr_config_uuid}/failover"
payload = {
    "primaryUniverseUuid": source_universe_uuid,
    "drReplicaUniverseUuid": target_universe_uuid,
    "namespaceIdSafetimeEpochUsMap": namespace_id_to_safetime_epoch_micros_dict
}
response = requests.post(url=route, json=payload, headers=headers).json()
pprint(response)
if "taskUUID" not in response:
    print(f"Failed to create the task: {response}")
    exit(-1)
waitForTask(response["taskUUID"])
After a failover operation, you could either restart the disaster recovery config to use the old primary universe as the dr replica, or you could use the following API to use a new universe as the dr replica.
new_target_universe_uuid = os.getenv(
    "YBA_NEW_TARGET_UNIVERSE_UUID", "fcd8ad18-9130-45d7-b504-fa28d187df05"
)
route = f"{yba_url}/api/v1/customers/{customer_uuid}/dr_configs/{dr_config_uuid}/replace_replica"
payload = {
    "primaryUniverseUuid": source_universe_uuid,
    "drReplicaUniverseUuid": new_target_universe_uuid,
    "bootstrapParams": {
        "backupRequestParams": {"storageConfigUUID": storage_config_uuid},
    },
}
response = requests.post(url=route, json=payload, headers=headers).json()
pprint(response)
if "taskUUID" not in response:
    print(f"Failed to create the task: {response}")
    exit(-1)
waitForTask(response["taskUUID"])
You can delete the disaster recovery config so that there is no replication relationship between the two universes and the dr replica will be in an active state. Please note that isForceDelete is useful when one of the universes is not available or there is an issue with the config. In those cases, pass True and the API will ignore errors and delete the config.
route = (
    f"{yba_url}/api/v1/customers/{customer_uuid}/dr_configs/{dr_config_uuid}"
    f"?isForceDelete={str(False).lower()}"
)
response = requests.delete(url=route, headers=headers).json()
pprint(response)
if "taskUUID" not in response:
    print(f"Failed to create the task: {response}")
    exit(-1)
waitForTask(response["taskUUID"])