Back to Datahub

DataJob Entity

metadata-integration/java/docs/sdk-v2/datajob-entity.md

1.5.0.339.5 KB
Original Source

DataJob Entity

The DataJob entity represents a unit of work in a data processing pipeline (e.g., an Airflow task, a dbt model, a Spark job). DataJobs belong to DataFlows (pipelines) and can have lineage to datasets and other DataJobs. This guide covers comprehensive DataJob operations in SDK V2.

Creating a DataJob

Minimal DataJob

Orchestrator, flowId, and jobId are required:

java
DataJob dataJob = DataJob.builder()
    .orchestrator("airflow")
    .flowId("my_dag")
    .jobId("my_task")
    .build();

With Cluster

Specify cluster (default is "prod"):

java
DataJob dataJob = DataJob.builder()
    .orchestrator("airflow")
    .flowId("analytics_pipeline")
    .cluster("staging")
    .jobId("transform_data")
    .build();
// URN: urn:li:dataJob:(urn:li:dataFlow:(airflow,analytics_pipeline,staging),transform_data)

With Metadata

Add description and name at construction (requires both name AND type):

java
DataJob dataJob = DataJob.builder()
    .orchestrator("dagster")
    .flowId("customer_etl")
    .cluster("prod")
    .jobId("load_customers")
    .description("Loads customer data from PostgreSQL to Snowflake")
    .name("Load Customers to DWH")
    .type("BATCH")
    .build();

With Custom Properties

Include custom properties in builder (requires name and type when using customProperties):

java
Map<String, String> props = new HashMap<>();
props.put("schedule", "0 2 * * *");
props.put("retries", "3");
props.put("timeout", "3600");

DataJob dataJob = DataJob.builder()
    .orchestrator("airflow")
    .flowId("daily_pipeline")
    .jobId("my_task")
    .name("My Daily Task")
    .type("BATCH")
    .customProperties(props)
    .build();

URN Construction

DataJob URNs follow the pattern:

urn:li:dataJob:(urn:li:dataFlow:({orchestrator},{flowId},{cluster}),{jobId})

Automatic URN creation:

java
DataJob dataJob = DataJob.builder()
    .orchestrator("airflow")
    .flowId("finance_reporting")
    .cluster("prod")
    .jobId("aggregate_transactions")
    .build();

DataJobUrn urn = dataJob.getDataJobUrn();
// urn:li:dataJob:(urn:li:dataFlow:(airflow,finance_reporting,prod),aggregate_transactions)

Description Operations

Setting Description

java
dataJob.setDescription("Processes daily customer transactions");

Reading Description

Get description (lazy-loaded from DataJobInfo):

java
String description = dataJob.getDescription();

Display Name Operations

Setting Name

java
dataJob.setName("Process Customer Transactions");

Reading Name

java
String name = dataJob.getName();

Tags

Adding Tags

java
// Simple tag name (auto-prefixed)
dataJob.addTag("critical");
// Creates: urn:li:tag:critical

// Full tag URN
dataJob.addTag("urn:li:tag:etl");

Removing Tags

java
dataJob.removeTag("critical");
dataJob.removeTag("urn:li:tag:etl");

Tag Chaining

java
dataJob.addTag("critical")
       .addTag("pii")
       .addTag("production");

Owners

Adding Owners

java
import com.linkedin.common.OwnershipType;

// Technical owner
dataJob.addOwner(
    "urn:li:corpuser:data_team",
    OwnershipType.TECHNICAL_OWNER
);

// Data steward
dataJob.addOwner(
    "urn:li:corpuser:compliance",
    OwnershipType.DATA_STEWARD
);

// Business owner
dataJob.addOwner(
    "urn:li:corpuser:product_team",
    OwnershipType.BUSINESS_OWNER
);

Removing Owners

java
dataJob.removeOwner("urn:li:corpuser:data_team");

Owner Types

Available ownership types:

  • TECHNICAL_OWNER - Maintains the technical implementation
  • BUSINESS_OWNER - Business stakeholder
  • DATA_STEWARD - Manages data quality and compliance
  • DATAOWNER - Generic data owner
  • DEVELOPER - Software developer
  • PRODUCER - Data producer
  • CONSUMER - Data consumer
  • STAKEHOLDER - Other stakeholder

Glossary Terms

Adding Terms

java
dataJob.addTerm("urn:li:glossaryTerm:DataProcessing");
dataJob.addTerm("urn:li:glossaryTerm:ETL");

Removing Terms

java
dataJob.removeTerm("urn:li:glossaryTerm:DataProcessing");

Term Chaining

