docs/integrations/prefect-ray/api-ref/prefect_ray-task_runners.mdx
prefect_ray.task_runnersInterface and implementations of the Ray Task Runner. Task Runners in Prefect are responsible for managing the execution of Prefect task runs. Generally speaking, users are not expected to interact with task runners outside of configuring and initializing them for a flow.
Example: ```python import time
from prefect import flow, task
@task
def shout(number):
time.sleep(0.5)
print(f"#{number}")
@flow
def count_to(highest_number):
for number in range(highest_number):
shout.submit(number)
if __name__ == "__main__":
count_to(10)
# outputs
#0
#1
#2
#3
#4
#5
#6
#7
#8
#9
```
Switching to a `RayTaskRunner`:
```python
import time
from prefect import flow, task
from prefect_ray import RayTaskRunner
@task
def shout(number):
time.sleep(0.5)
print(f"#{number}")
@flow(task_runner=RayTaskRunner)
def count_to(highest_number):
shout.map(range(highest_number)).wait()
if __name__ == "__main__":
count_to(10)
# outputs
#3
#7
#2
#6
#4
#0
#1
#5
#8
#9
```
PrefectRayFuture <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-ray/prefect_ray/task_runners.py#L117" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>Methods:
add_done_callback <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-ray/prefect_ray/task_runners.py#L153" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>add_done_callback(self, fn: Callable[['PrefectRayFuture[R]'], Any])
result <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-ray/prefect_ray/task_runners.py#L128" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>result(self, timeout: float | None = None, raise_on_failure: bool = True) -> R
wait <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-ray/prefect_ray/task_runners.py#L118" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>wait(self, timeout: float | None = None) -> None
RayTaskRunner <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-ray/prefect_ray/task_runners.py#L165" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>A parallel task_runner that submits tasks to ray.
By default, a temporary Ray cluster is created for the duration of the flow run.
Alternatively, if you already have a ray instance running, you can provide
the connection URL via the address kwarg.
Args:
address (string, optional): Address of a currently running ray instance; if
one is not provided, a temporary instance will be created.
init_kwargs (dict, optional): Additional kwargs to use when calling ray.init.
Examples:
Using a temporary local ray cluster:
```python
from prefect import flow
from prefect_ray.task_runners import RayTaskRunner
@flow(task_runner=RayTaskRunner())
def my_flow():
...
```
Connecting to an existing ray instance:
```python
RayTaskRunner(address="ray://<head_node_host>:10001")
```
Methods:
duplicate <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-ray/prefect_ray/task_runners.py#L207" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>duplicate(self)
Return a new instance of with the same settings as this one.
map <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-ray/prefect_ray/task_runners.py#L293" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>map(self, task: 'Task[P, Coroutine[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None) -> PrefectFutureList[PrefectRayFuture[R]]
map <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-ray/prefect_ray/task_runners.py#L301" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>map(self, task: 'Task[Any, R]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None) -> PrefectFutureList[PrefectRayFuture[R]]
map <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-ray/prefect_ray/task_runners.py#L308" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>map(self, task: 'Task[P, R]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None) -> PrefectFutureList[PrefectRayFuture[R]]
submit <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-ray/prefect_ray/task_runners.py#L225" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>submit(self, task: 'Task[P, Coroutine[Any, Any, R]]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None, dependencies: dict[str, set[RunInput]] | None = None) -> PrefectRayFuture[R]
submit <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-ray/prefect_ray/task_runners.py#L234" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>submit(self, task: 'Task[P, R]', parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None, dependencies: dict[str, set[RunInput]] | None = None) -> PrefectRayFuture[R]
submit <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-ray/prefect_ray/task_runners.py#L242" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>submit(self, task: Task[P, R], parameters: dict[str, Any], wait_for: Iterable[PrefectFuture[Any]] | None = None, dependencies: dict[str, set[RunInput]] | None = None)