Back to Datahub

DataFlow Entity

metadata-integration/java/docs/sdk-v2/dataflow-entity.md

1.5.0.411.9 KB
Original Source

DataFlow Entity

The DataFlow entity represents a data processing pipeline or workflow in DataHub. It models orchestrated workflows from tools like Apache Airflow, Apache Spark, dbt, Apache Flink, and other data orchestration platforms.

Overview

A DataFlow is a logical grouping of data processing tasks that work together to achieve a data transformation or movement goal. DataFlows are typically:

  • Scheduled workflows (Airflow DAGs)
  • Batch processing jobs (Spark applications)
  • Transformation projects (dbt projects)
  • Streaming pipelines (Flink jobs)

DataFlows serve as parent containers for DataJob entities, representing the overall pipeline while individual jobs represent specific tasks within that pipeline.

URN Structure

DataFlow URNs follow this format:

urn:li:dataFlow:(orchestrator,flowId,cluster)

Components:

  • orchestrator: The platform or tool running the workflow (e.g., "airflow", "spark", "dbt", "flink")
  • flowId: The unique identifier for the flow within the orchestrator (e.g., DAG ID, job name, project name)
  • cluster: The cluster or environment where the flow runs (e.g., "prod", "prod-us-west-2", "emr-cluster")

Examples:

urn:li:dataFlow:(airflow,customer_etl_daily,prod)
urn:li:dataFlow:(spark,ml_feature_generation,emr-prod-cluster)
urn:li:dataFlow:(dbt,marketing_analytics,prod)

Creating a DataFlow

Basic Creation

java
DataFlow dataflow = DataFlow.builder()
    .orchestrator("airflow")
    .flowId("my_dag_id")
    .cluster("prod")
    .displayName("My ETL Pipeline")
    .description("Daily ETL pipeline for customer data")
    .build();

client.entities().upsert(dataflow);

With Custom Properties

java
Map<String, String> properties = new HashMap<>();
properties.put("schedule", "0 2 * * *");
properties.put("team", "data-engineering");
properties.put("sla_hours", "4");

DataFlow dataflow = DataFlow.builder()
    .orchestrator("airflow")
    .flowId("customer_pipeline")
    .cluster("prod")
    .customProperties(properties)
    .build();

Properties

Core Properties

PropertyTypeDescriptionExample
orchestratorStringPlatform running the flow (required)"airflow", "spark", "dbt"
flowIdStringUnique flow identifier (required)"my_dag_id", "my_job_name"
clusterStringCluster/environment (required)"prod", "dev", "prod-us-west-2"
displayNameStringHuman-readable name"Customer ETL Pipeline"
descriptionStringFlow description"Processes customer data daily"

Additional Properties

PropertyTypeDescription
externalUrlStringLink to flow in orchestration tool
projectStringAssociated project or namespace
customPropertiesMap<String, String>Key-value metadata
createdLongCreation timestamp (milliseconds)
lastModifiedLongLast modified timestamp (milliseconds)

Operations

Ownership

java
// Add owners
dataflow.addOwner("urn:li:corpuser:johndoe", OwnershipType.TECHNICAL_OWNER);
dataflow.addOwner("urn:li:corpuser:analytics_team", OwnershipType.BUSINESS_OWNER);

// Remove owner
dataflow.removeOwner("urn:li:corpuser:johndoe");

Tags

java
// Add tags (with or without "urn:li:tag:" prefix)
dataflow.addTag("etl");
dataflow.addTag("production");
dataflow.addTag("urn:li:tag:pii");

// Remove tag
dataflow.removeTag("etl");

Glossary Terms

java
// Add terms
dataflow.addTerm("urn:li:glossaryTerm:ETL");
dataflow.addTerm("urn:li:glossaryTerm:DataPipeline");

// Remove term
dataflow.removeTerm("urn:li:glossaryTerm:ETL");

Domain

java
// Set domain
dataflow.setDomain("urn:li:domain:DataEngineering");

