
AsyncFileSystem

python
class AsyncFileSystem()

Provides file system operations within a Sandbox.

This class implements a high-level interface to file system operations that can be performed within a Daytona Sandbox.

AsyncFileSystem.__init__

python
def __init__(api_client: FileSystemApi)

Initializes a new AsyncFileSystem instance.

Arguments:

  • api_client FileSystemApi - API client for Sandbox file system operations.

AsyncFileSystem.create_folder

python
@intercept_errors(message_prefix="Failed to create folder: ")
@with_instrumentation()
async def create_folder(path: str, mode: str) -> None

Creates a new directory in the Sandbox at the specified path with the given permissions.

Arguments:

  • path str - Path where the folder should be created. Relative paths are resolved based on the sandbox working directory.
  • mode str - Folder permissions in octal format (e.g., "755" for rwxr-xr-x).

Example:

python
# Create a directory with standard permissions
await sandbox.fs.create_folder("workspace/data", "755")

# Create a private directory
await sandbox.fs.create_folder("workspace/secrets", "700")
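The mode strings above are standard Unix octal permissions. As a quick illustration of what the digits encode (plain Python, independent of the SDK; octal_to_rwx is a helper defined here, not part of the API), here is how a string like "755" maps to rwx triplets:

```python
def octal_to_rwx(mode: str) -> str:
    """Render an octal permission string like "755" as rwx triplets."""
    bits = "rwx"
    out = []
    for digit in mode:
        n = int(digit, 8)
        # Check the read (4), write (2), and execute (1) bits in order
        out.append("".join(bits[i] if n & (4 >> i) else "-" for i in range(3)))
    return "".join(out)

print(octal_to_rwx("755"))  # rwxr-xr-x
print(octal_to_rwx("700"))  # rwx------
```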

AsyncFileSystem.delete_file

python
@intercept_errors(message_prefix="Failed to delete file: ")
@with_instrumentation()
async def delete_file(path: str, recursive: bool = False) -> None

Deletes a file from the Sandbox.

Arguments:

  • path str - Path to the file to delete. Relative paths are resolved based on the sandbox working directory.
  • recursive bool - If the path is a directory, this must be set to True to delete it.

Example:

python
# Delete a file
await sandbox.fs.delete_file("workspace/data/old_file.txt")

# Delete a directory and its contents
await sandbox.fs.delete_file("workspace/data/old_dir", recursive=True)

AsyncFileSystem.download_file

python
@overload
async def download_file(remote_path: str, timeout: int = 30 * 60) -> bytes

Downloads a file from the Sandbox. Returns the file contents as a bytes object. This method is useful when you want to load the file into memory without saving it to disk. It can only be used for smaller files.

Arguments:

  • remote_path str - Path to the file in the Sandbox. Relative paths are resolved based on the sandbox working directory.
  • timeout int - Timeout for the download operation in seconds. 0 means no timeout. Default is 30 minutes.

Returns:

  • bytes - The file contents as a bytes object.

Example:

python
# Download and save a file locally
content = await sandbox.fs.download_file("workspace/data/file.txt")
with open("local_copy.txt", "wb") as f:
    f.write(content)

# Download and process text content
import json
content = await sandbox.fs.download_file("workspace/data/config.json")
config = json.loads(content.decode('utf-8'))

AsyncFileSystem.download_file

python
@overload
async def download_file(remote_path: str,
                        local_path: str,
                        timeout: int = 30 * 60) -> None

Downloads a file from the Sandbox and saves it to a local file using streaming. This method is useful when you want to download larger files that may not fit into memory.

Arguments:

  • remote_path str - Path to the file in the Sandbox. Relative paths are resolved based on the sandbox working directory.
  • local_path str - Path to save the file locally.
  • timeout int - Timeout for the download operation in seconds. 0 means no timeout. Default is 30 minutes.

Example:

python
import os
local_path = "local_copy.txt"
await sandbox.fs.download_file("tmp/large_file.txt", local_path)
size_mb = os.path.getsize(local_path) / 1024 / 1024
print(f"Size of the downloaded file {local_path}: {size_mb:.2f} MB")

AsyncFileSystem.download_file_stream

python
@intercept_errors(message_prefix="Failed to download file: ")
@with_instrumentation()
async def download_file_stream(
        remote_path: str,
        timeout: int = 30 * 60,
        on_progress: Callable[[DownloadProgress], Awaitable[None] | None]
    | None = None,
        cancel_event: CancelEvent | None = None) -> AsyncIterator[bytes]

Downloads a single file from the Sandbox as a stream without buffering the entire file into memory. Returns an async iterator that yields file content in chunks, which can be piped directly to an HTTP response, written to a file incrementally, or processed on the fly.

Arguments:

  • remote_path str - Path to the file in the Sandbox. Relative paths are resolved based on the sandbox working directory.
  • timeout int - Timeout for the download operation in seconds. 0 means no timeout. Default is 30 minutes.
  • on_progress Callable[[DownloadProgress], Awaitable[None] | None] | None - Optional callback invoked with cumulative bytes received and total bytes, when known, as the download progresses. May be either a regular function or an async def coroutine — coroutine returns are awaited before the next chunk is yielded. Default is None.
  • cancel_event CancelEvent | None - Optional asyncio.Event-compatible token. When set during streaming, the next chunk raises DaytonaError and the underlying HTTP connection is closed. Standard asyncio.CancelledError from task cancellation is also honoured automatically by the generator.

Returns:

  • AsyncIterator[bytes] - An async iterator yielding chunks of file content as bytes.

Raises:

  • DaytonaError - If the file does not exist, access is denied, or the download is cancelled via cancel_event.

Example:

python
# Stream to a local file without loading into memory
import aiofiles
async with aiofiles.open("local_copy.bin", "wb") as f:
    async for chunk in await sandbox.fs.download_file_stream("workspace/large-file.bin"):
        await f.write(chunk)

# Cancel a download from another coroutine
import asyncio
cancel = asyncio.Event()
asyncio.get_running_loop().call_later(5.0, cancel.set)
async for chunk in await sandbox.fs.download_file_stream("workspace/big.bin", cancel_event=cancel):
    process(chunk)
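As described above, on_progress may be a plain function or an async def coroutine, and coroutine returns are awaited before the next chunk is yielded. A minimal sketch of that dispatch pattern (the names notify, sync_cb, and async_cb are illustrative, not the SDK's internals):

```python
import asyncio
import inspect

async def notify(callback, progress):
    """Invoke a sync or async progress callback; await coroutine results."""
    result = callback(progress)
    if inspect.isawaitable(result):
        await result

received = []

def sync_cb(p):
    received.append(("sync", p))

async def async_cb(p):
    received.append(("async", p))

async def main():
    # Both callback kinds go through the same dispatch path
    await notify(sync_cb, 1024)
    await notify(async_cb, 2048)

asyncio.run(main())
print(received)  # [('sync', 1024), ('async', 2048)]
```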

AsyncFileSystem.download_files

python
@intercept_errors(message_prefix="Failed to download files: ")
@with_instrumentation()
async def download_files(files: list[FileDownloadRequest],
                         timeout: int = 30 * 60) -> list[FileDownloadResponse]

Downloads multiple files from the Sandbox. If the files already exist locally, they will be overwritten.

Arguments:

  • files list[FileDownloadRequest] - List of files to download.
  • timeout int - Timeout for the download operation in seconds. 0 means no timeout. Default is 30 minutes.

Returns:

  • list[FileDownloadResponse] - List of download results.

Raises:

  • Exception - Only if the request itself fails (network issues, invalid request/response, etc.). Individual file download errors are returned in FileDownloadResponse.error. When the daemon provides structured per-file metadata, it is also available in FileDownloadResponse.error_details.

Example:

python
# Download multiple files
results = await sandbox.fs.download_files([
    FileDownloadRequest(source="tmp/data.json"),
    FileDownloadRequest(source="tmp/config.json", destination="local_config.json")
])
for result in results:
    if result.error:
        print(f"Error downloading {result.source}: {result.error}")
    elif result.result:
        print(f"Downloaded {result.source} to {result.result}")

AsyncFileSystem.find_files

python
@intercept_errors(message_prefix="Failed to find files: ")
@with_instrumentation()
async def find_files(path: str, pattern: str) -> list[Match]

Searches for files containing a pattern, similar to the grep command.

Arguments:

  • path str - Path to the file or directory to search. If the path is a directory, the search will be performed recursively. Relative paths are resolved based on the sandbox working directory.
  • pattern str - Search pattern to match against file contents.

Returns:

  • list[Match] - List of matches found in files. Each Match object includes:
    • file: Path to the file containing the match
    • line: The line number where the match was found
    • content: The matching line content

Example:

python
# Search for TODOs in Python files
matches = await sandbox.fs.find_files("workspace/src", "TODO:")
for match in matches:
    print(f"{match.file}:{match.line}: {match.content.strip()}")

AsyncFileSystem.get_file_info

python
@intercept_errors(message_prefix="Failed to get file info: ")
@with_instrumentation()
async def get_file_info(path: str) -> FileInfo

Gets detailed information about a file or directory, including its size, permissions, and timestamps.

Arguments:

  • path str - Path to the file or directory. Relative paths are resolved based on the sandbox working directory.

Returns:

  • FileInfo - Detailed file information including:
    • name: File name
    • is_dir: Whether the path is a directory
    • size: File size in bytes
    • mode: File permissions
    • mod_time: Last modification timestamp
    • permissions: File permissions in octal format
    • owner: File owner
    • group: File group

Example:

python
# Get file metadata
info = await sandbox.fs.get_file_info("workspace/data/file.txt")
print(f"Size: {info.size} bytes")
print(f"Modified: {info.mod_time}")
print(f"Mode: {info.mode}")

# Check if path is a directory
info = await sandbox.fs.get_file_info("workspace/data")
if info.is_dir:
    print("Path is a directory")

AsyncFileSystem.list_files

python
@intercept_errors(message_prefix="Failed to list files: ")
@with_instrumentation()
async def list_files(path: str) -> list[FileInfo]

Lists files and directories in a given path and returns their information, similar to the ls -l command.

Arguments:

  • path str - Path to the directory to list contents from. Relative paths are resolved based on the sandbox working directory.

Returns:

  • list[FileInfo] - List of file and directory information. Each FileInfo object includes the same fields as described in get_file_info().

Example:

python
# List directory contents
files = await sandbox.fs.list_files("workspace/data")

# Print files and their sizes
for file in files:
    if not file.is_dir:
        print(f"{file.name}: {file.size} bytes")

# List only directories
dirs = [f for f in files if f.is_dir]
print("Subdirectories:", ", ".join(d.name for d in dirs))

AsyncFileSystem.move_files

python
@intercept_errors(message_prefix="Failed to move files: ")
@with_instrumentation()
async def move_files(source: str, destination: str) -> None

Moves or renames a file or directory. The parent directory of the destination must exist.

Arguments:

  • source str - Path to the source file or directory. Relative paths are resolved based on the sandbox working directory.
  • destination str - Path to the destination. Relative paths are resolved based on the sandbox working directory.

Example:

python
# Rename a file
await sandbox.fs.move_files(
    "workspace/data/old_name.txt",
    "workspace/data/new_name.txt"
)

# Move a file to a different directory
await sandbox.fs.move_files(
    "workspace/data/file.txt",
    "workspace/archive/file.txt"
)

# Move a directory
await sandbox.fs.move_files(
    "workspace/old_dir",
    "workspace/new_dir"
)

AsyncFileSystem.replace_in_files

python
@intercept_errors(message_prefix="Failed to replace in files: ")
@with_instrumentation()
async def replace_in_files(files: list[str], pattern: str,
                           new_value: str) -> list[ReplaceResult]

Performs search and replace operations across multiple files.

Arguments:

  • files list[str] - List of file paths to perform replacements in. Relative paths are resolved based on the sandbox working directory.
  • pattern str - Pattern to search for.
  • new_value str - Text to replace matches with.

Returns:

  • list[ReplaceResult] - List of results indicating replacements made in each file. Each ReplaceResult includes:
    • file: Path to the modified file
    • success: Whether the operation was successful
    • error: Error message if the operation failed

Example:

python
# Replace in specific files
results = await sandbox.fs.replace_in_files(
    files=["workspace/src/file1.py", "workspace/src/file2.py"],
    pattern="old_function",
    new_value="new_function"
)

# Print results
for result in results:
    if result.success:
        print(f"{result.file}: replaced successfully")
    else:
        print(f"{result.file}: {result.error}")

AsyncFileSystem.search_files

python
@intercept_errors(message_prefix="Failed to search files: ")
@with_instrumentation()
async def search_files(path: str, pattern: str) -> SearchFilesResponse

Searches for files and directories whose names match the specified pattern. The pattern can be a simple string or a glob pattern.

Arguments:

  • path str - Path to the root directory to start search from. Relative paths are resolved based on the sandbox working directory.
  • pattern str - Pattern to match against file names. Supports glob patterns (e.g., "*.py" for Python files).

Returns:

  • SearchFilesResponse - Search results containing:
    • files: List of matching file and directory paths

Example:

python
# Find all Python files
result = await sandbox.fs.search_files("workspace", "*.py")
for file in result.files:
    print(file)

# Find files with specific prefix
result = await sandbox.fs.search_files("workspace/data", "test_*")
print(f"Found {len(result.files)} test files")

AsyncFileSystem.set_file_permissions

python
@intercept_errors(message_prefix="Failed to set file permissions: ")
@with_instrumentation()
async def set_file_permissions(path: str,
                               mode: str | None = None,
                               owner: str | None = None,
                               group: str | None = None) -> None

Sets permissions and ownership for a file or directory. Any of the parameters can be None to leave that attribute unchanged.

Arguments:

  • path str - Path to the file or directory. Relative paths are resolved based on the sandbox working directory.
  • mode str | None - File mode/permissions in octal format (e.g., "644" for rw-r--r--).
  • owner str | None - User owner of the file.
  • group str | None - Group owner of the file.

Example:

python
# Make a file executable
await sandbox.fs.set_file_permissions(
    path="workspace/scripts/run.sh",
    mode="755"  # rwxr-xr-x
)

# Change file owner
await sandbox.fs.set_file_permissions(
    path="workspace/data/file.txt",
    owner="daytona",
    group="daytona"
)

AsyncFileSystem.upload_file

python
@overload
async def upload_file(file: bytes,
                      remote_path: str,
                      timeout: int = 30 * 60) -> None

Uploads a file to the specified path in the Sandbox. If a file already exists at the destination path, it will be overwritten. This method is useful when you want to upload small files that fit into memory.

Arguments:

  • file bytes - File contents as a bytes object.
  • remote_path str - Path to the destination file. Relative paths are resolved based on the sandbox working directory.
  • timeout int - Timeout for the upload operation in seconds. 0 means no timeout. Default is 30 minutes.

Example:

python
# Upload a text file
content = b"Hello, World!"
await sandbox.fs.upload_file(content, "tmp/hello.txt")

# Upload a local file
with open("local_file.txt", "rb") as f:
    content = f.read()
await sandbox.fs.upload_file(content, "tmp/file.txt")

# Upload binary data
import json
data = {"key": "value"}
content = json.dumps(data).encode('utf-8')
await sandbox.fs.upload_file(content, "tmp/config.json")

AsyncFileSystem.upload_file

python
@overload
async def upload_file(local_path: str,
                      remote_path: str,
                      timeout: int = 30 * 60) -> None

Uploads a file from the local file system to the specified path in the Sandbox. If a file already exists at the destination path, it will be overwritten. This method uses streaming to upload the file, so it is useful when you want to upload larger files that may not fit into memory.

Arguments:

  • local_path str - Path to the local file to upload.
  • remote_path str - Path to the destination file in the Sandbox. Relative paths are resolved based on the sandbox working directory.
  • timeout int - Timeout for the upload operation in seconds. 0 means no timeout. Default is 30 minutes.

Example:

python
await sandbox.fs.upload_file("local_file.txt", "tmp/large_file.txt")

AsyncFileSystem.upload_files

python
@intercept_errors(message_prefix="Failed to upload files: ")
@with_instrumentation()
async def upload_files(files: list[FileUpload],
                       timeout: int = 30 * 60) -> None

Uploads multiple files to the Sandbox. If files already exist at the destination paths, they will be overwritten.

Arguments:

  • files list[FileUpload] - List of files to upload.
  • timeout int - Timeout for the upload operation in seconds. 0 means no timeout. Default is 30 minutes.

Example:

python
# Upload multiple text files
files = [
    FileUpload(
        source=b"Content of file 1",
        destination="/tmp/file1.txt"
    ),
    FileUpload(
        source="workspace/data/file2.txt",
        destination="/tmp/file2.txt"
    ),
    FileUpload(
        source=b'{"key": "value"}',
        destination="/tmp/config.json"
    )
]
await sandbox.fs.upload_files(files)

AsyncFileSystem.upload_file_stream

python
@intercept_errors(message_prefix="Failed to upload file: ")
@with_instrumentation()
async def upload_file_stream(source: bytes | bytearray | str | io.IOBase
                             | AsyncIterable[bytes] | object,
                             remote_path: str,
                             timeout: int = 30 * 60,
                             on_progress: Callable[[UploadProgress],
                                                   Awaitable[None] | None]
                             | None = None,
                             cancel_event: CancelEvent | None = None) -> None

Uploads a single file to the Sandbox using true streaming, with optional progress tracking and cancellation. Memory usage stays flat regardless of source size. The HTTP layer uses chunked transfer encoding, so the source's natural EOF terminates the upload — no advance size is needed.

Arguments:

  • source - Data to upload. Accepts:

    • bytes / bytearray — uploaded from memory.
    • str — treated as a local file path and read in chunks.
    • sync file-like (anything with .read(n) -> bytes) — streamed as-is.
    • async file-like (anything with async def read(n) -> bytes, e.g. an aiofiles handle) — streamed without ever blocking the loop.
    • AsyncIterable[bytes] — yielded chunks are forwarded to the wire.
  • remote_path str - Destination path in the Sandbox.

  • timeout int - Timeout in seconds. 0 means no timeout. Default is 30 minutes.

  • on_progress Callable[[UploadProgress], Awaitable[None] | None] | None - Optional callback invoked with cumulative bytes sent. May be sync or async def when paired with an async source (async file-like or AsyncIterable[bytes]). Sync sources (bytes, str path, sync file-like) require a sync callback because the underlying httpx multipart serializer pulls bytes through a synchronous .read(); passing an async callback alongside a sync source raises DaytonaError.

  • cancel_event CancelEvent | None - Optional asyncio.Event-compatible token. When set during streaming, the next chunk raises DaytonaError and tears down the request. Standard asyncio.CancelledError from task cancellation is also honoured automatically.

Raises:

  • DaytonaError - If the upload fails or is cancelled via cancel_event.

Example:

python
import aiofiles, asyncio
cancel = asyncio.Event()
async with aiofiles.open("large_dataset.csv", "rb") as f:
    await sandbox.fs.upload_file_stream(
        f,
        "tmp/dataset.csv",
        on_progress=lambda p: print(f"{p.bytes_sent} bytes sent"),
        cancel_event=cancel,
    )
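The rule above, that a sync source must be paired with a sync callback, implies the callback kind is detected up front; inspect.iscoroutinefunction is the standard way to make that check. A sketch of such validation (names and the ValueError are illustrative; the SDK itself raises DaytonaError):

```python
import inspect

def validate_callback(source_is_async: bool, on_progress) -> None:
    """Reject an async callback when the source is read synchronously."""
    if on_progress is None:
        return
    if not source_is_async and inspect.iscoroutinefunction(on_progress):
        raise ValueError("async on_progress requires an async source")

def sync_cb(p): ...
async def async_cb(p): ...

validate_callback(source_is_async=False, on_progress=sync_cb)   # ok
validate_callback(source_is_async=True, on_progress=async_cb)   # ok
try:
    validate_callback(source_is_async=False, on_progress=async_cb)
except ValueError as e:
    print(e)  # async on_progress requires an async source
```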

CancelEvent

python
@runtime_checkable
class CancelEvent(Protocol)

Duck-typed cancellation token. Compatible with threading.Event and asyncio.Event (both expose is_set()). When supplied to a streaming download or upload, the next chunk processed after the event is set raises DaytonaError, closing the underlying HTTP connection.
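Because the protocol is runtime-checkable and requires only is_set(), both standard event types satisfy it. A sketch (CancelEventLike is a local stand-in mirroring the documented protocol, defined here only for illustration):

```python
import asyncio
import threading
from typing import Protocol, runtime_checkable

@runtime_checkable
class CancelEventLike(Protocol):
    """Local stand-in mirroring the documented CancelEvent protocol."""
    def is_set(self) -> bool: ...

# Both standard event types duck-type as cancellation tokens
print(isinstance(threading.Event(), CancelEventLike))  # True
print(isinstance(asyncio.Event(), CancelEventLike))    # True

cancel = threading.Event()
assert not cancel.is_set()
cancel.set()  # a streaming call observing this would raise DaytonaError
assert cancel.is_set()
```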

FileUpload

python
@dataclass
class FileUpload()

Represents a file to be uploaded to the Sandbox.

Attributes:

  • source bytes | str - File contents as a bytes object or a local file path. If a bytes object is provided, make sure it fits into memory; otherwise, provide a local file path, whose contents will be streamed to the Sandbox.
  • destination str - Destination path in the Sandbox. Relative paths are resolved based on the sandbox working directory.

DownloadProgress

python
@dataclass
class DownloadProgress()

Progress information for a streaming download.

Attributes:

  • bytes_received int - Cumulative bytes received so far.
  • total_bytes int | None - Total bytes expected, if known.

UploadProgress

python
@dataclass
class UploadProgress()

Progress information for a streaming upload.

Attributes:

  • bytes_sent int - Cumulative bytes sent so far.

FileDownloadRequest

python
@dataclass
class FileDownloadRequest()

Represents a request to download a single file from the Sandbox.

Attributes:

  • source str - Source path in the Sandbox. Relative paths are resolved based on the user's root directory.
  • destination str | None - Destination path in the local filesystem where the file content will be streamed to. If not provided, the file will be downloaded into an in-memory bytes buffer (which may cause memory issues for large files).

FileDownloadResponse

python
@dataclass
class FileDownloadResponse()

Represents the response to a single file download request.

Attributes:

  • source str - The original source path requested for download.
  • result str | bytes | None - The download result: the local file path (if a destination was provided in the request) or the file contents as bytes (if not); None if the download failed or no data was received.
  • error str | None - Error message if the download failed, None if successful.
  • error_details FileDownloadErrorDetails | None - Structured error metadata when the server provides it.

FileDownloadErrorDetails

python
@dataclass
class FileDownloadErrorDetails()

Structured error metadata for a failed bulk file download item.

create_file_download_error

python
def create_file_download_error(response: FileDownloadResponse) -> DaytonaError

Create the appropriate Daytona exception for a failed file download response.

raise_if_stream_error

python
def raise_if_stream_error(remote_path: str, error_text: str | None,
                          error_details: FileDownloadErrorDetails | None,
                          received_file_data: bool) -> None

Raise the appropriate error after streaming a single-file multipart download.

parse_file_download_error_payload

python
def parse_file_download_error_payload(
        payload: bytes, content_type: str | None
) -> tuple[str, FileDownloadErrorDetails | None]

Parse a bulk-download error part into the legacy message and structured metadata.