docs/v3/api-ref/python/prefect-server-events-triggers.mdx
prefect.server.events.triggers

The triggers consumer watches events streaming in from the event bus and decides whether to act on them based on the automations that users have set up.
evaluate <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L86" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>evaluate(session: AsyncSession, trigger: EventTrigger, bucket: 'ORMAutomationBucket', now: prefect.types._datetime.DateTime, triggering_event: Optional[ReceivedEvent]) -> 'ORMAutomationBucket | None'
Evaluates an Automation, either triggered by a specific event or proactively on a time interval. Evaluating an Automation updates its associated counters and fires the associated action if the threshold has been met.
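The counter-and-threshold idea described above can be sketched in a few lines. This is a simplified illustration of the event-triggered case only, not Prefect's implementation: `CounterBucket`, `evaluate_sketch`, and the `threshold` parameter are hypothetical stand-ins for the real ORM bucket and trigger objects.

```python
from dataclasses import dataclass


@dataclass
class CounterBucket:
    """Hypothetical stand-in for an automation bucket's counter."""

    count: int = 0


def evaluate_sketch(bucket: CounterBucket, threshold: int, saw_event: bool) -> bool:
    """Update the counter and report whether the action should fire.

    Assumption: an event-triggered evaluation that fires once the count
    reaches the threshold. Proactive evaluation is not modeled here.
    """
    if saw_event:
        bucket.count += 1
    return bucket.count >= threshold


bucket = CounterBucket()
first = evaluate_sketch(bucket, threshold=2, saw_event=True)
second = evaluate_sketch(bucket, threshold=2, saw_event=True)
```

Here `first` is false because only one event has been counted, while `second` is true because the count has reached the threshold.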
fire <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L308" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>fire(session: AsyncSession, firing: Firing) -> None
evaluate_composite_trigger <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L319" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>evaluate_composite_trigger(session: AsyncSession, firing: Firing) -> None
act <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L428" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>act(firing: Firing) -> None
Given an Automation that has been triggered, along with the triggering labels and event
(if there was one), publishes an action for the actions service to process.
update_events_clock <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L511" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>update_events_clock(event: ReceivedEvent) -> None
get_events_clock <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L531" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>get_events_clock() -> Optional[float]
get_events_clock_offset <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L536" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>get_events_clock_offset() -> float
Calculates the current clock offset. This takes into account both the `occurred`
time of the last event and the time at which we saw that event. This helps
ensure that low-volume environments don't end up with huge offsets.
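One way to read the description above is as the sum of two terms: how far the last event's `occurred` time is from the current time, plus how long ago that event was seen. The sketch below works under that assumption. `clock_offset_sketch` and its parameters are hypothetical names, and the real function reads module-level state rather than taking arguments.

```python
from typing import Optional


def clock_offset_sketch(
    last_occurred: Optional[float],
    last_seen: Optional[float],
    now: float,
) -> float:
    """Return an offset in seconds between event time and wall-clock time.

    All arguments are POSIX timestamps. Assumption: with no event seen yet,
    the offset is zero.
    """
    if last_occurred is None or last_seen is None:
        return 0.0
    # (last_occurred - now) alone keeps growing while no new events arrive;
    # adding (now - last_seen) cancels that idle time.
    return (last_occurred - now) + (now - last_seen)


# The last event occurred at t=100 and was seen at t=103; it is now t=500.
offset = clock_offset_sketch(100.0, 103.0, 500.0)
```

The two `now` terms cancel, so the offset reflects only the delay between when the event occurred and when it was seen, no matter how long the stream has been idle.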
reset_events_clock <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L552" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>reset_events_clock() -> None
reactive_evaluation <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L559" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>reactive_evaluation(event: ReceivedEvent, depth: int = 0) -> None
Evaluate all automations that may apply to this event.
Args:
- event (ReceivedEvent): The event to evaluate. This object contains all the necessary information about the event, including its type, associated resources, and metadata.
- depth (int, optional): The current recursion depth. This is used to prevent infinite recursion due to cyclic event dependencies. Defaults to 0 and is incremented with each recursive call.
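The role of `depth` can be illustrated with a small recursion guard. This is a sketch under assumptions, not the actual logic of `reactive_evaluation`: `MAX_DEPTH`, `process`, and the `follow_ups` mapping are hypothetical, and plain strings stand in for events.

```python
from typing import Dict, List

MAX_DEPTH = 20  # hypothetical limit, not taken from Prefect


def process(event: str, follow_ups: Dict[str, List[str]], depth: int = 0) -> List[str]:
    """Process an event and its follow-up events, refusing to recurse forever."""
    if depth > MAX_DEPTH:
        raise RecursionError(f"event {event!r} exceeded depth {MAX_DEPTH}")
    processed = [event]
    for follower in follow_ups.get(event, []):
        # Each recursive call increments depth, so a cycle eventually trips the guard.
        processed.extend(process(follower, follow_ups, depth + 1))
    return processed


order = process("a", {"a": ["b"], "b": ["c"]})
```

With an acyclic mapping the events are processed in dependency order; with a cycle such as `{"a": ["b"], "b": ["a"]}` the guard raises instead of looping indefinitely.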
get_lost_followers <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L673" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>get_lost_followers() -> List[ReceivedEvent]
Gets followers that have been sitting around longer than our lookback period.
periodic_evaluation <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L678" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>periodic_evaluation(now: prefect.types._datetime.DateTime) -> None
Periodic tasks that should be run regularly, but not as often as every event
evaluate_periodically <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L701" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>evaluate_periodically(periodic_granularity: timedelta) -> None
Runs periodic evaluation on the given interval
find_interested_triggers <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L736" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>find_interested_triggers(event: ReceivedEvent) -> Collection[EventTrigger]
clear_loaded_automations <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L741" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>clear_loaded_automations() -> None
load_automation <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L747" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>load_automation(automation: Optional[Automation]) -> None
Loads the given automation into memory so that it is available for evaluations
forget_automation <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L765" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>forget_automation(automation_id: UUID) -> None
Unloads the given automation from memory
automation_changed <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L773" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>automation_changed(automation_id: UUID, event: Literal['automation__created', 'automation__updated', 'automation__deleted']) -> None
load_automations <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L791" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>load_automations(db: PrefectDBInterface, session: AsyncSession)
Loads all automations for the given set of accounts
read_automation_state_snapshot <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L807" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>read_automation_state_snapshot(db: PrefectDBInterface, session: AsyncSession) -> AutomationStateSnapshot
reconcile_automations <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L825" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>reconcile_automations(force: bool = False) -> bool
remove_buckets_exceeding_threshold <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L854" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>remove_buckets_exceeding_threshold(db: PrefectDBInterface, session: AsyncSession, trigger: EventTrigger)
Deletes buckets whose count has already exceeded the threshold
read_buckets_for_automation <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L869" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>read_buckets_for_automation(db: PrefectDBInterface, session: AsyncSession, trigger: Trigger, batch_size: int = AUTOMATION_BUCKET_BATCH_SIZE) -> AsyncGenerator['ORMAutomationBucket', None]
Yields buckets for the given automation and trigger in batches.
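An async generator that fetches rows in batches while yielding them one at a time might look like the sketch below. It assumes simple offset-based paging over an in-memory list, which may differ from how Prefect actually pages through buckets. `read_in_batches` and `collect` are hypothetical names.

```python
import asyncio
from typing import AsyncGenerator, List


async def read_in_batches(
    rows: List[int], batch_size: int = 2
) -> AsyncGenerator[int, None]:
    """Yield rows one by one while fetching them batch_size at a time."""
    offset = 0
    while True:
        # The slice stands in for a database query limited to batch_size rows.
        batch = rows[offset : offset + batch_size]
        if not batch:
            break
        for row in batch:
            yield row
        offset += batch_size


async def collect() -> List[int]:
    return [row async for row in read_in_batches([1, 2, 3, 4, 5], batch_size=2)]


result = asyncio.run(collect())
```

Callers iterate with `async for` and never see batch boundaries, while at most `batch_size` rows are held from any single fetch.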
read_bucket <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L903" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>read_bucket(db: PrefectDBInterface, session: AsyncSession, trigger: Trigger, bucketing_key: Tuple[str, ...]) -> Optional['ORMAutomationBucket']
Gets the bucket this event would fall into for the given Automation, if there is one currently
read_bucket_by_trigger_id <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L920" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>read_bucket_by_trigger_id(db: PrefectDBInterface, session: AsyncSession, automation_id: UUID, trigger_id: UUID, bucketing_key: Tuple[str, ...]) -> 'ORMAutomationBucket | None'
Gets the bucket this event would fall into for the given automation ID and trigger ID, if there is one currently
increment_bucket <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L943" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>increment_bucket(db: PrefectDBInterface, session: AsyncSession, bucket: 'ORMAutomationBucket', count: int, last_event: Optional[ReceivedEvent]) -> 'ORMAutomationBucket'
Adds the given count to the bucket, returning the new bucket
start_new_bucket <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L994" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>start_new_bucket(db: PrefectDBInterface, session: AsyncSession, trigger: EventTrigger, bucketing_key: Tuple[str, ...], start: prefect.types._datetime.DateTime, end: prefect.types._datetime.DateTime, count: int, triggered_at: Optional[prefect.types._datetime.DateTime] = None, last_event: Optional[ReceivedEvent] = None) -> 'ORMAutomationBucket'
Ensures that a bucket with the given start and end exists with the given count, returning the new bucket
ensure_bucket <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L1054" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>ensure_bucket(db: PrefectDBInterface, session: AsyncSession, trigger: EventTrigger, bucketing_key: Tuple[str, ...], start: prefect.types._datetime.DateTime, end: prefect.types._datetime.DateTime, last_event: Optional[ReceivedEvent], initial_count: int = 0) -> 'ORMAutomationBucket'
Ensures that a bucket has been started for the given automation and key, returning the current bucket. Will not modify the existing bucket.
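The "create if missing, otherwise leave untouched" behavior described above resembles a get-or-create operation. The sketch below shows that pattern with an in-memory dictionary. `Bucket`, `ensure_bucket_sketch`, and `store` are hypothetical stand-ins, and the real function works against the database.

```python
from dataclasses import dataclass
from typing import Dict, Tuple


@dataclass
class Bucket:
    """Hypothetical stand-in for a stored automation bucket."""

    count: int


def ensure_bucket_sketch(
    store: Dict[Tuple[str, ...], Bucket],
    bucketing_key: Tuple[str, ...],
    initial_count: int = 0,
) -> Bucket:
    """Return the bucket for the key, creating it only if it does not exist."""
    if bucketing_key not in store:
        store[bucketing_key] = Bucket(count=initial_count)
    # An existing bucket is returned as-is; initial_count is ignored for it.
    return store[bucketing_key]


store: Dict[Tuple[str, ...], Bucket] = {}
created = ensure_bucket_sketch(store, ("flow-run",), initial_count=1)
created.count = 5
existing = ensure_bucket_sketch(store, ("flow-run",), initial_count=0)
```

The second call returns the same bucket with its count still at 5, which shows that ensuring a bucket does not reset existing state.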
remove_bucket <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L1107" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>remove_bucket(db: PrefectDBInterface, session: AsyncSession, bucket: 'ORMAutomationBucket')
Removes the given bucket from the database
sweep_closed_buckets <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L1121" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>sweep_closed_buckets(db: PrefectDBInterface, session: AsyncSession, older_than: prefect.types._datetime.DateTime) -> None
reset <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L1131" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>reset() -> None
Resets the in-memory state of the service
listen_for_automation_changes <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L1140" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>listen_for_automation_changes() -> None
Listens for any changes to automations via PostgreSQL NOTIFY/LISTEN, and applies those changes to the set of loaded automations.
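Applying a change notification to the set of loaded automations could be sketched as a small dispatch on the event name. The three event names come from the `automation_changed` signature. Everything else here (`apply_change`, the `loaded` dictionary, and plain strings standing in for automations) is a hypothetical simplification that skips the actual PostgreSQL listener.

```python
from typing import Dict, Optional
from uuid import UUID, uuid4


def apply_change(
    loaded: Dict[UUID, str],
    automation_id: UUID,
    event: str,
    current: Optional[str] = None,
) -> None:
    """Update the in-memory automations in response to one change event."""
    if event in ("automation__created", "automation__updated"):
        # Assumption: the latest definition is re-read and passed in as current.
        if current is not None:
            loaded[automation_id] = current
    elif event == "automation__deleted":
        loaded.pop(automation_id, None)
    else:
        raise ValueError(f"unknown automation event: {event}")


loaded: Dict[UUID, str] = {}
automation_id = uuid4()
apply_change(loaded, automation_id, "automation__created", "v1")
apply_change(loaded, automation_id, "automation__updated", "v2")
```

After these two calls `loaded` maps the ID to `"v2"`; a subsequent `"automation__deleted"` event would remove the entry.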
consumer <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L1211" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>consumer(periodic_granularity: timedelta = timedelta(seconds=5)) -> AsyncGenerator[MessageHandler, None]
The triggers.consumer processes all Events arriving on the event bus to
determine if they meet the automation criteria, queuing up a corresponding
TriggeredAction for the actions service if the automation criteria are met.
proactive_evaluation <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L1269" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>proactive_evaluation(trigger: EventTrigger, as_of: prefect.types._datetime.DateTime) -> prefect.types._datetime.DateTime
The core proactive evaluation operation for a single Automation
evaluate_proactive_triggers <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/triggers.py#L1321" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>evaluate_proactive_triggers() -> None