metadata-models/docs/entities/dataProcessInstance.md
DataProcessInstance represents an individual execution run of a data pipeline or data processing task. While DataJob and DataFlow entities define the structure and logic of your data pipelines, DataProcessInstance captures the runtime behavior, tracking each specific execution with its inputs, outputs, status, and timing information.
Think of it this way: if a DataJob is a recipe, a DataProcessInstance is one particular time you followed that recipe, recording when you started, what ingredients you used, what you produced, and whether it succeeded or failed.
DataProcessInstance entities are uniquely identified by a single field: an id string. This makes the identifier structure simpler than that of other entities, but it requires careful consideration when generating IDs.
The URN format is:
urn:li:dataProcessInstance:<id>
The <id> field should be globally unique across all process instances and typically encodes information about the orchestrator, execution context, and run identifier. The Python SDK provides a helper that generates this ID from three components:
- orchestrator: the workflow engine that executed the run (e.g., airflow, spark, dagster)
- cluster: the environment it ran in (e.g., prod, dev, staging)
- id: the run identifier from the orchestrator

Example URNs:

```
urn:li:dataProcessInstance:abc123def456...
```
The actual ID is a deterministic GUID generated from the orchestrator, cluster, and execution id:
```python
from datahub.api.entities.dataprocess.dataprocess_instance import DataProcessInstance

# The ID is automatically generated from these fields
instance = DataProcessInstance(
    id="scheduled__2024-01-15T10:00:00+00:00",
    orchestrator="airflow",
    cluster="prod",
)

# Results in: urn:li:dataProcessInstance:<deterministic-guid>
```
DataProcessInstance entities are linked to their template definitions through the parentTemplate field in the dataProcessInstanceRelationships aspect:
- For task-level runs, parentTemplate points to the DataJob URN
- For pipeline-level runs, parentTemplate points to the DataFlow URN
- For standalone processes, parentTemplate can be null

This relationship enables several important capabilities: finding every historical run of a job, computing success rates and durations over time, and navigating from a definition to its executions.
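As an illustrative sketch (assuming a DataHub instance at localhost:8080 and a placeholder instance URN), the relationships aspect can be read back with the Python DataHubGraph client to discover which template a run belongs to:

```python
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.metadata.schema_classes import DataProcessInstanceRelationshipsClass

graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

# Placeholder URN -- substitute a real process instance URN
instance_urn = "urn:li:dataProcessInstance:abc123"

# Fetch the dataProcessInstanceRelationships aspect, if it exists
relationships = graph.get_aspect(instance_urn, DataProcessInstanceRelationshipsClass)
if relationships is not None:
    print(f"Template: {relationships.parentTemplate}")
    print(f"Parent instance: {relationships.parentInstance}")
    print(f"Upstream instances: {relationships.upstreamInstances}")
```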
The dataProcessInstanceRunEvent aspect (a timeseries aspect) tracks the lifecycle of each execution with high granularity. Process instances move through a simple lifecycle: a STARTED event when execution begins, followed by a COMPLETE event when it finishes. When a process completes, the result type indicates the outcome: SUCCESS, FAILURE, SKIPPED, or UP_FOR_RETRY. Each run event captures the event timestamp, run status, attempt number, and, on completion, the result along with the orchestrator's native result type. This enables monitoring dashboards to chart success rates, surface failing and retrying runs, and track execution durations over time.
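The SDK's emit_process_start / emit_process_end helpers shown later emit these events for you; if you need finer control, the aspect can also be written directly. A minimal sketch (placeholder URN, local GMS assumed):

```python
import time

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    DataProcessInstanceRunEventClass,
    DataProcessInstanceRunResultClass,
    DataProcessRunStatusClass,
    RunResultTypeClass,
)

emitter = DatahubRestEmitter(gms_server="http://localhost:8080")
instance_urn = "urn:li:dataProcessInstance:abc123"  # placeholder URN

# A COMPLETE event carrying the final result for attempt 1
event = DataProcessInstanceRunEventClass(
    timestampMillis=int(time.time() * 1000),
    status=DataProcessRunStatusClass.COMPLETE,
    attempt=1,
    result=DataProcessInstanceRunResultClass(
        type=RunResultTypeClass.SUCCESS,
        nativeResultType="airflow",  # the orchestrator's own result vocabulary
    ),
)
emitter.emit(MetadataChangeProposalWrapper(entityUrn=instance_urn, aspect=event))
```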
The dataProcessInstanceProperties aspect captures metadata about the execution: the run name, the process type (e.g., BATCH_SCHEDULED or BATCH_AD_HOC), the creation audit stamp, an external URL, and free-form customProperties.

Example use cases for customProperties:

```python
properties={
    "airflow_version": "2.7.0",
    "executor": "CeleryExecutor",
    "pool": "default_pool",
    "queue": "default",
    "operator": "PythonOperator",
}
```
Unlike DataJob, which defines static lineage relationships, DataProcessInstance captures the actual inputs and outputs consumed and produced during a specific execution.
The dataProcessInstanceInput aspect records the URNs of the datasets the run actually read, and the dataProcessInstanceOutput aspect records the URNs it actually wrote. This enables powerful capabilities: run-level lineage, comparing what different runs consumed and produced, and tracing a bad output back to the exact inputs that fed it.
Example: An Airflow task that succeeded might show it read from dataset_v1 and wrote to dataset_v2, while a retry might show different input/output datasets if the data evolved.
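A minimal sketch (placeholder URN, local GMS assumed) of recording these two aspects directly, reusing the dataset URNs from the examples below:

```python
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    DataProcessInstanceInputClass,
    DataProcessInstanceOutputClass,
)

emitter = DatahubRestEmitter(gms_server="http://localhost:8080")
instance_urn = "urn:li:dataProcessInstance:abc123"  # placeholder URN

# Record what this particular run read and wrote
inputs = DataProcessInstanceInputClass(
    inputs=["urn:li:dataset:(urn:li:dataPlatform:postgres,sales_db.raw_orders,PROD)"]
)
outputs = DataProcessInstanceOutputClass(
    outputs=["urn:li:dataset:(urn:li:dataPlatform:postgres,sales_db.processed_orders,PROD)"]
)
for aspect in (inputs, outputs):
    emitter.emit(MetadataChangeProposalWrapper(entityUrn=instance_urn, aspect=aspect))
```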
The dataProcessInstanceRelationships aspect supports complex execution hierarchies:
The parentInstance field links nested executions:
```
DataFlow Instance (DAG Run)
├── DataJob Instance 1 (Task Run 1)
├── DataJob Instance 2 (Task Run 2)
└── DataJob Instance 3 (Task Run 3)
```
This enables rolling task-level results up to the pipeline run, drilling down from a failed DAG run to the task that failed, and rendering the execution hierarchy in the UI (see the hierarchical Python example below).
The upstreamInstances field creates dependencies between process instances:

```
Process Instance A (completed) → Process Instance B (triggered)
```

This captures dynamic execution dependencies, such as runs triggered by the completion of an upstream run or by cross-pipeline sensors, rather than the static task ordering defined in the pipeline.
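As a sketch (placeholder URNs, local GMS assumed), the chain above could be recorded by emitting the dataProcessInstanceRelationships aspect on the downstream instance:

```python
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import DataProcessInstanceRelationshipsClass

emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

# Placeholder URNs for the two runs
instance_a = "urn:li:dataProcessInstance:run_a"
instance_b = "urn:li:dataProcessInstance:run_b"

# On instance B, point upstreamInstances at the run that triggered it
relationships = DataProcessInstanceRelationshipsClass(
    upstreamInstances=[instance_a],
)
emitter.emit(MetadataChangeProposalWrapper(entityUrn=instance_b, aspect=relationships))
```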
DataProcessInstance can represent processes that run within a Container (like an ML experiment) without being tied to a specific DataJob or DataFlow:
```python
from datahub.emitter.mcp_builder import ContainerKey

container_key = ContainerKey(
    platform="urn:li:dataPlatform:mlflow",
    name="experiment_123",
    env="PROD",
)

instance = DataProcessInstance.from_container(
    container_key=container_key,
    id="training_run_456",
)
```
This is useful for ML training runs and experiment tracking, and more generally for any process that executes inside a platform container rather than under a scheduled pipeline.
```python
# metadata-ingestion/examples/library/data_process_instance_create_simple.py
import time

from datahub.api.entities.dataprocess.dataprocess_instance import (
    DataProcessInstance,
    InstanceRunResult,
)
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import DataProcessTypeClass
from datahub.utilities.urns.data_job_urn import DataJobUrn
from datahub.utilities.urns.dataset_urn import DatasetUrn

# Create REST emitter
emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

# Define the parent DataJob that this instance is executing
parent_job_urn = DataJobUrn.create_from_string(
    "urn:li:dataJob:(urn:li:dataFlow:(airflow,sales_pipeline,prod),process_sales_data)"
)

# Create a process instance for a specific execution
# This might represent an Airflow task run on 2024-01-15 at 10:00:00
instance = DataProcessInstance(
    id="scheduled__2024-01-15T10:00:00+00:00",
    orchestrator="airflow",
    cluster="prod",
    template_urn=parent_job_urn,
    type=DataProcessTypeClass.BATCH_SCHEDULED,
    properties={
        "airflow_version": "2.7.0",
        "executor": "CeleryExecutor",
        "pool": "default_pool",
    },
    url="https://airflow.company.com/dags/sales_pipeline/grid?dag_run_id=scheduled__2024-01-15T10:00:00+00:00&task_id=process_sales_data",
    inlets=[
        DatasetUrn.create_from_string(
            "urn:li:dataset:(urn:li:dataPlatform:postgres,sales_db.raw_orders,PROD)"
        )
    ],
    outlets=[
        DatasetUrn.create_from_string(
            "urn:li:dataset:(urn:li:dataPlatform:postgres,sales_db.processed_orders,PROD)"
        )
    ],
)

# Record the start of execution
start_time = int(time.time() * 1000)
instance.emit_process_start(
    emitter=emitter,
    start_timestamp_millis=start_time,
    attempt=1,
    emit_template=True,
    materialize_iolets=True,
)
print(f"Started tracking process instance: {instance.urn}")

# Simulate process execution
print("Process is running...")
time.sleep(2)

# Record the end of execution
end_time = int(time.time() * 1000)
instance.emit_process_end(
    emitter=emitter,
    end_timestamp_millis=end_time,
    result=InstanceRunResult.SUCCESS,
    result_type="airflow",
    attempt=1,
    start_timestamp_millis=start_time,
)
print("Completed tracking process instance with result: SUCCESS")
print(f"Duration: {end_time - start_time}ms")
```
```python
# metadata-ingestion/examples/library/data_process_instance_create_with_retry.py
import time

from datahub.api.entities.dataprocess.dataprocess_instance import (
    DataProcessInstance,
    InstanceRunResult,
)
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import DataProcessTypeClass
from datahub.utilities.urns.data_job_urn import DataJobUrn
from datahub.utilities.urns.dataset_urn import DatasetUrn

emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

parent_job_urn = DataJobUrn.create_from_string(
    "urn:li:dataJob:(urn:li:dataFlow:(airflow,etl_pipeline,prod),load_customer_data)"
)

instance = DataProcessInstance(
    id="scheduled__2024-01-15T14:30:00+00:00",
    orchestrator="airflow",
    cluster="prod",
    template_urn=parent_job_urn,
    type=DataProcessTypeClass.BATCH_SCHEDULED,
    inlets=[
        DatasetUrn.create_from_string(
            "urn:li:dataset:(urn:li:dataPlatform:s3,customer_exports,PROD)"
        )
    ],
    outlets=[
        DatasetUrn.create_from_string(
            "urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.customers,PROD)"
        )
    ],
)

# First attempt
start_time_attempt1 = int(time.time() * 1000)
instance.emit_process_start(
    emitter=emitter,
    start_timestamp_millis=start_time_attempt1,
    attempt=1,
    emit_template=True,
    materialize_iolets=True,
)
print("Attempt 1 started...")
time.sleep(1)

# First attempt fails
end_time_attempt1 = int(time.time() * 1000)
instance.emit_process_end(
    emitter=emitter,
    end_timestamp_millis=end_time_attempt1,
    result=InstanceRunResult.UP_FOR_RETRY,
    result_type="airflow",
    attempt=1,
    start_timestamp_millis=start_time_attempt1,
)
print("Attempt 1 failed, will retry...")
time.sleep(2)

# Second attempt (retry)
start_time_attempt2 = int(time.time() * 1000)
instance.emit_process_start(
    emitter=emitter,
    start_timestamp_millis=start_time_attempt2,
    attempt=2,
    emit_template=False,
    materialize_iolets=False,
)
print("Attempt 2 started (retry)...")
time.sleep(1)

# Second attempt succeeds
end_time_attempt2 = int(time.time() * 1000)
instance.emit_process_end(
    emitter=emitter,
    end_timestamp_millis=end_time_attempt2,
    result=InstanceRunResult.SUCCESS,
    result_type="airflow",
    attempt=2,
    start_timestamp_millis=start_time_attempt2,
)
print("Attempt 2 succeeded!")
```
```python
# metadata-ingestion/examples/library/data_process_instance_create_from_dataflow.py
import time

from datahub.api.entities.datajob import DataFlow
from datahub.api.entities.dataprocess.dataprocess_instance import (
    DataProcessInstance,
    InstanceRunResult,
)
from datahub.emitter.rest_emitter import DatahubRestEmitter

emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

# Define the DataFlow (Airflow DAG)
dataflow = DataFlow(
    orchestrator="airflow",
    id="daily_reporting_pipeline",
    env="prod",
    description="Daily reporting pipeline that aggregates metrics",
)

# Create a DataProcessInstance for a specific DAG run
dag_run_instance = DataProcessInstance.from_dataflow(
    dataflow=dataflow,
    id="scheduled__2024-01-15T00:00:00+00:00",
)

# Set properties specific to this DAG run
dag_run_instance.properties = {
    "execution_date": "2024-01-15",
    "run_type": "scheduled",
    "external_trigger": "false",
}
dag_run_instance.url = "https://airflow.company.com/dags/daily_reporting_pipeline/grid?dag_run_id=scheduled__2024-01-15T00:00:00+00:00"

# Track DAG run start
start_time = int(time.time() * 1000)
dag_run_instance.emit_process_start(
    emitter=emitter,
    start_timestamp_millis=start_time,
    attempt=1,
    emit_template=True,
    materialize_iolets=True,
)
print(f"DAG run started: {dag_run_instance.urn}")

# Simulate DAG execution
time.sleep(3)

# Track DAG run completion
end_time = int(time.time() * 1000)
dag_run_instance.emit_process_end(
    emitter=emitter,
    end_timestamp_millis=end_time,
    result=InstanceRunResult.SUCCESS,
    result_type="airflow",
    attempt=1,
    start_timestamp_millis=start_time,
)
print(f"DAG run completed successfully in {end_time - start_time}ms")
```
```python
# metadata-ingestion/examples/library/data_process_instance_create_hierarchical.py
import time

from datahub.api.entities.datajob import DataFlow, DataJob
from datahub.api.entities.dataprocess.dataprocess_instance import (
    DataProcessInstance,
    InstanceRunResult,
)
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.utilities.urns.dataset_urn import DatasetUrn

emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

# Define the DataFlow (Airflow DAG)
dataflow = DataFlow(
    orchestrator="airflow",
    id="etl_pipeline",
    env="prod",
    description="ETL pipeline with multiple tasks",
)

# Define DataJobs (Tasks in the DAG)
extract_job = DataJob(
    id="extract_data",
    flow_urn=dataflow.urn,
    description="Extract data from source",
)

# Create a DAG run instance (parent)
dag_run_id = "scheduled__2024-01-15T12:00:00+00:00"
dag_run_instance = DataProcessInstance.from_dataflow(dataflow=dataflow, id=dag_run_id)

# Track DAG run start
dag_start_time = int(time.time() * 1000)
dag_run_instance.emit_process_start(
    emitter=emitter,
    start_timestamp_millis=dag_start_time,
    attempt=1,
    emit_template=True,
    materialize_iolets=False,
)
print(f"DAG run started: {dag_run_instance.urn}")

# Create task instance for extract_data (child of DAG run)
extract_instance = DataProcessInstance.from_datajob(
    datajob=extract_job,
    id=f"{dag_run_id}__extract_data",
)
extract_instance.parent_instance = dag_run_instance.urn
extract_instance.inlets = [
    DatasetUrn.create_from_string(
        "urn:li:dataset:(urn:li:dataPlatform:postgres,raw_db.orders,PROD)"
    )
]
extract_instance.outlets = [
    DatasetUrn.create_from_string(
        "urn:li:dataset:(urn:li:dataPlatform:s3,staging/orders,PROD)"
    )
]

# Track extract task execution
extract_start_time = int(time.time() * 1000)
extract_instance.emit_process_start(
    emitter=emitter,
    start_timestamp_millis=extract_start_time,
    attempt=1,
    emit_template=True,
    materialize_iolets=True,
)
time.sleep(1)

extract_end_time = int(time.time() * 1000)
extract_instance.emit_process_end(
    emitter=emitter,
    end_timestamp_millis=extract_end_time,
    result=InstanceRunResult.SUCCESS,
    attempt=1,
    start_timestamp_millis=extract_start_time,
)

# Track DAG run completion
dag_end_time = int(time.time() * 1000)
dag_run_instance.emit_process_end(
    emitter=emitter,
    end_timestamp_millis=dag_end_time,
    result=InstanceRunResult.SUCCESS,
    attempt=1,
    start_timestamp_millis=dag_start_time,
)
print("DAG run completed successfully")
```
ML training runs are a specialized use case of DataProcessInstance entities. In addition to standard process instance aspects, training runs can use ML-specific aspects:
- A subTypes aspect with MLAssetSubTypes.MLFLOW_TRAINING_RUN to identify the process as an ML training run
- Links to the models they produce (via mlModelProperties.trainingJobs) and to experiments (via the container aspect)

For comprehensive documentation on ML training runs, including the complete workflow with experiments, models, and datasets, see the ML Model entity documentation.
<details>
<summary>Python SDK: Track a standalone ML training run</summary>

```python
# metadata-ingestion/examples/library/data_process_instance_create_ml_training.py
import time

from datahub.api.entities.dataprocess.dataprocess_instance import (
    DataProcessInstance,
    InstanceRunResult,
)
from datahub.emitter.mcp_builder import ContainerKey
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import DataProcessTypeClass
from datahub.utilities.urns.dataset_urn import DatasetUrn
from datahub.utilities.urns.urn import Urn

emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

# Define the ML experiment container
experiment_container = ContainerKey(
    platform="urn:li:dataPlatform:mlflow",
    name="customer_churn_experiment",
    env="PROD",
)

# Create a process instance for a training run
training_run = DataProcessInstance.from_container(
    container_key=experiment_container,
    id="run_abc123def456",
)

# Set training-specific properties
training_run.type = DataProcessTypeClass.BATCH_AD_HOC
training_run.properties = {
    "model_type": "RandomForestClassifier",
    "hyperparameters": "n_estimators=100,max_depth=10",
    "framework": "scikit-learn",
    "framework_version": "1.3.0",
}
training_run.url = "https://mlflow.company.com/experiments/5/runs/abc123def456"

# Set training data inputs
training_run.inlets = [
    DatasetUrn.create_from_string(
        "urn:li:dataset:(urn:li:dataPlatform:snowflake,ml.training_data,PROD)"
    )
]

# Set model output (use the generic Urn class here:
# DatasetUrn.create_from_string would reject an mlModel URN)
training_run.outlets = [
    Urn.create_from_string(
        "urn:li:mlModel:(urn:li:dataPlatform:mlflow,customer_churn_model_v2,PROD)"
    )
]

# Track training start
start_time = int(time.time() * 1000)
training_run.emit_process_start(
    emitter=emitter,
    start_timestamp_millis=start_time,
    attempt=1,
    emit_template=False,
    materialize_iolets=True,
)
print("ML training run started...")

# Simulate training
time.sleep(5)

# Track training completion
end_time = int(time.time() * 1000)
training_run.emit_process_end(
    emitter=emitter,
    end_timestamp_millis=end_time,
    result=InstanceRunResult.SUCCESS,
    result_type="mlflow",
    attempt=1,
    start_timestamp_millis=start_time,
)
print(f"ML training completed in {(end_time - start_time) / 1000:.2f}s")
```

</details>
```bash
# Get all process instances for a specific DataJob
curl -X GET 'http://localhost:8080/relationships?direction=INCOMING&urn=urn%3Ali%3AdataJob%3A%28urn%3Ali%3AdataFlow%3A%28airflow%2Csales_pipeline%2Cprod%29%2Cprocess_sales_data%29&types=InstanceOf'
```
Response:
```json
{
"start": 0,
"count": 10,
"relationships": [
{
"type": "InstanceOf",
"entity": "urn:li:dataProcessInstance:abc123..."
},
{
"type": "InstanceOf",
"entity": "urn:li:dataProcessInstance:def456..."
}
],
"total": 25
}
```
To get full details of each instance, fetch the entities:
```bash
curl 'http://localhost:8080/entities/urn%3Ali%3AdataProcessInstance%3Aabc123...'
```
Response includes all aspects:
```json
{
"urn": "urn:li:dataProcessInstance:abc123...",
"aspects": {
"dataProcessInstanceProperties": {
"name": "scheduled__2024-01-15T10:00:00+00:00",
"type": "BATCH_SCHEDULED",
"created": {
"time": 1705318800000,
"actor": "urn:li:corpuser:datahub"
},
"customProperties": {
"airflow_version": "2.7.0"
}
},
"dataProcessInstanceInput": {
"inputs": [
"urn:li:dataset:(urn:li:dataPlatform:postgres,sales_db.raw_orders,PROD)"
]
},
"dataProcessInstanceOutput": {
"outputs": [
"urn:li:dataset:(urn:li:dataPlatform:postgres,sales_db.processed_orders,PROD)"
]
},
"dataProcessInstanceRelationships": {
"parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sales_pipeline,prod),process_sales_data)"
}
}
}
```
The DataHub GraphQL API provides a runs field on DataJob entities to query execution history:
```graphql
query GetJobRuns {
dataJob(
urn: "urn:li:dataJob:(urn:li:dataFlow:(airflow,sales_pipeline,prod),process_sales_data)"
) {
runs(start: 0, count: 10) {
total
runs {
urn
created {
time
}
properties {
name
type
externalUrl
customProperties {
key
value
}
}
relationships {
parentTemplate
parentInstance
}
inputs {
urn
type
}
outputs {
urn
type
}
}
}
}
}
```
Note: The timeseries dataProcessInstanceRunEvent aspect contains the actual run status, timing, and results. To query this timeseries data, use the timeseries aggregation APIs or directly query the timeseries index.
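As a sketch of such a query (local GMS and a placeholder URN assumed; the getTimeseriesAspectValues action is part of the GMS aspects resource, though the exact response envelope may vary by version), run events can be fetched with a direct HTTP call:

```python
import json

import requests

# Placeholder URN -- substitute a real process instance
instance_urn = "urn:li:dataProcessInstance:abc123"

response = requests.post(
    "http://localhost:8080/aspects?action=getTimeseriesAspectValues",
    json={
        "urn": instance_urn,
        "entity": "dataProcessInstance",
        "aspect": "dataProcessInstanceRunEvent",
    },
)
response.raise_for_status()

# Each returned value wraps the run event as a JSON string
for value in response.json()["value"]["values"]:
    event = json.loads(value["aspect"]["value"])
    print(event.get("status"), event.get("result"))
```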
DataProcessInstance is the bridge between orchestration platforms and DataHub's metadata layer.

The DataHub Airflow plugin automatically creates DataProcessInstance entities for DAG runs and task runs. For each run, the plugin tracks start and end times, run status and results, attempt numbers, and the datasets read and written.
Configuration example:
```python
from datahub_airflow_plugin.datahub_listener import DatahubListener

# In your airflow.cfg or environment:
# AIRFLOW__DATAHUB__ENABLED=true
# AIRFLOW__DATAHUB__DATAHUB_CONN_ID=datahub_rest_default
```
Similar patterns apply for other orchestrators with DataHub integrations, such as Dagster, Prefect, and Spark.
DataProcessInstance complements DataJob and DataFlow entities:
| Entity | Purpose | Cardinality | Example |
|---|---|---|---|
| DataFlow | Pipeline definition | 1 per logical pipeline | Airflow DAG "sales_pipeline" |
| DataJob | Task definition | N per DataFlow | Airflow Task "process_sales_data" |
| DataProcessInstance | Execution run | M per DataJob/DataFlow | Task run on 2024-01-15 10:00 |
Key differences: DataFlow and DataJob are long-lived definitions that evolve as the pipeline changes, while DataProcessInstance records are point-in-time snapshots of individual executions; definitions carry ownership, tags, and documentation, whereas instances carry run status, timing, and run-level lineage.
DataProcessInstance creates instance-level lineage to datasets:
```
Dataset Version 1 ─┐
                   ├─> DataProcessInstance (Run #1) ─> Dataset Version 2
Dataset Version 1 ─┘

Dataset Version 2 ─┐
                   ├─> DataProcessInstance (Run #2) ─> Dataset Version 3
Dataset Version 2 ─┘
```
This enables tracing exactly which run produced a given version of a dataset, and debugging a bad output back to the specific execution (and inputs) responsible.
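For example, a sketch (placeholder URN, DataHubGraph client assumed) that traces a specific run back to the exact inputs and outputs it touched:

```python
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.metadata.schema_classes import (
    DataProcessInstanceInputClass,
    DataProcessInstanceOutputClass,
)

graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))
instance_urn = "urn:li:dataProcessInstance:abc123"  # placeholder URN

# Read back the run-level lineage aspects
inputs = graph.get_aspect(instance_urn, DataProcessInstanceInputClass)
outputs = graph.get_aspect(instance_urn, DataProcessInstanceOutputClass)
print("read:", inputs.inputs if inputs else [])
print("wrote:", outputs.outputs if outputs else [])
```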
DataProcessInstance entities are exposed via GraphQL through several resolvers:
- DataJobRunsResolver: fetches the process instances of a DataJob by following the parentTemplate field
- EntityRunsResolver: a generic resolver for fetching any entity's runs
- DataProcessInstanceMapper: converts the internal representation to the GraphQL type
The GraphQL schema exposes:
```graphql
type DataJob {
runs(start: Int, count: Int): DataProcessInstanceResult
}
type DataFlow {
runs(start: Int, count: Int): DataProcessInstanceResult
}
type DataProcessInstance {
urn: String!
properties: DataProcessInstanceProperties
relationships: DataProcessInstanceRelationships
inputs: [Entity]
outputs: [Entity]
# Note: Run events are in timeseries data
}
```
A common pitfall is confusing instance-level metadata with definition-level metadata:
Wrong: Attaching tags to a DataProcessInstance
```python
# Don't do this - tags belong on the DataJob
instance = DataProcessInstance(...)
# instance.tags = ["pii", "critical"]  # This attribute doesn't exist
```
Right: Attach tags to the DataJob, use properties for run-specific metadata
```python
# Tag the DataJob definition
datajob.tags = ["pii", "critical"]

# Use properties for run-specific metadata
instance = DataProcessInstance(...)
instance.properties = {"data_size_mb": "150", "row_count": "1000000"}
```
DataProcessInstance status follows a simple state machine: a STARTED run event, followed by a COMPLETE run event that carries the result. Common mistakes include emitting a completion event without a matching start event, reporting inconsistent attempt numbers between the start and end of a retry, and omitting start_timestamp_millis from emit_process_end, which prevents durations from being computed.
DataProcessInstance entities can accumulate quickly in high-frequency pipelines: a job that runs every five minutes produces over 100,000 instances per year. Considerations include growth of the entity and timeseries indexes, slower run-history queries for jobs with very many instances, and the retention policy you apply to old runs.
DataHub does not automatically clean up old DataProcessInstance entities. You should implement cleanup based on your needs:
```python
# Example: Clean up instances older than 90 days
import datetime

from datahub.emitter.rest_emitter import DatahubRestEmitter

emitter = DatahubRestEmitter(gms_server="http://localhost:8080")
cutoff_time = int(
    (datetime.datetime.now() - datetime.timedelta(days=90)).timestamp() * 1000
)

# Query for instances created before cutoff_time and soft-delete them,
# e.g. with the `datahub delete` CLI or the DataHubGraph client.
# (Implementation depends on your retention requirements.)
```
The DataProcessInstance URN is generated from the orchestrator, cluster, and id fields. This means the same three values always map to the same URN: re-emitting events for an existing run updates that instance, while changing any one component creates a brand-new instance.

Best practices: use stable, deterministic run identifiers (such as the scheduler's own run ID), keep orchestrator and cluster names consistent across emissions, and avoid embedding volatile values such as the emission timestamp in the id.
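As a quick sanity check of this deterministic behavior, a small sketch using the Python SDK:

```python
from datahub.api.entities.dataprocess.dataprocess_instance import DataProcessInstance

# Same orchestrator + cluster + id always yields the same URN...
a = DataProcessInstance(id="run_1", orchestrator="airflow", cluster="prod")
b = DataProcessInstance(id="run_1", orchestrator="airflow", cluster="prod")
assert str(a.urn) == str(b.urn)

# ...while changing any component yields a different instance
c = DataProcessInstance(id="run_1", orchestrator="airflow", cluster="dev")
assert str(a.urn) != str(c.urn)
```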