chore(deps): upgrade datafusion to 52.0.0 #4092
```diff
@@ -12,14 +12,15 @@ use datafusion::catalog::TableProvider;
 use datafusion::catalog::memory::DataSourceExec;
 use datafusion::common::pruning::PruningStatistics;
 use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
-use datafusion::common::{Column, DFSchemaRef, Result, Statistics, ToDFSchema};
+use datafusion::common::{Column, ColumnStatistics, DFSchemaRef, Result, Statistics, ToDFSchema};
 use datafusion::config::{ConfigOptions, TableParquetOptions};
 use datafusion::datasource::TableType;
 use datafusion::datasource::physical_plan::{
-    FileGroup, FileSource, wrap_partition_type_in_dict, wrap_partition_value_in_dict,
+    FileGroup, wrap_partition_type_in_dict, wrap_partition_value_in_dict,
 };
 use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
 use datafusion::datasource::sink::DataSinkExec;
+use datafusion::datasource::table_schema::TableSchema;
 use datafusion::error::DataFusionError;
 use datafusion::execution::{SendableRecordBatchStream, TaskContext};
 use datafusion::logical_expr::dml::InsertOp;
@@ -50,7 +51,7 @@ use url::Url;
 use uuid::Uuid;

 use crate::delta_datafusion::engine::AsObjectStoreUrl;
 use crate::delta_datafusion::schema_adapter::DeltaSchemaAdapterFactory;

 use crate::delta_datafusion::table_provider::next::SnapshotWrapper;
 use crate::delta_datafusion::{
     DataFusionMixins as _, FindFilesExprProperties, LogDataHandler, get_null_of_arrow_type,
@@ -151,7 +152,7 @@ impl DeltaScanConfigBuilder {
             Some(name) => {
                 if column_names.contains(name) {
                     return Err(DeltaTableError::Generic(format!(
-                        "Unable to add file path column since column with name {name} exits"
+                        "Unable to add file path column since column with name {name} exists"
                     )));
                 }
@@ -179,7 +180,7 @@ impl DeltaScanConfigBuilder {
             wrap_partition_values: self.wrap_partition_values.unwrap_or(true),
             enable_parquet_pushdown: self.enable_parquet_pushdown,
             schema: self.schema.clone(),
-            schema_force_view_types: false,
+            schema_force_view_types: true,
         })
     }
 }
```
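For context on the `schema_force_view_types` flip above: in DataFusion's parquet options this flag controls whether string/binary columns are loaded as Arrow view types (`Utf8View`/`BinaryView`) instead of `Utf8`/`Binary`. A minimal sketch of what that means for a schema, using stand-in types rather than arrow-rs:

```rust
// Stand-in for Arrow data types, to illustrate what forcing "view types"
// does to a schema: string/binary columns become their view-type
// equivalents; everything else is untouched.
#[derive(Clone, Debug, PartialEq)]
enum DataType {
    Int64,
    Utf8,
    Binary,
    Utf8View,
    BinaryView,
}

fn force_view_types(fields: Vec<DataType>) -> Vec<DataType> {
    fields
        .into_iter()
        .map(|dt| match dt {
            DataType::Utf8 => DataType::Utf8View,
            DataType::Binary => DataType::BinaryView,
            other => other,
        })
        .collect()
}

fn main() {
    let out = force_view_types(vec![DataType::Int64, DataType::Utf8, DataType::Binary]);
    assert_eq!(
        out,
        vec![DataType::Int64, DataType::Utf8View, DataType::BinaryView]
    );
    println!("{out:?}");
}
```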
```diff
@@ -538,12 +539,82 @@ impl<'a> DeltaScanBuilder<'a> {
         let stats = stats.unwrap_or(Statistics::new_unknown(&schema));

+        // DF52's TableSchema outputs columns as: file_schema + partition_columns.
+        // Source stats are indexed by TableConfiguration.schema() field order, which
+        // may differ from the scan schema order. We need name-based remapping, not
+        // index-based.
+        let partition_col_names = self.snapshot.metadata().partition_columns();
+
+        // Build a name -> ColumnStatistics map from the source stats
+        // (keyed by TableConfiguration schema order).
+        let source_schema = self.snapshot.schema();
+        let stats_by_name: HashMap<String, ColumnStatistics> = source_schema
+            .fields()
+            .enumerate()
+            .filter_map(|(idx, field)| {
+                stats
+                    .column_statistics
+                    .get(idx)
+                    .map(|s| (field.name().to_string(), s.clone()))
+            })
+            .collect();
+
+        // Build stats in DF52 order: file_schema columns first, then partition_columns.
+        // file_schema columns are in file_schema field order (non-partition from
+        // logical_schema).
+        let file_col_stats: Vec<ColumnStatistics> = file_schema
+            .fields()
+            .iter()
+            .map(|f| {
+                stats_by_name
+                    .get(f.name())
+                    .cloned()
+                    .unwrap_or_else(ColumnStatistics::new_unknown)
+            })
+            .collect();
+
+        // Partition columns must be in metadata.partition_columns() order
+        // (not schema encounter order).
+        let partition_col_stats: Vec<ColumnStatistics> = partition_col_names
+            .iter()
+            .map(|name| {
+                stats_by_name
+                    .get(name)
+                    .cloned()
+                    .unwrap_or_else(ColumnStatistics::new_unknown)
+            })
+            .collect();
+
+        // Combine: file columns first, then partition columns.
+        let mut reordered_stats = file_col_stats;
+        reordered_stats.extend(partition_col_stats);
+
+        let stats = Statistics {
+            num_rows: stats.num_rows,
+            total_byte_size: stats.total_byte_size,
+            column_statistics: reordered_stats,
+        };
+
+        // Add unknown stats for file_column if present (it's added as a partition
+        // field but is not in the original schema).
```
Collaborator:
Just curious, what does datafusion do if you have a column in the schema but no statistics at all, not even a mention of the column?

Author:
Good question. `Statistics.column_statistics` is positional and must align 1:1 with the `TableSchema` order. Missing entries break downstream assumptions, so we remap by name and fill gaps with `ColumnStatistics::new_unknown()`.
```diff
+        let stats = if config.file_column_name.is_some() {
+            let mut col_stats = stats.column_statistics;
+            col_stats.push(ColumnStatistics::new_unknown());
+            Statistics {
+                num_rows: stats.num_rows,
+                total_byte_size: stats.total_byte_size,
+                column_statistics: col_stats,
+            }
+        } else {
+            stats
+        };

         let parquet_options = TableParquetOptions {
             global: self.session.config().options().execution.parquet.clone(),
             ..Default::default()
         };

-        let mut file_source = ParquetSource::new(parquet_options);
+        let partition_fields: Vec<Arc<Field>> =
+            table_partition_cols.into_iter().map(Arc::new).collect();
+        let table_schema = TableSchema::new(file_schema, partition_fields);
+
+        let mut file_source =
+            ParquetSource::new(table_schema).with_table_parquet_options(parquet_options);

         // Sometimes (i.e Merge) we want to prune files that don't make the
         // filter and read the entire contents for files that do match the
@@ -553,11 +624,9 @@ impl<'a> DeltaScanBuilder<'a> {
         {
             file_source = file_source.with_predicate(predicate);
         };
-        let file_source =
-            file_source.with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory {}))?;

         let file_scan_config =
-            FileScanConfigBuilder::new(self.log_store.object_store_url(), file_schema, file_source)
+            FileScanConfigBuilder::new(self.log_store.object_store_url(), Arc::new(file_source))
                 .with_file_groups(
                     // If all files were filtered out, we still need to emit at least one partition to
                     // pass datafusion sanity checks.
@@ -570,9 +639,8 @@ impl<'a> DeltaScanBuilder<'a> {
                 },
             )
             .with_statistics(stats)
-            .with_projection_indices(self.projection.cloned())
+            .with_projection_indices(self.projection.cloned())?
             .with_limit(self.limit)
-            .with_table_partition_cols(table_partition_cols)
             .build();

         let metrics = ExecutionPlanMetricsSet::new();
```
Collaborator:
Where does this reordering come in? My hope would be that the schema we push into the various operations pushes through...

That said, the table provider needs to go, and any changes that are purely motivated by needing to migrate this provider should ideally be included in a pre-factor o.a. so that we can revert them easily. I'm just a few commits away from removing that thing entirely.
Author:
Stats reordering happens in `table_provider.rs` via `FileScanConfigBuilder::with_statistics`. DF52 expects `column_statistics` indexed as `file_schema || partition_columns`, but kernel returns them in `TableConfiguration.schema()` order. We rebuild by name lookup. This is scoped to the current provider and should be easy to drop when the provider is removed.
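The name-based rebuild described here can be sketched with plain `std` types. The `ColStats` enum and the function below are simplified stand-ins for DataFusion's `ColumnStatistics` and the actual delta-rs code, not the real APIs; they only illustrate the reordering and gap-filling logic:

```rust
use std::collections::HashMap;

// Stand-in for DataFusion's ColumnStatistics; the real type carries null
// counts, min/max, etc. One number is enough to track which entry went where.
#[derive(Clone, Debug, PartialEq)]
enum ColStats {
    Known(i64),
    Unknown,
}

/// Reorder per-column stats from source-schema order into the order DF52
/// expects: file-schema columns first, then partition columns. Columns with
/// no stats entry at all are filled with `Unknown`.
fn remap_stats(
    source_fields: &[&str],    // TableConfiguration.schema() order
    source_stats: &[ColStats], // positional, aligned with source_fields
    file_fields: &[&str],      // file_schema order (non-partition columns)
    partition_cols: &[&str],   // metadata.partition_columns() order
) -> Vec<ColStats> {
    // Name -> stats map, so ordering differences stop mattering.
    let by_name: HashMap<&str, &ColStats> = source_fields
        .iter()
        .copied()
        .zip(source_stats.iter())
        .collect();

    // File columns first, then partition columns; gaps become Unknown.
    file_fields
        .iter()
        .chain(partition_cols.iter())
        .map(|name| {
            by_name
                .get(name)
                .map(|s| (*s).clone())
                .unwrap_or(ColStats::Unknown)
        })
        .collect()
}

fn main() {
    // Source order is (part, a, b); the scan wants (a, b, c, part), and
    // column `c` has no stats entry at all.
    let reordered = remap_stats(
        &["part", "a", "b"],
        &[ColStats::Known(0), ColStats::Known(3), ColStats::Known(1)],
        &["a", "b", "c"],
        &["part"],
    );
    assert_eq!(
        reordered,
        vec![
            ColStats::Known(3),
            ColStats::Known(1),
            ColStats::Unknown,
            ColStats::Known(0)
        ]
    );
    println!("{reordered:?}");
}
```

The output length always equals `file_fields.len() + partition_cols.len()`, which is the positional 1:1 alignment with the scan schema that DF52 requires.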
Collaborator:
But parquet should have no awareness of partition columns, at the very least in the new provider. Partition columns should be 100% handled at the table provider level. The only partition column we assign at the parquet level is the file id column.

At least in my mind, the parquet reader would read the file as whatever schema (physical types and ordering) we push into it, of course sans any partition columns. Where these appear would ultimately be decided by the expression we apply from the scan metadata.
Collaborator:
Come to think of it, could it be that some of the changes to the parquet schema in `KernelScanPlan` are used for this deprecated provider? If so, can we keep that separate? It's just hard to believe that DF would get so much worse in one release 😆.
Author:
Agreed, Parquet shouldn't know about Delta partition columns. This remap handles DF's `TableSchema`/`FileScanConfig` expectations. DF52 appends partition fields after the file schema (filled from `PartitionedFile.partition_values`), and `Statistics.column_statistics` is positional, which requires name-based reordering when source stats are in `TableConfiguration.schema()` order. In the next provider, Parquet partition fields are `file_id` only; Delta partition columns are handled above Parquet. The `KernelScanPlan` parquet schema changes stay separate. The remapping fixes a latent stats-index vs scan-schema-index mismatch that DF52 exposes.
Collaborator:
`ParquetOpener` does have awareness of DataFusion's partition columns. Before we made that change, filters like `part_col = 1 OR file_col = 2` would give wrong results when filter pushdown was enabled. Not sure if Delta partition columns are handled completely separately...
Author:
@adriangb - The `ParquetOpener` partition handling you're referencing (`part_col = 1 OR file_col = 2` correctness) applies to DF's built-in partitioning. Next-scan sidesteps this entirely; Delta partition columns are handled outside DF's partition machinery:

- The only partition column assigned at the parquet level is `file_id` (provider metadata).
- No Delta partition columns appear in `parquet_read_schema`, `FileScanConfig.partition_values`, or `ParquetOpener`'s partition-aware path.
- Kernel owns partition semantics and injects values above the scan.

The stats remapping in the legacy provider exists because it does use DF's partition column machinery and must align with DF52's `file_schema + partition_columns` index expectations. This will go away once the legacy provider is removed.