# prefect_dask.utils

Utils to use alongside prefect-dask.
## get_dask_client <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dask/prefect_dask/utils.py#L52" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
get_dask_client(timeout: Optional[Union[int, float, str, timedelta]] = None, **client_kwargs: Dict[str, Any]) -> Generator[Client, None, None]
```
Yields a temporary synchronous dask client; this is useful for parallelizing operations on dask collections, such as a `dask.DataFrame` or `dask.Bag`.
Without invoking this, workers do not automatically get a client to connect to the full cluster. Instead, each worker attempts to perform the work serially on its own, potentially overwhelming that single worker.

When in an async context, we recommend using `get_async_dask_client` instead.
**Args:**

- `timeout`: Timeout after which to error out; has no effect in flow run contexts because the client has already started. Defaults to the `distributed.comm.timeouts.connect` configuration value.
- `client_kwargs`: Additional keyword arguments to pass to `distributed.Client`, overriding any keyword arguments inherited from the task runner.

**Examples:**

Use `get_dask_client` to distribute work across workers.
```python
import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client


@task
def compute_task():
    with get_dask_client(timeout="120s") as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        summary_df = client.compute(df.describe()).result()
        return summary_df


@flow(task_runner=DaskTaskRunner())
def dask_flow():
    prefect_future = compute_task.submit()
    return prefect_future.result()


dask_flow()
```
## get_async_dask_client <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dask/prefect_dask/utils.py#L109" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
get_async_dask_client(timeout: Optional[Union[int, float, str, timedelta]] = None, **client_kwargs: Dict[str, Any]) -> AsyncGenerator[Client, None]
```
Yields a temporary asynchronous dask client; this is useful for parallelizing operations on dask collections, such as a `dask.DataFrame` or `dask.Bag`.
Without invoking this, workers do not automatically get a client to connect to the full cluster. Instead, each worker attempts to perform the work serially on its own, potentially overwhelming that single worker.
**Args:**

- `timeout`: Timeout after which to error out; has no effect in flow run contexts because the client has already started. Defaults to the `distributed.comm.timeouts.connect` configuration value.
- `client_kwargs`: Additional keyword arguments to pass to `distributed.Client`, overriding any keyword arguments inherited from the task runner.

**Examples:**

Use `get_async_dask_client` to distribute work across workers.
```python
import asyncio

import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_async_dask_client


@task
async def compute_task():
    async with get_async_dask_client(timeout="120s") as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        summary_df = await client.compute(df.describe())
        return summary_df


@flow(task_runner=DaskTaskRunner())
async def dask_flow():
    prefect_future = await compute_task.submit()
    return await prefect_future.result()


asyncio.run(dask_flow())
```