Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ arrow-select = { version = "57" }
object_store = { version = "0.12.1" }
parquet = { version = "57" }

# datafusion
datafusion = "51.0"
datafusion-datasource = "51.0"
datafusion-ffi = "51.0"
datafusion-proto = "51.0"
# datafusion 52.1
datafusion = { version = "52.1.0" }
datafusion-datasource = { version = "52.1.0" }
datafusion-ffi = { version = "52.1.0" }
datafusion-proto = { version = "52.1.0" }
datafusion-physical-expr-adapter = { version = "52.1.0" }

# serde
serde = { version = "1.0.194", features = ["derive"] }
Expand Down
8 changes: 7 additions & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ object_store = { workspace = true }
datafusion = { workspace = true, optional = true }
datafusion-datasource = { workspace = true, optional = true }
datafusion-proto = { workspace = true, optional = true }
datafusion-physical-expr-adapter = { workspace = true, optional = true }

# serde
serde = { workspace = true, features = ["derive"] }
Expand Down Expand Up @@ -105,7 +106,12 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }

[features]
default = ["rustls"]
datafusion = ["dep:datafusion", "datafusion-datasource", "datafusion-proto"]
datafusion = [
"dep:datafusion",
"datafusion-datasource",
"datafusion-proto",
"datafusion-physical-expr-adapter",
]
datafusion-ext = ["datafusion"]
json = ["parquet/json"]
python = ["arrow/pyarrow"]
Expand Down
1 change: 0 additions & 1 deletion crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ mod find_files;
pub mod logical;
pub mod physical;
pub mod planner;
mod schema_adapter;
mod session;
mod table_provider;
pub(crate) mod utils;
Expand Down
82 changes: 0 additions & 82 deletions crates/core/src/delta_datafusion/schema_adapter.rs

This file was deleted.

11 changes: 10 additions & 1 deletion crates/core/src/delta_datafusion/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,16 @@ impl Default for DeltaSessionConfig {
fn default() -> Self {
DeltaSessionConfig {
inner: SessionConfig::default()
.set_bool("datafusion.sql_parser.enable_ident_normalization", false),
.set_bool("datafusion.sql_parser.enable_ident_normalization", false)
.set_bool("datafusion.execution.parquet.schema_force_view_types", true)
// Workaround: hash-join dynamic filtering (IN-list pushdown) can panic when join
// keys include dictionary arrays (still reproducible with DF 52.1.x crates).
// Disable IN-list pushdown and fall back to hash lookups.
.set_usize("datafusion.optimizer.hash_join_inlist_pushdown_max_size", 0)
.set_usize(
"datafusion.optimizer.hash_join_inlist_pushdown_max_distinct_values",
0,
),
}
}
}
Expand Down
90 changes: 79 additions & 11 deletions crates/core/src/delta_datafusion/table_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@ use datafusion::catalog::TableProvider;
use datafusion::catalog::memory::DataSourceExec;
use datafusion::common::pruning::PruningStatistics;
use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion::common::{Column, DFSchemaRef, Result, Statistics, ToDFSchema};
use datafusion::common::{Column, ColumnStatistics, DFSchemaRef, Result, Statistics, ToDFSchema};
use datafusion::config::{ConfigOptions, TableParquetOptions};
use datafusion::datasource::TableType;
use datafusion::datasource::physical_plan::{
FileGroup, FileSource, wrap_partition_type_in_dict, wrap_partition_value_in_dict,
FileGroup, wrap_partition_type_in_dict, wrap_partition_value_in_dict,
};
use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
use datafusion::datasource::sink::DataSinkExec;
use datafusion::datasource::table_schema::TableSchema;
use datafusion::error::DataFusionError;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::logical_expr::dml::InsertOp;
Expand Down Expand Up @@ -50,7 +51,7 @@ use url::Url;
use uuid::Uuid;

