chore(deps): upgrade datafusion to 52.0.0 #4092
```diff
@@ -12,14 +12,15 @@ use datafusion::catalog::TableProvider;
 use datafusion::catalog::memory::DataSourceExec;
 use datafusion::common::pruning::PruningStatistics;
 use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
-use datafusion::common::{Column, DFSchemaRef, Result, Statistics, ToDFSchema};
+use datafusion::common::{Column, ColumnStatistics, DFSchemaRef, Result, Statistics, ToDFSchema};
 use datafusion::config::{ConfigOptions, TableParquetOptions};
 use datafusion::datasource::TableType;
 use datafusion::datasource::physical_plan::{
-    FileGroup, FileSource, wrap_partition_type_in_dict, wrap_partition_value_in_dict,
+    FileGroup, wrap_partition_type_in_dict, wrap_partition_value_in_dict,
 };
 use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
 use datafusion::datasource::sink::DataSinkExec;
+use datafusion::datasource::table_schema::TableSchema;
 use datafusion::error::DataFusionError;
 use datafusion::execution::{SendableRecordBatchStream, TaskContext};
 use datafusion::logical_expr::dml::InsertOp;
```
```diff
@@ -50,7 +51,7 @@ use url::Url;
 use uuid::Uuid;

 use crate::delta_datafusion::engine::AsObjectStoreUrl;
 use crate::delta_datafusion::schema_adapter::DeltaSchemaAdapterFactory;

 use crate::delta_datafusion::table_provider::next::SnapshotWrapper;
 use crate::delta_datafusion::{
     DataFusionMixins as _, FindFilesExprProperties, LogDataHandler, get_null_of_arrow_type,
```
```diff
@@ -151,7 +152,7 @@ impl DeltaScanConfigBuilder {
             Some(name) => {
                 if column_names.contains(name) {
                     return Err(DeltaTableError::Generic(format!(
-                        "Unable to add file path column since column with name {name} exits"
+                        "Unable to add file path column since column with name {name} exists"
                     )));
                 }
```
```diff
@@ -179,7 +180,7 @@ impl DeltaScanConfigBuilder {
             wrap_partition_values: self.wrap_partition_values.unwrap_or(true),
             enable_parquet_pushdown: self.enable_parquet_pushdown,
             schema: self.schema.clone(),
-            schema_force_view_types: false,
+            schema_force_view_types: true,
         })
     }
 }
```
```diff
@@ -538,12 +539,82 @@ impl<'a> DeltaScanBuilder<'a> {
         let stats = stats.unwrap_or(Statistics::new_unknown(&schema));

+        // DF52's TableSchema outputs columns as: file_schema + partition_columns
+        // Source stats are indexed by TableConfiguration.schema() field order, which may differ
+        // from the scan schema order. We need name-based remapping, not index-based.
```
Comment on lines +542 to +544:

Collaborator: where does this reordering come in? My hope would be that the schema we push into the various operations pushes through ... That said, the table provider needs to go, and any changes that are purely motivated by needing to migrate this provider should ideally be included in a pre-factor o.a. so that we can revert them easily. I'm just a few commits away from removing that thing entirely.

Collaborator (Author): Stats reordering happens in

Collaborator: but parquet should have no awareness of partition columns, at the very least in the new provider. Partition columns should be 100% handled on the table provider level. The only partition column we assign on the parquet level is the file id column. At least in my mind the parquet reader would read the file as whatever schema (physical types and ordering) we push into it? Of course sans any partition column. Where these appear would ultimately be decided in the expression we apply from the scan metadata.

Collaborator: Come to think of it, could it be that some of the changes to the parquet schema in If so, can we keep that separate. It's just hard to believe that DF would get so much worse in one release 😆.

Collaborator (Author): Agreed, Parquet shouldn't know about Delta partition columns. This remap handles KernelScanPlan parquet schema changes stay separate. The remapping fixes a

Collaborator (Author): @adriangb - The

Kernel owns partition semantics and injects values above the scan. The stats remapping in the legacy provider exists because it does use DF's partition column machinery and must align with DF52's
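The remapping under discussion can be illustrated standalone. Below is a minimal sketch with a plain stand-in enum instead of DataFusion's `ColumnStatistics` (`ColStats` and `remap_stats` are invented names for illustration, not APIs from this PR): stats arrive keyed by source-schema order and are re-emitted as file columns first, then partition columns in metadata order, with unknowns filling any gap.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for DataFusion's `ColumnStatistics`.
#[derive(Clone, Debug, PartialEq)]
enum ColStats {
    Known(u64), // e.g. a null count recovered from the log
    Unknown,    // models what ColumnStatistics::new_unknown() expresses
}

/// Re-key per-column stats from source-schema order into scan order:
/// file (non-partition) columns first, then partition columns in the
/// order given by the table metadata, filling gaps with `Unknown`.
fn remap_stats(
    source_schema: &[&str],
    source_stats: &[ColStats],
    file_columns: &[&str],
    partition_columns: &[&str],
) -> Vec<ColStats> {
    // Name -> stats map, built from the positional source stats.
    let by_name: HashMap<&str, &ColStats> =
        source_schema.iter().copied().zip(source_stats.iter()).collect();
    file_columns
        .iter()
        .chain(partition_columns.iter())
        .map(|name| {
            by_name
                .get(name)
                .map(|s| (*s).clone())
                .unwrap_or(ColStats::Unknown)
        })
        .collect()
}

fn main() {
    // Source order: [id, date, value]; `date` is the partition column,
    // so the scan order becomes [id, value, date].
    let reordered = remap_stats(
        &["id", "date", "value"],
        &[ColStats::Known(10), ColStats::Known(20), ColStats::Known(30)],
        &["id", "value"],
        &["date"],
    );
    assert_eq!(
        reordered,
        vec![ColStats::Known(10), ColStats::Known(30), ColStats::Known(20)]
    );
}
```

The index-based alternative would silently attach the wrong stats to a column whenever the two schema orders diverge, which is why the diff keys by name.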
```diff
+        let partition_col_names = self.snapshot.metadata().partition_columns();
+
+        // Build name -> ColumnStatistics map from source stats (keyed by TableConfiguration schema order)
+        let source_schema = self.snapshot.schema();
+        let stats_by_name: HashMap<String, ColumnStatistics> = source_schema
+            .fields()
+            .enumerate()
+            .filter_map(|(idx, field)| {
+                stats
+                    .column_statistics
+                    .get(idx)
+                    .map(|s| (field.name().to_string(), s.clone()))
+            })
+            .collect();
+
+        // Build stats in DF52 order: file_schema columns first, then partition_columns
+        // file_schema columns are in file_schema field order (non-partition from logical_schema)
+        let file_col_stats: Vec<ColumnStatistics> = file_schema
+            .fields()
+            .iter()
+            .map(|f| {
+                stats_by_name
+                    .get(f.name())
+                    .cloned()
+                    .unwrap_or_else(ColumnStatistics::new_unknown)
+            })
+            .collect();
+
+        // Partition columns must be in metadata.partition_columns() order (not schema encounter order)
+        let partition_col_stats: Vec<ColumnStatistics> = partition_col_names
+            .iter()
+            .map(|name| {
+                stats_by_name
+                    .get(name)
+                    .cloned()
+                    .unwrap_or_else(ColumnStatistics::new_unknown)
+            })
+            .collect();
+
+        // Combine: file columns first, then partition columns
+        let mut reordered_stats = file_col_stats;
+        reordered_stats.extend(partition_col_stats);
+
+        let stats = Statistics {
+            num_rows: stats.num_rows,
+            total_byte_size: stats.total_byte_size,
+            column_statistics: reordered_stats,
+        };
+
+        // Add unknown stats for file_column if present (it's added as partition field but not in original schema)
```
Collaborator: Just curious, what does datafusion do if you have a column in the schema but no statistics at all, not even a mention of the column?

Collaborator (Author): Good question - Statistics.column_statistics is positional and must align 1:1 with TableSchema order. Missing entries break downstream assumptions, so we remap by name and fill gaps with ColumnStatistics::new_unknown().
```diff
+        let stats = if config.file_column_name.is_some() {
+            let mut col_stats = stats.column_statistics;
+            col_stats.push(ColumnStatistics::new_unknown());
+            Statistics {
+                num_rows: stats.num_rows,
+                total_byte_size: stats.total_byte_size,
+                column_statistics: col_stats,
+            }
+        } else {
+            stats
+        };
+
         let parquet_options = TableParquetOptions {
             global: self.session.config().options().execution.parquet.clone(),
             ..Default::default()
         };

-        let mut file_source = ParquetSource::new(parquet_options);
+        let partition_fields: Vec<Arc<Field>> =
+            table_partition_cols.into_iter().map(Arc::new).collect();
+        let table_schema = TableSchema::new(file_schema, partition_fields);
+
+        let mut file_source =
+            ParquetSource::new(table_schema).with_table_parquet_options(parquet_options);

         // Sometimes (i.e Merge) we want to prune files that don't make the
         // filter and read the entire contents for files that do match the
```
```diff
@@ -553,11 +624,9 @@ impl<'a> DeltaScanBuilder<'a> {
         {
             file_source = file_source.with_predicate(predicate);
         };
-        let file_source =
-            file_source.with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory {}))?;

         let file_scan_config =
-            FileScanConfigBuilder::new(self.log_store.object_store_url(), file_schema, file_source)
+            FileScanConfigBuilder::new(self.log_store.object_store_url(), Arc::new(file_source))
                 .with_file_groups(
                     // If all files were filtered out, we still need to emit at least one partition to
                     // pass datafusion sanity checks.
```
```diff
@@ -570,9 +639,8 @@ impl<'a> DeltaScanBuilder<'a> {
                     },
                 )
                 .with_statistics(stats)
-                .with_projection_indices(self.projection.cloned())
+                .with_projection_indices(self.projection.cloned())?
                 .with_limit(self.limit)
-                .with_table_partition_cols(table_partition_cols)
                 .build();

         let metrics = ExecutionPlanMetricsSet::new();
```
do we have a link to this bug?

I can very much believe that dictionary arrays are not handled correctly (we've also found plenty of panics in DataFusion because of it), but it would be great to track the issue upstream.

I will create an issue and link it.
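For background on the dictionary-array concern: the provider wraps partition values in Arrow dictionary types (`wrap_partition_type_in_dict` / `wrap_partition_value_in_dict` in the imports above). A minimal sketch of dictionary encoding itself, using plain standard-library types rather than Arrow arrays (`dictionary_encode` is an illustrative name, not a real API):

```rust
use std::collections::HashMap;

/// Dictionary-encode a column: store each distinct value once, plus a
/// small integer key per row pointing into the value list.
fn dictionary_encode(rows: &[&str]) -> (Vec<String>, Vec<u32>) {
    let mut values: Vec<String> = Vec::new();
    let mut index: HashMap<&str, u32> = HashMap::new();
    let mut keys = Vec::with_capacity(rows.len());
    for &row in rows {
        // Reuse the key for a value we've seen; otherwise append it.
        let key = *index.entry(row).or_insert_with(|| {
            values.push(row.to_string());
            (values.len() - 1) as u32
        });
        keys.push(key);
    }
    (values, keys)
}

fn main() {
    let (values, keys) = dictionary_encode(&["2024-01-01", "2024-01-01", "2024-01-02"]);
    assert_eq!(values, vec!["2024-01-01".to_string(), "2024-01-02".to_string()]);
    assert_eq!(keys, vec![0, 0, 1]);
}
```

Partition columns are the ideal case for this encoding: every row from a given file carries the same partition value, so each file contributes one dictionary entry and a run of identical keys, which is exactly why the provider wraps partition types in dictionaries.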