docs/v3/api-ref/python/prefect-utilities-asyncutils.mdx
prefect.utilities.asyncutilsUtilities for interoperability with async functions and workers from various contexts.
get_thread_limiter <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/asyncutils/__init__.py#L69" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>get_thread_limiter() -> anyio.CapacityLimiter
is_async_fn <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/asyncutils/__init__.py#L78" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>is_async_fn(func: _SyncOrAsyncCallable[P, R]) -> TypeGuard[Callable[P, Coroutine[Any, Any, Any]]]
Returns True if a function returns a coroutine.
See https://github.com/microsoft/pyright/issues/2142 for an example use
is_async_gen_fn <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/asyncutils/__init__.py#L90" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>is_async_gen_fn(func: Callable[P, Any]) -> TypeGuard[Callable[P, AsyncGenerator[Any, Any]]]
Returns True if a function is an async generator.
create_task <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/asyncutils/__init__.py#L100" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>create_task(coroutine: Coroutine[Any, Any, R]) -> asyncio.Task[R]
Replacement for asyncio.create_task that will ensure that tasks aren't garbage collected before they complete. Allows for "fire and forget" behavior in which tasks can be created and the application can move on. Tasks can also be awaited normally.
See https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task for details (and essentially this implementation)
run_coro_as_sync <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/asyncutils/__init__.py#L144" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>run_coro_as_sync(coroutine: Coroutine[Any, Any, R]) -> Optional[R]
Runs a coroutine from a synchronous context, as if it were a synchronous function.
The coroutine is scheduled to run in the "run sync" event loop, which is running in its own thread and is started the first time it is needed. This allows us to share objects like async httpx clients among all coroutines running in the loop.
If run_sync is called from within the run_sync loop, it will run the coroutine in a new thread, because otherwise a deadlock would occur. Note that this behavior should not appear anywhere in the Prefect codebase or in user code.
Args:
coroutine: The coroutine to be run as a synchronous function.force_new_thread: If True, the coroutine will always be run in a new thread.
Defaults to False.wait_for_result: If True, the function will wait for the coroutine to complete
and return the result. If False, the function will submit the coroutine to the "run sync"
event loop and return immediately, where it will eventually be run. Defaults to True.Returns:
run_sync_in_worker_thread <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/asyncutils/__init__.py#L215" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>run_sync_in_worker_thread(__fn: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> R
Runs a sync function in a new worker thread so that the main thread's event loop is not blocked.
Unlike the anyio function, this defaults to a cancellable thread and does not allow passing arguments to the anyio function so users can pass kwargs to their function.
Note that cancellation of threads will not result in interrupted computation, the thread may continue running — the outcome will just be ignored.
call_with_mark <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/asyncutils/__init__.py#L241" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>call_with_mark(call: Callable[..., R]) -> R
run_async_from_worker_thread <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/asyncutils/__init__.py#L246" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>run_async_from_worker_thread(__fn: Callable[P, Awaitable[R]], *args: P.args, **kwargs: P.kwargs) -> R
Runs an async function in the main thread's event loop, blocking the worker thread until completion
run_async_in_new_loop <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/asyncutils/__init__.py#L257" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>run_async_in_new_loop(__fn: Callable[P, Awaitable[R]], *args: P.args, **kwargs: P.kwargs) -> R
mark_as_worker_thread <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/asyncutils/__init__.py#L263" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>mark_as_worker_thread() -> None
in_async_worker_thread <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/asyncutils/__init__.py#L267" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>in_async_worker_thread() -> bool
in_async_main_thread <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/asyncutils/__init__.py#L271" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>in_async_main_thread() -> bool
sync_compatible <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/asyncutils/__init__.py#L281" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>sync_compatible(async_fn: Callable[P, Coroutine[Any, Any, R]]) -> Callable[P, Union[R, Coroutine[Any, Any, R]]]
Converts an async function into a dual async and sync function.
When the returned function is called, we will attempt to determine the best way to enter the async function.
Note: Type checkers will infer functions decorated with @sync_compatible are synchronous. If
you want to use the decorated function in an async context, you will need to ignore the types
and "cast" the return type to a coroutine. For example:
python result: Coroutine = sync_compatible(my_async_function)(arg1, arg2) # type: ignore
asyncnullcontext <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/asyncutils/__init__.py#L377" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>asyncnullcontext(value: Optional[R] = None, *args: Any, **kwargs: Any) -> AsyncGenerator[Any, Optional[R]]
sync <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/asyncutils/__init__.py#L383" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>sync(__async_fn: Callable[P, Awaitable[T]], *args: P.args, **kwargs: P.kwargs) -> T
Call an async function from a synchronous context. Block until completion.
If in an asynchronous context, we will run the code in a separate loop instead of failing but a warning will be displayed since this is not recommended.
add_event_loop_shutdown_callback <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/asyncutils/__init__.py#L407" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>add_event_loop_shutdown_callback(coroutine_fn: Callable[[], Awaitable[Any]]) -> None
Adds a callback to the given callable on event loop closure. The callable must be a coroutine function. It will be awaited when the current event loop is shutting down.
Requires use of asyncio.run() which waits for async generator shutdown by
default or explicit call of asyncio.shutdown_asyncgens(). If the application
is entered with asyncio.run_until_complete() and the user calls
asyncio.close() without the generator shutdown call, this will not trigger
callbacks.
asyncio does not provided any other way to clean up a resource when the event loop is about to close.
create_gather_task_group <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/asyncutils/__init__.py#L525" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>create_gather_task_group() -> GatherTaskGroup
Create a new task group that gathers results
gather <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/asyncutils/__init__.py#L533" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>gather(*calls: Callable[[], Coroutine[Any, Any, T]]) -> list[T]
Run calls concurrently and gather their results.
Unlike asyncio.gather this expects to receive callables not coroutines.
This matches anyio semantics.
GatherIncomplete <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/asyncutils/__init__.py#L456" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>Used to indicate retrieving gather results before completion
GatherTaskGroup <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/asyncutils/__init__.py#L460" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>A task group that gathers results.
AnyIO does not include gather support. This class extends the TaskGroup
interface to allow simple gathering.
See https://github.com/agronholm/anyio/issues/100
This class should be instantiated with create_gather_task_group.
Methods:
get_result <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/asyncutils/__init__.py#L504" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>get_result(self, key: UUID) -> Any
start <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/asyncutils/__init__.py#L497" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>start(self, func: object, *args: object) -> NoReturn
Since start returns the result of task_status.started() but here we must
return the key instead, we just won't support this method for now.
start_soon <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/asyncutils/__init__.py#L485" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>start_soon(self, func: Callable[[Unpack[PosArgsT]], Awaitable[Any]], *args: Unpack[PosArgsT]) -> UUID
LazySemaphore <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/asyncutils/__init__.py#L547" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>