Back to Prefect

triggers

docs/v3/api-ref/python/prefect-server-events-triggers.mdx

3.6.30.dev313.6 KB
Original Source

prefect.server.events.triggers

The triggers consumer watches events streaming in from the event bus and decides whether to act on them based on the automations that users have set up.

Functions

evaluate <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L86" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
evaluate(session: AsyncSession, trigger: EventTrigger, bucket: 'ORMAutomationBucket', now: prefect.types._datetime.DateTime, triggering_event: Optional[ReceivedEvent]) -> 'ORMAutomationBucket | None'

Evaluates an Automation, either triggered by a specific event or proactively on a time interval. Evaluating a Automation updates the associated counters for each automation, and will fire the associated action if it has met the threshold.

fire <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L308" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
fire(session: AsyncSession, firing: Firing) -> None

evaluate_composite_trigger <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L319" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
evaluate_composite_trigger(session: AsyncSession, firing: Firing) -> None

act <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L428" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
act(firing: Firing) -> None

Given a Automation that has been triggered, the triggering labels and event (if there was one), publish an action for the actions service to process.

update_events_clock <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L511" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
update_events_clock(event: ReceivedEvent) -> None

get_events_clock <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L531" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_events_clock() -> Optional[float]

get_events_clock_offset <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L536" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_events_clock_offset() -> float

Calculate the current clock offset. This takes into account both the occurred of the last event, as well as the time we saw the last event. This helps to ensure that in low volume environments, we don't end up getting huge offsets.

reset_events_clock <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L552" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
reset_events_clock() -> None

reactive_evaluation <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L559" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
reactive_evaluation(event: ReceivedEvent, depth: int = 0) -> None

Evaluate all automations that may apply to this event.

event (ReceivedEvent): The event to evaluate. This object contains all the necessary information about the event, including its type, associated resources, and metadata. depth (int, optional): The current recursion depth. This is used to prevent infinite recursion due to cyclic event dependencies. Defaults to 0 and is incremented with each recursive call.

get_lost_followers <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L673" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_lost_followers() -> List[ReceivedEvent]

Get followers that have been sitting around longer than our lookback

periodic_evaluation <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L678" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
periodic_evaluation(now: prefect.types._datetime.DateTime) -> None

Periodic tasks that should be run regularly, but not as often as every event

evaluate_periodically <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L701" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
evaluate_periodically(periodic_granularity: timedelta) -> None

Runs periodic evaluation on the given interval

find_interested_triggers <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L736" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
find_interested_triggers(event: ReceivedEvent) -> Collection[EventTrigger]

clear_loaded_automations <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L741" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
clear_loaded_automations() -> None

load_automation <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L747" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
load_automation(automation: Optional[Automation]) -> None

Loads the given automation into memory so that it is available for evaluations

forget_automation <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L765" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
forget_automation(automation_id: UUID) -> None

Unloads the given automation from memory

automation_changed <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L773" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
automation_changed(automation_id: UUID, event: Literal['automation__created', 'automation__updated', 'automation__deleted']) -> None

load_automations <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L791" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
load_automations(db: PrefectDBInterface, session: AsyncSession)

Loads all automations for the given set of accounts

read_automation_state_snapshot <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L807" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
read_automation_state_snapshot(db: PrefectDBInterface, session: AsyncSession) -> AutomationStateSnapshot

reconcile_automations <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L825" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
reconcile_automations(force: bool = False) -> bool

remove_buckets_exceeding_threshold <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L854" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
remove_buckets_exceeding_threshold(db: PrefectDBInterface, session: AsyncSession, trigger: EventTrigger)

Deletes bucket where the count has already exceeded the threshold

read_buckets_for_automation <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L869" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
read_buckets_for_automation(db: PrefectDBInterface, session: AsyncSession, trigger: Trigger, batch_size: int = AUTOMATION_BUCKET_BATCH_SIZE) -> AsyncGenerator['ORMAutomationBucket', None]

Yields buckets for the given automation and trigger in batches.

read_bucket <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L903" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
read_bucket(db: PrefectDBInterface, session: AsyncSession, trigger: Trigger, bucketing_key: Tuple[str, ...]) -> Optional['ORMAutomationBucket']

Gets the bucket this event would fall into for the given Automation, if there is one currently

read_bucket_by_trigger_id <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L920" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
read_bucket_by_trigger_id(db: PrefectDBInterface, session: AsyncSession, automation_id: UUID, trigger_id: UUID, bucketing_key: Tuple[str, ...]) -> 'ORMAutomationBucket | None'

Gets the bucket this event would fall into for the given Automation, if there is one currently

increment_bucket <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L943" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
increment_bucket(db: PrefectDBInterface, session: AsyncSession, bucket: 'ORMAutomationBucket', count: int, last_event: Optional[ReceivedEvent]) -> 'ORMAutomationBucket'

Adds the given count to the bucket, returning the new bucket

start_new_bucket <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L994" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
start_new_bucket(db: PrefectDBInterface, session: AsyncSession, trigger: EventTrigger, bucketing_key: Tuple[str, ...], start: prefect.types._datetime.DateTime, end: prefect.types._datetime.DateTime, count: int, triggered_at: Optional[prefect.types._datetime.DateTime] = None, last_event: Optional[ReceivedEvent] = None) -> 'ORMAutomationBucket'

Ensures that a bucket with the given start and end exists with the given count, returning the new bucket

ensure_bucket <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L1054" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
ensure_bucket(db: PrefectDBInterface, session: AsyncSession, trigger: EventTrigger, bucketing_key: Tuple[str, ...], start: prefect.types._datetime.DateTime, end: prefect.types._datetime.DateTime, last_event: Optional[ReceivedEvent], initial_count: int = 0) -> 'ORMAutomationBucket'

Ensures that a bucket has been started for the given automation and key, returning the current bucket. Will not modify the existing bucket.

remove_bucket <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L1107" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
remove_bucket(db: PrefectDBInterface, session: AsyncSession, bucket: 'ORMAutomationBucket')

Removes the given bucket from the database

sweep_closed_buckets <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L1121" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
sweep_closed_buckets(db: PrefectDBInterface, session: AsyncSession, older_than: prefect.types._datetime.DateTime) -> None

reset <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L1131" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
reset() -> None

Resets the in-memory state of the service

listen_for_automation_changes <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L1140" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
listen_for_automation_changes() -> None

Listens for any changes to automations via PostgreSQL NOTIFY/LISTEN, and applies those changes to the set of loaded automations.

consumer <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L1211" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
consumer(periodic_granularity: timedelta = timedelta(seconds=5)) -> AsyncGenerator[MessageHandler, None]

The triggers.consumer processes all Events arriving on the event bus to determine if they meet the automation criteria, queuing up a corresponding TriggeredAction for the actions service if the automation criteria is met.

proactive_evaluation <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L1269" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
proactive_evaluation(trigger: EventTrigger, as_of: prefect.types._datetime.DateTime) -> prefect.types._datetime.DateTime

The core proactive evaluation operation for a single Automation

evaluate_proactive_triggers <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L1321" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
evaluate_proactive_triggers() -> None