java
dataJob.addTerm("urn:li:glossaryTerm:DataProcessing")
       .addTerm("urn:li:glossaryTerm:ETL")
       .addTerm("urn:li:glossaryTerm:FinancialReporting");

Domain

Setting Domain

java
dataJob.setDomain("urn:li:domain:Engineering");

Removing Domain

java
dataJob.removeDomain();

Custom Properties

Adding Individual Properties

java
dataJob.addCustomProperty("schedule", "0 2 * * *");
dataJob.addCustomProperty("retries", "3");
dataJob.addCustomProperty("timeout", "3600");

Setting All Properties

Replace all custom properties:

java
Map<String, String> properties = new HashMap<>();
properties.put("schedule", "0 2 * * *");
properties.put("retries", "3");
properties.put("timeout", "3600");
properties.put("priority", "high");

dataJob.setCustomProperties(properties);

Removing Properties

java
dataJob.removeCustomProperty("timeout");

Lineage Operations

DataJob lineage defines the relationship between data jobs and the datasets they operate on. Lineage enables impact analysis, data provenance tracking, and understanding data flows through your pipelines.

The DataJob SDK supports four types of lineage:

  1. Dataset-level lineage - Track which datasets a job reads from and writes to
  2. DataJob dependencies - Track which jobs depend on other jobs (task dependencies)
  3. Field-level lineage - Track specific columns consumed and produced
  4. Fine-grained lineage - Track column-to-column transformations

Understanding Input and Output Datasets

Input Datasets - Datasets that the job reads from:

  • Represent source data for the job
  • Create upstream lineage: Dataset → DataJob

Output Datasets - Datasets that the job writes to:

  • Represent destination data from the job
  • Create downstream lineage: DataJob → Dataset

Input Datasets

Adding Single Inlet

java
// Using string URN
dataJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.transactions,PROD)");

// Using DatasetUrn object for type safety
DatasetUrn datasetUrn = DatasetUrn.createFromString(
    "urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.transactions,PROD)"
);
dataJob.addInputDataset(datasetUrn);

Adding Multiple Inlets

java
// Chain multiple calls
dataJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.transactions,PROD)")
       .addInputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.customers,PROD)")
       .addInputDataset("urn:li:dataset:(urn:li:dataPlatform:kafka,events.purchases,PROD)");

Setting All Inlets at Once

java
List<String> inletUrns = Arrays.asList(
    "urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.orders,PROD)",
    "urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.customers,PROD)",
    "urn:li:dataset:(urn:li:dataPlatform:kafka,events.clicks,PROD)"
);
dataJob.setInputDatasets(inletUrns);

Removing Inlets

java
// Remove single inlet
dataJob.removeInputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.transactions,PROD)");

// Or using DatasetUrn
DatasetUrn datasetUrn = DatasetUrn.createFromString(
    "urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.transactions,PROD)"
);
dataJob.removeInputDataset(datasetUrn);

Reading Inlets

java
// Get all inlets (lazy-loaded)
List<DatasetUrn> inlets = dataJob.getInputDatasets();
for (DatasetUrn inlet : inlets) {
    System.out.println("Input: " + inlet);
}

Output Datasets (Outlets)

Adding Single Outlet

java
// Using string URN
dataJob.addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales_summary,PROD)");

// Using DatasetUrn object
DatasetUrn datasetUrn = DatasetUrn.createFromString(
    "urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales_summary,PROD)"
);
dataJob.addOutputDataset(datasetUrn);

Adding Multiple Outlets

java
dataJob.addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.daily_summary,PROD)")
       .addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.monthly_summary,PROD)")
       .addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:s3,reports/summary.parquet,PROD)");

Setting All Outlets at Once

java
List<String> outletUrns = Arrays.asList(
    "urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.customer_metrics,PROD)",
    "urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.product_metrics,PROD)"
);
dataJob.setOutputDatasets(outletUrns);

Removing Outlets

java
// Remove single outlet
dataJob.removeOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales_summary,PROD)");

// Or using DatasetUrn
DatasetUrn datasetUrn = DatasetUrn.createFromString(
    "urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales_summary,PROD)"
);
dataJob.removeOutputDataset(datasetUrn);

Reading Outlets

java
// Get all outlets (lazy-loaded)
List<DatasetUrn> outlets = dataJob.getOutputDatasets();
for (DatasetUrn outlet : outlets) {
    System.out.println("Output: " + outlet);
}

DataJob Dependencies

DataJob dependencies model task-to-task relationships within workflows. This enables DataHub to track which jobs depend on other jobs completing first.

