Back to Prefect

How to emit and use custom events

docs/v3/advanced/use-custom-event-grammar.mdx

3.6.30.dev32.4 KB
Original Source

Motivating custom events

Imagine you are running an e-commerce platform and you want to trigger a deployment when a customer completes an order.

There might be a number of events that occur during an order on your platform, for example:

  • order.created
  • order.item.added
  • order.payment-method.confirmed
  • order.shipping-method.added
  • order.complete
<Tip> **Event grammar**

The above choices of event names are arbitrary. With Prefect events, you're free to select any event grammar that best represents your use case. </Tip>

In this case, we want to trigger a deployment when a user completes an order, so our trigger should:

  • expect an order.complete event
  • after an order.created event
  • evaluate these conditions for_each user id

Finally, it should pass the user_id as a parameter to the deployment.

Define the trigger

Here's how this looks in code:

python
from prefect import flow
from prefect.events.schemas.deployment_triggers import DeploymentEventTrigger

order_complete = DeploymentEventTrigger(
    expect={"order.complete"},
    after={"order.created"},
    for_each={"prefect.resource.id"},
    parameters={"user_id": "{{ event.resource.id }}"},
)


@flow(log_prints=True)
def post_order_complete(user_id: str):
    print(f"User {user_id} has completed an order -- doing stuff now")


if __name__ == "__main__":
    post_order_complete.serve(triggers=[order_complete])
<Tip> **Specify multiple events or resources**

The expect and after fields accept a set of event names, so you can specify multiple events for each condition.

Similarly, the for_each field accepts a set of resource ids. </Tip>

Simulate events

To simulate users causing order status events, run the following in a Python shell or script:

python
import time
from prefect.events import emit_event

user_id_1, user_id_2 = "123", "456"
for event_name, user_id in [
    ("order.created", user_id_1),
    ("order.created", user_id_2), # other user
    ("order.complete", user_id_1),
]:
    event = emit_event(
        event=event_name,
        resource={"prefect.resource.id": user_id},
    )
    time.sleep(1)
    print(f"{user_id} emitted {event_name}")

In the above example:

  • user_id_1 creates and then completes an order, triggering a run of our deployment.
  • user_id_2 creates an order, but no completed event is emitted so no deployment is triggered.