
10 Minutes to cuDF and Dask cuDF

Modelled after 10 Minutes to Pandas, this is a short introduction to cuDF and Dask cuDF, geared mainly towards new users.

What are these Libraries?

cuDF is a Python GPU DataFrame library (built on the Apache Arrow columnar memory format) for loading, joining, aggregating, filtering, and otherwise manipulating tabular data using a DataFrame API in the style of pandas.

Dask is a flexible library for parallel computing in Python that makes scaling out your workflow smooth and simple. On the CPU, Dask uses pandas to execute operations in parallel on DataFrame partitions.

Dask cuDF extends Dask where necessary to allow its DataFrame partitions to be processed using cuDF GPU DataFrames instead of pandas DataFrames. For instance, when you call dask_cudf.read_csv(...), your cluster's GPUs do the work of parsing the CSV file(s) by calling cudf.read_csv().

Note: This notebook uses the explicit Dask cuDF API (dask_cudf) for clarity. However, we strongly recommend that you use Dask's configuration infrastructure (https://docs.dask.org/en/stable/configuration.html) to set the "dataframe.backend" option to "cudf", and work with the Dask DataFrame API directly. Please see the Dask cuDF documentation (https://github.com/rapidsai/cudf/tree/main/python/dask_cudf) for more information.
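
For example, with the backend option set, ordinary Dask DataFrame code creates cudf-backed collections. A minimal sketch, assuming only that dask and dask_cudf are installed:

python
import dask
import dask.dataframe as dd

# Tell Dask to back new DataFrame collections with cuDF instead of pandas.
dask.config.set({"dataframe.backend": "cudf"})

# Collections created through the Dask DataFrame API now hold cuDF
# partitions under the hood.
ddf = dd.from_dict({"a": range(10)}, npartitions=2)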

When to use cuDF and Dask cuDF

If your workflow is fast enough on a single GPU or your data comfortably fits in memory on a single GPU, you would want to use cuDF. If you want to distribute your workflow across multiple GPUs, have more data than you can fit in memory on a single GPU, or want to analyze data spread across many files at once, you would want to use Dask cuDF.

python
import os

import cudf
import cupy as cp
import dask_cudf
import pandas as pd

cp.random.seed(12)

#### Portions of this were borrowed and adapted from the
#### cuDF cheatsheet, existing cuDF documentation,
#### and 10 Minutes to Pandas.

Object Creation

Creating a cudf.Series and dask_cudf.Series.

python
s = cudf.Series([1, 2, 3, None, 4])
s
python
ds = dask_cudf.from_cudf(s, npartitions=2)
# Note the call to head here to show the first few entries. Unlike
# cuDF objects, Dask-cuDF objects do not have a printed
# representation that shows values, since the values may not be in
# local memory.
ds.head(n=3)

Creating a cudf.DataFrame and a dask_cudf.DataFrame by specifying values for each column.

python
df = cudf.DataFrame(
    {
        "a": list(range(20)),
        "b": list(reversed(range(20))),
        "c": list(range(20)),
    }
)
df

Now we will convert our cuDF dataframe into a Dask-cuDF equivalent. Here we call out a key difference: to inspect the data we must call a method (here .head() to look at the first few values). In the general case (see the end of this notebook), the data in ddf will be distributed across multiple GPUs.

In this small case, we could call ddf.compute() to obtain a cuDF object from the Dask-cuDF object. In general, we should avoid calling .compute() on large dataframes, and restrict ourselves to using it when we have some (relatively) small postprocessed result that we wish to inspect. Hence, throughout this notebook we will generally call .head() to inspect the first few values of a Dask-cuDF dataframe, occasionally calling out places where we use .compute() and why.

To understand more of the differences between how cuDF and Dask cuDF behave here, visit the 10 Minutes to Dask tutorial after this one.

python
ddf = dask_cudf.from_cudf(df, npartitions=2)
ddf.head()

Creating a cudf.DataFrame from a pandas DataFrame and a dask_cudf.DataFrame from a cudf.DataFrame.

Note that best practice for using Dask cuDF is to read data directly into a dask_cudf.DataFrame with read_csv or other built-in I/O routines (discussed below).

python
pdf = pd.DataFrame({"a": [0, 1, 2, 3], "b": [0.1, 0.2, None, 0.3]})
gdf = cudf.DataFrame(pdf)
gdf
python
dask_gdf = dask_cudf.from_cudf(gdf, npartitions=2)
dask_gdf.head(n=2)

Viewing Data

Viewing the top rows of a GPU dataframe.

python
df.head(2)
python
ddf.head(2)

Sorting by values.

python
df.sort_values(by="b")
python
ddf.sort_values(by="b").head()

Selecting a Column

Selecting a single column, which initially yields a cudf.Series or dask_cudf.Series. Calling compute on the Dask-cuDF Series results in a cudf.Series (equivalent to df.a).

python
df["a"]
python
ddf["a"].head()
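
Because this column is small, it is reasonable to materialize it in full; a quick sketch:

python
ddf["a"].compute()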

Selecting Rows by Label

Selecting rows from index 2 to index 5 from columns 'a' and 'b'.

python
df.loc[2:5, ["a", "b"]]
python
ddf.loc[2:5, ["a", "b"]].head()

Selecting Rows by Position

Selecting via integers and integer slices, like numpy/pandas. Note that this functionality is not available for Dask-cuDF DataFrames.

python
df.iloc[0]
python
df.iloc[0:3, 0:2]

You can also select elements of a DataFrame or Series with direct index access.

python
df[3:5]
python
s[3:5]

Boolean Indexing

Selecting rows in a DataFrame or Series by direct Boolean indexing.

python
df[df.b > 15]
python
ddf[ddf.b > 15].head(n=3)

Selecting values from a DataFrame where a Boolean condition is met, via the query API.

python
df.query("b == 3")

Note here we call compute() rather than head() on the Dask-cuDF dataframe since we are happy that the number of matching rows will be small (and hence it is reasonable to bring the entire result back).

python
ddf.query("b == 3").compute()

You can also pass local variables to Dask-cuDF queries via the local_dict keyword. With standard cuDF, you may either use the local_dict keyword or reference the variable directly with the @ prefix. Supported comparison operators include >, <, >=, <=, ==, and !=.

python
cudf_comparator = 3
df.query("b == @cudf_comparator")
python
dask_cudf_comparator = 3
ddf.query("b == @val", local_dict={"val": dask_cudf_comparator}).compute()

Using the isin method for filtering.

python
df[df.a.isin([0, 5])]
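
The same filter works on the Dask-cuDF frame; as elsewhere, a sketch inspecting just the first few matching rows:

python
ddf[ddf.a.isin([0, 5])].head()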

MultiIndex

cuDF supports hierarchical indexing of DataFrames using MultiIndex. Grouping hierarchically (see Grouping below) automatically produces a DataFrame with a MultiIndex.

python
arrays = [["a", "a", "b", "b"], [1, 2, 3, 4]]
tuples = list(zip(*arrays, strict=True))
idx = cudf.MultiIndex.from_tuples(tuples)
idx

This index can back either axis of a DataFrame.

python
gdf1 = cudf.DataFrame(
    {"first": cp.random.rand(4), "second": cp.random.rand(4)}
)
gdf1.index = idx
gdf1
python
gdf2 = cudf.DataFrame(
    {"first": cp.random.rand(4), "second": cp.random.rand(4)}
).T
gdf2.columns = idx
gdf2

Accessing values of a DataFrame with a MultiIndex, both with .loc

python
gdf1.loc[("b", 3)]

And .iloc

python
gdf1.iloc[0:2]

Missing Data

Missing data can be replaced by using the fillna method.

python
s.fillna(999)
python
ds.fillna(999).head(n=3)
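
Alternatively, rows containing missing values can be dropped outright; a brief sketch using dropna, which both libraries provide:

python
s.dropna()
python
ds.dropna().head(n=3)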

Stats

Calculating descriptive statistics for a Series.

python
s.mean(), s.var()

This serves as a prototypical example of when we might want to call .compute(). The result of computing the mean and variance is a single number in each case, so it is definitely reasonable to look at the entire result!

python
ds.mean().compute(), ds.var().compute()
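
For a broader summary, describe() computes several statistics at once, as in pandas; a short sketch:

python
df.describe()
python
ddf.describe().compute()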

Applymap

Applying functions to a Series. Note that applying user defined functions directly with Dask cuDF is not yet implemented. For now, you can use map_partitions to apply a function to each partition of the distributed dataframe.

python
def add_ten(num):
    return num + 10


df["a"].apply(add_ten)
python
ddf["a"].map_partitions(add_ten).head()

Histogramming

Counting the number of occurrences of each unique value of a variable.

python
df.a.value_counts()
python
ddf.a.value_counts().head()

String Methods

Like pandas, cuDF provides string processing methods in the str attribute of Series. Full documentation of string methods is a work in progress. Please see the cuDF API documentation for more information.

python
s = cudf.Series(["A", "B", "C", "Aaba", "Baca", None, "CABA", "dog", "cat"])
s.str.lower()
python
ds = dask_cudf.from_cudf(s, npartitions=2)
ds.str.lower().head(n=4)

As well as simple manipulation, we can also match strings using regular expressions.

python
s.str.match("^[aAc].+")
python
ds.str.match("^[aAc].+").head()

Concat

Concatenating Series and DataFrames row-wise.

python
s = cudf.Series([1, 2, 3, None, 5])
cudf.concat([s, s])
python
ds2 = dask_cudf.from_cudf(s, npartitions=2)
dask_cudf.concat([ds2, ds2]).head(n=3)

Join

Performing SQL style merges. Note that the dataframe order is not maintained, but may be restored post-merge by sorting by the index.

python
df_a = cudf.DataFrame()
df_a["key"] = ["a", "b", "c", "d", "e"]
df_a["vals_a"] = [float(i + 10) for i in range(5)]

df_b = cudf.DataFrame()
df_b["key"] = ["a", "c", "e"]
df_b["vals_b"] = [float(i + 100) for i in range(3)]

merged = df_a.merge(df_b, on=["key"], how="left")
merged
python
ddf_a = dask_cudf.from_cudf(df_a, npartitions=2)
ddf_b = dask_cudf.from_cudf(df_b, npartitions=2)

merged = ddf_a.merge(ddf_b, on=["key"], how="left").head(n=4)
merged
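
As noted above, merge does not preserve row order. Following that note, a sketch of restoring an index-based ordering by sorting on the index:

python
merged.sort_index()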

Grouping

Like pandas, cuDF and Dask-cuDF support the Split-Apply-Combine groupby paradigm.

python
df["agg_col1"] = [1 if x % 2 == 0 else 0 for x in range(len(df))]
df["agg_col2"] = [1 if x % 3 == 0 else 0 for x in range(len(df))]

ddf = dask_cudf.from_cudf(df, npartitions=2)

Grouping and then applying the sum function to the grouped data.

python
df.groupby("agg_col1").sum()
python
ddf.groupby("agg_col1").sum().compute()

Grouping hierarchically then applying the sum function to grouped data.

python
df.groupby(["agg_col1", "agg_col2"]).sum()
python
ddf.groupby(["agg_col1", "agg_col2"]).sum().compute()

Grouping and applying statistical functions to specific columns, using agg.

python
df.groupby("agg_col1").agg({"a": "max", "b": "mean", "c": "sum"})
python
ddf.groupby("agg_col1").agg({"a": "max", "b": "mean", "c": "sum"}).compute()

Transpose

Transposing a dataframe, using either the transpose method or T property. Currently, all columns must have the same type. Transposing is not currently implemented in Dask cuDF.

python
sample = cudf.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
sample
python
sample.transpose()
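
The T property is shorthand for the same operation:

python
sample.T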

Time Series

DataFrames support datetime-typed columns, which allow users to interact with and filter data based on specific timestamps.

python
import datetime as dt

date_df = cudf.DataFrame()
date_df["date"] = pd.date_range("11/20/2018", periods=72, freq="D")
date_df["value"] = cp.random.sample(len(date_df))

search_date = dt.datetime.strptime("2018-11-23", "%Y-%m-%d")
date_df.query("date <= @search_date")
python
date_ddf = dask_cudf.from_cudf(date_df, npartitions=2)
date_ddf.query(
    "date <= @search_date", local_dict={"search_date": search_date}
).compute()
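
Datetime columns also expose the familiar dt accessor for extracting components; a sketch pulling the day out of each timestamp:

python
date_df["date"].dt.day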

Categoricals

DataFrames support categorical columns.

python
gdf = cudf.DataFrame(
    {"id": [1, 2, 3, 4, 5, 6], "grade": ["a", "b", "b", "a", "a", "e"]}
)
gdf["grade"] = gdf["grade"].astype("category")
gdf
python
dgdf = dask_cudf.from_cudf(gdf, npartitions=2)
dgdf.head(n=3)

Accessing the categories of a column. Note that this is currently not supported in Dask-cuDF.

python
gdf.grade.cat.categories

Accessing the underlying code values of each categorical observation.

python
gdf.grade.cat.codes
python
dgdf.grade.cat.codes.compute()

Converting to Pandas

Converting a cuDF and Dask-cuDF DataFrame to a pandas DataFrame.

python
df.head().to_pandas()

To convert the first few entries to pandas, we similarly call .head() on the Dask-cuDF dataframe to obtain a local cuDF dataframe, which we can then convert.

python
ddf.head().to_pandas()

In contrast, if we want to convert the entire frame, we need to call .compute() on ddf to get a local cuDF dataframe and then call to_pandas(). This workflow is less recommended, since it both puts high memory pressure on a single GPU (the .compute() call) and does not take advantage of GPU acceleration for any subsequent processing (the computation happens in pandas).

python
ddf.compute().to_pandas().head()

Converting to Numpy

Converting a cuDF or Dask-cuDF DataFrame to a numpy ndarray.

python
df.to_numpy()
python
ddf.compute().to_numpy()

Converting a cuDF or Dask-cuDF Series to a numpy ndarray.

python
df["a"].to_numpy()
python
ddf["a"].compute().to_numpy()

Converting to Arrow

Converting a cuDF or Dask-cuDF DataFrame to a PyArrow Table.

python
df.to_arrow()
python
ddf.head().to_arrow()

Reading/Writing CSV Files

Writing to a CSV file.

python
if not os.path.exists("example_output"):
    os.mkdir("example_output")

df.to_csv("example_output/foo.csv", index=False)
python
ddf.compute().to_csv("example_output/foo_dask.csv", index=False)

Reading from a CSV file.

python
df = cudf.read_csv("example_output/foo.csv")
df

Note that for the Dask-cuDF case, we use dask_cudf.read_csv in preference to dask_cudf.from_cudf(cudf.read_csv(...)), since the former can parallelize across multiple GPUs and handle CSV files larger than would fit in memory on a single GPU.

python
ddf = dask_cudf.read_csv("example_output/foo_dask.csv")
ddf.head()

Reading all CSV files in a directory into a single dask_cudf.DataFrame, using the star wildcard.

python
ddf = dask_cudf.read_csv("example_output/*.csv")
ddf.head()

Reading/Writing Parquet Files

Writing to parquet files with cuDF's GPU-accelerated parquet writer.

python
df.to_parquet("example_output/temp_parquet")

Reading parquet files with cuDF's GPU-accelerated parquet reader.

python
df = cudf.read_parquet("example_output/temp_parquet")
df

Writing to parquet files from a dask_cudf.DataFrame using cuDF's parquet writer under the hood.

python
ddf.to_parquet("example_output/ddf_parquet_files")
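
The directory written above can be read back in parallel; a sketch using dask_cudf.read_parquet, the parallel counterpart of cudf.read_parquet:

python
ddf = dask_cudf.read_parquet("example_output/ddf_parquet_files")
ddf.head()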

Reading/Writing ORC Files

Writing ORC files.

python
df.to_orc("example_output/temp_orc")

And reading

python
df2 = cudf.read_orc("example_output/temp_orc")
df2

Dask Performance Tips

Like Apache Spark, Dask is lazy: instead of being executed immediately, most operations are added to a task graph, and the actual evaluation is delayed until the result is needed.

Sometimes, though, we want to force the execution of operations. Calling persist on a Dask collection fully computes it (or actively computes it in the background), persisting the result into memory. When we're using distributed systems, we may want to wait until persist is finished before beginning any downstream operations. We can enforce this contract by using wait. Wrapping an operation with wait will ensure it doesn't begin executing until all necessary upstream operations have finished.

The snippets below provide basic examples, using LocalCUDACluster to create one dask-worker per GPU on the local machine. For more detailed information about persist and wait, please see the Dask documentation for persist and wait. Wait relies on the concept of Futures, which is beyond the scope of this tutorial. For more information on Futures, see the Dask Futures documentation. For more information about multi-GPU clusters, please see the dask-cuda library (documentation is in progress).

First, we set up a GPU cluster. With our client set up, Dask-cuDF computation will be distributed across the GPUs in the cluster.

python
import time

from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster

cluster = LocalCUDACluster()
client = Client(cluster)

Persisting Data

Next, we create our Dask-cuDF DataFrame and apply a transformation, storing the result as a new column.

python
nrows = 10000000

df2 = cudf.DataFrame({"a": cp.arange(nrows), "b": cp.arange(nrows)})
ddf2 = dask_cudf.from_cudf(df2, npartitions=16)
ddf2["c"] = ddf2["a"] + 5
ddf2
python
!nvidia-smi

Because Dask is lazy, the computation has not yet occurred. We can see that there are sixty-four tasks in the task graph and we're using about 330 MB of device memory on each GPU. We can force computation by using persist. By forcing execution, the result is now explicitly in memory and our task graph only contains one task per partition (the baseline).

python
ddf2 = ddf2.persist()
ddf2
python
# Sleep to ensure the persist finishes and shows in the memory usage
!sleep 5; nvidia-smi

Because we forced computation, we now have a larger object in distributed GPU memory. Note that actual numbers will differ between systems (for example depending on how many devices are available).

Wait

Depending on our workflow or distributed computing setup, we may want to wait until all upstream tasks have finished before proceeding with a specific function. This section shows an example of this behavior, adapted from the Dask documentation.

First, we create a new Dask DataFrame and define a function that we'll map to every partition in the dataframe.

python
import random

nrows = 10000000

df1 = cudf.DataFrame({"a": cp.arange(nrows), "b": cp.arange(nrows)})
ddf1 = dask_cudf.from_cudf(df1, npartitions=100)


def func(df):
    time.sleep(random.randint(1, 10))
    return (df + 5) * 3 - 11

This function will do a basic transformation of every column in the dataframe, but the time spent in the function will vary due to the time.sleep call randomly adding 1-10 seconds. We'll run this on every partition of our dataframe using map_partitions, which adds the task to our task graph, and store the result. We can then call persist to force execution.

python
results_ddf = ddf1.map_partitions(func)
results_ddf = results_ddf.persist()

However, some partitions will be done much sooner than others. If we had downstream processes that should wait for all partitions to be completed, we can enforce that behavior using wait.

python
wait(results_ddf)

With wait completed, we can safely proceed in our workflow.
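
For example, a downstream aggregation will now run against fully materialized partitions; a sketch:

python
results_ddf["a"].mean().compute()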