Use cases:

  • Airflow task dependencies (task A → task B → task C)
  • Cross-DAG dependencies (jobs in different pipelines)
  • Workflow orchestration visualization

Adding Job Dependencies

java
// Using string URN
dataJob.addInputDataJob("urn:li:dataJob:(urn:li:dataFlow:(airflow,pipeline,prod),upstream_task)");

// Using DataJobUrn object for type safety
DataJobUrn upstreamJob = DataJobUrn.createFromString(
    "urn:li:dataJob:(urn:li:dataFlow:(airflow,pipeline,prod),upstream_task)"
);
dataJob.addInputDataJob(upstreamJob);

Chaining Job Dependencies

java
// Multiple dependencies (task runs after all complete)
dataJob.addInputDataJob("urn:li:dataJob:(urn:li:dataFlow:(airflow,pipeline,prod),task_1)")
       .addInputDataJob("urn:li:dataJob:(urn:li:dataFlow:(airflow,pipeline,prod),task_2)")
       .addInputDataJob("urn:li:dataJob:(urn:li:dataFlow:(dagster,other_pipeline,prod),external_task)");

Removing Job Dependencies

java
// Remove single dependency
dataJob.removeInputDataJob("urn:li:dataJob:(urn:li:dataFlow:(airflow,pipeline,prod),task_1)");

// Or using DataJobUrn
DataJobUrn jobUrn = DataJobUrn.createFromString(
    "urn:li:dataJob:(urn:li:dataFlow:(airflow,pipeline,prod),task_1)"
);
dataJob.removeInputDataJob(jobUrn);

Reading Job Dependencies

java
// Get all upstream job dependencies (lazy-loaded)
List<DataJobUrn> dependencies = dataJob.getInputDataJobs();
for (DataJobUrn dependency : dependencies) {
    System.out.println("Depends on: " + dependency);
}

Example: Airflow Task Dependencies

java
// Model a typical Airflow DAG task chain
DataJob extractTask = DataJob.builder()
    .orchestrator("airflow")
    .flowId("etl_pipeline")
    .jobId("extract_data")
    .build();

DataJob validateTask = DataJob.builder()
    .orchestrator("airflow")
    .flowId("etl_pipeline")
    .jobId("validate_data")
    .build();

// validate_data depends on extract_data
validateTask.addInputDataJob(extractTask.getUrn().toString());

DataJob transformTask = DataJob.builder()
    .orchestrator("airflow")
    .flowId("etl_pipeline")
    .jobId("transform_data")
    .build();

// transform_data depends on validate_data
transformTask.addInputDataJob(validateTask.getUrn().toString());

// Save all tasks
client.entities().upsert(extractTask);
client.entities().upsert(validateTask);
client.entities().upsert(transformTask);

// Result: extract_data → validate_data → transform_data

Field-Level Lineage

Field-level lineage tracks which specific columns (fields) a job consumes and produces. This provides finer granularity than dataset-level lineage.

Use cases:

  • Track which columns are read/written by transformations
  • Understand field-level dependencies
  • Validate that jobs only access necessary columns

Field URN Format:

urn:li:schemaField:(DATASET_URN,COLUMN_NAME)

Adding Input Fields

java
// Track which columns the job reads
dataJob.addInputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,db.orders,PROD),order_id)");
dataJob.addInputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,db.orders,PROD),customer_id)");
dataJob.addInputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,db.orders,PROD),total_amount)");

Adding Output Fields

java
// Track which columns the job writes
dataJob.addOutputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales,PROD),order_id)");
dataJob.addOutputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales,PROD),customer_id)");
dataJob.addOutputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales,PROD),revenue)");

Removing Fields

java
// Remove field lineage
dataJob.removeInputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,db.orders,PROD),order_id)");
dataJob.removeOutputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales,PROD),revenue)");

Reading Fields

java
// Get all input fields (lazy-loaded)
List<Urn> inputFields = dataJob.getInputFields();
for (Urn field : inputFields) {
    System.out.println("Reads field: " + field);
}

// Get all output fields (lazy-loaded)
List<Urn> outputFields = dataJob.getOutputFields();
for (Urn field : outputFields) {
    System.out.println("Writes field: " + field);
}

Example: Column-Level Tracking

java
DataJob aggregateJob = DataJob.builder()
    .orchestrator("airflow")
    .flowId("analytics")
    .jobId("aggregate_sales")
    .description("Aggregates sales data by customer")
    .name("Aggregate Sales by Customer")
    .type("BATCH")
    .build();

