Back to Yugabyte Db

Cross Cluster Replication APIs

managed/api-examples/python-simple/xcluster.ipynb

2026.1.0.0-b2511.3 KB
Original Source

Cross Cluster Replication APIs

In this notebook, we will go over the xCluster APIs in YBA with examples and the steps required to make those API calls. We assume that you have already two universes deployed in your YBA and their universe uuids are known.

Get the API Token

All xCluster replication YBA APIs are restricted to only authenticated users, so to use them through API, you first need to get the API token using the following code:

python
import requests
import os
from pprint import pprint

yba_url = os.getenv("YBA_URL", "http://localhost:9000")
yba_user = {
    "email": os.getenv("YBA_USER_EMAIL", "admin"),
    "password": os.getenv("YBA_USER_PASSWORD", "admin"),
}

route = f"{yba_url}/api/v1/api_login"
payload = {
    "email": yba_user["email"],
    "password": yba_user["password"],
}
response = requests.post(url=route, json=payload).json()
pprint(response)

customer_uuid = response["customerUUID"]
yba_api_token = response["apiToken"]
headers = {"X-AUTH-YW-API-TOKEN": yba_api_token}

Then you can use customer_uuid as a url parameter and pass the yba_api_token in the request header with name X-AUTH-YW-API-TOKEN to show that the user is authenticated.

Get Storage Config UUID

XCluster uses backup/restore for replicating existing data on the source universe, and a storage config is required to store the backup and then restore from. To get the storage config uuid, use the following code:

python
route = f"{yba_url}/api/v1/customers/{customer_uuid}/configs"
response = requests.get(url=route, headers=headers).json()
storage_configs = list(filter(lambda config: config["type"] == "STORAGE", response))
if len(storage_configs) < 1:
    print("No storage config found")
    exit(-1)

storage_config_uuid = storage_configs[0]["configUUID"]
print(storage_config_uuid)

List Tables on Source Universe

XCluster APIs are at the table granularity. You need to pass the list of tables you would like to replicate to the xCluster API request bodies. Please note that although the xCluster APIs are at table granularity, but YBA only supports bootstrapping of YSQL tables with DB granularity, so if you would like to do an xCluster operation that requires bootstrapping of YSQL tables, you need to pass all the table UUIDs in a database. The following is how you can get the list of the tables for a universe:

python
source_universe_uuid = os.getenv(
    "YBA_SOURCE_UNIVERSE_UUID", "33f4166a-685a-489d-9137-c9042cfb8616"
)
route = (f"{yba_url}/api/v1/customers/{customer_uuid}/universes/{source_universe_uuid}/tables"
         f"?includeParentTableInfo={str(False).lower()}&onlySupportedForXCluster={str(True).lower()}")
response = requests.get(url=route, headers=headers).json()
all_ysql_tables_uuid_list = [
    table["tableUUID"]
    for table in list(
        filter(lambda table: table["tableType"] == "PGSQL_TABLE_TYPE", response)
    )
]
pprint(all_ysql_tables_uuid_list)

Waiting For XCluster Tasks

The xCluster APIs will create a task in the backend and returns a task uuid which you can follow to see the progress and the status of the task. You can use the following function to wait for a task:

python
import time

def waitForTask(task_uuid):
    route = f"{yba_url}/api/v1/customers/{customer_uuid}/tasks/{task_uuid}"
    while True:
        response = requests.get(url=route, headers=headers).json()
        status = response["status"]
        if status == "Failure":
            route = f"{yba_url}/api/customers/{customer_uuid}/tasks/{task_uuid}/failed"
            response = requests.get(url=route, headers=headers)
            if response is not None:
                response = response.json()
                if "failedSubTasks" in response:
                    errors = [
                        subtask["errorString"] for subtask in response["failedSubTasks"]
                    ]
                    print(f"Task {task_uuid} failed with the following errors:")
                    print("\n".join(errors))
                else:
                    pprint(response)
            else:
                print(
                    f"Task {task_uuid} failed, but could not get the failure messages"
                )
            exit(-1)
        elif status == "Success":
            print(f"Task {task_uuid} finished successfully")
            break
        print(f"Waiting for task {task_uuid}...")
        time.sleep(20)

Creating XCluster Configs

Now we have all the required information to successfully create a cross cluster replication from universe source_universe_uuid to target_universe_uuid. configType can be either Basic or Txn. Txn provides transactional guarantees while replicating the data. Please note that unless you have good reasons to skip bootstrapping, the list of tables in .tables and .bootstrapParams.tables should be the same.

python
target_universe_uuid = os.getenv(
    "YBA_TARGET_UNIVERSE_UUID", "21c5edb3-6676-4f15-8a52-94f987280823"
)
route = f"{yba_url}/api/v1/customers/{customer_uuid}/xcluster_configs"
payload = {
    "sourceUniverseUUID": source_universe_uuid,
    "targetUniverseUUID": target_universe_uuid,
    "name": "my-xcluster-config",
    "tables": all_ysql_tables_uuid_list,
    "configType": "Basic",  # It could be Basic or Txn.
    "bootstrapParams": {  # You can omit this field to forcefully avoid bootstrapping.
        "tables": all_ysql_tables_uuid_list,
        "allowBootstrap": True,
        "backupRequestParams": {"storageConfigUUID": storage_config_uuid},
    },
}
response = requests.post(url=route, json=payload, headers=headers).json()
pprint(response)
if "taskUUID" not in response:
    print(f"Failed to create the task: {response}")
    exit(-1)

waitForTask(response["taskUUID"])

xcluster_config_uuid = response["resourceUUID"]

Getting XCluster Configs without DB Sync

You can get the xCluster config using its uuid for faster retrieval. See the following example.

python
# Get the xCluster config without syncing with the DB for faster response time
route = f"{yba_url}/api/v1/customers/{customer_uuid}/xcluster_configs/{xcluster_config_uuid}?syncWithDB={str(False).lower()}"
response = requests.get(url=route, headers=headers).json()
pprint(response)

Getting XCluster Configs with DB Sync

You can get the xCluster config Sync with source and target universe. See the following example.

python
# Get the xCluster config with syncing with the DB for the latest data
route = f"{yba_url}/api/v1/customers/{customer_uuid}/xcluster_configs/{xcluster_config_uuid}?syncWithDB={str(True).lower()}"
response = requests.get(url=route, headers=headers).json()
pprint(response)

Modifying Tables in XCluster Configs

You can add/remove tables to/from an existing xCluster config. This is useful when you would like to add new tables to your database after the replication is set up, or you would like to drop a table from your database. Please note that to drop a table from your database, first you need to remove that table from the xCluster config. To modify the tables in replication in an xCluster config, you need to pass the list of the tables that you would like to be in replication. In other words, you remove the table UUIDs that you do not want to be replicated, and add the new table uuids you want to replication. See the following example.

python
# Remove tables.
route = f"{yba_url}/api/v1/customers/{customer_uuid}/xcluster_configs/{xcluster_config_uuid}"
payload = {
    "tables": all_ysql_tables_uuid_list[:-2],
}
response = requests.put(url=route, json=payload, headers=headers).json()
pprint(response)
if "taskUUID" not in response:
    print(f"Failed to create the task: {response}")
    exit(-1)

waitForTask(response["taskUUID"])

# Add tables.
route = f"{yba_url}/api/v1/customers/{customer_uuid}/xcluster_configs/{xcluster_config_uuid}"
payload = {
    "tables": all_ysql_tables_uuid_list,
    "bootstrapParams": {  # You can omit this field to forcefully avoid bootstrapping.
        "tables": all_ysql_tables_uuid_list,
        "allowBootstrap": True,
        "backupRequestParams": {"storageConfigUUID": storage_config_uuid},
    },
}
response = requests.put(url=route, json=payload, headers=headers).json()
pprint(response)
if "taskUUID" not in response:
    print(f"Failed to create the task: {response}")
    exit(-1)

waitForTask(response["taskUUID"])

Reconciling XCluster Configs with YBDB State

Sometimes, it is required to make changes to the replication group using yb-admin. In these cases, the corresponding xCluster config in YBA will not be automatically updated to learn about the yb-admin changes, and a manual synchronization call is required as follows:

python
route = (
    f"{yba_url}/api/v1/customers/{customer_uuid}/xcluster_configs/{xcluster_config_uuid}/sync"
)
response = requests.post(url=route, headers=headers).json()
if "taskUUID" not in response:
    print(f"Failed to create the task: {response}")
    exit(-1)

waitForTask(response["taskUUID"])

Restarting XCluster Configs

The replication between two universes can break for various reasons including temporary network partitions. In these cases, after the issue is resolved, you can restart replication. You may also include index tables to the replication by restarting the replication for their main tables.

python
route = f"{yba_url}/api/v1/customers/{customer_uuid}/xcluster_configs/{xcluster_config_uuid}"
payload = {
    "tables": all_ysql_tables_uuid_list,
    "bootstrapParams": {
        "allowBootstrap": True,
        "backupRequestParams": {"storageConfigUUID": storage_config_uuid},
    },
}
response = requests.post(url=route, json=payload, headers=headers).json()
pprint(response)
if "taskUUID" not in response:
    print(f"Failed to create the task: {response}")
    exit(-1)

waitForTask(response["taskUUID"])

Pausing/Resuming XCluster Configs

You can pause the replication for some time and then resume it. Please note that if the replication is paused for an extended period of time, a replication restart is required.

python
# Pause the replication.
route = f"{yba_url}/api/v1/customers/{customer_uuid}/xcluster_configs/{xcluster_config_uuid}"
payload = {"status": "Paused"}
response = requests.put(url=route, json=payload, headers=headers).json()
pprint(response)
if "taskUUID" not in response:
    print(f"Failed to create the task: {response}")
    exit(-1)

waitForTask(response["taskUUID"])

# Resume the replication.
route = f"{yba_url}/api/v1/customers/{customer_uuid}/xcluster_configs/{xcluster_config_uuid}"
payload = {"status": "Running"}
response = requests.put(url=route, json=payload, headers=headers).json()
pprint(response)
if "taskUUID" not in response:
    print(f"Failed to create the task: {response}")
    exit(-1)

waitForTask(response["taskUUID"])

Deleting XCluster Configs

You can delete the xCluster config so there is no replication relation between the two universes. Please note that isForceDelete is useful when one of the universes is not available or there is an issue with the config. In those cases, you pass True and it will ignore errors and delete the config.

python
route = (f"{yba_url}/api/v1/customers/{customer_uuid}/xcluster_configs/{xcluster_config_uuid}"
         f"?isForceDelete={str(True).lower()}")
response = requests.delete(url=route, headers=headers).json()
pprint(response)
if "taskUUID" not in response:
    print(f"Failed to create the task: {response}")
    exit(-1)

waitForTask(response["taskUUID"])