.. Licensed to the Apache Software Foundation (ASF) under one
.. or more contributor license agreements.  See the NOTICE file
.. distributed with this work for additional information
.. regarding copyright ownership.  The ASF licenses this file
.. to you under the Apache License, Version 2.0 (the
.. "License"); you may not use this file except in compliance
.. with the License.  You may obtain a copy of the License at

..   http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
.. software distributed under the License is distributed on an
.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
.. KIND, either express or implied.  See the License for the
.. specific language governing permissions and limitations
.. under the License.

.. default-domain:: cpp
.. highlight:: cpp
.. cpp:namespace:: arrow::acero

This page describes how to use Acero. It's recommended that you read the overview first and familiarize yourself with the basic concepts.
The basic workflow for Acero is this:
#. First, create a graph of :class:`Declaration` objects describing the plan.
#. Call one of the DeclarationToXyz methods to execute the Declaration.

   a. A new ExecPlan is created from the graph of Declarations. Each Declaration will correspond to one
      ExecNode in the plan. In addition, a sink node will be added, depending on which DeclarationToXyz
      method was used.
   b. The ExecPlan is executed. Typically this happens as part of the DeclarationToXyz call but in
      DeclarationToReader the reader is returned before the plan is finished executing.
   c. Once the plan is finished it is destroyed.

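
As a minimal sketch of this workflow (it is not taken from the Arrow example sources), the function
below builds a two-node plan and runs it with ``DeclarationToTable``. The function name ``KeepLargeB``
and the column name ``b`` are hypothetical, and the sketch assumes a recent Arrow release built with
Acero in which ``TableSourceNodeOptions`` has a default batch size:

.. code-block:: cpp

   #include <arrow/api.h>
   #include <arrow/acero/exec_plan.h>
   #include <arrow/acero/options.h>
   #include <arrow/compute/api.h>

   #include <memory>
   #include <utility>

   namespace ac = arrow::acero;
   namespace cp = arrow::compute;

   // Keep the rows of `input` whose (hypothetical) column "b" is greater than 3.
   arrow::Result<std::shared_ptr<arrow::Table>> KeepLargeB(
       std::shared_ptr<arrow::Table> input) {
     // Step 1: describe the plan as a graph of declarations.
     ac::Declaration source{"table_source",
                            ac::TableSourceNodeOptions{std::move(input)}};
     ac::Declaration filter{
         "filter",
         {std::move(source)},
         ac::FilterNodeOptions{cp::greater(cp::field_ref("b"), cp::literal(3))}};
     // Step 2: a DeclarationToXyz method creates the ExecPlan, adds a sink node,
     // runs the plan and destroys it once it has finished.
     return ac::DeclarationToTable(std::move(filter));
   }

The caller only ever sees declarations and the resulting table; the ``ExecPlan`` itself is an
implementation detail of the ``DeclarationToTable`` call.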
Substrait is the preferred mechanism for creating a plan (graph of :class:`Declaration`). There are a few
reasons for this:

* Substrait producers spend a lot of time and energy in creating user-friendly APIs for producing complex
  execution plans in a simple way. For example, the ``pivot_wider`` operation can be achieved using a complex
  series of ``aggregate`` nodes. Rather than create all of those ``aggregate`` nodes by hand a producer will
  give you a much simpler API.
* If you are using Substrait then you can easily switch out to any other Substrait-consuming engine should you
  at some point find that it serves your needs better than Acero.
* We hope that tools will eventually emerge for Substrait-based optimizers and planners. By using Substrait
  you will be making it much easier to use these tools in the future.

You could create the Substrait plan yourself but you'll probably have a much easier time finding an existing
Substrait producer. For example, you could use `ibis-substrait <https://github.com/ibis-project/ibis-substrait>`_
to easily create Substrait plans from Python expressions. There are a few different tools that are able to create
Substrait plans from SQL. Eventually, we hope that C++ based Substrait producers will emerge. However, we
are not aware of any at this time.

Detailed instructions on creating an execution plan from Substrait can be found in
:ref:`the Substrait page<acero-substrait>`.

Creating an execution plan programmatically is simpler than creating a plan from Substrait, though it loses some
of the flexibility and future-proofing guarantees. The simplest way to create a Declaration is to instantiate
one. You will need the name of the declaration, a vector of inputs, and an options object. For example:

.. literalinclude:: ../../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: Project Example)
   :end-before: (Doc section: Project Example)
   :linenos:
   :lineno-match:

The above code creates a scan declaration (which has no inputs) and a project declaration (using the scan as
input). This is simple enough but we can make it slightly easier. If you are creating a linear sequence of
declarations (like in the above example) then you can also use the :func:`Declaration::Sequence` function.

.. literalinclude:: ../../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: Project Sequence Example)
   :end-before: (Doc section: Project Sequence Example)
   :linenos:
   :lineno-match:

There are many more examples of programmatic plan creation later in this document.
There are a number of different methods that can be used to execute a declaration. Each one provides the
data in a slightly different form. Since all of these methods start with DeclarationTo... this guide
will often refer to these methods as the DeclarationToXyz methods.
The :func:`DeclarationToTable` method will accumulate all of the results into a single :class:`arrow::Table`.
This is perhaps the simplest way to collect results from Acero. The main disadvantage to this approach is
that it requires accumulating all results into memory.

.. note::

   Acero processes large datasets in small chunks. This is described in more detail in the developer's guide.
   As a result, you may be surprised to find that a table collected with DeclarationToTable is chunked
   differently than your input. For example, your input might be a large table with a single chunk with 2
   million rows. Your output table might then have 64 chunks with 32Ki rows each. There is a current request
   to specify the chunk size for the output in `GH-15155 <https://github.com/apache/arrow/issues/15155>`_.

The :func:`DeclarationToReader` method allows you to iteratively consume the results. It will create an
:class:`arrow::RecordBatchReader` which you can read from at your leisure. If you do not read from the
reader quickly enough then backpressure will be applied and the execution plan will pause. Closing the
reader will cancel the running execution plan. The reader's destructor will wait for the execution plan
to finish whatever it is doing, so it may block.
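
The following sketch shows one way to drain such a reader batch by batch. ``CountRows`` is a
hypothetical helper, and the plan passed to it is assumed to be any valid declaration:

.. code-block:: cpp

   #include <arrow/api.h>
   #include <arrow/acero/exec_plan.h>

   #include <cstdint>
   #include <memory>
   #include <utility>

   namespace ac = arrow::acero;

   // Run `plan` and count the rows it produces without collecting them in memory.
   arrow::Result<int64_t> CountRows(ac::Declaration plan) {
     // The reader is returned before the plan has finished executing.
     ARROW_ASSIGN_OR_RAISE(std::unique_ptr<arrow::RecordBatchReader> reader,
                           ac::DeclarationToReader(std::move(plan)));
     int64_t num_rows = 0;
     while (true) {
       std::shared_ptr<arrow::RecordBatch> batch;
       ARROW_RETURN_NOT_OK(reader->ReadNext(&batch));
       if (batch == nullptr) break;  // a null batch marks the end of the stream
       num_rows += batch->num_rows();
     }
     // Close explicitly so that an error is reported here instead of being
     // silently dropped when the reader is destroyed.
     ARROW_RETURN_NOT_OK(reader->Close());
     return num_rows;
   }

Because each batch goes out of scope at the end of the loop body, only a small part of the result
needs to be held in memory at any time.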
The :func:`DeclarationToStatus` method is useful if you want to run the plan but do not actually want to
consume the results. For example, this is useful when benchmarking or when the plan has side effects such
as a dataset write node. If the plan generates any results then they will be immediately discarded.
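
A minimal sketch of this pattern is shown below. ``RunAndDiscard`` is a hypothetical helper that uses
a bare ``table_source`` plan as a stand-in for a plan with side effects:

.. code-block:: cpp

   #include <arrow/api.h>
   #include <arrow/acero/exec_plan.h>
   #include <arrow/acero/options.h>

   #include <memory>
   #include <utility>

   namespace ac = arrow::acero;

   // Run a plan over `input` and report only whether it succeeded.
   arrow::Status RunAndDiscard(std::shared_ptr<arrow::Table> input) {
     ac::Declaration source{"table_source",
                            ac::TableSourceNodeOptions{std::move(input)}};
     // Any batches that reach the end of the plan are discarded.
     return ac::DeclarationToStatus(std::move(source));
   }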
If one of the DeclarationToXyz methods is not sufficient for some reason then it is possible to run a plan
directly. This should only be needed if you are doing something unique. For example, if you have created a
custom sink node or if you need a plan that has multiple outputs.
.. note:: In academic literature and many existing systems there is a general assumption that an execution plan has at most one output. There are some things in Acero, such as the DeclarationToXyz methods, which will expect this. However, there is nothing in the design that strictly prevents having multiple sink nodes.
Detailed instructions on how to do this are out of scope for this guide but the rough steps are:

1. Create a new :class:`ExecPlan` object.
2. Add sink nodes to your graph of :class:`Declaration` objects (this is the only time you will need
   to create declarations for sink nodes).
3. Use :func:`Declaration::AddToPlan` to add your declaration to your plan (if you have more than one output
   then you will not be able to use this method and will need to add your nodes one at a time).
4. Validate the plan with :func:`ExecPlan::Validate`.
5. Start the plan with :func:`ExecPlan::StartProducing`.
6. Wait for the future returned by :func:`ExecPlan::finished` to complete.

Input data for an exec plan can come from a variety of sources. It is often read from files stored on some
kind of filesystem. It is also common for input to come from in-memory data. In-memory data is typical, for
example, in a pandas-like frontend. Input could also come from network streams like a Flight request. Acero
can support all of these cases and can even support unique and custom situations not mentioned here.

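
To make the rough steps for running a plan directly (listed above) more concrete, here is a minimal
sketch that feeds in-memory data through a plan and collects it again with a ``table_sink`` node. The
function name ``RunPlanDirectly`` is hypothetical, and the sketch assumes a recent Arrow release in
which ``ExecPlan::StartProducing`` returns ``void``:

.. code-block:: cpp

   #include <arrow/api.h>
   #include <arrow/acero/exec_plan.h>
   #include <arrow/acero/options.h>

   #include <memory>
   #include <utility>

   namespace ac = arrow::acero;

   arrow::Result<std::shared_ptr<arrow::Table>> RunPlanDirectly(
       std::shared_ptr<arrow::Table> input) {
     // Create a new, empty plan.
     ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan, ac::ExecPlan::Make());

     // Describe the graph, this time including an explicit sink node.
     std::shared_ptr<arrow::Table> output;
     ac::Declaration declaration = ac::Declaration::Sequence({
         {"table_source", ac::TableSourceNodeOptions{std::move(input)}},
         {"table_sink", ac::TableSinkNodeOptions{&output}},
     });

     // Add the declarations to the plan, then validate and start it.
     ARROW_RETURN_NOT_OK(declaration.AddToPlan(plan.get()).status());
     ARROW_RETURN_NOT_OK(plan->Validate());
     plan->StartProducing();

     // Block until the plan has finished; the sink fills in `output`.
     ARROW_RETURN_NOT_OK(plan->finished().status());
     return output;
   }

This plan has a single sink, so ``Declaration::AddToPlan`` can be used. A plan with several outputs
would need its nodes added one at a time instead.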
There are pre-defined source nodes that cover the most common input scenarios. These are listed below. However,
if your source data is unique then you will need to use the generic source node. This node expects you to
provide an asynchronous stream of batches and is covered in more detail :ref:`here <stream_execution_source_docs>`.

.. _ExecNode List:

``ExecNode`` Implementations
============================

The following tables quickly summarize the available operators.

These nodes can be used as sources of data
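
.. list-table:: Source Nodes
   :widths: 25 25 50
   :header-rows: 1

   * - Factory Name
     - Options
     - Brief Description
   * - ``source``
     - :class:`SourceNodeOptions`
     - A generic source node that wraps an asynchronous stream of data
       (:ref:`example <stream_execution_source_docs>`)
   * - ``table_source``
     - :class:`TableSourceNodeOptions`
     - Generates data from an :class:`arrow::Table`
       (:ref:`example <stream_execution_table_source_docs>`)
   * - ``record_batch_source``
     - :class:`RecordBatchSourceNodeOptions`
     - Generates data from an iterator of :class:`arrow::RecordBatch`
   * - ``record_batch_reader_source``
     - :class:`RecordBatchReaderSourceNodeOptions`
     - Generates data from an :class:`arrow::RecordBatchReader`
   * - ``exec_batch_source``
     - :class:`ExecBatchSourceNodeOptions`
     - Generates data from an iterator of :class:`arrow::compute::ExecBatch`
   * - ``array_vector_source``
     - :class:`ArrayVectorSourceNodeOptions`
     - Generates data from an iterator of vectors of :class:`arrow::Array`
   * - ``scan``
     - :class:`arrow::dataset::ScanNodeOptions`
     - Generates data from an :class:`arrow::dataset::Dataset` (requires the datasets module)
       (:ref:`example <stream_execution_scan_docs>`)

These nodes perform computations on data and may transform or reshape the data
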
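.. list-table:: Compute Nodes
   :widths: 25 25 50
   :header-rows: 1

   * - Factory Name
     - Options
     - Brief Description
   * - ``filter``
     - :class:`FilterNodeOptions`
     - Removes rows that do not match a given filter expression
       (:ref:`example <stream_execution_filter_docs>`)
   * - ``project``
     - :class:`ProjectNodeOptions`
     - Creates new columns by evaluating compute expressions. Can also drop and reorder columns
       (:ref:`example <stream_execution_project_docs>`)
   * - ``aggregate``
     - :class:`AggregateNodeOptions`
     - Calculates summary statistics across the entire input stream or on groups of data
       (:ref:`example <stream_execution_aggregate_docs>`)
   * - ``pivot_longer``
     - :class:`PivotLongerNodeOptions`
     - Reshapes data by converting some columns into additional rows

These nodes reorder, combine, or slice streams of data
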
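.. list-table:: Arrangement Nodes
   :widths: 25 25 50
   :header-rows: 1

   * - Factory Name
     - Options
     - Brief Description
   * - ``hash_join``
     - :class:`HashJoinNodeOptions`
     - Joins two inputs based on common columns
       (:ref:`example <stream_execution_hashjoin_docs>`)
   * - ``asofjoin``
     - :class:`AsofJoinNodeOptions`
     - Joins multiple inputs to the first input based on a common ordered column (often time)
   * - ``union``
     - N/A
     - Merges two inputs with identical schemas
       (:ref:`example <stream_execution_union_docs>`)
   * - ``order_by``
     - :class:`OrderByNodeOptions`
     - Reorders a stream
   * - ``fetch``
     - :class:`FetchNodeOptions`
     - Slices a range of rows from a stream

These nodes terminate a plan. Users do not typically create sink nodes as they are selected based on the
DeclarationToXyz method used to consume the plan. However, this list may be useful for those developing new
sink nodes or using Acero in advanced ways.
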
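.. list-table:: Sink Nodes
   :widths: 25 25 50
   :header-rows: 1

   * - Factory Name
     - Options
     - Brief Description
   * - ``sink``
     - :class:`SinkNodeOptions`
     - Exposes the output through a function that returns a record batch future each time it is called
   * - ``write``
     - :class:`arrow::dataset::WriteNodeOptions`
     - Writes batches to a filesystem
       (:ref:`example <stream_execution_write_docs>`)
   * - ``consuming_sink``
     - :class:`ConsumingSinkNodeOptions`
     - Consumes batches using a user-provided callback function
   * - ``table_sink``
     - :class:`TableSinkNodeOptions`
     - Collects batches into an :class:`arrow::Table`
   * - ``order_by_sink``
     - :class:`OrderBySinkNodeOptions`
     - Sink that guarantees the ordering of the output stream
   * - ``select_k_sink``
     - :class:`SelectKSinkNodeOptions`
     - Sink that selects the top/bottom K elements

The rest of this document contains example execution plans. Each example highlights the behavior of a
specific execution node.
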
.. _stream_execution_source_docs:

``source``
----------

A ``source`` operation can be considered as an entry point to create a streaming execution plan.
:class:`SourceNodeOptions` are used to create the ``source`` operation. The
source operation is the most generic and flexible type of source currently available but it can
be quite tricky to configure. First you should review the other source node types to ensure there
isn't a simpler choice.
The source node requires some kind of function that can be called to poll for more data. This
function should take no arguments and should return an
``arrow::Future<std::optional<arrow::compute::ExecBatch>>``.
This function might be reading a file, iterating through an in memory structure, or receiving data
from a network connection. The arrow library refers to these functions as arrow::AsyncGenerator
and there are a number of utilities for working with these functions. For this example we use
a vector of record batches that we've already stored in memory.
In addition, the schema of the data must be known up front. Acero must know the schema of the data
at each stage of the execution graph before any processing has begun. This means we must supply the
schema for a source node separately from the data itself.

Here we define a struct to hold the data generator definition. This includes in-memory batches, a schema
and a function that serves as a data generator:

.. literalinclude:: ../../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: BatchesWithSchema Definition)
   :end-before: (Doc section: BatchesWithSchema Definition)
   :linenos:
   :lineno-match:

Generating sample batches for computation:

.. literalinclude:: ../../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: MakeBasicBatches Definition)
   :end-before: (Doc section: MakeBasicBatches Definition)
   :linenos:
   :lineno-match:

Example of using ``source`` (usage of sink is explained in detail in :ref:`sink<stream_execution_sink_docs>`):

.. literalinclude:: ../../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: Source Example)
   :end-before: (Doc section: Source Example)
   :linenos:
   :lineno-match:

``table_source``
----------------

.. _stream_execution_table_source_docs:

In the previous example, :ref:`source node <stream_execution_source_docs>`, a source node
was used to input the data. But when developing an application, if the data is already in memory
as a table, it is much easier and more performant to use :class:`TableSourceNodeOptions`.
Here the input data can be passed as a std::shared_ptr<arrow::Table> along with a max_batch_size.
The max_batch_size is to break up large record batches so that they can be processed in parallel.
It is important to note that the table batches will not get merged to form larger batches when the source
table has a smaller batch size.
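
One way to observe this behavior is to run a bare ``table_source`` plan and look at the sizes of the
batches it emits. The sketch below does that with ``DeclarationToBatches``; ``BatchLengths`` is a
hypothetical helper and is not part of the Arrow example sources:

.. code-block:: cpp

   #include <arrow/api.h>
   #include <arrow/acero/exec_plan.h>
   #include <arrow/acero/options.h>

   #include <cstdint>
   #include <memory>
   #include <utility>
   #include <vector>

   namespace ac = arrow::acero;

   // Return the number of rows in each batch emitted by a table_source node.
   arrow::Result<std::vector<int64_t>> BatchLengths(std::shared_ptr<arrow::Table> table,
                                                    int64_t max_batch_size) {
     ac::Declaration source{
         "table_source", ac::TableSourceNodeOptions{std::move(table), max_batch_size}};
     ARROW_ASSIGN_OR_RAISE(auto batches, ac::DeclarationToBatches(std::move(source)));
     std::vector<int64_t> lengths;
     for (const auto& batch : batches) {
       lengths.push_back(batch->num_rows());
     }
     return lengths;
   }

For a single-chunk table of 10 rows and a ``max_batch_size`` of 4, every reported length should be at
most 4 and the lengths should sum to 10. The sketch does not rely on the order in which the batches
arrive.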

Example of using ``table_source``:

.. literalinclude:: ../../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: Table Source Example)
   :end-before: (Doc section: Table Source Example)
   :linenos:
   :lineno-match:

.. _stream_execution_filter_docs:

``filter``
----------

The ``filter`` operation, as the name suggests, provides an option to define data filtering
criteria. It selects rows where the given expression evaluates to true. Filters can be written using
:class:`arrow::compute::Expression`, and the expression should have a return type of boolean.
For example, if we wish to keep rows where the value
of column b is greater than 3, then we can use the following expression.
Filter example:

.. literalinclude:: ../../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: Filter Example)
   :end-before: (Doc section: Filter Example)
   :linenos:
   :lineno-match:

.. _stream_execution_project_docs:

``project``
-----------

The ``project`` operation rearranges, deletes, transforms, and creates columns.
Each output column is computed by evaluating an expression
against the source record batch. These must be scalar expressions
(expressions consisting of scalar literals, field references and scalar
functions, i.e. elementwise functions that return one value for each input
row independent of the value of all other rows).
This is exposed via :class:`ProjectNodeOptions`, which requires
an :class:`arrow::compute::Expression` and a name for each of the output columns (if names are not
provided, the string representations of the expressions will be used).

Project example:

.. literalinclude:: ../../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: Project Example)
   :end-before: (Doc section: Project Example)
   :linenos:
   :lineno-match:

.. _stream_execution_aggregate_docs:

``aggregate``
-------------

The ``aggregate`` node computes various types of aggregates over data.

Arrow supports two types of aggregates: "scalar" aggregates, and
"hash" aggregates. Scalar aggregates reduce an array or scalar input
to a single scalar output (e.g. computing the mean of a column). Hash
aggregates act like GROUP BY in SQL and first partition data based
on one or more key columns, then reduce the data in each
partition. The aggregate node supports both types of computation,
and can compute any number of aggregations at once.

:class:`AggregateNodeOptions` is used to define the
aggregation criteria. It takes a list of aggregation functions and
their options; a list of target fields to aggregate, one per function;
and a list of names for the output fields, one per function.
Optionally, it takes a list of columns that are used to partition the
data, in the case of a hash aggregation. The aggregation functions
can be selected from :ref:`this list of aggregation functions
<aggregation-option-list>`.

.. note::

   This node is a "pipeline breaker" and will fully materialize the dataset in memory.
   In the future, spillover mechanisms will be added which should alleviate this
   constraint.

The aggregation can provide results as a group or scalar. For instance,
an operation like ``hash_count`` provides the count for each unique record
as a grouped result while an operation like ``sum`` provides a single record.

Scalar Aggregation example:

.. literalinclude:: ../../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: Scalar Aggregate Example)
   :end-before: (Doc section: Scalar Aggregate Example)
   :linenos:
   :lineno-match:

Group Aggregation example:

.. literalinclude:: ../../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: Group Aggregate Example)
   :end-before: (Doc section: Group Aggregate Example)
   :linenos:
   :lineno-match:

.. _stream_execution_sink_docs:

``sink``
--------

The ``sink`` operation provides output and is the final node of a streaming
execution definition. The :class:`SinkNodeOptions` interface is used to pass
the required options. Similar to the source operator, the sink operator exposes the output
with a function that returns a record batch future each time it is called. It is expected the
caller will repeatedly call this function until the generator function is exhausted (returns
``std::nullopt``). If this function is not called often enough then record batches
will accumulate in memory. An execution plan should only have one
"terminal" node (one sink node). An :class:`ExecPlan` can terminate early due to cancellation or
an error, before the output is fully consumed. However, the plan can be safely destroyed independently
of the sink, which will hold the unconsumed batches by the time ``exec_plan->finished()`` completes.

As a part of the Source Example, the Sink operation is also included:

.. literalinclude:: ../../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: Source Example)
   :end-before: (Doc section: Source Example)
   :linenos:
   :lineno-match:

.. _stream_execution_consuming_sink_docs:

``consuming_sink``
------------------

The ``consuming_sink`` operator is a sink operation that contains the consuming operation within the
execution plan (i.e. the exec plan should not complete until the consumption has completed).
Unlike the sink node this node takes in a callback function that is expected to consume the
batch. Once this callback has finished the execution plan will no longer hold any reference to
the batch.
The consuming function may be called before a previous invocation has completed. If the consuming
function does not run quickly enough then many concurrent executions could pile up, blocking the
CPU thread pool. The execution plan will not be marked finished until all consuming function callbacks
have been completed.
Once all batches have been delivered the execution plan will wait for the finish future to complete
before marking the execution plan finished. This allows for workflows where the consumption function
converts batches into async tasks (this is currently done internally for the dataset write node).

Example::

  // Assumes `namespace ac = arrow::acero;` and `namespace cp = arrow::compute;`
  // as well as an existing `plan` and `source` node.

  // define a Custom SinkNodeConsumer
  std::atomic<uint32_t> batches_seen{0};
  arrow::Future<> finish = arrow::Future<>::Make();
  struct CustomSinkNodeConsumer : public ac::SinkNodeConsumer {

      CustomSinkNodeConsumer(std::atomic<uint32_t> *batches_seen, arrow::Future<> finish):
      batches_seen(batches_seen), finish(std::move(finish)) {}

      // Called as the plan is started, before the first call to Consume
      arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema,
                         ac::BackpressureControl* backpressure_control,
                         ac::ExecPlan* plan) override {
        return arrow::Status::OK();
      }

      // Consumption logic can be written here
      arrow::Status Consume(cp::ExecBatch batch) override {
      // data can be consumed in the expected way
      // transfer to another system or just do some work
      // and write to disk
      (*batches_seen)++;
      return arrow::Status::OK();
      }

      arrow::Future<> Finish() override { return finish; }

      std::atomic<uint32_t> *batches_seen;
      arrow::Future<> finish;

  };

  std::shared_ptr<CustomSinkNodeConsumer> consumer =
          std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

  arrow::acero::ExecNode *consuming_sink;

  ARROW_ASSIGN_OR_RAISE(consuming_sink, ac::MakeExecNode("consuming_sink", plan.get(),
      {source}, ac::ConsumingSinkNodeOptions(consumer)));

Consuming-Sink example:

.. literalinclude:: ../../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: ConsumingSink Example)
   :end-before: (Doc section: ConsumingSink Example)
   :linenos:
   :lineno-match:

.. _stream_execution_order_by_sink_docs:

``order_by_sink``
-----------------

The ``order_by_sink`` operation is an extension to the ``sink`` operation.
This operation provides the ability to guarantee the ordering of the
stream by providing the :class:`OrderBySinkNodeOptions`.
Here the :class:`arrow::compute::SortOptions` are provided to define which columns
are used for sorting and whether to sort by ascending or descending values.

.. note::

   This node is a "pipeline breaker" and will fully materialize the dataset in memory.
   In the future, spillover mechanisms will be added which should alleviate this
   constraint.

Order-By-Sink example:

.. literalinclude:: ../../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: OrderBySink Example)
   :end-before: (Doc section: OrderBySink Example)
   :linenos:
   :lineno-match:

.. _stream_execution_select_k_docs:

``select_k_sink``
-----------------

The ``select_k_sink`` option enables selecting the top/bottom K elements,
similar to a SQL ``ORDER BY ... LIMIT K`` clause.
The selection is configured with :class:`SelectKOptions`, which is defined
using the :struct:`OrderBySinkNode` definition. This option returns a sink node that receives
inputs and then computes top_k/bottom_k.

.. note::

   This node is a "pipeline breaker" and will fully materialize the input in memory.
   In the future, spillover mechanisms will be added which should alleviate this
   constraint.

SelectK example:

.. literalinclude:: ../../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: KSelect Example)
   :end-before: (Doc section: KSelect Example)
   :linenos:
   :lineno-match:

``table_sink``
--------------

.. _stream_execution_table_sink_docs:

The ``table_sink`` node provides the ability to receive the output as an in-memory table.
This is simpler to use than the other sink nodes provided by the streaming execution engine
but it only makes sense when the output fits comfortably in memory.
The node is created using :class:`TableSinkNodeOptions`.

Example of using ``table_sink``:

.. literalinclude:: ../../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: Table Sink Example)
   :end-before: (Doc section: Table Sink Example)
   :linenos:
   :lineno-match:

.. _stream_execution_scan_docs:

``scan``
--------

``scan`` is an operation used to load and process datasets. It should be preferred over the
more generic ``source`` node when your input is a dataset. The behavior is defined using
:class:`arrow::dataset::ScanNodeOptions`. More information on datasets and the various
scan options can be found in :doc:`../dataset`.

This node is capable of applying pushdown filters to the file readers which reduce
the amount of data that needs to be read. This means you may supply the same
filter expression to the scan node that you also supply to the FilterNode because
the filtering is done in two different places.

Scan example:

.. literalinclude:: ../../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: Scan Example)
   :end-before: (Doc section: Scan Example)
   :linenos:
   :lineno-match:

.. _stream_execution_write_docs:

``write``
---------

The ``write`` node saves query results as a dataset of files in a
format like Parquet, Feather, CSV, etc. using the :doc:`../dataset`
functionality in Arrow. The write options are provided via the
:class:`arrow::dataset::WriteNodeOptions` which in turn contains
:class:`arrow::dataset::FileSystemDatasetWriteOptions`.
:class:`arrow::dataset::FileSystemDatasetWriteOptions` provides
control over the written dataset, including options like the output
directory, file naming scheme, and so on.

Write example:

.. literalinclude:: ../../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: Write Example)
   :end-before: (Doc section: Write Example)
   :linenos:
   :lineno-match:

.. _stream_execution_union_docs:

``union``
---------

``union`` merges multiple data streams with the same schema into one, similar to
a SQL ``UNION ALL`` clause.

The following example demonstrates how this can be achieved using
two data sources.

Union example:

.. literalinclude:: ../../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: Union Example)
   :end-before: (Doc section: Union Example)
   :linenos:
   :lineno-match:

.. _stream_execution_hashjoin_docs:

``hash_join``
-------------

The ``hash_join`` operation provides the relational algebra join operation using a hash-based
algorithm. :class:`HashJoinNodeOptions` contains the options required in
defining a join. The hash_join supports
`left/right/full semi/anti/outer joins <https://en.wikipedia.org/wiki/Join_(SQL)>`_.
The join key (i.e. the column(s) to join on) and suffixes (i.e. a term like "x"
that is appended to column names duplicated in both the left and right
relations) can also be set via the join options.
`Read more on hash-joins <https://en.wikipedia.org/wiki/Hash_join>`_.

Hash-Join example:

.. literalinclude:: ../../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: HashJoin Example)
   :end-before: (Doc section: HashJoin Example)
   :linenos:
   :lineno-match:

Examples of these nodes can be found in
``cpp/examples/arrow/execution_plan_documentation_examples.cc`` in the Arrow source.

Complete Example:

.. literalinclude:: ../../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
   :language: cpp
   :start-after: (Doc section: Execution Plan Documentation Example)
   :end-before: (Doc section: Execution Plan Documentation Example)
   :linenos:
   :lineno-match: