.. Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

.. _sdk-dynamic-task-mapping:

Dynamic Task Mapping with Task SDK
==================================

Dynamic Task Mapping allows tasks defined with the Task SDK to generate a variable number of task instances at runtime based on upstream data. This is enabled via the ``expand()`` method on tasks, which lets you parallelize execution without knowing the number of tasks ahead of time.

Simple Mapping
--------------

Map over a Python list directly in the DAG:

.. code-block:: python

    from datetime import datetime

    from airflow.sdk import DAG, task


    @task
    def add_one(x: int):
        return x + 1


    @task
    def sum_it(values: list[int]):
        print(f"Total was {sum(values)}")


    with DAG(dag_id="dynamic-map-simple", start_date=datetime(2022, 1, 1)) as dag:
        summed = sum_it(values=add_one.expand(x=[1, 2, 3, 4, 5]))
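The data flow of this DAG can be mimicked in plain Python, which may help when reasoning about what each mapped task instance receives. The sketch below is only an analogy and does not use Airflow: the list comprehension stands in for the creation of one ``add_one`` task instance per list element, and ``run_simple_mapping`` is a hypothetical helper name introduced for illustration.

```python
def add_one(x: int) -> int:
    return x + 1


def sum_it(values: list[int]) -> int:
    return sum(values)


def run_simple_mapping(inputs: list[int]) -> tuple[list[int], int]:
    # One call per element, standing in for one mapped task instance each.
    mapped = [add_one(x) for x in inputs]
    # The downstream task receives all mapped results collected together.
    return mapped, sum_it(mapped)


mapped_results, total = run_simple_mapping([1, 2, 3, 4, 5])
print(mapped_results)  # [2, 3, 4, 5, 6]
print(f"Total was {total}")  # Total was 20
```

In the real DAG the five ``add_one`` instances can run in parallel, while this sketch runs them sequentially.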

Task-Generated Mapping
----------------------

Generate the list at runtime from an upstream task:

.. code-block:: python

    from datetime import datetime

    from airflow.sdk import DAG, task


    @task
    def make_list():
        # This could fetch data from an API, database, etc.
        return ["a", "b", "c"]


    @task
    def consume(item: str):
        print(item)


    with DAG(dag_id="dynamic-map-generated", start_date=datetime(2022, 1, 1)) as dag:
        consume.expand(item=make_list())
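The key difference from the simple case is that the number of mapped instances is unknown until the upstream task has actually run. A rough plain-Python analogy, not using Airflow, is to pass the upstream function itself and call it only at "run time". The helper ``expand_at_runtime`` is a hypothetical name used purely for illustration, and ``consume`` returns a string here (instead of printing) so the result can be inspected.

```python
from typing import Callable


def make_list() -> list[str]:
    # Stand-in for data fetched from an API, database, etc.
    return ["a", "b", "c"]


def consume(item: str) -> str:
    return f"consumed {item}"


def expand_at_runtime(
    upstream: Callable[[], list[str]], fn: Callable[[str], str]
) -> list[str]:
    # The upstream result, and therefore the number of calls to fn,
    # only becomes known once upstream() has been executed.
    items = upstream()
    return [fn(item) for item in items]


results = expand_at_runtime(make_list, consume)
print(results)
```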

Details
-------

  • Only keyword arguments can be passed to ``expand()``.

  • Mapped inputs are provided to tasks as lazy proxy objects. To force evaluation into a concrete list, wrap the proxy in ``list()``.

  • Combine static parameters with mapped ones using ``partial()``:

    .. code-block:: python

        @task
        def add(x: int, y: int):
            return x + y


        with DAG(dag_id="map-with-partial", start_date=datetime(2022, 1, 1)) as dag:
            add.partial(y=10).expand(x=[1, 2, 3])
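As a loose analogy for the last two points, the standard library offers similar building blocks: ``functools.partial`` fixes a static argument once, and ``map()`` produces a lazy result that ``list()`` turns into a concrete list. This sketch does not use Airflow, and Airflow's lazy proxy is a different type from a Python ``map`` object. The name ``add_ten`` is introduced here for illustration only.

```python
from functools import partial


def add(x: int, y: int) -> int:
    return x + y


# Fix the static argument once, analogous to add.partial(y=10).
add_ten = partial(add, y=10)

# map() is lazy: nothing is computed until the result is consumed.
lazy_results = map(lambda x: add_ten(x=x), [1, 2, 3])

# Wrapping in list() forces evaluation into a concrete list.
results = list(lazy_results)
print(results)  # [11, 12, 13]
```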

Advanced Usage
--------------

For advanced patterns, such as repeated mapping, cross-product mapping, named mappings (via ``map_index_template``), and handling large datasets, see the Airflow Core documentation:

`Dynamic Task Mapping in the Airflow Core docs <https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html>`_.
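As a rough illustration of what cross-product mapping means, the plain-Python sketch below enumerates every combination of two mapped arguments with ``itertools.product``. It does not use Airflow; it assumes the behavior described in the Core docs, where expanding over several keyword arguments creates one task instance per combination. The input values are made up for the example, and the results are sorted because the order of expansion should not be relied on.

```python
from itertools import product


def add(x: int, y: int) -> int:
    return x + y


xs = [2, 4, 8]
ys = [5, 10]

# One keyword-argument set per combination, i.e. len(xs) * len(ys) calls.
combinations = [{"x": x, "y": y} for x, y in product(xs, ys)]

# Sort the results so the output does not depend on expansion order.
results = sorted(add(**kwargs) for kwargs in combinations)
print(len(combinations))  # 6
print(results)  # [7, 9, 12, 13, 14, 18]
```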