.. Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
.. warning::

    Multi-Team is an :ref:`experimental <experimental>`/incomplete feature currently in preview. The feature will not be
    fully complete until Airflow 3.3 and may be subject to changes without warning based on user feedback.
    See the :ref:`Work in Progress <multi-team-work-in-progress>` section below for details.

Multi-Team Airflow is a feature that enables organizations to run multiple teams within a single Airflow deployment while providing resource isolation and team-based access controls. This feature is designed for medium to large organizations that need to share Airflow infrastructure across multiple teams while maintaining logical separation of resources.
.. note::

    Multi-Team Airflow is different from multi-tenancy. It provides isolation within a single deployment but is not designed for complete tenant separation. All teams share the same Airflow infrastructure, scheduler, and metadata database.

Multi-Team mode is designed for medium to large organizations that typically have many discrete teams who need access to an Airflow environment. Often there is a dedicated platform or DevOps team to manage the shared Airflow infrastructure, but this is not required.
Use Multi-Team mode when:
(see :doc:`/security/security_model` for task-level isolation limitations)

Teams
^^^^^
A Team is a logical grouping that represents a group of users within your organization. Teams are in part stored in the Airflow metadata database and serve as the basis for resource isolation.
Teams within the Airflow database have a very simple structure, containing only one field: the team ``name``.
Teams are associated with Dag bundles through a separate association table, which links team names to Dag bundle names.
Dag Bundles and Team Ownership ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Teams are associated with Dags through Dag Bundles. A Dag bundle can be owned by at most one team. When a Dag bundle is assigned to a team:
.. note::

    The relationship chain is: **Task/Callback → Dag → Dag Bundle → Team**

Resource Isolation
^^^^^^^^^^^^^^^^^^
When Multi-Team mode is enabled, the following resources can be scoped to specific teams:
Resources without a team assignment are considered global and accessible to all teams.
Secrets Backends
""""""""""""""""
Airflow's secrets backends, including environment variables, the metastore, and the local filesystem, are team-aware. Custom secrets backends are supported on a case-by-case basis.

When a task requests a Variable or Connection, the secrets backend returns the team-specific value if one exists. The backend resolves the correct value automatically based on the requesting task's team.
Auth Manager
""""""""""""
In order to use multi-team mode, the auth manager must be compatible with it. A compatible auth manager must implement two methods:
* ``is_authorized_team``: Determines whether a user is authorized to perform a given action on a team. It is
  used primarily to check whether a user belongs to a team.
* ``_get_teams``: Returns the set of teams defined in the auth manager.

During initialization, Airflow validates that all teams defined in the auth manager are also present in the Airflow metadata database. If any team is missing, Airflow will raise an error.

If the auth manager you are using does not implement these methods, Airflow will raise a
``NotImplementedError`` at runtime.
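Conceptually, the start-up check described above is a set comparison between the teams known to the auth manager and the teams stored in the metadata database. The following is a minimal sketch, assuming both have already been loaded as sets of team names; ``validate_auth_manager_teams`` is a hypothetical helper, not part of the auth manager API, and ``ValueError`` stands in for whichever error Airflow actually raises:

```python
def validate_auth_manager_teams(auth_manager_teams: set[str], db_teams: set[str]) -> None:
    """Raise if the auth manager defines teams that are missing from the metadata database.

    Hypothetical helper for illustration only; Airflow performs its own check at start-up.
    """
    missing = sorted(auth_manager_teams - db_teams)
    if missing:
        raise ValueError(f"Teams defined in the auth manager but not in the database: {missing}")


# Passes: every auth manager team also exists in the database.
validate_auth_manager_teams({"team_a", "team_b"}, {"team_a", "team_b", "team_c"})

# Fails: "team_d" has not been created with `airflow teams create` yet.
try:
    validate_auth_manager_teams({"team_a", "team_d"}, {"team_a", "team_b"})
except ValueError as err:
    print(err)
```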
Examples of auth managers compatible with multi-team mode:

* :doc:`Simple auth manager </core-concepts/auth-manager/index>` (recommended for development usage only)
* :doc:`Keycloak auth manager <apache-airflow-providers-keycloak:auth-manager/index>`

To enable Multi-Team mode, set the following configuration in your ``airflow.cfg``:
.. code-block:: ini

    [core]
    multi_team = True

Or via environment variable:

.. code-block:: bash

    export AIRFLOW__CORE__MULTI_TEAM=True

.. warning::

    Changing this setting on an existing deployment requires careful planning.

Teams are managed using the Airflow CLI. The following commands are available:
Creating a Team
^^^^^^^^^^^^^^^

.. code-block:: bash

    airflow teams create <team_name>

Team names must be 3-50 characters long and contain only alphanumeric characters, hyphens, and underscores.
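The naming rule above can be expressed as a single regular expression, which is handy for checking names in provisioning scripts before calling ``airflow teams create``. This sketch assumes "alphanumeric" means ASCII letters and digits; ``is_valid_team_name`` is a hypothetical helper, not the CLI's own validator, so Airflow's exact rule may differ:

```python
import re

# Assumption: "alphanumeric" means ASCII letters and digits only.
TEAM_NAME_PATTERN = re.compile(r"[A-Za-z0-9_-]{3,50}")


def is_valid_team_name(name: str) -> bool:
    """Return True if name is 3-50 letters, digits, hyphens, or underscores."""
    return TEAM_NAME_PATTERN.fullmatch(name) is not None


print(is_valid_team_name("team_a"))  # True
print(is_valid_team_name("ab"))  # False: too short
print(is_valid_team_name("team a"))  # False: contains a space
```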
Listing Teams
^^^^^^^^^^^^^

.. code-block:: bash

    airflow teams list

This displays all teams in the deployment with their names.
Deleting a Team
^^^^^^^^^^^^^^^

.. code-block:: bash

    airflow teams delete <team_name>

Or to skip the confirmation prompt:
.. code-block:: bash

    airflow teams delete <team_name> --yes

.. warning::

    A team cannot be deleted if it has associated resources (Dag bundles, Variables, Connections, or Pools). You must remove these associations first.

.. _multi-team-variables:
Team-scoped Variables ^^^^^^^^^^^^^^^^^^^^^
Variables can be associated with teams when created. Tasks belonging to a team can access:
When a task requests a variable, the system checks for a team-specific variable first.
Team-scoped variables can be created and managed through the Airflow UI or via environment variables.
Via environment variables, you can set team-scoped variables using the format:
.. code-block:: bash

    # Global variable
    export AIRFLOW_VAR_MY_VARIABLE="global_value"

    # Team-scoped variable for "team_a"
    export AIRFLOW_VAR__TEAM_A___MY_VARIABLE="team_a_value"

The format is ``AIRFLOW_VAR__{TEAM}___{KEY}`` (note the double underscore before the team name and the triple underscore between the team name and the key).
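The lookup order (team-specific value first, then the global one) can be sketched with a plain dictionary standing in for the process environment. This illustrates the naming convention only and is not Airflow's environment-variables secrets backend; ``team_env_var`` and ``lookup_env`` are hypothetical helpers, and the same pattern applies to connections with the ``AIRFLOW_CONN`` prefix:

```python
from __future__ import annotations

import os
from collections.abc import Mapping


def team_env_var(prefix: str, team: str, key: str) -> str:
    """Build a team-scoped name such as AIRFLOW_VAR__TEAM_A___MY_VARIABLE."""
    # Double underscore before the team, triple underscore between team and key.
    return f"{prefix}__{team.upper()}___{key.upper()}"


def lookup_env(
    prefix: str, key: str, team: str | None, environ: Mapping[str, str] = os.environ
) -> str | None:
    """Return the team-scoped value if set, otherwise the global value, otherwise None."""
    if team is not None:
        scoped = environ.get(team_env_var(prefix, team, key))
        if scoped is not None:
            return scoped
    # Global form, e.g. AIRFLOW_VAR_MY_VARIABLE
    return environ.get(f"{prefix}_{key.upper()}")


env = {
    "AIRFLOW_VAR_MY_VARIABLE": "global_value",
    "AIRFLOW_VAR__TEAM_A___MY_VARIABLE": "team_a_value",
}
print(lookup_env("AIRFLOW_VAR", "my_variable", "team_a", env))  # team_a_value
print(lookup_env("AIRFLOW_VAR", "my_variable", "team_b", env))  # global_value
```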
Team-scoped Connections ^^^^^^^^^^^^^^^^^^^^^^^
Connections follow the same pattern as variables. Tasks can access connections owned by their team or global connections.
Team-scoped connections can be created and managed through the Airflow UI or via environment variables.
Via environment variables:
.. code-block:: bash

    # Global connection
    export AIRFLOW_CONN_MY_DATABASE="postgresql://..."

    # Team-scoped connection for "team_a"
    export AIRFLOW_CONN__TEAM_A___MY_DATABASE="postgresql://..."

The format is ``AIRFLOW_CONN__{TEAM}___{CONN_ID}`` (note the double underscore before the team name and the triple underscore between the team name and the connection ID).
Team-scoped Pools
^^^^^^^^^^^^^^^^^
Pools can be assigned to teams, providing resource isolation for task execution slots. When a pool is assigned to a team:
Pools without a team assignment remain globally accessible to all teams.
Creating Team-scoped Pools via CLI
""""""""""""""""""""""""""""""""""

Use the ``--team-name`` option with ``airflow pools set`` to assign a pool to a team:

.. code-block:: bash

    # Create a pool assigned to team_a
    airflow pools set team_a_pool 10 "Pool for team A" --team-name team_a

    # Create a global pool (no team assignment)
    airflow pools set shared_pool 20 "Shared pool for all teams"

    # Update an existing pool to assign it to a team
    airflow pools set existing_pool 5 "Now team-scoped" --team-name team_b

.. note::

    The ``--team-name`` option is rejected when ``core.multi_team`` is disabled.
    The specified team must exist in the database (create it first with ``airflow teams create``).

Creating Team-scoped Pools via the REST API
"""""""""""""""""""""""""""""""""""""""""""

Use the ``POST /api/v2/pools`` endpoint with the ``team_name`` field in the request body:

.. code-block:: bash

    curl -X POST "http://localhost:8080/api/v2/pools" \
      -H "Content-Type: application/json" \
      -d '{
        "name": "team_a_pool",
        "slots": 10,
        "description": "Pool for team A",
        "include_deferred": false,
        "team_name": "team_a"
      }'

To update an existing pool's team assignment, use ``PATCH /api/v2/pools/{pool_name}``:

.. code-block:: bash

    curl -X PATCH "http://localhost:8080/api/v2/pools/team_a_pool" \
      -H "Content-Type: application/json" \
      -d '{"team_name": "team_a"}'

Omit ``team_name`` or set it to ``null`` to make a pool global.
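The same ``POST`` request can be built from Python with only the standard library. This sketch assumes an API server at ``http://localhost:8080`` and leaves out authentication, which a real deployment will require; ``build_pool_request`` is a hypothetical helper, and the request is only constructed here, not sent:

```python
from __future__ import annotations

import json
import urllib.request


def build_pool_request(
    name: str,
    slots: int,
    description: str,
    team_name: str | None = None,
    base_url: str = "http://localhost:8080",
) -> urllib.request.Request:
    """Build (but do not send) a POST /api/v2/pools request mirroring the curl example.

    team_name=None is serialized as JSON null, which creates a global pool.
    Authentication is omitted; a real deployment will need an auth header.
    """
    body = {
        "name": name,
        "slots": slots,
        "description": description,
        "include_deferred": False,
        "team_name": team_name,
    }
    return urllib.request.Request(
        f"{base_url}/api/v2/pools",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


team_pool = build_pool_request("team_a_pool", 10, "Pool for team A", team_name="team_a")
global_pool = build_pool_request("shared_pool", 20, "Shared pool for all teams")
print(team_pool.get_method(), team_pool.full_url)
print(global_pool.data.decode())
# To actually send it: urllib.request.urlopen(team_pool)
```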
Creating Team-scoped Pools via the UI
"""""""""""""""""""""""""""""""""""""
In the Airflow UI, navigate to Admin > Pools and create or edit a pool. When Multi-Team mode is enabled, a Team dropdown is available to assign the pool to a team. Leave it empty for a global pool.
One of the most powerful features of Multi-Team mode is the ability to configure different executors per team or the same executor but configured differently. This allows teams to have dedicated compute resources or execution flows per team.
As with global executors, team-scoped executor configurations also support multiple executors (for example, both
``LocalExecutor`` and ``KubernetesExecutor``), allowing tasks within that team to specify which executor to use. For
details on configuring multiple executors, see :ref:`Using Multiple Executors Concurrently <using-multiple-executors-concurrently>`.
Configuration Format
^^^^^^^^^^^^^^^^^^^^

The executor configuration supports team-based assignments using the following format:

.. code-block:: ini

    [core]
    executor = GlobalExecutor;team1=Team1Executor;team2=Team2Executor

For example:

.. code-block:: ini

    [core]
    executor = LocalExecutor;team_a=CeleryExecutor;team_b=KubernetesExecutor

In this configuration:
* ``LocalExecutor`` is the global default executor
* Tasks in ``team_a`` use ``CeleryExecutor`` or the ``LocalExecutor``
* Tasks in ``team_b`` use ``KubernetesExecutor`` or the ``LocalExecutor``
* All other tasks use ``LocalExecutor``

Important Rules
^^^^^^^^^^^^^^^
* **Global executor must come first**: At least one global executor (without a team prefix) must be configured and must appear before any team-specific executors.
* **Teams must exist**: All team names in the executor configuration must exist in the database before Airflow starts.
* **Executor must support multi-team**: Not all executors support multi-team mode. The executor class must have ``supports_multi_team = True``.
* **No duplicate teams**: Each team may only appear once in the executor configuration.
* **No duplicate executors within a team**: A team cannot have the same executor configured multiple times.
Example configurations:
.. code-block:: ini

    # Valid: Global executor with team-specific executors
    executor = LocalExecutor;team_a=CeleryExecutor

    # Valid: Multiple team-specific executors
    executor = LocalExecutor;team_a=CeleryExecutor;team_b=KubernetesExecutor

    # Valid: Multiple executors globally and per team
    executor = LocalExecutor,KubernetesExecutor;team_a=CeleryExecutor,KubernetesExecutor;team_b=LocalExecutor

    # Invalid: No global executor
    executor = team_a=CeleryExecutor;team_b=LocalExecutor

    # Invalid: Global executor after team executor
    executor = team_a=CeleryExecutor;LocalExecutor

    # Invalid: Duplicate Team
    executor = LocalExecutor;team_a=CeleryExecutor;team_b=LocalExecutor;team_a=KubernetesExecutor

    # Invalid: Duplicate Executor within a Team
    executor = LocalExecutor;team_a=CeleryExecutor,CeleryExecutor;team_b=LocalExecutor
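To make the ordering and duplicate rules concrete, here is a minimal sketch of a parser that accepts the valid examples above and rejects the invalid ones. ``parse_executor_config`` is hypothetical, not Airflow's own parsing code, and it does not check that the teams exist in the database or that each executor sets ``supports_multi_team``:

```python
def parse_executor_config(value: str) -> tuple[list[str], dict[str, list[str]]]:
    """Split a [core] executor value into global and per-team executor lists.

    Hypothetical helper that mirrors the documented rules; not Airflow's parser.
    """
    global_executors: list[str] = []
    team_executors: dict[str, list[str]] = {}

    for segment in (part.strip() for part in value.split(";")):
        team, sep, names = segment.partition("=")
        if sep:  # "team=ExecA,ExecB" is a team-specific segment
            team = team.strip()
            if not global_executors:
                raise ValueError("a global executor must come before any team executor")
            if team in team_executors:
                raise ValueError(f"duplicate team: {team}")
            target = team_executors[team] = []
        else:  # no "=" means a global segment
            if team_executors:
                raise ValueError("global executors must appear before team executors")
            names, target = segment, global_executors
        for name in (n.strip() for n in names.split(",")):
            if not name:
                raise ValueError(f"empty executor name in segment {segment!r}")
            if name in target:
                raise ValueError(f"duplicate executor {name!r} in segment {segment!r}")
            target.append(name)

    return global_executors, team_executors


print(parse_executor_config("LocalExecutor;team_a=CeleryExecutor;team_b=KubernetesExecutor"))
```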
Aliasing Executors Across Teams
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

When the same executor type is used at both the global and team level (e.g., ``LocalExecutor`` globally and
``LocalExecutor`` for a team), tasks that want to target the global executor need a way to distinguish between the
two instances. To accomplish this, you can assign aliases to core executors using the ``Alias:ExecutorName`` syntax:
.. code-block:: ini

    [core]
    executor = global_celery_exec:CeleryExecutor;team_a=team_celery_exec:CeleryExecutor

With this configuration:
* The global ``CeleryExecutor`` is available via the alias ``global_celery_exec``
* ``team_a``'s ``CeleryExecutor`` is available via the alias ``team_celery_exec``
* A task in ``team_a`` that sets ``executor="team_celery_exec"``, ``executor="CeleryExecutor"``, or
  ``executor="airflow.providers.celery.executors.celery_executor.CeleryExecutor"`` will run on the team executor
* A task in ``team_a`` that sets ``executor="global_celery_exec"`` will run on the global executor

.. code-block:: python

    from airflow.providers.standard.operators.bash import BashOperator

    # The tasks below are defined in a Dag that belongs to team_a's Dag bundle.

    # Runs on the global CeleryExecutor via alias
    BashOperator(
        task_id="uses_global",
        executor="global_celery_exec",
        bash_command="echo 'running on global executor'",
    )

    # Runs on team_a's CeleryExecutor via alias
    BashOperator(
        task_id="use_team_alias",
        executor="team_celery_exec",
        bash_command="echo 'running on team executor'",
    )

    # Runs on team_a's CeleryExecutor via class name
    BashOperator(
        task_id="use_team_classname",
        executor="CeleryExecutor",
        bash_command="echo 'running on team executor'",
    )

    # Runs on team_a's CeleryExecutor via full module path
    BashOperator(
        task_id="use_team_module_path",
        executor="airflow.providers.celery.executors.celery_executor.CeleryExecutor",
        bash_command="echo 'running on team executor'",
    )

    # Also runs on team_a's CeleryExecutor (implicit team default)
    BashOperator(
        task_id="use_default",
        bash_command="echo 'running on default team executor'",
    )
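The resolution described in the list above can be modelled as a two-pass lookup: the task's team executors are searched first (by alias, class name, or module path), then the global ones. This is a hypothetical sketch, not Airflow's scheduler code; ``CORE_EXECUTOR_PATHS`` is an illustrative subset, and treating the first team executor as the team default is an assumption based on the ``use_default`` example:

```python
from __future__ import annotations

from dataclasses import dataclass

# Illustrative subset of core executor class names and their full import paths.
CORE_EXECUTOR_PATHS = {
    "LocalExecutor": "airflow.executors.local_executor.LocalExecutor",
    "CeleryExecutor": "airflow.providers.celery.executors.celery_executor.CeleryExecutor",
}


@dataclass(frozen=True)
class ExecutorEntry:
    alias: str | None
    class_name: str
    module_path: str


def parse_entry(raw: str) -> ExecutorEntry:
    """Parse "Alias:ExecutorName", a bare executor name, or a module path."""
    alias, sep, name = raw.partition(":")
    if not sep:  # no alias given
        alias, name = None, raw
    module_path = CORE_EXECUTOR_PATHS.get(name, name)
    return ExecutorEntry(alias, module_path.rsplit(".", 1)[-1], module_path)


def resolve_executor(
    requested: str | None,
    team_entries: list[ExecutorEntry],
    global_entries: list[ExecutorEntry],
) -> tuple[str, ExecutorEntry]:
    """Return ("team" or "global", entry) for a task's executor= argument."""
    if requested is None:  # no executor set: team default, else global default
        return ("team", team_entries[0]) if team_entries else ("global", global_entries[0])
    # Team executors win over global ones with the same class name or module path.
    for scope, entries in (("team", team_entries), ("global", global_entries)):
        for entry in entries:
            if requested in (entry.alias, entry.class_name, entry.module_path):
                return scope, entry
    raise ValueError(f"no executor matches {requested!r}")


global_entries = [parse_entry("global_celery_exec:CeleryExecutor")]
team_a_entries = [parse_entry("team_celery_exec:CeleryExecutor")]

for requested in ["global_celery_exec", "team_celery_exec", "CeleryExecutor", None]:
    scope, entry = resolve_executor(requested, team_a_entries, global_entries)
    print(f"{requested!r} -> {scope} executor ({entry.alias})")
```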
Aliases work with all core executors (``LocalExecutor``, ``CeleryExecutor``, ``KubernetesExecutor``, etc.) as
well as custom executor module paths. For more information on aliases and multiple executor configuration,
see :ref:`Using Multiple Executors Concurrently <using-multiple-executors-concurrently>`.
Team-specific Executor Settings ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
When multiple teams use the same executor type (e.g., both team_a and team_b using CeleryExecutor),
each team can provide its own configuration for that executor. This allows teams to point to different Celery
brokers, use different Kubernetes namespaces, or customize any executor setting independently.
Configuration Resolution Order
"""""""""""""""""""""""""""""""
When a team executor reads a configuration value (e.g., [celery] broker_url), the system checks the
following sources in order, returning the first value found:
1. Team-specific environment variable: ``AIRFLOW__{TEAM}___{SECTION}__{KEY}``
2. Team-specific section in the config file: ``[team_name=section]``
3. Fallback values

The following sources are skipped for team executors (they do not yet support team-based configuration):

* Command-based values (``{key}_cmd``)
* Secret-based values (``{key}_secret``)

.. note::

    Team-specific configuration does **not** fall back to the global environment variable or global config file
    settings. For example, if there is a global ``CeleryExecutor`` and a team ``CeleryExecutor`` in use, the global
    ``CeleryExecutor`` may want to increase ``celery.worker_concurrency`` from the default of ``16`` to ``32`` by
    overriding this configuration. However, the team ``CeleryExecutor`` should not be forced to ``32``; it will
    continue to use the default of ``16`` unless it is explicitly overridden with team-specific configuration.

Via Environment Variables
"""""""""""""""""""""""""

Team-specific configuration can be provided using environment variables with the following format:

.. code-block:: text

    AIRFLOW__{TEAM}___{SECTION}__{KEY}

Note the delimiters: double underscore before the team name (part of the ``AIRFLOW__`` prefix), triple
underscore between the team name and section, and double underscore between section and key. The team name
is uppercase.

.. code-block:: bash

    # team_a's Celery broker URL
    export AIRFLOW__TEAM_A___CELERY__BROKER_URL="redis://team-a-redis:6379/0"

    # team_b's Celery broker URL
    export AIRFLOW__TEAM_B___CELERY__BROKER_URL="redis://team-b-redis:6379/0"

    # team_b's Celery result backend
    export AIRFLOW__TEAM_B___CELERY__RESULT_BACKEND="db+postgresql://team-b-db/celery_results"

Via Config File
"""""""""""""""

Team-specific settings can also be placed in the ``airflow.cfg`` file using sections prefixed with the team
name followed by an equals sign:

.. code-block:: ini

    # Global celery settings (used by the global executor, NOT as a fallback for teams)
    [celery]
    broker_url = redis://default-redis:6379/0
    result_backend = db+postgresql://default-db/celery_results

    # team_a overrides
    [team_a=celery]
    broker_url = redis://team-a-redis:6379/0
    result_backend = db+postgresql://team-a-db/celery_results

    # team_b overrides
    [team_b=celery]
    broker_url = redis://team-b-redis:6379/0
    result_backend = db+postgresql://team-b-db/celery_results
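The resolution order described above can be sketched with the standard library's ``configparser``. ``get_team_option`` is a hypothetical helper, not Airflow's configuration API. Note that it deliberately never reads the global ``[celery]`` section or global environment variables, and it ignores the ``_cmd`` and ``_secret`` variants:

```python
from __future__ import annotations

import configparser
from collections.abc import Mapping


def get_team_option(
    team: str,
    section: str,
    key: str,
    *,
    cfg: configparser.ConfigParser,
    environ: Mapping[str, str],
    fallback: str | None = None,
) -> str | None:
    """Resolve a team executor setting: team env var, then [team=section], then fallback."""
    # 1. Team-specific environment variable, e.g. AIRFLOW__TEAM_A___CELERY__BROKER_URL
    env_name = f"AIRFLOW__{team.upper()}___{section.upper()}__{key.upper()}"
    if env_name in environ:
        return environ[env_name]
    # 2. Team-specific config file section, e.g. [team_a=celery]
    team_section = f"{team}={section}"
    if cfg.has_option(team_section, key):
        return cfg.get(team_section, key)
    # 3. Fallback only; the global [section] and global env vars are never consulted.
    return fallback


cfg = configparser.ConfigParser(interpolation=None)
cfg.read_string(
    """
[celery]
broker_url = redis://default-redis:6379/0
worker_concurrency = 32

[team_a=celery]
broker_url = redis://team-a-redis:6379/0
"""
)
env = {"AIRFLOW__TEAM_B___CELERY__BROKER_URL": "redis://team-b-redis:6379/0"}

print(get_team_option("team_a", "celery", "broker_url", cfg=cfg, environ=env))
print(get_team_option("team_b", "celery", "broker_url", cfg=cfg, environ=env))
# The global worker_concurrency = 32 is not inherited; team_a gets the fallback.
print(get_team_option("team_a", "celery", "worker_concurrency", cfg=cfg, environ=env, fallback="16"))
```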
Dag bundles are associated with teams through the Dag bundle configuration. When configuring your Dag bundles, you specify a ``team_name`` for each bundle:

.. code-block:: ini

    [dag_processor]
    dag_bundle_config_list = [
        {
          "name": "team_a_dags",
          "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
          "kwargs": {"path": "/opt/airflow/dags/team_a"},
          "team_name": "team_a"
        },
        {
          "name": "team_b_dags",
          "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
          "kwargs": {"path": "/opt/airflow/dags/team_b"},
          "team_name": "team_b"
        },
        {
          "name": "shared_dags",
          "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
          "kwargs": {"path": "/opt/airflow/dags/shared"}
        }
      ]

In this example:
* Dags in ``/opt/airflow/dags/team_a`` belong to ``team_a``
* Dags in ``/opt/airflow/dags/team_b`` belong to ``team_b``
* Dags in ``/opt/airflow/dags/shared`` have no team (global)

.. note::

    The team specified in ``team_name`` must exist in the database before syncing the Dag bundles. Create teams first using ``airflow teams create``.
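The ownership chain (Task → Dag → Dag Bundle → Team) can be sketched as dictionary lookups over the bundle configuration shown above. This is a hypothetical illustration: Airflow stores these relationships in the metadata database, and the Dag-to-bundle mapping and helper names below are made up for the example:

```python
from __future__ import annotations

import json

# Trimmed copy of the dag_bundle_config_list example above (only the keys used here).
DAG_BUNDLE_CONFIG_LIST = """
[
    {"name": "team_a_dags", "team_name": "team_a"},
    {"name": "team_b_dags", "team_name": "team_b"},
    {"name": "shared_dags"}
]
"""


def bundle_teams(config_list: str) -> dict[str, str | None]:
    """Map each Dag bundle name to its team name, or None for a global bundle."""
    return {bundle["name"]: bundle.get("team_name") for bundle in json.loads(config_list)}


def resolve_team(
    dag_id: str, dag_to_bundle: dict[str, str], bundle_to_team: dict[str, str | None]
) -> str | None:
    """Follow Dag -> Dag bundle -> Team; every task in the Dag inherits the result."""
    return bundle_to_team[dag_to_bundle[dag_id]]


def unknown_teams(bundle_to_team: dict[str, str | None], existing_teams: set[str]) -> set[str]:
    """Teams referenced by bundles that have not been created yet."""
    return {team for team in bundle_to_team.values() if team is not None} - existing_teams


teams = bundle_teams(DAG_BUNDLE_CONFIG_LIST)
# Hypothetical Dag-to-bundle mapping, as recorded when the Dags are parsed.
dag_to_bundle = {"etl_orders": "team_a_dags", "shared_cleanup": "shared_dags"}
print(resolve_team("etl_orders", dag_to_bundle, teams))
print(resolve_team("shared_cleanup", dag_to_bundle, teams))
print(unknown_teams(teams, {"team_a"}))
```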
When Multi-Team mode is enabled, the scheduler performs additional logic to determine the correct executor for each task:
.. TODO: Diagram showing the scheduler team resolution flow would be helpful here
1. **Task to Team Resolution**: The scheduler resolves the team for each task by following the relationship chain:

   * Task → Dag (via ``dag_id``)
   * Dag → Dag Bundle (via ``bundle_name``)
   * Dag Bundle → Team (via the ``dag_bundle_team`` association table)

2. **Executor Selection**: Once the team is determined:

.. warning::

    Multi-Team Airflow provides **logical isolation** for a secure perimeter around teams, not complete isolation. All
    teams share the same metadata database and common Airflow infrastructure. For absolutely strict security
    requirements, consider separate Airflow deployments.

.. _multi-team-triggerer:
When Multi-Team mode is enabled, you should scope a triggerer to each team using the ``--team-name`` CLI argument. A team-scoped triggerer processes deferred tasks (triggers) belonging to that team's Dags. This allows teams to run isolated triggerer instances with independent capacity and failure domains.

Configuration
^^^^^^^^^^^^^

Start a team-scoped triggerer by passing ``--team-name``:

.. code-block:: bash

    # Triggerer for team_a only
    airflow triggerer --team-name team_a

    # Triggerer for team_b only
    airflow triggerer --team-name team_b

    # Global triggerer: processes triggers from Dags with no team association
    airflow triggerer

Startup validation ensures that ``core.multi_team`` is enabled and that the specified team exists in the database.

Behavior
^^^^^^^^
* **Team-scoped triggerer** (``--team-name team_x``): Only picks up triggers whose originating Dag belongs to a bundle mapped to ``team_x``.
* **Global triggerer** (no ``--team-name``): Only picks up triggers whose originating Dag belongs to a bundle with no team assignment.
* **Multi-Team mode disabled** (``core.multi_team = False``): ``--team-name`` is rejected. No filtering occurs and all triggerers process all triggers (existing behavior).

Interaction with ``--queues``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Team filtering and queue filtering are orthogonal; they combine as AND conditions. For example, a triggerer started with ``--team-name team_a --queues q1,q2`` only processes triggers that both belong to ``team_a`` and were deferred from tasks in queues ``q1`` or ``q2``.
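The team and queue filters can be sketched as a single predicate that a triggerer applies to each pending trigger. This is a hypothetical model, not Airflow's triggerer implementation; ``PendingTrigger`` and ``triggerer_accepts`` are made-up names, and a trigger's queue is assumed to be the queue of the task that deferred:

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class PendingTrigger:
    team_name: str | None  # team owning the originating Dag's bundle; None means global
    queue: str  # queue of the task that deferred


def triggerer_accepts(
    trigger: PendingTrigger,
    *,
    team_name: str | None,  # value of --team-name, or None when omitted
    queues: set[str] | None = None,  # value of --queues, or None when omitted
    multi_team: bool = True,  # value of core.multi_team
) -> bool:
    """Return True if a triggerer started with these options would pick up the trigger."""
    if not multi_team:
        if team_name is not None:
            raise ValueError("--team-name is rejected when core.multi_team is disabled")
    elif trigger.team_name != team_name:
        # Team-scoped triggerers only see their own team; global ones only teamless triggers.
        return False
    # Queue filtering is ANDed with the team filter.
    return queues is None or trigger.queue in queues


print(triggerer_accepts(PendingTrigger("team_a", "q1"), team_name="team_a", queues={"q1", "q2"}))
print(triggerer_accepts(PendingTrigger("team_a", "q3"), team_name="team_a", queues={"q1", "q2"}))
```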
.. note::

    Ensure that at least one triggerer is running for every team; otherwise that team's triggers will
    remain unassigned until one starts. The same applies to every queue when ``--queues`` is used. If you
    combine ``--team-name`` and ``--queues``, this requirement extends to each team-and-queue combination.

.. _multi-team-asset-event-filtering:
When Multi-Team mode is enabled, asset events are filtered by team membership before they trigger downstream Dag runs. This prevents asset events produced by one team's Dags from unintentionally triggering Dag runs for a different team.
Default Behavior
^^^^^^^^^^^^^^^^
By default, a consuming Dag only receives asset events from producers within the same team or from Dags with no team association, i.e. global Dags.
.. _asset_access_control:
Cross-Team Opt-In with producer_teams
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. versionadded:: 3.3.0
To allow Dags from specific other teams to produce events that trigger consumers of a given asset, use the
``access_control`` parameter on the ``Asset`` definition with an ``AssetAccessControl`` instance:

.. code-block:: python

    from airflow.sdk import Asset, AssetAccessControl

    shared_data = Asset(
        name="shared_data",
        uri="s3://bucket/shared/data.csv",
        access_control=AssetAccessControl(
            producer_teams=["team_analytics", "team_ml"],
        ),
    )

With this configuration, asset events from ``team_analytics`` or ``team_ml`` will be accepted by any
consuming Dag that schedules on ``shared_data``, in addition to events from the consumer's own team.

To block global (teamless) Dag producers from triggering consumers, set ``allow_global=False``:

.. code-block:: python

    strict_data = Asset(
        name="strict_data",
        uri="s3://bucket/strict/data.csv",
        access_control=AssetAccessControl(
            producer_teams=["team_analytics"],
            allow_global=False,
        ),
    )

.. _asset_consumer_teams:
Cross-Team Opt-In with consumer_teams
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. versionadded:: 3.3.0
While ``producer_teams`` is specified on the consumer side (on the asset used in a Dag's schedule),
``consumer_teams`` is specified on the producer side (on the asset used in a task's outlets). It
controls which consumer teams are permitted to receive events produced by that specific task.

.. code-block:: python

    from airflow.sdk import DAG, Asset, AssetAccessControl, task

    restricted_output = Asset(
        name="restricted_output",
        uri="s3://bucket/restricted/output.csv",
        access_control=AssetAccessControl(
            consumer_teams=["team_downstream", "team_reporting"],
        ),
    )

    with DAG(dag_id="producer_dag", schedule="@daily"):

        @task(outlets=[restricted_output])
        def produce_data():
            """Only team_downstream and team_reporting can consume events from this task."""

        produce_data()

With this configuration, only consuming Dags belonging to ``team_downstream`` or ``team_reporting`` (plus
teamless consumers) will receive asset events produced by the ``produce_data`` task.

Per-producer scoping
""""""""""""""""""""

``consumer_teams`` is scoped per producing task, not per asset. If multiple tasks produce events
for the same asset, each task's ``consumer_teams`` applies independently to the events it produces:

.. code-block:: python

    from airflow.sdk import DAG, Asset, AssetAccessControl, task

    # Asset reference that restricts consumers to team_a only
    restricted_asset = Asset(
        name="shared_asset",
        uri="s3://bucket/shared.csv",
        access_control=AssetAccessControl(
            consumer_teams=["team_a"],
        ),
    )

    # Asset reference with no consumer restriction (no consumer_teams, so all consumers are allowed)
    unrestricted_asset = Asset(name="shared_asset", uri="s3://bucket/shared.csv")

    with DAG(dag_id="dag_1", schedule="@daily"):

        @task(outlets=[restricted_asset])
        def task_restricted():
            """Events from this task only reach team_a consumers."""

        task_restricted()

    with DAG(dag_id="dag_2", schedule="@daily"):

        @task(outlets=[unrestricted_asset])
        def task_unrestricted():
            """Events from this task reach all consumers (no restriction)."""

        task_unrestricted()

Interaction with ``producer_teams``
"""""""""""""""""""""""""""""""""""

Both ``producer_teams`` and ``consumer_teams`` are applied as a logical AND. A consumer Dag
is queued only if it passes both checks:

* ``producer_teams`` (consumer side): "Which producer teams am I willing to accept events from?"
* ``consumer_teams`` (producer side): "Which consumer teams am I willing to deliver events to?"

For example, if a consumer's schedule reference has ``producer_teams=["team_x"]`` and the producer's
outlet reference has ``consumer_teams=["team_y"]``, the consumer will only be queued if the producer
belongs to ``team_x`` and the consumer belongs to ``team_y``.
Teamless consumer pass-through
""""""""""""""""""""""""""""""

Teamless consumers (Dags with no team association) pass the ``consumer_teams`` check
regardless of the list's contents. However, they can still be blocked by setting ``allow_global=False`` on
the producer-side asset, which prevents teamless consumers from receiving events. By default
(``allow_global=True``), teamless consumers receive events from all producers.
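The producer-side and consumer-side checks described in this section can be combined into one predicate. This is a hypothetical sketch that follows the wording of this page literally, not Airflow's scheduler code. In particular, it assumes that a consumer in the producer's own team is still subject to a non-empty ``consumer_teams`` list, and it does not model API-triggered events:

```python
from __future__ import annotations


def should_queue_consumer(
    producer_team: str | None,
    consumer_team: str | None,
    producer_teams: list[str],  # from the consumer-side asset's AssetAccessControl
    consumer_teams: list[str],  # from the producer-side asset's AssetAccessControl
    *,
    consumer_allow_global: bool = True,  # allow_global on the consumer-side asset
    producer_allow_global: bool = True,  # allow_global on the producer-side asset
) -> bool:
    """Return True if an event from the producer should queue the consumer Dag."""
    # Producer check: which producer teams is the consumer willing to accept?
    if consumer_team is None:
        producer_ok = True  # teamless consumers accept events from all producers
    elif producer_team is None:
        producer_ok = consumer_allow_global  # global producers, unless opted out
    else:
        producer_ok = producer_team == consumer_team or producer_team in producer_teams

    # Consumer check: which consumer teams is the producer willing to deliver to?
    if consumer_team is None:
        consumer_ok = producer_allow_global  # teamless consumers pass unless blocked
    else:
        consumer_ok = not consumer_teams or consumer_team in consumer_teams  # empty = no restriction

    return producer_ok and consumer_ok


# Cross-team pipeline: team_ingestion produces, team_analytics opted in on both sides.
print(should_queue_consumer("team_ingestion", "team_analytics", ["team_ingestion"], ["team_analytics", "team_ml"]))
# An unrelated team that did not opt in is not queued.
print(should_queue_consumer("team_ingestion", "team_marketing", [], ["team_analytics", "team_ml"]))
```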
Behavioral Rules
^^^^^^^^^^^^^^^^
The following table describes the complete filtering logic:
.. list-table:: :header-rows: 1 :widths: 12 12 16 16 10 10 24
producer_teamsconsumer_teamsallow_global[][]["team_a"][]["team_a"]["team_b"]["team_a"]["team_c"][]["team_b"][]True[]False["team_b"]True["team_c"]allow_global=False)["team_a"][]["team_a"]["team_b"]Key rules:
* A global (teamless) producer triggers all consumers regardless of team when ``allow_global=True`` (unless ``consumer_teams`` restricts them).
* A global producer is blocked from triggering team-bound consumers whose asset sets ``allow_global=False``.
* A teamless consumer passes the ``consumer_teams`` check, unless ``allow_global=False`` is set on the producer-side asset.
* A producer from another team is allowed when the producer's team is listed in the asset's ``producer_teams``.
* A consumer from another team is allowed when the consumer's team is listed in the producing task's ``consumer_teams``.
* When both ``producer_teams`` and ``consumer_teams`` are specified, a consumer must pass both checks to be queued.

API-Triggered Events
^^^^^^^^^^^^^^^^^^^^
When a user creates an asset event via the REST API, the user's team is resolved from the auth manager. The same filtering rules apply, with one distinction: a teamless API user can only trigger teamless consumers, whereas a teamless Dag producer is treated as global and can trigger any consumer.

The REST API also accepts an optional ``access_control`` object in the request body with the following
fields:

* ``consumer_teams`` (``list[str] | null``): restricts which consumer teams can receive the event,
  following the same rules as the task-level ``consumer_teams``. When omitted or ``null``, no
  consumer-team filtering is applied.
* ``allow_global`` (``bool``, default ``true``): whether teamless consumers can receive the event.

When Multi-Team mode is disabled, the ``access_control`` parameter is accepted but ignored.
.. _multi-team-work-in-progress:
Work in Progress
^^^^^^^^^^^^^^^^
Multi-Team mode is currently an experimental feature in preview. It is not yet fully complete and may be subject to changes without warning based on user feedback. Some missing functionality includes:
Global Uniqueness of Identifiers ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Dag IDs, Variable keys, and Connection IDs must be unique across the entire Airflow deployment, regardless of which team owns them. This is similar to how S3 bucket names are globally unique across all AWS accounts. You should establish naming conventions within your organization to avoid naming conflicts (for example, prefix identifiers with the team name).
Real-World Example: Cross-Team Data Pipeline ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Consider an e-commerce platform with three teams:
All three teams reference the same asset (``s3://data-lake/clickstream/hourly.parquet``), but in
different roles:

.. code-block:: python

    # --- team_ingestion's Dag bundle ---
    from airflow.sdk import DAG, Asset, AssetAccessControl, task

    clickstream = Asset(
        name="clickstream_hourly",
        uri="s3://data-lake/clickstream/hourly.parquet",
        access_control=AssetAccessControl(
            # Allow team_analytics and team_ml to consume events produced by team_ingestion
            consumer_teams=["team_analytics", "team_ml"],
        ),
    )

    with DAG(dag_id="ingest_clickstream", schedule="@hourly"):

        @task(outlets=[clickstream])
        def ingest_from_kafka():
            """Pull clickstream events from Kafka and write to S3."""

        ingest_from_kafka()

.. code-block:: python

    # --- team_analytics's Dag bundle ---
    from airflow.sdk import DAG, Asset, AssetAccessControl

    clickstream = Asset(
        name="clickstream_hourly",
        uri="s3://data-lake/clickstream/hourly.parquet",
        access_control=AssetAccessControl(
            # Accept events from team_ingestion (in addition to own-team events)
            producer_teams=["team_ingestion"],
        ),
    )

    with DAG(dag_id="build_reporting_tables", schedule=clickstream):
        ...

.. code-block:: python

    # --- team_ml's Dag bundle ---
    from airflow.sdk import DAG, Asset, AssetAccessControl

    clickstream = Asset(
        name="clickstream_hourly",
        uri="s3://data-lake/clickstream/hourly.parquet",
        access_control=AssetAccessControl(
            # Accept events from team_ingestion (in addition to own-team events)
            producer_teams=["team_ingestion"],
        ),
    )

    with DAG(dag_id="train_recommendations", schedule=clickstream):
        ...

In this setup:

* The ``clickstream_hourly`` asset is the same global object across all three teams.
* When ``team_ingestion``'s ``ingest_from_kafka`` task completes, it emits an asset event.
* ``team_analytics``'s ``build_reporting_tables`` and ``team_ml``'s ``train_recommendations`` both
  receive the event because:

  * The consumer side (``producer_teams=["team_ingestion"]``) opts in to accept events from
    ``team_ingestion``.
  * The producer side (``consumer_teams=["team_analytics", "team_ml"]``) opts in to deliver
    events to those consumer teams.

* A Dag from an unrelated ``team_marketing`` would not receive the event, because it is
  neither listed in ``consumer_teams`` on the producer side nor does it list ``team_ingestion``
  in its own ``producer_teams``.

The following diagram shows a Multi-Team Airflow deployment with resource isolation between teams.
The components in blue are the shared components and those in green are the team components (note the green shadow box indicating more than one team is present in the architecture).
.. image:: /img/multi_team_arch_diagram.png
    :alt: Multi-Team Airflow Architecture showing resource isolation between teams