metadata-ingestion-modules/airflow-plugin/AIRFLOW_3_MIGRATION.md
This document outlines the changes made to support Apache Airflow 3.x, along with the known limitations.
Apache Airflow 3.0 introduced significant breaking changes. The DataHub Airflow plugin has been fully updated to support both Airflow 2.4+ and 3.x with backward compatibility.
The plugin now uses separate implementations for Airflow 2.x and 3.x to achieve clean type safety and maintainability:
| Component | Airflow 2.x Implementation | Airflow 3.x Implementation |
|---|---|---|
| Main Module | airflow2/datahub_listener.py | airflow3/datahub_listener.py |
| Shims/Imports | airflow2/_shims.py | airflow3/_shims.py |
| Lineage Extraction | Extractor-based (_extractors.py) | OpenLineage native (_airflow3_sql_parser_patch.py) |
| OpenLineage Package | openlineage-airflow>=1.2.0 | Native provider (apache-airflow-providers-openlineage) |
| Type Checking | ✅ Clean Airflow 2.x types | ✅ Clean Airflow 3.x types |
Version Dispatcher: The main datahub_listener.py automatically imports the correct implementation at runtime:
from datahub_airflow_plugin._airflow_version_specific import IS_AIRFLOW_3_OR_HIGHER
if IS_AIRFLOW_3_OR_HIGHER:
    from datahub_airflow_plugin.airflow3.datahub_listener import (
        DataHubListener,
        get_airflow_plugin_listener,
    )
else:
    from datahub_airflow_plugin.airflow2.datahub_listener import (
        DataHubListener,
        get_airflow_plugin_listener,
    )
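The flag itself is presumably derived from the installed Airflow version; a minimal sketch of how such a check can be implemented (an assumption, not the actual _airflow_version_specific code):

```python
from airflow import __version__ as AIRFLOW_VERSION
from packaging.version import Version

# True on Airflow 3.0.0 and newer, False on 2.x.
IS_AIRFLOW_3_OR_HIGHER = Version(AIRFLOW_VERSION) >= Version("3.0.0")
```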
Benefits:
Package Installation:
# For Airflow 2.x (uses standalone openlineage-airflow package)
pip install 'acryl-datahub-airflow-plugin[airflow2]'
# For Airflow 3.x (uses native OpenLineage provider)
pip install 'acryl-datahub-airflow-plugin[airflow3]'
# For compatibility across versions (installs base package with native provider)
pip install 'acryl-datahub-airflow-plugin' # Works with both Airflow 2.7+ and 3.x
The most significant change is how lineage extraction works:
| Aspect | Airflow 2.x | Airflow 3.x |
|---|---|---|
| Lineage Extraction | Operator-specific extractors | Unified SQLParser patch |
| Customization Point | Custom extractor per operator | Single SQL parser integration |
| Column Lineage | Extractor-dependent | ✅ Consistent across all SQL operators |
| Maintenance | Multiple extractors to maintain | Single integration point |
In Airflow 2.x, we used operator-specific extractors:
# Different extractor for each SQL operator
SnowflakeExtractor, PostgresExtractor, MySQLExtractor, etc.
In Airflow 3.x, we use a unified SQLParser patch:
# Single patch point for ALL SQL operators
SQLParser.generate_openlineage_metadata_from_sql = datahub_enhanced_version
Benefits:
Airflow 3.0 introduced RuntimeTaskInstance which has a different structure than Airflow 2.x's TaskInstance:
| Attribute | Airflow 2.x | Airflow 3.0 RuntimeTaskInstance | Status |
|---|---|---|---|
| run_id | ✅ Database field | ✅ Base class | Available in both |
| start_date | ✅ Database field | ✅ RuntimeTI field | Available in both |
| try_number | ✅ Database field | ✅ Base class | Available in both |
| state | ✅ Database field | ✅ RuntimeTI field | Available in both |
| task_id | ✅ Database field | ✅ Base class | Available in both |
| dag_id | ✅ Database field | ✅ Base class | Available in both |
| max_tries | ✅ Database field | ✅ RuntimeTI field | Available in both |
| end_date | ✅ Database field | ✅ RuntimeTI field | Available in both |
| log_url | ✅ Property | ✅ RuntimeTI field | Available in both |
| execution_date | ✅ Database field | ❌ Renamed | Renamed to logical_date |
| duration | ✅ Database field | ❌ Not available | Missing - must be calculated |
| external_executor_id | ✅ Database field | ❌ Not available | Missing in Airflow 3.0 |
| operator | ✅ Database field (string) | ⚠️ Different | Has task (BaseOperator) instead |
| priority_weight | ✅ Database field | ❌ Not available | Missing in Airflow 3.0 |
Key Changes:
- TaskInstance only has: id, task_id, dag_id, run_id, try_number, map_index, hostname
- execution_date was renamed to logical_date

Plugin Compatibility:
The plugin uses hasattr() checks to gracefully handle missing attributes:
def get_task_instance_attributes(ti: "TaskInstance") -> Dict[str, str]:
    attributes = {}
    # Safe attribute access - works in both versions
    if hasattr(ti, "run_id"):
        attributes["run_id"] = str(ti.run_id)
    # Handle renamed attribute
    if hasattr(ti, "execution_date"):
        attributes["execution_date"] = str(ti.execution_date)
    elif hasattr(ti, "logical_date"):
        attributes["logical_date"] = str(ti.logical_date)
    # Handle missing attributes gracefully
    if hasattr(ti, "duration"):
        attributes["duration"] = str(ti.duration)
    return attributes
This approach ensures the plugin works correctly with both Airflow 2.x and 3.x task instances.
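For attributes that exist only in Airflow 2.x (such as duration), an equivalent value can often be derived from fields that are present in both versions. A hedged sketch (the helper name is hypothetical, not part of the plugin):

```python
from typing import Optional

def get_duration_seconds(ti) -> Optional[float]:
    # Airflow 2.x: duration is stored directly on the TaskInstance.
    if getattr(ti, "duration", None) is not None:
        return float(ti.duration)
    # Airflow 3.x RuntimeTaskInstance: derive it from start_date/end_date when both exist.
    start = getattr(ti, "start_date", None)
    end = getattr(ti, "end_date", None)
    if start and end:
        return (end - start).total_seconds()
    return None
```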
Files Updated:
- src/datahub_airflow_plugin/_airflow_version_specific.py:21-74 - Version-compatible attribute extraction

DAG-level parameter changes: schedule_interval → schedule; default_view removed

| Feature | Airflow 2.x | Airflow 3.x | Status |
|---|---|---|---|
| Task lineage | ✅ | ✅ | Fully working |
| Column lineage | ✅ | ✅ | Fully working |
| DAG metadata | ✅ | ✅ | Fully working |
| Execution tracking | ✅ | ✅ | Fully working |
| Threading | ✅ | ✅ | Fully working |
| SubDAG support | ✅ | ❌ | Removed in Airflow 3.x |
This section provides in-depth information about each change made for Airflow 3.x compatibility.
Airflow 3.x reorganized many modules under a new SDK structure. The plugin now uses conditional imports with fallbacks.
# Airflow 3.x (preferred)
from airflow.sdk.bases.operator import BaseOperator
# Airflow 2.x (fallback)
from airflow.models.baseoperator import BaseOperator
# Airflow 3.x (preferred)
from airflow.sdk.types import Operator
# Airflow 2.x (fallback)
from airflow.models.operator import Operator
# Airflow 3.x (preferred)
from airflow.providers.standard.operators.empty import EmptyOperator
# Airflow 2.x (fallback)
from airflow.operators.empty import EmptyOperator
# Airflow <2.2 (fallback)
from airflow.operators.dummy import DummyOperator
# Airflow 3.x (preferred)
from airflow.providers.standard.sensors.external_task import ExternalTaskSensor
# Airflow 2.x (fallback)
from airflow.sensors.external_task import ExternalTaskSensor
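Inside the shim modules, these fallbacks are typically expressed as try/except imports. A minimal sketch of the pattern (not the exact shim code):

```python
try:
    # Airflow 3.x: operators live in the standard provider package.
    from airflow.providers.standard.operators.empty import EmptyOperator
except ImportError:
    try:
        # Airflow 2.2+ fallback
        from airflow.operators.empty import EmptyOperator
    except ImportError:
        # Airflow < 2.2: EmptyOperator did not exist yet.
        from airflow.operators.dummy import DummyOperator as EmptyOperator
```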
Files Updated:
- src/datahub_airflow_plugin/airflow2/_shims.py - Clean Airflow 2.x imports
- src/datahub_airflow_plugin/airflow3/_shims.py - Clean Airflow 3.x imports
- src/datahub_airflow_plugin/_airflow_shims.py - Pure dispatcher (no logic, just routes to version-specific shims)
- src/datahub_airflow_plugin/client/airflow_generator.py - Import from shims

Airflow 2.7+ introduced native OpenLineage support, and Airflow 3.x completely removed support for the old openlineage-airflow package. The native provider has a different API and doesn't include SQL extractors.
# Airflow 2.7+ and 3.x (native provider)
from airflow.providers.openlineage.extractors import OperatorLineage as TaskMetadata
from airflow.providers.openlineage.extractors.base import BaseExtractor
from airflow.providers.openlineage.extractors.manager import ExtractorManager
# Airflow < 2.7 (old openlineage-airflow package)
from openlineage.airflow.extractors import TaskMetadata, BaseExtractor, ExtractorManager
from openlineage.airflow.extractors.sql_extractor import SqlExtractor
The native provider's ExtractorManager has a different API:
| Feature | Old Package (< 2.7) | Native Provider (2.7+) |
|---|---|---|
| Extractor registry | self.task_to_extractor.extractors | self.extractors |
| Add extractor | Direct dict assignment | Direct dict assignment or add_extractor() |
| SQL extractor | ✅ Included | ❌ Not included |
| Task metadata type | TaskMetadata | OperatorLineage (aliased as TaskMetadata) |
The plugin implements a compatibility layer that:
- Detects at runtime which extractor registry API is available
- Provides DataHub's own SQL extraction (since the native provider no longer ships SqlExtractor)

# Compatibility check in ExtractorManager.__init__
if hasattr(self, 'task_to_extractor'):
    # Old openlineage-airflow (Airflow < 2.7)
    extractors_dict = self.task_to_extractor.extractors
else:
    # Native provider (Airflow 2.7+)
    extractors_dict = self.extractors
Impact:
Files Updated:
- src/datahub_airflow_plugin/_extractors.py - Conditional OpenLineage imports and API compatibility layer

Known Limitations:
Airflow 3.x removed the schedule_interval parameter in favor of schedule.
# Airflow 3.x (required)
DAG("my_dag", schedule=None, ...)
# Airflow 2.4+ (deprecated but supported)
DAG("my_dag", schedule_interval=None, ...)
# Airflow <2.4 (only option)
DAG("my_dag", schedule_interval=None, ...)
Note: The schedule parameter was introduced in Airflow 2.4.0, so test DAGs use schedule= which works in both Airflow 2.4+ and 3.x.
Files Updated:
- tests/integration/dags/*.py

Airflow 3.x removed the default_view parameter from the DAG constructor.
# Airflow 2.x (supported)
DAG("my_dag", default_view="tree", ...) # Set default UI view
# Airflow 3.x (removed)
DAG("my_dag", ...) # default_view parameter no longer accepted
Reason for Removal:

- View preferences are now persisted per user in the Airflow 3.x UI, so a per-DAG default_view setting is no longer needed.
Valid default_view values in Airflow 2.x:
"tree" - Tree view (hierarchical task structure)"graph" - Graph view (visual DAG graph)"duration" - Duration view"gantt" - Gantt chart view"landing_times" - Landing times viewMigration: Simply remove the default_view parameter from DAG definitions when upgrading to Airflow 3.x.
Files Updated:
- tests/integration/dags/airflow3/datahub_emitter_operator_jinja_template_dag.py - Removed default_view="tree"

Airflow 3.x removed the v1 API and only supports v2.
# Airflow 3.x
api_version = "v2"
# Airflow 2.x
api_version = "v1"
Airflow 3.x uses JWT token-based authentication instead of HTTP Basic Auth.
# Airflow 3.x - Get JWT token
response = requests.post(
    f"{airflow_url}/auth/token",
    data={"username": username, "password": password},
)
token = response.json()["access_token"]
session.headers["Authorization"] = f"Bearer {token}"
# Airflow 2.x - HTTP Basic Auth
session.auth = (username, password)
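Putting the two together, a test client can authenticate once and reuse the session for v2 API calls. A hedged sketch (the /api/v2/dags path is an assumption about the stable REST API, not taken from the plugin's test code):

```python
import requests

airflow_url = "http://localhost:8080"
session = requests.Session()

# Airflow 3.x: exchange credentials for a JWT and attach it as a Bearer token.
response = requests.post(
    f"{airflow_url}/auth/token",
    data={"username": "admin", "password": "admin"},
)
response.raise_for_status()
session.headers["Authorization"] = f"Bearer {response.json()['access_token']}"

# Subsequent requests go to the v2 API.
dags = session.get(f"{airflow_url}/api/v2/dags").json()
```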
Airflow 3.x moved some configuration options:
# Airflow 3.x
AIRFLOW__API__PORT=8080
AIRFLOW__API__BASE_URL=http://airflow.example.com # Used for log URLs
# Airflow 2.x
AIRFLOW__WEBSERVER__WEB_SERVER_PORT=8080
AIRFLOW__WEBSERVER__BASE_URL=http://airflow.example.com # Used for log URLs
Log URL Format Changes:
Airflow 3.x changed the log URL format and configuration:
| Aspect | Airflow 2.x | Airflow 3.x |
|---|---|---|
| Config key | webserver.base_url | api.base_url |
| URL format | http://host/dags/{dag_id}/grid?dag_run_id={run_id}&task_id={task_id}&base_date={date}&tab=logs | http://host/dags/{dag_id}/runs/{run_id}/tasks/{task_id}?try_number={try_number} |
| Source | TaskInstance.log_url property | TaskInstance.log_url property |
Example Log URLs:
# Airflow 2.x
"http://airflow.example.com/dags/my_dag/grid?dag_run_id=manual_run&task_id=my_task&base_date=2023-09-27T21%3A34%3A38%2B0000&tab=logs"
# Airflow 3.x
"http://airflow.example.com/dags/my_dag/runs/manual_run/tasks/my_task?try_number=1"
Impact on DataHub:
- log_url in DataHub's DataProcessInstance will reflect the Airflow version's format

Files Updated:
- tests/integration/test_plugin.py - Authentication, API version logic, and base URL configuration
- src/datahub_airflow_plugin/_airflow_version_specific.py - Task instance attribute extraction including log_url

Airflow 3.x renamed the --exec-date parameter to --logical-date.
# Airflow 3.x
airflow dags trigger --logical-date "2023-09-27T21:34:38+00:00" my_dag
# Airflow 2.x
airflow dags trigger --exec-date "2023-09-27T21:34:38+00:00" my_dag
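Code that triggers DAG runs across both versions (such as the integration tests) can pick the flag at runtime. A hedged sketch (not the exact test code):

```python
import subprocess

from datahub_airflow_plugin._airflow_version_specific import IS_AIRFLOW_3_OR_HIGHER

date_flag = "--logical-date" if IS_AIRFLOW_3_OR_HIGHER else "--exec-date"
subprocess.run(
    ["airflow", "dags", "trigger", date_flag, "2023-09-27T21:34:38+00:00", "my_dag"],
    check=True,
)
```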
Files Updated:
- tests/integration/test_plugin.py - Conditional CLI parameter

Airflow 3.x changed the signatures of listener hooks to remove the session parameter and add new parameters.
Airflow 3.x signatures:
on_task_instance_running(previous_state, task_instance)
on_task_instance_success(previous_state, task_instance)
on_task_instance_failed(previous_state, task_instance, error)
Airflow 2.x signatures:
on_task_instance_running(previous_state, task_instance, session)
on_task_instance_success(previous_state, task_instance, session)
on_task_instance_failed(previous_state, task_instance, session)
Compatibility Fix:
The plugin uses **kwargs to handle both versions without breaking pluggy's hook matching:
@hookimpl
def on_task_instance_running(self, previous_state, task_instance, **kwargs):
    # Extract session if present (Airflow 2.x)
    session = kwargs.get("session")
    ...

@hookimpl
def on_task_instance_failed(self, previous_state, task_instance, **kwargs):
    # Extract error and session from kwargs (Airflow 3.x passes error, 2.x passes session)
    session = kwargs.get("session")
    error = kwargs.get("error")
    ...
Important: Using default parameters like session=None in Airflow 3.0 causes pluggy to fail to match the hook spec, preventing hooks from being called.
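For contrast, this is the signature pattern that silently breaks in Airflow 3.0 (illustrative only):

```python
@hookimpl
def on_task_instance_running(self, previous_state, task_instance, session=None):
    # The extra default parameter no longer matches Airflow 3.x's hook spec,
    # so pluggy skips this implementation and it is never invoked.
    ...
```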
Files Updated:
- src/datahub_airflow_plugin/datahub_listener.py:559-772 - Listener hook signatures

Airflow 3.x completely removed SubDAGs (deprecated since Airflow 2.0).
- dag.is_subdag - ❌ Removed
- dag.parent_dag - ❌ Removed
- task.subdag - ❌ Removed
- SubDagOperator - ❌ Removed

The plugin uses defensive attribute access:
# Safe for both Airflow 2.x and 3.x
if getattr(dag, "is_subdag", False) and dag.parent_dag is not None:
    # Handle subdag (only executes in Airflow 2.x)
    ...
Files Updated:
- src/datahub_airflow_plugin/client/airflow_generator.py:76 - SubDAG handling

Airflow listener hooks are called during SQLAlchemy's after_flush event, before the main transaction commits. Any database operations that create new sessions and commit them can interfere with the outer transaction and cause data loss.
The kill switch feature originally used Variable.get() to check if the plugin should be disabled:
# This causes issues:
# - Airflow 3.x: RuntimeError: UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!
# - Airflow 2.x: Can cause TaskInstanceHistory records to not be persisted
# (see: https://github.com/apache/airflow/pull/48780)
if Variable.get("datahub_airflow_plugin_disable_listener", "false").lower() == "true":
    return True
Variable.get() uses the @provide_session decorator which creates a new database session and commits it. When called from listener hooks (which execute during after_flush, before the main transaction commits), this nested commit can corrupt the outer transaction state and cause data loss.
Both Airflow 2.x and 3.x versions now use environment variables instead of database queries:
Both versions (airflow2/datahub_listener.py and airflow3/datahub_listener.py):
def check_kill_switch(self) -> bool:
    """
    Check kill switch using environment variable.

    We use os.getenv() instead of Variable.get() because Variable.get()
    creates a new database session and commits it. When called from listener
    hooks (which execute during SQLAlchemy's after_flush event, before the
    main transaction commits), this nested commit can corrupt the outer
    transaction state and cause data loss.
    """
    if (
        os.getenv(
            f"AIRFLOW_VAR_{KILL_SWITCH_VARIABLE_NAME}".upper(), "false"
        ).lower()
        == "true"
    ):
        logger.debug("DataHub listener disabled by kill switch (env var)")
        return True
    return False
To disable the plugin (both Airflow 2.x and 3.x):
export AIRFLOW_VAR_DATAHUB_AIRFLOW_PLUGIN_DISABLE_LISTENER=true
Files Updated:
- src/datahub_airflow_plugin/airflow2/datahub_listener.py - Kill switch using env var
- src/datahub_airflow_plugin/airflow3/datahub_listener.py - Kill switch using env var

Threading is enabled by default in both Airflow 2.x and 3.x. Initial concerns about RuntimeTaskInstance's unpickleable objects were unfounded.
Key insight: threading.Thread does NOT pickle its arguments - it passes object references directly within the same process. Only multiprocessing.Process requires pickling.
# This works fine - no pickling required
thread = threading.Thread(target=f, args=(task_instance,))
thread.start()
Verification: Tests pass successfully with DATAHUB_AIRFLOW_PLUGIN_RUN_IN_THREAD=true in Airflow 3.0.
Threading is enabled by default for performance benefits:
# Default: threading enabled
_RUN_IN_THREAD = os.getenv(
    "DATAHUB_AIRFLOW_PLUGIN_RUN_IN_THREAD", "true"
).lower() in ("true", "1")
To disable threading (if needed):
export DATAHUB_AIRFLOW_PLUGIN_RUN_IN_THREAD=false
Benefits of Threading:
Files Updated:
- src/datahub_airflow_plugin/datahub_listener.py:147-152 - Threading configuration

Airflow 3.x's RuntimeTaskInstance cannot be deep-copied due to unpickleable objects.
The plugin previously deep-copied task instances to render Jinja templates:
# This fails in Airflow 3.x: TypeError: cannot pickle '_thread.lock' object
task_instance_copy = copy.deepcopy(task_instance)
task_instance_copy.render_templates()
The plugin now has separate template rendering implementations for each version:
Airflow 2.x (airflow2/datahub_listener.py):
def _render_templates(task_instance: "TaskInstance") -> "TaskInstance":
    # Render templates in a copy of the task instance
    try:
        task_instance_copy = copy.deepcopy(task_instance)
        task_instance_copy.render_templates()
        return task_instance_copy
    except Exception as e:
        logger.info(f"Error rendering templates: {e}")
        return task_instance
Airflow 3.x (airflow3/datahub_listener.py):
def _render_templates(task_instance: "TaskInstance") -> "TaskInstance":
    """
    Templates are already rendered in Airflow 3.x by the task execution system.

    RuntimeTaskInstance contains unpickleable thread locks, so we cannot use deepcopy.
    RuntimeTaskInstance.task contains the operator with rendered templates.
    """
    logger.debug(
        "Skipping template rendering for Airflow 3.0+ (already rendered by task worker)"
    )
    return task_instance
Impact: Jinja-templated SQL queries are correctly parsed in both Airflow 2.x and 3.x.
Files Updated:
- src/datahub_airflow_plugin/airflow2/datahub_listener.py - Template rendering for Airflow 2.x
- src/datahub_airflow_plugin/airflow3/datahub_listener.py - Template rendering for Airflow 3.x

Airflow 3.x removed the extractor-based SQL parsing mechanism. SQL operators now call SQLParser.generate_openlineage_metadata_from_sql() directly.
Airflow 2.x:
SQL Operator → OpenLineage Extractor → DataHub Extractor → DataHub SQL Parser → Lineage
Airflow 3.x:
SQL Operator → SQLParser.generate_openlineage_metadata_from_sql() → [Patched by DataHub] → DataHub SQL Parser → Lineage
The plugin patches SQLParser.generate_openlineage_metadata_from_sql() to use DataHub's SQL parser:
def patch_sqlparser():
    from airflow.providers.openlineage.sqlparser import SQLParser

    # Store original method for fallback
    SQLParser._original_generate_openlineage_metadata_from_sql = (
        SQLParser.generate_openlineage_metadata_from_sql
    )

    # Replace with DataHub-enhanced version
    SQLParser.generate_openlineage_metadata_from_sql = (
        _datahub_generate_openlineage_metadata_from_sql
    )
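A simplified sketch of what the replacement function does; the create_lineage_sql_parsed_result helper is referenced elsewhere in this document, but the signature handling and the platform/database derivation shown here are assumptions:

```python
def _datahub_generate_openlineage_metadata_from_sql(self, sql, *args, **kwargs):
    # Preserve the native provider's behavior by calling the original implementation first.
    operator_lineage = SQLParser._original_generate_openlineage_metadata_from_sql(
        self, sql, *args, **kwargs
    )

    # Run DataHub's SQL parser on the same statement (platform/default_db are
    # derived from the operator's connection in the real patch; simplified here).
    sql_parsing_result = create_lineage_sql_parsed_result(
        query=sql,
        platform="snowflake",
        default_db=None,
    )

    # Stash the result in run_facets so the DataHub listener can pick it up later.
    operator_lineage.run_facets["datahub_sql_parsing_result"] = sql_parsing_result
    return operator_lineage
```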
The patched method:
- Calls DataHub's SQL parser on the operator's SQL
- Stores the parsing result in run_facets for later retrieval

Column-Level Lineage: The SQL parsing result is stored in the OperatorLineage.run_facets dictionary:
DATAHUB_SQL_PARSING_RESULT_KEY = "datahub_sql_parsing_result"

run_facets = {
    DATAHUB_SQL_PARSING_RESULT_KEY: sql_parsing_result
}

operator_lineage = OperatorLineage(
    inputs=inputs,
    outputs=outputs,
    job_facets={"sql": SqlJobFacet(query=sql)},
    run_facets=run_facets,
)
The listener retrieves it and processes column lineage:
if DATAHUB_SQL_PARSING_RESULT_KEY in operator_lineage.run_facets:
    sql_parsing_result = operator_lineage.run_facets[DATAHUB_SQL_PARSING_RESULT_KEY]

    # Process column lineage
    if sql_parsing_result.column_lineage:
        fine_grained_lineages.extend(
            FineGrainedLineageClass(
                upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
                downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
                upstreams=[...],
                downstreams=[...],
            )
            for column_lineage in sql_parsing_result.column_lineage
        )
Important: OperatorLineage uses @define (attrs library) which creates a frozen dataclass. We cannot add arbitrary attributes to it, so we use the run_facets dictionary instead.
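A generic illustration of the constraint (this shows the attrs pattern, not the actual OperatorLineage definition): attribute assignment on a frozen instance raises an error, while mutating an existing dict field is allowed.

```python
import attrs

@attrs.define(frozen=True)
class ExampleLineage:
    run_facets: dict = attrs.field(factory=dict)

lineage = ExampleLineage()

# lineage.datahub_result = {}  # would raise attrs.exceptions.FrozenInstanceError

# Mutating the contents of an existing field works, which is why the parsing
# result is carried inside run_facets instead of a new attribute.
lineage.run_facets["datahub_sql_parsing_result"] = {"column_lineage": []}
```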
Supported Databases: All databases supported by DataHub's SQL parser.
Files Updated:
- src/datahub_airflow_plugin/_airflow3_sql_parser_patch.py - SQLParser patch implementation
- src/datahub_airflow_plugin/datahub_listener.py:433-439 - Retrieve sql_parsing_result from run_facets
- src/datahub_airflow_plugin/_config.py - Enable SQL parser patch for Airflow 3.x

In Airflow 3.x, the DataHub plugin uses the SDK's Connection.get() method to initialize the emitter instead of relying on BaseHook.get_connection().
The BaseHook.get_connection() method requires SUPERVISOR_COMMS to be available in the execution context. However, in Airflow 3.x listener hooks (such as on_dag_start, on_dag_run_running), the SUPERVISOR_COMMS context is not available, causing connection retrieval to fail:
# This fails in listener context:
# ImportError: cannot import name 'SUPERVISOR_COMMS' from 'airflow.sdk.execution_time.task_runner'
hook = self.config.make_emitter_hook()
emitter = hook.make_emitter() # ❌ Fails: SUPERVISOR_COMMS not available
Why SUPERVISOR_COMMS is unavailable:
- SUPERVISOR_COMMS is only available during actual task execution (when tasks are running)

The plugin now uses the Airflow SDK's Connection.get() method to retrieve connection details:
def _create_single_emitter_from_connection(self, conn_id: str):
    """
    Create a single emitter from a connection ID.
    Uses Connection.get() from SDK which works in all contexts.
    """
    from airflow.sdk import Connection

    # Get connection using SDK API (works in all contexts)
    conn = Connection.get(conn_id)
    if not conn:
        logger.warning(
            f"Connection '{conn_id}' not found in secrets backend or environment variables"
        )
        return None

    # Build emitter from connection details
    host = conn.host or ""
    if not host:
        logger.warning(f"Connection '{conn_id}' has no host configured")
        return None

    # Parse URL and add port if needed
    from urllib.parse import urlparse, urlunparse

    parsed = urlparse(host if "://" in host else f"http://{host}")
    netloc = parsed.netloc
    if conn.port and not parsed.port:
        netloc = f"{parsed.hostname}:{conn.port}"
    host = urlunparse((
        parsed.scheme or "http",
        netloc,
        parsed.path,
        parsed.params,
        parsed.query,
        parsed.fragment,
    ))

    token = conf.get("datahub", "token", fallback=None) or conn.password

    return DataHubRestEmitter(
        host,
        token,
        client_mode=ClientMode.INGESTION,
        datahub_component="airflow-plugin",
        **conn.extra_dejson,
    )
Why Connection.get() from SDK:
- airflow.sdk.Connection is the proper SDK method
- Avoids the deprecated Connection.get_connection_from_secrets() from airflow.models

Import Path:
# Airflow 3.x (correct, non-deprecated)
from airflow.sdk import Connection
# Airflow 2.x (deprecated in Airflow 3.x)
from airflow.models import Connection
Benefits:
- Uses urllib.parse for robust URL handling (handles IPv6, paths, query strings)
- Supports CompositeEmitter for multiple connections

Token Configuration:
The plugin supports token configuration via airflow.cfg (takes precedence) or connection password:
# airflow.cfg
[datahub]
token = your_datahub_token_here
If not set in airflow.cfg, the connection password field is used as a fallback.
Files Updated:
- src/datahub_airflow_plugin/airflow3/datahub_listener.py:297-398 - Emitter initialization using SDK Connection API

In Airflow 3.x, some operators require specific patches to enable proper lineage extraction because they either:
- Do not implement the required OpenLineage methods (e.g., get_openlineage_database_info())
- Do not expose their SQL to the standard parsing path, or use a generic dialect without column-level lineage

The plugin patches these operators to provide full lineage support.
Problem: SqliteHook doesn't implement get_openlineage_database_info(), causing lineage extraction to fail.
Solution: Patch SqliteHook.get_openlineage_database_info() to return proper database info:
def get_openlineage_database_info(connection: Connection) -> DatabaseInfo:
    # Extract database name from SQLite file path
    db_path = connection.host
    db_name = os.path.splitext(os.path.basename(db_path))[0]

    return DatabaseInfo(
        scheme="sqlite",
        authority=None,  # SQLite doesn't have host:port
        database=db_name,
        normalize_name_method=lambda x: x.lower(),
    )
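One way such a patch can be attached to the hook class (a sketch; the actual patch module may wrap the original method differently):

```python
from airflow.providers.sqlite.hooks.sqlite import SqliteHook

def patch_sqlite_hook():
    # Bind the replacement so it is called as a method on the hook instance.
    SqliteHook.get_openlineage_database_info = (
        lambda self, connection: get_openlineage_database_info(connection)
    )
```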
Files: src/datahub_airflow_plugin/airflow3/_sqlite_openlineage_patch.py
Problem: AthenaOperator uses SQLParser with dialect="generic", which doesn't provide column-level lineage.
Solution: Wrap AthenaOperator.get_openlineage_facets_on_complete() to:
def get_openlineage_facets_on_complete(self, task_instance):
    # Get original OpenLineage result
    operator_lineage = original_method(self, task_instance)

    # Enhance with DataHub SQL parsing
    sql_parsing_result = create_lineage_sql_parsed_result(
        query=self.query,
        platform="athena",
        default_db=self.database,
    )

    # Store result in run_facets for DataHub listener
    operator_lineage.run_facets["datahub_sql_parsing_result"] = sql_parsing_result
    return operator_lineage
Files: src/datahub_airflow_plugin/airflow3/_athena_openlineage_patch.py
Problem: BigQueryInsertJobOperator stores SQL in a configuration dictionary, not as a direct attribute. This means:
- The query never reaches the patched SQLParser.generate_openlineage_metadata_from_sql()

Solution: Wrap get_openlineage_facets_on_complete() to:
- Extract the SQL from self.configuration.get("query", {}).get("query")
- Run DataHub's SQL parser and store the result in run_facets

def get_openlineage_facets_on_complete(self, task_instance):
    # Extract SQL from configuration
    sql = self.configuration.get("query", {}).get("query")

    # Get original result
    operator_lineage = original_method(self, task_instance)

    # Run DataHub parser
    sql_parsing_result = create_lineage_sql_parsed_result(
        query=sql,
        platform="bigquery",
        default_db=self.project_id,
    )

    # Add destination table if specified in configuration
    destination_table = self.configuration.get("query", {}).get("destinationTable")
    if destination_table:
        # Add to output tables
        ...

    operator_lineage.run_facets["datahub_sql_parsing_result"] = sql_parsing_result
    return operator_lineage
Files: src/datahub_airflow_plugin/airflow3/_bigquery_openlineage_patch.py
All patches are automatically applied when the plugin is loaded in Airflow 3.x:
# In _airflow_compat.py
from datahub_airflow_plugin.airflow3._sqlite_openlineage_patch import patch_sqlite_hook
from datahub_airflow_plugin.airflow3._athena_openlineage_patch import patch_athena_operator
from datahub_airflow_plugin.airflow3._bigquery_openlineage_patch import patch_bigquery_insert_job_operator
patch_sqlite_hook()
patch_athena_operator()
patch_bigquery_insert_job_operator()
Key Points:
run_facets["datahub_sql_parsing_result"] for the listener to processFiles Updated:
- src/datahub_airflow_plugin/airflow3/_airflow_compat.py - Patch registration
- src/datahub_airflow_plugin/airflow3/_sqlite_openlineage_patch.py - SQLite hook patch
- src/datahub_airflow_plugin/airflow3/_athena_openlineage_patch.py - Athena operator patch
- src/datahub_airflow_plugin/airflow3/_bigquery_openlineage_patch.py - BigQuery operator patch

Status: ✅ Fully Working
Column-level (fine-grained) lineage is now supported in Airflow 3.x through the SQLParser patch and operator patches described above.
Example: For a SQL query like:
INSERT INTO processed_costs (id, month, total_cost, area, cost_per_area)
SELECT id, month, total_cost, area, total_cost / area
FROM costs
The plugin generates fine-grained lineage:
- costs.id → processed_costs.id
- costs.month → processed_costs.month
- costs.total_cost → processed_costs.total_cost
- costs.area → processed_costs.area
- costs.area + costs.total_cost → processed_costs.cost_per_area (derived column)

Verification: Run the Snowflake operator test:
tox -e py311-airflow302 -- -k "v2_snowflake_operator_airflow3"
Check the golden file for fineGrainedLineages:
grep -A 10 "fineGrainedLineages" tests/integration/goldens/v2_snowflake_operator_airflow3.json
Impact: If you're upgrading from Airflow 2.x and using SubDAGs, lineage tracking for subdags will no longer work in Airflow 3.x.
Reason: SubDAGs were completely removed from Airflow 3.x.
Migration Path: Use TaskGroups instead of SubDAGs. TaskGroups provide visual grouping without creating separate DAG runs.
# Old (Airflow 2.x) - SubDAG
from airflow.operators.subdag import SubDagOperator

def subdag(parent_dag_name, child_dag_name, args):
    dag = DAG(f"{parent_dag_name}.{child_dag_name}", **args)
    # Add tasks...
    return dag

with DAG("parent_dag") as dag:
    subdag_task = SubDagOperator(
        task_id="subdag",
        subdag=subdag("parent_dag", "subdag", default_args),
    )

# New (Airflow 3.x) - TaskGroup
from airflow.utils.task_group import TaskGroup

with DAG("parent_dag") as dag:
    with TaskGroup("task_group") as tg:
        # Add tasks to the group...
        task1 = BashOperator(...)
        task2 = BashOperator(...)
Lineage Note: TaskGroup lineage is tracked at the task level, not as a separate DAG entity.
When upgrading to Airflow 3.x, update your configuration:
# Old Airflow 2.x config
AIRFLOW__WEBSERVER__WEB_SERVER_PORT=8080
AIRFLOW__API__AUTH_BACKEND=airflow.api.auth.backend.basic_auth
# New Airflow 3.x config
AIRFLOW__API__PORT=8080
# AUTH_BACKEND no longer needed - uses SimpleAuthManager by default
The DataHub Airflow plugin tests are designed to work with:
| Airflow Version | Test Support | Notes |
|---|---|---|
| 2.3.x | ✅ Limited | Only v1 plugin tested |
| 2.4.x - 2.9.x | ✅ Full | Both v1 and v2 plugins |
| 3.0.x+ | ✅ Full | v2 plugin only |
Note: Airflow 3.x requires the v2 plugin (listener-based). The v1 plugin is not compatible.
# Test with Airflow 3.0.x
tox -e py311-airflow310
# Test with Airflow 3.1.x
tox -e py311-airflow31
The following checks are performed in tests:
Error: ImportError: cannot import name 'BaseOperator' from 'airflow.models.baseoperator'
Solution: This is expected in Airflow 3.x. The plugin's shims handle this automatically. If you see this error, ensure you're using the latest version of the plugin.
Error: 401 Unauthorized when calling Airflow API
Solution:
- Ensure you are using JWT token authentication (Airflow 3.x no longer uses HTTP Basic Auth)
- Verify the /auth/token endpoint is accessible

Warning: Code references is_subdag attribute
Solution: This is expected and safe. The plugin uses getattr(dag, "is_subdag", False) which returns False in Airflow 3.x without errors.
Error: TypeError: __init__() got an unexpected keyword argument 'schedule_interval'
Solution: Update DAG definitions to use schedule= instead of schedule_interval=. The schedule parameter is supported in Airflow 2.4+ and required in Airflow 3.x.
Error: TypeError: __init__() got an unexpected keyword argument 'default_view'
Solution: Remove the default_view parameter from DAG definitions. This parameter was removed in Airflow 3.x. User view preferences are now persistent in the UI.
When upgrading to Airflow 3.x:
- Use schedule= instead of schedule_interval=
- Remove the default_view= parameter from all DAG definitions

For issues related to Airflow 3.x compatibility:
| Version | Date | Changes |
|---|---|---|
| 1.0 | 2025-01-XX | Initial Airflow 3.x support added |