Back to Prefect

task_engine

docs/v3/api-ref/python/prefect-task_engine.mdx

3.6.30.dev3 17.9 KB
Original Source

prefect.task_engine

Functions

run_task_sync <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L1647" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
run_task_sync(task: 'Task[P, R]', task_run_id: Optional[UUID] = None, task_run: Optional[TaskRun] = None, parameters: Optional[dict[str, Any]] = None, wait_for: Optional['OneOrManyFutureOrResult[Any]'] = None, return_type: Literal['state', 'result'] = 'result', dependencies: Optional[dict[str, set[RunInput]]] = None, context: Optional[dict[str, Any]] = None) -> Union[R, State, None]

run_task_async <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L1678" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
run_task_async(task: 'Task[P, R]', task_run_id: Optional[UUID] = None, task_run: Optional[TaskRun] = None, parameters: Optional[dict[str, Any]] = None, wait_for: Optional['OneOrManyFutureOrResult[Any]'] = None, return_type: Literal['state', 'result'] = 'result', dependencies: Optional[dict[str, set[RunInput]]] = None, context: Optional[dict[str, Any]] = None) -> Union[R, State, None]

run_generator_task_sync <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L1709" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
run_generator_task_sync(task: 'Task[P, R]', task_run_id: Optional[UUID] = None, task_run: Optional[TaskRun] = None, parameters: Optional[dict[str, Any]] = None, wait_for: Optional['OneOrManyFutureOrResult[Any]'] = None, return_type: Literal['state', 'result'] = 'result', dependencies: Optional[dict[str, set[RunInput]]] = None, context: Optional[dict[str, Any]] = None) -> Generator[R, None, None]

run_generator_task_async <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L1768" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
run_generator_task_async(task: 'Task[P, R]', task_run_id: Optional[UUID] = None, task_run: Optional[TaskRun] = None, parameters: Optional[dict[str, Any]] = None, wait_for: Optional['OneOrManyFutureOrResult[Any]'] = None, return_type: Literal['state', 'result'] = 'result', dependencies: Optional[dict[str, set[RunInput]]] = None, context: Optional[dict[str, Any]] = None) -> AsyncGenerator[R, None]

run_task <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L1854" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
run_task(task: 'Task[P, Union[R, Coroutine[Any, Any, R]]]', task_run_id: Optional[UUID] = None, task_run: Optional[TaskRun] = None, parameters: Optional[dict[str, Any]] = None, wait_for: Optional['OneOrManyFutureOrResult[Any]'] = None, return_type: Literal['state', 'result'] = 'result', dependencies: Optional[dict[str, set[RunInput]]] = None, context: Optional[dict[str, Any]] = None) -> Union[R, State, None, Coroutine[Any, Any, Union[R, State, None]]]

Runs the provided task.

Args:

  • task: The task to run
  • task_run_id: The ID of the task run; if not provided, a new task run will be created
  • task_run: The task run object; if not provided, a new task run will be created
  • parameters: The parameters to pass to the task
  • wait_for: A list of futures to wait for before running the task
  • return_type: What to return from the run; either "state" or "result"
  • dependencies: A dictionary of task run inputs to use for dependency tracking
  • context: A dictionary containing the context to use for the task run; only required if the task is running in a remote environment

Returns:

  • The result of the task run

Classes

TaskRunTimeoutError <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L226" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Raised when a task run exceeds its timeout.

BaseTaskRunEngine <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L231" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Methods:

compute_transaction_key <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L268" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
compute_transaction_key(self) -> Optional[str]

handle_rollback <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L409" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
handle_rollback(self, txn: Transaction) -> None

is_cancelled <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L259" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
is_cancelled(self) -> bool

is_running <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L365" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
is_running(self) -> bool

Whether or not the engine is currently running a task.

log_finished_message <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L371" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
log_finished_message(self) -> None

record_terminal_state_timing <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L358" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
record_terminal_state_timing(self, state: State) -> None

state <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L254" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
state(self) -> State

SyncTaskRunEngine <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L426" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Methods:

asset_context <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L793" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
asset_context(self)

begin_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L510" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
begin_run(self) -> None

call_hooks <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L473" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
call_hooks(self, state: Optional[State] = None) -> None

call_task_fn <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L1020" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
call_task_fn(self, transaction: Transaction) -> Union[ResultRecord[Any], None, Coroutine[Any, Any, R], R]

