Before implementing parallel computing with Dask, explore simpler alternatives such as better algorithms, more efficient file formats, compiled code, or working on a sample of the data. These alternatives often provide better returns than distributed systems and should be exhausted before scaling out to parallel computing.
Critical Rule: Chunks should be large enough to keep scheduling overhead small, yet small enough that many fit in a worker's available memory at once.
Recommended Target: Size chunks so that each worker can hold about 10 chunks per core without exceeding its available memory.
Why It Matters:
Example Calculation:
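A sketch of such a calculation, using hypothetical numbers (an 8 GiB worker with 4 cores; the 10-chunks-per-core target comes from the rule above):

```python
# Hypothetical worker: 8 GiB of memory and 4 cores (illustrative numbers)
worker_memory = 8 * 1024**3   # bytes
cores = 4
chunks_per_core = 10          # target from the rule above

# Largest chunk size that still lets 10 chunks per core fit in memory
max_chunk_size = worker_memory // (cores * chunks_per_core)
print(max_chunk_size // 1024**2)  # roughly 200 MiB per chunk
```

With these assumed numbers, chunks of roughly 200 MiB satisfy the target; a machine with more memory or fewer cores would allow proportionally larger chunks.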
The Dask dashboard provides essential visibility into task progress, worker memory usage, and where time is actually being spent.
Access the dashboard to understand what's actually slow in parallel workloads rather than guessing at optimizations.
Wrong Approach:

```python
import pandas as pd
import dask.dataframe as dd

# Loads the entire dataset into memory first
df = pd.read_csv('large_file.csv')
ddf = dd.from_pandas(df, npartitions=10)
```
Correct Approach:

```python
import dask.dataframe as dd

# Let Dask handle the loading
ddf = dd.read_csv('large_file.csv')
```
Why: Loading data with pandas or NumPy first forces the scheduler to serialize and embed those objects in task graphs, defeating the purpose of parallel computing.
Key Principle: Load data with Dask methods and keep computations in Dask until you need the final result.
Wrong Approach:

```python
results = []
for item in items:
    result = dask_computation(item).compute()  # Each compute is separate
    results.append(result)
```
Correct Approach:

```python
import dask

computations = [dask_computation(item) for item in items]
results = dask.compute(*computations)  # Single compute for all
```
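A minimal, self-contained version of the batched pattern, using `dask.delayed` (the `square` function here is purely illustrative):

```python
import dask
from dask import delayed

@delayed
def square(x):
    return x * x

# Build all computations first, then schedule them together
computations = [square(i) for i in range(5)]
results = dask.compute(*computations)  # one call runs all five tasks together
```

Because the five tasks reach the scheduler as one graph, Dask can run them in parallel and share any common intermediates.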
Why: Calling compute in a loop prevents Dask from sharing work between the computations, running them in parallel with one another, and optimizing the combined task graph.
Symptoms:
Solutions:
Use `map_partitions` (DataFrames) or `map_blocks` (arrays) to fuse operations.

Example Using `map_partitions`:
```python
# Instead of applying a function to each row
ddf['result'] = ddf.apply(complex_function, axis=1)  # Many tasks

# Apply to entire partitions at once
ddf = ddf.map_partitions(lambda df: df.assign(result=complex_function(df)))
```
Use Threads For: numeric workloads (NumPy, pandas, scikit-learn) whose underlying code releases the GIL.
Use Processes For: workloads dominated by pure-Python objects or text processing, which hold the GIL.
Use Distributed Scheduler For: multi-machine clusters, or single machines where you want the diagnostic dashboard and finer control over workers.
Recommendation: Aim for roughly 4 threads per process on numeric workloads.
Rationale: a handful of threads per process keeps large data shared in memory within each process, while limiting contention when some code does not release the GIL.
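The sizing heuristic above can be sketched as follows; the arithmetic is real, while the commented-out `Client` call shows where the numbers would be applied (assuming `dask.distributed` is installed):

```python
import os

# Heuristic: ~4 threads per worker process for numeric workloads
threads_per_worker = 4
total_cores = os.cpu_count() or 1
n_workers = max(1, total_cores // threads_per_worker)

# With dask.distributed installed, this sizing would be applied as:
# from dask.distributed import Client
# client = Client(n_workers=n_workers, threads_per_worker=threads_per_worker)
```

On an 8-core machine this yields 2 workers with 4 threads each.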
Persist Strategically:

```python
# Persist intermediate results that are reused
intermediate = expensive_computation(data).persist()
result1 = intermediate.operation1().compute()
result2 = intermediate.operation2().compute()
```
Clear Memory When Done:

```python
# Explicitly delete large objects so workers can release memory
del intermediate
```
For Tabular Data: prefer columnar formats such as Parquet over CSV.
For Array Data: prefer chunk-friendly formats such as Zarr or HDF5.
Read Multiple Files Efficiently:

```python
# Use glob patterns to read multiple files in parallel
ddf = dd.read_parquet('data/year=2024/month=*/day=*.parquet')
```
Specify Useful Columns Early:

```python
# Only read needed columns
ddf = dd.read_parquet('data.parquet', columns=['col1', 'col2', 'col3'])
```
For independent computations, use Futures:

```python
from dask.distributed import Client

client = Client()
futures = [client.submit(func, arg) for arg in args]
results = client.gather(futures)
```
Use Bags for initial ETL, then convert to structured formats:

```python
import json
import dask.bag as db

# Process raw JSON
bag = db.read_text('logs/*.json').map(json.loads)
bag = bag.filter(lambda x: x['status'] == 'success')

# Convert to DataFrame for analysis
ddf = bag.to_dataframe()
```
Persist data between iterations:

```python
import dask.dataframe as dd

data = dd.read_parquet('data.parquet')
data = data.persist()  # Keep in memory across iterations

for iteration in range(num_iterations):
    data = update_function(data)
    data = data.persist()  # Persist updated version
```
For debugging with pdb or detailed error inspection:

```python
import dask

dask.config.set(scheduler='synchronous')
result = computation.compute()  # Runs in a single thread for debugging
```
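A complete example of the same technique, using a trivial delayed function (the `add` function is illustrative) and a context manager so the setting is scoped:

```python
import dask
from dask import delayed

@delayed
def add(a, b):
    return a + b

# The synchronous scheduler runs everything in the main thread,
# so breakpoints and pdb work normally inside add()
with dask.config.set(scheduler="synchronous"):
    result = add(1, 2).compute()
```

Using `dask.config.set` as a context manager restores the previous scheduler afterwards, which is handy when only one computation needs debugging.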
Before computing, check the number of tasks:

```python
print(len(ddf.__dask_graph__()))  # Should be reasonable, not millions
```
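The same check works for arrays, and shows directly how chunk size drives graph size (the array shapes here are arbitrary examples):

```python
import dask.array as da

small_chunks = da.ones((1000, 1000), chunks=(100, 100))  # 10 x 10 = 100 chunks
large_chunks = da.ones((1000, 1000), chunks=(500, 500))  # 2 x 2 = 4 chunks

# Smaller chunks mean many more tasks for the same data
print(len(small_chunks.__dask_graph__()))
print(len(large_chunks.__dask_graph__()))
```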
Test logic on a small subset before scaling:

```python
# Test on the first partition
sample = ddf.head(1000)
# Validate results
# Then scale to full dataset
```
Likely Cause: Task graph is too large.
Solution: Increase chunk sizes or use `map_partitions`.
Likely Causes:
Solutions:
Likely Causes:
Solutions: