


prefect_dask.task_runners

Interface and implementations of the Dask Task Runner. Task Runners in Prefect are responsible for managing the execution of Prefect task runs. Generally speaking, users are not expected to interact with task runners outside of configuring and initializing them for a flow.

Example:

```python
import time

from prefect import flow, task

@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")

@flow
def count_to(highest_number):
    for number in range(highest_number):
        shout.submit(number)

if __name__ == "__main__":
    count_to(10)

# outputs
#0
#1
#2
#3
#4
#5
#6
#7
#8
#9
```

Switching to a `DaskTaskRunner`:
```python
import time

from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")

@flow(task_runner=DaskTaskRunner)
def count_to(highest_number):
    for number in range(highest_number):
        shout.submit(number)

if __name__ == "__main__":
    count_to(10)

# outputs
#3
#7
#2
#6
#4
#0
#1
#5
#8
#9
```

Classes

PrefectDaskFuture <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dask/prefect_dask/task_runners.py#L113" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

A Prefect future that wraps a `distributed.Future`. This future is used when a task run is submitted to a `DaskTaskRunner`.

Methods:

result <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dask/prefect_dask/task_runners.py#L128" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
result(self, timeout: Optional[float] = None, raise_on_failure: bool = True) -> R
```

wait <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dask/prefect_dask/task_runners.py#L119" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
wait(self, timeout: Optional[float] = None) -> None
```

DaskTaskRunner <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dask/prefect_dask/task_runners.py#L149" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

A parallel task runner that submits tasks to the `dask.distributed` scheduler. By default, a temporary `distributed.LocalCluster` is created (and subsequently torn down) within the `start()` context manager. To use a different cluster class (e.g. `dask_kubernetes.KubeCluster`), specify `cluster_class` and `cluster_kwargs`.

Alternatively, if you already have a Dask cluster running, you can provide the cluster object via the `cluster` kwarg or the address of its scheduler via the `address` kwarg.

Warning (multiprocessing safety): because the `DaskTaskRunner` uses multiprocessing, calls to flows in scripts must be guarded with `if __name__ == "__main__":`, or warnings will be displayed.

Args:

  • cluster: A currently running Dask cluster. If one is not provided (or specified via the `address` kwarg), a temporary cluster will be created in `DaskTaskRunner.start()`. Defaults to None.
  • address: Address of a currently running Dask scheduler. Defaults to None.
  • cluster_class: The cluster class to use when creating a temporary Dask cluster. Can be either the fully qualified class name (e.g. `"distributed.LocalCluster"`) or the class itself.
  • cluster_kwargs: Additional kwargs to pass to the `cluster_class` when creating a temporary Dask cluster.
  • adapt_kwargs: Additional kwargs to pass to `cluster.adapt` when creating a temporary Dask cluster. Note that adaptive scaling is only enabled if `adapt_kwargs` are provided.
  • client_kwargs: Additional kwargs to use when creating a `dask.distributed.Client`.
  • performance_report_path: Path where the Dask performance report will be saved. If not provided, no performance report will be generated.

Examples:

Using a temporary local dask cluster:

```python
from prefect import flow
from prefect_dask.task_runners import DaskTaskRunner

@flow(task_runner=DaskTaskRunner)
def my_flow():
    ...
```

Using a temporary cluster running elsewhere. Any Dask cluster class should work; this example uses dask-cloudprovider:

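```python
from prefect_dask import DaskTaskRunner

DaskTaskRunner(
    # dask-cloudprovider exposes its AWS cluster managers under the `aws` subpackage
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={
        "image": "prefecthq/prefect:latest",
        "n_workers": 5,
    },
)
```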
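For illustration only: when `cluster_class` is given as a string, it has to be resolved to a class before a cluster can be created. The helper below, `resolve_class`, is hypothetical (it is not part of `prefect-dask`, which may resolve names differently). It shows one common way to turn a dotted name into a class with `importlib`.

```python
import importlib
from typing import Union


def resolve_class(cls_or_path: Union[str, type]) -> type:
    """Hypothetical helper: accept a class or a fully qualified dotted name."""
    if isinstance(cls_or_path, type):
        return cls_or_path  # already a class, use it as-is
    module_name, _, attr = cls_or_path.rpartition(".")
    if not module_name:
        raise ValueError(f"expected a fully qualified name, got {cls_or_path!r}")
    module = importlib.import_module(module_name)  # e.g. "dask_cloudprovider.aws"
    return getattr(module, attr)  # e.g. "FargateCluster"


if __name__ == "__main__":
    print(resolve_class("collections.OrderedDict"))
```

Accepting either form lets callers avoid importing optional cluster packages until they are actually needed.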

Connecting to an existing dask cluster:

```python
from prefect_dask import DaskTaskRunner

DaskTaskRunner(address="192.0.2.255:8786")
```

Methods:

client <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dask/prefect_dask/task_runners.py#L306" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
client(self) -> PrefectDaskClient
```

Get the Dask client for the task runner.

The client is created on first access. If a remote cluster is not provided, the client will attempt to create/connect to a local cluster.
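The "created on first access" behavior is a lazy-initialization pattern. The sketch below is a generic, hypothetical illustration using `functools.cached_property`. `LazyResource` is not part of `prefect-dask`, and the real `client` is not necessarily implemented this way.

```python
from functools import cached_property


class LazyResource:
    """Hypothetical class illustrating create-on-first-access; not prefect-dask code."""

    def __init__(self) -> None:
        self.created = 0  # counts how many times the expensive object was built

    @cached_property
    def client(self) -> object:
        # Runs only on the first attribute access; the result is then cached on the instance
        self.created += 1
        return object()  # stand-in for an expensive client connection


if __name__ == "__main__":
    resource = LazyResource()
    print(resource.created)  # 0: nothing built yet
    resource.client
    resource.client
    print(resource.created)  # 1: built once, then reused
```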

duplicate <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dask/prefect_dask/task_runners.py#L394" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
duplicate(self)
```

Create a new instance of the task runner with the same settings.

map <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dask/prefect_dask/task_runners.py#L453" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
map(self, task: 'Task[P, Coroutine[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None) -> PrefectFutureList[PrefectDaskFuture[R]]
```

map <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dask/prefect_dask/task_runners.py#L461" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
map(self, task: 'Task[P, R]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None) -> PrefectFutureList[PrefectDaskFuture[R]]
```

map <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dask/prefect_dask/task_runners.py#L468" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
map(self, task: 'Task[P, R]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None)
```

submit <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dask/prefect_dask/task_runners.py#L408" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
submit(self, task: 'Task[P, Coroutine[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectDaskFuture[R]] | None = None, dependencies: dict[str, Set[RunInput]] | None = None) -> PrefectDaskFuture[R]
```

submit <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dask/prefect_dask/task_runners.py#L417" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
submit(self, task: 'Task[Any, R]', parameters: dict[str, Any], wait_for: Iterable[PrefectDaskFuture[R]] | None = None, dependencies: dict[str, Set[RunInput]] | None = None) -> PrefectDaskFuture[R]
```

submit <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dask/prefect_dask/task_runners.py#L425" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
submit(self, task: 'Union[Task[P, R], Task[P, Coroutine[Any, Any, R]]]', parameters: dict[str, Any], wait_for: Iterable[PrefectDaskFuture[R]] | None = None, dependencies: dict[str, Set[RunInput]] | None = None) -> PrefectDaskFuture[R]
```