// Remove specific domain
dataflow.removeDomain("urn:li:domain:DataEngineering");

// Or clear all domains
dataflow.clearDomains();

Custom Properties

java
// Add individual properties
dataflow.addCustomProperty("schedule", "0 2 * * *");
dataflow.addCustomProperty("team", "data-engineering");

// Remove property
dataflow.removeCustomProperty("schedule");

// Set all properties (replaces existing)
Map<String, String> props = new HashMap<>();
props.put("key1", "value1");
props.put("key2", "value2");
dataflow.setCustomProperties(props);

Description and Display Name

java
// Set description
dataflow.setDescription("Daily ETL pipeline for customer data");

// Set display name
dataflow.setDisplayName("Customer ETL Pipeline");

// Get description
String description = dataflow.getDescription();

// Get display name
String displayName = dataflow.getDisplayName();

Timestamps and URLs

java
// Set external URL
dataflow.setExternalUrl("https://airflow.example.com/dags/my_dag");

// Set project
dataflow.setProject("customer_analytics");

// Set timestamps
dataflow.setCreated(System.currentTimeMillis() - 86400000L); // 1 day ago
dataflow.setLastModified(System.currentTimeMillis());

Orchestrator-Specific Examples

Apache Airflow

java
DataFlow airflowFlow = DataFlow.builder()
    .orchestrator("airflow")
    .flowId("customer_etl_daily")
    .cluster("prod")
    .displayName("Customer ETL Pipeline")
    .description("Daily pipeline processing customer data from MySQL to Snowflake")
    .build();

airflowFlow
    .addTag("etl")
    .addTag("production")
    .addCustomProperty("schedule", "0 2 * * *")
    .addCustomProperty("catchup", "false")
    .addCustomProperty("max_active_runs", "1")
    .setExternalUrl("https://airflow.company.com/dags/customer_etl_daily");

Apache Spark

java
DataFlow sparkFlow = DataFlow.builder()
    .orchestrator("spark")
    .flowId("ml_feature_generation")
    .cluster("emr-prod-cluster")
    .displayName("ML Feature Generation Job")
    .description("Large-scale Spark job generating ML features")
    .build();

sparkFlow
    .addTag("spark")
    .addTag("machine-learning")
    .addCustomProperty("spark.executor.memory", "8g")
    .addCustomProperty("spark.driver.memory", "4g")
    .addCustomProperty("spark.executor.cores", "4")
    .setDomain("urn:li:domain:MachineLearning");

dbt

java
DataFlow dbtFlow = DataFlow.builder()
    .orchestrator("dbt")
    .flowId("marketing_analytics")
    .cluster("prod")
    .displayName("Marketing Analytics Models")
    .description("dbt transformations for marketing data")
    .build();

dbtFlow
    .addTag("dbt")
    .addTag("transformation")
    .addCustomProperty("dbt_version", "1.5.0")
    .addCustomProperty("target", "production")
    .addCustomProperty("models_count", "87")
    .setProject("marketing")
    .setExternalUrl("https://github.com/company/dbt-marketing");
java
DataFlow flinkFlow = DataFlow.builder()
    .orchestrator("flink")
    .flowId("real_time_fraud_detection")
    .cluster("prod-flink-cluster")
    .displayName("Real-time Fraud Detection")
    .description("Real-time streaming pipeline for fraud detection")
    .build();

flinkFlow
    .addTag("streaming")
    .addTag("real-time")
    .addTag("fraud-detection")
    .addCustomProperty("parallelism", "16")
    .addCustomProperty("checkpoint_interval", "60000")
    .setDomain("urn:li:domain:Security");

Fluent API

All mutation methods return this to support method chaining:

java
DataFlow dataflow = DataFlow.builder()
    .orchestrator("airflow")
    .flowId("sales_pipeline")
    .cluster("prod")
    .build();

dataflow
    .addTag("etl")
    .addTag("production")
    .addOwner("urn:li:corpuser:owner1", OwnershipType.TECHNICAL_OWNER)
    .addOwner("urn:li:corpuser:owner2", OwnershipType.BUSINESS_OWNER)
    .addTerm("urn:li:glossaryTerm:Sales")
    .setDomain("urn:li:domain:Sales")
    .setDescription("Sales data pipeline")
    .addCustomProperty("schedule", "0 2 * * *")
    .addCustomProperty("team", "sales-analytics");

client.entities().upsert(dataflow);

Relationship with DataJob

DataFlows are parent entities to DataJobs. A DataJob represents a specific task or step within a DataFlow:

java
// Create the parent DataFlow
DataFlow dataflow = DataFlow.builder()
    .orchestrator("airflow")
    .flowId("customer_etl")
    .cluster("prod")
    .build();

client.entities().upsert(dataflow);

// Create child DataJobs that reference the parent flow
DataJob extractJob = DataJob.builder()
    .flow(dataflow.getUrn())  // References parent DataFlow
    .jobId("extract_customers")
    .build();

DataJob transformJob = DataJob.builder()
    .flow(dataflow.getUrn())
    .jobId("transform_customers")
    .build();

client.entities().upsert(extractJob);
client.entities().upsert(transformJob);

This hierarchy allows you to:

  • Model the overall pipeline (DataFlow)
  • Model individual tasks within the pipeline (DataJob)
  • Track task-level lineage and dependencies
  • Organize governance metadata at both levels

Best Practices

  1. Use consistent naming: Keep orchestrator names consistent across your organization (e.g., always use "airflow", not "Airflow" or "AIRFLOW")

  2. Choose appropriate clusters: Use meaningful cluster names that indicate environment and region (e.g., "prod-us-west-2", "staging-eu-central-1")

  3. Add scheduling information: Include schedule expressions in custom properties for batch workflows

  4. Link to source systems: Always set externalUrl to link back to the orchestration tool's UI

  5. Set ownership early: Assign technical and business owners when creating flows

  6. Use tags for categorization: Tag flows by type (etl, streaming, ml), environment (production, staging), and criticality

  7. Document SLAs: Use custom properties to document SLA requirements and alert channels

  8. Track versions: For versioned workflows (like dbt), include version information in custom properties

Complete Example

java
// Initialize client
DataHubClientConfigV2 config = DataHubClientConfigV2.builder()
    .server("http://localhost:8080")
    .token(System.getenv("DATAHUB_TOKEN"))
    .build();

try (DataHubClientV2 client = new DataHubClientV2(config)) {

    // Create comprehensive DataFlow
    Map<String, String> customProps = new HashMap<>();
    customProps.put("schedule", "0 2 * * *");
    customProps.put("catchup", "false");
    customProps.put("team", "data-engineering");
    customProps.put("sla_hours", "4");
    customProps.put("alert_channel", "#data-alerts");

    DataFlow dataflow = DataFlow.builder()
        .orchestrator("airflow")
        .flowId("production_etl_pipeline")
        .cluster("prod-us-east-1")
        .displayName("Production ETL Pipeline")
        .description("Main ETL pipeline for customer data processing")
        .customProperties(customProps)
        .build();

    dataflow
        .addTag("etl")
        .addTag("production")
        .addTag("pii")
        .addOwner("urn:li:corpuser:data_eng_team", OwnershipType.TECHNICAL_OWNER)
        .addOwner("urn:li:corpuser:product_owner", OwnershipType.BUSINESS_OWNER)
        .addTerm("urn:li:glossaryTerm:ETL")
        .addTerm("urn:li:glossaryTerm:CustomerData")
        .setDomain("urn:li:domain:DataEngineering")
        .setProject("customer_analytics")
        .setExternalUrl("https://airflow.company.com/dags/production_etl_pipeline")
        .setCreated(System.currentTimeMillis() - 86400000L * 30)
        .setLastModified(System.currentTimeMillis());

    // Upsert to DataHub
    client.entities().upsert(dataflow);

    System.out.println("Created DataFlow: " + dataflow.getUrn());
}

See Also