scientific-skills/dask/references/dataframes.md
Dask DataFrames enable parallel processing of large tabular data by distributing work across multiple pandas DataFrames. As described in the documentation, "Dask DataFrames are a collection of many pandas DataFrames" with identical APIs, making the transition from pandas straightforward.
A Dask DataFrame is divided into multiple pandas DataFrames (partitions) along the index:
Use Dask When:
Stick with Pandas When:
.apply() functionsDask mirrors pandas reading syntax with added support for multiple files:
import dask.dataframe as dd
# Read single file
ddf = dd.read_csv('data.csv')
ddf = dd.read_parquet('data.parquet')
# Read multiple files using glob patterns
ddf = dd.read_csv('data/*.csv')
ddf = dd.read_parquet('s3://mybucket/data/*.parquet')
# Read with path structure
ddf = dd.read_parquet('data/year=*/month=*/day=*.parquet')
# Specify columns to read (reduces memory)
ddf = dd.read_parquet('data.parquet', columns=['col1', 'col2'])
# Control partitioning
ddf = dd.read_csv('data.csv', blocksize='64MB') # Creates 64MB partitions
All operations are lazy until .compute() is called.
# Same as pandas
filtered = ddf[ddf['column'] > 100]
filtered = ddf.query('column > 100')
# Add columns
ddf['new_column'] = ddf['col1'] + ddf['col2']
# Select columns
subset = ddf[['col1', 'col2', 'col3']]
# Drop columns
ddf = ddf.drop(columns=['unnecessary_col'])
# Standard aggregations work as expected
mean = ddf['column'].mean().compute()
sum_total = ddf['column'].sum().compute()
counts = ddf['category'].value_counts().compute()
# GroupBy operations (may require shuffle)
grouped = ddf.groupby('category')['value'].mean().compute()
# Multiple aggregations
agg_result = ddf.groupby('category').agg({
'value': ['mean', 'sum', 'count'],
'amount': 'sum'
}).compute()
# Merge DataFrames
merged = dd.merge(ddf1, ddf2, on='key', how='left')
# Join on index
joined = ddf1.join(ddf2, on='key')
# Sorting (expensive operation, requires data movement)
sorted_ddf = ddf.sort_values('column')
result = sorted_ddf.compute()
To Partitions (Efficient):
# Apply function to entire partitions
def custom_partition_function(partition_df):
# partition_df is a pandas DataFrame
return partition_df.assign(new_col=partition_df['col1'] * 2)
ddf = ddf.map_partitions(custom_partition_function)
To Rows (Less Efficient):
# Apply to each row (creates many tasks)
ddf['result'] = ddf.apply(lambda row: custom_function(row), axis=1, meta=('result', 'float'))
Note: Always prefer map_partitions over row-wise apply for better performance.
When Dask can't infer output structure, specify the meta parameter:
# For apply operations
ddf['new'] = ddf.apply(func, axis=1, meta=('new', 'float64'))
# For map_partitions
ddf = ddf.map_partitions(func, meta=pd.DataFrame({
'col1': pd.Series(dtype='float64'),
'col2': pd.Series(dtype='int64')
}))
# These operations are lazy (instant, no computation)
filtered = ddf[ddf['value'] > 100]
aggregated = filtered.groupby('category').mean()
final = aggregated[aggregated['value'] < 500]
# Nothing has computed yet
# Compute single result
result = final.compute()
# Compute multiple results efficiently
result1, result2, result3 = dask.compute(
operation1,
operation2,
operation3
)
# Keep results in distributed memory for reuse
ddf_cached = ddf.persist()
# Now multiple operations on ddf_cached won't recompute
result1 = ddf_cached.mean().compute()
result2 = ddf_cached.sum().compute()
# Set index (required for efficient joins and certain operations)
ddf = ddf.set_index('timestamp', sorted=True)
# Write to multiple files (one per partition)
ddf.to_parquet('output/data.parquet')
ddf.to_csv('output/data-*.csv')
# Write to single file (forces computation and concatenation)
ddf.compute().to_csv('output/single_file.csv')
# Convert to pandas (loads all data in memory)
pdf = ddf.compute()
map_partitions1. Select Columns Early
# Better: Read only needed columns
ddf = dd.read_parquet('data.parquet', columns=['col1', 'col2'])
2. Filter Before GroupBy
# Better: Reduce data before expensive operations
result = ddf[ddf['year'] == 2024].groupby('category').sum().compute()
3. Use Efficient File Formats
# Use Parquet instead of CSV for better performance
ddf.to_parquet('data.parquet') # Faster, smaller, columnar
4. Repartition Appropriately
# If partitions are too small
ddf = ddf.repartition(npartitions=10)
# If partitions are too large
ddf = ddf.repartition(partition_size='100MB')
import dask.dataframe as dd
# Read data
ddf = dd.read_csv('raw_data/*.csv')
# Transform
ddf = ddf[ddf['status'] == 'valid']
ddf['amount'] = ddf['amount'].astype('float64')
ddf = ddf.dropna(subset=['important_col'])
# Aggregate
summary = ddf.groupby('category').agg({
'amount': ['sum', 'mean'],
'quantity': 'count'
})
# Write results
summary.to_parquet('output/summary.parquet')
# Read time series data
ddf = dd.read_parquet('timeseries/*.parquet')
# Set timestamp index
ddf = ddf.set_index('timestamp', sorted=True)
# Resample (if available in Dask version)
hourly = ddf.resample('1H').mean()
# Compute statistics
result = hourly.compute()
# Read multiple files as single DataFrame
ddf = dd.read_csv('data/2024-*.csv')
# Process combined data
result = ddf.groupby('category')['value'].sum().compute()
Some pandas operations are not implemented in Dask:
.compute()# Get number of partitions
print(ddf.npartitions)
# Compute single partition
first_partition = ddf.get_partition(0).compute()
# View first few rows (computes first partition)
print(ddf.head())
# Test on small sample first
sample = ddf.head(1000)
# Validate logic works
# Then scale to full dataset
result = ddf.compute()
# Verify data types are correct
print(ddf.dtypes)