// Dataset-level lineage
aggregateJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.transactions,PROD)");
aggregateJob.addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.customer_sales,PROD)");

// Field-level lineage - specify exact columns used
aggregateJob.addInputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.transactions,PROD),customer_id)");
aggregateJob.addInputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.transactions,PROD),amount)");
aggregateJob.addInputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.transactions,PROD),transaction_date)");

aggregateJob.addOutputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.customer_sales,PROD),customer_id)");
aggregateJob.addOutputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.customer_sales,PROD),total_sales)");
aggregateJob.addOutputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.customer_sales,PROD),transaction_count)");

client.entities().upsert(aggregateJob);

Fine-Grained Lineage

Fine-grained lineage captures column-to-column transformations, showing exactly which input columns produce which output columns and how they're transformed.

Use cases:

  • Document transformation logic (e.g., "SUM(amount)")
  • Track column-level impact analysis
  • Understand data derivations
  • Compliance and audit trails

Adding Fine-Grained Lineage

java
// Basic transformation (no confidence score)
dataJob.addFineGrainedLineage(
    "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.orders,PROD),customer_id)",
    "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales,PROD),customer_id)",
    "IDENTITY",
    null
);

// Transformation with confidence score (0.0 to 1.0)
dataJob.addFineGrainedLineage(
    "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.orders,PROD),amount)",
    "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales,PROD),revenue)",
    "SUM",
    1.0f  // High confidence
);

Common Transformation Types

java
// IDENTITY - direct copy
dataJob.addFineGrainedLineage(upstream, downstream, "IDENTITY", 1.0f);

// Aggregations
dataJob.addFineGrainedLineage(upstream, downstream, "SUM", 1.0f);
dataJob.addFineGrainedLineage(upstream, downstream, "COUNT", 1.0f);
dataJob.addFineGrainedLineage(upstream, downstream, "AVG", 1.0f);
dataJob.addFineGrainedLineage(upstream, downstream, "MAX", 1.0f);
dataJob.addFineGrainedLineage(upstream, downstream, "MIN", 1.0f);

// String operations
dataJob.addFineGrainedLineage(upstream, downstream, "CONCAT", 0.9f);
dataJob.addFineGrainedLineage(upstream, downstream, "UPPER", 1.0f);
dataJob.addFineGrainedLineage(upstream, downstream, "SUBSTRING", 0.95f);

// Date operations
dataJob.addFineGrainedLineage(upstream, downstream, "DATE_TRUNC", 1.0f);
dataJob.addFineGrainedLineage(upstream, downstream, "EXTRACT", 1.0f);

// Custom transformations
dataJob.addFineGrainedLineage(upstream, downstream, "CUSTOM_FUNCTION", 0.8f);

Removing Fine-Grained Lineage

java
// Remove specific transformation
dataJob.removeFineGrainedLineage(
    "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.orders,PROD),amount)",
    "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales,PROD),revenue)",
    "SUM",
    null  // queryUrn (optional)
);

Reading Fine-Grained Lineage

java
// Get all fine-grained lineage (lazy-loaded)
List<FineGrainedLineage> lineages = dataJob.getFineGrainedLineages();
for (FineGrainedLineage lineage : lineages) {
    System.out.println("Upstreams: " + lineage.getUpstreams());
    System.out.println("Downstreams: " + lineage.getDownstreams());
    System.out.println("Transformation: " + lineage.getTransformOperation());
    System.out.println("Confidence: " + lineage.getConfidenceScore());
}

Example: Complex Aggregation

java
DataJob salesAggregation = DataJob.builder()
    .orchestrator("airflow")
    .flowId("analytics")
    .jobId("daily_sales_summary")
    .name("Daily Sales Summary")
    .type("BATCH")
    .build();

// Dataset-level lineage
salesAggregation.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:postgres,sales.transactions,PROD)");
salesAggregation.addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.daily_summary,PROD)");

// Fine-grained transformations
String inputDataset = "urn:li:dataset:(urn:li:dataPlatform:postgres,sales.transactions,PROD)";
String outputDataset = "urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.daily_summary,PROD)";

// Date is copied directly
salesAggregation.addFineGrainedLineage(
    "urn:li:schemaField:(" + inputDataset + ",transaction_date)",
    "urn:li:schemaField:(" + outputDataset + ",date)",
    "IDENTITY",
    1.0f
);

// Revenue is SUM of amounts
salesAggregation.addFineGrainedLineage(
    "urn:li:schemaField:(" + inputDataset + ",amount)",
    "urn:li:schemaField:(" + outputDataset + ",total_revenue)",
    "SUM",
    1.0f
);