use crate::delta_datafusion::engine::AsObjectStoreUrl;
use crate::delta_datafusion::schema_adapter::DeltaSchemaAdapterFactory;

use crate::delta_datafusion::table_provider::next::SnapshotWrapper;
use crate::delta_datafusion::{
DataFusionMixins as _, FindFilesExprProperties, LogDataHandler, get_null_of_arrow_type,
Expand Down Expand Up @@ -151,7 +152,7 @@ impl DeltaScanConfigBuilder {
Some(name) => {
if column_names.contains(name) {
return Err(DeltaTableError::Generic(format!(
"Unable to add file path column since column with name {name} exits"
"Unable to add file path column since column with name {name} exists"
)));
}

Expand Down Expand Up @@ -179,7 +180,7 @@ impl DeltaScanConfigBuilder {
wrap_partition_values: self.wrap_partition_values.unwrap_or(true),
enable_parquet_pushdown: self.enable_parquet_pushdown,
schema: self.schema.clone(),
schema_force_view_types: false,
schema_force_view_types: true,
})
}
}
Expand Down Expand Up @@ -538,12 +539,82 @@ impl<'a> DeltaScanBuilder<'a> {

let stats = stats.unwrap_or(Statistics::new_unknown(&schema));

// DF52's TableSchema outputs columns as: file_schema + partition_columns
// Source stats are indexed by TableConfiguration.schema() field order, which may differ
// from the scan schema order. We need name-based remapping, not index-based.
Comment on lines +542 to +544
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where does this reordering come in? My hope would be that the schema we push into the various operations pushes through ...

That said, the table provider needs to go, and any changes that are purely motivated by needing to migrate this provider should ideally be included in a pre-factor o.a. so that we can revert them easily. I'm just a few commits away from removing that thing entirely.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stats reordering happens in table_provider.rs via FileScanConfigBuilder::with_statistics. DF52 expects column_statistics indexed as file_schema || partition_columns, but kernel returns them in TableConfiguration.schema() order. We rebuild by name lookup. Scoped to current provider and should be easy to drop when provider is removed.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but parquet should have no awareness of partition columns. at the very least in the new provider. Partition columns should be 100% handled on the table provider level.

The only partition column we assign on the parquet level is the file id column.

At least in my mind the parquet reader would read the file as whatever schema (physical types and ordering) we push into it? of course sans any partition column. Where these appear would ultimately be decided in the expression we apply from the scan metadata.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Come to think of it, could it be that some of the changes to the parquet schema in KernelScanPlan are used for this deprecated provider?

If so, can we keep that separate. It's just hard to believe that DF would get so much worse in one release 😆.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed Parquet shouldn't know about Delta partition columns. This remap handles
DF's TableSchema/FileScanConfig expectations. DF52 appends partition fields
after the file schema (filled from PartitionedFile.partition_values), and
Statistics.column_statistics is positional which requires name based reordering when
source stats are in TableConfiguration.schema() order. In next, Parquet partition
fields are file_id only; Delta partition columns are handled above Parquet.

KernelScanPlan parquet schema changes stay separate. The remapping fixes a
latent stats-index vs scan-schema-index mismatch that DF52 exposes.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but parquet should have no awareness of partition columns. at the very least in the new provider. Partition columns should be 100% handled on the table provider level

ParquetOpener does have awareness of DataFusion's partition columns. Before we made that change, filters like part_col = 1 OR file_col = 2 would give wrong results when filter pushdown was enabled. Not sure if Delta partition columns are handled completely separately...

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adriangb - The ParquetOpener partition handling you're referencing (part_col = 1 OR file_col = 2 correctness) applies to DF's built in partitioning. Next-scan sidesteps this entirely- Delta partition columns are handled outside DF's partition machinery:

  1. Parquet layer: only sees file_id (provider metadata). No Delta partition columns in parquet_read_schema.
  2. Value injection: kernel transforms add partition values post read, bypassing FileScanConfig.partition_values.
  3. Filter pushdown: partition predicates use kernel's stats based file skipping, not ParquetOpener's partition aware path.

Kernel owns partition semantics and injects values above the scan.

The stats remapping in the legacy provider exists because it does use DF's partition column machinery and must align with DF52's file_schema + partition_columns index expectations. This will go away once the legacy provider is removed.

let partition_col_names = self.snapshot.metadata().partition_columns();

// Build name -> ColumnStatistics map from source stats (keyed by TableConfiguration schema order)
let source_schema = self.snapshot.schema();
let stats_by_name: HashMap<String, ColumnStatistics> = source_schema
.fields()
.enumerate()
.filter_map(|(idx, field)| {
stats
.column_statistics
.get(idx)
.map(|s| (field.name().to_string(), s.clone()))
})
.collect();

// Build stats in DF52 order: file_schema columns first, then partition_columns
// file_schema columns are in file_schema field order (non-partition from logical_schema)
let file_col_stats: Vec<ColumnStatistics> = file_schema
.fields()
.iter()
.map(|f| {
stats_by_name
.get(f.name())
.cloned()
.unwrap_or_else(ColumnStatistics::new_unknown)
})
.collect();

// Partition columns must be in metadata.partition_columns() order (not schema encounter order)
let partition_col_stats: Vec<ColumnStatistics> = partition_col_names
.iter()
.map(|name| {
stats_by_name
.get(name)
.cloned()
.unwrap_or_else(ColumnStatistics::new_unknown)
})
.collect();

// Combine: file columns first, then partition columns
let mut reordered_stats = file_col_stats;
reordered_stats.extend(partition_col_stats);

let stats = Statistics {
num_rows: stats.num_rows,
total_byte_size: stats.total_byte_size,
column_statistics: reordered_stats,
};

// Add unknown stats for file_column if present (it's added as partition field but not in original schema)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, what does datafusion do if you have a column in the schema but no statistics at all, not even a mention of the column?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question - Statistics.column_statistics is positional and must align 1:1 with TableSchema order. Missing entries break downstream assumptions, so we remap by name and fill gaps with ColumnStatistics::new_unknown().

let stats = if config.file_column_name.is_some() {
let mut col_stats = stats.column_statistics;
col_stats.push(ColumnStatistics::new_unknown());
Statistics {
num_rows: stats.num_rows,
total_byte_size: stats.total_byte_size,
column_statistics: col_stats,
}
} else {
stats
};

let parquet_options = TableParquetOptions {
global: self.session.config().options().execution.parquet.clone(),
..Default::default()
};

let mut file_source = ParquetSource::new(parquet_options);
let partition_fields: Vec<Arc<Field>> =
table_partition_cols.into_iter().map(Arc::new).collect();
let table_schema = TableSchema::new(file_schema, partition_fields);

let mut file_source =
ParquetSource::new(table_schema).with_table_parquet_options(parquet_options);

// Sometimes (i.e Merge) we want to prune files that don't make the
// filter and read the entire contents for files that do match the
Expand All @@ -553,11 +624,9 @@ impl<'a> DeltaScanBuilder<'a> {
{
file_source = file_source.with_predicate(predicate);
};
let file_source =
file_source.with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory {}))?;

let file_scan_config =
FileScanConfigBuilder::new(self.log_store.object_store_url(), file_schema, file_source)
FileScanConfigBuilder::new(self.log_store.object_store_url(), Arc::new(file_source))
.with_file_groups(
// If all files were filtered out, we still need to emit at least one partition to
// pass datafusion sanity checks.
Expand All @@ -570,9 +639,8 @@ impl<'a> DeltaScanBuilder<'a> {
},
)
.with_statistics(stats)
.with_projection_indices(self.projection.cloned())
.with_projection_indices(self.projection.cloned())?
.with_limit(self.limit)
.with_table_partition_cols(table_partition_cols)
.build();

let metrics = ExecutionPlanMetricsSet::new();
Expand Down
Loading
Loading