plans/2026-01-12-deployment-crud-events.md
Emit events for deployment create, update, and delete operations in OSS, enabling users to build automations that respond to deployment lifecycle changes.
Issue: https://github.com/PrefectHQ/prefect/issues/20176
OSS currently emits:
prefect.deployment.{status} - status change events (ready/not-ready)prefect.work-pool.updated - field change events for work poolsprefect.work-queue.updated - field change events for work queuesMissing:
prefect.deployment.createdprefect.deployment.updatedprefect.deployment.deletedCloud emits these events via the Auditor system, which adds actor-related resources for audit trail. The event names use the prefect. prefix (not prefect-cloud.) because deployments are "orchestration objects":
# Cloud uses prefect.* prefix for orchestration objects
orchestration_object_types: set[str] = {
"deployment", # Uses prefect.deployment.* naming
"flow",
"work-pool",
...
}
Consistency: OSS events will be identical in shape to Cloud, just without actor-related resources. Automations using these events will be portable between OSS and Cloud.
Emitted when a new deployment is created (not on upsert/update of existing).
Event(
event="prefect.deployment.created",
resource={
"prefect.resource.id": f"prefect.deployment.{deployment.id}",
"prefect.resource.name": deployment.name,
},
related=[
{
"prefect.resource.id": f"prefect.flow.{flow.id}",
"prefect.resource.name": flow.name,
"prefect.resource.role": "flow",
},
# work-queue if present
# work-pool if present
],
payload={...}, # deployment data
)
Emitted when an existing deployment is modified. Includes changed fields in payload.
Event(
event="prefect.deployment.updated",
resource={
"prefect.resource.id": f"prefect.deployment.{deployment.id}",
"prefect.resource.name": deployment.name,
},
related=[...], # flow, work-queue, work-pool
payload={
"updated_fields": ["description", "parameters"],
"updates": {
"description": {"from": "...", "to": "..."},
"parameters": {"from": {...}, "to": {...}},
},
},
)
Emitted when a deployment is deleted.
Event(
event="prefect.deployment.deleted",
resource={
"prefect.resource.id": f"prefect.deployment.{deployment.id}",
"prefect.resource.name": deployment.name,
},
related=[...], # flow, work-queue, work-pool (captured before deletion)
)
Location: src/prefect/server/models/events.py
Add three new functions following the existing work_pool_updated_event pattern:
async def deployment_created_event(
session: AsyncSession,
deployment: ORMDeployment,
occurred: DateTime,
) -> Event:
"""Create an event for deployment creation."""
...
async def deployment_updated_event(
session: AsyncSession,
deployment: ORMDeployment,
changed_fields: Dict[str, Dict[str, Any]],
occurred: DateTime,
) -> Event:
"""Create an event for deployment field updates."""
...
async def deployment_deleted_event(
session: AsyncSession,
deployment: ORMDeployment,
occurred: DateTime,
) -> Event:
"""Create an event for deployment deletion."""
...
Helper function (similar to _flow_run_related_resources_from_orm):
async def _deployment_related_resources(
session: AsyncSession,
deployment: ORMDeployment,
) -> RelatedResourceList:
"""Get related resources for a deployment event."""
# Returns flow, work-queue, work-pool as related resources
...
Status:
deployment_created_event functiondeployment_updated_event functiondeployment_deleted_event function_deployment_related_resources helpertests/server/models/test_events.py)Location: src/prefect/server/models/deployments.py
Add emit wrapper functions (following emit_work_queue_updated_event pattern):
async def emit_deployment_created_event(
session: AsyncSession,
deployment: ORMDeployment,
) -> None:
"""Emit an event when a deployment is created."""
async with PrefectServerEventsClient() as events_client:
await events_client.emit(
await deployment_created_event(
session=session,
deployment=deployment,
occurred=now("UTC"),
)
)
async def emit_deployment_updated_event(
session: AsyncSession,
deployment: ORMDeployment,
changed_fields: Dict[str, Dict[str, Any]],
) -> None:
"""Emit an event when a deployment is updated."""
if not changed_fields:
return
async with PrefectServerEventsClient() as events_client:
await events_client.emit(
await deployment_updated_event(
session=session,
deployment=deployment,
changed_fields=changed_fields,
occurred=now("UTC"),
)
)
async def emit_deployment_deleted_event(
session: AsyncSession,
deployment: ORMDeployment,
) -> None:
"""Emit an event when a deployment is deleted."""
async with PrefectServerEventsClient() as events_client:
await events_client.emit(
await deployment_deleted_event(
session=session,
deployment=deployment,
occurred=now("UTC"),
)
)
Status:
emit_deployment_created_event functionemit_deployment_updated_event functionemit_deployment_deleted_event functionEmit from src/prefect/server/models/deployments.py, following the existing work-pool/work-queue pattern.
In create_deployment model function:
created >= invocation_time) or existingemit_deployment_created_event or emit_deployment_updated_eventchanged_fields by comparing old vs new valuesIn update_deployment model function:
emit_deployment_updated_event with changed fieldsIn delete_deployment model function:
emit_deployment_deleted_eventStatus:
created event into create/upsert flowupdated event into update flowdeleted event into delete flowtests/server/api/test_deployments.py)
prefect.deployment.createdprefect.deployment.updatedprefect.deployment.updatedprefect.deployment.deleted| Scenario | Behavior |
|---|---|
| Upsert creates new deployment | Emit created event |
| Upsert updates existing deployment | Emit updated event |
| Update with no actual changes | No event emitted (changed_fields empty) |
| Delete non-existent deployment | No event (404 returned) |
| Deployment without work pool | Events still emitted, work-pool not in related |
| Bulk delete | Emit deleted event for each deployment |
created event in event streamupdated event with changed fieldsdeleted eventprefect.deployment.created