.. Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
.. _faq:
Scheduling / Dag file parsing ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
There are many reasons why your task might not be getting scheduled. Here are some of the common causes:

- Does your script "compile"? Can the Airflow engine parse it and find your
  Dag object? To test this, you can run ``airflow dags list`` and
  confirm that your Dag shows up in the list. You can also run
  ``airflow dags show foo_dag_id`` and confirm that your task
  shows up in the graphviz format as expected. If you use the CeleryExecutor, you
  may want to confirm that this works both where the scheduler runs and
  where the worker runs.

- Does the file containing your Dag contain the strings ``airflow`` and ``DAG`` somewhere
  in its contents? When searching the Dag directory, Airflow ignores files that do not contain
  both ``airflow`` and ``DAG``, to prevent the DagBag parsing from importing all Python
  files collocated with the user's Dags.

- Is your ``start_date`` set properly? For time-based Dags, the task won't be triggered until
  the first schedule interval following the start date has passed.

- Is your ``schedule`` argument set properly? The default
  is one day (``datetime.timedelta(1)``). You must specify a different ``schedule``
  directly to the Dag object you instantiate, not as a ``default_param``, as task instances
  do not override their parent Dag's ``schedule``.

- Is your ``start_date`` beyond where you can see it in the UI? If you
  set your ``start_date`` to some time, say, 3 months ago, you won't be able to see
  it in the main view in the UI, but you should be able to see it under
  ``Menu -> Browse -> Task Instances``.

- Are the dependencies for the task met? The task instances directly
  upstream from the task need to be in a ``success`` state. Also,
  if you have set ``depends_on_past=True``, the previous task instance
  needs to have succeeded or been skipped (except if it is the first run for that task).
  Also, if ``wait_for_downstream=True``, make sure you understand
  what it means: all tasks immediately downstream of the previous
  task instance must have succeeded or been skipped.
  You can view how these properties are set from the ``Task Instance Details``
  page for your task.

- Are the DagRuns you need created and active? A DagRun represents a specific
  execution of an entire Dag and has a state (running, success, failed, ...).
  The scheduler creates new DagRuns as it moves forward, but never goes back
  in time to create new ones. The scheduler only evaluates ``running`` DagRuns
  to see what task instances it can trigger. Note that clearing task
  instances (from the UI or CLI) does set the state of a DagRun back to
  running. You can bulk view the list of DagRuns and alter states by clicking
  on the schedule tag for a Dag.

- Is the ``concurrency`` parameter of your Dag reached? ``concurrency`` defines
  how many ``running`` task instances a Dag is allowed to have, beyond which
  point things get queued.

- Is the ``max_active_runs`` parameter of your Dag reached? ``max_active_runs`` defines
  how many ``running`` concurrent instances of a Dag there are allowed to be.

You may also want to read about the :ref:`scheduler` and make
sure you fully understand how the scheduler cycle works.
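
The file-content check mentioned in the list above can be approximated as follows. This is a minimal sketch rather than Airflow's actual implementation: the function name ``looks_like_dag_file`` is hypothetical, and the case-insensitive comparison is an assumption.

```python
def looks_like_dag_file(content: str) -> bool:
    """Return True if the text mentions both "airflow" and "dag".

    Assumption: the comparison is case-insensitive.
    """
    lowered = content.lower()
    return "airflow" in lowered and "dag" in lowered


# A file that imports Airflow and defines a Dag passes the check.
dag_source = "from airflow.sdk import DAG\n\nwith DAG(dag_id='example'):\n    ...\n"
# A plain helper module is skipped without ever being imported.
helper_source = "def add(a, b):\n    return a + b\n"

print(looks_like_dag_file(dag_source))
print(looks_like_dag_file(helper_source))
```

A file that builds its Dags through a helper module and never mentions either string would be skipped under this model, which is worth checking when a Dag does not show up.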
There are some Airflow configuration options that allow for a larger scheduling capacity and frequency:

- :ref:`config:core__parallelism`
- :ref:`config:core__max_active_tasks_per_dag`
- :ref:`config:core__max_active_runs_per_dag`

Dags have configurations that improve efficiency:

- ``max_active_tasks``: Overrides :ref:`config:core__max_active_tasks_per_dag`.
- ``max_active_runs``: Overrides :ref:`config:core__max_active_runs_per_dag`.

Operators or tasks also have configurations that improve efficiency and scheduling priority:

- ``max_active_tis_per_dag``: This parameter controls the number of concurrent running task instances across ``dag_runs``
  per task.
- ``pool``: See :ref:`concepts:pool`.
- ``priority_weight``: See :ref:`concepts:priority-weight`.
- ``queue``: See :ref:`apache-airflow-providers-celery:celery_executor:queue` for CeleryExecutor deployments only.

Airflow 2.0 has low Dag scheduling latency out of the box (particularly when compared with Airflow 1.10.x).
However, if you need more throughput you can :ref:`start multiple schedulers<scheduler:ha>`.

You can achieve this with :ref:`concepts:trigger-rules`.

.. _faq:how-to-control-dag-file-parsing-timeout:
(only valid for Airflow >= 2.3.0)

You can add a ``get_dagbag_import_timeout`` function in your ``airflow_local_settings.py`` which gets
called right before a Dag file is parsed. You can return a different timeout value based on the Dag file.
When the return value is less than or equal to 0, no timeout is applied during Dag parsing.

.. code-block:: python
    :caption: airflow_local_settings.py
    :name: airflow_local_settings.py

    from typing import Union

    from airflow.configuration import conf


    def get_dagbag_import_timeout(dag_file_path: str) -> Union[int, float]:
        """
        This setting allows dynamic control of the Dag file parsing timeout.

        It is useful when there are a few Dag files requiring longer parsing times, while others do not.
        You can control them separately instead of having one value for all Dag files.

        If the return value is less than or equal to 0, it means no timeout during the Dag parsing.
        """
        if "slow" in dag_file_path:
            return 90
        if "no-timeout" in dag_file_path:
            return 0
        return conf.getfloat("core", "DAGBAG_IMPORT_TIMEOUT")

See :ref:`Configuring local settings <set-config:configuring-local-settings>` for details on how to
configure local settings.
Change the :ref:`config:dag_processor__file_parsing_sort_mode` to ``modified_time``, and raise
the :ref:`config:dag_processor__min_file_process_interval` to ``600`` (10 minutes), ``6000`` (100 minutes)
or a higher value.

The Dag parser will skip the ``min_file_process_interval`` check if a file was recently modified.

This might not work in cases where the Dag is imported or created from a separate file. Example:
``dag_file.py`` imports ``dag_loader.py``, where the actual logic of the Dag lives, as shown below.
In this case, if ``dag_loader.py`` is updated but ``dag_file.py`` is not, the changes won't be reflected
until ``min_file_process_interval`` is reached, since the Dag parser only looks at the modified time of ``dag_file.py``.

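.. code-block:: python
    :caption: dag_file.py
    :name: dag_file.py

    from dag_loader import create_dag

    # Illustrative values for the generated Dag.
    dag_id, schedule, dag_number, default_args = "hello_world_1", "@daily", 1, {}
    globals()[dag_id] = create_dag(dag_id, schedule, dag_number, default_args)
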
.. code-block:: python
    :caption: dag_loader.py
    :name: dag_loader.py

    from airflow.sdk import DAG
    from airflow.sdk import task

    import pendulum


    def create_dag(dag_id, schedule, dag_number, default_args):
        dag = DAG(
            dag_id,
            schedule=schedule,
            default_args=default_args,
            # Keyword argument: a positional value cannot follow keyword arguments.
            start_date=pendulum.datetime(2021, 9, 13, tz="UTC"),
        )

        with dag:

            @task()
            def hello_world():
                print("Hello World")
                print(f"This is Dag: {dag_number}")

            hello_world()

        return dag

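
The interaction between a file's modification time and ``min_file_process_interval`` can be modeled roughly as below. This is a simplified sketch under stated assumptions, not the Dag processor's real logic: ``should_reparse`` and its parameters are hypothetical names, and timestamps are plain seconds.

```python
def should_reparse(file_mtime: float, last_parsed_at: float, now: float, min_interval: float) -> bool:
    """Decide whether a Dag file is due for parsing (simplified model).

    A file is parsed again when it was modified after the last parse,
    or when at least ``min_interval`` seconds have passed since then.
    """
    recently_modified = file_mtime > last_parsed_at
    interval_elapsed = (now - last_parsed_at) >= min_interval
    return recently_modified or interval_elapsed


last_parsed_at = 1_000.0
min_interval = 600.0

# dag_file.py itself was edited at t=1100, so it is picked up right away.
edited_dag_file = should_reparse(1_100.0, last_parsed_at, now=1_120.0, min_interval=min_interval)

# Only dag_loader.py was edited; dag_file.py keeps its old mtime (t=900),
# so the change waits until the interval has elapsed.
edited_loader_only = should_reparse(900.0, last_parsed_at, now=1_120.0, min_interval=min_interval)
after_interval = should_reparse(900.0, last_parsed_at, now=1_600.0, min_interval=min_interval)

print(edited_dag_file, edited_loader_only, after_interval)
```

Under this model, updating the modification time of ``dag_file.py`` whenever ``dag_loader.py` changes would make the change visible sooner; whether that fits your deployment process is a separate decision.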
There are several reasons why Dags might disappear from the UI. Common causes include:

- Total parsing of all Dags is too long - If parsing takes longer than :ref:`config:core__dagbag_import_timeout`,
  files may not be processed completely. This often occurs when Dags don't follow
  :ref:`Dag writing best practices<best_practice:writing_a_dag>`.

- Inconsistent dynamic Dag generation - Dags created through
  :doc:`dynamic generation </howto/dynamic-dag-generation>` must produce stable Dag IDs across parses.
  Verify consistency by running ``python your_dag_file.py`` repeatedly.

- File processing configuration issues - A certain combination of parameters may lead to scenarios in which
  certain Dags are less likely to be processed at each loop. Check these parameters:

  - :ref:`config:dag_processor__file_parsing_sort_mode` - Ensure sorting method matches your sync strategy
  - :ref:`config:dag_processor__parsing_processes` - Number of parallel parsers
  - :ref:`config:scheduler__parsing_cleanup_interval` - Controls stale Dag cleanup frequency

- File synchronization problems - Common with git-sync setups:

  - mtime preservation issues

- Time synchronization issues - Ensure all nodes (database, schedulers, workers) use NTP with <1s clock drift.

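
The requirement that dynamic generation produce stable Dag IDs can be checked with a small experiment like the one below. It is only a sketch: ``stable_dag_ids``, ``unstable_dag_ids`` and the ``ingest_`` prefix are hypothetical, and each function call stands in for one parse of the Dag file.

```python
import uuid


def stable_dag_ids(sources: list) -> list:
    """Derive Dag IDs only from static configuration, in a fixed order."""
    return [f"ingest_{name}" for name in sorted(sources)]


def unstable_dag_ids(sources: list) -> list:
    """Anti-pattern: a random suffix yields new Dag IDs on every parse."""
    return [f"ingest_{name}_{uuid.uuid4().hex}" for name in sources]


sources = ["orders", "customers"]

# Two simulated parses of the same file.
stable_first, stable_second = stable_dag_ids(sources), stable_dag_ids(sources)
unstable_first, unstable_second = unstable_dag_ids(sources), unstable_dag_ids(sources)

print(stable_first == stable_second)
print(unstable_first == unstable_second)
```

With unstable IDs, each parse appears to define a new set of Dags while the previous set looks stale, which matches the "disappearing Dag" symptom described above.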
.. _faq:dag-version-inflation:
Every time the Dag processor parses a Dag file, it serializes the Dag and compares the result with the version stored in the metadata database. If anything has changed, Airflow creates a new Dag version.
Dag version inflation occurs when the version number increases indefinitely without the Dag author making any intentional changes.
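
The compare-and-version step can be pictured with a small model. This is an illustrative sketch only: it hashes a JSON rendering of a plain dictionary, which is an assumption standing in for Airflow's real Dag serialization, and ``fingerprint``, ``parse_static`` and ``parse_dynamic`` are hypothetical names.

```python
import hashlib
import json
from datetime import datetime


def fingerprint(dag_definition: dict) -> str:
    """Hash a JSON rendering of the definition (stand-in for real serialization)."""
    payload = json.dumps(dag_definition, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def parse_static() -> dict:
    """A parse whose result depends only on fixed values."""
    return {"dag_id": "good_example", "start_date": datetime(2024, 1, 1), "schedule": "@daily"}


def parse_dynamic(now: datetime) -> dict:
    """A parse that captures the current time, as datetime.now() would."""
    return {"dag_id": "bad_example", "start_date": now, "schedule": "@daily"}


# Two parses 30 seconds apart.
static_changed = fingerprint(parse_static()) != fingerprint(parse_static())
dynamic_changed = fingerprint(parse_dynamic(datetime(2024, 5, 1, 12, 0, 0))) != fingerprint(
    parse_dynamic(datetime(2024, 5, 1, 12, 0, 30))
)

print(static_changed)
print(dynamic_changed)
```

In this model the dynamic definition looks "changed" on every parse even though the Dag file was never edited, which is the condition that leads to a new version each time.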

What goes wrong
"""""""""""""""

When Dag versions increase without meaningful changes:

Common causes
"""""""""""""

Version inflation is caused by using values that change at parse time — that is, every time the Dag processor evaluates the Dag file — as arguments to Dag or Task constructors. The most common patterns are:
1. Using datetime.now() or pendulum.now() as start_date:
.. code-block:: python

    from datetime import datetime

    from airflow.sdk import DAG

    with DAG(
        dag_id="bad_example",
        # BAD: datetime.now() produces a different value on every parse
        start_date=datetime.now(),
        schedule="@daily",
    ):
        ...

Every parse produces a different start_date, so the serialized Dag is always different from the
stored version.
2. Using random values in Dag or Task arguments:
.. code-block:: python

    import random

    from airflow.sdk import DAG
    from airflow.providers.standard.operators.python import PythonOperator

    with DAG(dag_id="bad_random", start_date="2024-01-01", schedule="@daily") as dag:
        PythonOperator(
            # BAD: random value changes every parse
            task_id=f"task_{random.randint(1, 1000)}",
            python_callable=lambda: None,
        )

3. Assigning runtime-varying values to variables used in constructors:
.. code-block:: python

    from datetime import datetime

    from airflow.sdk import DAG
    from airflow.providers.standard.operators.python import PythonOperator

    # BAD: the variable captures a parse-time value, then is passed to the DAG
    default_args = {"start_date": datetime.now()}

    with DAG(dag_id="bad_defaults", default_args=default_args, schedule="@daily") as dag:
        PythonOperator(task_id="my_task", python_callable=lambda: None)

Even though datetime.now() is not called directly inside the Dag constructor, it flows in through
default_args and still causes a different serialized Dag on every parse.
4. Using environment variables or file contents that change between parses:
.. code-block:: python

    import os

    from airflow.sdk import DAG
    from airflow.providers.standard.operators.bash import BashOperator

    with DAG(dag_id="bad_env", start_date="2024-01-01", schedule="@daily") as dag:
        BashOperator(
            task_id="echo_build",
            # BAD if BUILD_NUMBER changes on every deployment or parse
            bash_command=f"echo {os.environ.get('BUILD_NUMBER', 'unknown')}",
        )


How to avoid version inflation
""""""""""""""""""""""""""""""

Use fixed start_date values. Always set start_date to a static datetime literal:
.. code-block:: python

    import datetime

    from airflow.sdk import DAG

    with DAG(
        dag_id="good_example",
        start_date=datetime.datetime(2024, 1, 1),
        schedule="@daily",
    ):
        ...

Keep all Dag and Task constructor arguments deterministic. Arguments passed to Dag and Operator
constructors must produce the same value on every parse. Move any dynamic computation into the
execute() method or use Jinja templates, which are evaluated at task execution time rather than
parse time.
Use Jinja templates for dynamic values:
.. code-block:: python

    from airflow.providers.standard.operators.bash import BashOperator

    BashOperator(
        task_id="echo_date",
        # GOOD: the template is resolved at execution time, not parse time
        bash_command="echo {{ ds }}",
    )

Use Airflow Variables with templates instead of top-level lookups:
.. code-block:: python

    from airflow.providers.standard.operators.bash import BashOperator

    BashOperator(
        task_id="echo_var",
        # GOOD: Variable is resolved at execution time via template
        bash_command="echo {{ var.value.my_variable }}",
    )


Dag version inflation detection
"""""""""""""""""""""""""""""""

Starting from Airflow 3.2, the Dag processor performs AST-based static analysis on every Dag file before parsing to detect runtime-varying values in Dag and Task constructors. When a potential issue is found, it is surfaced as a Dag warning visible in the UI.
You can control this behavior with the
:ref:`dag_version_inflation_check_level <config:dag_processor__dag_version_inflation_check_level>`
configuration option:

- ``off``: Disables the check entirely. No errors or warnings are generated.
- ``warning`` (default): Dags load normally but warnings are displayed in the UI when issues are detected.
- ``error``: Treats detected issues as Dag import errors, preventing the Dag from loading.

Additionally, you can catch these issues earlier in your development workflow by using the
`AIR302 <https://docs.astral.sh/ruff/rules/airflow3-dag-dynamic-value/>`_ ruff rule, which detects
dynamic values in Dag and Task constructors as part of static linting. See
:ref:`best_practices/code_quality_and_linting` for how to set up ruff with Airflow-specific rules.
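
To give a feel for what AST-based detection of runtime-varying values involves, here is a deliberately small sketch built on Python's ``ast`` module. It is not Airflow's or ruff's implementation: ``find_dynamic_dag_args``, ``_call_name`` and the ``DYNAMIC_CALLS`` set are hypothetical, and the sketch only inspects keyword arguments written directly inside a ``DAG(...)`` call.

```python
import ast

# Assumption: call names treated as "runtime-varying" for this illustration.
DYNAMIC_CALLS = {"now", "utcnow", "today", "random", "randint", "uuid4"}


def _call_name(call: ast.Call) -> str:
    """Return the last component of the called name, e.g. "now" for datetime.now()."""
    func = call.func
    if isinstance(func, ast.Attribute):
        return func.attr
    if isinstance(func, ast.Name):
        return func.id
    return ""


def find_dynamic_dag_args(source: str) -> list:
    """Return "keyword=call" strings for DAG(...) keyword arguments built from dynamic calls."""
    findings = []
    for node in ast.walk(ast.parse(source)):
        if not (isinstance(node, ast.Call) and _call_name(node) == "DAG"):
            continue
        for keyword in node.keywords:
            for inner in ast.walk(keyword.value):
                if isinstance(inner, ast.Call) and _call_name(inner) in DYNAMIC_CALLS:
                    findings.append(f"{keyword.arg}={ast.unparse(inner)}")
    return findings


bad_source = '''
from datetime import datetime
from airflow.sdk import DAG

with DAG(dag_id="bad_example", start_date=datetime.now(), schedule="@daily"):
    ...
'''
good_source = bad_source.replace("datetime.now()", "datetime(2024, 1, 1)")

print(find_dynamic_dag_args(bad_source))
print(find_dynamic_dag_args(good_source))
```

The source is only parsed, never executed, so Airflow does not need to be installed to run the sketch. A value that reaches the constructor through an intermediate variable such as ``default_args`` would not be caught here; following such assignments is where a real checker needs more work.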

Dag construction
^^^^^^^^^^^^^^^^

What's the deal with ``start_date``?
------------------------------------

``start_date`` is partly legacy from the pre-DagRun era, but it is still
relevant in many ways. When creating a new Dag, you probably want to set
a global start_date for your tasks. This can be done by declaring your
start_date directly in the DAG() object. A Dag's first
DagRun will be created based on the first complete data_interval
after start_date. For example, for a Dag with
start_date=datetime(2024, 1, 1) and schedule="0 0 3 * *", the
first Dag run will be triggered at midnight on 2024-02-03 with
data_interval_start=datetime(2024, 1, 3) and
data_interval_end=datetime(2024, 2, 3). From that point on, the scheduler
creates new DagRuns based on your schedule and the corresponding task
instances run as your dependencies are met. When introducing new tasks to
your Dag, you need to pay special attention to start_date, and may want
to reactivate inactive DagRuns to get the new task onboarded properly.
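
The worked example above (``start_date=datetime(2024, 1, 1)`` with ``schedule="0 0 3 * *"``) can be reproduced with plain ``datetime`` arithmetic. This is a sketch for a "midnight on day N of each month" schedule only; ``next_monthly_tick`` and ``first_run`` are hypothetical helpers, naive datetimes are used for brevity, and ``day`` is assumed to be at most 28.

```python
from datetime import datetime, timedelta


def next_monthly_tick(after: datetime, day: int) -> datetime:
    """Return the first midnight on ``day`` of a month at or after ``after``."""
    candidate = datetime(after.year, after.month, day)
    if candidate < after:
        if after.month == 12:
            candidate = datetime(after.year + 1, 1, day)
        else:
            candidate = datetime(after.year, after.month + 1, day)
    return candidate


def first_run(start_date: datetime, day: int) -> tuple:
    """Return (data_interval_start, data_interval_end) of the first Dag run."""
    interval_start = next_monthly_tick(start_date, day)
    # The interval closes at the next tick strictly after its start.
    interval_end = next_monthly_tick(interval_start + timedelta(days=1), day)
    return interval_start, interval_end


interval_start, interval_end = first_run(datetime(2024, 1, 1), day=3)
print(interval_start, interval_end)
```

The first run is triggered once the interval closes, that is at ``interval_end``, which is why nothing runs on the ``start_date`` itself.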
We recommend against using dynamic values as start_date, especially
datetime.now() as it can be quite confusing. The task is triggered
once the period closes, and in theory an @hourly Dag would never get to
an hour after now as now() moves along.
Previously, we also recommended using rounded start_date in relation to your
Dag's schedule. This meant an @hourly would be at 00:00
minutes:seconds, a @daily job at midnight, a @monthly job on the
first of the month. This is no longer required. Airflow will now auto align
the start_date and the schedule, by using the start_date
as the moment to start looking.
You can use any sensor or a TimeDeltaSensor to delay
the execution of tasks within the schedule interval.
While schedule does allow specifying a datetime.timedelta
object, we recommend using the macros or cron expressions instead, as
it enforces this idea of rounded schedules.
When using ``depends_on_past=True``, it's important to pay special attention
to ``start_date``, as the past dependency is waived only for the specific
schedule of the ``start_date`` specified for the task. It's also
important to watch DagRun activity status in time when introducing
new ``depends_on_past=True`` tasks, unless you are planning on running a backfill
for the new task(s).
It is also important to note that the task's start_date is ignored in backfills.
Creating a time zone aware datetime (e.g. Dag's ``start_date``) is quite simple. Just make sure to supply
time zone aware dates using ``pendulum``. Don't try to use the standard library
`timezone <https://docs.python.org/3/library/datetime.html#timezone-objects>`_ as it is known to
have limitations and we deliberately disallow using it in Dags.
.. _faq:what-does-execution-date-mean:

What does ``execution_date`` mean?
----------------------------------

Execution date or ``execution_date`` is a historical name for what is called a
logical date, and also usually the start of the data interval represented by a
Dag run.
Airflow was developed as a solution for ETL needs. In the ETL world, you
typically summarize data. So, if you want to summarize data for 2016-02-19,
you would do it at 2016-02-20 midnight UTC, which would be right after all
data for 2016-02-19 becomes available. This interval between midnights of
2016-02-19 and 2016-02-20 is called the data interval, and since it
represents data in the date of 2016-02-19, this date is also called the
run's logical date, or the date that this Dag run is executed for, thus
execution date.
For backward compatibility, a datetime value ``execution_date`` is still
available as :ref:`Template variables<templates:variables>` with various formats in Jinja
templated fields, and in Airflow's Python API. It is also included in the
context dictionary given to an Operator's execute function.

.. code-block:: python

    import logging

    from airflow.sdk import BaseOperator


    class MyOperator(BaseOperator):
        def execute(self, context):
            logging.info(context["execution_date"])

However, you should always use data_interval_start or data_interval_end
if possible, since those names are semantically more correct and less prone to
misunderstandings.
Note that ds (the YYYY-MM-DD form of data_interval_start) refers to
date string, not date start as may be confusing to some.
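
The daily example from this answer can be written out with plain ``datetime`` objects. This is a sketch only: ``daily_run_info`` is a hypothetical helper, and naive datetimes are treated as UTC for brevity.

```python
from datetime import datetime, timedelta


def daily_run_info(logical_date: datetime) -> dict:
    """Describe a daily run whose data interval starts at ``logical_date``."""
    data_interval_start = logical_date
    data_interval_end = logical_date + timedelta(days=1)
    return {
        "data_interval_start": data_interval_start,
        "data_interval_end": data_interval_end,
        # The run can only start once the interval has closed.
        "earliest_run_time": data_interval_end,
        # ``ds`` is the date *string* of the interval start.
        "ds": data_interval_start.strftime("%Y-%m-%d"),
    }


info = daily_run_info(datetime(2016, 2, 19))
print(info["ds"], info["earliest_run_time"])
```

The run that summarizes 2016-02-19 starts at midnight on 2016-02-20, yet its ``ds`` still reads ``2016-02-19``.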
.. tip::
For more information on ``logical date``, see :ref:`data-interval` and
:ref:`concepts-dag-run`.
Airflow looks in your ``DAGS_FOLDER`` for modules that contain ``DAG`` objects
in their global namespace and adds the objects it finds to the
``DagBag``. Knowing this, all we need is a way to dynamically assign
variables in the global namespace. This is easily done in Python using the
``globals()`` function from the standard library, which behaves like a
simple dictionary.

.. code-block:: python

    from airflow.sdk import DAG


    def create_dag(dag_id):
        """
        A function returning a DAG object.
        """

        return DAG(dag_id)


    for i in range(10):
        dag_id = f"foo_{i}"
        globals()[dag_id] = DAG(dag_id)

        # or better, call a function that returns a DAG object!
        other_dag_id = f"bar_{i}"
        globals()[other_dag_id] = create_dag(other_dag_id)

Even though Airflow supports multiple Dag definitions per Python file, dynamically generated or otherwise, this is not recommended: Airflow prefers better isolation between Dags from a fault and deployment perspective, and multiple Dags in the same file go against that.
While it is not recommended to write any code outside of defining Airflow constructs, Airflow does support any
arbitrary Python code as long as it does not break the Dag file processor or prolong file processing time past the
:ref:`config:core__dagbag_import_timeout` value.

A common example is the violation of the time limit when building a dynamic Dag, which usually requires querying data from another service such as a database. At the same time, the requested service is being swamped with requests from Dag file processors that need the data to process the file. These unintended interactions may cause the service to deteriorate and eventually cause Dag file processing to fail.

Refer to :ref:`Dag writing best practices<best_practice:writing_a_dag>` for more information.
It is not possible to render :ref:`Macros<macros>` or any Jinja template within another Jinja template. This is
commonly attempted in ``user_defined_macros``.

.. code-block:: python

    dag = DAG(
        # ...
        user_defined_macros={"my_custom_macro": "day={{ ds }}"}
    )

    bo = BashOperator(task_id="my_task", bash_command="echo {{ my_custom_macro }}", dag=dag)

This will echo "day={{ ds }}" instead of "day=2020-01-01" for a Dag run with a
data_interval_start of 2020-01-01 00:00:00.
.. code-block:: python

    bo = BashOperator(task_id="my_task", bash_command="echo day={{ ds }}", dag=dag)

By using the ds macros directly in the template_field, the rendered value results in "day=2020-01-01".
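
Why might ``next_ds`` or ``prev_ds`` not contain expected values?
------------------------------------------------------------------

- When a Dag is scheduled, ``next_ds``, ``next_ds_nodash``, ``prev_ds`` and ``prev_ds_nodash`` are calculated using
  ``logical_date`` and the Dag's schedule (if applicable). If you set ``schedule`` as ``None`` or ``@once``,
  the ``next_ds``, ``next_ds_nodash``, ``prev_ds``, ``prev_ds_nodash`` values will be set to ``None``.
- When a Dag is triggered manually, the schedule is ignored, and ``prev_ds == next_ds == ds``.

Task execution interactions
^^^^^^^^^^^^^^^^^^^^^^^^^^^
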
What does ``TemplateNotFound`` mean?
------------------------------------

``TemplateNotFound`` errors are usually due to misalignment with user expectations when passing a path to an operator
that triggers Jinja templating. A common occurrence is with :class:`~airflow.providers.standard.operators.bash.BashOperator`.

Another commonly missed fact is that the files are resolved relative to where the pipeline file lives. You can add
other directories to the ``template_searchpath`` of the Dag object to allow for other non-relative locations.
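
The lookup order described above can be illustrated with a simplified model. This is a sketch, not the Jinja loader Airflow actually configures: ``resolve_template`` is a hypothetical helper, and it assumes the Dag file's folder is searched first, followed by each ``template_searchpath`` entry in order.

```python
import tempfile
from pathlib import Path
from typing import List, Optional


def resolve_template(name: str, dag_folder: Path, template_searchpath: List[Path]) -> Optional[Path]:
    """Return the first existing match, checking the Dag file's folder first."""
    for base in [dag_folder, *template_searchpath]:
        candidate = base / name
        if candidate.is_file():
            return candidate
    return None


with tempfile.TemporaryDirectory() as tmp:
    dag_folder = Path(tmp) / "dags"
    shared = Path(tmp) / "shared_templates"
    dag_folder.mkdir()
    shared.mkdir()
    (shared / "report.sh").write_text("echo {{ ds }}\n")

    # Not found next to the Dag file, and no extra search path is configured.
    missing = resolve_template("report.sh", dag_folder, [])
    # Found once the shared directory is added to the search path.
    found = resolve_template("report.sh", dag_folder, [shared])
    found_name = found.name if found else None

print(missing, found_name)
```

The first lookup corresponds to the situation that ends in ``TemplateNotFound``; the second shows the effect of adding a directory to ``template_searchpath``.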
For tasks that are related through dependency, you can set the ``trigger_rule`` to ``TriggerRule.ALL_FAILED`` if the
task execution depends on the failure of ALL its upstream tasks, or to ``TriggerRule.ONE_FAILED`` if the failure of
just one upstream task is enough.

.. code-block:: python

    import pendulum

    from airflow.sdk import dag, task
    from airflow.exceptions import AirflowException
    from airflow.utils.trigger_rule import TriggerRule


    @task()
    def a_func():
        raise AirflowException


    @task(
        trigger_rule=TriggerRule.ALL_FAILED,
    )
    def b_func():
        pass


    @dag(schedule="@once", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"))
    def my_dag():
        a = a_func()
        b = b_func()

        a >> b


    dag = my_dag()

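
How the two rules differ can be seen in a small, Airflow-free model. This is a simplified sketch: ``upstream_allows_run`` is a hypothetical function, it only looks at upstream states that have already finished, and it ignores the other states and rules the scheduler handles.

```python
def upstream_allows_run(trigger_rule: str, upstream_states: list) -> bool:
    """Evaluate a trigger rule against finished upstream task states (simplified)."""
    failed = [state for state in upstream_states if state in ("failed", "upstream_failed")]
    if trigger_rule == "all_failed":
        return len(upstream_states) > 0 and len(failed) == len(upstream_states)
    if trigger_rule == "one_failed":
        return len(failed) >= 1
    if trigger_rule == "all_success":
        return all(state == "success" for state in upstream_states)
    raise ValueError(f"Unsupported trigger rule: {trigger_rule}")


mixed = ["failed", "success"]
print(upstream_allows_run("all_failed", mixed))
print(upstream_allows_run("one_failed", mixed))
print(upstream_allows_run("all_success", mixed))
```

With one failed and one successful upstream task, only ``one_failed`` lets the downstream task run in this model.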
See :ref:`concepts:trigger-rules` for more information.

If the tasks are not related by dependency, you will need to :ref:`build a custom Operator<custom_operator>`.

Airflow UI
^^^^^^^^^^

Logs are :ref:`typically served when a task reaches a terminal state <serving-worker-trigger-logs>`. Sometimes, a task's normal lifecycle is disrupted, and the task's
worker is unable to write the task's logs. This typically happens for one of two reasons:

1. :ref:`Task Instance Heartbeat Timeout <concepts:task-instance-heartbeat-timeout>`.
2. Tasks that stay queued for longer than :ref:`scheduler.task_queued_timeout <config:scheduler__task_queued_timeout>` will be marked as failed, and there will be no task logs in the Airflow UI.

Setting retries for each task drastically reduces the chance that either of these problems impacts a workflow.

Set the value of the ``[fab] update_fab_perms`` configuration in ``airflow.cfg`` to ``False``.

If pausing or unpausing a Dag fails for any reason, the Dag toggle will revert to its previous state and turn red. If you observe this behavior, try pausing the Dag again, or check the console or server logs if the issue recurs.

API Server
^^^^^^^^^^

.. _faq:api-server-memory-growth:
The API server caches serialized Dag objects in memory. Over time, as Dag versions accumulate
(see :ref:`faq:dag-version-inflation`), this cache grows and can consume several gigabytes of memory.
The recommended solution (available since Airflow 3.2.0) is to use gunicorn with rolling worker
restarts. Gunicorn periodically recycles worker processes, releasing all accumulated memory. It also
uses preload + fork, so workers share read-only memory pages via copy-on-write, reducing overall
memory usage by 40-50% compared to uvicorn's multiprocess mode.
To enable gunicorn with worker recycling:
.. code-block:: ini

    [api]
    server_type = gunicorn
    # Restart each worker every 12 hours (43200 seconds)
    worker_refresh_interval = 43200
    worker_refresh_batch_size = 1

This requires the ``apache-airflow-core[gunicorn]`` extra to be installed.
See :ref:`config:api__server_type`, :ref:`config:api__worker_refresh_interval`, and
:ref:`config:api__worker_refresh_batch_size` for the full configuration reference.

.. note::
Worker recycling handles memory growth from *any* source, not just the Dag cache. It is the
recommended approach for production API server deployments.
MySQL and MySQL variant Databases ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
You may occasionally experience ``OperationalError`` with the message "MySQL Server has gone away". This happens when the
connection pool keeps connections open for too long and hands you an old connection that has expired. To ensure a
valid connection, you can set :ref:`config:database__sql_alchemy_pool_recycle` so that connections are invalidated after
that many seconds and new ones are created.
If you intend to use extended ASCII or Unicode characters in Airflow, you have to provide a proper connection string to the MySQL database since they define charset explicitly.

.. code-block:: text

    sql_alchemy_conn = mysql://airflow@localhost:3306/airflow?charset=utf8

Otherwise, you will experience ``UnicodeDecodeError`` thrown by ``WTForms`` templating and other Airflow modules, like below.

.. code-block:: text

    'ascii' codec can't decode byte 0xae in position 506: ordinal not in range(128)

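How to fix Exception: Global variable ``explicit_defaults_for_timestamp`` needs to be on (1)?
-----------------------------------------------------------------------------------------------

This means ``explicit_defaults_for_timestamp`` is disabled in your MySQL server and you need to enable it by:

#. Setting ``explicit_defaults_for_timestamp = 1`` under the ``mysqld`` section in your ``my.cnf`` file.
#. Restarting the MySQL server.
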
Connections
^^^^^^^^^^^

For security reasons, the test connection functionality is disabled by default across the Airflow UI,
API and CLI. This can be modified by setting :ref:`config:core__test_connection`.
You can utilize a Dag to regularly test connections. This is referred to as a "Canary Dag" and can detect and alert on failures in external systems that your Dags depend on. You can create a simple Dag that tests connections such as the following Airflow 3 example:

.. code-block:: python

    from airflow import DAG
    from airflow.sdk import task

    with DAG(dag_id="canary", schedule="@daily", doc_md="Canary Dag to regularly test connections to systems."):

        @task(doc_md="Test a connection by its Connection ID.")
        def test_connection(conn_id):
            from airflow.hooks.base import BaseHook

            ok, status = BaseHook.get_hook(conn_id=conn_id).test_connection()
            if ok:
                return status
            raise RuntimeError(status)

        for conn_id in [
            # Add more connections here to create tasks to test them.
            "aws_default",
        ]:
            test_connection.override(task_id="test_" + conn_id)(conn_id)