docs/source/library-user-guide/upgrading/52.0.0.md
### `DFSchema` methods now return `&FieldRef`

To permit more efficient planning, several methods on `DFSchema` have been
changed to return references to the underlying [`&FieldRef`] rather than
[`&Field`]. This allows planners to copy the references cheaply via
`Arc::clone` rather than cloning the entire `Field` structure.

You may need to change code to use `Arc::clone` instead of calling
`.as_ref().clone()` directly on the `Field`. For example:

```diff
- let field = df_schema.field("my_column").as_ref().clone();
+ let field = Arc::clone(df_schema.field("my_column"));
```
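The performance difference is the usual `Arc` story. Here is a self-contained sketch (using `Arc<String>` as a stand-in for `FieldRef`, which in arrow-rs is an alias for `Arc<Field>`) contrasting the cheap reference-count copy with the old deep copy:

```rust
use std::sync::Arc;

// `FieldRef` is an alias for `Arc<Field>` in arrow-rs; a plain Arc<String>
// stands in for it here so this sketch runs without any dependencies.
fn cheap_copy(field_ref: &Arc<String>) -> Arc<String> {
    // Bumps a reference count; the underlying value is not copied.
    Arc::clone(field_ref)
}

fn deep_copy(field_ref: &Arc<String>) -> String {
    // What the old `.as_ref().clone()` pattern did: clone the whole value.
    field_ref.as_ref().clone()
}

fn main() {
    let field_ref = Arc::new(String::from("my_column"));

    let shared = cheap_copy(&field_ref);
    // Both handles point at the same allocation.
    assert!(Arc::ptr_eq(&field_ref, &shared));

    let owned = deep_copy(&field_ref);
    assert_eq!(owned, "my_column");
}
```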
### `ListingTableProvider` now caches LIST commands

In prior versions, `ListingTableProvider` would issue LIST commands to
the underlying object store each time it needed to list files for a query.
To improve performance, `ListingTableProvider` now caches the results of
LIST commands for the lifetime of the `ListingTableProvider` instance or
until a cache entry expires.

Note that by default the cache has no expiration time, so if files are added or removed
from the underlying object store, the `ListingTableProvider` will not see
those changes until the `ListingTableProvider` instance is dropped and recreated.

You can configure the maximum cache size and cache entry expiration time via configuration options:

- `datafusion.runtime.list_files_cache_limit` - Limits the size of the cache in bytes
- `datafusion.runtime.list_files_cache_ttl` - Limits the TTL (time-to-live) of an entry in seconds

Detailed configuration information can be found in the DataFusion Runtime Configuration user's guide.
Caching can be disabled by setting the limit to 0:

```sql
SET datafusion.runtime.list_files_cache_limit TO "0K";
```

Note that the internal API has changed to use a `ListFilesCache` trait instead of a type alias.
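If the files under a table do change, a short TTL keeps listings reasonably fresh. A sketch; the exact value syntax here is an assumption (the limit follows the size format shown above, and the TTL is documented as a number of seconds) — consult the Runtime Configuration guide for the reference:

```sql
-- Cap the listing cache at 1 MB and expire entries after 60 seconds.
-- (Assumed value syntax; see the Runtime Configuration guide.)
SET datafusion.runtime.list_files_cache_limit TO '1M';
SET datafusion.runtime.list_files_cache_ttl TO '60';
```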
### `newlines_in_values` moved from `FileScanConfig` to `CsvOptions`

The CSV-specific `newlines_in_values` configuration option has been moved from `FileScanConfig` to `CsvOptions`, as it only applies to CSV file parsing.

Who is affected:

- Anyone setting `newlines_in_values` via `FileScanConfigBuilder::with_newlines_in_values()`

Migration guide:

Set `newlines_in_values` in `CsvOptions` instead of on `FileScanConfigBuilder`.

Before:

```rust
let source = Arc::new(CsvSource::new(file_schema.clone()));
let config = FileScanConfigBuilder::new(object_store_url, source)
    .with_newlines_in_values(true)
    .build();
```

After:

```rust
let options = CsvOptions {
    newlines_in_values: Some(true),
    ..Default::default()
};
let source = Arc::new(
    CsvSource::new(file_schema.clone()).with_csv_options(options),
);
let config = FileScanConfigBuilder::new(object_store_url, source)
    .build();
```
### `pyarrow` feature removed

The `pyarrow` feature flag has been removed. This feature has been migrated to
the `datafusion-python` repository since version 44.0.0.
### `FileSource` constructors and `FileScanConfigBuilder` changed to accept schemas upfront

The way schemas are passed to file sources and scan configurations has been significantly refactored. File sources now require the schema (including partition columns) to be provided at construction time, and `FileScanConfigBuilder` no longer takes a separate schema parameter.

Who is affected:

- Anyone constructing `FileScanConfig` or file sources (`ParquetSource`, `CsvSource`, `JsonSource`, `AvroSource`) directly
- Custom `FileFormat` implementations

Key changes:
**`FileSource` constructors now require `TableSchema`**: All built-in file sources now take the schema in their constructor:

```diff
- let source = ParquetSource::default();
+ let source = ParquetSource::new(table_schema);
```
**`FileScanConfigBuilder` no longer takes schema as a parameter**: The schema is now passed via the `FileSource`:

```diff
- FileScanConfigBuilder::new(url, schema, source)
+ FileScanConfigBuilder::new(url, source)
```
**Partition columns are now part of `TableSchema`**: The `with_table_partition_cols()` method has been removed from `FileScanConfigBuilder`. Partition columns are now passed as part of the `TableSchema` to the `FileSource` constructor:

```diff
+ let table_schema = TableSchema::new(
+     file_schema,
+     vec![Arc::new(Field::new("date", DataType::Utf8, false))],
+ );
+ let source = ParquetSource::new(table_schema);
  let config = FileScanConfigBuilder::new(url, source)
-     .with_table_partition_cols(vec![Field::new("date", DataType::Utf8, false)])
      .with_file(partitioned_file)
      .build();
```
**`FileFormat::file_source()` now takes a `TableSchema` parameter**: Custom `FileFormat` implementations must be updated:

```diff
  impl FileFormat for MyFileFormat {
-     fn file_source(&self) -> Arc<dyn FileSource> {
+     fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> {
-         Arc::new(MyFileSource::default())
+         Arc::new(MyFileSource::new(table_schema))
      }
  }
```
Migration examples:

For Parquet files:

```diff
- let source = Arc::new(ParquetSource::default());
- let config = FileScanConfigBuilder::new(url, schema, source)
+ let table_schema = TableSchema::new(schema, vec![]);
+ let source = Arc::new(ParquetSource::new(table_schema));
+ let config = FileScanConfigBuilder::new(url, source)
      .with_file(partitioned_file)
      .build();
```
For CSV files with partition columns:

```diff
- let source = Arc::new(CsvSource::new(true, b',', b'"'));
- let config = FileScanConfigBuilder::new(url, file_schema, source)
-     .with_table_partition_cols(vec![Field::new("year", DataType::Int32, false)])
+ let options = CsvOptions {
+     has_header: Some(true),
+     delimiter: b',',
+     quote: b'"',
+     ..Default::default()
+ };
+ let table_schema = TableSchema::new(
+     file_schema,
+     vec![Arc::new(Field::new("year", DataType::Int32, false))],
+ );
+ let source = Arc::new(CsvSource::new(table_schema).with_csv_options(options));
+ let config = FileScanConfigBuilder::new(url, source)
      .build();
```
### Adaptive filter evaluation strategy for Parquet

As of Arrow 57.1.0, DataFusion uses a new adaptive filter strategy when evaluating pushed down filters for Parquet files. This new strategy improves performance for certain types of queries where the results of filtering are more efficiently represented with a bitmask rather than a selection. See arrow-rs #5523 for more details.

This change only applies to the built-in Parquet data source with filter pushdown enabled (which is not yet the default behavior).

You can disable the new behavior by setting the
`datafusion.execution.parquet.force_filter_selections` configuration setting to `true`:

```sql
> set datafusion.execution.parquet.force_filter_selections = true;
```
### Statistics moved from `FileSource` to `FileScanConfig`

Statistics are now managed directly by `FileScanConfig` instead of being delegated to `FileSource` implementations. This simplifies the `FileSource` trait and provides more consistent statistics handling across all file formats.

Who is affected:

- Custom `FileSource` implementations

Breaking changes:

Two methods have been removed from the `FileSource` trait:

- `with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource>`
- `statistics(&self) -> Result<Statistics>`

Migration guide:

If you have a custom `FileSource` implementation, you need to:

- Remove the `with_statistics` method implementation
- Remove the `statistics` method implementation

Before:
```rust
#[derive(Clone)]
struct MyCustomSource {
    table_schema: TableSchema,
    projected_statistics: Option<Statistics>,
    // other fields...
}

impl FileSource for MyCustomSource {
    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
        Arc::new(Self {
            table_schema: self.table_schema.clone(),
            projected_statistics: Some(statistics),
            // other fields...
        })
    }

    fn statistics(&self) -> Result<Statistics> {
        Ok(self.projected_statistics.clone().unwrap_or_else(|| {
            Statistics::new_unknown(self.table_schema.file_schema())
        }))
    }

    // other methods...
}
```
After:

```rust
#[derive(Clone)]
struct MyCustomSource {
    table_schema: TableSchema,
    // projected_statistics field removed
    // other fields...
}

impl FileSource for MyCustomSource {
    // with_statistics method removed
    // statistics method removed
    // other methods...
}
```
Accessing statistics:

Statistics are now accessed through `FileScanConfig` instead of `FileSource`:

```diff
- let stats = config.file_source.statistics()?;
+ let stats = config.statistics();
```

Note that `FileScanConfig::statistics()` automatically marks statistics as inexact when filters are present, ensuring correctness when filters are pushed down.
### Partition column handling removed from `PhysicalExprAdapter`

Partition column replacement is now a separate preprocessing step performed before expression rewriting via `PhysicalExprAdapter`. This change provides better separation of concerns and makes the adapter more focused on schema differences rather than partition value substitution.

Who is affected:

- Custom `PhysicalExprAdapterFactory` implementations that handle partition columns
- Users of the `FilePruner` API

Breaking changes:

- `FilePruner::try_new()` signature changed: the `partition_fields` parameter has been removed since partition column handling is now done separately
- Partition columns must be replaced via `replace_columns_with_literals()` before expressions are passed to the adapter

Migration guide:
If you have code that creates a `FilePruner` with partition fields:

Before:

```rust
use datafusion_pruning::FilePruner;

let pruner = FilePruner::try_new(
    predicate,
    file_schema,
    partition_fields, // This parameter is removed
    file_stats,
)?;
```

After:

```rust
use datafusion_pruning::FilePruner;

// Partition fields are no longer needed
let pruner = FilePruner::try_new(
    predicate,
    file_schema,
    file_stats,
)?;
```
If you have custom code that relies on `PhysicalExprAdapter` to handle partition columns, you must now call `replace_columns_with_literals()` separately:

Before:

```rust
// Adapter handled partition column replacement internally
let adapted_expr = adapter.rewrite(expr)?;
```

After:

```rust
use datafusion_physical_expr_adapter::replace_columns_with_literals;

// Replace partition columns first
let expr_with_literals = replace_columns_with_literals(expr, &partition_values)?;
// Then apply the adapter
let adapted_expr = adapter.rewrite(expr_with_literals)?;
```
### `build_row_filter` signature simplified

The `build_row_filter` function in `datafusion-datasource-parquet` has been simplified to take a single schema parameter instead of two.
The expectation is now that the filter has already been adapted to the physical file schema (the Arrow representation of the Parquet file's schema) before being passed to this function,
for example by using a `PhysicalExprAdapter`.

Who is affected:

- Anyone calling `build_row_filter` directly

Breaking changes:
The function signature changed from:

```rust
pub fn build_row_filter(
    expr: &Arc<dyn PhysicalExpr>,
    physical_file_schema: &SchemaRef,
    predicate_file_schema: &SchemaRef, // removed
    metadata: &ParquetMetaData,
    reorder_predicates: bool,
    file_metrics: &ParquetFileMetrics,
) -> Result<Option<RowFilter>>
```

to:

```rust
pub fn build_row_filter(
    expr: &Arc<dyn PhysicalExpr>,
    file_schema: &SchemaRef,
    metadata: &ParquetMetaData,
    reorder_predicates: bool,
    file_metrics: &ParquetFileMetrics,
) -> Result<Option<RowFilter>>
```
Migration guide:

Remove the duplicate schema parameter from your call:

```diff
- build_row_filter(&predicate, &file_schema, &file_schema, metadata, reorder, metrics)
+ build_row_filter(&predicate, &file_schema, metadata, reorder, metrics)
```
### `WITHIN GROUP` requires explicit support from aggregate UDFs

The SQL planner now enforces the aggregate UDF contract more strictly: the
`WITHIN GROUP (ORDER BY ...)` syntax is accepted only if the aggregate UDAF
explicitly advertises support by returning `true` from
`AggregateUDFImpl::supports_within_group_clause()`.

Previously the planner forwarded a `WITHIN GROUP` clause to order-sensitive
aggregates even when they did not implement ordered-set semantics, which could
cause queries such as `SUM(x) WITHIN GROUP (ORDER BY x)` to plan successfully.
This behavior was too permissive and has been changed to match PostgreSQL and
the documented semantics.
Migration: If your UDAF intentionally implements ordered-set semantics and
should accept the `WITHIN GROUP` SQL syntax, update your implementation to
return `true` from `supports_within_group_clause()` and handle the ordering
semantics in your accumulator implementation. If your UDAF is merely
order-sensitive (but not an ordered-set aggregate), do not advertise
support via `supports_within_group_clause()`; clients should instead use
alternative function signatures (for example, passing the ordering explicitly
as a function argument).
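The opt-in looks like the following sketch. This is a self-contained mock, not DataFusion's actual trait definition: only the method relevant here is shown, and the struct names are invented for illustration.

```rust
// Mock of the relevant slice of DataFusion's `AggregateUDFImpl` trait
// (illustration only; the real trait has many more methods).
trait AggregateUDFImpl {
    // The planner rejects WITHIN GROUP (ORDER BY ...) unless this
    // returns true; the default declines it.
    fn supports_within_group_clause(&self) -> bool {
        false
    }
}

// A hypothetical ordered-set aggregate (e.g. a percentile function)
// opts in explicitly and must honor the ordering in its accumulator.
struct MyPercentile;

impl AggregateUDFImpl for MyPercentile {
    fn supports_within_group_clause(&self) -> bool {
        true
    }
}

// A merely order-sensitive aggregate keeps the default, so
// `my_sum(x) WITHIN GROUP (ORDER BY x)` now fails to plan.
struct MySum;

impl AggregateUDFImpl for MySum {}

fn main() {
    assert!(MyPercentile.supports_within_group_clause());
    assert!(!MySum.supports_within_group_clause());
}
```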
### `AggregateUDFImpl::supports_null_handling_clause` now defaults to `false`

This method specifies whether an aggregate function allows `IGNORE NULLS`/`RESPECT NULLS`
during SQL parsing, with the implication that it respects these clauses during computation.
Most DataFusion aggregate functions silently ignored this syntax in prior versions,
as they did not make use of it and it was permitted by default. This has changed so that
only the few functions which do respect this clause (e.g. `array_agg`, `first_value`,
`last_value`) need to implement it.

Custom user-defined aggregate functions will also error if this syntax is used, unless they explicitly declare support by overriding the method.

For example, instead of silently succeeding, SQL parsing will now fail for queries such as:

```sql
SELECT median(c1) IGNORE NULLS FROM table
```
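For a custom UDAF that genuinely honors the clause, the opt-in is a one-method override. A self-contained mock sketch (not DataFusion's real trait; the struct names are invented for illustration):

```rust
// Mock of the relevant slice of `AggregateUDFImpl` (illustration only).
trait AggregateUDFImpl {
    // New default: IGNORE NULLS / RESPECT NULLS is rejected at parse time.
    fn supports_null_handling_clause(&self) -> bool {
        false
    }
}

// A hypothetical aggregate that actually honors the clause opts in;
// it must then also respect the setting in its accumulator.
struct MyFirstValue;

impl AggregateUDFImpl for MyFirstValue {
    fn supports_null_handling_clause(&self) -> bool {
        true
    }
}

// An aggregate that ignores the clause keeps the default, so
// `my_median(c1) IGNORE NULLS` now fails to parse.
struct MyMedian;

impl AggregateUDFImpl for MyMedian {}

fn main() {
    assert!(MyFirstValue.supports_null_handling_clause());
    assert!(!MyMedian.supports_null_handling_clause());
}
```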
### `CacheAccessor` trait

The `remove` API no longer requires a mutable instance.
### Changes to the `datafusion-ffi` crate

Many of the structs in the `datafusion-ffi` crate have been updated to allow easier
conversion to the underlying trait types they represent. This simplifies some code
paths, but also provides an additional improvement in cases where library code goes
through a round trip via the foreign function interface.

To update your code, suppose you have an `FFI_SchemaProvider` called `ffi_provider`
and you wish to use it as a `SchemaProvider`. In the old approach you would do
something like:

```rust
let foreign_provider: ForeignSchemaProvider = ffi_provider.into();
let foreign_provider = Arc::new(foreign_provider) as Arc<dyn SchemaProvider>;
```

This code should now be written as:

```rust
let foreign_provider: Arc<dyn SchemaProvider + Send> = ffi_provider.into();
let foreign_provider = foreign_provider as Arc<dyn SchemaProvider>;
```
For user-defined functions, the updates are similar, but you
may need to change the way you create the `ScalarUDF`.
Aggregate and window functions follow the same pattern.
Previously you may have written:

```rust
let foreign_udf: ForeignScalarUDF = ffi_udf.try_into()?;
let foreign_udf: ScalarUDF = foreign_udf.into();
```

This should now be:

```rust
let foreign_udf: Arc<dyn ScalarUDFImpl> = ffi_udf.into();
let foreign_udf = ScalarUDF::new_from_shared_impl(foreign_udf);
```
When creating any of the following structs, you must now
provide a `TaskContextProvider` and optionally a `LogicalExtensionCodec`:

- `FFI_CatalogListProvider`
- `FFI_CatalogProvider`
- `FFI_SchemaProvider`
- `FFI_TableProvider`
- `FFI_TableFunction`

Each of these structs has a `new()` and a `new_with_ffi_codec()` method for
instantiation. For example, where you previously would write

```rust
let table = Arc::new(MyTableProvider::new());
let ffi_table = FFI_TableProvider::new(table, None);
```

you now need to provide a `TaskContextProvider`. The most common
implementation of this trait is `SessionContext`:

```rust
let ctx = Arc::new(SessionContext::default());
let table = Arc::new(MyTableProvider::new());
let ffi_table = FFI_TableProvider::new(table, None, ctx, None);
```
The alternative constructor may be more convenient
if you are performing many of these operations, since an `FFI_LogicalExtensionCodec`
stores the `TaskContextProvider` as well:

```rust
let codec = Arc::new(DefaultLogicalExtensionCodec {});
let ctx = Arc::new(SessionContext::default());
let ffi_codec = FFI_LogicalExtensionCodec::new(codec, None, ctx);
let table = Arc::new(MyTableProvider::new());
let ffi_table = FFI_TableProvider::new_with_ffi_codec(table, None, ffi_codec);
```
Additional information about the usage of the `TaskContextProvider` can be
found in the crate README.

Additionally, the FFI structure for scalar UDFs no longer contains a
`return_type` call. This code was unused, since the `ForeignScalarUDF`
struct implements `return_field_from_args` instead.
### Projection handling moved from `FileScanConfig` to `FileSource`

Projection handling has been moved from `FileScanConfig` into `FileSource` implementations. This enables format-specific projection pushdown (e.g., Parquet can push down struct field access, Vortex can push down computed expressions into un-decoded data).

Who is affected:

- Custom `FileSource` implementations
- Anyone calling `FileScanConfigBuilder::with_projection_indices` directly

Breaking changes:
**`FileSource::with_projection` replaced with `try_pushdown_projection`**:
The `with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource>` method has been removed and replaced with `try_pushdown_projection(&self, projection: &ProjectionExprs) -> Result<Option<Arc<dyn FileSource>>>`.

**`FileScanConfig.projection_exprs` field removed**:
Projections are now stored in the `FileSource` directly, not in `FileScanConfig`.
Various public helper methods that access projection information have been removed from `FileScanConfig`.

**`FileScanConfigBuilder::with_projection_indices` now returns `Result<Self>`**:
This method can now fail if the projection pushdown fails.

**`FileSource::create_file_opener` now returns `Result<Arc<dyn FileOpener>>`**:
Previously it returned `Arc<dyn FileOpener>` directly.
Any `FileSource` implementation that may fail to create a `FileOpener` should now return an appropriate error.

**`DataSource::try_swapping_with_projection` signature changed**:
The parameter changed from `&[ProjectionExpr]` to `&ProjectionExprs`.
Migration guide:

If you have a custom `FileSource` implementation:

Before:

```rust
impl FileSource for MyCustomSource {
    fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
        // Apply projection from config
        Arc::new(Self { /* ... */ })
    }

    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        partition: usize,
    ) -> Arc<dyn FileOpener> {
        Arc::new(MyOpener { /* ... */ })
    }
}
```
After:

```rust
impl FileSource for MyCustomSource {
    fn try_pushdown_projection(
        &self,
        projection: &ProjectionExprs,
    ) -> Result<Option<Arc<dyn FileSource>>> {
        // Return None if the projection cannot be pushed down
        // Return Some(new_source) with the projection applied if it can
        Ok(Some(Arc::new(Self {
            projection: Some(projection.clone()),
            /* ... */
        })))
    }

    fn projection(&self) -> Option<&ProjectionExprs> {
        self.projection.as_ref()
    }

    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        partition: usize,
    ) -> Result<Arc<dyn FileOpener>> {
        Ok(Arc::new(MyOpener { /* ... */ }))
    }
}
```
We recommend looking at the pull request that introduced these changes, #18627, for more examples of how this was handled for the various built-in file sources.

We have added `SplitProjection` and `ProjectionOpener` helpers to make it easier to handle projections in your `FileSource` implementations.
For file sources that can only handle simple column selections (not computed expressions), use these helpers to split the projection into pushdownable and non-pushdownable parts:

```rust
use datafusion_datasource::projection::{SplitProjection, ProjectionOpener};

// In try_pushdown_projection:
let split = SplitProjection::new(projection, self.table_schema())?;
// Use split.file_projection() for what to push down to the file format
// The ProjectionOpener wrapper will handle the rest
```
For `FileScanConfigBuilder` users:

```diff
  let config = FileScanConfigBuilder::new(url, source)
-     .with_projection_indices(Some(vec![0, 2, 3]))
+     .with_projection_indices(Some(vec![0, 2, 3]))?
      .build();
```
### `SchemaAdapter` and `SchemaAdapterFactory` removed

Following the deprecation announced in DataFusion 49.0.0, `SchemaAdapterFactory` has been fully removed from Parquet scanning.

The following symbols have been deprecated and will be removed in the next release:

- `SchemaAdapter` trait
- `SchemaAdapterFactory` trait
- `SchemaMapper` trait
- `SchemaMapping` struct
- `DefaultSchemaAdapterFactory` struct

These types were previously used to adapt record batch schemas during file reading.
This functionality has been replaced by `PhysicalExprAdapterFactory`, which rewrites expressions at planning time rather than transforming batches at runtime.

Migration guide:

If you were using a custom `SchemaAdapterFactory` for schema adaptation (e.g., default column values, type coercion), you should now implement `PhysicalExprAdapterFactory` instead.
See the default column values example for a complete implementation.