.. Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Welcome to the world of Apache Airflow! In this tutorial, we'll guide you through the essential concepts of Airflow, helping you understand how to write your first Dag. Whether you're familiar with Python or just starting out, we'll make the journey enjoyable and straightforward.
At its core, a Dag is a collection of tasks organized in a way that reflects their relationships and dependencies. It's like a roadmap for your workflow, showing how each task connects to the others. Don't worry if this sounds a bit complex; we'll break it down step by step.
Let's start with a simple example of a pipeline definition. Although it might seem overwhelming at first, we'll explain each line in detail.

.. exampleinclude:: /../src/airflow/example_dags/tutorial.py
    :language: python
    :start-after: [START tutorial]
    :end-before: [END tutorial]

Think of the Airflow Python script as a configuration file that lays out the structure of your Dag in code. The actual tasks you define here run in a different environment, which means this script isn't meant for data processing. Its main job is to define the Dag object, and it needs to evaluate quickly since the Dag File Processor checks it regularly for any changes.
To get started, we need to import the necessary libraries. This is a typical first step in any Python script.

.. exampleinclude:: /../src/airflow/example_dags/tutorial.py
    :language: python
    :start-after: [START import_module]
    :end-before: [END import_module]

|

For more details on how Python and Airflow handle modules, check out
:doc:`/administration-and-deployment/modules_management`.
When creating a Dag and its tasks, you can either pass arguments directly to each task or define a set of default parameters in a dictionary. The latter approach is usually more efficient and cleaner.

.. exampleinclude:: /../src/airflow/example_dags/tutorial.py
    :language: python
    :dedent: 4
    :start-after: [START default_args]
    :end-before: [END default_args]

|

If you want to dive deeper into the parameters of the ``BaseOperator``, take a look at the
:py:class:`airflow.sdk.BaseOperator` documentation.
Next, we'll need to create a Dag object to house our tasks. We'll provide a unique identifier for the Dag, known as the
``dag_id``, and specify the default arguments we just defined. We'll also set a schedule for our Dag to run every day.

.. exampleinclude:: /../src/airflow/example_dags/tutorial.py
    :language: python
    :start-after: [START instantiate_dag]
    :end-before: [END instantiate_dag]

An operator represents a unit of work in Airflow. Operators are the building blocks of your workflows, allowing you to
define what tasks will be executed. While we can use operators for many tasks, Airflow also offers the :doc:`TaskFlow API <taskflow>`
for a more Pythonic way to define workflows, which we'll touch on later.
All operators derive from the ``BaseOperator``, which includes the essential arguments needed to run tasks in Airflow.
Some popular operators include the ``PythonOperator``, ``BashOperator``, and ``KubernetesPodOperator``. In this tutorial, we'll
focus on the ``BashOperator`` to execute some simple bash commands.
To use an operator, you must instantiate it as a task. Tasks dictate how the operator will perform its work within the
Dag's context. In the example below, we instantiate the ``BashOperator`` twice to run two different bash scripts. The
``task_id`` serves as a unique identifier for each task within the Dag.

.. exampleinclude:: /../src/airflow/example_dags/tutorial.py
    :language: python
    :dedent: 4
    :start-after: [START basic_task]
    :end-before: [END basic_task]

|

Notice how we mix operator-specific arguments (like ``bash_command``) with common arguments (like ``retries``) inherited
from ``BaseOperator``. This approach simplifies our code. In the second task, we even override the ``retries`` parameter,
setting it to 3.
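The precedence for task arguments is as follows:

1. Explicitly passed arguments
2. Values that exist in the ``default_args`` dictionary
3. The operator's default value, if one exists
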
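
The resolution order (explicit arguments over ``default_args`` over operator defaults) can be pictured with a small, plain-Python model. The sketch below is a hypothetical illustration, not how Airflow merges arguments internally: ``resolve_args`` and ``OPERATOR_DEFAULTS`` are invented names, and the default values are placeholders. Each later ``dict.update`` call wins, which mirrors the order above.

.. code-block:: python

    from typing import Any, Dict, Optional

    # Hypothetical operator-level defaults (placeholder values, not Airflow's).
    OPERATOR_DEFAULTS: Dict[str, Any] = {"owner": "airflow", "retries": 0}


    def resolve_args(
        explicit: Dict[str, Any],
        default_args: Optional[Dict[str, Any]] = None,
        operator_defaults: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Merge task arguments; each later update overrides the earlier ones."""
        resolved: Dict[str, Any] = {}
        resolved.update(operator_defaults or {})  # lowest priority
        resolved.update(default_args or {})  # overrides operator defaults
        resolved.update(explicit)  # explicitly passed arguments always win
        return resolved


    default_args = {"owner": "data-team", "retries": 1}

    # First task: inherits retries from default_args.
    t1_args = resolve_args({"task_id": "print_date"}, default_args, OPERATOR_DEFAULTS)
    # Second task: overrides retries explicitly.
    t2_args = resolve_args(
        {"task_id": "sleep", "retries": 3}, default_args, OPERATOR_DEFAULTS
    )

    print(t1_args["retries"], t2_args["retries"])  # 1 3

Using plain dictionaries keeps the idea visible: nothing is "inherited" at runtime in this model, the winning value is simply the last one written.
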

.. note::

    Remember, every task must include or inherit the arguments ``task_id`` and ``owner``. Otherwise, Airflow will raise an
    error. Fortunately, a fresh Airflow installation defaults the ``owner`` to ``airflow``, so you mainly need to ensure
    ``task_id`` is set.

Airflow harnesses the power of `Jinja Templating <https://jinja.palletsprojects.com/en/2.11.x/>`_, giving you access to
built-in parameters and macros to enhance your workflows. This section will introduce you to the basics of templating in
Airflow, focusing on the commonly used template variable ``{{ ds }}``, which renders the Dag run's logical date as ``YYYY-MM-DD``.

.. exampleinclude:: /../src/airflow/example_dags/tutorial.py
    :language: python
    :dedent: 4
    :start-after: [START jinja_template]
    :end-before: [END jinja_template]

|

You'll notice that the ``templated_command`` includes logic in ``{% %}`` blocks and references parameters like
``{{ ds }}``. You can also pass files to the ``bash_command``, such as ``bash_command='templated_command.sh'``, allowing
for better organization of your code. You can even define ``user_defined_macros`` and ``user_defined_filters`` to create
your own variables and filters for use in templates. For more on custom filters, refer to the
`Jinja Documentation <https://jinja.palletsprojects.com/en/latest/api/#custom-filters>`_.

For more information on the variables and macros that can be referenced in templates, please read through the
:ref:`templates-ref`.
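
As a rough illustration of what ``{{ ds }}`` turns into, the sketch below formats a logical date the same way (``YYYY-MM-DD``) using only the standard library. ``to_ds`` is a hypothetical helper, and the ``str.replace`` call is a crude stand-in for Jinja that handles only this single placeholder; it does not evaluate ``{% %}`` blocks, macros, or filters.

.. code-block:: python

    from datetime import datetime


    def to_ds(logical_date: datetime) -> str:
        """Format a date as YYYY-MM-DD, the shape ``{{ ds }}`` renders to."""
        return logical_date.strftime("%Y-%m-%d")


    # Naive datetime for brevity; real Airflow logical dates are time zone aware.
    logical_date = datetime(2015, 6, 1)
    ds = to_ds(logical_date)

    # Crude stand-in for Jinja: substitute the single placeholder by hand.
    rendered = 'echo "{{ ds }}"'.replace("{{ ds }}", ds)
    print(rendered)  # echo "2015-06-01"

To see how Airflow itself renders a templated task, running ``airflow tasks test`` on that task remains the reliable check.
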
You can add documentation to your Dag or individual tasks. While Dag documentation currently supports Markdown, task documentation can be in plain text, Markdown, reStructuredText, JSON, or YAML. It's good practice to include documentation at the start of your Dag file.

.. exampleinclude:: /../src/airflow/example_dags/tutorial.py
    :language: python
    :dedent: 4
    :start-after: [START documentation]
    :end-before: [END documentation]

|
.. image:: ../img/ui-light/task_doc.png
|
.. image:: ../img/ui-light/dag_doc.png
In Airflow, tasks can depend on one another. For instance, if you have tasks ``t1``, ``t2``, and ``t3``, you can define
their dependencies in several ways:

.. code-block:: python

    t1.set_downstream(t2)

    # This means that t2 will depend on t1
    # running successfully to run.
    # It is equivalent to:
    t2.set_upstream(t1)

    # The bit shift operator can also be
    # used to chain operations:
    t1 >> t2

    # And the upstream dependency with the
    # bit shift operator:
    t2 << t1

    # Chaining multiple dependencies becomes
    # concise with the bit shift operator:
    t1 >> t2 >> t3

    # A list of tasks can also be set as
    # dependencies. These operations
    # all have the same effect:
    t1.set_downstream([t2, t3])
    t1 >> [t2, t3]
    [t2, t3] << t1

Be mindful that Airflow will raise errors if it detects cycles in your Dag or if a dependency is referenced multiple times.
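
To see why the bit shift operators chain and why a cycle is rejected, here is a small, hypothetical model that uses the standard library only. ``Node`` and ``check_acyclic`` are invented for illustration; Airflow's real tasks implement ``>>`` and ``<<`` with similar return-value conventions but much more machinery, and Airflow runs its own cycle check. The sketch relies on ``graphlib.TopologicalSorter`` (Python 3.9+), which raises ``CycleError`` when the graph contains a cycle.

.. code-block:: python

    from graphlib import CycleError, TopologicalSorter
    from typing import Dict, List, Set, Union

    Graph = Dict[str, Set[str]]  # task_id -> set of upstream task_ids


    class Node:
        """Hypothetical stand-in for a task; it only records dependencies."""

        def __init__(self, task_id: str, graph: Graph) -> None:
            self.task_id = task_id
            self.graph = graph
            graph.setdefault(task_id, set())

        @staticmethod
        def _as_list(other: Union["Node", List["Node"]]) -> List["Node"]:
            return other if isinstance(other, list) else [other]

        def set_downstream(self, other: Union["Node", List["Node"]]) -> None:
            for node in self._as_list(other):
                self.graph[node.task_id].add(self.task_id)

        def set_upstream(self, other: Union["Node", List["Node"]]) -> None:
            for node in self._as_list(other):
                self.graph[self.task_id].add(node.task_id)

        def __rshift__(self, other):  # self >> other
            self.set_downstream(other)
            return other  # returning `other` is what lets a >> b >> c chain

        def __lshift__(self, other):  # self << other
            self.set_upstream(other)
            return other

        def __rrshift__(self, other):  # [a, b] >> self
            self.set_upstream(other)
            return self

        def __rlshift__(self, other):  # [a, b] << self
            self.set_downstream(other)
            return self


    def check_acyclic(graph: Graph) -> List[str]:
        """Return a valid run order, or raise CycleError if there is a cycle."""
        return list(TopologicalSorter(graph).static_order())


    graph: Graph = {}
    t1, t2, t3 = (Node(name, graph) for name in ("t1", "t2", "t3"))
    t1 >> [t2, t3]
    print(check_acyclic(graph)[0])  # t1

    t3 >> t1  # t1 -> t3 -> t1 is a cycle
    try:
        check_acyclic(graph)
    except CycleError as err:
        print("cycle detected:", err.args[1])

Because a Python list has no ``<<`` or ``>>`` of its own, an expression like ``[t2, t3] << t1`` falls through to the right-hand operand's reflected method (``__rlshift__``), which is why lists work on either side.
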
Creating a time zone aware Dag is straightforward. Just ensure you use time zone aware dates
with `pendulum <https://github.com/python-pendulum/pendulum>`_. Avoid using the standard library
`timezone <https://docs.python.org/3/library/datetime.html#timezone-objects>`_, as it has known limitations.
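
A quick way to confirm that a date is time zone aware is the test the Python documentation uses to define an "aware" object. The sketch below is a standard-library illustration with a hypothetical ``is_aware`` helper. It uses ``datetime.timezone.utc`` only to stay dependency-free; Dag dates in this tutorial should come from pendulum, for example ``pendulum.datetime(2021, 1, 1, tz="UTC")``.

.. code-block:: python

    from datetime import datetime, timezone


    def is_aware(dt: datetime) -> bool:
        """True if dt carries a usable UTC offset (Python's definition of "aware")."""
        return dt.tzinfo is not None and dt.tzinfo.utcoffset(dt) is not None


    naive = datetime(2021, 1, 1)
    # timezone.utc keeps this sketch stdlib-only; prefer pendulum in real Dags.
    aware = datetime(2021, 1, 1, tzinfo=timezone.utc)

    print(is_aware(naive), is_aware(aware))  # False True

    # Mixing the two in an ordering comparison fails outright.
    try:
        naive < aware
    except TypeError as err:
        print("cannot compare:", err)
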
Congratulations! By now, you should have a basic understanding of how to create a Dag, define tasks and their dependencies, and use templating in Airflow. Your code should resemble the following:

.. exampleinclude:: /../src/airflow/example_dags/tutorial.py
    :language: python
    :start-after: [START tutorial]
    :end-before: [END tutorial]

.. _testing:

Now it's time to test your pipeline! First, ensure that your script parses successfully. If you saved your code in
``tutorial.py`` within the Dags folder specified in your ``airflow.cfg``, you can run:

.. code-block:: bash

    python ~/airflow/dags/tutorial.py

If the script runs without errors, congratulations! Your Dag is set up correctly.
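
Because the Dag File Processor re-evaluates this file regularly, it can also help to see roughly how long its top-level code takes. The sketch below is a hypothetical helper, not an Airflow command: it compiles and runs a Dag file once with the standard library and reports the elapsed time as a rough indicator only. Like ``python ~/airflow/dags/tutorial.py``, it surfaces syntax and import errors as exceptions; unlike it, ``__name__`` is not ``"__main__"``, so any ``if __name__ == "__main__":`` block in the file is skipped.

.. code-block:: python

    import time
    from pathlib import Path


    def evaluate_dag_source(source: str, filename: str = "<dag>") -> float:
        """Compile and execute top-level code once; return elapsed seconds."""
        start = time.perf_counter()
        code = compile(source, filename, "exec")  # raises SyntaxError on bad code
        # Fresh namespace; ImportError and other errors propagate to the caller.
        exec(code, {"__name__": "dag_check", "__file__": filename})
        return time.perf_counter() - start


    def evaluate_dag_file(path: Path) -> float:
        """Read a Dag file from disk and evaluate it once."""
        return evaluate_dag_source(path.read_text(), str(path))


    # Assumes the default Dags folder location used earlier in this tutorial.
    dag_file = Path.home() / "airflow" / "dags" / "tutorial.py"
    if dag_file.exists():
        print(f"{dag_file} evaluated in {evaluate_dag_file(dag_file):.3f}s")
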

Command Line Metadata Validation
'''''''''''''''''''''''''''''''''

Let's validate your script further by running a few commands:

.. code-block:: bash

    # initialize the database tables
    airflow db migrate

    # print the list of active Dags
    airflow dags list

    # print the list of tasks in the "tutorial" Dag
    airflow tasks list tutorial

    # print the graphviz representation of the "tutorial" Dag
    airflow dags show tutorial

Testing Task Instances and Dag Runs
'''''''''''''''''''''''''''''''''''

You can test specific task instances for a designated logical date. This simulates the scheduler running your task for a particular date and time.

.. note::

    Notice that the scheduler runs your task *for* a specific date and time, not necessarily *at* that date or time. The logical date is the timestamp that a Dag run is named after, and it typically corresponds to the end of the time period your workflow is operating on, or to the time at which the Dag run was manually triggered.

    Airflow uses this logical date to organize and track each run; it's how you refer to a specific execution in the UI, logs, and code. When triggering a Dag via the UI or API, you can supply your own logical date to run the workflow as of a specific point in time.


.. code-block:: bash

    # command layout: command subcommand [dag_id] [task_id] [(optional) date]

    # testing print_date
    airflow tasks test tutorial print_date 2015-06-01

    # testing sleep
    airflow tasks test tutorial sleep 2015-06-01

You can also see how your templates get rendered by running:

.. code-block:: bash

    # testing templated
    airflow tasks test tutorial templated 2015-06-01

This command will provide detailed logs and execute your bash command.

Keep in mind that the ``airflow tasks test`` command runs task instances locally, outputs their logs to stdout, and
doesn't track state in the database. This is a handy way to test individual task instances.

Similarly, ``airflow dags test`` runs a single Dag run without registering any state in the database, which is useful
for testing your entire Dag locally.

That's a wrap! You've successfully written and tested your first Airflow pipeline. As you continue your journey, consider merging your code into a repository with a Scheduler running against it, which will allow your Dag to be triggered and executed daily.
Here are a few suggestions for your next steps:

.. seealso::

    - Continue to the next step of the tutorial: :doc:`/tutorial/taskflow`
    - Explore the :doc:`/core-concepts/index` section for detailed explanations of Airflow concepts such as Dags, Tasks, Operators, and more.