docs/api/tutorials/sdk/bulk-assertions-sdk.md
This guide covers how to use the DataHub Cloud Python SDK to bulk create smart assertions, including:

- Smart freshness assertions
- Smart volume assertions
- Smart SQL assertions
- Smart column metric assertions

This is particularly useful for applying data quality checks across many tables and columns at scale. Bulk creating assertions with the Python SDK lets you roll out consistent checks programmatically instead of configuring each assertion by hand in the UI.

## Prerequisites

You need:

- A DataHub Cloud instance
- The DataHub Cloud Python SDK installed (`pip install acryl-datahub-cloud`)
- An actor whose API calls carry the **Edit Assertions** and **Edit Monitors** privileges for the datasets at hand

:::note
Before creating assertions, ensure the target datasets are already present in your DataHub instance. If you attempt to create assertions for entities that do not exist, GMS will continuously report errors to the logs.
:::
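Because nonexistent targets cause repeated GMS errors, it can be worth validating your target list up front. Here is a minimal sketch that assumes `client.entities.get()` raises for unresolvable URNs (the same entities client is used for schema introspection later in this guide):

```python
def filter_existing_datasets(client, dataset_urns):
    """Return only the dataset URNs that resolve in DataHub."""
    existing = []
    for dataset_urn in dataset_urns:
        try:
            # Assumption: entities.get() raises if the URN does not exist
            client.entities.get(dataset_urn)
            existing.append(dataset_urn)
        except Exception as e:
            print(f"⚠️ Skipping {dataset_urn}: could not resolve ({e})")
    return existing
```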
## Overview

The bulk assertion creation process follows these steps:

1. Connect to your DataHub instance.
2. Identify the target datasets, either from an explicit list or via search.
3. Create table-level assertions (freshness, volume, and smart SQL).
4. Introspect columns and create column-level assertions.
5. Save the resulting assertion URNs so you can update the assertions later.
## Step 1: Connect to DataHub

Connect to your DataHub instance:
```python
from datahub.sdk import DataHubClient

client = DataHubClient(server="<your_server>", token="<your_token>")
```
The server URL is typically `http://localhost:8080` for local development, or `https://<your_datahub_url>/gms` for DataHub Cloud. Alternatively, initialize the client via the `from_env()` method after setting the `DATAHUB_GMS_URL` and `DATAHUB_GMS_TOKEN` environment variables, or by creating a `~/.datahubenv` file by running `datahub init`:
```python
from datahub.sdk import DataHubClient

client = DataHubClient.from_env()
```
## Step 2: Identify Target Datasets

If you already know which tables you want to cover, define their URNs explicitly:

```python
from datahub.metadata.urns import DatasetUrn

# Define specific tables you want to add assertions to
table_urns = [
    "urn:li:dataset:(urn:li:dataPlatform:snowflake,database.schema.users,PROD)",
    "urn:li:dataset:(urn:li:dataPlatform:snowflake,database.schema.orders,PROD)",
    "urn:li:dataset:(urn:li:dataPlatform:snowflake,database.schema.products,PROD)",
]

# Convert to DatasetUrn objects
datasets = [DatasetUrn.from_string(urn) for urn in table_urns]
```
Alternatively, discover datasets dynamically with the search client. For comprehensive search capabilities and filter options, see the Search API documentation.
```python
from datahub.sdk.search_filters import FilterDsl
from datahub.metadata.urns import DatasetUrn


# Search for tables matching criteria
def find_tables_by_pattern(client, platform="snowflake", name_pattern="production_*"):
    """Find tables matching a specific pattern."""
    # Create filters for datasets on a specific platform with name pattern
    filters = FilterDsl.and_(
        FilterDsl.entity_type("dataset"),
        FilterDsl.platform(platform),
        FilterDsl.custom_filter("name", "EQUAL", [name_pattern]),
    )

    # Use the search client to find matching datasets
    urns = list(client.search.get_urns(filter=filters))
    return [DatasetUrn.from_string(str(urn)) for urn in urns]


# Use the search function
datasets = find_tables_by_pattern(client, platform="snowflake", name_pattern="production_*")
```
You can also select datasets by tag:

```python
def find_tables_by_tag(client, tag_name="critical"):
    """Find tables with a specific tag."""
    # Create filters for datasets with a specific tag
    filters = FilterDsl.and_(
        FilterDsl.entity_type("dataset"),
        FilterDsl.custom_filter("tags", "EQUAL", [f"urn:li:tag:{tag_name}"]),
    )

    # Use the search client to find matching datasets
    urns = list(client.search.get_urns(filter=filters))
    return [DatasetUrn.from_string(str(urn)) for urn in urns]


# Find all tables tagged as "critical"
critical_datasets = find_tables_by_tag(client, "critical")
```
## Step 3: Create Table-Level Assertions

First, set up a registry to record the URN of each assertion you create, so you can update or manage them later:

```python
# Storage for assertion URNs (for later updates)
assertion_registry = {
    "freshness": {},
    "volume": {},
    "smart_sql": {},
    "column_metrics": {}
}
```
### Freshness Assertions

Create a smart freshness assertion for each dataset:

```python
def create_freshness_assertions(datasets, client, registry):
    """Create smart freshness assertions for multiple datasets."""
    for dataset_urn in datasets:
        try:
            # Create smart freshness assertion
            freshness_assertion = client.assertions.sync_smart_freshness_assertion(
                dataset_urn=dataset_urn,
                display_name="Freshness Anomaly Monitor",
                # Detection mechanism - information_schema is recommended
                detection_mechanism="information_schema",
                # Smart sensitivity setting
                sensitivity="medium",  # options: "low", "medium", "high"
                # Tags for grouping (supports URNs or plain tag names!)
                tags=["automated", "freshness", "data_quality"],
                # Enable the assertion
                enabled=True,
            )

            # Store the assertion URN for future reference
            registry["freshness"][str(dataset_urn)] = str(freshness_assertion.urn)
            print(f"✅ Created freshness assertion for {dataset_urn.name}: {freshness_assertion.urn}")

        except Exception as e:
            print(f"❌ Failed to create freshness assertion for {dataset_urn.name}: {e}")


# Create freshness assertions for all datasets
create_freshness_assertions(datasets, client, assertion_registry)
```
### Volume Assertions

Create a smart volume assertion for each dataset. Volume assertions also accept an optional cron `schedule`:

```python
def create_volume_assertions(datasets, client, registry):
    """Create smart volume assertions for multiple datasets."""
    for dataset_urn in datasets:
        try:
            # Create smart volume assertion
            volume_assertion = client.assertions.sync_smart_volume_assertion(
                dataset_urn=dataset_urn,
                display_name="Smart Volume Check",
                # Detection mechanism options
                detection_mechanism="information_schema",
                # Smart sensitivity setting
                sensitivity="medium",
                # Tags for grouping
                tags=["automated", "volume", "data_quality"],
                # Schedule (optional - defaults to hourly)
                schedule="0 */6 * * *",  # Every 6 hours
                # Enable the assertion
                enabled=True,
            )

            # Store the assertion URN
            registry["volume"][str(dataset_urn)] = str(volume_assertion.urn)
            print(f"✅ Created volume assertion for {dataset_urn.name}: {volume_assertion.urn}")

        except Exception as e:
            print(f"❌ Failed to create volume assertion for {dataset_urn.name}: {e}")


# Create volume assertions for all datasets
create_volume_assertions(datasets, client, assertion_registry)
```
### Smart SQL Assertions

Create smart SQL assertions that monitor the result of a custom query against each table:

```python
def create_smart_sql_assertions(datasets, client, registry):
    """Create smart SQL assertions for multiple datasets."""
    # Define SQL queries to run on each table
    sql_queries = {
        "row_count": "SELECT COUNT(*) FROM {table_name}",
        "null_check": "SELECT COUNT(*) FROM {table_name} WHERE id IS NULL",
        "active_records": "SELECT COUNT(*) FROM {table_name} WHERE status = 'active'",
    }

    for dataset_urn in datasets:
        registry["smart_sql"][str(dataset_urn)] = {}

        for query_name, query_template in sql_queries.items():
            try:
                # Build the query with the table name
                table_name = dataset_urn.name
                statement = query_template.format(table_name=table_name)

                # Create smart SQL assertion
                sql_assertion = client.assertions.sync_smart_sql_assertion(
                    dataset_urn=dataset_urn,
                    display_name=f"Smart SQL - {query_name}",
                    statement=statement,
                    # AI-powered sensitivity setting
                    sensitivity="medium",  # options: "low", "medium", "high"
                    # Tags for grouping
                    tags=["automated", "smart_sql", query_name],
                    # Schedule
                    schedule="0 */6 * * *",  # Every 6 hours
                    # Enable the assertion
                    enabled=True,
                )

                # Store the assertion URN
                registry["smart_sql"][str(dataset_urn)][query_name] = str(sql_assertion.urn)
                print(f"✅ Created smart SQL assertion '{query_name}' for {dataset_urn.name}: {sql_assertion.urn}")

            except Exception as e:
                print(f"❌ Failed to create smart SQL assertion '{query_name}' for {dataset_urn.name}: {e}")


# Create smart SQL assertions for all datasets
create_smart_sql_assertions(datasets, client, assertion_registry)
```
## Step 4: Create Column-Level Assertions

Column assertions require each dataset's schema. First, fetch the columns:

```python
def get_dataset_columns(client, dataset_urn):
    """Get column information for a dataset."""
    try:
        # Get dataset using the entities client
        dataset = client.entities.get(dataset_urn)

        if dataset and hasattr(dataset, 'schema') and dataset.schema:
            return [
                {
                    "name": field.field_path,
                    "type": field.native_data_type,
                    "nullable": field.nullable if hasattr(field, 'nullable') else True,
                }
                for field in dataset.schema.fields
            ]
        return []

    except Exception as e:
        print(f"❌ Failed to get columns for {dataset_urn}: {e}")
        return []


# Get columns for each dataset
dataset_columns = {}
for dataset_urn in datasets:
    columns = get_dataset_columns(client, dataset_urn)
    dataset_columns[str(dataset_urn)] = columns
    print(f"📊 Found {len(columns)} columns in {dataset_urn.name}")
```
Then define rules for which columns should get which assertions, and apply them:

```python
def create_column_assertions(datasets, columns_dict, client, registry):
    """Create smart column metric assertions for multiple datasets and columns."""
    # Define rules for which columns should get which assertions
    assertion_rules = {
        # Null count checks for critical columns
        "null_checks": {
            "column_patterns": ["id", "*_id", "user_id", "email"],
            "metric_type": "null_count",
        },
        # Unique count checks for ID columns
        "unique_checks": {
            "column_patterns": ["*_id", "email", "username"],
            "metric_type": "unique_count",
        },
        # Empty count checks for string columns
        "empty_checks": {
            "column_patterns": ["name", "description", "title"],
            "metric_type": "empty_count",
        },
    }

    for dataset_urn in datasets:
        dataset_key = str(dataset_urn)
        columns = columns_dict.get(dataset_key, [])

        if not columns:
            print(f"⚠️ No columns found for {dataset_urn.name}")
            continue

        registry["column_metrics"][dataset_key] = {}

        for column in columns:
            column_name = column["name"]
            column_type = column["type"].upper()

            # Apply assertion rules based on column name and type
            for rule_name, rule_config in assertion_rules.items():
                if should_apply_rule(column_name, column_type, rule_config):
                    try:
                        assertion = client.assertions.sync_smart_column_metric_assertion(
                            dataset_urn=dataset_urn,
                            column_name=column_name,
                            metric_type=rule_config["metric_type"],
                            display_name=f"{rule_name.replace('_', ' ').title()} - {column_name}",
                            # Detection mechanism for column metrics
                            detection_mechanism="all_rows_query_datahub_dataset_profile",
                            # Tags (plain names automatically converted to URNs)
                            tags=["automated", "column_quality", rule_name],
                            enabled=True,
                        )

                        # Store assertion URN
                        if column_name not in registry["column_metrics"][dataset_key]:
                            registry["column_metrics"][dataset_key][column_name] = {}
                        registry["column_metrics"][dataset_key][column_name][rule_name] = str(assertion.urn)

                        print(f"✅ Created {rule_name} assertion for {dataset_urn.name}.{column_name}")

                    except Exception as e:
                        print(f"❌ Failed to create {rule_name} assertion for {dataset_urn.name}.{column_name}: {e}")


def should_apply_rule(column_name, column_type, rule_config):
    """Determine if a rule should be applied to a column."""
    import fnmatch

    # Check column name patterns
    for pattern in rule_config["column_patterns"]:
        if fnmatch.fnmatch(column_name.lower(), pattern.lower()):
            return True

    # Add type-based rules if needed
    if rule_config.get("column_types"):
        return any(col_type in column_type for col_type in rule_config["column_types"])

    return False


# Create column assertions
create_column_assertions(datasets, dataset_columns, client, assertion_registry)
```
## Setting Up Notifications

Reference the Subscriptions SDK for more information on how to create subscriptions on datasets or assertions.

:::note
When creating subscriptions in bulk, perform the operation in a single thread to avoid race conditions. Additionally, we recommend creating subscriptions at the dataset level rather than for individual assertions, as this makes ongoing management much easier.
:::
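For illustration, a single-threaded, dataset-level bulk subscription loop might look like the sketch below. The `client.subscriptions.subscribe(...)` call and its parameter names are assumptions here, not verified API; consult the Subscriptions SDK reference for the actual method signature:

```python
# Sketch only: the subscribe() method and its parameters are assumed,
# not verified -- see the Subscriptions SDK reference for the real API.
def subscribe_to_datasets(client, dataset_urns, subscriber_urn):
    """Subscribe one actor to each dataset, sequentially (single-threaded)."""
    for dataset_urn in dataset_urns:
        try:
            # Dataset-level subscriptions cover all assertions on the dataset,
            # which is easier to manage than per-assertion subscriptions.
            client.subscriptions.subscribe(
                urn=str(dataset_urn),           # assumed parameter name
                subscriber_urn=subscriber_urn,  # assumed parameter name
            )
            print(f"🔔 Subscribed {subscriber_urn} to {dataset_urn}")
        except Exception as e:
            print(f"❌ Failed to subscribe to {dataset_urn}: {e}")


# subscribe_to_datasets(client, datasets, "urn:li:corpuser:data_team")
```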
## Step 5: Save the Assertion Registry

Persist the registry to disk so you can reference the assertion URNs later:

```python
import json
from datetime import datetime


def save_assertion_registry(registry, filename=None):
    """Save assertion URNs to a file for future reference."""
    if filename is None:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"assertion_registry_{timestamp}.json"

    # Add metadata
    registry_with_metadata = {
        "created_at": datetime.now().isoformat(),
        "total_assertions": {
            "freshness": len(registry["freshness"]),
            "volume": len(registry["volume"]),
            "smart_sql": sum(
                len(queries) for queries in registry.get("smart_sql", {}).values()
            ),
            "column_metrics": sum(
                len(cols) for cols in registry["column_metrics"].values()
            ),
        },
        "assertions": registry,
    }

    with open(filename, 'w') as f:
        json.dump(registry_with_metadata, f, indent=2)

    print(f"💾 Saved assertion registry to {filename}")
    return filename


# Save the registry
registry_file = save_assertion_registry(assertion_registry)
```

Later, load the registry back to drive updates:

```python
def load_assertion_registry(filename):
    """Load assertion URNs from a previously saved file."""
    with open(filename, 'r') as f:
        data = json.load(f)
    return data["assertions"]


# Later, load for updates
# assertion_registry = load_assertion_registry("assertion_registry_20240101_120000.json")
```
## Updating Existing Assertions

Use the stored URNs to update assertions in place. Passing the existing `urn` updates the assertion rather than creating a new one:

```python
def update_existing_assertions(registry, client):
    """Update existing assertions using stored URNs."""
    # Update freshness assertions
    for dataset_urn, assertion_urn in registry["freshness"].items():
        try:
            updated_assertion = client.assertions.sync_smart_freshness_assertion(
                dataset_urn=dataset_urn,
                urn=assertion_urn,  # Provide existing URN for updates
                # Update any parameters as needed
                sensitivity="high",  # Change sensitivity
                tags=["automated", "freshness", "data_quality", "updated"],
                enabled=True,
            )
            print(f"🔄 Updated freshness assertion {assertion_urn}")
        except Exception as e:
            print(f"❌ Failed to update freshness assertion {assertion_urn}: {e}")


# Update assertions when needed
# update_existing_assertions(assertion_registry, client)
```
## Advanced Patterns

### Conditional Assertion Creation

You can tailor assertions to dataset metadata, for example applying stricter checks to datasets tagged `critical`:

```python
def create_conditional_assertions(datasets, client):
    """Create assertions based on dataset metadata conditions."""
    for dataset_urn in datasets:
        try:
            # Get dataset metadata
            dataset = client.entities.get(dataset_urn)

            # Check if dataset has specific tags
            if dataset.tags and any("critical" in str(tag.tag) for tag in dataset.tags):
                # Create more stringent assertions for critical datasets
                client.assertions.sync_smart_freshness_assertion(
                    dataset_urn=dataset_urn,
                    sensitivity="high",
                    detection_mechanism="information_schema",
                    tags=["critical", "automated", "freshness"],
                )

            # Check dataset size and apply appropriate volume checks
            if dataset.dataset_properties:
                # Create different volume assertions based on table characteristics
                pass

        except Exception as e:
            print(f"❌ Error processing {dataset_urn}: {e}")
```
### Batch Processing with Rate Limiting

For large numbers of datasets, create assertions in batches with per-dataset error handling and a delay between batches:

```python
import time
from typing import Any, Dict, List


def batch_create_assertions(
    datasets: List[DatasetUrn],
    client: DataHubClient,
    batch_size: int = 10,
    delay_seconds: float = 1.0,
) -> Dict[str, Any]:
    """Create assertions in batches with error handling and rate limiting."""
    results = {
        "successful": [],
        "failed": [],
        "total_processed": 0,
    }

    for i in range(0, len(datasets), batch_size):
        batch = datasets[i:i + batch_size]
        print(f"Processing batch {i // batch_size + 1}: {len(batch)} datasets")

        for dataset_urn in batch:
            try:
                # Create assertion
                assertion = client.assertions.sync_smart_freshness_assertion(
                    dataset_urn=dataset_urn,
                    tags=["batch_created", "automated"],
                    enabled=True,
                )
                results["successful"].append({
                    "dataset_urn": str(dataset_urn),
                    "assertion_urn": str(assertion.urn),
                })
            except Exception as e:
                results["failed"].append({
                    "dataset_urn": str(dataset_urn),
                    "error": str(e),
                })
            results["total_processed"] += 1

        # Rate limiting between batches
        if i + batch_size < len(datasets):
            time.sleep(delay_seconds)

    return results


# Use batch processing
batch_results = batch_create_assertions(datasets, client, batch_size=5)
print(f"Batch results: {batch_results['total_processed']} processed, "
      f"{len(batch_results['successful'])} successful, "
      f"{len(batch_results['failed'])} failed")
```
["automated", "freshness", "critical"]"my_tag" → "urn:li:tag:my_tag"Our backend is designed to handle large scale operations. However, since writes are submitted asynchronously onto a Kafka queue, you may experience significant delays in the operations being applied. If you run into any issues, here are some tips that may help:
## Complete Example

The following script ties the steps together. It assumes the helper functions defined earlier in this guide (`create_freshness_assertions`, `create_volume_assertions`, `get_dataset_columns`, `create_column_assertions`, and `save_assertion_registry`) are defined in the same module:

```python
#!/usr/bin/env python3
"""
Complete example script for bulk creating smart assertions.
"""

import json
from datetime import datetime

from datahub.sdk import DataHubClient
from datahub.metadata.urns import DatasetUrn

# Assumes the helper functions from earlier sections are defined above:
# create_freshness_assertions, create_volume_assertions,
# get_dataset_columns, create_column_assertions, save_assertion_registry


def main():
    # Initialize the DataHub client
    client = DataHubClient(
        server="https://your-datahub-instance.com",
        token="your-access-token",
    )

    # The client provides both search and entity access

    # Define target datasets
    table_urns = [
        "urn:li:dataset:(urn:li:dataPlatform:snowflake,prod.analytics.users,PROD)",
        "urn:li:dataset:(urn:li:dataPlatform:snowflake,prod.analytics.orders,PROD)",
        "urn:li:dataset:(urn:li:dataPlatform:snowflake,prod.analytics.products,PROD)",
    ]
    datasets = [DatasetUrn.from_string(urn) for urn in table_urns]

    # Registry to store assertion URNs
    assertion_registry = {
        "freshness": {},
        "volume": {},
        "column_metrics": {}
    }

    print(f"🚀 Starting bulk assertion creation for {len(datasets)} datasets")

    # Step 1: Create table-level assertions
    print("\n📋 Creating freshness assertions...")
    create_freshness_assertions(datasets, client, assertion_registry)

    print("\n📊 Creating volume assertions...")
    create_volume_assertions(datasets, client, assertion_registry)

    # Step 2: Get column information and create column assertions
    print("\n🔍 Analyzing columns and creating column assertions...")
    dataset_columns = {}
    for dataset_urn in datasets:
        columns = get_dataset_columns(client, dataset_urn)
        dataset_columns[str(dataset_urn)] = columns

    create_column_assertions(datasets, dataset_columns, client, assertion_registry)

    # Step 3: Save results
    print("\n💾 Saving assertion registry...")
    registry_file = save_assertion_registry(assertion_registry)

    # Summary
    total_assertions = (
        len(assertion_registry["freshness"]) +
        len(assertion_registry["volume"]) +
        sum(len(cols) for cols in assertion_registry["column_metrics"].values())
    )

    print("\n✅ Bulk assertion creation complete!")
    print(f"   📈 Total assertions created: {total_assertions}")
    print(f"   🕐 Freshness assertions: {len(assertion_registry['freshness'])}")
    print(f"   📊 Volume assertions: {len(assertion_registry['volume'])}")
    print(f"   🎯 Column assertions: {sum(len(cols) for cols in assertion_registry['column_metrics'].values())}")
    print(f"   💾 Registry saved to: {registry_file}")


if __name__ == "__main__":
    main()
```
This guide provides a comprehensive approach to bulk creating smart assertions using the DataHub Cloud Python SDK. The new tag name auto-conversion feature makes it easier to organize and manage your assertions with simple, readable tag names that are automatically converted to proper URN format.