<!--- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -->

# Upgrade Guides

## DataFusion 51.0.0

### `arrow` / `parquet` updated to 57.0.0

This version of DataFusion upgrades the underlying Apache Arrow implementation to version 57.0.0 (both the `arrow` and `parquet` crates), along with several dependent crates such as `prost`, `tonic`, `pyo3`, and `substrait`. See the release notes for more details.

### MSRV updated to 1.88.0

The Minimum Supported Rust Version (MSRV) has been updated to 1.88.0.
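If your own crate declares a minimum Rust version, the matching `Cargo.toml` entry would be:

```toml
[package]
rust-version = "1.88.0"
```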

### `FunctionRegistry` exposes two additional methods

`FunctionRegistry` exposes two additional methods, `udafs` and `udwfs`, which return the set of registered user defined aggregate and window function names. To upgrade, implement these methods so they return the registered function names:

```diff
 impl FunctionRegistry for FunctionRegistryImpl {
     fn udfs(&self) -> HashSet<String> {
         self.scalar_functions.keys().cloned().collect()
     }
+    fn udafs(&self) -> HashSet<String> {
+        self.aggregate_functions.keys().cloned().collect()
+    }
+
+    fn udwfs(&self) -> HashSet<String> {
+        self.window_functions.keys().cloned().collect()
+    }
 }
```

### `datafusion-proto` uses `TaskContext` rather than `SessionContext` in physical plan serde methods

There have been changes in the public API methods of `datafusion-proto` which handle physical plan serde.

Methods such as `physical_plan_from_bytes` and `parse_physical_expr` now expect a `TaskContext` instead of a `SessionContext`:

```diff
- let plan2 = physical_plan_from_bytes(&bytes, &ctx)?;
+ let plan2 = physical_plan_from_bytes(&bytes, &ctx.task_ctx())?;
```

Because `TaskContext` contains the `RuntimeEnv`, methods such as `try_into_physical_plan` no longer take an explicit `RuntimeEnv` parameter:

```diff
 let result_exec_plan: Arc<dyn ExecutionPlan> = proto
-    .try_into_physical_plan(&ctx, runtime.deref(), &composed_codec)
+    .try_into_physical_plan(&ctx.task_ctx(), &composed_codec)
```

`PhysicalExtensionCodec::try_decode()` now expects a `TaskContext` instead of a `FunctionRegistry`:

```diff
 pub trait PhysicalExtensionCodec {
     fn try_decode(
         &self,
         buf: &[u8],
         inputs: &[Arc<dyn ExecutionPlan>],
-        registry: &dyn FunctionRegistry,
+        ctx: &TaskContext,
     ) -> Result<Arc<dyn ExecutionPlan>>;
 }
```

See issue #17601 for more details.

### `SessionState`'s `sql_to_statement` method takes `Dialect` rather than a `str`

The `dialect` parameter of the `sql_to_statement` method defined in `datafusion::execution::session_state::SessionState` has changed from `&str` to `&Dialect`. `Dialect` is an enum defined in the `datafusion-common` crate under the `config` module; it provides type safety and better validation for SQL dialect selection.
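A migration might look like the following sketch (the `Dialect::PostgreSQL` variant name is illustrative; check the `datafusion_common::config::Dialect` enum for the exact variant names):

```diff
-let statement = state.sql_to_statement(sql, "postgresql")?;
+let statement = state.sql_to_statement(sql, &Dialect::PostgreSQL)?;
```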

### Reorganization of `ListingTable` into `datafusion-catalog-listing` crate

There has been a long-standing request to remove features such as `ListingTable` from the `datafusion` crate to support faster build times. The structs `ListingOptions`, `ListingTable`, and `ListingTableConfig` are now available within the `datafusion-catalog-listing` crate. These are re-exported in the `datafusion` crate, so this should have minimal impact on existing users.

See issue #14462 and issue #17713 for more details.

### Reorganization of `ArrowSource` into `datafusion-datasource-arrow` crate

To support issue #17713 the `ArrowSource` code has been moved out of the `datafusion` core crate into its own crate, `datafusion-datasource-arrow`. This follows the pattern for the AVRO, CSV, JSON, and Parquet data sources. Users may need to update their paths to account for these changes.
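For example, an import might change along these lines (the exact module paths are illustrative; consult the `datafusion-datasource-arrow` crate docs for the current location):

```diff
-use datafusion::datasource::physical_plan::ArrowSource;
+use datafusion_datasource_arrow::source::ArrowSource;
```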

See issue #17713 for more details.

### `FileScanConfig::projection` renamed to `FileScanConfig::projection_exprs`

The projection field in FileScanConfig has been renamed to projection_exprs and its type has changed from Option<Vec<usize>> to Option<ProjectionExprs>. This change enables more powerful projection pushdown capabilities by supporting arbitrary physical expressions rather than just column indices.

**Impact on direct field access:**

If you directly access the projection field:

```rust,ignore
let config: FileScanConfig = ...;
let projection = config.projection;
```

You should update to:

```rust,ignore
let config: FileScanConfig = ...;
let projection_exprs = config.projection_exprs;
```

**Impact on builders:**

The FileScanConfigBuilder::with_projection() method has been deprecated in favor of with_projection_indices():

```diff
 let config = FileScanConfigBuilder::new(url, file_source)
-    .with_projection(Some(vec![0, 2, 3]))
+    .with_projection_indices(Some(vec![0, 2, 3]))
     .build();
```

Note: `with_projection()` still works but is deprecated and will be removed in a future release.

**What is `ProjectionExprs`?**

`ProjectionExprs` is a new type that represents a list of physical expressions for projection. While it can be constructed from column indices (which is what `with_projection_indices` does internally), it also supports arbitrary physical expressions, enabling advanced features like expression evaluation during scanning.

You can access column indices from ProjectionExprs using its methods if needed:

```rust,ignore
let projection_exprs: ProjectionExprs = ...;
// Get the column indices if the projection only contains simple column references
let indices = projection_exprs.column_indices();
```

### `DESCRIBE query` support

`DESCRIBE query` was previously an alias for `EXPLAIN query`, which outputs the execution plan of the query. With this release, `DESCRIBE query` now outputs the computed schema of the query, consistent with the behavior of `DESCRIBE table_name`.
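For example, the following statement now reports the schema of the query (column names, data types, nullability) rather than its execution plan:

```sql
-- Previously printed the execution plan; now reports the computed schema
DESCRIBE SELECT 1 AS x, 'a' AS y;
```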

### `datafusion.execution.time_zone` default configuration changed

The default value for `datafusion.execution.time_zone` was previously the string `+00:00` (GMT/Zulu time). It has been changed to an `Option<String>` with a default of `None`. If you want to restore the previous value, you can execute the following SQL:

```sql
SET TIMEZONE = '+00:00';
```

This change was made to better support using the default timezone in scalar UDF functions such as `now`, `current_date`, `current_time`, and `to_timestamp`, among others.
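When configuring a session from Rust, the equivalent might look like the following sketch (illustrative; check the `SessionConfig` and `ExecutionOptions` docs for the exact field path):

```rust,ignore
let mut config = SessionConfig::new();
// time_zone is now Option<String>, defaulting to None
config.options_mut().execution.time_zone = Some("+00:00".to_string());
```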

### Introduction of `TableSchema` and changes to `FileSource::with_schema()` method

A new `TableSchema` struct has been introduced in the `datafusion-datasource` crate to better manage table schemas with partition columns. This struct helps distinguish between:

  • File schema: The schema of actual data files on disk
  • Partition columns: Columns derived from directory structure (e.g., Hive-style partitioning)
  • Table schema: The complete schema combining both file and partition columns

As part of this change, the `FileSource::with_schema()` method signature has changed from accepting a `SchemaRef` to accepting a `TableSchema`.

**Who is affected:**

  • Users who have implemented custom `FileSource` implementations will need to update their code
  • Users who only use built-in file sources (Parquet, CSV, JSON, AVRO, Arrow) are not affected

**Migration guide for custom `FileSource` implementations:**

```diff
 use datafusion_datasource::file::FileSource;
-use arrow::datatypes::SchemaRef;
+use datafusion_datasource::TableSchema;

 impl FileSource for MyCustomSource {
-    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
+    fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource> {
         Arc::new(Self {
-            schema: Some(schema),
+            // Use schema.file_schema() to get the file schema without partition columns
+            schema: Some(Arc::clone(schema.file_schema())),
             ..self.clone()
         })
     }
 }
```

For implementations that need access to partition columns:

```rust,ignore
fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource> {
    Arc::new(Self {
        file_schema: Arc::clone(schema.file_schema()),
        partition_cols: schema.table_partition_cols().clone(),
        table_schema: Arc::clone(schema.table_schema()),
        ..self.clone()
    })
}
```

Note: Most `FileSource` implementations only need to store the file schema (without partition columns), as shown in the first example. The second pattern of storing all three schema components is typically only needed for advanced use cases where you need access to different schema representations for different operations (e.g., `ParquetSource` uses the file schema for building pruning predicates but needs the table schema for filter pushdown logic).

**Using `TableSchema` directly:**

If you're constructing a `FileScanConfig` or working with table schemas and partition columns, you can now use `TableSchema`:

```rust
use datafusion_datasource::TableSchema;
use arrow::datatypes::{Schema, Field, DataType};
use std::sync::Arc;

// Create a TableSchema with partition columns
let file_schema = Arc::new(Schema::new(vec![
    Field::new("user_id", DataType::Int64, false),
    Field::new("amount", DataType::Float64, false),
]));

let partition_cols = vec![
    Arc::new(Field::new("date", DataType::Utf8, false)),
    Arc::new(Field::new("region", DataType::Utf8, false)),
];

let table_schema = TableSchema::new(file_schema, partition_cols);

// Access different schema representations
let file_schema_ref = table_schema.file_schema();             // Schema without partition columns
let full_schema = table_schema.table_schema();                // Complete schema with partition columns
let partition_cols_ref = table_schema.table_partition_cols(); // Just the partition columns
```

### `AggregateUDFImpl::is_ordered_set_aggregate` has been renamed to `AggregateUDFImpl::supports_within_group_clause`

This method has been renamed to better reflect the actual impact it has for aggregate UDF implementations. The accompanying `AggregateUDF::is_ordered_set_aggregate` has also been renamed to `AggregateUDF::supports_within_group_clause`. No functionality has been changed with regards to this method; it still refers only to permitting use of the `WITHIN GROUP` SQL syntax for the aggregate function.
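A minimal migration sketch (the `MyPercentile` type is hypothetical):

```diff
 impl AggregateUDFImpl for MyPercentile {
-    fn is_ordered_set_aggregate(&self) -> bool {
+    fn supports_within_group_clause(&self) -> bool {
         true
     }
 }
```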