docs/sdk/python/execution.mdx
Run commands inside a running sandbox: collect output in one shot, stream events as they arrive, or bridge your terminal to an interactive PTY. These methods live on a running Sandbox. See Commands for usage examples.
import sys
from microsandbox import Sandbox
sandbox = await Sandbox.create("api", image="python")
# 1. one-shot
out = await sandbox.exec("python3", ["-c", "print(1 + 1)"])
print(out.stdout_text) # "2\n"
# 2. stream
handle = await sandbox.exec_stream("tail", ["-f", "/var/log/app.log"])
async for event in handle:
if event.event_type == "stdout":
sys.stdout.buffer.write(event.data)
elif event.event_type == "exited":
break
await sandbox.stop()
async def exec(
cmd: str,
args: list[str] | Mapping[str, Any] | None = None,
*,
cwd: str | None = None,
user: str | None = None,
env: Mapping[str, str] | None = None,
timeout: float | None = None,
stdin: Stdin | bytes | str | None = None,
tty: bool = False,
rlimits: list[Rlimit] | None = None,
) -> ExecOutput
Run a command inside the sandbox and wait for it to complete, buffering all stdout and stderr into memory. The keyword-only options apply to this call alone and don't change the sandbox's defaults. For long-running processes or large output, use exec_stream() instead. Raises ExecTimeoutError if timeout elapses and ExecFailedError if the process can't be spawned.
out = await sandbox.exec(
"python3",
["script.py"],
cwd="/app",
env={"PYTHONPATH": "/app/lib"},
timeout=30.0,
)
print(out.stdout_text)
print(out.exit_code) # 0
async def shell(
script: str,
*,
cwd: str | None = None,
user: str | None = None,
env: Mapping[str, str] | None = None,
timeout: float | None = None,
stdin: Stdin | bytes | str | None = None,
tty: bool = False,
rlimits: list[Rlimit] | None = None,
) -> ExecOutput
Run a command through the sandbox's configured shell (defaults to /bin/sh). Shell syntax like pipes, redirects, and && chains works. Accepts the same keyword-only options as exec().
out = await sandbox.shell("ls -la /app && echo done")
print(out.stdout_text)
async def exec_stream(
cmd: str,
args: list[str] | Mapping[str, Any] | None = None,
*,
cwd: str | None = None,
user: str | None = None,
env: Mapping[str, str] | None = None,
timeout: float | None = None,
stdin: Stdin | bytes | str | None = None,
tty: bool = False,
rlimits: list[Rlimit] | None = None,
) -> ExecHandle
Run a command with streaming output. Returns an ExecHandle that emits stdout, stderr, and exit events as they happen rather than buffering everything. Takes the same per-call options as exec(). Pass stdin=Stdin.pipe() to write to the process while it runs via take_stdin().
import sys
handle = await sandbox.exec_stream("tail", ["-f", "/var/log/app.log"])
async for event in handle:
if event.event_type == "stdout":
sys.stdout.buffer.write(event.data)
elif event.event_type == "exited":
break
async def shell_stream(
script: str,
*,
cwd: str | None = None,
user: str | None = None,
env: Mapping[str, str] | None = None,
timeout: float | None = None,
stdin: Stdin | bytes | str | None = None,
tty: bool = False,
rlimits: list[Rlimit] | None = None,
) -> ExecHandle
Streaming variant of shell(): runs script through the configured shell but returns an ExecHandle instead of buffering output.
import sys
handle = await sandbox.shell_stream("for i in 1 2 3; do echo $i; sleep 1; done")
async for event in handle:
if event.event_type == "stdout":
sys.stdout.buffer.write(event.data)
async def attach(
cmd: str,
args: list[str] | None = None,
*,
cwd: str | None = None,
user: str | None = None,
env: Mapping[str, str] | None = None,
detach_keys: str | None = None,
) -> int
Bridge your terminal directly to a process inside the sandbox for a fully interactive PTY session. Press the configured detach key sequence (default Ctrl+]) to disconnect without stopping the process. Returns the process exit code.
code = await sandbox.attach("bash", cwd="/app", env={"EDITOR": "vim"})
async def attach_shell() -> int
Bridge your terminal to the sandbox's default shell in a fully interactive PTY session. Returns the shell's exit code.
<p className="msb-label">Returns</p> <div className="msb-params"> <div className="msb-param"> <div className="msb-param-key"><span className="msb-type">int</span></div> <div className="msb-param-desc">Exit code of the shell process.</div> </div> </div> <Accordion title="Example">code = await sandbox.attach_shell()
The result of a completed command execution: collected output plus exit status. All members are properties.
| Property | Type | Description |
|---|---|---|
| exit_code | int | Process exit code |
| success | bool | True when exit_code == 0 |
| stdout_text | str | Collected stdout decoded as UTF-8. Raises on invalid encoding |
| stderr_text | str | Collected stderr decoded as UTF-8. Raises on invalid encoding |
| stdout_bytes | bytes | Raw stdout bytes |
| stderr_bytes | bytes | Raw stderr bytes |
A handle to a running streaming execution. It is an async iterator, so async for event in handle: yields each ExecEvent until the stream ends.
| Property / Method | Type | Description |
|---|---|---|
| id | str | Correlation ID for this execution |
| take_stdin() | ExecSink | None | Take the stdin writer. Returns None after the first call, or when stdin wasn't piped |
| recv() | ExecEvent | None | (async) Receive the next event. Returns None when the stream ends |
| wait() | tuple[int, bool] | (async) Wait for the process to exit. Returns (code, success) |
| collect() | ExecOutput | (async) Drain remaining output and wait for exit |
| signal(sig) | None | (async) Send a POSIX signal (numeric) to the process |
| kill() | None | (async) Send SIGKILL to the process |
Writer for sending data to a running process's stdin. Obtained from ExecHandle.take_stdin() when the execution was configured with stdin=Stdin.pipe().
| Method | Parameters | Description |
|---|---|---|
| write(data) | data: bytes | (async) Write bytes to the process's stdin |
| close() | - | (async) Close stdin. The process sees EOF |
Native event object emitted by recv() and by iterating an ExecHandle. Fields that don't apply to a given event are None.
| Property | Type | Description |
|---|---|---|
| event_type | str | "started", "stdout", "stderr", "exited", "failed", or "stdin_error" |
| pid | int | None | Guest PID, set on "started" |
| data | bytes | None | Output bytes on "stdout" / "stderr", or a UTF-8 failure message on "failed" / "stdin_error" |
| code | int | None | Exit code on "exited", or errno when available on "failed" / "stdin_error" |
Frozen dataclass describing a process exit result. ExecHandle.wait() returns the same information as a (code, success) tuple.
| Field | Type | Description |
|---|---|---|
| code | int | Process exit code |
| success | bool | True when code == 0 |
Frozen dataclass with factory methods for configuring process stdin. Pass the result as the stdin argument to an exec or shell method. Raw bytes and str are also accepted directly and are sent inline.
| Factory | Parameters | Description |
|---|---|---|
| Stdin.null() | - | Connect stdin to /dev/null (default) |
| Stdin.pipe() | - | Open a writable pipe. Write via take_stdin() on the handle |
| Stdin.bytes(data) | data: bytes | Inline data sent before the process starts, then EOF |
Frozen dataclass describing a POSIX resource limit. Construct one directly or via a factory, then pass a list as the rlimits argument.
| Factory | Parameters | Description |
|---|---|---|
| Rlimit.nofile(limit) | limit: int | Max open file descriptors |
| Rlimit.cpu(secs) | secs: int | CPU time limit in seconds |
| Rlimit.as_(*, soft, hard) | soft: int, hard: int | Virtual memory size |
| Rlimit.nproc(limit) | limit: int | Max number of processes |
| Rlimit.fsize(limit) | limit: int | Max file size |
| Rlimit.memlock(limit) | limit: int | Max locked memory |
| Rlimit.stack(limit) | limit: int | Max stack size |
Fields
| Field | Type | Description |
|---|---|---|
| resource | RlimitResource | Which resource is limited |
| soft | int | Soft limit |
| hard | int | Hard limit |
String enum (enum.StrEnum) naming a limitable POSIX resource.
| Value | Description |
|---|---|
CPU | CPU time |
FSIZE | File size |
DATA | Data segment size |
STACK | Stack size |
CORE | Core file size |
RSS | Resident set size |
NPROC | Number of processes |
NOFILE | Open file descriptors |
MEMLOCK | Locked memory |
AS | Virtual memory |
LOCKS | File locks |
SIGPENDING | Pending signals |
MSGQUEUE | Message queue size |
NICE | Nice priority ceiling |
RTPRIO | Real-time priority ceiling |
RTTIME | Real-time CPU time |