Back to Prefect

ordering

docs/integrations/prefect-redis/api-ref/prefect_redis-ordering.mdx

3.6.30.dev37.0 KB
Original Source

prefect_redis.ordering

Manages the partial causal ordering of events for a particular consumer. This module maintains a buffer of events to be processed, aiming to process them in the order they occurred causally.

Classes

EventProcessingCompletion <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-redis/prefect_redis/ordering.py#L34" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Holds the result of completing event processing, including any followers.

CausalOrdering <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-redis/prefect_redis/ordering.py#L83" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Methods:

complete_event_and_get_followers <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-redis/prefect_redis/ordering.py#L188" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
complete_event_and_get_followers(self, event: ReceivedEvent) -> list[ReceivedEvent]

Atomically marks the event as seen, retrieves any waiting followers, and releases the processing lock.

This operation is atomic to prevent a race condition where a follower could park itself between the lock release and the followers check.

event_has_been_seen <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-redis/prefect_redis/ordering.py#L113" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
event_has_been_seen(self, event: Union[UUID, Event]) -> bool

event_has_started_processing <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-redis/prefect_redis/ordering.py#L106" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
event_has_started_processing(self, event: Union[UUID, Event]) -> bool

event_is_processing <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-redis/prefect_redis/ordering.py#L210" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
event_is_processing(self, event: ReceivedEvent) -> AsyncGenerator[EventProcessingCompletion, None]

Mark an event as being processed for the duration of its lifespan through the ordering system.

Yields an EventProcessingCompletion object that will be populated with any followers after successful processing.

followers_by_id <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-redis/prefect_redis/ordering.py#L168" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
followers_by_id(self, follower_ids: list[UUID]) -> list[ReceivedEvent]

Returns the events with the given IDs, in the order they occurred

forget_event_is_processing <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-redis/prefect_redis/ordering.py#L110" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
forget_event_is_processing(self, event: ReceivedEvent) -> None

forget_follower <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-redis/prefect_redis/ordering.py#L132" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
forget_follower(self, follower: ReceivedEvent)

Forget that this event is waiting on another event to arrive

get_followers <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-redis/prefect_redis/ordering.py#L180" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_followers(self, leader: ReceivedEvent) -> list[ReceivedEvent]

Returns events that were waiting on this leader event to arrive

get_lost_followers <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-redis/prefect_redis/ordering.py#L142" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_lost_followers(self) -> list[ReceivedEvent]

Returns events that were waiting on a leader event that never arrived

preceding_event_confirmed <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-redis/prefect_redis/ordering.py#L302" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
preceding_event_confirmed(self, handler: event_handler, event: ReceivedEvent, depth: int = 0) -> AsyncGenerator[None, None]

Events may optionally declare that they logically follow another event, so that we can preserve important event orderings in the face of unreliable delivery and ordering of messages from the queues.

This function keeps track of the ID of each event that this shard has successfully processed going back to the PRECEDING_EVENT_LOOKBACK period. If an event arrives that must follow another one, confirm that we have recently seen and processed that event before proceeding.

is ready to be processed event (ReceivedEvent): The event to be processed. This object should include metadata indicating if and what event it follows. depth (int, optional): The current recursion depth, used to prevent infinite recursion due to cyclic dependencies between events. Defaults to 0.

Raises EventArrivedEarly if the current event shouldn't be processed yet.

record_event_as_processing <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-redis/prefect_redis/ordering.py#L93" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
record_event_as_processing(self, event: ReceivedEvent) -> bool

Record that an event is being processed, returning False if the event is already being processed.

record_event_as_seen <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-redis/prefect_redis/ordering.py#L117" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
record_event_as_seen(self, event: ReceivedEvent) -> None

record_follower <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-redis/prefect_redis/ordering.py#L120" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
record_follower(self, event: ReceivedEvent)

Remember that this event is waiting on another event to arrive

wait_for_leader <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-redis/prefect_redis/ordering.py#L236" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
wait_for_leader(self, event: ReceivedEvent)

Given an event, wait for its leader to be processed before proceeding, or raise EventArrivedEarly if we would wait too long in this attempt.