docs/optimization/shuffle.md
A shuffle is the all-to-all data movement behind [df.repartition(...)][daft.DataFrame.repartition], hash joins, sorts, and groupbys.
The shuffle_algorithm config option controls how Daft executes that movement, and the right choice depends on how big the shuffle is. Shuffles only happen on the distributed (Ray) runner — the native (single-machine) runner executes the entire query in one process and has no shuffle step.
If you're picking a partition count for repartition or thinking about batch size, start with Partitioning and Batching. Partition count is the input to shuffle cost, and into_batches controls the batch sizes shuffles produce.
TL;DR
- Stay on the default
shuffle_algorithm="auto"for most queries. Daft picks betweenmap_reduceandpre_shuffle_mergebased on partition count.- If your shuffle is >10 GB of data or >500K partition slots (
input_partitions × output_partitions), switch toflight_shuffle. Daft prints a hint in the query plan when it sees one.- When you enable
flight_shuffle, pointflight_shuffle_dirsat a fast local disk. The default is["/tmp"]. Spill files arelz4-compressed by default; setflight_shuffle_compression="zstd"on EBS or other networked volumes.
shuffle_algorithm takes four values: auto (the default) and three concrete algorithms. auto selects between map_reduce and pre_shuffle_merge at plan time; any of the three concrete options can also be set directly.
shuffle_algorithm | Data plane | Best for |
|---|---|---|
map_reduce | Ray object store, one object per (input, output) slot | Small to medium shuffles with moderate partition counts. |
pre_shuffle_merge | Ray object store, with input partitions merged first to reduce slot count | Shuffles where input_partitions × output_partitions is large but total bytes are moderate. |
flight_shuffle | Local disk plus Arrow Flight gRPC between workers | Large shuffles (≳ 10 GB or thousands of partitions on each side). Avoids the head-node bookkeeping cost. |
auto choosesUnder auto, Daft picks between map_reduce and pre_shuffle_merge based on the geometric mean of input and output partition counts. If sqrt(input_partitions × output_partitions) > pre_shuffle_merge_partition_threshold (default 200), Daft uses pre_shuffle_merge; otherwise map_reduce.
auto does not switch to flight_shuffle automatically. Instead, when Daft sees a shuffle likely to hit the object-store ceiling (input size ≥ 10 GiB or partition product ≥ 500,000), it prints a hint in the query plan with the configuration to enable.
map_reduce falls over at scalemap_reduce writes one Ray object per (input, output) slot, and each tracked object costs about 3 KB of metadata on the Ray driver. Multiply by M mappers × N reducers:
| Mappers × Reducers | Slots | Head-node metadata |
|---|---|---|
| 1024 × 1024 | 1.0M | ~3 GB |
| 2048 × 2048 | 4.2M | ~12 GB |
| 4096 × 4096 | 16.8M | ~50 GB |
| 8192 × 8192 | 67M | ~200 GB |
At 4096 × 4096 the driver holds 50 GB of pointers before any data has moved, which usually shows up as a head-node OOM or as a scheduler stall.
pre_shuffle_merge reduces this cost by coalescing small input partitions before the shuffle, lowering M, but it can't change the underlying M × N shape. flight_shuffle writes shuffle bytes to local disk and serves them between workers over Arrow Flight, reducing head-node cost from M × N × 3 KB to roughly (M + N) × 200 B of descriptors.
Symptoms that point to flight_shuffle:
flight_shuffleimport daft
daft.context.set_execution_config(
shuffle_algorithm="flight_shuffle",
flight_shuffle_dirs=["/mnt/nvme0", "/mnt/nvme1"], # round-robins across them
flight_shuffle_compression="lz4", # the default; set "zstd" on slower disk
)
This applies to every shuffle in the session.
flight_shuffle_dirsLocal directories where Daft writes shuffle spill files. Defaults to ["/tmp"].
i8ge.* or i4i.*. On Kubernetes it's whatever is mounted from node-local SSD.dataset_size ÷ compression_ratio of free space per node. A 10 TB shuffle across 32 workers is about 310 GB of spill per worker uncompressed; at the default lz4 (~2×) it's closer to 155 GB, and at zstd (~3×) closer to 100 GB.flight_shuffle_compressionArrow IPC compression for the spill files. One of "lz4" (the default), "zstd", or "none".
| Storage | Recommended | Why |
|---|---|---|
| Local NVMe | "lz4" (default) | Cheap enough on CPU that it wins even when disk isn't the ceiling — about 10% in our benchmarks. |
| gp3 EBS or network-attached | "zstd" | When bandwidth is the limit, compression is the biggest knob available — worth ~2.3× over uncompressed — and zstd's tighter ratio consistently beats lz4. |
| HDD or slow shared FS, or a network-constrained cluster | "zstd" | Same reasoning: bytes are the bottleneck. |
Set "none" only if you're CPU-bound on very fast storage — for example, several local NVMe drives whose aggregate bandwidth outruns what the CPU can compress through.
For most queries, leave shuffle_algorithm="auto". Override when:
auto printed a flight_shuffle hint in your query plan. Enable flight_shuffle and set flight_shuffle_dirs to a fast local disk.flight_shuffle anyway.shuffle_algorithm="map_reduce" for small partition counts, or "pre_shuffle_merge" when the partition product is large but total bytes are moderate. These are mainly useful for benchmarking.If you're unsure whether your shuffle has crossed the thresholds, run it once with auto and read the hint in the query plan.
repartition (the input to shuffle cost) and how into_batches controls batch sizes within a partition.