apps/docs/src/content/docs/en/python-sdk/async/async-file-system.mdx
class AsyncFileSystem()
Provides file system operations within a Sandbox.
This class implements a high-level interface to file system operations that can be performed within a Daytona Sandbox.
def __init__(api_client: FileSystemApi)
Initializes a new AsyncFileSystem instance.
Arguments:
api_client FileSystemApi - API client for Sandbox file system operations.
@intercept_errors(message_prefix="Failed to create folder: ")
@with_instrumentation()
async def create_folder(path: str, mode: str) -> None
Creates a new directory in the Sandbox at the specified path with the given permissions.
Arguments:
path str - Path where the folder should be created. Relative paths are resolved based on the sandbox working directory.
mode str - Folder permissions in octal format (e.g., "755" for rwxr-xr-x).
Example:
# Create a directory with standard permissions
await sandbox.fs.create_folder("workspace/data", "755")
# Create a private directory
await sandbox.fs.create_folder("workspace/secrets", "700")
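The mode argument is an octal string. As a quick local check of what a given string means, the sketch below converts it to the symbolic form used in the description above. It relies only on the Python standard library; describe_mode is a hypothetical helper name and is not part of the Daytona SDK.

```python
import stat


def describe_mode(mode: str) -> str:
    """Return the symbolic rwx form of an octal permission string."""
    bits = int(mode, 8)  # e.g. "755" -> 0o755
    # stat.filemode() prefixes a file-type character; drop it so that only
    # the nine permission characters remain.
    return stat.filemode(bits)[1:]


print(describe_mode("755"))  # rwxr-xr-x
print(describe_mode("700"))  # rwx------
```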
@intercept_errors(message_prefix="Failed to delete file: ")
@with_instrumentation()
async def delete_file(path: str, recursive: bool = False) -> None
Deletes a file from the Sandbox.
Arguments:
path str - Path to the file to delete. Relative paths are resolved based on the sandbox working directory.
recursive bool - If the path is a directory, this must be True to delete it.
Example:
# Delete a file
await sandbox.fs.delete_file("workspace/data/old_file.txt")
@overload
async def download_file(remote_path: str, timeout: int = 30 * 60) -> bytes
Downloads a file from the Sandbox. Returns the file contents as a bytes object. This method is useful when you want to load the file into memory without saving it to disk. It can only be used for smaller files.
Arguments:
remote_path str - Path to the file in the Sandbox. Relative paths are resolved based on the sandbox working directory.
timeout int - Timeout for the download operation in seconds. 0 means no timeout. Default is 30 minutes.
Returns:
bytes - The file contents as a bytes object.
Example:
import json
# Download and save a file locally
content = await sandbox.fs.download_file("workspace/data/file.txt")
with open("local_copy.txt", "wb") as f:
    f.write(content)
# Download and process text content
content = await sandbox.fs.download_file("workspace/data/config.json")
config = json.loads(content.decode('utf-8'))
@overload
async def download_file(remote_path: str,
local_path: str,
timeout: int = 30 * 60) -> None
Downloads a file from the Sandbox and streams it to a local file. This method is useful when you want to download larger files that may not fit into memory.
Arguments:
remote_path str - Path to the file in the Sandbox. Relative paths are resolved based on the sandbox working directory.
local_path str - Path to save the file locally.
timeout int - Timeout for the download operation in seconds. 0 means no timeout. Default is 30 minutes.
Example:
import os
local_path = "local_copy.txt"
await sandbox.fs.download_file("tmp/large_file.txt", local_path)
size_mb = os.path.getsize(local_path) / 1024 / 1024
print(f"Size of the downloaded file {local_path}: {size_mb:.2f} MB")
@intercept_errors(message_prefix="Failed to download file: ")
@with_instrumentation()
async def download_file_stream(
remote_path: str,
timeout: int = 30 * 60,
on_progress: Callable[[DownloadProgress], Awaitable[None] | None]
| None = None,
cancel_event: CancelEvent | None = None) -> AsyncIterator[bytes]
Downloads a single file from the Sandbox as a stream without buffering the entire file into memory. Returns an async iterator that yields file content in chunks, which can be piped directly to an HTTP response, written to a file incrementally, or processed on the fly.
Arguments:
remote_path str - Path to the file in the Sandbox. Relative paths are resolved based on the sandbox working directory.
timeout int - Timeout for the download operation in seconds. 0 means no timeout. Default is 30 minutes.
on_progress Callable[[DownloadProgress], Awaitable[None] | None] | None - Optional callback invoked with cumulative bytes received and total bytes, when known, as the download progresses. May be either a regular function or an async def coroutine; a returned coroutine is awaited before the next chunk is yielded. Default is None.
cancel_event CancelEvent | None - Optional asyncio.Event-compatible token. When set during streaming, the next chunk raises DaytonaError and the underlying HTTP connection is closed. Standard asyncio.CancelledError from task cancellation is also honoured automatically by the generator.
Returns:
AsyncIterator[bytes] - An async iterator yielding chunks of file content as bytes.
Raises:
DaytonaError - If the file does not exist, access is denied, or the download is cancelled via cancel_event.
Example:
import aiofiles
# Stream to a local file without loading into memory
async with aiofiles.open("local_copy.bin", "wb") as f:
    async for chunk in await sandbox.fs.download_file_stream("workspace/large-file.bin"):
        await f.write(chunk)
# Cancel a download from another coroutine
import asyncio
cancel = asyncio.Event()
asyncio.get_running_loop().call_later(5.0, cancel.set)
async for chunk in await sandbox.fs.download_file_stream("workspace/big.bin", cancel_event=cancel):
    process(chunk)  # process() is a placeholder for your own chunk handler
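The on_progress callback receives cumulative byte counts, and total_bytes may be unknown. A minimal sketch of a formatter that handles both cases follows. The DownloadProgress class here is a local stand-in that mirrors the documented attributes so the snippet runs without the SDK, and format_progress is a hypothetical helper name.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class DownloadProgress:
    """Local stand-in mirroring the documented attributes (not the SDK class)."""
    bytes_received: int
    total_bytes: Optional[int] = None


def format_progress(progress: DownloadProgress) -> str:
    """Render a progress line, adding a percentage only when the total is known."""
    if progress.total_bytes:
        percent = 100 * progress.bytes_received / progress.total_bytes
        return f"{progress.bytes_received}/{progress.total_bytes} bytes ({percent:.1f}%)"
    return f"{progress.bytes_received} bytes (total unknown)"


print(format_progress(DownloadProgress(50, 200)))  # 50/200 bytes (25.0%)
print(format_progress(DownloadProgress(1024)))     # 1024 bytes (total unknown)
```

With the SDK installed, a function like this could be wired in as on_progress=lambda p: print(format_progress(p)).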
@intercept_errors(message_prefix="Failed to download files: ")
@with_instrumentation()
async def download_files(files: list[FileDownloadRequest],
timeout: int = 30 * 60) -> list[FileDownloadResponse]
Downloads multiple files from the Sandbox. If the files already exist locally, they will be overwritten.
Arguments:
files list[FileDownloadRequest] - List of files to download.
timeout int - Timeout for the download operation in seconds. 0 means no timeout. Default is 30 minutes.
Returns:
list[FileDownloadResponse] - List of download results.
Raises:
Exception - Only if the request itself fails (network issues, invalid request/response, etc.). Individual file download errors are returned in FileDownloadResponse.error. When the daemon provides structured per-file metadata, it is also available in FileDownloadResponse.error_details.
Example:
# Download multiple files
results = await sandbox.fs.download_files([
    FileDownloadRequest(source="tmp/data.json"),
    FileDownloadRequest(source="tmp/config.json", destination="local_config.json")
])
for result in results:
    if result.error:
        print(f"Error downloading {result.source}: {result.error}")
    elif result.result:
        print(f"Downloaded {result.source} to {result.result}")
@intercept_errors(message_prefix="Failed to find files: ")
@with_instrumentation()
async def find_files(path: str, pattern: str) -> list[Match]
Searches for files containing a pattern, similar to the grep command.
Arguments:
path str - Path to the file or directory to search. If the path is a directory, the search will be performed recursively. Relative paths are resolved based on the sandbox working directory.
pattern str - Search pattern to match against file contents.
Returns:
list[Match] - List of matches found in files. Each Match object includes:
file - Path to the file containing the match.
line - Line number where the match was found.
content - Content of the matching line.
Example:
# Search for TODOs in source files
matches = await sandbox.fs.find_files("workspace/src", "TODO:")
for match in matches:
    print(f"{match.file}:{match.line}: {match.content.strip()}")
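Because find_files returns a flat list, it is often convenient to group the matches per file before reporting them. The following is a small sketch of that post-processing step; the Match class is a local stand-in carrying only the fields used in the example above, and group_by_file is a hypothetical helper name.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, Iterable, List, Tuple


@dataclass
class Match:
    """Local stand-in with the fields used in the example above (not the SDK class)."""
    file: str
    line: int
    content: str


def group_by_file(matches: Iterable[Match]) -> Dict[str, List[Tuple[int, str]]]:
    """Map each file path to its (line number, stripped content) pairs."""
    grouped: Dict[str, List[Tuple[int, str]]] = defaultdict(list)
    for match in matches:
        grouped[match.file].append((match.line, match.content.strip()))
    return dict(grouped)


sample = [
    Match("src/a.py", 3, "  # TODO: refactor\n"),
    Match("src/b.py", 10, "# TODO: add tests\n"),
    Match("src/a.py", 42, "    # TODO: remove\n"),
]
for path, hits in group_by_file(sample).items():
    print(path, hits)
```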
@intercept_errors(message_prefix="Failed to get file info: ")
@with_instrumentation()
async def get_file_info(path: str) -> FileInfo
Gets detailed information about a file or directory, including its size, permissions, and timestamps.
Arguments:
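path str - Path to the file or directory. Relative paths are resolved based on the sandbox working directory.
Returns:
FileInfo - Detailed file information including:
name - Name of the file or directory.
is_dir - Whether the path is a directory.
size - Size in bytes.
mode - File permissions.
mod_time - Last modification time.
Example: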
# Get file metadata
info = await sandbox.fs.get_file_info("workspace/data/file.txt")
print(f"Size: {info.size} bytes")
print(f"Modified: {info.mod_time}")
print(f"Mode: {info.mode}")
# Check if path is a directory
info = await sandbox.fs.get_file_info("workspace/data")
if info.is_dir:
    print("Path is a directory")
@intercept_errors(message_prefix="Failed to list files: ")
@with_instrumentation()
async def list_files(path: str) -> list[FileInfo]
Lists files and directories in a given path and returns their information, similar to the ls -l command.
Arguments:
path str - Path to the directory to list contents from. Relative paths are resolved based on the sandbox working directory.
Returns:
list[FileInfo] - List of file and directory information. Each FileInfo object includes the same fields as described in get_file_info().
Example:
# List directory contents
files = await sandbox.fs.list_files("workspace/data")
# Print files and their sizes
for file in files:
    if not file.is_dir:
        print(f"{file.name}: {file.size} bytes")
# List only directories
dirs = [f for f in files if f.is_dir]
print("Subdirectories:", ", ".join(d.name for d in dirs))
@intercept_errors(message_prefix="Failed to move files: ")
@with_instrumentation()
async def move_files(source: str, destination: str) -> None
Moves or renames a file or directory. The parent directory of the destination must exist.
Arguments:
source str - Path to the source file or directory. Relative paths are resolved based on the sandbox working directory.
destination str - Path to the destination. Relative paths are resolved based on the sandbox working directory.
Example:
# Rename a file
await sandbox.fs.move_files(
    "workspace/data/old_name.txt",
    "workspace/data/new_name.txt"
)
# Move a file to a different directory
await sandbox.fs.move_files(
    "workspace/data/file.txt",
    "workspace/archive/file.txt"
)
# Move a directory
await sandbox.fs.move_files(
    "workspace/old_dir",
    "workspace/new_dir"
)
@intercept_errors(message_prefix="Failed to replace in files: ")
@with_instrumentation()
async def replace_in_files(files: list[str], pattern: str,
new_value: str) -> list[ReplaceResult]
Performs search and replace operations across multiple files.
Arguments:
files list[str] - List of file paths to perform replacements in. Relative paths are resolved based on the sandbox working directory.
pattern str - Pattern to search for.
new_value str - Text to replace matches with.
Returns:
list[ReplaceResult] - List of results indicating replacements made in each file. Each ReplaceResult includes:
file - Path to the modified file.
success - Whether the operation was successful.
error - Error message if the operation failed.
Example:
# Replace in specific files
results = await sandbox.fs.replace_in_files(
    files=["workspace/src/file1.py", "workspace/src/file2.py"],
    pattern="old_function",
    new_value="new_function"
)
# Print results
for result in results:
    if result.success:
        print(f"{result.file}: {result.success}")
    else:
        print(f"{result.file}: {result.error}")
@intercept_errors(message_prefix="Failed to search files: ")
@with_instrumentation()
async def search_files(path: str, pattern: str) -> SearchFilesResponse
Searches for files and directories whose names match the specified pattern. The pattern can be a simple string or a glob pattern.
Arguments:
path str - Path to the root directory to start search from. Relative paths are resolved based on the sandbox working directory.
pattern str - Pattern to match against file names. Supports glob patterns (e.g., "*.py" for Python files).
Returns:
SearchFilesResponse - Search results containing:
files - List of matching file and directory paths.
Example:
# Find all Python files
result = await sandbox.fs.search_files("workspace", "*.py")
for file in result.files:
    print(file)
# Find files with specific prefix
result = await sandbox.fs.search_files("workspace/data", "test_*")
print(f"Found {len(result.files)} test files")
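To preview locally which names a glob pattern such as "*.py" would select, Python's fnmatch module can serve as a rough guide. This sketch assumes the Sandbox applies comparable glob rules, which this page does not specify, so treat it as an approximation rather than the actual matching logic. preview_glob is a hypothetical helper name.

```python
from fnmatch import fnmatchcase
from typing import Iterable, List


def preview_glob(names: Iterable[str], pattern: str) -> List[str]:
    """Return the names that match a shell-style glob pattern (case-sensitive)."""
    return [name for name in names if fnmatchcase(name, pattern)]


names = ["main.py", "test_utils.py", "test_data.csv", "README.md"]
print(preview_glob(names, "*.py"))    # ['main.py', 'test_utils.py']
print(preview_glob(names, "test_*"))  # ['test_utils.py', 'test_data.csv']
```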
@intercept_errors(message_prefix="Failed to set file permissions: ")
@with_instrumentation()
async def set_file_permissions(path: str,
mode: str | None = None,
owner: str | None = None,
group: str | None = None) -> None
Sets permissions and ownership for a file or directory. Any of the parameters can be None to leave that attribute unchanged.
Arguments:
path str - Path to the file or directory. Relative paths are resolved based on the sandbox working directory.
mode str | None - File mode/permissions in octal format (e.g., "644" for rw-r--r--).
owner str | None - User owner of the file.
group str | None - Group owner of the file.
Example:
# Make a file executable
await sandbox.fs.set_file_permissions(
    path="workspace/scripts/run.sh",
    mode="755"  # rwxr-xr-x
)
# Change file owner
await sandbox.fs.set_file_permissions(
    path="workspace/data/file.txt",
    owner="daytona",
    group="daytona"
)
@overload
async def upload_file(file: bytes,
remote_path: str,
timeout: int = 30 * 60) -> None
Uploads a file to the specified path in the Sandbox. If a file already exists at the destination path, it will be overwritten. This method is useful when you want to upload small files that fit into memory.
Arguments:
file bytes - File contents as a bytes object.
remote_path str - Path to the destination file. Relative paths are resolved based on the sandbox working directory.
timeout int - Timeout for the upload operation in seconds. 0 means no timeout. Default is 30 minutes.
Example:
# Upload a text file
content = b"Hello, World!"
await sandbox.fs.upload_file(content, "tmp/hello.txt")
# Upload a local file
with open("local_file.txt", "rb") as f:
    content = f.read()
await sandbox.fs.upload_file(content, "tmp/file.txt")
# Upload JSON data
import json
data = {"key": "value"}
content = json.dumps(data).encode('utf-8')
await sandbox.fs.upload_file(content, "tmp/config.json")
@overload
async def upload_file(local_path: str,
remote_path: str,
timeout: int = 30 * 60) -> None
Uploads a file from the local file system to the specified path in the Sandbox. If a file already exists at the destination path, it will be overwritten. This method uses streaming to upload the file, so it is useful when you want to upload larger files that may not fit into memory.
Arguments:
local_path str - Path to the local file to upload.
remote_path str - Path to the destination file in the Sandbox. Relative paths are resolved based on the sandbox working directory.
timeout int - Timeout for the upload operation in seconds. 0 means no timeout. Default is 30 minutes.
Example:
await sandbox.fs.upload_file("local_file.txt", "tmp/large_file.txt")
@intercept_errors(message_prefix="Failed to upload files: ")
@with_instrumentation()
async def upload_files(files: list[FileUpload],
timeout: int = 30 * 60) -> None
Uploads multiple files to the Sandbox. If files already exist at the destination paths, they will be overwritten.
Arguments:
files list[FileUpload] - List of files to upload.
timeout int - Timeout for the upload operation in seconds. 0 means no timeout. Default is 30 minutes.
Example:
# Upload multiple files from bytes and from a local path
files = [
    FileUpload(
        source=b"Content of file 1",
        destination="/tmp/file1.txt"
    ),
    FileUpload(
        source="workspace/data/file2.txt",
        destination="/tmp/file2.txt"
    ),
    FileUpload(
        source=b'{"key": "value"}',
        destination="/tmp/config.json"
    )
]
await sandbox.fs.upload_files(files)
@intercept_errors(message_prefix="Failed to upload file: ")
@with_instrumentation()
async def upload_file_stream(source: bytes | bytearray | str | io.IOBase
| AsyncIterable[bytes] | object,
remote_path: str,
timeout: int = 30 * 60,
on_progress: Callable[[UploadProgress],
Awaitable[None] | None]
| None = None,
cancel_event: CancelEvent | None = None) -> None
Uploads a single file to the Sandbox using true streaming, with optional progress tracking and cancellation. Memory usage stays flat regardless of source size. The HTTP layer uses chunked transfer encoding, so the source's natural EOF terminates the upload — no advance size is needed.
Arguments:
source - Data to upload. Accepts:
bytes / bytearray: uploaded from memory.
str: treated as a local file path and read in chunks.
Sync file-like object (.read(n) -> bytes): streamed as-is.
Async file-like object (async def read(n) -> bytes, e.g. an aiofiles handle): streamed without ever blocking the loop.
AsyncIterable[bytes]: yielded chunks are forwarded to the wire.
remote_path str - Destination path in the Sandbox.
timeout int - Timeout in seconds. 0 means no timeout. Default is 30 minutes.
on_progress Callable[[UploadProgress], Awaitable[None] | None] | None - Optional
callback invoked with cumulative bytes sent. May be sync or async def
when paired with an async source (async file-like or AsyncIterable[bytes]).
Sync sources (bytes, str path, sync file-like) require a sync callback
because the underlying httpx multipart serializer pulls bytes through a
synchronous .read(); passing an async callback alongside a sync source
raises DaytonaError.
cancel_event CancelEvent | None - Optional asyncio.Event-compatible token.
When set during streaming, the next chunk raises DaytonaError and
tears down the request. Standard asyncio.CancelledError from task
cancellation is also honoured automatically.
Raises:
DaytonaError - If the upload fails or is cancelled via cancel_event.
Example:
import asyncio
import aiofiles
cancel = asyncio.Event()
async with aiofiles.open("large_dataset.csv", "rb") as f:
    await sandbox.fs.upload_file_stream(
        f,
        "tmp/dataset.csv",
        on_progress=lambda p: print(f"{p.bytes_sent} bytes sent"),
        cancel_event=cancel,
    )
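The source parameter also accepts an AsyncIterable[bytes]. A minimal sketch of such a source is an async generator that yields fixed-size chunks. Both iter_chunks and collect are hypothetical helpers: collect only drains the iterator locally to show what a consumer would receive, and it stands in for the upload call so the snippet runs without a Sandbox.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator


async def iter_chunks(data: bytes, chunk_size: int = 4) -> AsyncIterator[bytes]:
    """Yield data in chunk_size pieces, as an AsyncIterable[bytes] source would."""
    for start in range(0, len(data), chunk_size):
        await asyncio.sleep(0)  # hand control back to the event loop between chunks
        yield data[start:start + chunk_size]


async def collect(source: AsyncIterable[bytes]) -> bytes:
    """Drain the source and join the chunks (stand-in for the consumer side)."""
    parts = []
    async for chunk in source:
        parts.append(chunk)
    return b"".join(parts)


payload = b"hello, sandbox"
result = asyncio.run(collect(iter_chunks(payload)))
print(result)
```

With a real Sandbox, the generator object itself would be passed as the source argument, for example iter_chunks(payload), instead of being drained by collect.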
@runtime_checkable
class CancelEvent(Protocol)
Duck-typed cancellation token. Compatible with threading.Event and
asyncio.Event (both expose is_set()). When supplied to a streaming
download, the next chunk read after the event becomes set raises
DaytonaError, closing the underlying HTTP connection.
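To illustrate the duck typing described here, the sketch below defines a local protocol with a single is_set() method and checks that threading.Event, asyncio.Event, and a custom class all satisfy it. CancelToken, ManualToken, and should_stop are hypothetical names, and the assumption that is_set() is the only required member is inferred from the description above rather than taken from the SDK source.

```python
import asyncio
import threading
from typing import Optional, Protocol, runtime_checkable


@runtime_checkable
class CancelToken(Protocol):
    """Local stand-in for CancelEvent; assumes only is_set() is required."""

    def is_set(self) -> bool: ...


class ManualToken:
    """Custom token: any object exposing is_set() satisfies the protocol."""

    def __init__(self) -> None:
        self._cancelled = False

    def cancel(self) -> None:
        self._cancelled = True

    def is_set(self) -> bool:
        return self._cancelled


def should_stop(token: Optional[CancelToken]) -> bool:
    """Check a streaming loop could run before handling each chunk."""
    return token is not None and token.is_set()


async def _asyncio_event_matches() -> bool:
    # Create the asyncio.Event inside a running loop to stay version-agnostic.
    return isinstance(asyncio.Event(), CancelToken)


thread_event = threading.Event()
manual = ManualToken()
checks = (
    isinstance(thread_event, CancelToken),
    asyncio.run(_asyncio_event_matches()),
    isinstance(manual, CancelToken),
)
print(checks)  # (True, True, True)

manual.cancel()
print(should_stop(manual), should_stop(thread_event), should_stop(None))  # True False False
```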
@dataclass
class FileUpload()
Represents a file to be uploaded to the Sandbox.
Attributes:
source bytes | str - File contents as a bytes object or a local file path. If a bytes object is provided, make sure it fits into memory; otherwise, use a local file path, whose content will be streamed to the Sandbox.
destination str - Destination path in the Sandbox. Relative paths are resolved based on the sandbox working directory.
@dataclass
class DownloadProgress()
Progress information for a streaming download.
Attributes:
bytes_received int - Cumulative bytes received so far.
total_bytes int | None - Total bytes expected, if known.
@dataclass
class UploadProgress()
Progress information for a streaming upload.
Attributes:
bytes_sent int - Cumulative bytes sent so far.
@dataclass
class FileDownloadRequest()
Represents a request to download a single file from the Sandbox.
Attributes:
source str - Source path in the Sandbox. Relative paths are resolved based on the user's root directory.
destination str | None - Destination path in the local filesystem where the file content will be streamed to. If not provided, the file is downloaded into a bytes buffer (this might cause memory issues if the file is large).
@dataclass
class FileDownloadResponse()
Represents the response to a single file download request.
Attributes:
source str - The original source path requested for download.
result str | bytes | None - The download result: the file path (if a destination was provided in the request) or the bytes content (if no destination was provided). None if the download failed or no data was received.
error str | None - Error message if the download failed, None if successful.
error_details FileDownloadErrorDetails | None - Structured error metadata when the server provides it.
@dataclass
class FileDownloadErrorDetails()
Structured error metadata for a failed bulk file download item.
def create_file_download_error(response: FileDownloadResponse) -> DaytonaError
Create the appropriate Daytona exception for a failed file download response.
def raise_if_stream_error(remote_path: str, error_text: str | None,
error_details: FileDownloadErrorDetails | None,
received_file_data: bool) -> None
Raise the appropriate error after streaming a single-file multipart download.
def parse_file_download_error_payload(
payload: bytes, content_type: str | None
) -> tuple[str, FileDownloadErrorDetails | None]
Parse a bulk-download error part into the legacy message and structured metadata.