docs/v3/advanced/debouncing-events.mdx
Prefect allows you to trigger deployment runs based on events. However, when multiple events occur in quick succession, you may want a single flow run to handle all of them rather than spinning up a separate run for each event. For example, when multiple files are uploaded to an S3 bucket simultaneously, or a flurry of webhook events is received by Prefect, you typically want to process all of the files together in one run.

This pattern is called debouncing, and you can implement it using reactive triggers with the `within` parameter of the trigger and the `schedule_after` parameter of the run-deployment action.
Automations fire in response to a single event and can only pass that event's context to the triggered deployment. This creates a challenge when multiple events arrive rapidly: each event triggers its own flow run, and each run sees only the single event that triggered it.
Debouncing solves this by collapsing a burst of related events into a single flow run that starts after the burst has settled and then discovers and processes all of the pending work at once.
<Note>
Automations can only pass the context from the triggering event to your deployment. Design your flows to query the source system directly (like listing S3 objects) rather than relying on individual event data.
</Note>
Consider a scenario where you have a webhook configured to receive S3 ObjectCreated events. When users upload five files in quick succession:
- **Without debouncing:** Five separate flow runs are triggered, one for each file.
- **With debouncing:** One flow run is triggered after all uploads complete, processing all five files together.
Use a reactive trigger with matching `within` and `schedule_after` values:
```yaml prefect.yaml
deployments:
  - name: process-s3-uploads
    entrypoint: flows/s3_processor.py:process_files
    work_pool:
      name: my-work-pool
    triggers:
      - type: event
        enabled: true
        match:
          prefect.resource.id:
            - "s3-bucket-name/*"
        expect:
          - "aws:s3:ObjectCreated:*"
        for_each:
          - "prefect.resource.id"
        posture: Reactive
        threshold: 1
        within: 60  # 60 seconds
        schedule_after: "PT1M"  # Wait 1 minute before running
```
```python .serve
from datetime import timedelta

from prefect import flow
from prefect.events import DeploymentEventTrigger


@flow(log_prints=True)
def process_files():
    """Process all files in the S3 bucket"""
    # Query S3 directly to find all files
    # Your flow logic here to list and process all files
    print("Processing all pending files...")


if __name__ == "__main__":
    process_files.serve(
        name="process-s3-uploads",
        triggers=[
            DeploymentEventTrigger(
                enabled=True,
                match={"prefect.resource.id": "s3-bucket-name/*"},
                expect=["aws:s3:ObjectCreated:*"],
                for_each=["prefect.resource.id"],
                posture="Reactive",
                threshold=1,
                within=60,  # 60 seconds
                schedule_after=timedelta(seconds=60),  # Wait 1 minute
            )
        ],
    )
```
When you configure a reactive trigger with both `within` and `schedule_after`:

1. The first event fires the trigger immediately, and a flow run is scheduled to start after the `schedule_after` delay.
2. Subsequent events that arrive inside the `within` window do not fire the trigger again.
3. By the time the scheduled run starts (after `schedule_after`), all events from the burst have occurred.

The `within` parameter implements eager debouncing: it fires immediately on the first event, then ignores subsequent events for the specified duration.

The `schedule_after` parameter delays the actual flow run, ensuring all events in the burst have completed before processing begins. This implements late debouncing.

Using both parameters together prevents duplicate runs while ensuring your flow has access to all events from the burst.
<Tip>
**Matching time windows**

Set `within` and `schedule_after` to the same value. This ensures the deployment run is scheduled after the debounce window closes, so all related work is visible to your flow when it runs.
</Tip>
The appropriate time window depends on your use case. Test with your actual event patterns to find the optimal window.
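One way to test is to emit a synthetic burst of events and confirm that only one run is created. Below is a minimal sketch using `prefect.events.emit_event`; the event name and resource IDs are illustrative and assume the trigger configuration shown above:

```python
import time

from prefect.events import emit_event

# Simulate five uploads arriving one second apart,
# well inside the 60-second `within` window configured above
for i in range(5):
    emit_event(
        event="aws:s3:ObjectCreated:Put",
        resource={"prefect.resource.id": f"s3-bucket-name/file-{i}.csv"},
    )
    time.sleep(1)
```

With `within: 60` and `schedule_after: "PT1M"`, this burst should produce exactly one flow run, starting roughly a minute after the first event.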
<Warning>
**Time format requirements**

The `schedule_after` parameter accepts:

- ISO 8601 duration strings: `"PT1M"` (1 minute), `"PT30S"` (30 seconds), `"PT2H"` (2 hours)
- Integer seconds: `60` (60 seconds)
- `timedelta(minutes=1)` (in code)

The `within` parameter accepts integer seconds only.
</Warning>
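For reference, here are the accepted forms for a one-minute delay side by side:

```python
from datetime import timedelta

schedule_after = "PT1M"                # ISO 8601 duration string
schedule_after = 60                    # integer seconds
schedule_after = timedelta(minutes=1)  # timedelta, when configuring in code
```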
Since automations can only pass one event's context, design your flows to discover and process all available work:
```python
import boto3

from prefect import flow


@flow(log_prints=True)
def process_s3_files(bucket_name: str = "my-bucket"):
    """Process all files in the pending prefix of an S3 bucket"""
    s3 = boto3.client('s3')

    # List all objects in the pending prefix
    response = s3.list_objects_v2(
        Bucket=bucket_name,
        Prefix='pending/'
    )
    files = response.get('Contents', [])
    print(f"Found {len(files)} files to process")

    # Process each file
    for file in files:
        key = file['Key']
        print(f"Processing {key}")
        # Your processing logic here
        # ...

        # Move to completed prefix
        s3.copy_object(
            Bucket=bucket_name,
            CopySource={'Bucket': bucket_name, 'Key': key},
            Key=key.replace('pending/', 'completed/')
        )
        s3.delete_object(Bucket=bucket_name, Key=key)

    print(f"Processed {len(files)} files")
```
Key design principles:

- Query the source system (here, the S3 bucket) for pending work rather than reading file names from event payloads.
- Process everything that is pending, not just the item that triggered the run.
- Mark items as processed (here, by moving them to a `completed/` prefix) so a rerun doesn't pick them up again; see the sketch after this list.
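As an illustration of the last point, here is a hypothetical guard (the helper name and prefix layout follow the example above) that makes reprocessing safe by skipping keys that already have a `completed/` counterpart:

```python
def already_processed(s3, bucket: str, key: str) -> bool:
    """Return True if this pending key already has a completed counterpart."""
    done_key = key.replace('pending/', 'completed/')
    listing = s3.list_objects_v2(Bucket=bucket, Prefix=done_key)
    return any(obj['Key'] == done_key for obj in listing.get('Contents', []))
```

Calling this at the top of the processing loop makes the flow safe to re-run if an earlier run was interrupted partway through.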
For additional control, combine debouncing with deployment concurrency limits to prevent overlapping runs:
```yaml
deployments:
  - name: process-s3-uploads
    entrypoint: flows/s3_processor.py:process_files
    work_pool:
      name: my-work-pool
    concurrency_limit:
      limit: 1
      collision_strategy: CANCEL_NEW
    triggers:
      - type: event
        enabled: true
        match:
          prefect.resource.id:
            - "s3-bucket-name/*"
        expect:
          - "aws:s3:ObjectCreated:*"
        posture: Reactive
        threshold: 1
        within: 60
        schedule_after: "PT1M"
```
This ensures:

- At most one flow run processes files at a time.
- Runs scheduled while another run is in progress are cancelled (`CANCEL_NEW`) instead of piling up.
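If you deploy from Python rather than `prefect.yaml`, recent Prefect 3 releases expose the same setting through `ConcurrencyLimitConfig`. A minimal sketch, assuming your version provides this class and that `flow.deploy` accepts a `concurrency_limit` argument (the image name is a placeholder):

```python
from prefect import flow
from prefect.client.schemas.objects import (
    ConcurrencyLimitConfig,
    ConcurrencyLimitStrategy,
)


@flow(log_prints=True)
def process_files():
    print("Processing all pending files...")


if __name__ == "__main__":
    process_files.deploy(
        name="process-s3-uploads",
        work_pool_name="my-work-pool",
        image="my-registry/s3-processor:latest",  # placeholder image
        # Allow one active run; cancel newly scheduled runs that collide
        concurrency_limit=ConcurrencyLimitConfig(
            limit=1,
            collision_strategy=ConcurrencyLimitStrategy.CANCEL_NEW,
        ),
    )
```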
Events that arrive during the `within` window are still recorded in Prefect's event system and remain visible in the event feed. The automation system recognizes these as part of the same event burst and doesn't create additional runs.