docs/integrations/prefect-dask/api-ref/prefect_dask-task_runners.mdx
`prefect_dask.task_runners`

Interface and implementations of the Dask Task Runner.

Task runners in Prefect are responsible for managing the execution of Prefect task runs. Generally speaking, users are not expected to interact with task runners outside of configuring and initializing them for a flow.
Example:
```python
import time

from prefect import flow, task


@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")


@flow
def count_to(highest_number):
    for number in range(highest_number):
        shout.submit(number)


if __name__ == "__main__":
    count_to(10)

# outputs
#0
#1
#2
#3
#4
#5
#6
#7
#8
#9
```
Switching to a `DaskTaskRunner`:
```python
import time

from prefect import flow, task
from prefect_dask import DaskTaskRunner


@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")


@flow(task_runner=DaskTaskRunner)
def count_to(highest_number):
    for number in range(highest_number):
        shout.submit(number)


if __name__ == "__main__":
    count_to(10)

# outputs
#3
#7
#2
#6
#4
#0
#1
#5
#8
#9
```
PrefectDaskFuture <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dask/prefect_dask/task_runners.py#L113" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

A Prefect future that wraps a `distributed.Future`. This future is used when the task run is submitted to a `DaskTaskRunner`.
Methods:
result <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dask/prefect_dask/task_runners.py#L128" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
`result(self, timeout: Optional[float] = None, raise_on_failure: bool = True) -> R`

wait <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dask/prefect_dask/task_runners.py#L119" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
`wait(self, timeout: Optional[float] = None) -> None`
DaskTaskRunner <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dask/prefect_dask/task_runners.py#L149" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

A parallel task runner that submits tasks to the `dask.distributed` scheduler.

By default, a temporary `distributed.LocalCluster` is created (and subsequently torn down) within the `start()` context manager. To use a different cluster class (e.g. `dask_kubernetes.KubeCluster`), you can specify `cluster_class`/`cluster_kwargs`.

Alternatively, if you already have a Dask cluster running, you can provide the cluster object via the `cluster` kwarg or the address of the scheduler via the `address` kwarg.

!!! warning "Multiprocessing safety"
    Note that, because the `DaskTaskRunner` uses multiprocessing, calls to flows in scripts must be guarded with `if __name__ == "__main__":` or warnings will be displayed.

Args:
- `cluster`: A currently running Dask cluster. If one is not provided (or specified via the `address` kwarg), a temporary cluster will be created in `DaskTaskRunner.start()`. Defaults to `None`.
- `address`: Address of a currently running Dask scheduler. Defaults to `None`.
- `cluster_class`: The cluster class to use when creating a temporary Dask cluster. Can be either the full class name (e.g. `"distributed.LocalCluster"`) or the class itself.
- `cluster_kwargs`: Additional kwargs to pass to the `cluster_class` when creating a temporary Dask cluster.
- `adapt_kwargs`: Additional kwargs to pass to `cluster.adapt` when creating a temporary Dask cluster. Note that adaptive scaling is only enabled if `adapt_kwargs` are provided.
- `client_kwargs`: Additional kwargs to use when creating a `dask.distributed.Client`.
- `performance_report_path`: Path where the Dask performance report will be saved. If not provided, no performance report will be generated.

Examples:
Using a temporary local Dask cluster:
```python
from prefect import flow
from prefect_dask.task_runners import DaskTaskRunner

@flow(task_runner=DaskTaskRunner)
def my_flow():
    ...
```
Using a temporary cluster running elsewhere. Any Dask cluster class should work; here we use dask-cloudprovider:
```python
DaskTaskRunner(
    # Recent dask-cloudprovider releases expose the AWS clusters
    # under the `aws` subpackage.
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={
        "image": "prefecthq/prefect:latest",
        "n_workers": 5,
    },
)
```
Connecting to an existing Dask cluster:
```python
DaskTaskRunner(address="192.0.2.255:8786")
```
Methods:
client <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dask/prefect_dask/task_runners.py#L306" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
`client(self) -> PrefectDaskClient`

Get the Dask client for the task runner.

The client is created on first access. If a remote cluster is not provided, the client will attempt to create or connect to a local cluster.

duplicate <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dask/prefect_dask/task_runners.py#L394" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
`duplicate(self)`

Create a new instance of the task runner with the same settings.

map <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dask/prefect_dask/task_runners.py#L453" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
`map(self, task: 'Task[P, Coroutine[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None) -> PrefectFutureList[PrefectDaskFuture[R]]`

map <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dask/prefect_dask/task_runners.py#L461" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
`map(self, task: 'Task[P, R]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None) -> PrefectFutureList[PrefectDaskFuture[R]]`

map <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dask/prefect_dask/task_runners.py#L468" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
`map(self, task: 'Task[P, R]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None)`

submit <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dask/prefect_dask/task_runners.py#L408" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
`submit(self, task: 'Task[P, Coroutine[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectDaskFuture[R]] | None = None, dependencies: dict[str, Set[RunInput]] | None = None) -> PrefectDaskFuture[R]`

submit <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dask/prefect_dask/task_runners.py#L417" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
`submit(self, task: 'Task[Any, R]', parameters: dict[str, Any], wait_for: Iterable[PrefectDaskFuture[R]] | None = None, dependencies: dict[str, Set[RunInput]] | None = None) -> PrefectDaskFuture[R]`

submit <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dask/prefect_dask/task_runners.py#L425" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
`submit(self, task: 'Union[Task[P, R], Task[P, Coroutine[Any, Any, R]]]', parameters: dict[str, Any], wait_for: Iterable[PrefectDaskFuture[R]] | None = None, dependencies: dict[str, Set[RunInput]] | None = None) -> PrefectDaskFuture[R]`