metadata-integration/java/docs/sdk-v2/datajob-entity.md
The DataJob entity represents a unit of work in a data processing pipeline (e.g., an Airflow task, a dbt model, a Spark job). DataJobs belong to DataFlows (pipelines) and can have lineage to datasets and other DataJobs. This guide covers comprehensive DataJob operations in SDK V2.
Orchestrator, flowId, and jobId are required:
DataJob dataJob = DataJob.builder()
.orchestrator("airflow")
.flowId("my_dag")
.jobId("my_task")
.build();
Specify cluster (default is "prod"):
DataJob dataJob = DataJob.builder()
.orchestrator("airflow")
.flowId("analytics_pipeline")
.cluster("staging")
.jobId("transform_data")
.build();
// URN: urn:li:dataJob:(urn:li:dataFlow:(airflow,analytics_pipeline,staging),transform_data)
Add description and name at construction (requires both name AND type):
DataJob dataJob = DataJob.builder()
.orchestrator("dagster")
.flowId("customer_etl")
.cluster("prod")
.jobId("load_customers")
.description("Loads customer data from PostgreSQL to Snowflake")
.name("Load Customers to DWH")
.type("BATCH")
.build();
Include custom properties in builder (requires name and type when using customProperties):
Map<String, String> props = new HashMap<>();
props.put("schedule", "0 2 * * *");
props.put("retries", "3");
props.put("timeout", "3600");
DataJob dataJob = DataJob.builder()
.orchestrator("airflow")
.flowId("daily_pipeline")
.jobId("my_task")
.name("My Daily Task")
.type("BATCH")
.customProperties(props)
.build();
DataJob URNs follow the pattern:
urn:li:dataJob:(urn:li:dataFlow:({orchestrator},{flowId},{cluster}),{jobId})
Automatic URN creation:
DataJob dataJob = DataJob.builder()
.orchestrator("airflow")
.flowId("finance_reporting")
.cluster("prod")
.jobId("aggregate_transactions")
.build();
DataJobUrn urn = dataJob.getDataJobUrn();
// urn:li:dataJob:(urn:li:dataFlow:(airflow,finance_reporting,prod),aggregate_transactions)
dataJob.setDescription("Processes daily customer transactions");
Get description (lazy-loaded from DataJobInfo):
String description = dataJob.getDescription();
dataJob.setName("Process Customer Transactions");
String name = dataJob.getName();
// Simple tag name (auto-prefixed)
dataJob.addTag("critical");
// Creates: urn:li:tag:critical
// Full tag URN
dataJob.addTag("urn:li:tag:etl");
dataJob.removeTag("critical");
dataJob.removeTag("urn:li:tag:etl");
dataJob.addTag("critical")
.addTag("pii")
.addTag("production");
import com.linkedin.common.OwnershipType;
// Technical owner
dataJob.addOwner(
"urn:li:corpuser:data_team",
OwnershipType.TECHNICAL_OWNER
);
// Data steward
dataJob.addOwner(
"urn:li:corpuser:compliance",
OwnershipType.DATA_STEWARD
);
// Business owner
dataJob.addOwner(
"urn:li:corpuser:product_team",
OwnershipType.BUSINESS_OWNER
);
dataJob.removeOwner("urn:li:corpuser:data_team");
Available ownership types:
- TECHNICAL_OWNER - Maintains the technical implementation
- BUSINESS_OWNER - Business stakeholder
- DATA_STEWARD - Manages data quality and compliance
- DATAOWNER - Generic data owner
- DEVELOPER - Software developer
- PRODUCER - Data producer
- CONSUMER - Data consumer
- STAKEHOLDER - Other stakeholder
Add glossary terms:
dataJob.addTerm("urn:li:glossaryTerm:DataProcessing");
dataJob.addTerm("urn:li:glossaryTerm:ETL");
dataJob.removeTerm("urn:li:glossaryTerm:DataProcessing");
dataJob.addTerm("urn:li:glossaryTerm:DataProcessing")
.addTerm("urn:li:glossaryTerm:ETL")
.addTerm("urn:li:glossaryTerm:FinancialReporting");
dataJob.setDomain("urn:li:domain:Engineering");
dataJob.removeDomain();
dataJob.addCustomProperty("schedule", "0 2 * * *");
dataJob.addCustomProperty("retries", "3");
dataJob.addCustomProperty("timeout", "3600");
Replace all custom properties:
Map<String, String> properties = new HashMap<>();
properties.put("schedule", "0 2 * * *");
properties.put("retries", "3");
properties.put("timeout", "3600");
properties.put("priority", "high");
dataJob.setCustomProperties(properties);
dataJob.removeCustomProperty("timeout");
DataJob lineage defines the relationship between data jobs and the datasets they operate on. Lineage enables impact analysis, data provenance tracking, and understanding data flows through your pipelines.
The DataJob SDK supports four types of lineage:
- Dataset lineage - input datasets the job reads and output datasets it writes
- DataJob dependencies - upstream jobs this job depends on
- Field-level lineage - the specific columns the job consumes and produces
- Fine-grained lineage - column-to-column transformations between inputs and outputs
Input Datasets - Datasets that the job reads from:
// Using string URN
dataJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.transactions,PROD)");
// Using DatasetUrn object for type safety
DatasetUrn datasetUrn = DatasetUrn.createFromString(
"urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.transactions,PROD)"
);
dataJob.addInputDataset(datasetUrn);
// Chain multiple calls
dataJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.transactions,PROD)")
.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.customers,PROD)")
.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:kafka,events.purchases,PROD)");
List<String> inletUrns = Arrays.asList(
"urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.orders,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.customers,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:kafka,events.clicks,PROD)"
);
dataJob.setInputDatasets(inletUrns);
// Remove single inlet
dataJob.removeInputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.transactions,PROD)");
// Or using DatasetUrn
DatasetUrn datasetUrn = DatasetUrn.createFromString(
"urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.transactions,PROD)"
);
dataJob.removeInputDataset(datasetUrn);
// Get all inlets (lazy-loaded)
List<DatasetUrn> inlets = dataJob.getInputDatasets();
for (DatasetUrn inlet : inlets) {
System.out.println("Input: " + inlet);
}
Output Datasets - Datasets that the job writes to:
// Using string URN
dataJob.addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales_summary,PROD)");
// Using DatasetUrn object
DatasetUrn datasetUrn = DatasetUrn.createFromString(
"urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales_summary,PROD)"
);
dataJob.addOutputDataset(datasetUrn);
dataJob.addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.daily_summary,PROD)")
.addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.monthly_summary,PROD)")
.addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:s3,reports/summary.parquet,PROD)");
List<String> outletUrns = Arrays.asList(
"urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.customer_metrics,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.product_metrics,PROD)"
);
dataJob.setOutputDatasets(outletUrns);
// Remove single outlet
dataJob.removeOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales_summary,PROD)");
// Or using DatasetUrn
DatasetUrn datasetUrn = DatasetUrn.createFromString(
"urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales_summary,PROD)"
);
dataJob.removeOutputDataset(datasetUrn);
// Get all outlets (lazy-loaded)
List<DatasetUrn> outlets = dataJob.getOutputDatasets();
for (DatasetUrn outlet : outlets) {
System.out.println("Output: " + outlet);
}
DataJob dependencies model task-to-task relationships within workflows, letting DataHub track which jobs must complete before others can run.
// Using string URN
dataJob.addInputDataJob("urn:li:dataJob:(urn:li:dataFlow:(airflow,pipeline,prod),upstream_task)");
// Using DataJobUrn object for type safety
DataJobUrn upstreamJob = DataJobUrn.createFromString(
"urn:li:dataJob:(urn:li:dataFlow:(airflow,pipeline,prod),upstream_task)"
);
dataJob.addInputDataJob(upstreamJob);
// Multiple dependencies (task runs after all complete)
dataJob.addInputDataJob("urn:li:dataJob:(urn:li:dataFlow:(airflow,pipeline,prod),task_1)")
.addInputDataJob("urn:li:dataJob:(urn:li:dataFlow:(airflow,pipeline,prod),task_2)")
.addInputDataJob("urn:li:dataJob:(urn:li:dataFlow:(dagster,other_pipeline,prod),external_task)");
// Remove single dependency
dataJob.removeInputDataJob("urn:li:dataJob:(urn:li:dataFlow:(airflow,pipeline,prod),task_1)");
// Or using DataJobUrn
DataJobUrn jobUrn = DataJobUrn.createFromString(
"urn:li:dataJob:(urn:li:dataFlow:(airflow,pipeline,prod),task_1)"
);
dataJob.removeInputDataJob(jobUrn);
// Get all upstream job dependencies (lazy-loaded)
List<DataJobUrn> dependencies = dataJob.getInputDataJobs();
for (DataJobUrn dependency : dependencies) {
System.out.println("Depends on: " + dependency);
}
// Model a typical Airflow DAG task chain
DataJob extractTask = DataJob.builder()
.orchestrator("airflow")
.flowId("etl_pipeline")
.jobId("extract_data")
.build();
DataJob validateTask = DataJob.builder()
.orchestrator("airflow")
.flowId("etl_pipeline")
.jobId("validate_data")
.build();
// validate_data depends on extract_data
validateTask.addInputDataJob(extractTask.getUrn().toString());
DataJob transformTask = DataJob.builder()
.orchestrator("airflow")
.flowId("etl_pipeline")
.jobId("transform_data")
.build();
// transform_data depends on validate_data
transformTask.addInputDataJob(validateTask.getUrn().toString());
// Save all tasks
client.entities().upsert(extractTask);
client.entities().upsert(validateTask);
client.entities().upsert(transformTask);
// Result: extract_data → validate_data → transform_data
Field-level lineage tracks which specific columns (fields) a job consumes and produces, providing finer granularity than dataset-level lineage.
Field URN Format:
urn:li:schemaField:(DATASET_URN,COLUMN_NAME)
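Building schemaField URNs by hand gets repetitive. A small helper like the one below (hypothetical, not part of the SDK, just string concatenation following the format above) can assemble them from a dataset URN and a column name:
// Hypothetical helper - composes a schemaField URN from a dataset URN and a column name
static String schemaFieldUrn(String datasetUrn, String column) {
    return "urn:li:schemaField:(" + datasetUrn + "," + column + ")";
}
// Usage
String ordersDataset = "urn:li:dataset:(urn:li:dataPlatform:snowflake,db.orders,PROD)";
dataJob.addInputField(schemaFieldUrn(ordersDataset, "order_id"));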
// Track which columns the job reads
dataJob.addInputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,db.orders,PROD),order_id)");
dataJob.addInputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,db.orders,PROD),customer_id)");
dataJob.addInputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,db.orders,PROD),total_amount)");
// Track which columns the job writes
dataJob.addOutputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales,PROD),order_id)");
dataJob.addOutputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales,PROD),customer_id)");
dataJob.addOutputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales,PROD),revenue)");
// Remove field lineage
dataJob.removeInputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,db.orders,PROD),order_id)");
dataJob.removeOutputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales,PROD),revenue)");
// Get all input fields (lazy-loaded)
List<Urn> inputFields = dataJob.getInputFields();
for (Urn field : inputFields) {
System.out.println("Reads field: " + field);
}
// Get all output fields (lazy-loaded)
List<Urn> outputFields = dataJob.getOutputFields();
for (Urn field : outputFields) {
System.out.println("Writes field: " + field);
}
DataJob aggregateJob = DataJob.builder()
.orchestrator("airflow")
.flowId("analytics")
.jobId("aggregate_sales")
.description("Aggregates sales data by customer")
.name("Aggregate Sales by Customer")
.type("BATCH")
.build();
// Dataset-level lineage
aggregateJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.transactions,PROD)");
aggregateJob.addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.customer_sales,PROD)");
// Field-level lineage - specify exact columns used
aggregateJob.addInputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.transactions,PROD),customer_id)");
aggregateJob.addInputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.transactions,PROD),amount)");
aggregateJob.addInputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.transactions,PROD),transaction_date)");
aggregateJob.addOutputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.customer_sales,PROD),customer_id)");
aggregateJob.addOutputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.customer_sales,PROD),total_sales)");
aggregateJob.addOutputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.customer_sales,PROD),transaction_count)");
client.entities().upsert(aggregateJob);
Fine-grained lineage captures column-to-column transformations, showing exactly which input columns produce which output columns and how they are transformed.
// Basic transformation (no confidence score)
dataJob.addFineGrainedLineage(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.orders,PROD),customer_id)",
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales,PROD),customer_id)",
"IDENTITY",
null
);
// Transformation with confidence score (0.0 to 1.0)
dataJob.addFineGrainedLineage(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.orders,PROD),amount)",
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales,PROD),revenue)",
"SUM",
1.0f // High confidence
);
// IDENTITY - direct copy
dataJob.addFineGrainedLineage(upstream, downstream, "IDENTITY", 1.0f);
// Aggregations
dataJob.addFineGrainedLineage(upstream, downstream, "SUM", 1.0f);
dataJob.addFineGrainedLineage(upstream, downstream, "COUNT", 1.0f);
dataJob.addFineGrainedLineage(upstream, downstream, "AVG", 1.0f);
dataJob.addFineGrainedLineage(upstream, downstream, "MAX", 1.0f);
dataJob.addFineGrainedLineage(upstream, downstream, "MIN", 1.0f);
// String operations
dataJob.addFineGrainedLineage(upstream, downstream, "CONCAT", 0.9f);
dataJob.addFineGrainedLineage(upstream, downstream, "UPPER", 1.0f);
dataJob.addFineGrainedLineage(upstream, downstream, "SUBSTRING", 0.95f);
// Date operations
dataJob.addFineGrainedLineage(upstream, downstream, "DATE_TRUNC", 1.0f);
dataJob.addFineGrainedLineage(upstream, downstream, "EXTRACT", 1.0f);
// Custom transformations
dataJob.addFineGrainedLineage(upstream, downstream, "CUSTOM_FUNCTION", 0.8f);
// Remove specific transformation
dataJob.removeFineGrainedLineage(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.orders,PROD),amount)",
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales,PROD),revenue)",
"SUM",
null // queryUrn (optional)
);
// Get all fine-grained lineage (lazy-loaded)
List<FineGrainedLineage> lineages = dataJob.getFineGrainedLineages();
for (FineGrainedLineage lineage : lineages) {
System.out.println("Upstreams: " + lineage.getUpstreams());
System.out.println("Downstreams: " + lineage.getDownstreams());
System.out.println("Transformation: " + lineage.getTransformOperation());
System.out.println("Confidence: " + lineage.getConfidenceScore());
}
DataJob salesAggregation = DataJob.builder()
.orchestrator("airflow")
.flowId("analytics")
.jobId("daily_sales_summary")
.name("Daily Sales Summary")
.type("BATCH")
.build();
// Dataset-level lineage
salesAggregation.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:postgres,sales.transactions,PROD)");
salesAggregation.addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.daily_summary,PROD)");
// Fine-grained transformations
String inputDataset = "urn:li:dataset:(urn:li:dataPlatform:postgres,sales.transactions,PROD)";
String outputDataset = "urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.daily_summary,PROD)";
// Date is copied directly
salesAggregation.addFineGrainedLineage(
"urn:li:schemaField:(" + inputDataset + ",transaction_date)",
"urn:li:schemaField:(" + outputDataset + ",date)",
"IDENTITY",
1.0f
);
// Revenue is SUM of amounts
salesAggregation.addFineGrainedLineage(
"urn:li:schemaField:(" + inputDataset + ",amount)",
"urn:li:schemaField:(" + outputDataset + ",total_revenue)",
"SUM",
1.0f
);
// Transaction count
salesAggregation.addFineGrainedLineage(
"urn:li:schemaField:(" + inputDataset + ",transaction_id)",
"urn:li:schemaField:(" + outputDataset + ",transaction_count)",
"COUNT",
1.0f
);
// Average order value
salesAggregation.addFineGrainedLineage(
"urn:li:schemaField:(" + inputDataset + ",amount)",
"urn:li:schemaField:(" + outputDataset + ",avg_order_value)",
"AVG",
1.0f
);
client.entities().upsert(salesAggregation);
// Model a transformation where output depends on multiple input columns
DataJob enrichmentJob = DataJob.builder()
.orchestrator("airflow")
.flowId("enrichment")
.jobId("enrich_customer_data")
.build();
String inputDataset = "urn:li:dataset:(urn:li:dataPlatform:postgres,crm.customers,PROD)";
String outputDataset = "urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.customers_enriched,PROD)";
// full_name = CONCAT(first_name, ' ', last_name)
// Both first_name and last_name contribute to full_name
enrichmentJob.addFineGrainedLineage(
"urn:li:schemaField:(" + inputDataset + ",first_name)",
"urn:li:schemaField:(" + outputDataset + ",full_name)",
"CONCAT",
1.0f
);
enrichmentJob.addFineGrainedLineage(
"urn:li:schemaField:(" + inputDataset + ",last_name)",
"urn:li:schemaField:(" + outputDataset + ",full_name)",
"CONCAT",
1.0f
);
// email_domain = SUBSTRING(email, POSITION('@', email) + 1)
enrichmentJob.addFineGrainedLineage(
"urn:li:schemaField:(" + inputDataset + ",email)",
"urn:li:schemaField:(" + outputDataset + ",email_domain)",
"SUBSTRING",
1.0f
);
client.entities().upsert(enrichmentJob);
Confidence scores (0.0 to 1.0) indicate how certain you are about the transformation:
// High confidence - exact transformation known
dataJob.addFineGrainedLineage(source, target, "UPPER", 1.0f);
// Medium confidence - inferred from SQL parsing
dataJob.addFineGrainedLineage(source, target, "CASE_WHEN", 0.85f);
// Low confidence - ML-predicted transformation
dataJob.addFineGrainedLineage(source, target, "INFERRED", 0.6f);
This example demonstrates all four types of lineage working together:
// Create upstream validation job
DataJob validateJob = DataJob.builder()
.orchestrator("airflow")
.flowId("analytics_pipeline")
.cluster("prod")
.jobId("validate_transactions")
.name("Validate Transaction Data")
.type("BATCH")
.build();
validateJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.transactions,PROD)")
.addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,validated.transactions,PROD)");
client.entities().upsert(validateJob);
// Create main transformation job with comprehensive lineage
DataJob transformJob = DataJob.builder()
.orchestrator("airflow")
.flowId("analytics_pipeline")
.cluster("prod")
.jobId("aggregate_sales")
.description("Aggregates daily sales data from multiple validated sources")
.name("Aggregate Daily Sales")
.type("BATCH")
.build();
// 1. Dataset-level lineage - Which tables are read/written
transformJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,validated.transactions,PROD)")
.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.customers,PROD)")
.addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.daily_sales,PROD)");
// 2. DataJob dependencies - This job depends on the validation job
transformJob.addInputDataJob(validateJob.getUrn().toString());
// 3. Field-level lineage - Which specific columns are accessed
String transactionsDataset = "urn:li:dataset:(urn:li:dataPlatform:snowflake,validated.transactions,PROD)";
String customersDataset = "urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.customers,PROD)";
String outputDataset = "urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.daily_sales,PROD)";
// Input fields
transformJob.addInputField("urn:li:schemaField:(" + transactionsDataset + ",transaction_id)")
.addInputField("urn:li:schemaField:(" + transactionsDataset + ",customer_id)")
.addInputField("urn:li:schemaField:(" + transactionsDataset + ",amount)")
.addInputField("urn:li:schemaField:(" + transactionsDataset + ",transaction_date)")
.addInputField("urn:li:schemaField:(" + customersDataset + ",customer_id)")
.addInputField("urn:li:schemaField:(" + customersDataset + ",customer_name)");
// Output fields
transformJob.addOutputField("urn:li:schemaField:(" + outputDataset + ",date)")
.addOutputField("urn:li:schemaField:(" + outputDataset + ",customer_name)")
.addOutputField("urn:li:schemaField:(" + outputDataset + ",total_revenue)")
.addOutputField("urn:li:schemaField:(" + outputDataset + ",transaction_count)");
// 4. Fine-grained lineage - Specific column-to-column transformations
// Date column (identity transformation)
transformJob.addFineGrainedLineage(
"urn:li:schemaField:(" + transactionsDataset + ",transaction_date)",
"urn:li:schemaField:(" + outputDataset + ",date)",
"IDENTITY",
1.0f
);
// Customer name (join + identity)
transformJob.addFineGrainedLineage(
"urn:li:schemaField:(" + customersDataset + ",customer_name)",
"urn:li:schemaField:(" + outputDataset + ",customer_name)",
"IDENTITY",
1.0f
);
// Total revenue (aggregation)
transformJob.addFineGrainedLineage(
"urn:li:schemaField:(" + transactionsDataset + ",amount)",
"urn:li:schemaField:(" + outputDataset + ",total_revenue)",
"SUM",
1.0f
);
// Transaction count (aggregation)
transformJob.addFineGrainedLineage(
"urn:li:schemaField:(" + transactionsDataset + ",transaction_id)",
"urn:li:schemaField:(" + outputDataset + ",transaction_count)",
"COUNT",
1.0f
);
// Add other metadata
transformJob.addTag("critical")
.addOwner("urn:li:corpuser:data_team", OwnershipType.TECHNICAL_OWNER);
// Save to DataHub
client.entities().upsert(transformJob);
// Result: Creates comprehensive lineage showing:
// - Job dependency: validate_transactions → aggregate_sales
// - Dataset flow: raw.transactions → validated.transactions → analytics.daily_sales
// raw.customers → analytics.daily_sales
// - Column-level: transaction_date → date (IDENTITY)
// amount → total_revenue (SUM)
// transaction_id → transaction_count (COUNT)
// customer_name → customer_name (IDENTITY via JOIN)
The comprehensive lineage example above creates this multi-level lineage graph:
Job-to-Job Level:
Validate Transactions ──→ Aggregate Sales Job
Dataset Level:
raw.transactions ──→ validated.transactions ──→ analytics.daily_sales
raw.customers ───────────────────────────────→ analytics.daily_sales
Column Level (Fine-Grained):
validated.transactions.transaction_date ──[IDENTITY]──→ daily_sales.date
validated.transactions.amount ──[SUM]──────→ daily_sales.total_revenue
validated.transactions.transaction_id ──[COUNT]────→ daily_sales.transaction_count
raw.customers.customer_name ──[IDENTITY]──→ daily_sales.customer_name
Model a complete Extract-Transform-Load pipeline:
// Extract job
DataJob extractJob = DataJob.builder()
.orchestrator("airflow")
.flowId("etl_pipeline")
.jobId("extract")
.build();
extractJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:mysql,prod.orders,PROD)")
.addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:s3,staging/orders_raw,PROD)");
client.entities().upsert(extractJob);
// Transform job
DataJob transformJob = DataJob.builder()
.orchestrator("airflow")
.flowId("etl_pipeline")
.jobId("transform")
.build();
transformJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:s3,staging/orders_raw,PROD)")
.addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:s3,staging/orders_clean,PROD)");
client.entities().upsert(transformJob);
// Load job
DataJob loadJob = DataJob.builder()
.orchestrator("airflow")
.flowId("etl_pipeline")
.jobId("load")
.build();
loadJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:s3,staging/orders_clean,PROD)")
.addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.orders,PROD)");
client.entities().upsert(loadJob);
// Creates end-to-end lineage:
// mysql.orders → [Extract] → s3.raw → [Transform] → s3.clean → [Load] → snowflake.analytics
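The ETL example above records dataset lineage only. If you also want the task-to-task edges to show up, you can additionally register job dependencies with addInputDataJob, reusing the jobs defined above (a sketch):
// Optionally record task-to-task dependencies as well
transformJob.addInputDataJob(extractJob.getUrn().toString()); // transform runs after extract
loadJob.addInputDataJob(transformJob.getUrn().toString());    // load runs after transform
client.entities().upsert(transformJob);
client.entities().upsert(loadJob);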
// Load existing job
DataJobUrn urn = DataJobUrn.createFromString(
"urn:li:dataJob:(urn:li:dataFlow:(airflow,my_pipeline,prod),my_task)"
);
DataJob dataJob = client.entities().get(urn);
// Add new inlet (e.g., requirements changed)
dataJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:kafka,events.new_source,PROD)");
// Remove old outlet (e.g., deprecated table)
dataJob.removeOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,old.deprecated_table,PROD)");
// Apply changes
client.entities().update(dataJob);
// Data aggregation job
dataJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:postgres,sales.orders,PROD)")
.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:postgres,sales.customers,PROD)")
.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:postgres,sales.products,PROD)")
.addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales_summary,PROD)");
// Data fanout job
dataJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:kafka,events.raw,PROD)")
.addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:s3,archive/events,PROD)")
.addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,events.processed,PROD)")
.addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:elasticsearch,events.searchable,PROD)");
// ETL across different platforms
dataJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:mysql,production.transactions,PROD)")
.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:kafka,events.user_activity,PROD)")
.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:s3,raw/reference_data,PROD)")
.addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.customer_360,PROD)")
.addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:bigquery,reporting.customer_metrics,PROD)");
import datahub.client.v2.DataHubClientV2;
import datahub.client.v2.entity.DataJob;
import com.linkedin.common.OwnershipType;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
public class DataJobExample {
public static void main(String[] args) {
// Create client
DataHubClientV2 client = DataHubClientV2.builder()
.server("http://localhost:8080")
.build();
try {
// Build data job with all metadata
DataJob dataJob = DataJob.builder()
.orchestrator("airflow")
.flowId("customer_analytics")
.cluster("prod")
.jobId("process_events")
.description("Processes customer events from Kafka to warehouse")
.name("Process Customer Events")
.type("BATCH")
.build();
// Add tags
dataJob.addTag("critical")
.addTag("etl")
.addTag("pii");
// Add owners
dataJob.addOwner("urn:li:corpuser:data_team", OwnershipType.TECHNICAL_OWNER)
.addOwner("urn:li:corpuser:product_team", OwnershipType.BUSINESS_OWNER);
// Add glossary terms
dataJob.addTerm("urn:li:glossaryTerm:DataProcessing")
.addTerm("urn:li:glossaryTerm:CustomerData");
// Set domain
dataJob.setDomain("urn:li:domain:Analytics");
// Add custom properties
dataJob.addCustomProperty("schedule", "0 2 * * *")
.addCustomProperty("retries", "3")
.addCustomProperty("timeout", "7200");
// Upsert to DataHub
client.entities().upsert(dataJob);
System.out.println("Successfully created data job: " + dataJob.getUrn());
} catch (IOException | ExecutionException | InterruptedException e) {
e.printStackTrace();
} finally {
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
// Load existing data job
DataJobUrn urn = DataJobUrn.createFromString(
"urn:li:dataJob:(urn:li:dataFlow:(airflow,my_dag,prod),my_task)"
);
DataJob dataJob = client.entities().get(urn);
// Add new metadata (creates patches)
dataJob.addTag("new-tag")
.addOwner("urn:li:corpuser:new_owner", OwnershipType.TECHNICAL_OWNER);
// Apply patches
client.entities().update(dataJob);
// Just add what you need
dataJob.addTag("critical");
client.entities().update(dataJob);
// Later, add more
dataJob.addCustomProperty("priority", "high");
client.entities().update(dataJob);
| Method | Required | Description |
|---|---|---|
| `orchestrator(String)` | ✅ Yes | Orchestrator (e.g., "airflow", "dagster") |
| `flowId(String)` | ✅ Yes | Flow/DAG identifier |
| `jobId(String)` | ✅ Yes | Job/task identifier |
| `cluster(String)` | No | Cluster name (e.g., "prod", "dev"). Default: "prod" |
| `description(String)` | No | Job description. Requires both `name()` and `type()` to be set |
| `name(String)` | No | Display name shown in UI. Required if using `description()`, `type()`, or `customProperties()` |
| `type(String)` | No | Job type (e.g., "BATCH", "STREAMING"). Required if using `description()`, `name()`, or `customProperties()` |
| `customProperties(Map)` | No | Map of custom key-value properties. Requires both `name()` and `type()` to be set |
Important: The DataJobInfo aspect requires both name and type fields. If you provide any of description, name, type, or customProperties in the builder, you must provide both name and type. Otherwise, you'll get an IllegalArgumentException at build time.
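For example, the first builder below fails at build() because description() is set without name() and type(), while the second succeeds (a sketch illustrating the validation rule described above):
// Throws IllegalArgumentException at build(): description() without name() and type()
DataJob invalid = DataJob.builder()
    .orchestrator("airflow")
    .flowId("my_dag")
    .jobId("my_task")
    .description("Missing name and type")
    .build();
// OK: name() and type() accompany description()
DataJob valid = DataJob.builder()
    .orchestrator("airflow")
    .flowId("my_dag")
    .jobId("my_task")
    .description("Loads data into the warehouse")
    .name("My Task")
    .type("BATCH")
    .build();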
String[] tasks = {"extract", "transform", "load"};
for (String taskName : tasks) {
DataJob dataJob = DataJob.builder()
.orchestrator("airflow")
.flowId("etl_pipeline")
.cluster("prod")
.jobId(taskName)
.build();
dataJob.addTag("etl")
.addCustomProperty("team", "data-engineering");
client.entities().upsert(dataJob);
}
DataJob dataJob = DataJob.builder()
.orchestrator("airflow")
.flowId("my_dag")
.jobId("my_task")
.build();
List<String> tags = Arrays.asList("critical", "production", "etl");
tags.forEach(dataJob::addTag);
client.entities().upsert(dataJob); // Emits all tags in one call
// isCritical and processesFinancialData below are your own helper predicates
if (isCritical(dataJob)) {
dataJob.addTag("critical")
.addTerm("urn:li:glossaryTerm:BusinessCritical");
}
if (processesFinancialData(dataJob)) {
dataJob.addTag("financial")
.addOwner("urn:li:corpuser:compliance_team", OwnershipType.DATA_STEWARD);
}
DataFlow represents a pipeline or DAG (e.g., an Airflow DAG):
urn:li:dataFlow:(orchestrator,flowId,cluster)
DataJob represents a task within a pipeline:
urn:li:dataJob:(flowUrn,jobId)
Example hierarchy:
DataFlow: urn:li:dataFlow:(airflow,customer_pipeline,prod)
├── DataJob: urn:li:dataJob:(urn:li:dataFlow:(airflow,customer_pipeline,prod),extract)
├── DataJob: urn:li:dataJob:(urn:li:dataFlow:(airflow,customer_pipeline,prod),transform)
└── DataJob: urn:li:dataJob:(urn:li:dataFlow:(airflow,customer_pipeline,prod),load)
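The hierarchy falls out of the builder automatically: every job built with the same orchestrator, flowId, and cluster shares the same parent DataFlow URN. A minimal sketch:
// Each task shares the same parent DataFlow URN
for (String task : new String[] {"extract", "transform", "load"}) {
    DataJob job = DataJob.builder()
        .orchestrator("airflow")
        .flowId("customer_pipeline")
        .cluster("prod")
        .jobId(task)
        .build();
    System.out.println(job.getDataJobUrn());
    // urn:li:dataJob:(urn:li:dataFlow:(airflow,customer_pipeline,prod),<task>)
}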
Common orchestrator values:
- airflow - Apache Airflow
- dagster - Dagster
- prefect - Prefect
- dbt - dbt (data build tool)
- spark - Apache Spark
- glue - AWS Glue
- dataflow - Google Cloud Dataflow
- azkaban - Azkaban
- luigi - Luigi
{{ inline /metadata-integration/java/examples/src/main/java/io/datahubproject/examples/v2/DataJobCreateExample.java show_path_as_comment }}
{{ inline /metadata-integration/java/examples/src/main/java/io/datahubproject/examples/v2/DataJobFullExample.java show_path_as_comment }}
{{ inline /metadata-integration/java/examples/src/main/java/io/datahubproject/examples/v2/DataJobLineageExample.java show_path_as_comment }}