docs/integrations/prefect-dask/index.mdx
Dask can run your tasks in parallel and distribute them over multiple machines.
The prefect-dask integration makes it easy to accelerate your flow runs with Dask.
## Install prefect-dask

The following command installs a version of prefect-dask compatible with your installed version of prefect.
If you don't already have prefect installed, it will install the newest version of prefect as well.

```bash
pip install "prefect[dask]"
# or, with uv
uv pip install "prefect[dask]"
```

Upgrade to the latest versions of prefect and prefect-dask:

```bash
pip install -U "prefect[dask]"
# or, with uv
uv pip install -U "prefect[dask]"
```
Say your flow downloads many images to train a machine learning model. It takes longer than you'd like for the flow to run because it executes sequentially.
To accelerate your flow code, parallelize it with prefect-dask in three steps:
1. Add the import: `from prefect_dask import DaskTaskRunner`
2. Specify the task runner in the flow decorator: `@flow(task_runner=DaskTaskRunner)`
3. Submit tasks to the flow's task runner: `a_task.submit(*args, **kwargs)`

Below is code with and without the DaskTaskRunner:

<Tabs>
<Tab title="Sequential by default">
```python
# Completed in 15.2 seconds

from typing import List
from pathlib import Path

import httpx
from prefect import flow, task

URL_FORMAT = (
    "https://www.cpc.ncep.noaa.gov/products/NMME/archive/"
    "{year:04d}{month:02d}0800/current/images/nino34.rescaling.ENSMEAN.png"
)

@task
def download_image(year: int, month: int, directory: Path) -> Path:
    # download image from URL
    url = URL_FORMAT.format(year=year, month=month)
    resp = httpx.get(url)
    # save content to directory/YYYYMM.png
    file_path = (directory / url.split("/")[-1]).with_stem(f"{year:04d}{month:02d}")
    file_path.write_bytes(resp.content)
    return file_path

@flow
def download_nino_34_plumes_from_year(year: int) -> List[Path]:
    # create a directory to hold images
    directory = Path("data")
    directory.mkdir(exist_ok=True)

    # download all images
    file_paths = []
    for month in range(1, 12 + 1):
        file_path = download_image(year, month, directory)
        file_paths.append(file_path)
    return file_paths

if __name__ == "__main__":
    download_nino_34_plumes_from_year(2022)
```
</Tab>
<Tab title="With DaskTaskRunner">
```python
# Completed in 5.7 seconds

from typing import List
from pathlib import Path

import httpx
from prefect import flow, task
from prefect_dask import DaskTaskRunner

URL_FORMAT = (
    "https://www.cpc.ncep.noaa.gov/products/NMME/archive/"
    "{year:04d}{month:02d}0800/current/images/nino34.rescaling.ENSMEAN.png"
)

@task
def download_image(year: int, month: int, directory: Path) -> Path:
    # download image from URL
    url = URL_FORMAT.format(year=year, month=month)
    resp = httpx.get(url)
    # save content to directory/YYYYMM.png
    file_path = (directory / url.split("/")[-1]).with_stem(f"{year:04d}{month:02d}")
    file_path.write_bytes(resp.content)
    return file_path

@flow(task_runner=DaskTaskRunner(cluster_kwargs={"processes": False}))
def download_nino_34_plumes_from_year(year: int) -> List[Path]:
    # create a directory to hold images
    directory = Path("data")
    directory.mkdir(exist_ok=True)

    # download all images
    file_paths = []
    for month in range(1, 12 + 1):
        file_path = download_image.submit(year, month, directory)
        file_paths.append(file_path)
    return file_paths

if __name__ == "__main__":
    download_nino_34_plumes_from_year(2022)
```
</Tab>
</Tabs>
In our tests, the flow run took 15.2 seconds to execute sequentially.
Using the DaskTaskRunner reduced the runtime to 5.7 seconds!
The DaskTaskRunner is a task runner that submits tasks to the dask.distributed scheduler.
By default, when the DaskTaskRunner is specified for a flow run, a temporary Dask cluster is created and used for the duration of the flow run.
If you already have a Dask cluster running, either cloud-hosted or local, you can provide the connection URL with the address kwarg.
DaskTaskRunner accepts the following optional parameters:
| Parameter | Description |
|---|---|
| address | Address of a currently running Dask scheduler. |
| cluster_class | The cluster class to use when creating a temporary Dask cluster. It can be either the full class name (for example, "distributed.LocalCluster"), or the class itself. |
| cluster_kwargs | Additional kwargs to pass to the cluster_class when creating a temporary Dask cluster. |
| adapt_kwargs | Additional kwargs to pass to cluster.adapt when creating a temporary Dask cluster. Note that adaptive scaling is only enabled if adapt_kwargs are provided. |
| client_kwargs | Additional kwargs to use when creating a dask.distributed.Client. |
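As an illustration, several of these parameters can be combined on a single task runner. The following is a minimal sketch, assuming a temporary local cluster; the specific worker counts and client options are arbitrary, not recommendations:

```python
from prefect import flow
from prefect_dask import DaskTaskRunner

# Hypothetical configuration combining several optional parameters.
# adapt_kwargs enables adaptive scaling between 1 and 4 workers.
runner = DaskTaskRunner(
    cluster_class="distributed.LocalCluster",
    cluster_kwargs={"threads_per_worker": 2},
    adapt_kwargs={"minimum": 1, "maximum": 4},
    client_kwargs={"set_as_default": True},  # passed to dask.distributed.Client
)

@flow(task_runner=runner)
def configured_flow():
    ...
```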
<Warning>
Because the DaskTaskRunner uses multiprocessing, calls to flows in scripts must be guarded with `if __name__ == "__main__":` or you will encounter warnings and errors.
</Warning>
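For example, a minimal script structure that satisfies this guard (the flow name is illustrative):

```python
from prefect import flow
from prefect_dask import DaskTaskRunner

@flow(task_runner=DaskTaskRunner())
def guarded_flow():
    ...

if __name__ == "__main__":
    # Without this guard, worker processes spawned by Dask would
    # re-execute the flow call when they import this module.
    guarded_flow()
```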
If you don't provide the address of a Dask scheduler, Prefect creates a temporary local cluster automatically.
The number of workers used is based on the number of cores on your machine.
The default provides a mix of processes and threads that work well for most workloads.
To specify this explicitly, pass values for n_workers or threads_per_worker to cluster_kwargs:
```python
from prefect_dask import DaskTaskRunner

# Use 4 worker processes, each with 2 threads
DaskTaskRunner(
    cluster_kwargs={"n_workers": 4, "threads_per_worker": 2}
)
```
The DaskTaskRunner can create a temporary cluster using any of Dask's cluster-manager options.
This is useful when you want each flow run to have its own Dask cluster, allowing for per-flow adaptive scaling.
To configure it, provide a cluster_class.
This can be:

- A string specifying the import path to the cluster class (for example, `"dask_cloudprovider.aws.FargateCluster"`)
- The cluster class itself

You can also configure `cluster_kwargs`, a dictionary of keyword arguments to pass to `cluster_class` when starting the flow run.
For example, to configure a flow to use a temporary dask_cloudprovider.aws.FargateCluster with four workers running with an image named my-prefect-image:
```python
from prefect_dask import DaskTaskRunner

DaskTaskRunner(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={"n_workers": 4, "image": "my-prefect-image"},
)
```
For larger workloads, you can accelerate execution further by distributing task runs over multiple machines.
Multiple Prefect flow runs can use the same existing Dask cluster. You might manage a single long-running Dask cluster (for example, using the Dask Helm Chart) and configure flows to connect to it during execution. This has disadvantages compared to using a temporary Dask cluster:

- All workers in the cluster must have dependencies installed for all flows you intend to run.
- Multiple flow runs may compete for resources. Dask tries to share resources fairly between tasks, but you may still run into contention.

Still, you may prefer managing a single long-running Dask cluster.
To configure a DaskTaskRunner to connect to an existing cluster, pass in the address of the scheduler to the address argument:
```python
from prefect import flow
from prefect_dask import DaskTaskRunner

@flow(task_runner=DaskTaskRunner(address="http://my-dask-cluster"))
def my_flow():
    ...
```
Suppose you have existing code that uses a Dask client/cluster to operate on Dask collections, such as a dask.dataframe.DataFrame.
With prefect-dask, adapting it takes just a few steps:

1. Add `task` and `flow` decorators
2. Use the `get_dask_client` context manager to distribute work across Dask workers

<Tabs>
<Tab title="Without Prefect">
```python
import dask.dataframe
import dask.distributed

client = dask.distributed.Client()

def read_data(start: str, end: str) -> dask.dataframe.DataFrame:
    df = dask.datasets.timeseries(start, end, partition_freq="4w")
    return df

def process_data(df: dask.dataframe.DataFrame) -> dask.dataframe.DataFrame:
    df_yearly_avg = df.groupby(df.index.year).mean()
    return df_yearly_avg.compute()

def dask_pipeline():
    df = read_data("1988", "2022")
    df_yearly_average = process_data(df)
    return df_yearly_average

if __name__ == "__main__":
    dask_pipeline()
```
</Tab>
<Tab title="With Prefect">
```python
import dask.dataframe
import dask.distributed
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client

client = dask.distributed.Client()

@task
def read_data(start: str, end: str) -> dask.dataframe.DataFrame:
    df = dask.datasets.timeseries(start, end, partition_freq="4w")
    return df

@task
def process_data(df: dask.dataframe.DataFrame) -> dask.dataframe.DataFrame:
    with get_dask_client():
        df_yearly_avg = df.groupby(df.index.year).mean()
        return df_yearly_avg.compute()

@flow(task_runner=DaskTaskRunner(address=client.scheduler.address))
def dask_pipeline():
    df = read_data.submit("1988", "2022")
    df_yearly_average = process_data.submit(df)
    return df_yearly_average

if __name__ == "__main__":
    dask_pipeline()
```
</Tab>
</Tabs>
A key feature of using a DaskTaskRunner is the ability to scale adaptively to the workload.
Instead of specifying n_workers as a fixed number, you can specify a minimum and maximum number of workers to use, and the Dask cluster scales up and down as needed.
To do this, pass adapt_kwargs to DaskTaskRunner.
This takes the following fields:
- `maximum` (int or None, optional): the maximum number of workers to scale to. Set to `None` for no maximum.
- `minimum` (int or None, optional): the minimum number of workers to scale to. Set to `None` for no minimum.

For example, this configures a flow to run on a FargateCluster scaling up to a maximum of 10 workers:
```python
from prefect_dask import DaskTaskRunner

DaskTaskRunner(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    adapt_kwargs={"maximum": 10},
)
```
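The same pattern works without a `cluster_class`, in which case the temporary local cluster scales adaptively. A sketch with arbitrary bounds:

```python
from prefect import flow
from prefect_dask import DaskTaskRunner

@flow(
    task_runner=DaskTaskRunner(
        # With no cluster_class, a temporary distributed.LocalCluster is used
        adapt_kwargs={"minimum": 1, "maximum": 4},
    )
)
def elastic_flow():
    ...
```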
Use Dask annotations to further control the behavior of tasks. For example, set the priority of tasks in the Dask scheduler:
```python
import dask
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

@task
def show(x):
    print(x)

@flow(task_runner=DaskTaskRunner())
def my_flow():
    with dask.annotate(priority=-10):
        future = show.submit(1)  # low priority task

    with dask.annotate(priority=10):
        future = show.submit(2)  # high priority task
```
`priority` is a soft scheduling hint for queued work. It does not preempt tasks that have already been assigned to a worker, so sequential `.submit()` calls may still start in submission order even when later tasks have a higher priority. Use `wait_for` or stage your submissions if you need strict ordering.
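For instance, ordering can be enforced by chaining submissions with `wait_for`. A minimal sketch with illustrative task names:

```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def first():
    print("first")

@task
def second():
    print("second")

@flow(task_runner=DaskTaskRunner())
def ordered_flow():
    f = first.submit()
    # `second` is not submitted to Dask until `first` finishes,
    # guaranteeing strict ordering regardless of scheduling hints.
    second.submit(wait_for=[f])
```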
Another common use case is resource annotations:
```python
import dask
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

@task
def show(x):
    print(x)

# Create a `LocalCluster` with some resource annotations.
# Annotations are abstract in dask and not inferred from your system.
# Here, we claim that our system has 1 GPU and 1 process available per worker.
@flow(
    task_runner=DaskTaskRunner(
        cluster_kwargs={"n_workers": 1, "resources": {"GPU": 1, "process": 1}}
    )
)
def my_flow():
    with dask.annotate(resources={"GPU": 1}):
        future = show.submit(0)  # this task requires 1 GPU resource on a worker

    with dask.annotate(resources={"process": 1}):
        # These tasks each require 1 process on a worker; because we've
        # specified that our cluster has 1 process per worker and 1 worker,
        # these tasks will run sequentially
        future = show.submit(1)
        future = show.submit(2)
        future = show.submit(3)

if __name__ == "__main__":
    my_flow()
```
Refer to the prefect-dask SDK documentation to explore all the capabilities of the prefect-dask library.
For assistance using Dask, consult the Dask documentation.
<Warning>
**Resolving futures in sync client**

Note that, by default, `dask_collection.compute()` returns concrete values while `client.compute(dask_collection)` returns Dask Futures. Therefore, if you call `client.compute`, you must resolve all futures before exiting the context manager by either:

1. Setting `sync=True`:

```python
with get_dask_client() as client:
    df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
    summary_df = client.compute(df.describe(), sync=True)
```

2. Calling `result()`:

```python
with get_dask_client() as client:
    df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
    summary_df = client.compute(df.describe()).result()
```

For more information, visit the Dask docs on Waiting on Futures.
</Warning>
There is also an equivalent context manager for asynchronous tasks and flows: `get_async_dask_client`. When using the async client, you must `await client.compute(dask_collection)` before exiting the context manager.

Note that task submission (`.submit()`) and future resolution (`.result()`) are always synchronous operations in Prefect, even when working with async tasks and flows.
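As a sketch of the async variant, reusing the timeseries example from above (the task and flow names are illustrative):

```python
import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_async_dask_client

@task
async def compute_summary():
    async with get_async_dask_client() as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        # The async client requires awaiting compute before the
        # context manager exits.
        return await client.compute(df.describe())

@flow(task_runner=DaskTaskRunner())
async def async_pipeline():
    # .submit() and .result() remain synchronous even for async tasks
    future = compute_summary.submit()
    return future.result()

if __name__ == "__main__":
    import asyncio
    asyncio.run(async_pipeline())
```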