:::note
If you're looking to schedule DataHub ingestion using Airflow, see the guide on scheduling ingestion with Airflow.
:::
The DataHub Airflow plugin supports:
- Automatic lineage extraction from many operators, including SQL operators (MySqlOperator, PostgresOperator, SnowflakeOperator, BigQueryInsertJobOperator, and more), S3FileTransformOperator, and more.
- Manual lineage annotations using `inlets` and `outlets` on Airflow operators.
The plugin requires Airflow 2.7+ and Python 3.10+. If you don't meet these requirements, see the compatibility section for other options.
```bash
pip install 'acryl-datahub-airflow-plugin>=1.1.0.4'
```
Set up a DataHub connection in Airflow, either via command line or the Airflow UI.
```bash
airflow connections add --conn-type 'datahub-rest' 'datahub_rest_default' --conn-host 'http://datahub-gms:8080' --conn-password '<optional datahub auth token>'
```
If you are using DataHub Cloud then please use https://YOUR_PREFIX.acryl.io/gms as the --conn-host parameter.
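For DataHub Cloud, the same command would look like this (with your actual prefix and auth token substituted):

```bash
airflow connections add --conn-type 'datahub-rest' 'datahub_rest_default' --conn-host 'https://YOUR_PREFIX.acryl.io/gms' --conn-password '<your datahub auth token>'
```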
On the Airflow UI, go to Admin -> Connections and click the "+" symbol to create a new connection. Select "DataHub REST Server" from the dropdown for "Connection Type" and enter the appropriate values.
No additional configuration is required to use the plugin. However, there are some optional configuration parameters that can be set in the airflow.cfg file.
```ini
[datahub]
# Optional - additional config here.
enabled = True  # default
```
| Name | Default value | Description |
|---|---|---|
| enabled | true | If the plugin should be enabled. |
| conn_id | datahub_rest_default | The name of the datahub rest connection. |
| cluster | prod | Name of the Airflow cluster; equivalent to the DataHub env of the instance. |
| platform_instance | None | Optional. The instance of the platform that all assets produced by this plugin belong to. |
| capture_ownership_info | true | Extract DAG ownership. |
| capture_ownership_as_group | false | When extracting DAG ownership, treat DAG owner as a group rather than a user |
| capture_tags_info | true | Extract DAG tags. |
| capture_executions | true | Extract task runs and success/failure statuses. These will show up in the DataHub "Runs" tab. |
| materialize_iolets | true | Create or un-soft-delete all entities referenced in lineage. |
| enable_extractors | true | Enable automatic lineage extraction. |
| disable_openlineage_plugin | true | Disable the OpenLineage plugin to avoid duplicative processing. |
| enable_multi_statement_sql_parsing | false | Parse multiple SQL statements within a single task. Resolves temp tables and merges lineage across statements in one execution. |
| log_level | no change | [debug] Set the log level for the plugin. |
| debug_emitter | false | [debug] If true, the plugin will log the emitted events. |
| dag_filter_str | { "allow": [".*"] } | AllowDenyPattern value, as a JSON string, that filters which DAGs the plugin processes. |
| enable_datajob_lineage | true | If true, the plugin will emit input/output lineage for DataJobs. |
| capture_airflow_assets | true | Capture native Airflow Assets/Datasets as DataHub lineage. See Native Airflow Assets/Datasets. |
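Putting a few of these together, a typical configuration might look like the following (the values shown are illustrative):

```ini
[datahub]
enabled = True
conn_id = datahub_rest_default
cluster = prod
capture_tags_info = True
capture_executions = True
# Only process DAGs whose IDs start with "etl_" (illustrative filter).
dag_filter_str = { "allow": ["etl_.*"] }
```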
To automatically extract lineage information, the plugin builds on top of Airflow's built-in OpenLineage support. As such, we support a superset of the default operators that Airflow/OpenLineage supports.
The SQL-related extractors have been updated to use DataHub's SQL lineage parser, which is more robust than the built-in one and uses DataHub's metadata information to generate column-level lineage.
Supported operators:

- SQLExecuteQueryOperator, including any subclasses. Note that in newer versions of Airflow (generally Airflow 2.5+), most SQL operators inherit from this class.
- AthenaOperator and AWSAthenaOperator
- BigQueryOperator and BigQueryExecuteQueryOperator
- BigQueryInsertJobOperator (incubating)
- MySqlOperator
- PostgresOperator
- RedshiftSQLOperator
- SnowflakeOperator and SnowflakeOperatorAsync
- SqliteOperator
- TeradataOperator (Note: Teradata uses two-tier `database.table` naming without a schema level)
- TrinoOperator

When a task executes multiple SQL statements (e.g., `CREATE TEMP TABLE ...; INSERT ... FROM temp_table;`), enable `enable_multi_statement_sql_parsing` to parse all statements together and resolve temporary table dependencies. By default (`False`), only the first statement is parsed.
```ini
[datahub]
enable_multi_statement_sql_parsing = True  # Default: False
```
Note: Use a list of SQL strings (recommended) or semicolon-separated statements in a single string:
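For instance, a task might pass its SQL either way (a minimal sketch; the operator usage, connection name, and table names are assumptions):

```python
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

# Recommended: a list of SQL strings, one statement per entry.
load_task = SQLExecuteQueryOperator(
    task_id="load_daily",  # hypothetical task
    conn_id="postgres_default",  # hypothetical connection
    sql=[
        "CREATE TEMP TABLE staging AS SELECT * FROM raw.events",
        "INSERT INTO analytics.events SELECT * FROM staging",
    ],
)

# Also supported: semicolon-separated statements in a single string, e.g.
# sql="CREATE TEMP TABLE staging AS SELECT * FROM raw.events; INSERT INTO analytics.events SELECT * FROM staging;"
```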
You can manually annotate lineage by setting `inlets` and `outlets` on your Airflow operators. This is useful if you're using an operator that doesn't support automatic lineage extraction, or if you want to override the automatic lineage extraction.
We have a few code samples that demonstrate how to use inlets and outlets:
For more information, take a look at the Airflow lineage docs.
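As a rough illustration, an annotated task might look like this (a minimal sketch; the platform and dataset names are assumptions):

```python
from airflow.operators.bash import BashOperator
from datahub_airflow_plugin.entities import Dataset, Urn

transform = BashOperator(
    task_id="transform",
    bash_command="echo 'transforming'",
    # Upstream datasets, as DataHub entities or raw URNs.
    inlets=[
        Dataset(platform="snowflake", name="mydb.schema.table_a"),
        Urn("urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.table_b,PROD)"),
    ],
    # Downstream dataset produced by this task.
    outlets=[Dataset(platform="snowflake", name="mydb.schema.table_c")],
)
```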
Starting with Airflow 2.4, you can use native Airflow Datasets (renamed to Assets in Airflow 3.x) for data-aware scheduling. The DataHub plugin automatically captures these as lineage when used in `inlets` and `outlets`.

```python
from airflow.operators.bash import BashOperator

from airflow.sdk.definitions.asset import Asset  # Airflow 3.x
# or: from airflow.datasets import Dataset as Asset  # Airflow 2.4+

s3_input = Asset("s3://my-bucket/input/data.parquet")
bigquery_output = Asset("bigquery://my-project/dataset/result_table")

task = BashOperator(
    task_id="process_data",
    bash_command="echo 'Processing'",
    inlets=[s3_input],
    outlets=[bigquery_output],
)
```
The plugin maps URI schemes to DataHub platforms:
| URI Scheme | DataHub Platform |
|---|---|
| `s3://`, `s3a://` | s3 |
| `gs://`, `gcs://` | gcs |
| `postgresql://` | postgres |
| `mysql://` | mysql |
| `bigquery://` | bigquery |
| `snowflake://` | snowflake |
| `file://` | file |
| `hdfs://` | hdfs |
| `abfs://`, `abfss://` | adls |
Plain name assets (e.g., from the `@asset` decorator) default to the `airflow` platform.
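For example, with the default `cluster` of prod, `Asset("s3://my-bucket/input/data.parquet")` would map to a dataset URN along these lines (an illustration based on DataHub's standard URN format):

```text
urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/input/data.parquet,PROD)
```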
```ini
[datahub]
# Set to false to disable capturing Airflow Assets as lineage (default: true)
capture_airflow_assets = true
```
Native Airflow Assets have the following limitations compared to using DataHub's Dataset or Urn entities directly:
- **No `platform_instance` support**: The URN generated from an Airflow Asset URI cannot include a platform instance. The plugin only extracts the platform, dataset name, and environment from the URI.
- **Environment uses global plugin config**: All native Airflow Assets use the `cluster` setting from the plugin configuration as their environment. You cannot specify a different environment per asset.
If you need platform_instance or per-asset environment control, use the DataHub entity classes instead:
```python
from airflow.operators.bash import BashOperator
from datahub_airflow_plugin.entities import Dataset

# Full control over URN components
s3_input = Dataset(
    platform="s3",
    name="my-bucket/input/data.parquet",
    env="PROD",
    platform_instance="us-west-2",  # Specify platform instance
)

task = BashOperator(
    task_id="process_data",
    bash_command="echo 'Processing'",
    inlets=[s3_input],
)
```
If you have created a custom Airflow operator that inherits from the `BaseOperator` class, set inlets and outlets in your overridden `execute` function via `context['ti'].task.inlets` and `context['ti'].task.outlets`. The DataHub Airflow plugin will then pick up those inlets and outlets after the task runs.

You can only set table-level lineage using `inlets` and `outlets`. For column-level lineage, you need to write a custom extractor for your custom operator.
```python
from airflow.models.baseoperator import BaseOperator

class DbtOperator(BaseOperator):
    ...

    def execute(self, context):
        # do something
        inlets, outlets = self._get_lineage()
        # inlets/outlets are lists of either datahub_airflow_plugin.entities.Dataset
        # or datahub_airflow_plugin.entities.Urn
        context["ti"].task.inlets = inlets
        context["ti"].task.outlets = outlets

    def _get_lineage(self):
        # Do some processing to get inlets/outlets
        return inlets, outlets
```
If you override the `pre_execute` and `post_execute` functions, ensure they include the `@prepare_lineage` and `@apply_lineage` decorators respectively. Reference the Airflow docs for more details.
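A minimal sketch of what that looks like (the operator name is hypothetical):

```python
from airflow.lineage import apply_lineage, prepare_lineage
from airflow.models.baseoperator import BaseOperator

class MyLineageAwareOperator(BaseOperator):  # hypothetical operator
    @prepare_lineage
    def pre_execute(self, context):
        # Custom pre-execution logic; the decorator prepares inlets/outlets.
        super().pre_execute(context)

    @apply_lineage
    def post_execute(self, context, result=None):
        # Custom post-execution logic; the decorator applies the lineage.
        super().post_execute(context, result)
```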
See an example implementation of a custom operator that uses the SQL parser to capture table-level lineage here.
If you're building a custom SQL operator, you have two approaches depending on your needs:
The easiest approach is to inherit from Airflow's SQLExecuteQueryOperator: you automatically get OpenLineage support, which DataHub enhances with column-level lineage.
```python
from typing import Any

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

class MyCustomSQLOperator(SQLExecuteQueryOperator):
    """
    Custom SQL operator that inherits OpenLineage support.

    DataHub automatically enhances the SQL parsing with column-level lineage!
    """

    def __init__(self, my_custom_param: str, **kwargs):
        # Add your custom parameters
        self.my_custom_param = my_custom_param
        super().__init__(**kwargs)

    def execute(self, context: Any) -> Any:
        # Add any custom logic before SQL execution
        self.log.info(f"Custom param: {self.my_custom_param}")

        # Parent class handles SQL execution + OpenLineage lineage
        return super().execute(context)
```
How it works:

- `SQLExecuteQueryOperator` already has `get_openlineage_facets_on_complete()` implemented.
- DataHub patches OpenLineage's `SQLParser` globally, so all SQL parsing gets enhanced automatically.

When to use this: whenever your operator executes SQL and can inherit from `SQLExecuteQueryOperator`.
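For example, the operator above could be used like any other SQL operator (the connection and table names are illustrative):

```python
run_transform = MyCustomSQLOperator(
    task_id="run_transform",
    conn_id="postgres_default",  # hypothetical connection
    sql="INSERT INTO analytics.daily_summary SELECT * FROM raw.events",
    my_custom_param="example",
)
```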
The second approach is only needed if you're building a completely custom operator that doesn't fit the SQLExecuteQueryOperator pattern:
```python
from typing import Any, Optional

from airflow.models.baseoperator import BaseOperator

class MyCompletelyCustomOperator(BaseOperator):
    """
    For special cases where SQLExecuteQueryOperator doesn't fit.
    """

    def execute(self, context: Any) -> Any:
        # Your custom SQL execution logic
        pass

    def get_openlineage_facets_on_complete(
        self, task_instance: Any
    ) -> Optional["OperatorLineage"]:
        """
        Implement OpenLineage interface manually.
        DataHub's SQLParser patch still enhances this automatically!
        """
        from airflow.providers.openlineage.sqlparser import SQLParser

        hook = self.get_db_hook()
        parser = SQLParser(
            dialect=hook.get_openlineage_database_dialect(hook.get_connection(self.conn_id)),
            default_schema=hook.get_openlineage_default_schema(),
        )

        # This uses DataHub's patched SQLParser - column lineage included!
        return parser.generate_openlineage_metadata_from_sql(
            sql=self.sql,
            hook=hook,
            database_info=hook.get_openlineage_database_info(hook.get_connection(self.conn_id)),
        )
```
When to use this: only if you can't inherit from SQLExecuteQueryOperator and need to implement the OpenLineage interface yourself.
Key Point: DataHub patches SQLParser.generate_openlineage_metadata_from_sql() globally at import time, so any operator using OpenLineage's SQLParser automatically gets DataHub's enhanced parsing with column-level lineage!
If you prefer not to use OpenLineage, or are on older Airflow versions, you can manually extract and set lineage using DataHub's SQL parser:
```python
from typing import Any, List, Tuple, Union

from airflow.models.baseoperator import BaseOperator
from datahub_airflow_plugin._config import get_enable_multi_statement
from datahub_airflow_plugin._sql_parsing_common import parse_sql_with_datahub
from datahub_airflow_plugin.entities import Urn

class CustomSQLOperator(BaseOperator):
    def __init__(self, sql: Union[str, List[str]], database: str, **kwargs: Any):
        super().__init__(**kwargs)
        self.sql = sql
        self.database = database

    def execute(self, context: Any) -> Any:
        # Execute SQL
        # ...

        # Extract and set lineage
        inlets, outlets = self._get_lineage()
        context["ti"].task.inlets = inlets
        context["ti"].task.outlets = outlets

    def _get_lineage(self) -> Tuple[List, List]:
        # Get multi-statement config flag
        enable_multi_statement = get_enable_multi_statement()

        # Parse SQL with multi-statement support
        # Handles both string and list of SQL statements
        sql_parsing_result = parse_sql_with_datahub(
            sql=self.sql,
            platform="postgres",  # your platform
            default_database=self.database,
            env="PROD",
            default_schema=None,
            graph=None,
            enable_multi_statement=enable_multi_statement,
        )

        inlets = [Urn(table) for table in sql_parsing_result.in_tables]
        outlets = [Urn(table) for table in sql_parsing_result.out_tables]
        return inlets, outlets
```
See the full example.
For advanced use cases with the legacy OpenLineage package (openlineage-airflow), you can create a custom extractor. This is useful if you're using a built-in Airflow operator for which we don't support automatic lineage extraction.
See this example PR which adds a custom extractor for the BigQueryInsertJobOperator operator.
There might be cases where DAGs are removed from Airflow, but the corresponding pipelines and tasks still exist in DataHub; let's call these obsolete pipelines and tasks.

To clean them up from DataHub, create and run a DAG named `Datahub_Cleanup`, i.e.:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

from datahub_airflow_plugin.entities import Dataset, Urn

with DAG(
    "Datahub_Cleanup",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    task = BashOperator(
        task_id="cleanup_obsolete_data",
        dag=dag,
        bash_command="echo 'cleaning up the obsolete data from datahub'",
    )
```
Obsolete pipelines and tasks will be deleted from DataHub if their cluster matches the `cluster` value set in the `airflow.cfg`.

If you are looking to find all tasks (aka DataJobs) that belong to a specific pipeline (aka DataFlow), you can use the following GraphQL query:
```graphql
query {
  dataFlow(urn: "urn:li:dataFlow:(airflow,db_etl,prod)") {
    childJobs: relationships(
      input: { types: ["IsPartOf"], direction: INCOMING, start: 0, count: 100 }
    ) {
      total
      relationships {
        entity {
          ... on DataJob {
            urn
          }
        }
      }
    }
  }
}
```
If you can't use the plugin or annotate inlets/outlets, you can also emit lineage using the DatahubEmitterOperator.
Reference lineage_emission_dag.py for a full example.
In order to use this example, you must first configure the DataHub hook. Like in ingestion, we support a DataHub REST hook and a Kafka-based hook. See the plugin configuration for examples.
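As a rough sketch of what such a task can look like (the dataset names are illustrative; see lineage_emission_dag.py for the authoritative version):

```python
import datahub.emitter.mce_builder as builder
from datahub_airflow_plugin.operators.datahub import DatahubEmitterOperator

emit_lineage_task = DatahubEmitterOperator(
    task_id="emit_lineage",
    datahub_conn_id="datahub_rest_default",
    mces=[
        # Table-level lineage: tableA -> tableC (names are assumptions).
        builder.make_lineage_mce(
            upstream_urns=[
                builder.make_dataset_urn("snowflake", "mydb.schema.tableA"),
            ],
            downstream_urn=builder.make_dataset_urn("snowflake", "mydb.schema.tableC"),
        )
    ],
)
```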
If you're not seeing lineage in DataHub, check the following:
- The Airflow logs should print a line like `INFO [datahub_airflow_plugin.datahub_listener] DataHub plugin using DataHubRestEmitter: configured to talk to <datahub_url>` during Airflow startup, and the `airflow plugins` command should list `datahub_plugin` with a listener enabled.
- If using automatic lineage, ensure that the `enable_extractors` config is set to true and that automatic lineage is supported for your operator.
- If using manual lineage annotation, ensure that you're using the `datahub_airflow_plugin.entities.Dataset` or `datahub_airflow_plugin.entities.Urn` classes for your inlets and outlets.

If your URLs aren't being generated correctly (usually they'll start with `http://localhost:8080` instead of the correct hostname), you may need to set the webserver `base_url` config.
```ini
[webserver]
base_url = http://airflow.mycorp.example.com
```
If you see errors like the following:
```text
ERROR - on_task_instance_success() missing 3 required positional arguments: 'previous_state', 'task_instance', and 'session'
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/datahub_airflow_plugin/datahub_listener.py", line 124, in wrapper
    f(*args, **kwargs)
TypeError: on_task_instance_success() missing 3 required positional arguments: 'previous_state', 'task_instance', and 'session'
```
The solution is to upgrade `acryl-datahub-airflow-plugin>=0.12.0.4` or upgrade `pluggy>=1.2.0`. See this PR for details.
For extremely large Airflow deployments with thousands of tasks, you may see issues where the plugin interferes with the performance of the Airflow scheduler. In those cases, you can set the `DATAHUB_AIRFLOW_PLUGIN_RUN_IN_THREAD_TIMEOUT=0` environment variable. This makes the DataHub plugin run fully in background threads, but can cause us to miss some metadata if the scheduler shuts down soon after processing a task.
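For example, in the scheduler's environment:

```bash
export DATAHUB_AIRFLOW_PLUGIN_RUN_IN_THREAD_TIMEOUT=0
```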
There are two ways to disable the DataHub Plugin:
Set the `datahub.enabled` configuration property to `False` in the `airflow.cfg` file and restart the Airflow environment to reload the configuration and disable the plugin.
```ini
[datahub]
enabled = False
```
If a restart is not possible and you need a faster way to disable the plugin, you can use the kill switch: set the `AIRFLOW_VAR_DATAHUB_AIRFLOW_PLUGIN_DISABLE_LISTENER` environment variable to `true`. This ensures that the listener won't process anything.

```bash
export AIRFLOW_VAR_DATAHUB_AIRFLOW_PLUGIN_DISABLE_LISTENER=true
```
This will immediately disable the plugin without requiring a restart.
:::note Why Environment Variable Instead of Airflow Variable?
The plugin uses environment variables instead of Airflow's `Variable.get()` because listener hooks are called during SQLAlchemy's `after_flush` event (before the main transaction commits). Calling `Variable.get()` in this context creates a nested database session that can interfere with the outer transaction and cause data loss, such as missing `TaskInstanceHistory` records for retried tasks.
:::
We try to support Airflow releases for ~2 years after their release. This is a best-effort guarantee - it's not always possible due to dependency / security issues cropping up in older versions.
We no longer officially support Airflow <2.7. However, you can use older versions of acryl-datahub-airflow-plugin with older versions of Airflow.
We previously had two implementations of the plugin - v1 and v2. The v2 plugin is now the default, and the v1 plugin has since been removed. The v1 plugin had many limitations, chiefly that it did not support automatic lineage extraction. Docs for the v1 plugin can be accessed in our docs archive.
All recent versions require Python 3.10+.
DataHub also previously supported an Airflow lineage backend implementation. The lineage backend functionality was pretty limited - it did not support automatic lineage extraction, did not capture task failures, and did not work in AWS MWAA - and so it has been removed from the codebase. The documentation for the lineage backend has been archived.
Related DataHub videos: