Back to Prefect

processutils

docs/v3/api-ref/python/prefect-utilities-processutils.mdx

3.6.30.dev35.1 KB
Original Source

prefect.utilities.processutils

Functions

sanitize_subprocess_env <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/processutils/__init__.py#L39" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
sanitize_subprocess_env(env: Mapping[str, str | None] | None) -> dict[str, str]

Normalize environment variables before launching a subprocess.

None means "omit this key" for subprocess launch paths. Downstream APIs like subprocess, anyio.open_process, and os.environ.update(...) all expect concrete string values.

command_to_string <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/processutils/__init__.py#L253" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
command_to_string(command: list[str]) -> str

Serialize a command list to a platform-neutral string.

We use POSIX shell quoting so stored commands round-trip across platforms when paired with command_from_string.

command_from_string <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/processutils/__init__.py#L263" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
command_from_string(command: str) -> list[str]

Parse a command string back into argv tokens.

Prefect-owned command strings use POSIX shell quoting. Other command strings keep native parsing so existing Windows configuration still works.

open_process <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/processutils/__init__.py#L281" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
open_process(command: list[str], **kwargs: Any) -> AsyncGenerator[anyio.abc.Process, Any]

Like anyio.open_process but with:

  • Support for Windows command joining
  • Termination of the process on exception during yield
  • Forced cleanup of process resources during cancellation

run_process <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/processutils/__init__.py#L380" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
run_process(command: list[str], **kwargs: Any) -> anyio.abc.Process

Like anyio.run_process but with:

  • Use of our open_process utility to ensure resources are cleaned up
  • Simple stream_output support to connect the subprocess to the parent stdout/err
  • Support for submission with TaskGroup.start marking as 'started' after the process has been created. When used, the PID is returned to the task status.

consume_process_output <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/processutils/__init__.py#L424" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
consume_process_output(process: anyio.abc.Process, stdout_sink: Optional[TextSink[str]] = None, stderr_sink: Optional[TextSink[str]] = None) -> None

stream_text <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/processutils/__init__.py#L444" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
stream_text(source: TextReceiveStream, *sinks: Optional[TextSink[str]]) -> None

forward_signal_handler <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/processutils/__init__.py#L475" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
forward_signal_handler(pid: int, signum: int, *signums: int) -> None

Forward subsequent signum events (e.g. interrupts) to respective signums.

setup_signal_handlers_server <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/processutils/__init__.py#L510" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
setup_signal_handlers_server(pid: int, process_name: str, print_fn: PrintFn) -> None

Handle interrupts of the server gracefully.

setup_signal_handlers_agent <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/processutils/__init__.py#L529" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
setup_signal_handlers_agent(pid: int, process_name: str, print_fn: PrintFn) -> None

Handle interrupts of the agent gracefully.

setup_signal_handlers_worker <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/processutils/__init__.py#L547" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
setup_signal_handlers_worker(pid: int, process_name: str, print_fn: PrintFn) -> None

Handle interrupts of workers gracefully.

get_sys_executable <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/processutils/__init__.py#L567" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
get_sys_executable() -> str