The DataFlow entity represents a data processing pipeline or workflow in DataHub. It models orchestrated workflows from tools like Apache Airflow, Apache Spark, dbt, Apache Flink, and other data orchestration platforms.
A DataFlow is a logical grouping of data processing tasks that work together to achieve a data transformation or movement goal.
DataFlows serve as parent containers for DataJob entities, representing the overall pipeline while individual jobs represent specific tasks within that pipeline.
DataFlow URNs follow this format:

```
urn:li:dataFlow:(orchestrator,flowId,cluster)
```
Components:

- `orchestrator`: the platform running the flow (e.g., `airflow`, `spark`, `dbt`)
- `flowId`: the flow's unique identifier within that orchestrator (e.g., a DAG ID or job name)
- `cluster`: the cluster or environment where the flow runs (e.g., `prod`, `dev`, `prod-us-west-2`)
Examples:

```
urn:li:dataFlow:(airflow,customer_etl_daily,prod)
urn:li:dataFlow:(spark,ml_feature_generation,emr-prod-cluster)
urn:li:dataFlow:(dbt,marketing_analytics,prod)
```
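Since the URN is just these three components joined into a fixed template, it can be assembled with plain string formatting; a minimal illustration (no SDK types assumed):

```java
// Illustrative only: build a DataFlow URN string from its three components.
String orchestrator = "airflow";
String flowId = "customer_etl_daily";
String cluster = "prod";
String urn = String.format("urn:li:dataFlow:(%s,%s,%s)", orchestrator, flowId, cluster);
// urn -> "urn:li:dataFlow:(airflow,customer_etl_daily,prod)"
```

In practice the SDK derives the URN from the builder fields, as the examples below show.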
```java
DataFlow dataflow = DataFlow.builder()
    .orchestrator("airflow")
    .flowId("my_dag_id")
    .cluster("prod")
    .displayName("My ETL Pipeline")
    .description("Daily ETL pipeline for customer data")
    .build();

client.entities().upsert(dataflow);
```
```java
Map<String, String> properties = new HashMap<>();
properties.put("schedule", "0 2 * * *");
properties.put("team", "data-engineering");
properties.put("sla_hours", "4");

DataFlow dataflow = DataFlow.builder()
    .orchestrator("airflow")
    .flowId("customer_pipeline")
    .cluster("prod")
    .customProperties(properties)
    .build();
```
| Property | Type | Description | Example |
|---|---|---|---|
| `orchestrator` | String | Platform running the flow (required) | "airflow", "spark", "dbt" |
| `flowId` | String | Unique flow identifier (required) | "my_dag_id", "my_job_name" |
| `cluster` | String | Cluster/environment (required) | "prod", "dev", "prod-us-west-2" |
| `displayName` | String | Human-readable name | "Customer ETL Pipeline" |
| `description` | String | Flow description | "Processes customer data daily" |
| Property | Type | Description |
|---|---|---|
| `externalUrl` | String | Link to flow in orchestration tool |
| `project` | String | Associated project or namespace |
| `customProperties` | Map<String, String> | Key-value metadata |
| `created` | Long | Creation timestamp (epoch milliseconds) |
| `lastModified` | Long | Last-modified timestamp (epoch milliseconds) |
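The optional fields can be supplied at build time (where the builder exposes them) or through the mutation methods shown below. A sketch combining both tables, using only calls that appear elsewhere on this page:

```java
// Sketch: required identity fields plus optional metadata from the tables above.
Map<String, String> props = new HashMap<>();
props.put("team", "data-engineering");

DataFlow dataflow = DataFlow.builder()
    .orchestrator("airflow")      // required
    .flowId("customer_pipeline")  // required
    .cluster("prod")              // required
    .displayName("Customer Pipeline")
    .description("Processes customer data daily")
    .customProperties(props)
    .build();

dataflow
    .setExternalUrl("https://airflow.example.com/dags/customer_pipeline")
    .setProject("customer_analytics")
    .setCreated(System.currentTimeMillis() - 86400000L) // 1 day ago
    .setLastModified(System.currentTimeMillis());
```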
```java
// Add owners
dataflow.addOwner("urn:li:corpuser:johndoe", OwnershipType.TECHNICAL_OWNER);
dataflow.addOwner("urn:li:corpuser:analytics_team", OwnershipType.BUSINESS_OWNER);

// Remove an owner
dataflow.removeOwner("urn:li:corpuser:johndoe");

// Add tags (with or without the "urn:li:tag:" prefix)
dataflow.addTag("etl");
dataflow.addTag("production");
dataflow.addTag("urn:li:tag:pii");

// Remove a tag
dataflow.removeTag("etl");

// Add glossary terms
dataflow.addTerm("urn:li:glossaryTerm:ETL");
dataflow.addTerm("urn:li:glossaryTerm:DataPipeline");

// Remove a term
dataflow.removeTerm("urn:li:glossaryTerm:ETL");

// Set domain
dataflow.setDomain("urn:li:domain:DataEngineering");

// Remove a specific domain
dataflow.removeDomain("urn:li:domain:DataEngineering");

// Or clear all domains
dataflow.clearDomains();

// Add individual custom properties
dataflow.addCustomProperty("schedule", "0 2 * * *");
dataflow.addCustomProperty("team", "data-engineering");

// Remove a property
dataflow.removeCustomProperty("schedule");

// Set all properties at once (replaces existing)
Map<String, String> props = new HashMap<>();
props.put("key1", "value1");
props.put("key2", "value2");
dataflow.setCustomProperties(props);

// Set description
dataflow.setDescription("Daily ETL pipeline for customer data");

// Set display name
dataflow.setDisplayName("Customer ETL Pipeline");

// Read them back
String description = dataflow.getDescription();
String displayName = dataflow.getDisplayName();

// Set external URL
dataflow.setExternalUrl("https://airflow.example.com/dags/my_dag");

// Set project
dataflow.setProject("customer_analytics");

// Set timestamps
dataflow.setCreated(System.currentTimeMillis() - 86400000L); // 1 day ago
dataflow.setLastModified(System.currentTimeMillis());
```
```java
// Airflow: daily batch ETL pipeline
DataFlow airflowFlow = DataFlow.builder()
    .orchestrator("airflow")
    .flowId("customer_etl_daily")
    .cluster("prod")
    .displayName("Customer ETL Pipeline")
    .description("Daily pipeline processing customer data from MySQL to Snowflake")
    .build();

airflowFlow
    .addTag("etl")
    .addTag("production")
    .addCustomProperty("schedule", "0 2 * * *")
    .addCustomProperty("catchup", "false")
    .addCustomProperty("max_active_runs", "1")
    .setExternalUrl("https://airflow.company.com/dags/customer_etl_daily");

// Spark: large-scale batch job on an EMR cluster
DataFlow sparkFlow = DataFlow.builder()
    .orchestrator("spark")
    .flowId("ml_feature_generation")
    .cluster("emr-prod-cluster")
    .displayName("ML Feature Generation Job")
    .description("Large-scale Spark job generating ML features")
    .build();

sparkFlow
    .addTag("spark")
    .addTag("machine-learning")
    .addCustomProperty("spark.executor.memory", "8g")
    .addCustomProperty("spark.driver.memory", "4g")
    .addCustomProperty("spark.executor.cores", "4")
    .setDomain("urn:li:domain:MachineLearning");

// dbt: versioned transformation project
DataFlow dbtFlow = DataFlow.builder()
    .orchestrator("dbt")
    .flowId("marketing_analytics")
    .cluster("prod")
    .displayName("Marketing Analytics Models")
    .description("dbt transformations for marketing data")
    .build();

dbtFlow
    .addTag("dbt")
    .addTag("transformation")
    .addCustomProperty("dbt_version", "1.5.0")
    .addCustomProperty("target", "production")
    .addCustomProperty("models_count", "87")
    .setProject("marketing")
    .setExternalUrl("https://github.com/company/dbt-marketing");

// Flink: real-time streaming pipeline
DataFlow flinkFlow = DataFlow.builder()
    .orchestrator("flink")
    .flowId("real_time_fraud_detection")
    .cluster("prod-flink-cluster")
    .displayName("Real-time Fraud Detection")
    .description("Real-time streaming pipeline for fraud detection")
    .build();

flinkFlow
    .addTag("streaming")
    .addTag("real-time")
    .addTag("fraud-detection")
    .addCustomProperty("parallelism", "16")
    .addCustomProperty("checkpoint_interval", "60000")
    .setDomain("urn:li:domain:Security");
```
All mutation methods return `this`, so calls can be chained:
```java
DataFlow dataflow = DataFlow.builder()
    .orchestrator("airflow")
    .flowId("sales_pipeline")
    .cluster("prod")
    .build();

dataflow
    .addTag("etl")
    .addTag("production")
    .addOwner("urn:li:corpuser:owner1", OwnershipType.TECHNICAL_OWNER)
    .addOwner("urn:li:corpuser:owner2", OwnershipType.BUSINESS_OWNER)
    .addTerm("urn:li:glossaryTerm:Sales")
    .setDomain("urn:li:domain:Sales")
    .setDescription("Sales data pipeline")
    .addCustomProperty("schedule", "0 2 * * *")
    .addCustomProperty("team", "sales-analytics");

client.entities().upsert(dataflow);
```
DataFlows are parent entities to DataJobs. A DataJob represents a specific task or step within a DataFlow:
```java
// Create the parent DataFlow
DataFlow dataflow = DataFlow.builder()
    .orchestrator("airflow")
    .flowId("customer_etl")
    .cluster("prod")
    .build();

client.entities().upsert(dataflow);

// Create child DataJobs that reference the parent flow
DataJob extractJob = DataJob.builder()
    .flow(dataflow.getUrn()) // references the parent DataFlow
    .jobId("extract_customers")
    .build();

DataJob transformJob = DataJob.builder()
    .flow(dataflow.getUrn())
    .jobId("transform_customers")
    .build();

client.entities().upsert(extractJob);
client.entities().upsert(transformJob);
```
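The child job's URN embeds the parent flow's URN (the standard DataHub DataJob URN shape), so the hierarchy is visible in the identifier itself:

```
urn:li:dataJob:(urn:li:dataFlow:(airflow,customer_etl,prod),extract_customers)
```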
This hierarchy lets you model the overall pipeline as a single DataFlow while capturing each task as its own DataJob, so metadata can be attached at either level.
- **Use consistent naming**: Keep orchestrator names consistent across your organization (e.g., always use "airflow", not "Airflow" or "AIRFLOW").
- **Choose appropriate clusters**: Use meaningful cluster names that indicate environment and region (e.g., "prod-us-west-2", "staging-eu-central-1").
- **Add scheduling information**: Include schedule expressions in custom properties for batch workflows.
- **Link to source systems**: Always set `externalUrl` to link back to the orchestration tool's UI.
- **Set ownership early**: Assign technical and business owners when creating flows.
- **Use tags for categorization**: Tag flows by type (etl, streaming, ml), environment (production, staging), and criticality.
- **Document SLAs**: Use custom properties to document SLA requirements and alert channels.
- **Track versions**: For versioned workflows (like dbt), include version information in custom properties.
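The complete example below puts these practices together in a single flow definition: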
```java
// Initialize client
DataHubClientConfigV2 config = DataHubClientConfigV2.builder()
    .server("http://localhost:8080")
    .token(System.getenv("DATAHUB_TOKEN"))
    .build();

try (DataHubClientV2 client = new DataHubClientV2(config)) {
    // Create a comprehensive DataFlow
    Map<String, String> customProps = new HashMap<>();
    customProps.put("schedule", "0 2 * * *");
    customProps.put("catchup", "false");
    customProps.put("team", "data-engineering");
    customProps.put("sla_hours", "4");
    customProps.put("alert_channel", "#data-alerts");

    DataFlow dataflow = DataFlow.builder()
        .orchestrator("airflow")
        .flowId("production_etl_pipeline")
        .cluster("prod-us-east-1")
        .displayName("Production ETL Pipeline")
        .description("Main ETL pipeline for customer data processing")
        .customProperties(customProps)
        .build();

    dataflow
        .addTag("etl")
        .addTag("production")
        .addTag("pii")
        .addOwner("urn:li:corpuser:data_eng_team", OwnershipType.TECHNICAL_OWNER)
        .addOwner("urn:li:corpuser:product_owner", OwnershipType.BUSINESS_OWNER)
        .addTerm("urn:li:glossaryTerm:ETL")
        .addTerm("urn:li:glossaryTerm:CustomerData")
        .setDomain("urn:li:domain:DataEngineering")
        .setProject("customer_analytics")
        .setExternalUrl("https://airflow.company.com/dags/production_etl_pipeline")
        .setCreated(System.currentTimeMillis() - 86400000L * 30) // 30 days ago
        .setLastModified(System.currentTimeMillis());

    // Upsert to DataHub
    client.entities().upsert(dataflow);
    System.out.println("Created DataFlow: " + dataflow.getUrn());
}
```