managed/api-examples/python-simple/xcluster.ipynb
In this notebook, we go over the xCluster APIs in YBA, with examples of the steps required to make those API calls. We assume that you already have two universes deployed in YBA and that you know their universe UUIDs.
All YBA xCluster replication APIs are restricted to authenticated users, so before calling them you first need to get an API token using the following code:
import requests
import os
from pprint import pprint
yba_url = os.getenv("YBA_URL", "http://localhost:9000")
yba_user = {
    "email": os.getenv("YBA_USER_EMAIL", "admin"),
    "password": os.getenv("YBA_USER_PASSWORD", "admin"),
}
route = f"{yba_url}/api/v1/api_login"
payload = {
    "email": yba_user["email"],
    "password": yba_user["password"],
}
response = requests.post(url=route, json=payload).json()
pprint(response)
customer_uuid = response["customerUUID"]
yba_api_token = response["apiToken"]
headers = {"X-AUTH-YW-API-TOKEN": yba_api_token}
You can then pass customer_uuid as a URL path parameter and send yba_api_token in the X-AUTH-YW-API-TOKEN request header to show that the user is authenticated.
xCluster uses backup/restore to replicate the data that already exists on the source universe, so a storage config is required to store the backup and then restore from it. To get the storage config UUID, use the following code:
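All routes in this notebook share the same shape: the YBA base URL, /api/v1/customers/{customer_uuid}, a resource path, and optional query parameters whose booleans are lowercased (the str(False).lower() pattern used in later cells). A minimal sketch of a helper that builds such routes is shown below; yba_route and auth_headers are hypothetical names for illustration, not part of any YBA client library.

```python
from urllib.parse import urlencode


def yba_route(yba_url, customer_uuid, path="", **query):
    """Build a customer-scoped YBA API v1 route (hypothetical helper).

    Boolean query values are rendered as "true"/"false", matching the
    str(True).lower() pattern used in this notebook.
    """
    route = f"{yba_url.rstrip('/')}/api/v1/customers/{customer_uuid}"
    if path:
        route += "/" + path.strip("/")
    if query:
        params = {
            key: str(value).lower() if isinstance(value, bool) else value
            for key, value in query.items()
        }
        route += "?" + urlencode(params)
    return route


def auth_headers(api_token):
    """Return the header YBA uses to authenticate API requests."""
    return {"X-AUTH-YW-API-TOKEN": api_token}
```

For example, yba_route(yba_url, customer_uuid, "configs") produces the same URL as the storage config cell, and keyword arguments such as syncWithDB=False become ?syncWithDB=false.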
route = f"{yba_url}/api/v1/customers/{customer_uuid}/configs"
response = requests.get(url=route, headers=headers).json()
storage_configs = list(filter(lambda config: config["type"] == "STORAGE", response))
if len(storage_configs) < 1:
    print("No storage config found")
    exit(-1)
storage_config_uuid = storage_configs[0]["configUUID"]
print(storage_config_uuid)
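If you have more than one storage config, taking the first one may not select the one you intend to use. The sketch below is a pure helper that filters the parsed /configs response by type and, optionally, by name. The helper name is hypothetical, and filtering on a configName field is an assumption about the response shape.

```python
def pick_storage_config_uuid(configs, config_name=None):
    """Return the configUUID of the first matching STORAGE config, or None.

    configs is the parsed JSON list returned by
    GET /api/v1/customers/{customer_uuid}/configs.
    Filtering by "configName" assumes that field is present in each entry.
    """
    for config in configs:
        if config.get("type") != "STORAGE":
            continue
        if config_name is not None and config.get("configName") != config_name:
            continue
        return config["configUUID"]
    return None
```

Returning None instead of exiting leaves the caller to decide how to handle a missing config.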
xCluster APIs work at table granularity: you pass the list of tables you would like to replicate in the xCluster API request bodies. Note, however, that YBA only supports bootstrapping YSQL tables at database granularity, so for any xCluster operation that requires bootstrapping YSQL tables, you need to pass the UUIDs of all the tables in that database. The following code gets the list of tables for a universe:
source_universe_uuid = os.getenv(
    "YBA_SOURCE_UNIVERSE_UUID", "33f4166a-685a-489d-9137-c9042cfb8616"
)
route = (f"{yba_url}/api/v1/customers/{customer_uuid}/universes/{source_universe_uuid}/tables"
         f"?includeParentTableInfo={str(False).lower()}&onlySupportedForXCluster={str(True).lower()}")
response = requests.get(url=route, headers=headers).json()
all_ysql_tables_uuid_list = [
    table["tableUUID"]
    for table in response
    if table["tableType"] == "PGSQL_TABLE_TYPE"
]
pprint(all_ysql_tables_uuid_list)
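Because YSQL bootstrapping works at database granularity, it can help to group the YSQL table UUIDs by database before building a request. The sketch below assumes that each entry in the /tables response carries a keySpace field holding the database name; the helper name is hypothetical.

```python
from collections import defaultdict


def ysql_table_uuids_by_database(tables):
    """Group YSQL table UUIDs by database name.

    tables is the parsed JSON list from the universe /tables endpoint.
    Each entry is assumed to have "tableType", "tableUUID" and "keySpace".
    """
    grouped = defaultdict(list)
    for table in tables:
        if table["tableType"] == "PGSQL_TABLE_TYPE":
            grouped[table["keySpace"]].append(table["tableUUID"])
    return dict(grouped)
```

Passing one database's list as both tables and bootstrapParams.tables then replicates that whole database, which satisfies the database-granularity requirement for bootstrapping.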
Each xCluster API call creates a task in the backend and returns its task UUID, which you can use to follow the progress and status of the task. You can use the following function to wait for a task to finish:
import time
def waitForTask(task_uuid):
    route = f"{yba_url}/api/v1/customers/{customer_uuid}/tasks/{task_uuid}"
    while True:
        response = requests.get(url=route, headers=headers).json()
        status = response["status"]
        if status == "Failure":
            # Fetch the failed subtasks to print their error messages.
            failed_route = f"{yba_url}/api/customers/{customer_uuid}/tasks/{task_uuid}/failed"
            failed_response = requests.get(url=failed_route, headers=headers)
            if failed_response.ok:
                failed_response = failed_response.json()
                if "failedSubTasks" in failed_response:
                    errors = [
                        subtask["errorString"] for subtask in failed_response["failedSubTasks"]
                    ]
                    print(f"Task {task_uuid} failed with the following errors:")
                    print("\n".join(errors))
                else:
                    pprint(failed_response)
            else:
                print(
                    f"Task {task_uuid} failed, but could not get the failure messages"
                )
            exit(-1)
        elif status == "Success":
            print(f"Task {task_uuid} finished successfully")
            break
        print(f"Waiting for task {task_uuid}...")
        time.sleep(20)
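waitForTask polls until the task reaches Success or Failure, with no upper bound on waiting, and calls exit() on failure. For scripts that should not wait indefinitely, a variant with a timeout that returns the final status may be easier to reuse. In this sketch the status lookup is passed in as a callable so the function can be tested without a YBA instance; the function name and the default interval and timeout are assumptions.

```python
import time


def wait_for_task_status(fetch_status, poll_interval=20, timeout=3600,
                         sleep=time.sleep, clock=time.monotonic):
    """Poll fetch_status() until it returns "Success" or "Failure".

    fetch_status is any zero-argument callable returning the task's
    "status" string, for example a wrapper around
    GET /api/v1/customers/{customer_uuid}/tasks/{task_uuid}.
    Raises TimeoutError if no terminal status is seen within timeout seconds.
    """
    deadline = clock() + timeout
    while True:
        status = fetch_status()
        if status in ("Success", "Failure"):
            return status
        if clock() >= deadline:
            raise TimeoutError(f"Task still {status!r} after {timeout} seconds")
        sleep(poll_interval)
```

In the notebook, fetch_status could be lambda: requests.get(url=route, headers=headers).json()["status"], with route set to the task URL used by waitForTask.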
Now we have all the information required to create cross-cluster (xCluster) replication from the universe source_universe_uuid to the universe target_universe_uuid. configType can be either Basic or Txn; Txn provides transactional guarantees while replicating the data.
Please note that unless you have good reasons to skip bootstrapping, the list of tables in .tables and .bootstrapParams.tables should be the same.
target_universe_uuid = os.getenv(
    "YBA_TARGET_UNIVERSE_UUID", "21c5edb3-6676-4f15-8a52-94f987280823"
)
route = f"{yba_url}/api/v1/customers/{customer_uuid}/xcluster_configs"
payload = {
    "sourceUniverseUUID": source_universe_uuid,
    "targetUniverseUUID": target_universe_uuid,
    "name": "my-xcluster-config",
    "tables": all_ysql_tables_uuid_list,
    "configType": "Basic",  # It could be Basic or Txn.
    "bootstrapParams": {  # You can omit this field to forcefully avoid bootstrapping.
        "tables": all_ysql_tables_uuid_list,
        "allowBootstrap": True,
        "backupRequestParams": {"storageConfigUUID": storage_config_uuid},
    },
}
response = requests.post(url=route, json=payload, headers=headers).json()
pprint(response)
if "taskUUID" not in response:
    print(f"Failed to create the task: {response}")
    exit(-1)
waitForTask(response["taskUUID"])
xcluster_config_uuid = response["resourceUUID"]
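The create request body can also be generated by a small function that validates configType and keeps tables and bootstrapParams.tables identical, as recommended above. This is a sketch: build_xcluster_create_payload is a hypothetical name, and it only uses the fields that appear in the request above.

```python
def build_xcluster_create_payload(source_universe_uuid, target_universe_uuid,
                                  name, tables, config_type="Basic",
                                  storage_config_uuid=None):
    """Build the body for POST /api/v1/customers/{customer_uuid}/xcluster_configs.

    When storage_config_uuid is None, bootstrapParams is omitted; as the
    comment in the create request notes, omitting it avoids bootstrapping.
    """
    if config_type not in ("Basic", "Txn"):
        raise ValueError(f"configType must be 'Basic' or 'Txn', got {config_type!r}")
    payload = {
        "sourceUniverseUUID": source_universe_uuid,
        "targetUniverseUUID": target_universe_uuid,
        "name": name,
        "tables": list(tables),
        "configType": config_type,
    }
    if storage_config_uuid is not None:
        payload["bootstrapParams"] = {
            # Bootstrap the same tables that are being replicated.
            "tables": list(tables),
            "allowBootstrap": True,
            "backupRequestParams": {"storageConfigUUID": storage_config_uuid},
        }
    return payload
```

Calling it with the notebook's variables and storage_config_uuid reproduces the payload used in the create cell.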
You can get an xCluster config by its UUID. With syncWithDB set to false, YBA returns the config without syncing it with the universes, which gives a faster response. See the following example.
# Get the xCluster config without syncing with the DB for faster response time
route = f"{yba_url}/api/v1/customers/{customer_uuid}/xcluster_configs/{xcluster_config_uuid}?syncWithDB={str(False).lower()}"
response = requests.get(url=route, headers=headers).json()
pprint(response)
To get the latest state, set syncWithDB to true so that the xCluster config is synced with the source and target universes before it is returned. See the following example.
# Get the xCluster config with syncing with the DB for the latest data
route = f"{yba_url}/api/v1/customers/{customer_uuid}/xcluster_configs/{xcluster_config_uuid}?syncWithDB={str(True).lower()}"
response = requests.get(url=route, headers=headers).json()
pprint(response)
You can add tables to or remove tables from an existing xCluster config. This is useful when you add new tables to your database after replication is set up, or when you want to drop a table from your database. Note that to drop a table from your database, you first need to remove that table from the xCluster config. To modify the set of tables in replication, pass the complete list of tables that you want to be in replication: leave out the UUIDs of the tables you no longer want replicated, and include the UUIDs of the new tables you want replicated. See the following example.
# Remove tables.
route = f"{yba_url}/api/v1/customers/{customer_uuid}/xcluster_configs/{xcluster_config_uuid}"
payload = {
    "tables": all_ysql_tables_uuid_list[:-2],
}
response = requests.put(url=route, json=payload, headers=headers).json()
pprint(response)
if "taskUUID" not in response:
    print(f"Failed to create the task: {response}")
    exit(-1)
waitForTask(response["taskUUID"])
# Add tables.
route = f"{yba_url}/api/v1/customers/{customer_uuid}/xcluster_configs/{xcluster_config_uuid}"
payload = {
    "tables": all_ysql_tables_uuid_list,
    "bootstrapParams": {  # You can omit this field to forcefully avoid bootstrapping.
        "tables": all_ysql_tables_uuid_list,
        "allowBootstrap": True,
        "backupRequestParams": {"storageConfigUUID": storage_config_uuid},
    },
}
response = requests.put(url=route, json=payload, headers=headers).json()
pprint(response)
if "taskUUID" not in response:
    print(f"Failed to create the task: {response}")
    exit(-1)
waitForTask(response["taskUUID"])
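Because the edit request takes the complete list of tables that should be in replication rather than a delta, it can be convenient to compute that list from the current one. The sketch below uses a hypothetical helper that preserves the order of the current list; how you obtain the current list (for example from your own records or from the config returned by the GET call) is left to you.

```python
def desired_table_list(current_tables, add=(), remove=()):
    """Return the full table list to send in an xCluster config edit request.

    Tables in remove are dropped, tables in add are appended if not already
    present, and the order of current_tables is preserved. If a table appears
    in both add and remove, removal wins.
    """
    removed = set(remove)
    result = [table for table in current_tables if table not in removed]
    seen = set(result)
    for table in add:
        if table not in seen and table not in removed:
            result.append(table)
            seen.add(table)
    return result
```

For example, {"tables": desired_table_list(all_ysql_tables_uuid_list, remove=all_ysql_tables_uuid_list[-2:])} matches the removal payload above when the list contains no duplicates.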
Sometimes you may need to change the replication group directly with yb-admin. In these cases, YBA does not automatically learn about the yb-admin changes, so the corresponding xCluster config in YBA must be synchronized manually as follows:
route = (
    f"{yba_url}/api/v1/customers/{customer_uuid}/xcluster_configs/{xcluster_config_uuid}/sync"
)
response = requests.post(url=route, headers=headers).json()
if "taskUUID" not in response:
    print(f"Failed to create the task: {response}")
    exit(-1)
waitForTask(response["taskUUID"])
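Every call in this notebook that starts a task repeats the same steps: send the request, check the response for taskUUID, then wait for the task. The sketch below factors this pattern out. submit_and_wait is a hypothetical helper; send is any zero-argument callable returning the parsed JSON body, and wait is optional so the helper can be exercised without a YBA instance (in the notebook you would pass waitForTask). Unlike the cells above, it raises an exception instead of calling exit().

```python
def submit_and_wait(send, wait=None):
    """Send a task-creating request and optionally wait for the task.

    send: zero-argument callable returning the parsed JSON response.
    wait: callable taking the task UUID, e.g. waitForTask from this notebook.
    Returns the response; raises RuntimeError if no taskUUID was returned.
    """
    response = send()
    if "taskUUID" not in response:
        raise RuntimeError(f"Failed to create the task: {response}")
    if wait is not None:
        wait(response["taskUUID"])
    return response
```

For example, submit_and_wait(lambda: requests.post(url=route, headers=headers).json(), waitForTask) performs the same steps as the sync cell above.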
Replication between two universes can break for various reasons, including temporary network partitions. In these cases, you can restart replication after the issue is resolved. You can also add index tables to replication by restarting replication for their main tables.
route = f"{yba_url}/api/v1/customers/{customer_uuid}/xcluster_configs/{xcluster_config_uuid}"
payload = {
    "tables": all_ysql_tables_uuid_list,
    "bootstrapParams": {
        "allowBootstrap": True,
        "backupRequestParams": {"storageConfigUUID": storage_config_uuid},
    },
}
response = requests.post(url=route, json=payload, headers=headers).json()
pprint(response)
if "taskUUID" not in response:
    print(f"Failed to create the task: {response}")
    exit(-1)
waitForTask(response["taskUUID"])
You can pause the replication for some time and then resume it. Please note that if the replication is paused for an extended period of time, a replication restart is required.
# Pause the replication.
route = f"{yba_url}/api/v1/customers/{customer_uuid}/xcluster_configs/{xcluster_config_uuid}"
payload = {"status": "Paused"}
response = requests.put(url=route, json=payload, headers=headers).json()
pprint(response)
if "taskUUID" not in response:
    print(f"Failed to create the task: {response}")
    exit(-1)
waitForTask(response["taskUUID"])
# Resume the replication.
route = f"{yba_url}/api/v1/customers/{customer_uuid}/xcluster_configs/{xcluster_config_uuid}"
payload = {"status": "Running"}
response = requests.put(url=route, json=payload, headers=headers).json()
pprint(response)
if "taskUUID" not in response:
    print(f"Failed to create the task: {response}")
    exit(-1)
waitForTask(response["taskUUID"])
You can delete the xCluster config so that there is no longer a replication relationship between the two universes. isForceDelete is useful when one of the universes is unavailable or there is an issue with the config: if you pass True, YBA ignores errors and deletes the config.
route = (f"{yba_url}/api/v1/customers/{customer_uuid}/xcluster_configs/{xcluster_config_uuid}"
         f"?isForceDelete={str(True).lower()}")
response = requests.delete(url=route, headers=headers).json()
pprint(response)
if "taskUUID" not in response:
    print(f"Failed to create the task: {response}")
    exit(-1)
waitForTask(response["taskUUID"])