Back to Datafusion

Building Logical Plans

docs/source/library-user-guide/building-logical-plans.md

53.1.09.4 KB
Original Source
<!--- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -->

Building Logical Plans

A logical plan is a structured representation of a database query that describes the high-level operations and transformations needed to retrieve data from a database or data source. It abstracts away specific implementation details and focuses on the logical flow of the query, including operations like filtering, sorting, and joining tables.

This logical plan serves as an intermediate step before generating an optimized physical execution plan. This is explained in more detail in the Query Planning and Execution Overview section of the Architecture Guide.

Building Logical Plans Manually

DataFusion's LogicalPlan is an enum containing variants representing all the supported operators, and also contains an Extension variant that allows projects building on DataFusion to add custom logical operators.

It is possible to create logical plans by directly creating instances of the LogicalPlan enum as shown, but it is much easier to use the LogicalPlanBuilder, which is described in the next section.

Here is an example of building a logical plan directly:

rust
use datafusion::common::DataFusionError;
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::logical_expr::{Filter, LogicalPlan, TableScan, LogicalTableSource};
use datafusion::prelude::*;
use std::sync::Arc;

fn main() -> Result<(), DataFusionError> {
    // create a logical table source
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int32, true),
        Field::new("name", DataType::Utf8, true),
    ]);
    let table_source = LogicalTableSource::new(SchemaRef::new(schema));

    // create a TableScan plan
    let projection = None; // optional projection
    let filters = vec![]; // optional filters to push down
    let fetch = None; // optional LIMIT
    let table_scan = LogicalPlan::TableScan(TableScan::try_new(
        "person",
        Arc::new(table_source),
        projection,
        filters,
        fetch,
        )?
    );

    // create a Filter plan that evaluates `id > 500` that wraps the TableScan
    let filter_expr = col("id").gt(lit(500));
    let plan = LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(table_scan)) ? );

    // print the plan
    println!("{}", plan.display_indent_schema());
    Ok(())
}

This example produces the following plan:

text
Filter: person.id > Int32(500) [id:Int32;N, name:Utf8;N]
  TableScan: person [id:Int32;N, name:Utf8;N]

Building Logical Plans with LogicalPlanBuilder

DataFusion logical plans can be created using the LogicalPlanBuilder struct. There is also a DataFrame API which is a higher-level API that delegates to LogicalPlanBuilder.

There are several functions that can can be used to create a new builder, such as

  • empty - create an empty plan with no fields
  • values - create a plan from a set of literal values
  • scan - create a plan representing a table scan
  • scan_with_filters - create a plan representing a table scan with filters

Once the builder is created, transformation methods can be called to declare that further operations should be performed on the plan. Note that all we are doing at this stage is building up the logical plan structure. No query execution will be performed.

Here are some examples of transformation methods, but for a full list, refer to the LogicalPlanBuilder API documentation.

  • filter
  • limit
  • sort
  • distinct
  • join

The following example demonstrates building the same simple query plan as the previous example, with a table scan followed by a filter.

<!-- source for this example is in datafusion_docs::library_logical_plan::plan_builder_1 -->
rust
use datafusion::common::DataFusionError;
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::logical_expr::{LogicalPlanBuilder, LogicalTableSource};
use datafusion::prelude::*;
use std::sync::Arc;

fn main() -> Result<(), DataFusionError> {
    // create a logical table source
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int32, true),
        Field::new("name", DataType::Utf8, true),
    ]);
    let table_source = LogicalTableSource::new(SchemaRef::new(schema));

    // optional projection
    let projection = None;

    // create a LogicalPlanBuilder for a table scan
    let builder = LogicalPlanBuilder::scan("person", Arc::new(table_source), projection)?;

    // perform a filter operation and build the plan
    let plan = builder
        .filter(col("id").gt(lit(500)))? // WHERE id > 500
        .build()?;

    // print the plan
    println!("{}", plan.display_indent_schema());
    Ok(())
}

This example produces the following plan:

text
Filter: person.id > Int32(500) [id:Int32;N, name:Utf8;N]
  TableScan: person [id:Int32;N, name:Utf8;N]

Translating Logical Plan to Physical Plan

Logical plans can not be directly executed. They must be "compiled" into an ExecutionPlan, which is often referred to as a "physical plan".

Compared to LogicalPlans, ExecutionPlans have many more details such as specific algorithms and detailed optimizations. Given a LogicalPlan, the easiest way to create an ExecutionPlan is using SessionState::create_physical_plan as shown below

rust
use datafusion::datasource::{provider_as_source, MemTable};
use datafusion::common::DataFusionError;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::logical_expr::{LogicalPlanBuilder, LogicalTableSource};
use datafusion::prelude::*;
use std::sync::Arc;

// Creating physical plans may access remote catalogs and data sources
// thus it must be run with an async runtime.
#[tokio::main]
async fn main() -> Result<(), DataFusionError> {

    // create a default table source
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int32, true),
        Field::new("name", DataType::Utf8, true),
    ]);
    // To create an ExecutionPlan we must provide an actual
    // TableProvider. For this example, we don't provide any data
    // but in production code, this would have `RecordBatch`es with
    // in memory data
    let table_provider = Arc::new(MemTable::try_new(Arc::new(schema), vec![vec![]])?);
    // Use the provider_as_source function to convert the TableProvider to a table source
    let table_source = provider_as_source(table_provider);

    // create a LogicalPlanBuilder for a table scan without projection or filters
    let logical_plan = LogicalPlanBuilder::scan("person", table_source, None)?.build()?;

    // Now create the physical plan by calling `create_physical_plan`
    let ctx = SessionContext::new();
    let physical_plan = ctx.state().create_physical_plan(&logical_plan).await?;

    // print the plan
    println!("{}", DisplayableExecutionPlan::new(physical_plan.as_ref()).indent(true));
    Ok(())
}

This example produces the following physical plan:

text
DataSourceExec: partitions=0, partition_sizes=[]

Table Sources

The previous examples use a LogicalTableSource, which is used for tests and documentation in DataFusion, and is also suitable if you are using DataFusion to build logical plans but do not use DataFusion's physical planner.

However, it is more common to use a TableProvider. To get a TableSource from a TableProvider, use provider_as_source or DefaultTableSource.