// Transaction count
salesAggregation.addFineGrainedLineage(
    "urn:li:schemaField:(" + inputDataset + ",transaction_id)",
    "urn:li:schemaField:(" + outputDataset + ",transaction_count)",
    "COUNT",
    1.0f
);

// Average order value
salesAggregation.addFineGrainedLineage(
    "urn:li:schemaField:(" + inputDataset + ",amount)",
    "urn:li:schemaField:(" + outputDataset + ",avg_order_value)",
    "AVG",
    1.0f
);

client.entities().upsert(salesAggregation);

Example: Multi-Column Derivation

java
// Model a transformation where output depends on multiple input columns
DataJob enrichmentJob = DataJob.builder()
    .orchestrator("airflow")
    .flowId("enrichment")
    .jobId("enrich_customer_data")
    .build();

String inputDataset = "urn:li:dataset:(urn:li:dataPlatform:postgres,crm.customers,PROD)";
String outputDataset = "urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.customers_enriched,PROD)";

// full_name = CONCAT(first_name, ' ', last_name)
// Both first_name and last_name contribute to full_name
enrichmentJob.addFineGrainedLineage(
    "urn:li:schemaField:(" + inputDataset + ",first_name)",
    "urn:li:schemaField:(" + outputDataset + ",full_name)",
    "CONCAT",
    1.0f
);

enrichmentJob.addFineGrainedLineage(
    "urn:li:schemaField:(" + inputDataset + ",last_name)",
    "urn:li:schemaField:(" + outputDataset + ",full_name)",
    "CONCAT",
    1.0f
);

// email_domain = SUBSTRING(email, POSITION('@', email) + 1)
enrichmentJob.addFineGrainedLineage(
    "urn:li:schemaField:(" + inputDataset + ",email)",
    "urn:li:schemaField:(" + outputDataset + ",email_domain)",
    "SUBSTRING",
    1.0f
);

client.entities().upsert(enrichmentJob);

Confidence Scores

Confidence scores (0.0 to 1.0) indicate how certain you are about the transformation:

  • 1.0 - Exact, deterministic transformation (e.g., IDENTITY, SUM)
  • 0.9-0.99 - High confidence (e.g., simple string operations)
  • 0.7-0.89 - Medium confidence (e.g., complex transformations with some uncertainty)
  • 0.5-0.69 - Low confidence (e.g., ML-derived lineage, heuristic-based)
  • < 0.5 - Very uncertain (generally not recommended)
java
// High confidence - exact transformation known
dataJob.addFineGrainedLineage(source, target, "UPPER", 1.0f);

// Medium confidence - inferred from SQL parsing
dataJob.addFineGrainedLineage(source, target, "CASE_WHEN", 0.85f);

// Low confidence - ML-predicted transformation
dataJob.addFineGrainedLineage(source, target, "INFERRED", 0.6f);

Complete Lineage Example

This example demonstrates all four types of lineage working together:

java
// Create upstream validation job
DataJob validateJob = DataJob.builder()
    .orchestrator("airflow")
    .flowId("analytics_pipeline")
    .cluster("prod")
    .jobId("validate_transactions")
    .name("Validate Transaction Data")
    .type("BATCH")
    .build();

validateJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.transactions,PROD)")
           .addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,validated.transactions,PROD)");

client.entities().upsert(validateJob);

// Create main transformation job with comprehensive lineage
DataJob transformJob = DataJob.builder()
    .orchestrator("airflow")
    .flowId("analytics_pipeline")
    .cluster("prod")
    .jobId("aggregate_sales")
    .description("Aggregates daily sales data from multiple validated sources")
    .name("Aggregate Daily Sales")
    .type("BATCH")
    .build();

// 1. Dataset-level lineage - Which tables are read/written
transformJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,validated.transactions,PROD)")
            .addInputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.customers,PROD)")
            .addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.daily_sales,PROD)");

// 2. DataJob dependencies - This job depends on the validation job
transformJob.addInputDataJob(validateJob.getUrn().toString());

// 3. Field-level lineage - Which specific columns are accessed
String transactionsDataset = "urn:li:dataset:(urn:li:dataPlatform:snowflake,validated.transactions,PROD)";
String customersDataset = "urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.customers,PROD)";
String outputDataset = "urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.daily_sales,PROD)";