Convenience method to call the task function. Returns a coroutine if the task is async.

can_retry <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L436" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
can_retry(self, exc_or_state: Exception | State[R]) -> bool

client <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L431" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
client(self) -> SyncPrefectClient

handle_crash <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L739" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
handle_crash(self, exc: BaseException) -> None

handle_exception <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L709" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
handle_exception(self, exc: Exception) -> None

handle_retry <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L654" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
handle_retry(self, exc_or_state: Exception | State[R]) -> bool

Handle any task run retries.

  • If the task has retries left, and the retry condition is met, set the task to retrying and return True.
  • If the task has a retry delay, place in AwaitingRetry state with a delayed scheduled time.
  • If the task has no retries left, or the retry condition is not met, return False.

handle_success <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L616" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
handle_success(self, result: R, transaction: Transaction) -> Union[ResultRecord[R], None, Coroutine[Any, Any, R], R]

handle_timeout <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L724" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
handle_timeout(self, exc: TimeoutError) -> None

initialize_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L811" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
initialize_run(self, task_run_id: Optional[UUID] = None, dependencies: Optional[dict[str, set[RunInput]]] = None) -> Generator[Self, Any, Any]

Enters a client context and creates a task run if needed.

result <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L601" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
result(self, raise_on_failure: bool = True) -> 'Union[R, State, None]'

run_context <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L984" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
run_context(self)

set_state <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L550" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
set_state(self, state: State[R], force: bool = False) -> State[R]

setup_run_context <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L749" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
setup_run_context(self, client: Optional[SyncPrefectClient] = None)

start <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L914" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
start(self, task_run_id: Optional[UUID] = None, dependencies: Optional[dict[str, set[RunInput]]] = None) -> Generator[None, None, None]

transaction_context <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L957" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
transaction_context(self) -> Generator[Transaction, None, None]

wait_until_ready <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L891" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
wait_until_ready(self) -> None

Sync version: Waits until the scheduled time (if it's in the future), then enters Running.

AsyncTaskRunEngine <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L1037" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Methods:

asset_context <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L1420" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
asset_context(self)

begin_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L1120" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
begin_run(self) -> None

call_hooks <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L1083" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
call_hooks(self, state: Optional[State] = None) -> None

call_task_fn <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L1631" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
call_task_fn(self, transaction: AsyncTransaction) -> Union[ResultRecord[Any], None, Coroutine[Any, Any, R], R]

Convenience method to call the task function. Returns a coroutine if the task is async.

can_retry <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L1047" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
can_retry(self, exc_or_state: Exception | State[R]) -> bool

client <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L1042" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
client(self) -> PrefectClient

handle_crash <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L1367" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
handle_crash(self, exc: BaseException) -> None

handle_exception <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L1335" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
handle_exception(self, exc: Exception) -> None

handle_retry <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L1279" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
handle_retry(self, exc_or_state: Exception | State[R]) -> bool

Handle any task run retries.

  • If the task has retries left, and the retry condition is met, set the task to retrying and return True.
  • If the task has a retry delay, place in AwaitingRetry state with a delayed scheduled time.
  • If the task has no retries left, or the retry condition is not met, return False.

handle_success <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L1242" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
handle_success(self, result: R, transaction: AsyncTransaction) -> Union[ResultRecord[R], None, Coroutine[Any, Any, R], R]

handle_timeout <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L1350" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
handle_timeout(self, exc: TimeoutError) -> None

initialize_run <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L1438" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
initialize_run(self, task_run_id: Optional[UUID] = None, dependencies: Optional[dict[str, set[RunInput]]] = None) -> AsyncGenerator[Self, Any]

Enters a client context and creates a task run if needed.

result <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L1227" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
result(self, raise_on_failure: bool = True) -> 'Union[R, State, None]'

run_context <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L1611" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
run_context(self)

set_state <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L1174" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
set_state(self, state: State, force: bool = False) -> State

setup_run_context <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L1378" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
setup_run_context(self, client: Optional[PrefectClient] = None)

start <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L1540" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
start(self, task_run_id: Optional[UUID] = None, dependencies: Optional[dict[str, set[RunInput]]] = None) -> AsyncGenerator[None, None]

transaction_context <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L1585" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
transaction_context(self) -> AsyncGenerator[AsyncTransaction, None]

wait_until_ready <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/task_engine.py#L1517" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
wait_until_ready(self) -> None

Waits until the scheduled time (if it's in the future), then enters Running.