docs/optimization/join-strategies.md
Daft's [df.join()][daft.DataFrame.join] supports seven join types and three execution strategies. The join type controls which rows appear in the output. The strategy controls how Daft physically executes the join - which side gets shuffled, broadcast, or sorted. Picking the right combination can mean the difference between a sub-second operation and an OOM crash.
| Type | SQL Equivalent | Output |
|---|---|---|
"inner" | INNER JOIN | Rows where both sides match on the join key |
"left" | LEFT OUTER JOIN | All rows from the left, with nulls where the right has no match |
"right" | RIGHT OUTER JOIN | All rows from the right, with nulls where the left has no match |
"outer" | FULL OUTER JOIN | All rows from both sides, with nulls where either side has no match |
"semi" | WHERE EXISTS | Rows from the left that have at least one match on the right |
"anti" | WHERE NOT EXISTS | Rows from the left that have no match on the right |
"cross" | CROSS JOIN | Cartesian product of both sides (no join key) |
Semi and anti joins are particularly useful for filtering. If you need to remove rows whose key appears in another table, an anti-join is the right tool:
```python
# Remove all rows from df whose "id" appears in ids_to_remove
df_filtered = df.join(ids_to_remove, on="id", how="anti")
```
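Conceptually, semi and anti joins reduce to set membership on the join key: a semi join keeps rows whose key is in the other table's key set, an anti join keeps rows whose key is not. A toy sketch in plain Python (an illustration of the semantics, not Daft's implementation):

```python
# Toy model of semi/anti join semantics via set membership.
# Each "table" is a list of dicts; `on` names the join key column.

def semi_join(left, right, on):
    # Keep left rows whose key appears at least once on the right
    keys = {row[on] for row in right}
    return [row for row in left if row[on] in keys]

def anti_join(left, right, on):
    # Keep left rows whose key never appears on the right
    keys = {row[on] for row in right}
    return [row for row in left if row[on] not in keys]

df = [{"id": 1}, {"id": 2}, {"id": 3}]
ids_to_remove = [{"id": 2}]

print(anti_join(df, ids_to_remove, "id"))  # [{'id': 1}, {'id': 3}]
print(semi_join(df, ids_to_remove, "id"))  # [{'id': 2}]
```

Note that neither result contains columns from the right table; semi and anti joins only filter the left side.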
When both sides of a join share a non-key column name, Daft prepends `right.` to the conflicting column from the right DataFrame. You can customize this with the `prefix` and `suffix` parameters:
```python
# Default: conflicting columns get "right." prefix
joined = df1.join(df2, on="key")
# Schema: key, value, right.value

# Custom suffix instead
joined = df1.join(df2, on="key", suffix="_other")
# Schema: key, value, value_other
```
By default (`strategy=None`), Daft's query optimizer picks the strategy automatically. You can override this when you know something the optimizer doesn't.
With `strategy="hash"`, both sides are hash-partitioned on the join key and co-located, so matching keys always land in the same partition. This is the general-purpose strategy and works for all join types.
```python
df.join(other, on="key", strategy="hash")
```
Both sides get shuffled, so this is the most memory-intensive strategy for large datasets. If you're hitting memory pressure on joins, see Managing Memory Usage and consider whether broadcast or sort-merge might be a better fit.
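The mechanics can be sketched in plain Python: rows from both sides are routed to a partition by hashing the key, and each pair of co-located partitions is then joined independently. This is a conceptual illustration, not Daft's code:

```python
# Toy hash-partitioned join: route rows by hash(key) % n_partitions,
# then join each pair of co-located partitions independently.

def hash_partition(rows, on, n):
    parts = [[] for _ in range(n)]
    for row in rows:
        # Same key => same hash => same partition index on both sides
        parts[hash(row[on]) % n].append(row)
    return parts

def hash_join(left, right, on, n_partitions=4):
    lparts = hash_partition(left, on, n_partitions)
    rparts = hash_partition(right, on, n_partitions)
    out = []
    for lp, rp in zip(lparts, rparts):
        # Within a partition pair: build a lookup on one side, probe with the other
        lookup = {}
        for r in rp:
            lookup.setdefault(r[on], []).append(r)
        for l in lp:
            for r in lookup.get(l[on], []):
                out.append({**l, **r})
    return out

left = [{"key": "a", "x": 1}, {"key": "b", "x": 2}]
right = [{"key": "a", "y": 10}]
print(hash_join(left, right, "key"))  # [{'key': 'a', 'x': 1, 'y': 10}]
```

The partitioning step is the shuffle: in a distributed setting, every partition's rows must move to the worker that owns it, which is where the memory cost comes from.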
With `strategy="broadcast"`, one side of the join is replicated to every worker. The other side needs no shuffle at all, which makes this dramatically cheaper when one table is small enough to fit in memory on each worker.
```python
# Broadcast the small lookup table to all workers
df_large.join(df_small, on="key", strategy="broadcast")
```
Which side gets broadcast depends on the join type:
| Join Type | Broadcast Side |
|---|---|
"inner" | The smaller table (auto-selected) |
"left" | Right table |
"right" | Left table |
"semi", "anti" | The filtering side (right table) |
Broadcast joins do not support outer joins.
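The idea can be sketched in plain Python: build a lookup from the small side once, replicate it to every "worker", and stream each worker's local chunk of the large side through it without any shuffle. A conceptual illustration, not Daft's implementation:

```python
# Toy broadcast join: the small side becomes a replicated lookup table;
# each "worker" probes it with its local chunk of the large side.

def broadcast_join(large_chunks, small, on):
    # Build the lookup once; in a real engine a copy ships to every worker.
    lookup = {}
    for row in small:
        lookup.setdefault(row[on], []).append(row)
    out = []
    for chunk in large_chunks:   # one chunk per "worker"
        for l in chunk:          # the large side never moves
            for r in lookup.get(l[on], []):
                out.append({**l, **r})
    return out

# Two "workers", each holding a chunk of the large table
large = [[{"key": "a", "x": 1}],
         [{"key": "b", "x": 2}, {"key": "a", "x": 3}]]
small = [{"key": "a", "y": 10}]
print(broadcast_join(large, small, "key"))
# [{'key': 'a', 'x': 1, 'y': 10}, {'key': 'a', 'x': 3, 'y': 10}]
```

The trade-off is memory: every worker holds a full copy of the small side, which is why this only pays off when that side is genuinely small.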
With `strategy="sort_merge"`, both sides are sorted on the join key, then merged in a single linear pass. Useful when the data is already sorted on the join key, or when memory pressure makes the shuffle in a hash join too costly.
```python
df.join(other, on="key", strategy="sort_merge")
```
Sort-merge only supports inner joins.
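The merge phase can be sketched in plain Python as two cursors advancing over the sorted rows, emitting matches as they line up (a conceptual illustration of the technique, not Daft's code):

```python
# Toy sort-merge inner join: sort both sides on the key, then advance
# two cursors in a single linear pass over the sorted rows.

def sort_merge_join(left, right, on):
    left = sorted(left, key=lambda r: r[on])
    right = sorted(right, key=lambda r: r[on])
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][on], right[j][on]
        if lk < rk:
            i += 1              # left cursor is behind: advance it
        elif lk > rk:
            j += 1              # right cursor is behind: advance it
        else:
            # Equal keys: emit matches for the run of equal right rows
            j_start = j
            while j < len(right) and right[j][on] == lk:
                out.append({**left[i], **right[j]})
                j += 1
            i += 1
            if i < len(left) and left[i][on] == lk:
                j = j_start     # next equal left row re-scans the right run
    return out

left = [{"key": 2, "x": "b"}, {"key": 1, "x": "a"}]
right = [{"key": 1, "y": 10}, {"key": 2, "y": 20}]
print(sort_merge_join(left, right, "key"))
# [{'key': 1, 'x': 'a', 'y': 10}, {'key': 2, 'x': 'b', 'y': 20}]
```

If the inputs are already sorted, the `sorted()` calls are unnecessary and only the linear merge pass remains, which is exactly the case where this strategy shines.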
For most workloads, leaving `strategy=None` and letting the optimizer decide is the right call. Override when:

- You know one side is small: `strategy="broadcast"` avoids a full shuffle. This is common in deduplication pipelines where you join a large dataset against a small set of IDs to remove.
- Your data is already sorted on the join key: `strategy="sort_merge"` can skip the partitioning step entirely for inner joins.

When running on Ray, joins that shuffle data are subject to the object store's memory limits. If your join columns don't fit in distributed memory:

- Set `DAFT_SHUFFLE_ALGORITHM=flight_shuffle` and point it at a local volume for spilling: `daft.set_execution_config(flight_shuffle_dirs=["/mnt/spill"])`
- Repartition with `df.repartition(n)` before the join.

For more on memory management, see Managing Memory Usage and Partitioning and Batching.
When a query joins three or more tables, the optimizer reorders the join graph to minimize the size of intermediate results. By default, Daft uses a brute-force enumerator that handles up to 7 relations.
An experimental DP-ccp enumerator (Moerkotte & Neumann 2006) is available for larger graphs. Setting `DAFT_DEV_ENABLE_DP_CCP_JOIN_ORDERING=1` switches to DP-ccp and raises the limit to 12 relations. On graphs with up to 7 relations, DP-ccp produces the same plans as brute force. On larger graphs, the expanded search space can expose weaknesses in our cost model and lead to suboptimal plans. This limitation will be resolved once statistics and cost estimation improve (tracked in #6765).
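To see why join ordering matters at all, here is a toy calculation in plain Python comparing the intermediate result sizes of two orders for the same three-way join. The cardinalities and selectivities are hypothetical, chosen only to illustrate the effect:

```python
# Toy cost comparison: the same three-way inner join in two orders.
# Hypothetical inputs: two large tables A and B, and a tiny table C.

def join_size(n_left, n_right, selectivity):
    # Textbook cardinality estimate: |L JOIN R| ~ |L| * |R| * selectivity
    return n_left * n_right * selectivity

A, B, C = 1_000_000, 1_000_000, 100

# Order 1: (A JOIN B) first -- a huge intermediate result
plan1_intermediate = join_size(A, B, 1e-6)   # 1,000,000 rows

# Order 2: (A JOIN C) first -- the tiny table shrinks A immediately
plan2_intermediate = join_size(A, C, 1e-4)   # 10,000 rows

print(plan1_intermediate, plan2_intermediate)
```

The optimizer's job is to find the order with the cheapest intermediates, and the quality of its choice depends entirely on how accurate these cardinality estimates are, which is why weak statistics can mislead DP-ccp on large graphs.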