memory

docs/v3/api-ref/python/prefect-server-utilities-messaging-memory.mdx

prefect.server.utilities.messaging.memory

Functions

log_metrics_periodically <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L45" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
log_metrics_periodically(interval: float = 2.0) -> None

update_metric <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L65" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
update_metric(topic: str, key: str, amount: int = 1) -> None

break_topic <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L241" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
break_topic()

ephemeral_subscription <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L387" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
ephemeral_subscription(topic: str) -> AsyncGenerator[Mapping[str, Any], None]

Classes

MemoryMessage <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L74" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Subscription <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L80" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

A subscription to a topic.

Messages are delivered to the subscription's queue and retried up to a maximum number of times (max_retries). If a message still cannot be delivered after that many retries, it is moved to the dead letter queue.

The dead letter queue is a directory of JSON files, each containing one serialized message.

Messages remain in the dead letter queue until they are removed manually.

Attributes:

  • topic: The topic that the subscription receives messages from.
  • max_retries: The maximum number of times a message will be retried for this subscription.
  • dead_letter_queue_path: The path to the dead letter queue folder.
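Because the dead letter queue is described above as a plain directory of JSON files that persist until removed manually, it can be inspected and cleared with ordinary file operations. The following is a minimal sketch of that idea, not part of Prefect's API: the helper names `read_dead_letters` and `purge_dead_letters`, the file name, and the message fields in the demo are all hypothetical.

```python
import json
import tempfile
from pathlib import Path
from typing import Any


def read_dead_letters(dlq_path: Path) -> dict[str, Any]:
    """Parse every regular file in the directory as JSON, keyed by file name."""
    messages: dict[str, Any] = {}
    for entry in sorted(dlq_path.iterdir()):
        if entry.is_file():
            messages[entry.name] = json.loads(entry.read_text())
    return messages


def purge_dead_letters(dlq_path: Path) -> int:
    """Delete every regular file in the directory; return how many were removed."""
    removed = 0
    for entry in dlq_path.iterdir():
        if entry.is_file():
            entry.unlink()
            removed += 1
    return removed


# Demo against a throwaway directory holding one hand-written file.
# The field layout below is an assumption made for illustration only.
with tempfile.TemporaryDirectory() as tmp:
    dlq = Path(tmp)
    (dlq / "example-message").write_text(
        json.dumps({"data": "hello", "attributes": {"id": "1"}})
    )
    found = read_dead_letters(dlq)
    removed_count = purge_dead_letters(dlq)
    remaining = list(dlq.iterdir())
```

In practice, `dlq` would point at the subscription's dead_letter_queue_path rather than a temporary directory.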

Methods:

deliver <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L116" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
deliver(self, message: MemoryMessage) -> None

Deliver a message to the subscription's queue.

Args:

  • message: The message to deliver.

get <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L169" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get(self) -> MemoryMessage

Get a message from the subscription's queue.

retry <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L140" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
retry(self, message: MemoryMessage) -> None

Place a message back on the retry queue.

If the message has been retried more than the maximum number of times, it is moved to the dead letter queue.

Args:

  • message: The message to retry.
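The retry rule described here can be illustrated with a small stand-in model. This is a sketch under stated assumptions, not Prefect's implementation: `SketchMessage`, `SketchSubscription`, the `retry_count` field, and the in-memory `dead_letters` list are hypothetical, and the real method may be asynchronous.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class SketchMessage:
    # Hypothetical stand-in for MemoryMessage; retry_count is an assumed field.
    data: str
    retry_count: int = 0


@dataclass
class SketchSubscription:
    # Hypothetical stand-in for Subscription, keeping dead letters in memory.
    max_retries: int = 3
    retry_queue: deque = field(default_factory=deque)
    dead_letters: list = field(default_factory=list)

    def retry(self, message: SketchMessage) -> None:
        """Requeue the message, or dead-letter it once retries exceed the limit."""
        message.retry_count += 1
        if message.retry_count > self.max_retries:
            self.dead_letters.append(message)
        else:
            self.retry_queue.append(message)


sub = SketchSubscription(max_retries=2)
msg = SketchMessage(data="payload")

# Simulate a handler that fails on every attempt.
sub.retry(msg)             # retry_count becomes 1: requeued
sub.retry_queue.popleft()  # the message is picked up again
sub.retry(msg)             # retry_count becomes 2: requeued
sub.retry_queue.popleft()
sub.retry(msg)             # retry_count becomes 3, which exceeds 2: dead-lettered
```

With `max_retries=2`, the message is requeued twice and dead-lettered on the third failure, matching the "more than the maximum" wording above.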

send_to_dead_letter_queue <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L177" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
send_to_dead_letter_queue(self, message: MemoryMessage) -> None

Send a message to the dead letter queue.

The dead letter queue is a directory of JSON files containing the serialized messages.

Args:

  • message: The message to send to the dead letter queue.
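As a rough illustration of storing one serialized message per JSON file, the sketch below writes a message into a directory and reads it back. It is not Prefect's code: `SketchMessage`, `write_dead_letter`, the UUID-based file name, and the `.json` suffix are assumptions chosen for the example.

```python
import json
import tempfile
from dataclasses import asdict, dataclass, field
from pathlib import Path
from uuid import uuid4


@dataclass
class SketchMessage:
    # Hypothetical stand-in for MemoryMessage.
    data: str
    attributes: dict = field(default_factory=dict)


def write_dead_letter(dlq_path: Path, message: SketchMessage) -> Path:
    """Serialize the message to its own JSON file and return the file's path."""
    # Create the directory on first use so callers need no setup step.
    dlq_path.mkdir(parents=True, exist_ok=True)
    # A random name keeps concurrent writers from overwriting each other (assumed scheme).
    target = dlq_path / f"{uuid4().hex}.json"
    target.write_text(json.dumps(asdict(message)))
    return target


with tempfile.TemporaryDirectory() as tmp:
    written = write_dead_letter(
        Path(tmp) / "dlq", SketchMessage("payload", {"id": "42"})
    )
    round_tripped = json.loads(written.read_text())
```

Writing each message to a separate file means a failed or partial write affects only that one message.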

Topic <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L196" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Methods:

by_name <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L207" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
by_name(cls, name: str) -> 'Topic'

clear <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L229" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
clear(self) -> None

clear_all <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L216" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
clear_all(cls) -> None

publish <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L234" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
publish(self, message: MemoryMessage) -> None

subscribe <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L221" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
subscribe(self, **subscription_kwargs: Any) -> Subscription

unsubscribe <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L226" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
unsubscribe(self, subscription: Subscription) -> None

Cache <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L256" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Methods:

clear_recently_seen_messages <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L262" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
clear_recently_seen_messages(self) -> None

forget_duplicates <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L289" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
forget_duplicates(self, attribute: str, messages: Iterable[M]) -> None

without_duplicates <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L265" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
without_duplicates(self, attribute: str, messages: Iterable[M]) -> list[M]

Publisher <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L301" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Methods:

publish_data <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L318" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
publish_data(self, data: bytes, attributes: Mapping[str, str]) -> None

Consumer <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L334" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Methods:

cleanup <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L361" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
cleanup(self) -> None

Clean up resources by unsubscribing from the topic.

Call this when the consumer is no longer needed, to prevent memory leaks from orphaned subscriptions.
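One way to make sure the cleanup step always runs is to pair the consumer's run loop with a `finally` block. The sketch below shows that pattern with stand-in classes. `SketchTopic` and `SketchConsumer` are hypothetical, and treating `run` and `cleanup` as coroutines is an assumption, so check the linked source before relying on it.

```python
import asyncio


class SketchTopic:
    # Hypothetical stand-in for Topic: fans each message out to subscriber queues.
    def __init__(self) -> None:
        self.subscriptions: list[asyncio.Queue] = []

    def subscribe(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self.subscriptions.append(queue)
        return queue

    def unsubscribe(self, queue: asyncio.Queue) -> None:
        self.subscriptions.remove(queue)

    def publish(self, message: str) -> None:
        for queue in self.subscriptions:
            queue.put_nowait(message)


class SketchConsumer:
    # Hypothetical stand-in for Consumer.
    def __init__(self, topic: SketchTopic) -> None:
        self.topic = topic
        self.subscription = topic.subscribe()

    async def run(self, handler) -> None:
        while True:
            message = await self.subscription.get()
            await handler(message)

    async def cleanup(self) -> None:
        self.topic.unsubscribe(self.subscription)


async def main() -> tuple[int, list[str]]:
    topic = SketchTopic()
    consumer = SketchConsumer(topic)
    seen: list[str] = []

    async def handler(message: str) -> None:
        seen.append(message)

    task = asyncio.create_task(consumer.run(handler))
    topic.publish("hello")
    await asyncio.sleep(0.01)  # give the consumer a chance to handle the message
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    finally:
        # Runs even if the run loop ended with an error, so no subscription is orphaned.
        await consumer.cleanup()
    return len(topic.subscriptions), seen


leftover_subscriptions, handled = asyncio.run(main())
```

Without the `finally` block, a cancelled or crashed consumer would leave its queue registered on the topic, and every later publish would keep filling it.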

run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L349" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
run(self, handler: MessageHandler) -> None