// Input fields
transformJob.addInputField("urn:li:schemaField:(" + transactionsDataset + ",transaction_id)")
            .addInputField("urn:li:schemaField:(" + transactionsDataset + ",customer_id)")
            .addInputField("urn:li:schemaField:(" + transactionsDataset + ",amount)")
            .addInputField("urn:li:schemaField:(" + transactionsDataset + ",transaction_date)")
            .addInputField("urn:li:schemaField:(" + customersDataset + ",customer_id)")
            .addInputField("urn:li:schemaField:(" + customersDataset + ",customer_name)");

// Output fields
transformJob.addOutputField("urn:li:schemaField:(" + outputDataset + ",date)")
            .addOutputField("urn:li:schemaField:(" + outputDataset + ",customer_name)")
            .addOutputField("urn:li:schemaField:(" + outputDataset + ",total_revenue)")
            .addOutputField("urn:li:schemaField:(" + outputDataset + ",transaction_count)");

// 4. Fine-grained lineage - Specific column-to-column transformations
// Date column (identity transformation)
transformJob.addFineGrainedLineage(
    "urn:li:schemaField:(" + transactionsDataset + ",transaction_date)",
    "urn:li:schemaField:(" + outputDataset + ",date)",
    "IDENTITY",
    1.0f
);

// Customer name (join + identity)
transformJob.addFineGrainedLineage(
    "urn:li:schemaField:(" + customersDataset + ",customer_name)",
    "urn:li:schemaField:(" + outputDataset + ",customer_name)",
    "IDENTITY",
    1.0f
);

// Total revenue (aggregation)
transformJob.addFineGrainedLineage(
    "urn:li:schemaField:(" + transactionsDataset + ",amount)",
    "urn:li:schemaField:(" + outputDataset + ",total_revenue)",
    "SUM",
    1.0f
);

// Transaction count (aggregation)
transformJob.addFineGrainedLineage(
    "urn:li:schemaField:(" + transactionsDataset + ",transaction_id)",
    "urn:li:schemaField:(" + outputDataset + ",transaction_count)",
    "COUNT",
    1.0f
);

// Add other metadata
transformJob.addTag("critical")
            .addOwner("urn:li:corpuser:data_team", OwnershipType.TECHNICAL_OWNER);

// Save to DataHub
client.entities().upsert(transformJob);

// Result: Creates comprehensive lineage showing:
// - Job dependency: validate_transactions → aggregate_sales
// - Dataset flow: raw.transactions → validated.transactions → analytics.daily_sales
//                 raw.customers → analytics.daily_sales
// - Column-level: transaction_date → date (IDENTITY)
//                 amount → total_revenue (SUM)
//                 transaction_id → transaction_count (COUNT)
//                 customer_name → customer_name (IDENTITY via JOIN)

Lineage Flow Visualization

The comprehensive lineage example above creates this multi-level lineage graph:

Job-to-Job Level:
┌────────────────────────┐         ┌──────────────────────┐
│ Validate Transactions  │────────→│  Aggregate Sales Job │
└────────────────────────┘         └──────────────────────┘

Dataset Level:
┌─────────────────────┐    ┌─────────────────────────┐    ┌─────────────────────────┐
│ raw.transactions    │───→│ validated.transactions  │───→│                         │
└─────────────────────┘    └─────────────────────────┘    │  analytics.daily_sales  │
                                                           │                         │
┌─────────────────────┐                                   │                         │
│ raw.customers       │──────────────────────────────────→│                         │
└─────────────────────┘                                   └─────────────────────────┘

Column Level (Fine-Grained):
validated.transactions.transaction_date ──[IDENTITY]──→ daily_sales.date
validated.transactions.amount           ──[SUM]──────→ daily_sales.total_revenue
validated.transactions.transaction_id   ──[COUNT]────→ daily_sales.transaction_count
raw.customers.customer_name             ──[IDENTITY]──→ daily_sales.customer_name

ETL Pipeline Example

Model a complete Extract-Transform-Load pipeline:

java
// Extract job
DataJob extractJob = DataJob.builder()
    .orchestrator("airflow")
    .flowId("etl_pipeline")
    .jobId("extract")
    .build();

extractJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:mysql,prod.orders,PROD)")
          .addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:s3,staging/orders_raw,PROD)");

client.entities().upsert(extractJob);

// Transform job
DataJob transformJob = DataJob.builder()
    .orchestrator("airflow")
    .flowId("etl_pipeline")
    .jobId("transform")
    .build();

transformJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:s3,staging/orders_raw,PROD)")
            .addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:s3,staging/orders_clean,PROD)");

client.entities().upsert(transformJob);

// Load job
DataJob loadJob = DataJob.builder()
    .orchestrator("airflow")
    .flowId("etl_pipeline")
    .jobId("load")
    .build();

loadJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:s3,staging/orders_clean,PROD)")
       .addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.orders,PROD)");

client.entities().upsert(loadJob);

// Creates end-to-end lineage:
// mysql.orders → [Extract] → s3.raw → [Transform] → s3.clean → [Load] → snowflake.analytics

Updating Lineage

java
// Load existing job
DataJobUrn urn = DataJobUrn.createFromString(
    "urn:li:dataJob:(urn:li:dataFlow:(airflow,my_pipeline,prod),my_task)"
);
DataJob dataJob = client.entities().get(urn);

// Add new inlet (e.g., requirements changed)
dataJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:kafka,events.new_source,PROD)");

// Remove old outlet (e.g., deprecated table)
dataJob.removeOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,old.deprecated_table,PROD)");

// Apply changes
client.entities().update(dataJob);

Lineage Best Practices

  1. Be Complete - Define both inputs and outputs for accurate lineage
  2. Use Correct URNs - Ensure dataset URNs match existing datasets in DataHub
  3. Update When Changed - Keep lineage current as pipelines evolve
  4. Document Transformations - Use descriptions to explain what the job does
  5. Model All Jobs - Include every step in your pipeline for complete lineage
  6. Use Typed URNs - Prefer DatasetUrn/DataJobUrn objects over strings for compile-time safety
  7. Layer Your Lineage - Start with dataset-level, add field-level and fine-grained as needed
  8. Track Dependencies - Use DataJob dependencies to model task orchestration
  9. Be Precise with Transformations - Use accurate transformation types in fine-grained lineage
  10. Set Confidence Scores - Use appropriate confidence scores to indicate lineage quality

Common Patterns

Multiple Sources to Single Destination

java
// Data aggregation job
dataJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:postgres,sales.orders,PROD)")
       .addInputDataset("urn:li:dataset:(urn:li:dataPlatform:postgres,sales.customers,PROD)")
       .addInputDataset("urn:li:dataset:(urn:li:dataPlatform:postgres,sales.products,PROD)")
       .addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales_summary,PROD)");

Single Source to Multiple Destinations

java
// Data fanout job
dataJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:kafka,events.raw,PROD)")
       .addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:s3,archive/events,PROD)")
       .addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,events.processed,PROD)")
       .addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:elasticsearch,events.searchable,PROD)");

Cross-Platform Lineage

java
// ETL across different platforms
dataJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:mysql,production.transactions,PROD)")
       .addInputDataset("urn:li:dataset:(urn:li:dataPlatform:kafka,events.user_activity,PROD)")
       .addInputDataset("urn:li:dataset:(urn:li:dataPlatform:s3,raw/reference_data,PROD)")
       .addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.customer_360,PROD)")
       .addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:bigquery,reporting.customer_metrics,PROD)");

Complete Example

java
import datahub.client.v2.DataHubClientV2;
import datahub.client.v2.entity.DataJob;
import com.linkedin.common.OwnershipType;
import java.io.IOException;
import java.util.concurrent.ExecutionException;

