Back to Prefect

event_persister

docs/v3/api-ref/python/prefect-server-events-services-event_persister.mdx

3.7.05.4 KB
Original Source

prefect.server.events.services.event_persister

The event persister moves event messages from the event bus to storage storage as fast as it can. Never gets tired.

Functions

create_handler <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/services/event_persister.py#L100" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
create_handler(batch_size: int = 20, flush_every: timedelta = timedelta(seconds=5), queue_max_size: int = 50000, max_flush_retries: int = 5) -> AsyncGenerator[MessageHandler, None]

Set up a message handler that will accumulate and send events to the database every batch_size messages, or every flush_every interval to flush any remaining messages.

Event trimming/retention is handled by the db_vacuum service (vacuum_old_events and vacuum_events_with_retention_overrides tasks).

Args:

  • batch_size: Number of events to accumulate before flushing
  • flush_every: Maximum time between flushes
  • queue_max_size: Maximum events in queue before dropping new events
  • max_flush_retries: Consecutive flush failures before dropping events

Classes

EventPersister <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/services/event_persister.py#L36" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

A service that persists events to the database as they arrive.

Methods:

all_services <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/services/base.py#L69" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
all_services(cls) -> Sequence[type[Self]]

Get list of all service classes

enabled <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/services/base.py#L64" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
enabled(cls) -> bool

Whether the service is enabled

enabled_services <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/services/base.py#L83" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
enabled_services(cls) -> list[type[Self]]

Get list of enabled service classes

environment_variable_name <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/services/base.py#L60" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
environment_variable_name(cls) -> str

run_services <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/services/base.py#L104" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
run_services(cls) -> NoReturn

Run enabled services until cancelled.

running <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/services/base.py#L89" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
running(cls) -> AsyncGenerator[None, None]

A context manager that runs enabled services on entry and stops them on exit.

service_settings <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/services/event_persister.py#L42" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
service_settings(cls) -> ServerServicesEventPersisterSettings

service_settings <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/services/base.py#L55" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
service_settings(cls) -> ServicesBaseSetting

The Prefect setting that controls whether the service is enabled

start <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/services/event_persister.py#L59" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
start(self) -> NoReturn

start <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/services/base.py#L114" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
start(self) -> NoReturn

Start running the service, which may run indefinitely

started_event <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/services/event_persister.py#L50" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
started_event(self) -> asyncio.Event

started_event <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/services/event_persister.py#L56" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
started_event(self, value: asyncio.Event) -> None

stop <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/events/services/event_persister.py#L84" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
stop(self) -> None

stop <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/services/base.py#L119" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
stop(self) -> None

Stop the service