prefect.concurrency.sync

Functions

concurrency <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/concurrency/sync.py#L22" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
concurrency(
    names: Union[str, list[str]],
    occupy: int = 1,
    timeout_seconds: Optional[float] = None,
    max_retries: Optional[int] = None,
    lease_duration: float = 300,
    strict: bool = False,
    holder: 'Optional[ConcurrencyLeaseHolder]' = None,
    raise_on_lease_renewal_failure: Optional[bool] = None
) -> Generator[None, None, None]
```

A context manager that acquires and releases concurrency slots from the given concurrency limits.

Args:

  • names: The names of the concurrency limits to acquire slots from.
  • occupy: The number of slots to acquire and hold from each limit.
  • timeout_seconds: The number of seconds to wait for the slots to be acquired before raising a TimeoutError. A timeout of None will wait indefinitely.
  • max_retries: The maximum number of retries to acquire the concurrency slots.
  • lease_duration: The duration of the lease for the acquired slots in seconds.
  • strict: A boolean specifying whether to raise an error if the concurrency limit does not exist. Defaults to False.
  • holder: Information about the holder of the concurrency slots, typed as ConcurrencyLeaseHolder in the signature. Typically includes 'type' and 'id' fields.
  • raise_on_lease_renewal_failure: Controls whether to terminate execution when lease renewal fails. When None (default), follows the strict parameter for backward compatibility. Set to False to allow long-running tasks to continue even if a transient lease renewal error occurs. Set to True to terminate execution immediately on renewal failure.

Raises:

  • TimeoutError: If the slots are not acquired within the given timeout.
  • ConcurrencySlotAcquisitionError: If the concurrency limit does not exist and strict is True.
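
The timeout behaviour listed above can be handled separately from the guarded work. The following is a minimal sketch rather than a recommended pattern: `run_guarded` and the limit name `database` are hypothetical, and the limiter is passed in as a parameter so the snippet runs without a Prefect server. In real use, the `concurrency` context manager from `prefect.concurrency.sync` would be passed as `limiter`.

```python
from typing import Callable, ContextManager, Optional, TypeVar

T = TypeVar("T")


def run_guarded(
    work: Callable[[], T],
    limiter: Callable[..., ContextManager[None]],
    limit_name: str = "database",  # hypothetical limit name
    timeout_seconds: Optional[float] = 5.0,
) -> Optional[T]:
    """Run work() while holding one slot; return None if no slot arrives in time."""
    try:
        # The slot is held for the duration of the with-block and
        # released when the block exits.
        with limiter(limit_name, occupy=1, timeout_seconds=timeout_seconds):
            return work()
    except TimeoutError:
        # Documented behaviour: raised when the slots are not acquired
        # within timeout_seconds. Note that a TimeoutError raised by
        # work() itself would also be caught here.
        return None
```

Returning `None` on timeout is only one possible policy. Re-raising, logging, or falling back to a cheaper code path may suit a given workload better.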

Example:

A simple use of the sync concurrency context manager:

```python
from prefect.concurrency.sync import concurrency


def resource_heavy():
    # Hold one slot from the "test" limit while the block runs.
    with concurrency("test", occupy=1):
        print("Resource heavy task")


def main():
    resource_heavy()


if __name__ == "__main__":
    main()
```
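
To make the acquire, hold, and release behaviour concrete, the sketch below models it with an in-memory stand-in. It is not Prefect's implementation: `LocalLimits` is a hypothetical class, the limits live in a local dictionary instead of the Prefect API, and leases, retries, and the strict flag are left out. It only illustrates that `occupy` slots are taken from every named limit, that a timeout surfaces as a TimeoutError, and that the slots are returned when the block exits.

```python
import threading
import time
from contextlib import contextmanager
from typing import Dict, Iterator, List, Optional, Union


class LocalLimits:
    """In-memory stand-in for named concurrency limits (illustration only)."""

    def __init__(self, limits: Dict[str, int]) -> None:
        self._limits = dict(limits)
        self._active = {name: 0 for name in limits}
        self._cond = threading.Condition()

    def active(self, name: str) -> int:
        """Return the number of slots currently occupied on a limit."""
        with self._cond:
            return self._active[name]

    @contextmanager
    def concurrency(
        self,
        names: Union[str, List[str]],
        occupy: int = 1,
        timeout_seconds: Optional[float] = None,
    ) -> Iterator[None]:
        wanted = [names] if isinstance(names, str) else list(names)
        deadline = None if timeout_seconds is None else time.monotonic() + timeout_seconds
        with self._cond:
            # Wait until every named limit has room for `occupy` more slots.
            while any(self._active[n] + occupy > self._limits[n] for n in wanted):
                remaining = None if deadline is None else deadline - time.monotonic()
                if remaining is not None and remaining <= 0:
                    raise TimeoutError(f"could not acquire {occupy} slot(s) from {wanted}")
                self._cond.wait(remaining)
            for n in wanted:
                self._active[n] += occupy
        try:
            yield
        finally:
            # Return the slots on exit, even if the body raised.
            with self._cond:
                for n in wanted:
                    self._active[n] -= occupy
                self._cond.notify_all()
```

With `LocalLimits({"db": 1})`, a second `concurrency("db", timeout_seconds=0.05)` entered while the first is still open raises TimeoutError, and `active("db")` returns to 0 once the outer block exits.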

rate_limit <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/concurrency/sync.py#L83" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
rate_limit(
    names: Union[str, list[str]],
    occupy: int = 1,
    timeout_seconds: Optional[float] = None,
    strict: bool = False
) -> None
```

Blocks execution until occupy slots have been acquired from each of the concurrency limits given in names. Every named concurrency limit must have a slot decay configured.
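
Slot decay is what lets occupied slots free up without an explicit release. The toy model below illustrates the idea under a stated assumption: occupied slots decay at a constant rate, so a caller that finds the limit full waits just long enough for the overflow to decay. `DecayingLimit` is hypothetical, it uses a simulated clock instead of sleeping, and the field name `slot_decay_per_second` is assumed to mirror the corresponding Prefect limit setting. It is not how Prefect computes waits internally.

```python
from dataclasses import dataclass


@dataclass
class DecayingLimit:
    """Toy model of a limit whose occupied slots free up at a fixed rate."""

    limit: int                    # total number of slots
    slot_decay_per_second: float  # slots freed per second (assumed > 0)
    active_slots: float = 0.0     # currently occupied slots
    now: float = 0.0              # simulated clock, in seconds

    def advance(self, seconds: float) -> None:
        """Move the simulated clock forward and decay occupied slots."""
        self.now += seconds
        decayed = seconds * self.slot_decay_per_second
        self.active_slots = max(0.0, self.active_slots - decayed)

    def rate_limit(self, occupy: int = 1) -> float:
        """Occupy slots, waiting in simulated time if needed; return the wait."""
        if occupy > self.limit:
            raise ValueError("occupy exceeds the limit; this would never succeed")
        # Slots that must decay before the request fits under the limit.
        overflow = self.active_slots + occupy - self.limit
        wait = max(0.0, overflow / self.slot_decay_per_second)
        self.advance(wait)
        self.active_slots += occupy
        return wait
```

In this model, a limit of 2 slots decaying at 0.5 slots per second admits two calls immediately and makes the third wait 2 simulated seconds.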

Args:

  • names: The names of the concurrency limits to acquire slots from.
  • occupy: The number of slots to acquire from each limit.
  • timeout_seconds: The number of seconds to wait for the slots to be acquired before raising a TimeoutError. A timeout of None will wait indefinitely.
  • strict: A boolean specifying whether to raise an error if the concurrency limit does not exist. Defaults to False.

Raises:

  • TimeoutError: If the slots are not acquired within the given timeout.
  • ConcurrencySlotAcquisitionError: If the concurrency limit does not exist and strict is True.
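
Because rate_limit returns None and simply blocks, a common way to use a function of this shape is to call it once before each unit of work. The sketch below shows that pattern under assumptions: `throttled_map` and the limit name `api-calls` are hypothetical, and the limiter is injected so the snippet runs without a Prefect server. In real use, `rate_limit` from `prefect.concurrency.sync` would be passed as `limiter`, with a limit of that name configured with slot decay.

```python
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def throttled_map(
    func: Callable[[T], R],
    items: Iterable[T],
    limiter: Callable[..., None],
    limit_name: str = "api-calls",  # hypothetical limit name
) -> List[R]:
    """Apply func to each item, calling the limiter before every call."""
    results: List[R] = []
    for item in items:
        # Blocks until one slot is available on the named limit.
        limiter(limit_name, occupy=1)
        results.append(func(item))
    return results
```

Calling the limiter inside the loop, rather than once around it, is what spreads the calls out over time: each iteration has to obtain its own slot.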