Getting started

You will need:

  1. A Rust development environment. If you use the rapids combined devcontainer, add "./features/src/rust": {"version": "latest", "profile": "default"}, to your preferred configuration. Otherwise, use rustup.
  2. A cudf development environment. The combined devcontainer works, or whatever your favourite approach is.

:::{note} These instructions will get simpler as we merge code in. :::

Installing polars

The cudf-polars pyproject.toml advertises which polars versions it works with. So for pure cudf-polars development, installing as normal and satisfying the dependencies in the repository is sufficient. If the work also requires changes on the polars side, we need to build polars from source:

sh
git clone https://github.com/pola-rs/polars
cd polars

We will install build dependencies in the same environment that we created for building cudf. Note that polars offers a make build command that sets up a separate virtual environment, but we don't want to do that right now. So in the polars clone:

sh
# cudf environment (conda or pip) is active
pip install --upgrade uv
uv pip install --upgrade -r py-polars/requirements-dev.txt

:::{note} plain pip install works fine, but uv is much faster! :::

Now we have the necessary machinery to build polars:

sh
cd py-polars
# build in debug mode, best option for development/debugging
maturin develop -m Cargo.toml

For benchmarking purposes we should build in release mode:

sh
RUSTFLAGS='-C target-cpu=native' maturin develop -m Cargo.toml --release

After any update of the polars code, we need to rerun the maturin build command.

Installing the cudf polars executor

The executor for the polars logical plan lives in the cudf repo, in python/cudf_polars. Build cudf as normal and then install the cudf_polars package in editable mode:

sh
cd cudf/python/cudf_polars
pip install --no-build-isolation --no-deps -e .

You should now be able to run the tests in the cudf_polars package:

sh
pytest -v tests

Executor design

The polars LazyFrame.collect functionality offers configuration of the engine to use for collection through the engine argument. At a low level, this provides for configuration of a "post-optimization" callback that may be used by a third party library to replace a node (or more, though we only replace a single node) in the optimized logical plan with a Python callback that is to deliver the result of evaluating the plan. This splits the execution of the plan into two phases. First, a symbolic phase which translates to our internal representation (IR). Second, an execution phase which executes using our IR.

The translation phase receives a low-level Rust NodeTraverser object that delivers Python representations of the plan nodes (and expressions) one at a time. During translation, we endeavour to raise NotImplementedError for any unsupported functionality. This way, if we can't execute something, we just don't modify the logical plan at all: if we can translate the IR, it is assumed that evaluation will later succeed.

The usage of the cudf-based executor is therefore selected with the gpu engine:

python
import polars as pl

result = q.collect(engine="gpu")

This should either transparently run on the GPU and deliver a polars dataframe, or else fail (but be handled) and just run the normal CPU execution. If POLARS_VERBOSE is true, then fallback is logged with a PerformanceWarning.

As well as a string argument, the engine can also be specified with a polars GPUEngine object. This allows passing more configuration in. Currently, the public properties are device, to select the device, and memory_resource, to select the RMM memory resource used for allocations during the collection phase.

For example:

python
import polars as pl

result = q.collect(engine=pl.GPUEngine(device=1, memory_resource=mr))

This uses device 1 and the given memory resource. Note that the memory resource provided must be valid for allocations on the specified device: no checking is performed.

For debugging purposes, we can also pass undocumented keyword arguments. At the moment, raise_on_fail is supported: it makes translation raise rather than fall back:

python
result = q.collect(engine=pl.GPUEngine(raise_on_fail=True))

This is mostly useful when writing tests, since in that case we want any failures to propagate, rather than falling back to the CPU mode.
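The interplay between translation, fallback, and raise_on_fail can be pictured with a small, self-contained sketch. Nothing in it is cudf-polars API: `translate`, `collect`, the `SUPPORTED` set, and the representation of a plan as a list of strings are hypothetical stand-ins chosen only to show the control flow.

```python
import warnings

# Hypothetical set of plan nodes the toy "GPU" translator understands.
SUPPORTED = {"scan", "select", "filter"}


def translate(plan):
    """Symbolic phase: refuse the whole plan if any node is unsupported."""
    for node in plan:
        if node not in SUPPORTED:
            raise NotImplementedError(f"unsupported node: {node}")
    return [f"gpu_{node}" for node in plan]


def collect(plan, *, raise_on_fail=False):
    """Return (engine, plan) showing which path would execute the plan."""
    try:
        ir = translate(plan)
    except NotImplementedError as exc:
        if raise_on_fail:
            # Test/debug mode: let the failure propagate.
            raise
        # Normal mode: leave the plan untouched and run on the CPU.
        warnings.warn(f"falling back to CPU: {exc}", stacklevel=2)
        return ("cpu", plan)
    # Translation succeeded, so execution is assumed to succeed too.
    return ("gpu", ir)
```

Because the decision is made entirely during translation, a plan is never half-executed on the GPU before falling back.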

IR versioning

On the polars side, the NodeTraverser object advertises an internal version (via NodeTraverser.version() as a (major, minor) tuple). Minor version bumps are for backwards compatible changes (e.g. exposing new nodes), whereas major bumps are for incompatible changes. We can therefore attempt to detect the IR version (independently of the polars version) and dispatch, or error appropriately. This should be done during IR translation in translate.py.
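A version check along these lines might look like the following sketch. The constants and the function name `check_ir_version` are hypothetical and the version numbers are made up; only the (major, minor) tuple shape comes from the description above.

```python
# Hypothetical values: the major IR version this code targets and the
# oldest minor version it can handle.
SUPPORTED_MAJOR = 1
MIN_MINOR = 0


def check_ir_version(version):
    """Raise if the advertised (major, minor) IR version is unsupported."""
    major, minor = version
    if major != SUPPORTED_MAJOR:
        # Major bumps are incompatible, so refuse outright.
        raise NotImplementedError(f"unsupported IR major version: {version}")
    if minor < MIN_MINOR:
        # Older minors may lack nodes we rely on.
        raise NotImplementedError(f"IR minor version too old: {version}")
    # Newer minors are backwards compatible, so they are accepted.
```

Raising NotImplementedError here fits the translation contract: an unsupported version then leads to CPU fallback rather than a hard failure.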

IR design

As noted, we translate the polars DSL into our own IR. This is both so that we can smooth out minor version differences (advertised by NodeTraverser version changes) within cudf-polars, and so that we have the freedom to introduce new IR nodes and rewrite rules as might be appropriate for GPU execution.

To that end, we provide facilities for definition of nodes as well as writing traversals and rewrite rules. The abstract base class Node in dsl/nodebase.py defines the interface for implementing new nodes, and provides many useful default methods. See also the docstrings of the Node class.

:::{note} This generic implementation relies on nodes being treated as immutable. Do not implement in-place modification of nodes: cached hashes and rebuilt nodes would silently become inconsistent. :::

Defining nodes

A concrete node type (cudf-polars has expression nodes, Expr, and plan nodes, IR) should inherit from Node. Nodes have two types of data:

  1. children: a tuple (possibly empty) of concrete nodes;
  2. non-child: arbitrary data attached to the node that is not a concrete node.

The base Node class requires that one advertise the names of the non-child attributes in the _non_child class variable. The constructor of the concrete node should take its arguments in the order *_non_child (ordered as the class variable does) and then *children. For example, the Sort node, which sorts a column generated by an expression, has this definition:

python
class Expr(Node):
    children: tuple[Expr, ...]

class Sort(Expr):
    _non_child = ("dtype", "options")
    children: tuple[Expr]
    def __init__(self, dtype, options, column: Expr):
        self.dtype = dtype
        self.options = options
        self.children = (column,)

By following this pattern, we get an automatic (caching) implementation of __hash__ and __eq__, as well as a useful reconstruct method that will rebuild the node with new children.

If you want to control the behaviour of __hash__ and __eq__ for a single node, override (respectively) the get_hashable and is_equal methods.
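To make the pattern concrete, here is a toy reimplementation of the idea. It is not the code in dsl/nodebase.py: the helper `_ctor_arguments`, the cached attribute `_hash_value`, and the string dtypes are assumptions made for illustration, while `_non_child`, `children`, `reconstruct`, `get_hashable`, and `is_equal` follow the names used above.

```python
class Node:
    """Toy base class: hashing, equality, and rebuilding from declared fields."""

    _non_child = ()
    children = ()

    def _ctor_arguments(self, children):
        # Constructor order: non-child attributes first, then children.
        return (*(getattr(self, name) for name in self._non_child), *children)

    def reconstruct(self, children):
        """Rebuild this node with new children, keeping non-child data."""
        return type(self)(*self._ctor_arguments(children))

    def get_hashable(self):
        return (type(self), self._ctor_arguments(self.children))

    def __hash__(self):
        # Cache the hash; this is only safe because nodes are immutable.
        try:
            return self._hash_value
        except AttributeError:
            self._hash_value = hash(self.get_hashable())
            return self._hash_value

    def is_equal(self, other):
        mine = self._ctor_arguments(self.children)
        return mine == other._ctor_arguments(other.children)

    def __eq__(self, other):
        return type(self) is type(other) and self.is_equal(other)


class Col(Node):
    _non_child = ("dtype", "name")

    def __init__(self, dtype, name):
        self.dtype = dtype
        self.name = name
        self.children = ()


class Sort(Node):
    _non_child = ("dtype", "options")

    def __init__(self, dtype, options, column):
        self.dtype = dtype
        self.options = options
        self.children = (column,)
```

Two structurally identical `Sort` nodes built separately compare equal and hash the same, and `reconstruct` swaps the child while carrying `dtype` and `options` over unchanged.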

Adding new translation rules from the polars IR

Plan nodes

Plan node definitions live in cudf_polars/dsl/ir.py; these all inherit from the base IR node. A plan node is evaluated by implementing the do_evaluate method. This method takes in the non-child arguments specified in _non_child_args, followed by pre-evaluated child nodes (DataFrame objects), and finally a keyword-only context argument (an IRExecutionContext object containing runtime execution context). To perform the evaluation, one should use the base class (generic) evaluate method which handles the recursive evaluation of child nodes.

Plan nodes must also declare an _n_non_child_args attribute giving the length of the _non_child_args tuple. This is used by tracing to know how many non-child (dataframe) inputs to expect without introspection.

To translate the plan node, add a case handler in translate_ir that lives in cudf_polars/dsl/translate.py.

As well as child nodes that are plans, most plan nodes contain child expressions, which should be transformed using the input to the plan as a context. The translation of expressions is handled via translate_expr in cudf_polars/dsl/translate.py. So that data-type resolution is performed correctly, any expression should be translated with the correct plan node "active" in the visitor. For example, when translating a Join node, the left keys (expressions) should be translated with the left input active (and right keys with right input). To facilitate this, use the set_node context manager.
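The idea behind switching the active node can be sketched with a toy visitor. `ToyVisitor`, `dtype_of`, and `translate_join_keys` are hypothetical, and this `set_node` is a simplified stand-in that only shares its name with the real context manager; the actual signature may differ.

```python
from contextlib import contextmanager


class ToyVisitor:
    """Hypothetical traverser stand-in that tracks one active plan node."""

    def __init__(self, schemas):
        self.schemas = schemas  # node id -> {column name: dtype}
        self.active = None

    def dtype_of(self, column):
        # Data types are resolved against whichever node is active.
        return self.schemas[self.active][column]


@contextmanager
def set_node(visitor, node):
    """Make `node` active for the duration of the block, then restore."""
    previous = visitor.active
    visitor.active = node
    try:
        yield visitor
    finally:
        visitor.active = previous


def translate_join_keys(visitor, left, right, left_key, right_key):
    # Left keys are resolved against the left input, right keys against
    # the right input.
    with set_node(visitor, left):
        left_dtype = visitor.dtype_of(left_key)
    with set_node(visitor, right):
        right_dtype = visitor.dtype_of(right_key)
    return left_dtype, right_dtype
```

Restoring the previous node in `finally` keeps the visitor consistent even if translation of a key raises NotImplementedError.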

Expression nodes

Adding a handler for an expression node is very similar to adding one for a plan node. Expressions are defined in cudf_polars/dsl/expressions/ and exported into the dsl namespace via expr.py. They inherit from Expr.

Expressions are evaluated by implementing a do_evaluate method that takes a DataFrame as context (this provides columns) along with an ExecutionContext parameter (indicating what context we're evaluating this expression in, currently unused) and a mapping from expressions to evaluated Columns. This approach enables a simple form of expression rewriting during evaluation of expressions that is used in evaluation of, for example, groupby-aggregations. To perform the evaluation, one should use the base class (generic) evaluate method which handles the boilerplate for looking up in the substitution mapping.
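The split between evaluate and do_evaluate can be illustrated with a toy model. The classes below are not the cudf-polars ones: a plain dict of lists stands in for a DataFrame, the ExecutionContext parameter is omitted, and expressions hash by identity rather than structure.

```python
class Expr:
    """Toy expression node; children are other Expr objects."""

    def __init__(self, *children):
        self.children = children

    def evaluate(self, df, *, mapping=None):
        # Generic entry point: consult the substitution mapping first.
        if mapping is not None and self in mapping:
            return mapping[self]
        return self.do_evaluate(df, mapping=mapping)

    def do_evaluate(self, df, *, mapping=None):
        raise NotImplementedError


class Col(Expr):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def do_evaluate(self, df, *, mapping=None):
        return df[self.name]


class Add(Expr):
    def do_evaluate(self, df, *, mapping=None):
        # Recurse through evaluate (not do_evaluate) so that any
        # substitution also applies to sub-expressions.
        left, right = (c.evaluate(df, mapping=mapping) for c in self.children)
        return [x + y for x, y in zip(left, right)]
```

With `a = Col("a")` and `expr = Add(a, Col("b"))`, passing `mapping={a: [0, 0]}` to `expr.evaluate` replaces the column lookup for `a` with the precomputed values. A groupby-aggregation can use the same mechanism to substitute already-aggregated columns.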

To simplify state tracking, all columns should be considered immutable on construction. This matches the "functional" description coming from the logical plan in any case, so is reasonably natural.

Traversing and transforming nodes

In addition to representing and evaluating nodes, we also provide facilities for traversing a tree of nodes and defining transformation rules in dsl/traversal.py. The simplest is traversal, a pre-order visit of all unique nodes in an expression. Use this if you want to know some specific thing about an expression. For example, to determine if an expression contains a Literal node:

python
def has_literal(node: Expr) -> bool:
    return any(isinstance(e, Literal) for e in traversal(node))

It is often convenient to provide (immutable) state to a visitor, as well as some facility to perform DAG-aware rewrites (reusing a transformation for an expression if we have already seen it). We therefore adopt the following pattern of writing DAG-aware visitors. Suppose we want a rewrite rule (rewrite) between expressions (Expr) and some new type T. We define our general transformation function rewrite with type Expr -> (Expr -> T) -> T:

python
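from functools import singledispatch
from typing import TypedDict, TypeVar

from cudf_polars.dsl.expr import Expr
from cudf_polars.typing import GenericTransformer

# Result type of the rewrite
T = TypeVar("T")


class State(TypedDict):
    # Fields of the (immutable) state carried by the visitor
    ...


@singledispatch
def rewrite(e: Expr, rec: GenericTransformer[Expr, T, State]) -> T:
    # Generic case; handlers for specific Expr types are registered separately
    ...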

Note in particular that the function to perform the recursion is passed as the second argument. Rather than defining methods on each node in turn for a particular rewrite rule, we prefer free functions and use functools.singledispatch to provide dispatching. We now, in the usual fashion, register handlers for different expression types. To use this function, we need to be able to provide both the expression to convert and the recursive function itself. To do this we must convert our rewrite function into something that only takes a single argument (the expression to rewrite), but carries around information about how to perform the recursion. To this end, we have two utilities in traversal.py:

  • make_recursive and
  • CachingVisitor.

These both implement the GenericTransformer protocol, and can be wrapped around a transformation function like rewrite to provide a function Expr -> T. They also allow us to attach arbitrary immutable state to our visitor by passing a state dictionary. The state dictionary should be given as some TypedDict so that the transformation function knows which fields are available. make_recursive is very simple, and provides no caching of intermediate results (so any DAGs that are visited will be viewed as trees). CachingVisitor provides the same interface, but maintains a cache of intermediate results, and reuses them if the same expression is seen again.
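The difference between the two wrappers shows up as soon as a node is shared. The following toy versions are written from the description above and are not the implementations in traversal.py; `Node`, `height`, `count_calls`, and the `calls` list are hypothetical helpers.

```python
class Node:
    """Toy node that hashes by identity."""

    def __init__(self, name, *children):
        self.name = name
        self.children = children


def make_recursive(fn, *, state=None):
    """No caching: a shared sub-expression is visited once per use."""
    def rec(node):
        return fn(node, rec)
    rec.state = state if state is not None else {}
    return rec


class CachingVisitor:
    """Same interface, but each distinct node is transformed only once."""

    def __init__(self, fn, *, state=None):
        self.fn = fn
        self.cache = {}
        self.state = state if state is not None else {}

    def __call__(self, node):
        if node not in self.cache:
            self.cache[node] = self.fn(node, self)
        return self.cache[node]


calls = []  # records every invocation of the transformation


def height(node, rec):
    calls.append(node.name)
    return 1 + max((rec(child) for child in node.children), default=0)


# A small DAG: `shared` is referenced twice by `root`.
leaf = Node("leaf")
shared = Node("shared", leaf)
root = Node("root", shared, shared)


def count_calls(make_visitor):
    """Return (result, number of transformation calls) for one wrapper."""
    calls.clear()
    result = make_visitor(height)(root)
    return result, len(calls)
```

Here `count_calls(make_recursive)` gives `(3, 5)` and `count_calls(CachingVisitor)` gives `(3, 3)`: the result is identical, but the caching visitor skips the second walk of `shared`.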

Finally, for writing transformations that take nodes and deliver new nodes (e.g. rewrite rules), we have a final utility reuse_if_unchanged that can be used as a base case transformation for node to node rewrites. It is a depth-first visit that transforms children but only returns a new node with new children if the rewrite of children returned new nodes.
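A minimal sketch of this base case, again with toy types rather than the real ones, is shown here. The `Node` class, its `reconstruct` method, and the `rename_x` rule are assumptions for illustration; only the behaviour of `reuse_if_unchanged` follows the description above.

```python
class Node:
    """Toy immutable node with a name and children."""

    def __init__(self, name, *children):
        self.name = name
        self.children = children

    def reconstruct(self, children):
        return type(self)(self.name, *children)


def reuse_if_unchanged(node, rec):
    """Transform children; rebuild the node only if a child changed."""
    new_children = [rec(child) for child in node.children]
    if all(new is old for new, old in zip(new_children, node.children)):
        return node
    return node.reconstruct(new_children)


def rename_x(node, rec):
    """Hypothetical rule: replace leaves named "x" with leaves named "z"."""
    if not node.children and node.name == "x":
        return Node("z")
    return reuse_if_unchanged(node, rec)


def apply(node):
    # Simplest possible recursion driver (no caching).
    return rename_x(node, apply)
```

Subtrees that contain no `x` come back as the very same objects, so downstream caches keyed on those nodes remain valid.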

To see how these pieces fit together, let us consider writing a rename function that takes an expression (potentially with references to columns) along with a mapping defining a renaming between (some subset of) column names. The goal is to deliver a new expression with appropriate columns renamed.

To start, we define the dispatch function:

python
from collections.abc import Mapping
from functools import singledispatch
from cudf_polars.dsl.traversal import (
    CachingVisitor, make_recursive, reuse_if_unchanged
)
from cudf_polars.dsl.expr import Col, Expr
from cudf_polars.dsl.to_ast import ExprTransformer


@singledispatch
def _rename(e: Expr, rec: ExprTransformer) -> Expr:
    raise NotImplementedError(f"No handler for {type(e)}")

then we register specific handlers, first for columns:

python
@_rename.register
def _(e: Col, rec: ExprTransformer) -> Expr:
    mapping = rec.state["mapping"] # state set on rec
    if e.name in mapping:
        # If we have a rename, return a new Col reference
        # with a new name
        return type(e)(e.dtype, mapping[e.name])
    return e

and then for the remaining expressions:

python
_rename.register(Expr)(reuse_if_unchanged)

:::{note} In this case, we could have put the generic handler in the _rename function, however, then we would not get a nice error message if we accidentally sent in an object of the incorrect type. :::

Finally we tie everything together with a public function:

python
from typing import TypedDict

class State(TypedDict):
    mapping: Mapping[str, str]


def rename(e: Expr, mapping: Mapping[str, str]) -> Expr:
    """Rename column references in an expression."""
    mapper = CachingVisitor(_rename, state=State(mapping=mapping))
    # or
    # mapper = make_recursive(_rename, state=State(mapping=mapping))
    return mapper(e)

IO partition statistics

:::{note} IO partition statistics are experimental and the details are likely to change in the future. :::

The streaming executor samples Parquet footer metadata to estimate per-column storage sizes, then uses those estimates together with target_partition_size (bytes) on the streaming executor to decide how to split or fuse file-level partitions so that each input partition is close to that target size. Tune target_partition_size via executor_options or CUDF_POLARS__EXECUTOR__TARGET_PARTITION_SIZE.

python
engine = pl.GPUEngine(
    executor="streaming",
    executor_options={"target_partition_size": 2_000_000_000},
)

Internally, collect_statistics walks the IR graph, groups Parquet Scan nodes that share the same file paths (unioning projected columns for sampling), and builds one DataSourceInfo per path group. It then attaches that source to every Scan in the group. Leaf DataFrameScan nodes are handled separately. Results live in StatsCollector.scan_stats. Parquet metadata sampling is shared across scans that read the same files.
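The arithmetic behind splitting or fusing can be sketched as follows. This is an assumption about the general shape of the decision, not the actual heuristic in the streaming executor; `estimate_file_size` and `plan_io` are hypothetical names.

```python
import math


def estimate_file_size(column_sizes, projected):
    """Sum the estimated storage size (bytes) of the projected columns."""
    return sum(column_sizes[name] for name in projected)


def plan_io(estimated_file_size, target_partition_size):
    """Decide whether to split each file or fuse several files together.

    Returns ("split", pieces per file) or ("fuse", files per partition).
    """
    if estimated_file_size > target_partition_size:
        pieces = math.ceil(estimated_file_size / target_partition_size)
        return ("split", pieces)
    # Guard against a zero-byte estimate before dividing.
    per_partition = target_partition_size // max(estimated_file_size, 1)
    return ("fuse", max(per_partition, 1))
```

With a 2 GB target, a file estimated at 5 GB would be read in three pieces, while files estimated at 0.5 GB each would be fused four to a partition. Only projected columns count towards the estimate, so narrowing a projection changes the partitioning.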

Containers

Containers should be constructed as relatively lightweight objects around their pylibcudf counterparts. We have three (in cudf_polars/containers/):

  1. Scalar (a wrapper around a pylibcudf Scalar)
  2. Column (a wrapper around a pylibcudf Column)
  3. DataFrame (a wrapper around a pylibcudf Table)

The interfaces offered by these are somewhat in flux, but broadly speaking, a DataFrame is just a mapping from string names to Columns, and thus also holds a pylibcudf Table. Names are only attached to Columns and hence inserted into DataFrames via NamedExprs, which are the top-level expression nodes that live inside an IR node. This means that the expression evaluator never has to concern itself with column names: columns are only ever decorated with names when constructing a DataFrame.

The columns keep track of metadata (for example, whether or not they are sorted). We could imagine tracking more metadata, like minimum and maximum, though perhaps that is better left to libcudf itself.

We offer some utility methods for transferring metadata when constructing new dataframes and columns: both DataFrame and Column offer a sorted_like(like: Self) call which copies metadata from the template.

All methods on containers that modify in place should return self, to facilitate use in a "fluent" style. It makes it much easier to write iteration over objects and collect the results if everyone always returns a value.
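The benefit of returning self is easiest to see in a comprehension. The `ToyColumn` class below is a hypothetical stand-in with a single piece of metadata; only the method name `sorted_like` is taken from the text above.

```python
class ToyColumn:
    """Hypothetical container: some values plus sortedness metadata."""

    def __init__(self, values, *, is_sorted=False):
        self.values = values
        self.is_sorted = is_sorted

    def sorted_like(self, like):
        # Copy metadata from the template, then return self so that the
        # call can be chained or used inside an expression.
        self.is_sorted = like.is_sorted
        return self


template = ToyColumn([1, 2, 3], is_sorted=True)
# No temporary variable or separate loop body is needed.
columns = [ToyColumn(v).sorted_like(template) for v in ([4, 5], [6, 7])]
```

If `sorted_like` returned None, the comprehension would need to be unrolled into an explicit loop with an intermediate variable.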

CUDA Streams

CUDA Streams are used to manage concurrent operations. These build on libcudf's and pylibcudf's usage of streams when performing operations on pylibcudf Columns and Tables.

In cudf-polars, we attach a Stream to cudf_polars.containers.DataFrame. This stream (or a new stream that it's joined into) is used for all pylibcudf operations on the data backing that DataFrame.

When creating a cudf_polars.containers.DataFrame you must ensure that all the provided pylibcudf Tables / Columns are valid on the provided stream.

Take special care when creating a DataFrame that combines pylibcudf Tables or Columns from multiple DataFrames, or "bare" pylibcudf objects that don't come from a DataFrame at all. This also applies to DataFrame methods like DataFrame.with_columns and DataFrame.filter which accept cudf_polars.containers.Column objects that might not be valid on the DataFrame's original stream.

Here's an example of the simpler case where a pylibcudf.Table is created on some CUDA stream and that same stream is used for the DataFrame:

python
import polars as pl
import pyarrow as pa
import pylibcudf as plc
from rmm.pylibrmm.stream import Stream

from cudf_polars.containers import DataFrame, DataType

stream = Stream()
t = plc.Table.from_arrow(
    pa.Table.from_pylist([{"a": 1, "b": 0}, {"a": 1, "b": 1}, {"a": 2, "b": 0}]),
    stream=stream
)
# t is valid on `stream`. So we must provide `stream` or some CUDA Stream that's
# downstream of it
df = DataFrame.from_table(
    t,
    names=['a', 'b'],
    dtypes=[DataType(pl.Int64()), DataType(pl.Int64())],
    stream=stream
)

Managing multiple containers, which are potentially valid on different streams, is more challenging. We have some utilities that can help correctly handle data from multiple independent sources. For example, to add a new Column to df that's valid on some independent CUDA stream, we'd use cudf_polars.utils.cuda_stream.get_joined_cuda_stream to get a new CUDA stream that's downstream of both the original stream and stream_b.

python
from cudf_polars.containers import Column, DataType
from cudf_polars.utils.cuda_stream import get_joined_cuda_stream

stream_b = Stream()
# The new column is valid on `stream_b`, which is independent of `stream`
col = Column(
    plc.Column.from_arrow(pa.array([1, 2, 3]), stream=stream_b),
    dtype=DataType(pl.Int64()),
    name="c",
)

new_stream = get_joined_cuda_stream(upstreams=(stream, stream_b))
df2 = df.with_columns([col], stream=new_stream)

The same principle applies to using the cudf_polars.containers.DataFrame constructor with multiple cudf_polars.containers.Column objects that are valid on multiple streams. It's the caller's responsibility to provide a stream that all the Columns are valid on, likely by joining together the streams that each individual Column is valid on.

Writing tests

We use pytest, and tests live in the tests/ subdirectory. Organisationally, the top-level test files each handle one of the IR nodes. The goal is that they are parametrized over all the options each node will handle, to have reasonable coverage. Tests of expression functionality should live in tests/expressions/.

To write a test and assert correctness, build a LazyFrame as a query, and then use the utility assertion function from cudf_polars.testing.asserts. This runs the query using both the cudf executor and polars CPU, and checks that they match. So:

python
import polars as pl

from cudf_polars.testing.asserts import assert_gpu_result_equal


def test_whatever():
    # Schematic: fill in the input data and the chain of lazy operations
    query = pl.LazyFrame(...).(...)

    assert_gpu_result_equal(query)

Test coverage and asserting failure modes

Where translation of a query should fail due to the feature being unsupported we should test this. To assert that translation raises an exception (usually NotImplementedError), use the utility function assert_ir_translation_raises:

python
from cudf_polars.testing.asserts import assert_ir_translation_raises


def test_whatever():
    unsupported_query = ...
    assert_ir_translation_raises(unsupported_query, NotImplementedError)

This test will fail if translation does not raise.

Debugging

If the callback execution fails during the polars collect call, we obtain an error, but are not able to drop into the debugger and inspect the stack properly: we can't cross the language barrier.

However, we can drive the translation and execution of the DSL by hand. Given some LazyFrame representing a query, we can first translate it to our intermediate representation (IR), and then execute and convert back to polars:

python
from cudf_polars.dsl.translate import Translator
from cudf_polars.dsl.ir import IRExecutionContext
from rmm.pylibrmm.stream import DEFAULT_STREAM
import polars as pl

q = ...

# Convert to our IR
translator = Translator(q._ldf.visit(), pl.GPUEngine())
ir = translator.translate_ir()

# DataFrame living on the device
result = ir.evaluate(cache={}, timer=None, context=IRExecutionContext())

# Polars dataframe
host_result = result.to_polars()

If we get any exceptions, we can then debug as normal in Python.

Configuration

Polars users can configure various options about how the plan is executed through the pl.GPUEngine(). This includes some configuration options defined in polars itself: https://docs.pola.rs/api/python/dev/reference/lazyframe/api/polars.lazyframe.engine_config.GPUEngine.html

All additional keyword arguments are made available to cudf-polars through engine.config.

To centralize validation and keep things well-typed internally, we model our additional configuration as a set of dataclasses defined in cudf_polars/utils/config.py. To transition from user-provided options to our (validated) internal options, use ConfigOptions.from_polars_engine.
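The general approach of validating user options into a frozen dataclass can be sketched as below. `ToyOptions`, its fields' defaults, and `from_user_config` are hypothetical and much simpler than the real classes in cudf_polars/utils/config.py; the real entry point is ConfigOptions.from_polars_engine.

```python
from dataclasses import dataclass, fields


@dataclass(frozen=True)
class ToyOptions:
    """Hypothetical validated options; defaults are purely illustrative."""

    raise_on_fail: bool = False
    target_partition_size: int = 1_000_000

    def __post_init__(self):
        if not isinstance(self.raise_on_fail, bool):
            raise TypeError("raise_on_fail must be a bool")
        size = self.target_partition_size
        if not isinstance(size, int) or size <= 0:
            raise ValueError("target_partition_size must be a positive int")

    @classmethod
    def from_user_config(cls, config):
        """Build validated options from a plain dict of user keywords."""
        known = {f.name for f in fields(cls)}
        unknown = set(config) - known
        if unknown:
            # Reject typos early instead of silently ignoring them.
            raise TypeError(f"unknown options: {sorted(unknown)}")
        return cls(**config)
```

Doing all validation in one place means the rest of the code can rely on well-typed, immutable options instead of re-checking a free-form dictionary.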

Profiling

You can profile cudf_polars using NVIDIA Nsight Systems. Each .collect() or .sink() call has two top-level ranges under the cudf_polars domain:

  1. ConvertIR: measures the time spent converting the polars query plan to cudf-polars' IR.
  2. ExecuteIR: measures the time spent executing the cudf-polars IR.

The majority of time should be spent in the ExecuteIR range. Within ExecuteIR, each individual IR node's do_evaluate method is wrapped in another nvtx range (e.g. Scan.do_evaluate, GroupBy.do_evaluate, etc.). These provide a higher-level grouping over the lower-level libcudf calls (e.g. read_chunk, aggregate).

Finally, if using rapidsmpf for shuffling, the methods inserting and extracting partitions to shuffle are annotated with nvtx ranges.

Query Plans

The module cudf_polars.experimental.explain contains functions for dumping the query plan for a given LazyFrame.

Structured Output

cudf_polars.experimental.explain.serialize_query can be used to output the query plan in a structured format.

python
>>> import dataclasses
>>> import polars as pl
>>> from cudf_polars.experimental.explain import serialize_query
>>> q = pl.LazyFrame({"a": ['a', 'b', 'a'], "b": [1, 2, 3]}).group_by("a").agg(pl.len())
>>> dataclasses.asdict(serialize_query(q, engine=pl.GPUEngine()))
{'roots': ['526964741'],
 'nodes': {'526964741': {'id': '526964741',
   'children': ['1694929589'],
   'schema': {'a': 'STRING', 'len': 'UINT32'},
   'properties': {'columns': ['a', 'len']},
   'type': 'Select'},
  '1694929589': {'id': '1694929589',
   'children': ['2632275007'],
   'schema': {'a': 'STRING', '___0': 'UINT32'},
   'properties': {'keys': ['a']},
   'type': 'GroupBy'},
  '2632275007': {'id': '2632275007',
   'children': [],
   'schema': {'a': 'STRING'},
   'properties': {},
   'type': 'DataFrameScan'}},
 'partition_info': {'526964741': {'count': 1, 'partitioned_on': ()},
  '1694929589': {'count': 1, 'partitioned_on': ()},
  '2632275007': {'count': 1, 'partitioned_on': ()}}}

The structured schema has three top-level fields:

  1. roots: the integer IDs of the "root" (final) nodes in the query plan
  2. partition_info: partitioning information at each stage of the query
  3. nodes: a mapping from integer node ID to node details. Each node ID that appears in the output will be present in this mapping. Inspect children to understand which nodes this node depends on.

Note that all integers are stored as strings to make round-tripping to JSON easier.
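As a small illustration of consuming this structure, the sketch below walks a trimmed copy of the output shown above (only the children and type fields are kept) from the roots down to the leaves. The helper `execution_order` is hypothetical and not part of cudf_polars.

```python
import json

# Trimmed copy of the serialized plan from the example above.
plan = {
    "roots": ["526964741"],
    "nodes": {
        "526964741": {"children": ["1694929589"], "type": "Select"},
        "1694929589": {"children": ["2632275007"], "type": "GroupBy"},
        "2632275007": {"children": [], "type": "DataFrameScan"},
    },
}


def execution_order(plan):
    """Return node types in dependency order (children before parents)."""
    order = []
    seen = set()

    def visit(node_id):
        if node_id in seen:
            return
        seen.add(node_id)
        for child in plan["nodes"][node_id]["children"]:
            visit(child)
        order.append(plan["nodes"][node_id]["type"])

    for root in plan["roots"]:
        visit(root)
    return order


# String IDs mean the structure survives a JSON round trip unchanged.
round_tripped = json.loads(json.dumps(plan))
```

For this plan, `execution_order(plan)` yields `["DataFrameScan", "GroupBy", "Select"]`, which reads as the order in which the nodes must be evaluated.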