One of DataFusion's greatest strengths is its extensibility. If your data lives in a custom format, behind an API, or in a system that DataFusion does not natively support, you can teach DataFusion to read it by implementing a custom table provider. This post walks through the three layers you need to understand to design a table provider and where planning and execution work should happen.
For details on how table constraints such as primary keys or unique constraints are handled, see Table Constraint Enforcement.
The majority of this content was originally posted in the blog Writing Custom Table Providers in Apache DataFusion.
When DataFusion executes a query against a table, three abstractions collaborate to produce results:
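- TableProvider -- Represents the queryable data source; its scan() method builds the physical plan.
- ExecutionPlan -- A node in the physical plan tree; its execute() method creates one stream per partition.
- SendableRecordBatchStream -- The async stream that does the actual work, yielding RecordBatches one at a time.

Think of these as a funnel: TableProvider::scan() is called once during
planning to create an ExecutionPlan, then ExecutionPlan::execute() is called
once per partition to create a stream, and those streams are where rows are
actually produced during execution.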
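The funnel can be modeled without DataFusion at all. The sketch below uses hypothetical stand-in types (ToyTable, ToyPlan, and a plain iterator in place of an async stream), not the real traits. It only aims to show which call happens how often and where rows are actually produced.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Counts how many rows have been produced so far (stands in for real I/O).
type RowCounter = Rc<Cell<usize>>;

/// Hypothetical stand-in for a TableProvider.
struct ToyTable {
    partitions: usize,
    rows_per_partition: usize,
    produced: RowCounter,
}

/// Hypothetical stand-in for an ExecutionPlan.
struct ToyPlan {
    partitions: usize,
    rows_per_partition: usize,
    produced: RowCounter,
}

impl ToyTable {
    /// Planning: called once, and only describes the work.
    fn scan(&self) -> ToyPlan {
        ToyPlan {
            partitions: self.partitions,
            rows_per_partition: self.rows_per_partition,
            produced: Rc::clone(&self.produced),
        }
    }
}

impl ToyPlan {
    /// Execution setup: called once per partition, returns a lazy "stream".
    fn execute(&self, partition: usize) -> impl Iterator<Item = (usize, usize)> {
        let produced = Rc::clone(&self.produced);
        (0..self.rows_per_partition).map(move |row| {
            // Rows are produced only when the consumer pulls from the iterator.
            produced.set(produced.get() + 1);
            (partition, row)
        })
    }
}
```

Calling scan() and then execute() for every partition leaves the counter at zero; it only advances once the returned iterators are consumed, which mirrors how the real stream is driven by polling.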
Before diving into the three layers, it helps to understand how DataFusion processes a query. There are several phases between a SQL string (or DataFrame call) and streaming results:
SQL / DataFrame API
→ Logical Plan (abstract: what to compute)
→ Logical Optimization (rewrite rules that preserve semantics)
→ Physical Plan (concrete: how to compute it)
→ Physical Optimization (hardware- and data-aware rewrites)
→ Execution (streaming RecordBatches)
A logical plan describes what the query computes without specifying how.
It is a tree of relational operators -- TableScan, Filter, Projection,
Aggregate, Join, Sort, Limit, and so on. The logical optimizer rewrites
this tree to reduce work while preserving the query's meaning. Some logical
optimizations include:
- Simplifying expressions such as 1 = 1 or x AND true into simpler forms.
- Rewriting IN / EXISTS subqueries into more efficient semi-joins.
- Pushing LIMIT earlier in the plan so operators produce less data.

The physical planner converts the optimized logical plan into an
ExecutionPlan tree -- the concrete plan that will actually run. This is where
decisions like "use a hash join vs. a sort-merge join" or "how many partitions
to scan" are made. The physical optimizer then refines this tree further with rewrites such as:

- Inserting RepartitionExec nodes so that data is partitioned correctly for joins and aggregations.
- Inserting SortExec nodes where ordering is required, and removing them where the data is already sorted.

Your TableProvider sits at the boundary between logical and physical planning.
During logical optimization, DataFusion determines which filters and projections
could be pushed down to the source. When scan() is called during physical
planning, those hints are passed to you. By implementing capabilities like
supports_filters_pushdown, you influence what the optimizer can do -- and the
metadata you declare in your ExecutionPlan (partitioning, ordering) directly
affects which physical optimizations apply.
Not every custom data source requires implementing all three layers from scratch. DataFusion provides building blocks that let you plug in at whatever level makes sense:
| If your data is... | Start with | You implement |
|---|---|---|
| Already in RecordBatches in memory | MemTable | Nothing -- just construct it |
| An async stream of batches | StreamTable | A stream factory |
| A logical transformation of other tables | ViewTable wrapping a logical plan | The logical plan |
| A variant of an existing file format | ListingTable with a custom FileFormat wrapping an existing one | A thin FileFormat wrapper |
| Files in a custom format on disk or object storage | ListingTable with a custom FileFormat, FileSource, and FileOpener | The format, source, and opener |
| A custom source needing full control | TableProvider + ExecutionPlan + stream | All three layers |
If your data is file-based, ListingTable handles file discovery, partition
column inference, and plan construction -- you only need to implement
FileFormat, FileSource, and FileOpener to describe how to read your
files. See the custom_file_format example for a minimal wrapping approach,
or ParquetSource and ParquetOpener for a full custom implementation to
use as a reference.
The rest of this post focuses on the full TableProvider + ExecutionPlan +
stream path, which gives you complete control and applies to any data source.
A TableProvider represents a queryable data source. For a minimal read-only table, you need three methods:
impl TableProvider for MyTable {
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
fn table_type(&self) -> TableType {
TableType::Base
}
async fn scan(
&self,
state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
// Build and return an ExecutionPlan -- don't do any execution work here -- keep lightweight!
Ok(Arc::new(MyExecPlan::new(
Arc::clone(&self.schema),
projection,
limit,
)))
}
}
The scan method is the heart of TableProvider. It receives three pushdown
hints from the optimizer, each reducing the amount of data your source needs
to produce:
- projection -- Which columns are needed. This reduces the width of the output. If your source supports it, read only these columns rather than the full schema.
- filters -- Predicates the engine would like you to apply during the scan. This reduces the number of rows by skipping data that does not match. Implement supports_filters_pushdown to advertise which filters you can handle.
- limit -- A row count cap. This also reduces the number of rows: if you can stop reading early once you have produced enough rows, you avoid unnecessary work.

You can also use the scan_with_args() variant, which provides additional pushdown information for advanced use cases.
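As a rough illustration of what the projection and limit hints ask for, here is a DataFusion-free sketch over column-oriented data. The function name apply_scan_hints and the use of Vec<i64> columns are assumptions made for this example; a real provider would apply the same idea while reading from its source instead of after the fact.

```rust
/// Apply a column projection and a row limit to column-oriented data.
/// `columns[i]` holds all values of column `i`; every column has the same length.
fn apply_scan_hints(
    columns: &[Vec<i64>],
    projection: Option<&Vec<usize>>,
    limit: Option<usize>,
) -> Vec<Vec<i64>> {
    // `None` means "all columns", mirroring the `projection` argument of scan().
    let indices: Vec<usize> = match projection {
        Some(p) => p.clone(),
        None => (0..columns.len()).collect(),
    };
    indices
        .iter()
        .map(|&i| {
            let col = &columns[i];
            // `None` means "no cap"; otherwise keep at most `limit` rows.
            let keep = limit.map_or(col.len(), |n| n.min(col.len()));
            col[..keep].to_vec()
        })
        .collect()
}
```

Note that the output columns follow the order of the projection indices, not the order of the table schema.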
Keep scan() Lightweight

This is a critical point: scan() runs during planning, not execution. It
should return quickly. Best practice is to avoid performing I/O, network
calls, or heavy computation here. The scan method's job is to describe how
the data will be produced, not to produce it. All the real work belongs in the
stream (Layer 3).
A common pitfall is to fetch data or open connections in scan(). This blocks
the planning thread and can cause timeouts or deadlocks, especially if the query
involves multiple tables or subqueries that all need to be planned before
execution begins.
DataFusion ships several TableProvider implementations that are excellent
references:
- MemTable -- Holds data in memory as Vec<RecordBatch>. The simplest
  possible provider; great for tests and small datasets.

An ExecutionPlan is a node in the physical query plan tree. Your table
provider's scan() method returns one. The required methods are:
impl ExecutionPlan for MyExecPlan {
fn name(&self) -> &str { "MyExecPlan" }
fn properties(&self) -> &Arc<PlanProperties> {
&self.properties
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![] // Leaf node -- no children
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
assert!(children.is_empty());
Ok(self)
}
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
// This is where you build and return your stream
// ...
}
}
The key properties to set correctly in PlanProperties are output partitioning and output ordering.
Output partitioning tells the engine how many partitions your data has, which determines parallelism. If your source naturally partitions data (e.g., by file or by shard), expose that here.
Output ordering declares whether your data is naturally sorted. This
enables the optimizer to avoid inserting a SortExec when a query requires
ordered data. Getting this right can be a significant performance win.
Since execute() is called once per partition, partitioning directly controls
the parallelism of your table scan. Each partition produces an independent
stream that DataFusion schedules as a task on the tokio runtime. It is
important to distinguish tasks from threads: tasks are lightweight units of
async work that are multiplexed onto a thread pool. You can have many more
tasks (partitions) than physical threads -- the runtime will interleave them
efficiently as they await I/O or yield.
Start simple: match your data's natural layout. If you have 4 files, expose
4 partitions. If your source has 8 shards, expose 8 partitions. DataFusion will
insert a RepartitionExec above your scan when downstream operators need a
different distribution. You can also implement the
repartitioned
method on your ExecutionPlan to let DataFusion request a different partition
count directly from your source, avoiding the extra operator entirely.
Consider how your data source naturally divides its data, for example by file or by shard.
Advanced: aligning with target_partitions. Once you have something
working, you can tune further. Having too many partitions is not free: each
partition adds scheduling overhead, and downstream operators may need to
repartition the data anyway. The session configuration exposes a
target partition count that reflects how many partitions the optimizer
expects to work with:
async fn scan(
&self,
state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let target_partitions = state.config().target_partitions();
// Optionally coalesce or split partitions to match target_partitions.
// ...
}
If your source produces data in exactly target_partitions partitions, the
optimizer is less likely to insert a RepartitionExec above your scan.
For small datasets, target_partitions may be set to 1, which avoids any
repartitioning overhead entirely.
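One simple way to coalesce toward target_partitions is to deal the source's natural units (files, in this case) round-robin into groups. The helper below is a hypothetical, DataFusion-free sketch: group_into_partitions is not a library function, and it only merges units, it never splits one.

```rust
/// Distribute `files` round-robin into at most `target_partitions` groups.
/// Each returned group would become one partition of the scan.
fn group_into_partitions(files: Vec<String>, target_partitions: usize) -> Vec<Vec<String>> {
    // Never create more groups than files, and always at least one group.
    let n = target_partitions.max(1).min(files.len().max(1));
    let mut groups: Vec<Vec<String>> = vec![Vec::new(); n];
    for (i, file) in files.into_iter().enumerate() {
        groups[i % n].push(file);
    }
    groups
}
```

Round-robin ignores file sizes, so a real implementation may prefer to balance groups by bytes or row counts when that metadata is available.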
Advanced: declaring hash partitioning. If your source stores data
pre-partitioned by a specific key (e.g., customer_id), you can declare this
in your output partitioning. For a query like:
SELECT customer_id, SUM(amount)
FROM my_table
GROUP BY customer_id;
If you declare your output partitioning as Hash([customer_id], N), the
optimizer recognizes that the data is already distributed correctly for the
aggregation and eliminates the RepartitionExec that would otherwise appear
in the plan. You can verify this with EXPLAIN (more on this below).
Conversely, if you report UnknownPartitioning, DataFusion cannot assume anything
about how rows are distributed and will insert repartitioning operators wherever
a downstream operator requires a specific distribution.
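Why hash partitioning on the grouping key makes a further shuffle unnecessary can be seen in a small standalone sketch. The names below are hypothetical, and std's DefaultHasher is only a stand-in; it is not the hash function DataFusion uses. The point is the co-location property: every customer_id lands in exactly one partition, so per-partition sums are already final.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Assign a key to one of `n` partitions by hashing it.
fn partition_for(customer_id: u64, n: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    customer_id.hash(&mut hasher);
    (hasher.finish() % n as u64) as usize
}

/// Split (customer_id, amount) rows into `n` hash partitions.
fn hash_partition(rows: &[(u64, i64)], n: usize) -> Vec<Vec<(u64, i64)>> {
    let mut parts = vec![Vec::new(); n];
    for &(id, amount) in rows {
        parts[partition_for(id, n)].push((id, amount));
    }
    parts
}

/// SUM(amount) GROUP BY customer_id over a single partition.
fn sum_by_customer(rows: &[(u64, i64)]) -> HashMap<u64, i64> {
    let mut sums = HashMap::new();
    for &(id, amount) in rows {
        *sums.entry(id).or_insert(0) += amount;
    }
    sums
}
```

Aggregating each partition independently and concatenating the results gives the same answer as aggregating all rows at once, with no key appearing in two partitions.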
Keep execute() Lightweight Too

Like scan(), the execute() method should construct and return a stream
without doing heavy work. The actual data production happens when the stream
is polled. Do not block on async operations here -- build the stream and let
the runtime drive it.
SendableRecordBatchStream is where the real work happens. It is defined as:
type SendableRecordBatchStream =
Pin<Box<dyn RecordBatchStream<Item = Result<RecordBatch>> + Send>>;
This is an async stream of RecordBatches that can be sent across threads. When
the DataFusion runtime polls this stream, your code runs: reading files, calling
APIs, transforming data, etc.
The easiest way to create a SendableRecordBatchStream is with
RecordBatchStreamAdapter. It bridges any futures::Stream<Item = Result<RecordBatch>> into the SendableRecordBatchStream type:
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use futures::StreamExt; // brings flat_map into scope
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let schema = self.schema();
let config = self.config.clone();
let stream = futures::stream::once(async move {
// ALL the heavy work happens here, inside the stream:
// - Open connections
// - Read data from external sources
// - Transform and batch the results
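// Assumes fetch_data_from_source returns Result<Vec<RecordBatch>>.
let batches = fetch_data_from_source(&config).await?;
Ok(batches)
})
// Flatten the single Result<Vec<RecordBatch>> into a stream of
// Result<RecordBatch> items, one per batch.
.flat_map(|result| match result {
Ok(batches) => futures::stream::iter(
batches.into_iter().map(Ok).collect::<Vec<_>>(),
),
Err(e) => futures::stream::iter(vec![Err(e)]),
});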
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}
If your stream performs blocking work -- such as blocking I/O, or CPU work that runs for hundreds of milliseconds without yielding -- you must avoid blocking the tokio async runtime. Short CPU work (e.g., parsing a batch in a few milliseconds) is fine to do inline as long as your code yields back to the runtime frequently. But for long-running synchronous work that cannot yield, offload to a dedicated thread pool and send results back through a channel:
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let schema = self.schema();
let config = self.config.clone();
let (tx, rx) = tokio::sync::mpsc::channel(2);
// Spawn blocking work on a dedicated thread pool
tokio::task::spawn_blocking(move || {
let batches = generate_data(&config);
for batch in batches {
if tx.blocking_send(Ok(batch)).is_err() {
break; // Receiver dropped, query was cancelled
}
}
});
let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}
This pattern keeps the async runtime responsive while long-running synchronous work runs on its own threads. For a working example that shows how to configure separate thread pools for I/O and CPU work, see the thread_pools example in the DataFusion repository.
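The same producer/consumer shape can be tried out with only the standard library. This sketch swaps tokio's spawn_blocking and mpsc channel for std::thread and a bounded sync_channel, and uses Vec<i64> in place of a RecordBatch. It therefore illustrates the backpressure and cancellation behaviour rather than the DataFusion API; spawn_producer is a name invented for this example.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Start a producer on its own thread and return the receiving end.
/// `Vec<i64>` stands in for a RecordBatch.
fn spawn_producer(num_batches: usize) -> Receiver<Vec<i64>> {
    // Capacity 2 gives backpressure: the producer blocks once two
    // batches are waiting to be consumed.
    let (tx, rx) = sync_channel(2);
    thread::spawn(move || {
        for i in 0..num_batches {
            let batch = vec![i as i64; 4]; // pretend this is expensive to build
            if tx.send(batch).is_err() {
                break; // receiver dropped: the consumer went away
            }
        }
    });
    rx
}
```

A small channel capacity keeps memory bounded when the consumer is slower than the producer, and the failed send gives the producer a natural point to stop when the consumer is dropped.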
This table summarizes what belongs at each layer:
| Layer | Runs During | Should Do | Should NOT Do |
|---|---|---|---|
| TableProvider::scan() | Planning | Build an ExecutionPlan with metadata | I/O, network calls, heavy computation |
| ExecutionPlan::execute() | Execution (once per partition) | Construct a stream, set up channels | Block on async work, read data |
| RecordBatchStream (polling) | Execution | All I/O, computation, data production | -- |
The guiding principle: push work as late as possible. Planning should be fast so the optimizer can do its job. Execution setup should be fast so all partitions can start promptly. The stream is where you spend time producing data.
When scan() does heavy work, several problems arise:
- If a query reads from 10 tables and each scan() takes 500ms, planning alone takes 5 seconds before any data flows.
- scan() runs on a single thread during planning, so any work done there cannot benefit from the parallel execution that DataFusion provides across partitions.

One of the most impactful optimizations you can add to a custom table provider is filter pushdown -- letting the source skip data that the query does not need, rather than reading everything and filtering it afterward.
When DataFusion plans a query with a WHERE clause, it passes the filter
predicates to your scan() method as the filters parameter. By default,
DataFusion assumes your provider cannot handle any filters and inserts a
FilterExec node above your scan to apply them. But if your source can
evaluate some predicates during scanning -- for example, by skipping files,
partitions, or row groups that cannot match -- you can eliminate a huge amount
of unnecessary I/O.
To opt in, implement supports_filters_pushdown:
# use std::any::Any;
# use std::sync::Arc;
# use arrow::datatypes::SchemaRef;
# use datafusion::catalog::{TableProvider, Session};
# use datafusion::common::Result;
# use datafusion::datasource::TableType;
# use datafusion::logical_expr::{Expr, BinaryExpr, Operator, TableProviderFilterPushDown};
# use datafusion::physical_plan::ExecutionPlan;
#
# fn is_partition_column(_expr: &Expr) -> bool { false }
#
# #[derive(Debug)]
# struct MyFilterTable;
#
# #[async_trait::async_trait]
# impl TableProvider for MyFilterTable {
# fn schema(&self) -> SchemaRef { todo!() }
# fn table_type(&self) -> TableType { TableType::Base }
# async fn scan(&self, _: &dyn Session, _: Option<&Vec<usize>>, _: &[Expr], _: Option<usize>) -> Result<Arc<dyn ExecutionPlan>> { todo!() }
#
fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
Ok(filters.iter().map(|f| {
match f {
// We can fully evaluate equality filters on
// the partition column at the source
Expr::BinaryExpr(BinaryExpr {
left, op: Operator::Eq, right
}) if is_partition_column(left) || is_partition_column(right) => {
TableProviderFilterPushDown::Exact
}
// All other filters: let DataFusion handle them
_ => TableProviderFilterPushDown::Unsupported,
}
}).collect())
}
# }
The three possible responses for each filter are:
- Exact -- Your source guarantees that no output rows will have a false value for this predicate. Because the filter is fully evaluated at the source, DataFusion will not add a FilterExec for it.
- Inexact -- Your source can reduce the data produced, but the output may still include rows that do not satisfy the predicate. For example, you might skip entire files based on metadata statistics but not filter individual rows within a file. DataFusion will still add a FilterExec above your scan to remove any remaining rows that slipped through.
- Unsupported -- Your source ignores this filter entirely. DataFusion handles it.

Consider a table with 1 billion rows partitioned by region, and a query:
SELECT * FROM events WHERE region = 'us-east-1' AND event_type = 'click';
Without filter pushdown: Your table provider reads all 1 billion rows across all regions. DataFusion then applies both filters, discarding the vast majority of the data.
With filter pushdown on region: Your scan() method sees the
region = 'us-east-1' filter and constructs an execution plan that only reads
the us-east-1 partition. If that partition holds 100 million rows, you have
just eliminated 90% of the I/O. DataFusion still applies the event_type
filter via FilterExec if you reported it as Unsupported.
DataFusion already pushes filters as close to the data source as possible, typically placing them directly above the scan. FilterExec is also highly optimized, with vectorized evaluation and type-specialized kernels for fast predicate evaluation.
Because of this, you should only implement filter pushdown when your data source can do strictly better -- for example, by avoiding I/O entirely through skipping files or partitions based on metadata. If your data source cannot eliminate I/O in this way, it is usually better to let DataFusion handle the filter, as its in-memory execution is already highly efficient.
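To make the "skip files based on metadata" case concrete, here is a small DataFusion-free sketch of Inexact-style pruning. FileStats, prune_files, and residual_filter are hypothetical names: the first step stands in for what the source does with min/max statistics, the second for the FilterExec that remains above the scan.

```rust
/// Per-file metadata: the smallest and largest value of the filtered column.
struct FileStats {
    min: i64,
    max: i64,
    rows: Vec<i64>, // stands in for the file contents
}

/// Step 1 (source, "Inexact"): skip files whose [min, max] range
/// cannot contain `target`. Surviving files may still hold other values.
fn prune_files(files: &[FileStats], target: i64) -> Vec<&FileStats> {
    files
        .iter()
        .filter(|f| f.min <= target && target <= f.max)
        .collect()
}

/// Step 2 (engine): the residual filter removes non-matching rows
/// from the files that survived pruning.
fn residual_filter(files: &[&FileStats], target: i64) -> Vec<i64> {
    files
        .iter()
        .flat_map(|f| f.rows.iter().copied())
        .filter(|&v| v == target)
        .collect()
}
```

The pruning step never opens a skipped file, which is where the I/O saving comes from; the residual filter is what keeps the result correct.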
The EXPLAIN statement is your best tool for understanding what DataFusion is
actually doing with your table provider. It shows the physical plan that
DataFusion will execute, including any operators it inserted:
EXPLAIN SELECT * FROM events WHERE region = 'us-east-1' AND event_type = 'click';
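If you are using DataFrames, call .explain(false, false) to see the logical and
physical plans. The two arguments are verbose and analyze: .explain(false, true)
also runs the query and annotates the physical plan with execution metrics, and
passing true as the first argument prints the plans in verbose mode.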
Before filter pushdown, the plan might look like:
FilterExec: region@0 = us-east-1 AND event_type@1 = click
MyExecPlan: partitions=50
Here DataFusion is reading all 50 partitions and filtering everything
afterward. The FilterExec above your scan is doing all the predicate work.
After implementing pushdown for region (reported as Exact):
FilterExec: event_type@1 = click
MyExecPlan: partitions=5, filter=[region = us-east-1]
Now your exec reads only the 5 partitions for us-east-1, and the remaining
FilterExec only handles the event_type predicate. The region filter has
been fully absorbed by your scan.
After implementing pushdown for both filters (both Exact):
MyExecPlan: partitions=5, filter=[region = us-east-1 AND event_type = click]
No FilterExec at all -- your source handles everything.
Similarly, EXPLAIN will reveal whether DataFusion is inserting unnecessary
SortExec or RepartitionExec nodes that you could eliminate by declaring
better output properties. Whenever your queries seem slower than expected,
EXPLAIN is the first place to look.
To make filter pushdown concrete, here is an illustrative example. Imagine a
table provider that reads from a set of date-partitioned directories on disk
(e.g., data/2026-03-01/, data/2026-03-02/, ...). Each directory contains
one or more Parquet files for that date. By pushing down a filter on the date
column, the provider can skip entire directories -- avoiding the I/O of listing
and reading files that cannot possibly match the query.
# use std::any::Any;
# use std::collections::HashMap;
# use std::fmt;
# use std::sync::Arc;
# use arrow::datatypes::SchemaRef;
# use datafusion::catalog::{TableProvider, Session};
# use datafusion::common::Result;
# use datafusion::common::tree_node::TreeNodeRecursion;
# use datafusion::datasource::TableType;
# use datafusion::execution::SendableRecordBatchStream;
# use datafusion::execution::context::TaskContext;
# use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
# use datafusion::physical_expr::EquivalenceProperties;
# use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, PlanProperties};
# use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
#
/// A table provider backed by date-partitioned directories.
/// Each date directory contains data files; by filtering on the
/// `date` column we can skip entire directories of I/O.
# #[derive(Debug)]
struct DatePartitionedTable {
schema: SchemaRef,
/// Maps date strings ("2026-03-01") to directory paths
partitions: HashMap<String, String>,
}
#[async_trait::async_trait]
impl TableProvider for DatePartitionedTable {
fn schema(&self) -> SchemaRef { Arc::clone(&self.schema) }
fn table_type(&self) -> TableType { TableType::Base }
fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
Ok(filters.iter().map(|f| {
if Self::is_date_equality_filter(f) {
// We can fully evaluate this: we will only read
// directories matching the date, so no rows with
// a different date will appear in the output.
TableProviderFilterPushDown::Exact
} else {
TableProviderFilterPushDown::Unsupported
}
}).collect())
}
async fn scan(
&self,
_state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
// Determine which date partitions to read by inspecting
// the pushed-down filters. This is the key optimization:
// we decide *during planning* which directories to scan,
// so that execution never touches irrelevant data.
let dates_to_read: Vec<String> = self
.extract_date_values(filters)
.unwrap_or_else(||
self.partitions.keys().cloned().collect()
);
let dirs: Vec<String> = dates_to_read
.iter()
.filter_map(|d| self.partitions.get(d).cloned())
.collect();
let num_dirs = dirs.len();
Ok(Arc::new(DatePartitionedExec {
schema: Arc::clone(&self.schema),
directories: dirs,
properties: Arc::new(PlanProperties::new(
EquivalenceProperties::new(
Arc::clone(&self.schema),
),
// One partition per date directory -- these
// will be read in parallel.
Partitioning::UnknownPartitioning(num_dirs),
EmissionType::Incremental,
Boundedness::Bounded,
)),
}))
}
}
impl DatePartitionedTable {
/// Check if a filter is an equality comparison on the `date` column.
fn is_date_equality_filter(expr: &Expr) -> bool {
// In practice, match on BinaryExpr { left, op: Eq, right }
// and check if either side references the "date" column.
// Simplified here for clarity.
todo!("match on date equality expressions")
}
/// Extract date literal values from pushed-down equality filters.
fn extract_date_values(&self, filters: &[Expr]) -> Option<Vec<String>> {
// Parse filters like `date = '2026-03-01'` and return
// the literal date strings. Returns None if no date
// filters are present (meaning: read all partitions).
todo!("extract date literals from filter expressions")
}
}
#
# #[derive(Debug)]
# struct DatePartitionedExec {
# schema: SchemaRef,
# directories: Vec<String>,
# properties: Arc<PlanProperties>,
# }
#
# impl DisplayAs for DatePartitionedExec {
# fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
# write!(f, "DatePartitionedExec")
# }
# }
#
# impl ExecutionPlan for DatePartitionedExec {
# fn name(&self) -> &str { "DatePartitionedExec" }
# fn properties(&self) -> &Arc<PlanProperties> { &self.properties }
# fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { vec![] }
# fn with_new_children(self: Arc<Self>, _: Vec<Arc<dyn ExecutionPlan>>) -> Result<Arc<dyn ExecutionPlan>> { Ok(self) }
# fn execute(&self, _: usize, _: Arc<TaskContext>) -> Result<SendableRecordBatchStream> { todo!() }
# }
The key insight is that the filter pushdown decision (supports_filters_pushdown)
and the partition pruning (scan()) work together: the first tells DataFusion
that a FilterExec is unnecessary for the date predicate, and the second
ensures that only the relevant directories are scanned. The actual file reading
happens later, in the stream produced by execute().
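The two helpers left as todo!() above depend on DataFusion's Expr type. To show the pruning logic on its own, the sketch below substitutes a deliberately tiny, hypothetical Filter enum for Expr; dirs_to_scan is likewise an invented name. It relies on the fact that the filters passed to scan() are combined with AND, so two different date literals can never both match.

```rust
use std::collections::HashMap;

/// Hypothetical, heavily simplified stand-in for a filter expression.
enum Filter {
    /// `column = 'literal'`
    Eq { column: String, literal: String },
    /// Any predicate this sketch does not understand.
    Other,
}

/// Collect the literals of all `date = '...'` filters.
/// Returns None when no such filter exists (meaning: read every partition).
fn extract_date_values(filters: &[Filter]) -> Option<Vec<String>> {
    let mut dates: Vec<String> = filters
        .iter()
        .filter_map(|f| match f {
            Filter::Eq { column, literal } if column == "date" => Some(literal.clone()),
            _ => None,
        })
        .collect();
    if dates.is_empty() {
        return None;
    }
    dates.sort();
    dates.dedup();
    // Filters are ANDed: two different dates cannot both hold, so nothing matches.
    if dates.len() > 1 {
        dates.clear();
    }
    Some(dates)
}

/// Resolve the directories to scan, in a deterministic (sorted) order.
fn dirs_to_scan(partitions: &HashMap<String, String>, filters: &[Filter]) -> Vec<String> {
    let mut dates = extract_date_values(filters)
        .unwrap_or_else(|| partitions.keys().cloned().collect());
    dates.sort(); // HashMap iteration order is unspecified
    dates.iter().filter_map(|d| partitions.get(d).cloned()).collect()
}
```

Handling the conflicting-literal case matters because the filter was reported as Exact: DataFusion will not re-check it, so the scan must not return rows for a date that fails any of the pushed-down predicates.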
Here is a minimal but complete example of a custom table provider that generates data lazily during streaming:
use std::any::Any;
# use std::fmt;
use std::sync::Arc;
use arrow::array::Int64Array;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion::catalog::TableProvider;
use datafusion::common::Result;
# use datafusion::common::tree_node::TreeNodeRecursion;
use datafusion::datasource::TableType;
use datafusion::catalog::Session;
use datafusion::execution::SendableRecordBatchStream;
# use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::Expr;
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
# DisplayAs, DisplayFormatType,
ExecutionPlan, Partitioning,
# PhysicalExpr,
PlanProperties,
};
use futures::stream;
/// A table provider that generates sequential numbers on demand.
# #[derive(Debug)]
struct CountingTable {
schema: SchemaRef,
num_partitions: usize,
rows_per_partition: usize,
}
impl CountingTable {
fn new(num_partitions: usize, rows_per_partition: usize) -> Self {
let schema = Arc::new(Schema::new(vec![
Field::new("partition", DataType::Int64, false),
Field::new("value", DataType::Int64, false),
]));
Self { schema, num_partitions, rows_per_partition }
}
}
#[async_trait::async_trait]
impl TableProvider for CountingTable {
fn schema(&self) -> SchemaRef { Arc::clone(&self.schema) }
fn table_type(&self) -> TableType { TableType::Base }
async fn scan(
&self,
_state: &dyn Session,
projection: Option<&Vec<usize>>,
_filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
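// Light work only: build the plan with metadata.
// NOTE: this sketch ignores `projection` and always emits both columns.
// A real provider must apply the projection to the schema and to each
// batch; otherwise the plan's output schema will not match what the
// query expects when only a subset of columns is selected.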
Ok(Arc::new(CountingExec {
schema: Arc::clone(&self.schema),
num_partitions: self.num_partitions,
rows_per_partition: limit
.unwrap_or(self.rows_per_partition)
.min(self.rows_per_partition),
properties: Arc::new(PlanProperties::new(
EquivalenceProperties::new(Arc::clone(&self.schema)),
Partitioning::UnknownPartitioning(self.num_partitions),
EmissionType::Incremental,
Boundedness::Bounded,
)),
}))
}
}
# #[derive(Debug)]
struct CountingExec {
schema: SchemaRef,
num_partitions: usize,
rows_per_partition: usize,
properties: Arc<PlanProperties>,
}
# impl DisplayAs for CountingExec {
# fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
# write!(f, "CountingExec: partitions={}", self.num_partitions)
# }
# }
#
impl ExecutionPlan for CountingExec {
fn name(&self) -> &str { "CountingExec" }
fn properties(&self) -> &Arc<PlanProperties> { &self.properties }
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { vec![] }
fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(self)
}
fn execute(
&self,
partition: usize,
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let schema = Arc::clone(&self.schema);
let rows = self.rows_per_partition;
// The heavy work (data generation) happens inside the stream,
// not here in execute().
let batch_stream = stream::once(async move {
let partitions = Int64Array::from(
vec![partition as i64; rows],
);
let values = Int64Array::from(
(0..rows as i64).collect::<Vec<_>>(),
);
let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![Arc::new(partitions), Arc::new(values)],
)?;
Ok(batch)
});
Ok(Box::pin(RecordBatchStreamAdapter::new(
Arc::clone(&self.schema),
batch_stream,
)))
}
}
Once you have implemented a TableProvider, register it with a SessionContext
to make it queryable:
use datafusion::execution::context::SessionContext;
#[tokio::main]
async fn main() -> Result<()> {
let ctx = SessionContext::new();
let provider = CountingTable::new(4, 1000);
ctx.register_table("counting", Arc::new(provider))?;
let df = ctx.sql("SELECT * FROM counting LIMIT 10").await?;
df.show().await?;
Ok(())
}
- TableProvider API docs
- ExecutionPlan API docs
- SendableRecordBatchStream API docs