Back to Prefect

task_runners

docs/integrations/prefect-ray/api-ref/prefect_ray-task_runners.mdx

3.6.30.dev36.2 KB
Original Source

prefect_ray.task_runners

Interface and implementations of the Ray Task Runner. Task Runners in Prefect are responsible for managing the execution of Prefect task runs. Generally speaking, users are not expected to interact with task runners outside of configuring and initializing them for a flow.

Example: ```python import time

from prefect import flow, task

@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")

@flow
def count_to(highest_number):
    for number in range(highest_number):
        shout.submit(number)

if __name__ == "__main__":
    count_to(10)

# outputs
#0
#1
#2
#3
#4
#5
#6
#7
#8
#9
```

Switching to a `RayTaskRunner`:
```python
import time

from prefect import flow, task
from prefect_ray import RayTaskRunner

@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")

@flow(task_runner=RayTaskRunner)
def count_to(highest_number):
    shout.map(range(highest_number)).wait()

if __name__ == "__main__":
    count_to(10)

# outputs
#3
#7
#2
#6
#4
#0
#1
#5
#8
#9
```

Classes

PrefectRayFuture <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-ray/prefect_ray/task_runners.py#L117" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Methods:

add_done_callback <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-ray/prefect_ray/task_runners.py#L153" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
add_done_callback(self, fn: Callable[['PrefectRayFuture[R]'], Any])

result <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-ray/prefect_ray/task_runners.py#L128" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
result(self, timeout: float | None = None, raise_on_failure: bool = True) -> R

wait <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-ray/prefect_ray/task_runners.py#L118" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
wait(self, timeout: float | None = None) -> None

RayTaskRunner <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-ray/prefect_ray/task_runners.py#L165" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

A parallel task_runner that submits tasks to ray. By default, a temporary Ray cluster is created for the duration of the flow run. Alternatively, if you already have a ray instance running, you can provide the connection URL via the address kwarg. Args: address (string, optional): Address of a currently running ray instance; if one is not provided, a temporary instance will be created. init_kwargs (dict, optional): Additional kwargs to use when calling ray.init. Examples: Using a temporary local ray cluster: ```python from prefect import flow from prefect_ray.task_runners import RayTaskRunner

@flow(task_runner=RayTaskRunner())
def my_flow():
    ...
```
Connecting to an existing ray instance:
```python
RayTaskRunner(address="ray://<head_node_host>:10001")
```

Methods:

duplicate <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-ray/prefect_ray/task_runners.py#L207" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
duplicate(self)

Return a new instance of with the same settings as this one.

map <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-ray/prefect_ray/task_runners.py#L293" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
map(self, task: 'Task[P, Coroutine[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None) -> PrefectFutureList[PrefectRayFuture[R]]

map <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-ray/prefect_ray/task_runners.py#L301" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
map(self, task: 'Task[Any, R]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None) -> PrefectFutureList[PrefectRayFuture[R]]

map <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-ray/prefect_ray/task_runners.py#L308" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
map(self, task: 'Task[P, R]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None) -> PrefectFutureList[PrefectRayFuture[R]]

submit <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-ray/prefect_ray/task_runners.py#L225" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
submit(self, task: 'Task[P, Coroutine[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None, dependencies: dict[str, set[RunInput]] | None = None) -> PrefectRayFuture[R]

submit <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-ray/prefect_ray/task_runners.py#L234" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
submit(self, task: 'Task[P, R]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None, dependencies: dict[str, set[RunInput]] | None = None) -> PrefectRayFuture[R]

submit <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-ray/prefect_ray/task_runners.py#L242" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
submit(self, task: Task[P, R], parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None, dependencies: dict[str, set[RunInput]] | None = None)