airflow-core/docs/authoring-and-scheduling/event-scheduling.rst
.. Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
.. versionadded:: 3.0
Apache Airflow allows for event-driven scheduling, enabling Dags to be triggered based on external events rather than predefined time-based schedules. This is particularly useful in modern data architectures where workflows need to react to real-time data changes, messages, or system signals.
By using assets, as described in :doc:`asset-scheduling`, you can configure Dags to start execution when specific external events
occur. Assets provide a mechanism to establish dependencies between external events and Dag execution, ensuring that
workflows react dynamically to changes in the external environment.

The ``AssetWatcher`` class plays a crucial role in this mechanism. It monitors an external event source, such as a
message queue, and triggers an asset update when a relevant event occurs.

The ``watchers`` parameter in the ``Asset`` definition allows you to associate multiple ``AssetWatcher`` instances with an
asset, enabling it to respond to various event sources.

See the :doc:`common.messaging provider docs <apache-airflow-providers-common-messaging:triggers>` for more information and examples.

Not all :doc:`triggers <deferring>` in Airflow can be used for event-driven scheduling. Every trigger
inherits from ``BaseTrigger``, but only the subset that inherits from ``BaseEventTrigger`` is compatible.
The reason for this restriction is that some triggers are not designed for event-driven scheduling, and using them to
schedule Dags could lead to unintended results.

``BaseEventTrigger`` ensures that triggers used for scheduling adhere to an event-driven paradigm, reacting appropriately
to external event changes without causing unexpected Dag behavior.

Writing event-driven compatible triggers
----------------------------------------

To make a trigger compatible with event-driven scheduling, it must inherit from ``BaseEventTrigger``. There are three
main scenarios for working with triggers in this context:
1. **Creating a new event-driven trigger**: If you need a new trigger for an unsupported event source, you should create
a new class inheriting from ``BaseEventTrigger`` and implement its logic.
2. **Adapting an existing compatible trigger**: If an existing trigger (inheriting from ``BaseTrigger``) is proven to be
already compatible with event-driven scheduling, then you just need to change the base class from ``BaseTrigger`` to
``BaseEventTrigger``.
3. **Adapting an existing incompatible trigger**: If an existing trigger does not appear to be compatible with
event-driven scheduling, then a new trigger must be created.
This new trigger must inherit ``BaseEventTrigger`` and ensure it properly works with event-driven scheduling.
It might inherit from the existing trigger as well if both triggers share some common code.
Sharing one poll across sibling triggers
.. versionadded:: 3.3

When several ``AssetWatcher`` instances on different assets back triggers that read from the same upstream resource
(a directory of flag files, a polling REST endpoint, and similar idempotent or
subscriber-side-effect sources), the triggerer would otherwise spin up one independent poll loop per trigger. For a
shared source with twenty subscribers, that means twenty poll loops, twenty connections, and twenty sets of API calls per
polling interval. See "Suitable upstreams" below for the precise scope.

``BaseEventTrigger`` supports an opt-in path so that sibling triggers share a single underlying poll, while each
trigger keeps its own DB row, its own ``run_trigger`` task, and its own per-instance filtering. To participate, a
subclass overrides three hooks:

* :meth:`~airflow.triggers.base.BaseEventTrigger.shared_stream_key`: return a key identifying the shared
  upstream (typically a tuple of strings). Triggers whose keys compare equal share one poll. Returning ``None``
  (the default) opts out, and the trigger runs its own independent ``run()`` loop, exactly as before. The return value
  is read once when the triggerer starts this trigger; changing it mid-lifetime has no effect on group
  membership, so siblings that should share a poll must return the same key from the outset.

  The key must be deterministic: derive it from configuration fields, never from per-call values such as
  ``time.time()`` or ``uuid.uuid4()``, because the comparison must be stable across the lifetime of the group.
* :meth:`~airflow.triggers.base.BaseEventTrigger.open_shared_stream`: a ``@classmethod`` coroutine the triggerer
  drives once per shared-stream group to yield raw events from the upstream. Because the triggerer reuses one
  trigger's kwargs to drive the shared poll, only rely on fields whose values participate in ``shared_stream_key``.
* :meth:`~airflow.triggers.base.BaseEventTrigger.filter_shared_stream`: an instance method that consumes the
  broadcast raw stream and yields the ``TriggerEvent`` instances this trigger should fire. Per-trigger filtering
  (e.g. only events matching this instance's filename) lives here.

Example: a DirectoryFileDeleteTrigger that fires when a per-asset flag file appears in a shared inbox directory:

.. code-block:: python

    from collections.abc import AsyncIterator, Hashable
    from typing import Any

    from airflow.triggers.base import BaseEventTrigger, TriggerEvent


    class DirectoryFileDeleteTrigger(BaseEventTrigger):
        def __init__(self, *, directory, filename, poke_interval=5.0):
            super().__init__()
            self.directory = directory
            self.filename = filename
            self.poke_interval = poke_interval

        def shared_stream_key(self) -> Hashable | None:
            # All triggers on the same directory + cadence share one scan.
            return ("directory-scan", self.directory, self.poke_interval)

        @classmethod
        async def open_shared_stream(cls, kwargs: dict[str, Any]) -> AsyncIterator[Any]:
            # Drives one directory listing loop per group.
            ...

        async def filter_shared_stream(self, shared_stream: AsyncIterator[Any]) -> AsyncIterator[TriggerEvent]:
            # Each instance fires only for its own filename.
            async for snapshot in shared_stream:
                if self.filename in snapshot["names"]:
                    yield TriggerEvent(...)
                    return

A complete example using this trigger ships in
airflow.example_dags.example_asset_with_watchers, where two sibling
DirectoryFileDeleteTrigger watchers share one directory scan alongside
a standalone FileDeleteTrigger watcher in the same Dag.
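
To see why the key must be deterministic, the grouping can be modelled in a few lines of plain Python. This is only a
sketch of the idea, not the triggerer's implementation: ``group_by_key``, ``stable_key``, and ``unstable_key`` are
hypothetical helpers, and the trigger configurations are made-up dictionaries.

```python
import uuid
from collections import defaultdict


def group_by_key(triggers, key_func):
    """Group trigger configs by the hashable key that key_func returns."""
    groups = defaultdict(list)
    for trigger in triggers:
        groups[key_func(trigger)].append(trigger["filename"])
    return dict(groups)


def stable_key(trigger):
    # Derived from configuration only, so equal configs give equal keys.
    return ("directory-scan", trigger["directory"], trigger["poke_interval"])


def unstable_key(trigger):
    # A per-call value makes every key unique, so nothing is ever shared.
    return ("directory-scan", trigger["directory"], uuid.uuid4())


triggers = [
    {"directory": "/tmp/region-flags", "filename": "us.flag", "poke_interval": 5.0},
    {"directory": "/tmp/region-flags", "filename": "eu.flag", "poke_interval": 5.0},
    {"directory": "/tmp/other", "filename": "us.flag", "poke_interval": 5.0},
]

stable_groups = group_by_key(triggers, stable_key)
unstable_groups = group_by_key(triggers, unstable_key)
print(len(stable_groups), len(unstable_groups))
```

With the stable key, the two triggers on ``/tmp/region-flags`` land in one group; with the per-call key, each trigger
ends up alone in its own group.
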
What is and isn't shared ^^^^^^^^^^^^^^^^^^^^^^^^
The sharing is narrower than the name might suggest:

* **Shared** (one per ``shared_stream_key``): the ``open_shared_stream`` async generator and its upstream I/O, for
  example the actual ``iterdir`` calls on the directory or the polling REST API calls.
* **Not shared** (one per trigger): the ``Trigger`` DB row, the trigger instance, the ``run_trigger``
  asyncio task, and the ``filter_shared_stream`` async generator. Each ``AssetWatcher`` still appears as its own
  trigger in the UI and in the metadata database.

In other words, the savings are at the poll-loop and upstream-I/O layer, not at the persistence or scheduling layer.
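
The split between one shared poll and many per-trigger filters can be illustrated with a small ``asyncio`` model. It
is a sketch under simplifying assumptions and does not use any Airflow API: ``poll_directory``, ``subscriber``, and
the in-memory snapshots are hypothetical stand-ins for the shared poll, the per-trigger filter, and the directory
listings.

```python
import asyncio


async def poll_directory(snapshots):
    """Stand-in for the single shared poll: yields one listing per cycle."""
    for names in snapshots:
        yield {"names": names}
        await asyncio.sleep(0)  # let subscribers run between polls


async def subscriber(filename, queue, fired):
    """Per-trigger filter: fires once when its own filename shows up."""
    while True:
        snapshot = await queue.get()
        if snapshot is None:  # end-of-stream sentinel
            return
        if filename in snapshot["names"]:
            fired.append(filename)
            return


async def main():
    snapshots = [set(), {"eu.flag"}, {"eu.flag", "us.flag"}]
    queues = {name: asyncio.Queue() for name in ("us.flag", "eu.flag", "ap.flag")}
    fired = []
    tasks = [asyncio.create_task(subscriber(n, q, fired)) for n, q in queues.items()]
    polls = 0
    async for snapshot in poll_directory(snapshots):
        polls += 1  # one upstream read, regardless of subscriber count
        for queue in queues.values():
            queue.put_nowait(snapshot)  # broadcast to every subscriber
    for queue in queues.values():
        queue.put_nowait(None)
    await asyncio.gather(*tasks)
    return polls, fired


polls, fired = asyncio.run(main())
print(polls, sorted(fired))
```

Three subscribers are served by three upstream reads in total, rather than three reads each, while every subscriber
still decides on its own whether to fire.
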

Suitable upstreams
^^^^^^^^^^^^^^^^^^

Good fits for the shared-stream pattern:
unlink, local marking, …) goes through APIs the subscriber owns
independently of the shared producer handle.

Producer-side ack channel
^^^^^^^^^^^^^^^^^^^^^^^^^

For upstreams where the producer must advance (commit, delete, or ack) only
after all subscribers have processed an event, override
:meth:`~airflow.triggers.base.BaseEventTrigger.create_shared_stream_producer`
to return a :class:`~airflow.triggers.shared_stream.SharedStreamProducer`.

When this factory is overridden, the manager enters ack mode. The
subscriber side does not change: ``filter_shared_stream`` receives raw
events exactly as on the fast path, so the same filter code works in both
modes. The framework infers when the broker may advance from each
subscriber's consumption progress and from the persistence of the trigger
events it derived.

1. The manager calls ``create_shared_stream_producer(kwargs)`` once per
   shared-stream group. The returned producer owns the broker connection for
   the lifetime of one poll; open the connection lazily inside
   ``open_stream``, not in the factory.
2. ``open_stream`` yields ``(event, broker_payload)`` tuples,
   where ``broker_payload`` is whatever the producer needs later (e.g. an
   SQS receipt handle, a Kafka offset, a Pub/Sub ack ID).
3. ``filter_shared_stream`` receives raw events exactly
   as in the fast path. A subscriber resolves an event once it has moved
   past it (pulled the next raw event, or unsubscribed) and every
   ``TriggerEvent`` it derived from the event has been persisted to the
   metadata database.
4. The manager calls ``await producer.advance(batch)`` with the contiguous
   prefix of fully resolved events in the event's lane, so that the producer
   can commit the offsets, delete the SQS messages, and so on. Each item in the batch is an
   :class:`~airflow.triggers.shared_stream.AdvanceItem` carrying the
   event's ``broker_payload`` and an
   :class:`~airflow.triggers.shared_stream.AdvanceOutcome` with per-event
   counts of how the subscribers resolved.

Rejecting an event: a subscriber's filter can actively refuse a raw
event instead of yielding a trigger event from it, by calling
:func:`~airflow.triggers.shared_stream.reject_shared_stream_event` while
processing that event. This is distinct from an involuntary failure. A
``failed`` count means a subscriber did not finish in time (ack timeout) or
fell behind (queue overflow); the right response is usually redelivery. A
``rejected`` count means a subscriber decided the event must not produce a
trigger event and should be terminally discarded; the right response is to
dead-letter or nack it, not redeliver. The
:class:`~airflow.triggers.shared_stream.AdvanceOutcome` reports both
counts separately so the producer can apply the right per-broker action in
``advance``:

* On a broker with dead-letter, abandon, and complete operations: dead-letter the message when ``rejected`` is non-zero,
  abandon it (so the broker redelivers) when only ``failed`` is non-zero,
  and complete it when every subscriber accepted the event.
* On a broker with only ack and nack: nack the message on a reject, otherwise ack it.

The framework only reports the counts. It never dead-letters, nacks,
or redelivers on its own; that broker-specific decision lives entirely in
the producer's ``advance``.

``reject_shared_stream_event`` is meaningful only while the filter is
processing a raw event in ack mode (the binding window of that event is
open). Called on the fast path, from a standalone ``run()``, or between two
raw events, it logs a warning and does nothing, because there is no broker
advance to influence. Because it resolves the event immediately, there is
nothing to persist and the reject does not wait on the persistence gate.

``is_clean`` is ``True`` only when every subscriber that was online at
broadcast accepted the event: no rejects, no failures, and at least one
subscriber acknowledged it. A single reject or failure makes it ``False``, and so
does a zero-subscriber broadcast (all-zero counts): nothing was accepted,
so there is nothing the producer should commit on.
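
The ``is_clean`` rule and the per-broker decision can be modelled in plain Python. The following is a minimal sketch,
not the Airflow implementation: ``OutcomeModel``, its ``accepted`` field, and ``choose_action`` are hypothetical names,
and only ``rejected``, ``failed``, and ``is_clean`` come from the description above. The action chosen for a
zero-subscriber broadcast (``"leave-uncommitted"``) is likewise an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OutcomeModel:
    """Illustrative stand-in for per-event subscriber counts (not the Airflow class)."""

    accepted: int = 0
    rejected: int = 0
    failed: int = 0

    @property
    def is_clean(self) -> bool:
        # Clean means: at least one subscriber accepted, nobody rejected or failed.
        return self.accepted > 0 and self.rejected == 0 and self.failed == 0


def choose_action(outcome: OutcomeModel) -> str:
    """Map the counts to a broker action; a reject outranks a failure."""
    if outcome.rejected:
        return "dead-letter"
    if outcome.failed:
        return "abandon"
    if outcome.is_clean:
        return "complete"
    return "leave-uncommitted"  # zero-subscriber broadcast: nothing was accepted


for outcome in (
    OutcomeModel(accepted=3),
    OutcomeModel(accepted=2, failed=1),
    OutcomeModel(accepted=1, rejected=1, failed=1),
    OutcomeModel(),
):
    print(outcome, outcome.is_clean, choose_action(outcome))
```
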

Example: rejecting an event inside a filter:

.. code-block:: python

    from airflow.triggers.base import TriggerEvent
    from airflow.triggers.shared_stream import reject_shared_stream_event


    async def filter_shared_stream(self, shared_stream):
        async for raw in shared_stream:
            if raw.get("malformed"):
                # Never produce a trigger event from this; have the broker
                # dead-letter it rather than redeliver it.
                reject_shared_stream_event()
                continue
            yield TriggerEvent(raw)

Ordering guarantee: by default every event belongs to the same lane.
The items of a batch are in event order and form their lane's contiguous
resolved prefix; within a lane, batches arrive strictly in order — the
next advance is awaited only after the previous call returned. If
advance raises, the error is logged and the whole shared-stream
group is terminated: every subscriber receives a failure sentinel and
the broker redelivers from the never-committed offset (a loud, safe
failure rather than a silent data skip). Recovering more gracefully from
transient failures would require the producer to track which offsets are
safe to recommit. A producer can override get_advance_lane to
narrow that ordering to within a lane: events whose lane values compare
equal are batched and advanced in event order relative to each other,
while events in different lanes do not wait for one another. Either way,
at most one advance call is awaited at a time, and cumulative schemes
such as a Kafka offset commit only need to commit the batch's last item.
When the poll ends, the manager calls await producer.aclose() once,
best-effort.
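
The "contiguous resolved prefix" and the effect of lanes can be sketched without any broker. This is an illustrative
model only: ``next_batches`` and the event dictionaries are hypothetical, and the real manager tracks resolution
incrementally rather than recomputing it from a list.

```python
from collections import defaultdict


def next_batches(events, resolved_ids, lane_of=lambda event: None):
    """Return, per lane, the longest run of resolved events from the front."""
    pending_by_lane = defaultdict(list)
    for event in events:  # events are in arrival order
        pending_by_lane[lane_of(event)].append(event)

    batches = {}
    for lane, lane_events in pending_by_lane.items():
        prefix = []
        for event in lane_events:
            if event["id"] not in resolved_ids:
                break  # the first unresolved event blocks everything behind it
            prefix.append(event["id"])
        if prefix:
            batches[lane] = prefix
    return batches


events = [
    {"id": 1, "partition": 0},
    {"id": 2, "partition": 1},
    {"id": 3, "partition": 0},
    {"id": 4, "partition": 1},
]
resolved = {2, 3, 4}  # event 1 is still pending

single_lane = next_batches(events, resolved)
per_partition = next_batches(events, resolved, lane_of=lambda e: e["partition"])
print(single_lane, per_partition)
```

With a single lane, the pending event 1 holds back every later event. With one lane per partition, partition 1 can
advance events 2 and 4 while partition 0 still waits on event 1.
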

Example: an SQS-like producer:

.. code-block:: python

    from collections.abc import AsyncIterator, Sequence
    from typing import Any

    from airflow.triggers.base import BaseEventTrigger, TriggerEvent
    from airflow.triggers.shared_stream import AdvanceItem, SharedStreamProducer

    # create_sqs_client, poll_sqs, and delete_sqs_message stand for
    # broker-specific helpers that are not shown here.


    class SqsSharedStreamProducer(SharedStreamProducer):
        def __init__(self, queue_url: str):
            self.queue_url = queue_url
            self.client = None

        async def open_stream(self) -> AsyncIterator[tuple[Any, Any]]:
            # Open the connection here, not in the trigger's factory.
            self.client = await create_sqs_client()
            while True:
                messages = await poll_sqs(self.client, self.queue_url)
                for msg in messages:
                    yield msg["Body"], msg["ReceiptHandle"]

        async def advance(self, batch: Sequence[AdvanceItem]) -> None:
            # Called with one lane's batch of fully resolved messages.
            for receipt_handle, outcome in batch:
                if outcome.is_clean:
                    await delete_sqs_message(self.client, self.queue_url, receipt_handle)
                # Otherwise leave the message for the visibility timeout to redeliver.

        async def aclose(self) -> None:
            if self.client is not None:
                await self.client.close()


    class SqsSharedTrigger(BaseEventTrigger):
        def __init__(self, *, queue_url: str, region: str | None = None):
            super().__init__()
            self.queue_url = queue_url
            self.region = region

        def serialize(self):
            return (
                f"{type(self).__module__}.{type(self).__qualname__}",
                {"queue_url": self.queue_url, "region": self.region},
            )

        def shared_stream_key(self):
            return ("sqs", self.queue_url)

        @classmethod
        def create_shared_stream_producer(cls, kwargs) -> SqsSharedStreamProducer:
            return SqsSharedStreamProducer(kwargs["queue_url"])

        async def filter_shared_stream(self, shared_stream):
            async for raw in shared_stream:
                if self.region is None or raw.get("region") == self.region:
                    yield TriggerEvent(raw)

        async def run(self):
            yield TriggerEvent({})

Example — Kafka cumulative commit across partitions. A Kafka commit
acknowledges every offset up to the committed one within a partition, so
it is only safe if no later event from the same partition can be committed
while an earlier one is still pending — events on other partitions do not
matter. Returning (topic, partition) from get_advance_lane narrows
the ordering guarantee to exactly that granularity: each partition's
commits stay in order, and a slow partition no longer delays commits on
the other partitions:

.. code-block:: python

    from airflow.triggers.shared_stream import SharedStreamProducer

    # create_kafka_consumer and handle_dlq stand for broker-specific
    # helpers that are not shown here.


    class KafkaSharedStreamProducer(SharedStreamProducer):
        def __init__(self, topics: list[str]):
            self.topics = topics
            self.consumer = None

        async def open_stream(self):
            # Auto-commit must be off (the Kafka default is on), or the
            # consumer commits on its own schedule and the ack channel
            # no longer controls what the broker considers delivered.
            self.consumer = await create_kafka_consumer(self.topics, enable_auto_commit=False)
            async for message in self.consumer:
                yield message.value, (message.topic, message.partition, message.offset)

        def get_advance_lane(self, broker_payload):
            topic, partition, _offset = broker_payload
            return topic, partition

        async def advance(self, batch):
            # The batch is the contiguous resolved prefix of one lane (here,
            # one partition), in event order, so committing the offset of
            # its last item covers the whole batch and can never skip past
            # an event that is still pending.
            #
            # If this method raises, the whole shared-stream group is
            # terminated and the broker redelivers from the last committed
            # offset, so no data is silently skipped.
            #
            # Inspect each item's outcome before committing: non-clean events
            # (rejects, failures, or a zero-subscriber broadcast) should go to
            # a dead-letter queue rather than be committed as delivered.
            to_dlq = []
            for broker_payload, outcome in batch:
                if not outcome.is_clean:
                    to_dlq.append(broker_payload)
            if to_dlq:
                handle_dlq(to_dlq)
            topic, partition, offset = batch[-1].broker_payload
            await self.consumer.commit(topic, partition, offset + 1)

        async def aclose(self):
            if self.consumer is not None:
                await self.consumer.stop()

The one constraint on filter authors is the binding between raw events and
the trigger events derived from them: yield every TriggerEvent derived
from a raw event before pulling the next raw event from the shared
stream — which is what a straightforward filter loop does anyway.
Snapshot-at-fan-out: the set of subscribers that must resolve a given event is frozen at the moment the event is broadcast. A subscriber that joins after the event was dispatched is not added to that event's pending set.
Per-event ack timeout: if a subscriber has not finished processing an
event within the ack timeout (default 5 minutes, configurable via the
[triggerer] shared_stream_ack_timeout config option) — it is still on
the event, or some of its derived trigger events were never confirmed
persisted — the manager force-fails that subscriber's trigger. Other
subscribers are not affected; once they resolve, the producer advances
normally. The ack timeout is a manager-level safety net and does not
replace any native broker session or visibility timeout.
From the subscriber's perspective the force-fail surfaces as an AckTimeout
(importable from airflow.triggers.shared_stream) raised by the
shared_stream iterator inside filter_shared_stream. Letting it propagate
is fine — the trigger fails through the standard trigger-failure path; catch it
only if the subscriber needs to run cleanup before failing.
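
The catch, clean up, and re-raise pattern mentioned above might look like the following sketch. It runs without
Airflow and makes several assumptions: ``AckTimeoutModel`` is a hypothetical stand-in for the real ``AckTimeout``
exception, ``timed_out_stream`` imitates a shared stream that times out after one event, and
``filter_with_cleanup`` plays the role of ``filter_shared_stream``.

```python
import asyncio


class AckTimeoutModel(Exception):
    """Hypothetical stand-in for the timeout raised by the shared stream."""


async def timed_out_stream():
    yield {"id": 1}
    raise AckTimeoutModel("subscriber did not resolve event 1 in time")


async def filter_with_cleanup(shared_stream, cleanup_log):
    try:
        async for raw in shared_stream:
            yield raw["id"]
    except AckTimeoutModel:
        cleanup_log.append("released local resources")
        raise  # re-raise so the trigger still fails through the normal path


async def main():
    cleanup_log, seen, failed = [], [], False
    try:
        async for event_id in filter_with_cleanup(timed_out_stream(), cleanup_log):
            seen.append(event_id)
    except AckTimeoutModel:
        failed = True
    return seen, cleanup_log, failed


seen, cleanup_log, failed = asyncio.run(main())
print(seen, cleanup_log, failed)
```

The important detail is the bare ``raise``: swallowing the exception would hide the failure from the caller.
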
Triggerer restart: resolution state lives in memory only. After a triggerer restart, the broker redelivers messages that were never advanced. Subscribers must therefore be idempotent.
Durability: the broker advance is gated on persistence. A subscriber's
resolution completes only after every TriggerEvent it derived from the
event has been stored in the metadata database; the confirmation reaches
the trigger runner on the next state sync, typically within a second or
two. If the confirmation never arrives — the triggerer crashed, or the
event could not be persisted — the ack timeout fails the event, the
producer does not commit, and the broker redelivers. A failure can
therefore cause duplicate delivery but never a lost event; idempotent
subscribers absorb the duplicates. The same trade-off applies when a
group stops while events are still awaiting confirmation (for example,
the last subscriber unsubscribes right after producing an event): the
pending advances are abandoned and the broker redelivers those events.
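
Because redelivery can produce duplicates, subscribers need some form of deduplication. One common approach, shown
here purely as a sketch, is to key each message by a stable identifier and skip identifiers already handled.
``process_once`` and the ``id`` field are hypothetical, and the in-memory set stands in for whatever durable store a
real subscriber would use.

```python
def process_once(messages, already_processed):
    """Apply each message at most once, keyed by a stable message id."""
    applied = []
    for message in messages:
        if message["id"] in already_processed:
            continue  # duplicate from a redelivery: ignore it
        already_processed.add(message["id"])
        applied.append(message["id"])
    return applied


processed_ids = set()  # in practice this would live in durable storage
first_delivery = process_once([{"id": "a"}, {"id": "b"}], processed_ids)
redelivery = process_once([{"id": "b"}, {"id": "c"}], processed_ids)
print(first_delivery, redelivery)
```
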
shared_stream_subscriber_queue_size in ack mode: the config bound
still governs unprocessed raw events per subscriber. The manager does
not wait for outstanding resolutions before pulling the next upstream
event; back-pressure is queue-bound — a subscriber whose queue is full is
force-failed. The queue primarily guards against burst delivery before a
subscriber's filter runs. Broker advances, by contrast, are dispatched in
order within each lane: an event with pending subscribers delays the
advance of every later event in the same lane — resolved events behind it
accumulate into the next batch — with the ack timeout bounding that wait.
Verifying that sharing is active ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The triggerer logs the creation of each shared-stream group, and names the poll task after its key:

.. code-block:: text

    Shared stream group started key=('directory-scan', '/tmp/region-flags', 5.0)

.. code-block:: text

    asyncio task name: shared-stream-poll[('directory-scan', '/tmp/region-flags', 5.0)]

If sharing is active you should see exactly one Shared stream group started line per distinct key, regardless of
how many subscribers join it. If you see one log line per subscriber instead, the keys probably do not compare equal
— verify that shared_stream_key returns identical values across the siblings.
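
This check can be scripted against a captured log. The sketch below assumes the message text appears verbatim as
shown above, possibly after a prefix such as a timestamp; ``groups_started_per_key`` and the sample lines are
hypothetical.

```python
from collections import Counter

MARKER = "Shared stream group started key="


def groups_started_per_key(log_lines):
    """Count how many times a group was started for each key."""
    counts = Counter()
    for line in log_lines:
        _, found, key = line.partition(MARKER)
        if found:
            counts[key.strip()] += 1
    return counts


log_lines = [
    "Shared stream group started key=('directory-scan', '/tmp/region-flags', 5.0)",
    "some unrelated triggerer output",
    "Shared stream group started key=('directory-scan', '/tmp/other', 5.0)",
]

counts = groups_started_per_key(log_lines)
started_more_than_once = [key for key, count in counts.items() if count > 1]
print(dict(counts), started_more_than_once)
```

A key that shows up more than once during a single run of the triggerer is worth a closer look.
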
Slow-subscriber overflow ^^^^^^^^^^^^^^^^^^^^^^^^
Each subscriber in a shared-stream group has a bounded in-memory queue. If the poll loop
produces events faster than a subscriber's filter_shared_stream can consume them, the
queue fills and that trigger is failed with _SubscriberOverflow — a deliberate fail-fast
rather than unbounded memory growth.

If subscribers repeatedly overflow, there are two ways to address this:

* Increase ``[triggerer] shared_stream_subscriber_queue_size`` to give the
  filter more slack before the overflow threshold is reached.
* Narrow :meth:`~airflow.triggers.base.BaseEventTrigger.shared_stream_key` so fewer
  sibling triggers share a single group. A narrower group reduces the rate at which any
  one subscriber needs to consume events.

Both reduce the mismatch between producer throughput and per-subscriber consume rate.
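
The fail-fast behaviour of a bounded queue can be reproduced with ``asyncio.Queue``. This sketch is not the
triggerer's code: ``SubscriberOverflowError``, ``deliver``, and ``events_delivered_before_overflow`` are hypothetical
names, and the subscriber here never consumes anything, which is the worst case.

```python
import asyncio


class SubscriberOverflowError(Exception):
    """Hypothetical stand-in for the overflow failure described above."""


def deliver(queue: asyncio.Queue, event) -> None:
    """Hand one event to a subscriber, failing fast if its queue is full."""
    try:
        queue.put_nowait(event)
    except asyncio.QueueFull:
        raise SubscriberOverflowError(f"queue full at {queue.maxsize} events") from None


def events_delivered_before_overflow(queue_size: int, burst: int) -> int:
    """Deliver a burst to a subscriber that is not consuming; count what fits."""
    queue = asyncio.Queue(maxsize=queue_size)
    delivered = 0
    try:
        for event in range(burst):
            deliver(queue, event)
            delivered += 1
    except SubscriberOverflowError:
        pass  # the subscriber would be failed at this point
    return delivered


small_queue = events_delivered_before_overflow(queue_size=2, burst=5)
large_queue = events_delivered_before_overflow(queue_size=8, burst=5)
print(small_queue, large_queue)
```

A larger queue absorbs the same burst without overflowing, which is the effect of raising the queue size setting.
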
Avoid infinite scheduling
The reason why some triggers are not compatible with event-driven scheduling is that they are waiting
for an external resource to reach a given state. Examples:
* Wait for a file to exist in a storage service
* Wait for a job to be in a success state
* Wait for a row to be present in a database
Scheduling under such conditions can lead to infinite rescheduling. This is because once the condition becomes true,
it is likely to remain true for an extended period.
For example, consider a Dag scheduled to run when a specific job reaches a "success" state.
Once the job succeeds, it will typically remain in that state. As a result, the Dag will be triggered repeatedly every
time the triggerer checks the condition.
Another example is the ``S3KeyTrigger``, which checks for the presence of a specific file in an S3 bucket.
Once the file is created, the trigger will continue to succeed on every check, since the condition
"is file X present in bucket Y" remains true.
This leads to the Dag being triggered indefinitely every time the trigger mechanism runs.
When creating custom triggers, be cautious about using conditions that remain permanently true once met.
This can unintentionally result in infinite Dag executions and overwhelm your system.
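
The difference between a condition that stays true and an event that happens once can be shown with a short
simulation. It uses no Airflow API; ``state_based_fires`` and ``event_based_fires`` are hypothetical helpers, and
each list entry represents one check by the triggerer.

```python
def state_based_fires(file_present_by_poll):
    """Fire on every poll where the condition holds (state-based check)."""
    return sum(1 for present in file_present_by_poll if present)


def event_based_fires(file_present_by_poll):
    """Fire only when the condition changes from false to true."""
    fired = 0
    previously_present = False
    for present in file_present_by_poll:
        if present and not previously_present:
            fired += 1
        previously_present = present
    return fired


# The file appears on the third poll and then stays in place.
polls = [False, False, True, True, True, True]
state_based_runs = state_based_fires(polls)
event_based_runs = event_based_fires(polls)
print(state_based_runs, event_based_runs)
```

The state-based check fires on every poll after the file appears, while the event-based check fires once for the
single transition.
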
Use cases for event-driven Dags
-------------------------------
* **Data ingestion pipelines**: Trigger ETL workflows when new data arrives in a storage system.
* **Machine learning workflows**: Start training models when new datasets become available.
* **IoT and real-time analytics**: React to sensor data, logs, or application events in real-time.
* **Microservices and event-driven architectures**: Orchestrate workflows based on service-to-service messages.