# Generic File Source Options

These options apply to `read_parquet`, `read_csv`, and `read_iceberg`. They are shared across these readers rather than tied to a single connector or format. Other readers (`read_json`, `read_warc`, `read_text`) do not support them.

## Ignoring Corrupt Files

When reading large collections of files, some files may be unreadable: corrupt, truncated, or deleted between the time Daft lists them and the time it reads them. By default, Daft raises an error and halts the query. The `ignore_corrupt_files` option changes that behavior: qualifying files are skipped, each skip is logged and recorded, and the query continues with the remaining data.

### Enabling `ignore_corrupt_files`

Pass `ignore_corrupt_files=True` to any of the supported reader functions:

```python
import daft
from pyiceberg.table import StaticTable

# Parquet / CSV (glob-based)
df = daft.read_parquet("s3://my-bucket/data/**/*.parquet", ignore_corrupt_files=True)
df = daft.read_csv("s3://my-bucket/data/**/*.csv", ignore_corrupt_files=True)

# Iceberg
table = StaticTable.from_metadata("s3://bucket/iceberg/metadata.json")
df = daft.read_iceberg(table, ignore_corrupt_files=True)

# Each call above reassigns df; this materializes the last one (Iceberg).
df.collect()
```

### What counts as "corrupt"

Daft skips a file when it encounters a problem that is specific to the file itself and cannot be resolved by retrying:

| Category | Examples |
| --- | --- |
| Invalid format | Bad Parquet magic bytes, truncated footer, mismatched row/column counts |
| Corrupt data | Unreadable row group, invalid CSV encoding, wrong field count in a row |
| Missing file | File deleted between listing and reading (e.g. concurrent compaction or partition overwrite) |

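Daft does not skip files when the problem lies in the environment rather than in the file itself. Transient errors such as network failures can and should be retried, and access errors need to be fixed rather than skipped: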

| Category | Examples |
| --- | --- |
| Network errors | Connection reset, read timeout, throttled I/O |
| Permission errors | Access denied, insufficient credentials |

This distinction matters: silently skipping files on a permission error would mask a misconfiguration that needs human attention.
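
The classification above can be pictured as a small decision function. This is a toy model, not Daft's implementation: `CorruptFileError`, `should_skip`, and the groupings are hypothetical, and Python's built-in exception types stand in for the categories in the two tables.

```python
class CorruptFileError(Exception):
    """Hypothetical stand-in for a format or decoding error (bad magic bytes, unreadable row group)."""


# Problems in the environment: the error propagates so it can be retried or fixed.
ENVIRONMENT_ERRORS = (PermissionError, ConnectionError, TimeoutError)

# Problems in the file itself: skippable when ignore_corrupt_files=True.
FILE_ERRORS = (CorruptFileError, FileNotFoundError)


def should_skip(exc: BaseException) -> bool:
    """Return True if the error describes the file itself rather than the environment."""
    # Environment errors are checked first, so anything in that group always propagates.
    if isinstance(exc, ENVIRONMENT_ERRORS):
        return False
    return isinstance(exc, FILE_ERRORS)


print(should_skip(FileNotFoundError("deleted by compaction")))  # True
print(should_skip(PermissionError("access denied")))  # False
```

Anything that matches neither group (for example an unrelated `ValueError`) is also treated as not skippable, so unknown failures surface instead of disappearing.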

### Observability: knowing what was skipped

`ignore_corrupt_files` is designed around the principle that errors should be visible, not hidden. Daft provides two complementary observability mechanisms.

#### Python warning logs

Daft emits a `WARNING`-level log message for every skipped file, including the file path and the reason:

```text
WARNING daft.io - Skipping corrupt Parquet file s3://my-bucket/data/bad.parquet: ...
WARNING daft.io - Skipping corrupt CSV chunk in s3://my-bucket/data/partial.csv: ...
```

You can see these with standard Python logging:

```python
import logging

# Print WARNING-and-above records, including Daft's skip messages, to stderr.
logging.basicConfig(level=logging.WARNING)
```
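
To capture these warnings in code rather than just print them, you could attach a custom `logging.Handler`. This is a sketch that assumes the messages arrive on a logger named `daft.io` and start with the `Skipping corrupt` prefix seen in the sample lines. Both details are inferred from that sample rather than a documented contract, and `SkippedFileCollector` is a hypothetical name.

```python
import logging
from typing import List


class SkippedFileCollector(logging.Handler):
    """Keeps the text of WARNING records that report a skipped corrupt file."""

    def __init__(self) -> None:
        super().__init__(level=logging.WARNING)
        self.messages: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        message = record.getMessage()
        # Assumption: skip messages use the "Skipping corrupt ..." prefix from the sample output.
        if message.startswith("Skipping corrupt"):
            self.messages.append(message)


collector = SkippedFileCollector()
# Assumption: Daft emits its skip warnings on the "daft.io" logger, as the sample output suggests.
logging.getLogger("daft.io").addHandler(collector)
```

Because this depends on log formatting, `df.skipped_corrupt_files` is likely the more robust choice when pipeline logic needs to act on skipped files.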

#### `df.skipped_corrupt_files`: programmatic access

After materializing the DataFrame with `.collect()`, the `skipped_corrupt_files` property returns the skipped files as a list of `(path, reason, partial)` tuples, so your pipeline code can act on them:

```python
df = daft.read_parquet("s3://my-bucket/data/**/*.parquet", ignore_corrupt_files=True)
df.collect()

skipped = df.skipped_corrupt_files  # list[tuple[str, str, bool]]
for path, reason, partial in skipped:
    tag = " (partial)" if partial else ""
    print(f"Skipped{tag}: {path}\n  Reason: {reason}")
```

Each entry is a `(path, reason, partial)` tuple. When `partial` is `True`, some batches from the file were already emitted before the corruption was detected, so the file was not fully skipped and some of its rows are in the result. This can happen when corruption appears in a later row group.
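
The `partial` flag follows from batches being streamed downstream before the whole file has been read. The toy model below illustrates that mechanism. It is not Daft's reader: `read_files`, `CorruptFileError`, and the in-memory "files" (generators yielding lists of ints) are hypothetical stand-ins.

```python
from typing import Callable, Dict, Iterator, List, Tuple


class CorruptFileError(Exception):
    """Hypothetical error raised when a file's contents cannot be decoded."""


Batch = List[int]  # stand-in for a batch of rows


def read_files(
    files: Dict[str, Callable[[], Iterator[Batch]]],
) -> Tuple[List[Batch], List[Tuple[str, str, bool]]]:
    """Toy model: stream batches file by file, recording (path, reason, partial) for corrupt files."""
    emitted: List[Batch] = []
    skipped: List[Tuple[str, str, bool]] = []
    for path, open_batches in files.items():
        batches_from_file = 0
        try:
            for batch in open_batches():
                emitted.append(batch)  # already sent downstream; cannot be taken back
                batches_from_file += 1
        except CorruptFileError as exc:
            # partial=True when this file contributed batches before failing.
            skipped.append((path, str(exc), batches_from_file > 0))
    return emitted, skipped


def good_file():
    yield [1, 2]
    yield [3]


def late_corruption():
    yield [4, 5]
    raise CorruptFileError("unreadable row group 1")


def bad_footer():
    raise CorruptFileError("bad footer")


emitted, skipped = read_files(
    {"good.parquet": good_file, "late.parquet": late_corruption, "footer.parquet": bad_footer}
)
print(emitted)  # [[1, 2], [3], [4, 5]]
print(skipped)  # [('late.parquet', 'unreadable row group 1', True), ('footer.parquet', 'bad footer', False)]
```

The first batch of `late.parquet` stays in the output even though the file is reported as skipped. That is what `partial=True` signals.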

`skipped_corrupt_files` is populated only after calling `.collect()` on the DataFrame. Other execution methods such as `.count_rows()` do not populate this property, because they run on an internal DataFrame rather than materializing the original one.

### Handling skipped files in production

Because `skipped_corrupt_files` is plain Python data, you can plug it directly into your existing alerting or data-quality workflows:

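```python
import datetime
import queue

import daft

# Placeholders for your own alerting and dead-letter infrastructure (hypothetical).
def send_alert(message, details):
    print(message, details)

dead_letter_queue = queue.Queue()
TODAY = datetime.date.today().isoformat()

df = daft.read_parquet("s3://my-bucket/nightly/**/*.parquet", ignore_corrupt_files=True)
df.collect()  # materialize first: skipped_corrupt_files is only populated by .collect()
df.write_parquet("s3://my-bucket/processed/")

skipped = df.skipped_corrupt_files
if skipped:
    # Option 1: send an alert
    send_alert(f"{len(skipped)} file(s) skipped during nightly run", details=skipped)

    # Option 2: push to a dead-letter queue for later reprocessing
    for path, reason, partial in skipped:
        dead_letter_queue.put({"path": path, "reason": reason, "partial": partial, "run": TODAY})
```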

This pattern (errors visible, impact contained, tooling to fix) lets automated batch jobs complete reliably while still surfacing problems for human review.
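
A partial entry means some of that file's rows may already be in the output, so re-reading the whole file later could duplicate them. For that reason it can help to route partial and fully skipped files differently. The sketch below is a minimal example of that idea. `split_for_reprocessing`, the record fields, and the review-versus-reprocess policy are hypothetical choices, not part of Daft.

```python
from typing import Dict, List, Tuple, Union

Record = Dict[str, Union[str, bool]]


def split_for_reprocessing(
    skipped: List[Tuple[str, str, bool]], run_id: str
) -> Tuple[List[Record], List[Record]]:
    """Split skipped entries into fully skipped files and partially read files."""
    fully_skipped: List[Record] = []
    partially_read: List[Record] = []
    for path, reason, partial in skipped:
        record: Record = {"path": path, "reason": reason, "partial": partial, "run": run_id}
        # Rows from a partially read file may already be in the output, so re-reading it
        # in full could duplicate them; keep those separate for manual review.
        (partially_read if partial else fully_skipped).append(record)
    return fully_skipped, partially_read


full, partial = split_for_reprocessing(
    [
        ("s3://my-bucket/a.parquet", "bad footer", False),
        ("s3://my-bucket/b.parquet", "unreadable row group", True),
    ],
    run_id="nightly-run",
)
```

In a pipeline, the first argument would be `df.skipped_corrupt_files` after `.collect()`.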

!!! warning "Do not use ignore_corrupt_files as a catch-all"
    This option is designed for files that are genuinely unreadable. It is not a way to suppress transient I/O errors (network issues, throttling): those are not skipped, and Daft already retries them automatically. If you find yourself needing `ignore_corrupt_files` for a large fraction of your files, investigate the root cause rather than silencing the errors.
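
One way to follow this advice in an automated job is to fail the run when too many files are skipped. The following is a sketch under stated assumptions. `check_skip_ratio` is a hypothetical helper, and the 1% default threshold is an arbitrary placeholder.

```python
def check_skip_ratio(num_skipped: int, num_files: int, max_ratio: float = 0.01) -> float:
    """Raise if more than max_ratio of the input files were skipped; return the ratio otherwise."""
    if num_files <= 0:
        raise ValueError("num_files must be positive")
    ratio = num_skipped / num_files
    if ratio > max_ratio:
        raise RuntimeError(
            f"{num_skipped} of {num_files} files ({ratio:.1%}) were skipped; "
            "investigate the root cause instead of relying on ignore_corrupt_files"
        )
    return ratio
```

You would call it with `len(df.skipped_corrupt_files)` and the number of files your glob or table scan matched, however you list them. Partially read files are counted as skipped here, which errs on the side of alerting.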

### Supported formats

| Format | File-level skip | Within-file error skip |
| --- | --- | --- |
| Parquet (`read_parquet`) | Yes (bad footer, wrong magic bytes, file too small) | Yes (corrupt row group data) |
| CSV (`read_csv`) | Yes (unreadable file, truncated file) | Yes (bad encoding, wrong field count in chunk) |
| Iceberg (`read_iceberg`) | Yes (data files go through the Rust Parquet reader) | Yes |

!!! note "Iceberg delete files"
    Corruption in Iceberg delete files is not covered. If a delete file is unreadable, Daft raises an error regardless of `ignore_corrupt_files`. Delete files are small files that record which rows have been deleted, and corruption there generally indicates a more serious catalog inconsistency.

!!! note "Count pushdown"
    When `ignore_corrupt_files` is enabled for Parquet, count pushdown is disabled. This means `df.count()` reads all row-group data instead of using the metadata-only optimization, which may be slower on large datasets.