docs/v3/api-ref/python/prefect-server-utilities-messaging-memory.mdx
prefect.server.utilities.messaging.memorylog_metrics_periodically <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L45" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>log_metrics_periodically(interval: float = 2.0) -> None
update_metric <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L65" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>update_metric(topic: str, key: str, amount: int = 1) -> None
break_topic <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L241" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>break_topic()
ephemeral_subscription <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L387" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>ephemeral_subscription(topic: str) -> AsyncGenerator[Mapping[str, Any], None]
MemoryMessage <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L74" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>Subscription <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L80" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>A subscription to a topic.
Messages are delivered to the subscription's queue and retried up to a maximum number of times. If a message cannot be delivered after the maximum number of retries it is moved to the dead letter queue.
The dead letter queue is a directory of JSON files containing the serialized message.
Messages remain in the dead letter queue until they are removed manually.
Attributes:
topic: The topic that the subscription receives messages from.max_retries: The maximum number of times a message will be retried for
this subscription.dead_letter_queue_path: The path to the dead letter queue folder.Methods:
deliver <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L116" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>deliver(self, message: MemoryMessage) -> None
Deliver a message to the subscription's queue.
Args:
message: The message to deliver.get <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L169" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>get(self) -> MemoryMessage
Get a message from the subscription's queue.
retry <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L140" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>retry(self, message: MemoryMessage) -> None
Place a message back on the retry queue.
If the message has retried more than the maximum number of times it is moved to the dead letter queue.
Args:
message: The message to retry.send_to_dead_letter_queue <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L177" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>send_to_dead_letter_queue(self, message: MemoryMessage) -> None
Send a message to the dead letter queue.
The dead letter queue is a directory of JSON files containing the serialized messages.
Args:
message: The message to send to the dead letter queue.Topic <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L196" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>Methods:
by_name <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L207" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>by_name(cls, name: str) -> 'Topic'
clear <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L229" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>clear(self) -> None
clear_all <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L216" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>clear_all(cls) -> None
publish <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L234" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>publish(self, message: MemoryMessage) -> None
subscribe <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L221" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>subscribe(self, **subscription_kwargs: Any) -> Subscription
unsubscribe <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L226" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>unsubscribe(self, subscription: Subscription) -> None
Cache <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L256" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>Methods:
clear_recently_seen_messages <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L262" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>clear_recently_seen_messages(self) -> None
forget_duplicates <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L289" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>forget_duplicates(self, attribute: str, messages: Iterable[M]) -> None
without_duplicates <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L265" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>without_duplicates(self, attribute: str, messages: Iterable[M]) -> list[M]
Publisher <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L301" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>Methods:
publish_data <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L318" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>publish_data(self, data: bytes, attributes: Mapping[str, str]) -> None
Consumer <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L334" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>Methods:
cleanup <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L361" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>cleanup(self) -> None
Cleanup resources by unsubscribing from the topic.
This should be called when the consumer is no longer needed to prevent memory leaks from orphaned subscriptions.
run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/utilities/messaging/memory.py#L349" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>run(self, handler: MessageHandler) -> None