docs/v3/how-to-guides/workflows/run-work-concurrently.mdx
Tasks can be called directly (i.e. using __call__), but they will not run concurrently.
import time
from prefect import task
@task
def stop_at_floor(floor: int):
print(f"elevator moving to floor {floor}")
time.sleep(floor)
print(f"elevator stops on floor {floor}")
stop_at_floor(0.1) # blocks for 0.1 seconds
stop_at_floor(0.2) # blocks for 0.2 seconds
stop_at_floor(0.3) # blocks for 0.3 seconds
When you call a task directly, it will block the main thread until the task completes.
Continue reading to learn how to run tasks concurrently via task runners.
.submitTasks in your workflows can be run concurrently using the .submit() method.
import time
from prefect import flow, task
from prefect.futures import wait
@task
def stop_at_floor(floor: int):
print(f"elevator moving to floor {floor}")
time.sleep(floor)
print(f"elevator stops on floor {floor}")
@flow
def elevator():
floors = []
for floor in range(10, 0, -1):
floors.append(stop_at_floor.submit(floor))
wait(floors)
By default, tasks are submitted via the ThreadPoolTaskRunner, which runs tasks concurrently in a thread pool.
See Design Considerations for more information. </Warning>
To run submitted tasks with a different task runner, you can pass a task_runner argument to the @flow decorator.
import time
from prefect import flow, task
from prefect.futures import wait
from prefect_dask.task_runners import DaskTaskRunner
@task
def stop_at_floor(floor: int):
print(f"elevator moving to floor {floor}")
time.sleep(floor)
print(f"elevator stops on floor {floor}")
@flow(task_runner=DaskTaskRunner())
def elevator():
floors = []
for floor in range(10, 0, -1):
floors.append(stop_at_floor.submit(floor))
wait(floors)
When you submit a task, you'll receive a PrefectFuture object back which you can use to track the task's execution. Use the .result() method to get the result of the task.
from prefect import task, flow
@task
def cool_task():
return "sup"
@flow
def my_workflow():
future = cool_task.submit()
result = future.result()
print(result)
If you don't need to the result of the task, you can use the .wait() method to wait for execution to complete.
from prefect import task, flow
@task
def cool_task():
return "sup"
@flow
def my_workflow():
future = cool_task.submit()
future.wait()
To wait for multiple futures to complete, use the wait() utility.
from prefect import task, flow
from prefect.futures import wait
@task
def cool_task():
return "sup"
@flow
def my_workflow():
futures = [cool_task.submit() for _ in range(10)]
wait(futures)
If you pass PrefectFuture objects between tasks or flows, the task or flow receiving the future will wait for the future to complete before starting execution.
from prefect import task, flow
@task
def cool_task():
return "sup"
@task
def what_did_cool_task_say(what_it_said: str):
return f"cool task said {what_it_said}"
@flow
def my_workflow():
future = cool_task.submit()
print(what_did_cool_task_say(future))
.mapFor a convenient way to iteratively submit tasks, use the .map() method.
import time
from prefect import flow, task
from prefect.futures import wait
@task
def stop_at_floor(floor: int):
print(f"elevator moving to floor {floor}")
time.sleep(floor)
print(f"elevator stops on floor {floor}")
@flow
def elevator():
floors = list(range(10, 0, -1))
floors = stop_at_floor.map(floors)
wait(floors)
Like .submit, .map uses the task runner configured on the parent flow. Changing the task runner will change where mapped tasks are executed.
unmapped annotationSometimes you may not want to map a task over a certain input value.
By default, non-iterable values will not be mapped over (so unmapped is not required):
from prefect import flow, task
@task
def add_together(x, y):
return x + y
@flow
def sum_it(numbers: list[int], static_value: int):
futures = add_together.map(numbers, static_value)
return futures.result()
resulting_sum = sum_it([1, 2, 3], 5)
assert resulting_sum == [6, 7, 8]
... but if your argument is an iterable type, wrap it with unmapped to tell .map to treat it
as static:
from prefect import flow, task, unmapped
@task
def sum_plus(x, static_iterable):
return x + sum(static_iterable)
@flow
def sum_it(numbers, static_iterable):
futures = sum_plus.map(numbers, unmapped(static_iterable))
return futures.result()
resulting_sum = sum_it([4, 5, 6], [1, 2, 3])
assert resulting_sum == [10, 11, 12]
PrefectFuture operationsWhen using .map as in the above example, the result of the task is a list of futures.
You can wait for or retrieve the results from these futures with wait or result methods:
futures = some_task.map(some_iterable)
results = futures.result()
which is syntactic sugar for the corresponding list comprehension:
futures = some_task.map(some_iterable)
results = [future.result() for future in futures]
To model more complex concurrent workflows, you can map tasks within other tasks:
import re
from prefect import flow, task
from prefect.futures import wait
def count_words(text: str) -> int:
"""Count the number of words in a text."""
return len(text.split())
def extract_emails(text: str) -> list[str]:
return re.findall(r"[\w.+-]+@[\w-]+\.[\w.-]+", text)
@task
def analyze_texts(texts: list[str]) -> dict[str, list[int | list[str]]]:
futures = {
op.__name__: task(op).map(texts) for op in [count_words, extract_emails]
}
wait([f for futs in futures.values() for f in futs])
return {name: [f.result() for f in futs] for name, futs in futures.items()}
@flow
def run_text_analysis():
"""Analyze a batch of social media posts with multiple operations."""
results = analyze_texts(
texts=[
"Just visited #Paris! Contact me at [email protected] #travel #vacation",
"Working on my new #project. Reach out at [email protected] if interested!",
"Happy to announce our company event #celebration #milestone email: [email protected]",
]
)
print("\nAnalysis Results:")
print(f" Word counts: {results['count_words']}")
print(f" Extracted emails: {results['extract_emails']}\n")
return results
run_text_analysis()
Analysis Results: Word counts: [9, 11, 10] Extracted emails: [['[email protected]'], ['[email protected]'], ['[email protected]']]
00:03:45.491 | INFO | Flow run 'hilarious-collie' - Finished in state Completed()
</Accordion>
This pattern is useful when you need to:
1. Process combinations of parameters concurrently
2. Apply multiple transformations to multiple datasets
3. Create a grid of operations where each cell is an independent task
## Creating state dependencies
You may also use the [`wait_for=[]`](https://reference.prefect.io/prefect/tasks/#prefect.tasks.Task.submit) parameter
when calling a task by specifying upstream task dependencies. This enables you to control task execution
order for tasks that do not share data dependencies.
```python
from prefect import flow, task
@task
def task_a():
pass
@task
def task_b():
pass
@task
def task_c():
pass
@task
def task_d():
pass
@flow
def my_flow():
a = task_a.submit()
b = task_b.submit()
# Wait for task_a and task_b to complete
c = task_c.submit(wait_for=[a, b])
# task_d will wait for task_c to complete
# Note: If waiting for one task it must still be in a list.
d = task_d(wait_for=[c])
Instead of failing your entire workflow when a single concurrent task fails, you may want to run concurrent work to completion, even if some tasks fail. To run tasks concurrently and handle failures afterwards, use the wait() utility along with the .state attribute on futures:
from typing import Any
from prefect import flow, task
from prefect.futures import wait
from prefect.states import State
@task
def process_data(item: int) -> str:
if item < 0:
raise ValueError(f"Cannot process negative value: {item}")
return f"Processed {item}"
@flow
def batch_processing():
items = [1, -2, 3, -4, 5]
# Submit all tasks and return the futures
futures = process_data.map(items)
# Wait for all futures to complete concurrently
done, not_done = wait(futures)
# Check each future's state
successful: list[Any] = []
failed: list[State] = []
for future in done:
if future.state.is_completed():
successful.append(future.result())
else:
failed.append(future.state)
print(f"Processed {len(successful)} items successfully")
print(f"Failed to process {len(failed)} items")
return successful
The wait() function returns two sets of futures: done and not_done. Each future has a .state attribute you can inspect to determine how the associated task run's result (or exception) should be treated downstream.
asyncioIf you have tasks are defined as async functions, you can use asyncio from the Python standard library to run them concurrently.
import asyncio
from prefect import flow, task
@task
async def stop_at_floor(floor: int):
print(f"elevator moving to floor {floor}")
await asyncio.sleep(floor)
print(f"elevator stops on floor {floor}")
@flow
async def elevator():
floors = list(range(10, 0, -1))
await asyncio.gather(*[stop_at_floor(floor) for floor in floors])
if __name__ == "__main__":
asyncio.run(elevator())
Mapped tasks are particularly valuable in common data science and ETL workflows such as:
For example, imagine you want to find the best training configuration for a series of datasets, and you want to process all datasets concurrently:
import random
from dataclasses import dataclass
from prefect import flow, task
from prefect.futures import PrefectFuture, wait
@dataclass
class Dataset:
name: str
@dataclass
class ModelConfig:
name: str
@task(task_run_name="train on {dataset.name} with {model_config.name}")
def train_model(dataset: Dataset, model_config: ModelConfig) -> dict:
return {
"dataset": dataset.name,
"model": model_config.name,
"score": random.random(),
}
@flow
def evaluate_models(datasets: list[Dataset], model_configs: list[ModelConfig]):
all_futures: list[PrefectFuture[dict[str, object]]] = []
for dataset in datasets:
futures = train_model.map(
dataset=dataset,
model_config=model_configs,
)
all_futures.extend(futures)
results = [future.result() for future in wait(all_futures).done]
print(f"\nBest model: {max(results, key=lambda r: r['score'])}")
evaluate_models(
datasets=[
Dataset("customers"), Dataset("products"), Dataset("orders")
],
model_configs=[
ModelConfig("random_forest"), ModelConfig("gradient_boosting")
],
)
Best model: {'dataset': 'products', 'model': 'random_forest', 'score': 0.5603239415052655}
00:56:48.121 | INFO | Flow run 'precious-walrus' - Finished in state Completed()
</Accordion>