public class DataJobExample {
    public static void main(String[] args) {
        // Create client
        DataHubClientV2 client = DataHubClientV2.builder()
            .server("http://localhost:8080")
            .build();

        try {
            // Build data job with all metadata
            DataJob dataJob = DataJob.builder()
                .orchestrator("airflow")
                .flowId("customer_analytics")
                .cluster("prod")
                .jobId("process_events")
                .description("Processes customer events from Kafka to warehouse")
                .name("Process Customer Events")
                .type("BATCH")
                .build();

            // Add tags
            dataJob.addTag("critical")
                   .addTag("etl")
                   .addTag("pii");

            // Add owners
            dataJob.addOwner("urn:li:corpuser:data_team", OwnershipType.TECHNICAL_OWNER)
                   .addOwner("urn:li:corpuser:product_team", OwnershipType.BUSINESS_OWNER);

            // Add glossary terms
            dataJob.addTerm("urn:li:glossaryTerm:DataProcessing")
                   .addTerm("urn:li:glossaryTerm:CustomerData");

            // Set domain
            dataJob.setDomain("urn:li:domain:Analytics");

            // Add custom properties
            dataJob.addCustomProperty("schedule", "0 2 * * *")
                   .addCustomProperty("retries", "3")
                   .addCustomProperty("timeout", "7200");

            // Upsert to DataHub
            client.entities().upsert(dataJob);

            System.out.println("Successfully created data job: " + dataJob.getUrn());

        } catch (IOException | ExecutionException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            try {
                client.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

Updating Existing DataJobs

Load and Modify

java
// Load existing data job
DataJobUrn urn = DataJobUrn.createFromString(
    "urn:li:dataJob:(urn:li:dataFlow:(airflow,my_dag,prod),my_task)"
);
DataJob dataJob = client.entities().get(urn);

// Add new metadata (creates patches)
dataJob.addTag("new-tag")
       .addOwner("urn:li:corpuser:new_owner", OwnershipType.TECHNICAL_OWNER);

// Apply patches
client.entities().update(dataJob);

Incremental Updates

java
// Just add what you need
dataJob.addTag("critical");
client.entities().update(dataJob);

// Later, add more
dataJob.addCustomProperty("priority", "high");
client.entities().update(dataJob);

Builder Options Reference

MethodRequiredDescription
orchestrator(String)✅ YesOrchestrator (e.g., "airflow", "dagster")
flowId(String)✅ YesFlow/DAG identifier
jobId(String)✅ YesJob/task identifier
cluster(String)NoCluster name (e.g., "prod", "dev"). Default: "prod"
description(String)NoJob description. Requires both name() and type() to be set
name(String)NoDisplay name shown in UI. Required if using description(), type(), or customProperties()
type(String)NoJob type (e.g., "BATCH", "STREAMING"). Required if using description(), name(), or customProperties()
customProperties(Map)NoMap of custom key-value properties. Requires both name() and type() to be set

Important: The DataJobInfo aspect requires both name and type fields. If you provide any of description, name, type, or customProperties in the builder, you must provide both name and type. Otherwise, you'll get an IllegalArgumentException at build time.

Common Patterns

Creating Multiple DataJobs

java
String[] tasks = {"extract", "transform", "load"};
for (String taskName : tasks) {
    DataJob dataJob = DataJob.builder()
        .orchestrator("airflow")
        .flowId("etl_pipeline")
        .cluster("prod")
        .jobId(taskName)
        .build();

    dataJob.addTag("etl")
           .addCustomProperty("team", "data-engineering");

    client.entities().upsert(dataJob);
}

Batch Metadata Addition

java
DataJob dataJob = DataJob.builder()
    .orchestrator("airflow")
    .flowId("my_dag")
    .jobId("my_task")
    .build();

List<String> tags = Arrays.asList("critical", "production", "etl");
tags.forEach(dataJob::addTag);

client.entities().upsert(dataJob);  // Emits all tags in one call

Conditional Metadata

java
if (isCritical(dataJob)) {
    dataJob.addTag("critical")
           .addTerm("urn:li:glossaryTerm:BusinessCritical");
}

if (processesFinancialData(dataJob)) {
    dataJob.addTag("financial")
           .addOwner("urn:li:corpuser:compliance_team", OwnershipType.DATA_STEWARD);
}

DataJob vs DataFlow

DataFlow represents a pipeline or DAG (e.g., an Airflow DAG):

  • URN: urn:li:dataFlow:(orchestrator,flowId,cluster)
  • Contains multiple DataJobs

DataJob represents a task within a pipeline:

  • URN: urn:li:dataJob:(flowUrn,jobId)
  • Belongs to one DataFlow
  • Can have lineage to datasets and other DataJobs

Example hierarchy:

DataFlow: urn:li:dataFlow:(airflow,customer_pipeline,prod)
├── DataJob: urn:li:dataJob:(urn:li:dataFlow:(airflow,customer_pipeline,prod),extract)
├── DataJob: urn:li:dataJob:(urn:li:dataFlow:(airflow,customer_pipeline,prod),transform)
└── DataJob: urn:li:dataJob:(urn:li:dataFlow:(airflow,customer_pipeline,prod),load)

Orchestrator Examples

Common orchestrator values:

  • airflow - Apache Airflow
  • dagster - Dagster
  • prefect - Prefect
  • dbt - dbt (data build tool)
  • spark - Apache Spark
  • glue - AWS Glue
  • dataflow - Google Cloud Dataflow
  • azkaban - Azkaban
  • luigi - Luigi

Next Steps

Examples

Basic DataJob Creation

java
{{ inline /metadata-integration/java/examples/src/main/java/io/datahubproject/examples/v2/DataJobCreateExample.java show_path_as_comment }}

Comprehensive DataJob with Metadata and Lineage

java
{{ inline /metadata-integration/java/examples/src/main/java/io/datahubproject/examples/v2/DataJobFullExample.java show_path_as_comment }}

DataJob Lineage Operations

java
{{ inline /metadata-integration/java/examples/src/main/java/io/datahubproject/examples/v2/DataJobLineageExample.java show_path_as_comment }}