chore(deps): upgrade datafusion to 52.0.0 (#4092)
Conversation
ACTION NEEDED: delta-rs follows the Conventional Commits specification for release automation. The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification.
"Arrow57 requires matching scalar types for comparison kernels" @alamb is there a reason why arrow 57 is more restrictive with literal comparison? |
Codecov Report ❌
Patch coverage is
Additional details and impacted files:
@@ Coverage Diff @@
## main #4092 +/- ##
==========================================
- Coverage 76.17% 76.07% -0.11%
==========================================
Files 165 164 -1
Lines 44692 45073 +381
Branches 44692 45073 +381
==========================================
+ Hits 34046 34291 +245
- Misses 9024 9140 +116
- Partials 1622 1642 +20

☔ View full report in Codecov by Sentry.
column_statistics: reordered_stats,
};

// Add unknown stats for file_column if present (it's added as a partition field but not in the original schema)
Just curious, what does datafusion do if you have a column in the schema but no statistics at all, not even a mention of the column?
Good question: Statistics.column_statistics is positional and must align 1:1 with the TableSchema order. Missing entries break downstream assumptions, so we remap by name and fill gaps with ColumnStatistics::new_unknown().
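The remapping described here can be sketched with a simplified model. The names and types below are illustrative stand-ins, not the actual delta-rs/DataFusion types:

```rust
use std::collections::HashMap;

// Simplified stand-in for DataFusion's per-column statistics.
#[derive(Clone, Debug, PartialEq)]
enum ColStat {
    Known(u64), // e.g. a null count
    Unknown,    // stands in for ColumnStatistics::new_unknown()
}

// Remap stats given in source order into the target schema order,
// filling any missing columns with Unknown.
fn remap_stats(source: &[(&str, ColStat)], target_order: &[&str]) -> Vec<ColStat> {
    let by_name: HashMap<&str, &ColStat> = source.iter().map(|(n, s)| (*n, s)).collect();
    target_order
        .iter()
        .map(|name| {
            by_name
                .get(name)
                .map(|s| (*s).clone())
                .unwrap_or(ColStat::Unknown)
        })
        .collect()
}
```

The key point is that the output is positional in target order, so every column in the scan schema gets an entry even when the source never mentioned it.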
base: &Schema,
) -> Result<SchemaRef> {
    // IMPORTANT: This schema is used for Parquet reading and predicate pushdown.
    // It must NOT apply view-type conversions (Utf8View/BinaryView) because:
So what happens if you do not do that?
Pushdown errors on type mismatch. Parquet stores Utf8/Binary; if we told ParquetSource to operate on Utf8View/BinaryView, the comparison kernels would see different types. We keep base types for the Parquet schema, apply view conversions only in the result schema, and normalize view-typed literals before pushdown.
| "id": Array( | ||
| ["1", "2", "3", "4", "5"], | ||
| type=ArrowField("id", DataType.string_view(), nullable=True), | ||
| type=ArrowField("id", DataType.string(), nullable=True), |
We should revert to view types for Python
Agreed. The DataType.string() changes were a workaround. The Rust-side literal normalization is already in place, so reverting to DataType.string_view() should work.
I am not sure what this is referring to; I don't think Arrow changed in this regard. DataFusion has a bunch of code to coerce literals to the correct type, and I think some of the work to change how pushdown worked means that code may no longer be called implicitly, so you may have to call it explicitly. Maybe @adriangb can point you at the correct APIs etc. (e.g. the SchemaAdapter stuff)
@ion-elgreco: With DF52, view types are more prevalent (schema_force_view_types defaults to true). In next-scan we keep the Parquet pushdown schema as base Utf8/Binary and apply view conversions only in the result schema. Predicates may contain view-typed literals from the query, so we normalize those to base types before pushdown. @alamb: In delta-rs we build pushdown predicates and call logical2physical directly, which bypasses the analyzer's coercion passes. DF's physical planning assumes coercion is already done and logical2physical won't coerce for us, so we normalize literals to match the scan schema explicitly before calling it. If there's a canonical DF52 API for predicate coercion we should use instead, happy to switch. Will update the PR description to clarify this is a pushdown plumbing issue, not Arrow strictness.
DataFusion now does something similar to this in 52. Specifically, it uses … There is a corresponding … So it sounds like (without looking at the code in detail) you may be able to use those two methods now instead of custom logic in delta-rs.
Thanks @alamb! Super helpful context, and I agree this is the right direction. Why I went with the literal-normalization shim for now: next-scan builds the Parquet pushdown predicate already in file-schema space (base Utf8/Binary), then calls logical2physical. The adapter expects a table-schema expression to translate, and using it would require restructuring the predicate-planning flow. I had adapter wiring in #4054, but per guidance I split it out to keep the DF52 upgrade reviewable. This PR is the minimal extraction; the adapter approach is planned as a focused follow-up. Question for that follow-up: would you recommend wiring DefaultPhysicalExprAdapterFactory and PhysicalExprSimplifier directly (mirroring DF's Parquet scan path), or is a custom adapter more appropriate given next-scan's kernel/DV constraints?
@roeap @ion-elgreco - confirming alignment on path forward: This PR: Minimal DF52 upgrade (API migrations, batch coalescing fix) and a narrow, temporary shim to normalize Utf8View/BinaryView literals for next-scan Parquet pushdown. Keeps base-typed Parquet read schema and avoids type mismatch failures. Proposed plan:
Does this plan work, or would you prefer different ordering or scope? |
a7b9e84 to 3e0ebec (Compare)
BTW @adriangb added a helper for batch conversion here: … This will be released as part of the 52.1.0 release, which I anticipate towards the end of this week, if that is helpful.
Signed-off-by: Ethan Urbanski <ethan@urbanskitech.com>
Remove SchemaAdapter usage and switch scans to DF52 TableSchema + updated FileScanConfigBuilder. Update ColumnStatistics literals and align stats expectations with DF52 precision. Adjust ops session handling and python datafusion glue for DF52. Signed-off-by: Ethan Urbanski <ethan@urbanskitech.com>
DataFusion-ffi 52 requires a TaskContextProvider + codec when exporting TableProvider from the Python binding. Also mark byte_size statistics as absent when building PartitionedFile stats. Signed-off-by: Ethan Urbanski <ethan@urbanskitech.com>
The DF52 scan visitor expects to extract the KernelScanPlan from DeltaScanExec; keep a small accessor to avoid depending on struct field visibility. Signed-off-by: Ethan Urbanski <ethan@urbanskitech.com>
2af403b to 9d125aa (Compare)
Enable view types by default and normalize predicate pushdown against Parquet base types while keeping the synthetic file_id as Dictionary<UInt16, Utf8>. Also update Python tests to expect view types, relax DF statistics assertions, and disable hash-join IN-list pushdown to avoid a DF52 dictionary panic. Signed-off-by: Ethan Urbanski <ethan@urbanskitech.com>
roeap
left a comment
Sorry for so many comments @ethan-tyler - it's just that we are reverting some things that were at the top of my list when migrating schema providers, i.e. not needing to do so much schema gymnastics in larger parts of the codebase and emitting nicer physical types (i.e. string views).
Over the migration things got continuously better in that regard (not perfect though 😆), and keep getting better as we migrate.
So at least I would love to understand why we need to regress with DF52.
impl DeltaScanStream {
    /// Apply the per-file transformation to a RecordBatch.
    fn batch_project(&mut self, batch: RecordBatch) -> Result<RecordBatch> {
    fn batch_project(&mut self, batch: RecordBatch) -> Result<Vec<RecordBatch>> {
Not sure what the exact case is, but I was under the assumption that ParquetExec would not coalesce batches and we could expect that one batch always originates from one file. If this is not something we can rely on, we should at the very least be able to rely on consecutive rows being from one file. In that case we might want to look into the partition kernel, which we also use when writing data to partitions.
You're right that ParquetExec typically yields single-file batches, and the common case is one output batch. We return Vec<RecordBatch> defensively, since DF can hand us a multi-file batch (e.g., after coalescing), and DV masks and per-file transforms must apply in file context. The implementation does a linear scan for file_id run boundaries and a zero-copy slice() per run, with minimal overhead.
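The run-boundary scan described above can be sketched as follows. Assuming the batch's file-id column has been read out as a plain slice, this is the linear pass that finds (offset, length) runs to feed zero-copy RecordBatch::slice calls (illustrative, not the actual delta-rs code):

```rust
// Return (offset, length) for each run of consecutive equal file ids.
fn file_runs(file_ids: &[u32]) -> Vec<(usize, usize)> {
    let mut runs = Vec::new();
    let mut start = 0;
    for i in 1..=file_ids.len() {
        // A run ends at the end of input or when the id changes.
        if i == file_ids.len() || file_ids[i] != file_ids[i - 1] {
            runs.push((start, i - start));
            start = i;
        }
    }
    runs
}
```

Each (offset, length) pair would then become a `batch.slice(offset, length)` call, which shares the underlying buffers rather than copying rows.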
statistics
    .column_statistics
    .push(ColumnStatistics::new_unknown());
In a local attempt I found it advantageous to push this right when we create the partitioned file and also assign the other stats.
Fixed. I moved the file_id stats to PartitionedFile construction time as you suggested. We were double-appending file_id stats. Now we use partition_values and with_statistics for per-file stats and call compute_all_files_statistics with the full table schema.
fn strip_view_types_from_schema(schema: &Schema) -> Schema {
    Schema::new(
        schema
            .fields()
            .iter()
            .map(|f| {
                let dt = strip_view_types_from_data_type(f.data_type());
                if &dt == f.data_type() {
                    f.as_ref().clone()
                } else {
                    f.as_ref().clone().with_data_type(dt)
                }
            })
            .collect::<Vec<_>>(),
    )
}
I somehow cannot accept that we need to do this?
What starts failing if we don't? I'm just trying to get rid of more and more of our custom casting code. What specifically starts failing so that we need to add this?
Removed strip_view_types_from_schema. The issue was that Parquet pushdown breaks when DF produces Utf8View/BinaryView literals but our read schema is base Utf8/Binary. DF's DefaultPhysicalExprAdapter casts columns but not literals (upstream TODO). We now use DF's adapter plus a literal rewrite (view to base) for pushdown. I also added regression tests in scan/mod.rs.
PhysicalExprSimplifier moves the casts from columns to literals. Maybe look at BatchAdapter (either to use it or as inspiration; it's in main and will be in 52.1.0, not released yet)?
}

#[tokio::test]
async fn test_view_literal_conversion_in_predicate_pushdown() -> TestResult {
this test name makes me think it's the new physical expression conversion we need?
Renamed. The test covers the literal rewrite for Parquet pushdown when the physical schema uses base types.
let plan = get_read_plan(
    &session.state(),
    files_by_store,
    &arrow_schema,
the schema we pass into get_read_plan should always be the most specific physical schema around ... i.e. contain view types.
If this is what we are testing here, we need to update the schema we are passing to have view types.
Intentionally passing base types here. get_read_plan builds parquet_read_schema, which is used for Parquet reads and predicate-pushdown evaluation. Pushdown happens before DF's schema casts are applied; if we pass view types but the Parquet files use base types, we hit Utf8 == Utf8View mismatches during filter evaluation. DF coercions also aren't reliably recursive for nested types.
View types are still exposed at the DeltaScan boundary via table_schema, just not in the read/pushdown path. We adapt predicates via with_expr_adapter to rewrite view literals to base for pushdown.
If you'd rather pass view types here and handle mismatches differently, I'm open to discussing; this was the least invasive fix I found.
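Since the coercion gap is worst for nested types, the view-to-base conversion has to walk the whole type tree. A minimal sketch with a mock type enum (the real code operates on arrow's DataType; this model is only illustrative):

```rust
// Mock of the relevant arrow DataType variants.
#[derive(Clone, Debug, PartialEq)]
enum Dt {
    Int32,
    Utf8,
    Utf8View,
    Binary,
    BinaryView,
    List(Box<Dt>),
    Struct(Vec<(String, Dt)>),
}

// Recursively replace view types with their base equivalents,
// including inside List and Struct children.
fn strip_views(dt: &Dt) -> Dt {
    match dt {
        Dt::Utf8View => Dt::Utf8,
        Dt::BinaryView => Dt::Binary,
        Dt::List(inner) => Dt::List(Box::new(strip_views(inner))),
        Dt::Struct(fields) => Dt::Struct(
            fields
                .iter()
                .map(|(name, child)| (name.clone(), strip_views(child)))
                .collect(),
        ),
        other => other.clone(),
    }
}
```

A flat, non-recursive mapping would leave a Utf8View buried inside a List or Struct untouched, which is exactly the nested-type gap mentioned above.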
for the partition columns yes, but especially for the read paths they are interesting? At least that's where we are getting most of the benefit, I believe.
Yep - the read paths are actually the main win here. We always dictionary-encode the synthetic file-id (file_url) when building PartitionedFile.partition_values, and since the file-id column itself is Dictionary<UInt16, Utf8>, we avoid repeating the full URL for every row. Partition cols are handled above Parquet via kernel transforms in the next provider, and they don't materially change the Parquet read plan.
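The space win from dictionary-encoding the synthetic file-id can be illustrated with a toy encoder: each distinct URL is stored once, and rows carry only a small integer key. This is a model of what Dictionary<UInt16, Utf8> buys, not arrow code:

```rust
// Encode repeated strings as (distinct values, per-row u16 keys).
fn dict_encode(rows: &[&str]) -> (Vec<String>, Vec<u16>) {
    let mut values: Vec<String> = Vec::new();
    let mut keys = Vec::with_capacity(rows.len());
    for row in rows {
        // Reuse an existing key if the value was seen before.
        let key = match values.iter().position(|v| v.as_str() == *row) {
            Some(i) => i as u16,
            None => {
                values.push(row.to_string());
                (values.len() - 1) as u16
            }
        };
        keys.push(key);
    }
    (values, keys)
}
```

For a million rows drawn from a handful of files, each URL is stored once and the per-row cost is two bytes, instead of repeating the full URL per row.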
TableProviderFilterPushDown::Exact
} else {
    TableProviderFilterPushDown::Inexact
},
Some(expr),
Do we really want to declare more as exact? While this might be true, I did not find a precedent in datafusion where we set this to be exact for anything other than partition columns.
TableProvider implementations should only declare as exact predicates they can completely evaluate during the call to scan (which should be replaced with scan_with_args, btw). Everything else gets resolved later.
Agreed with @adriangb - partition-only predicates (fully evaluated during scan) now return Exact. Everything else returns Inexact, including Parquet pushdown predicates (still attempted when safe, but DF retains a post-scan Filter as the correctness boundary). Renamed parquet_pushdown_exact to parquet_pushdown_enabled for clarity.
Note for completeness: the Inexact filters may be removed by the filter pushdown physical optimizer if the data source accepts them as exact…
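The decision rule agreed on above can be stated compactly. In this sketch (illustrative names, not the delta-rs API), a predicate is Exact only if every column it references is a partition column, i.e. the provider can fully evaluate it during scan:

```rust
#[derive(Debug, PartialEq)]
enum Pushdown {
    Exact,   // provider fully evaluates the predicate during scan
    Inexact, // engine retains a post-scan Filter as the correctness boundary
}

// Classify a predicate by the columns it references.
fn classify(predicate_columns: &[&str], partition_columns: &[&str]) -> Pushdown {
    if predicate_columns.iter().all(|c| partition_columns.contains(c)) {
        Pushdown::Exact
    } else {
        Pushdown::Inexact
    }
}
```

Anything touching a data column falls back to Inexact, so file-level stats skipping can still be attempted without promising row-level completeness.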
.set_bool("datafusion.sql_parser.enable_ident_normalization", false),
.set_bool("datafusion.sql_parser.enable_ident_normalization", false)
.set_bool("datafusion.execution.parquet.schema_force_view_types", true)
// Work around DataFusion 52.0.0 bug: hash-join dynamic filtering (IN-list
do we have a link to this bug?
I can very much believe that dictionary arrays are not handled correctly (we've also found plenty of panics in DataFusion because of it) but it would be great to track the issue upstream.
I will create an issue and link it.
// DF52's TableSchema outputs columns as: file_schema + partition_columns
// Source stats are indexed by TableConfiguration.schema() field order, which may differ
// from the scan schema order. We need name-based remapping, not index-based.
where does this reordering come in? My hope would be that the schema we push into the various operations pushes through ...
That said, the table provider needs to go, and any changes that are purely motivated by needing to migrate this provider should ideally be included in a pre-refactor or similar so that we can revert them easily. I'm just a few commits away from removing that thing entirely.
Stats reordering happens in table_provider.rs via FileScanConfigBuilder::with_statistics. DF52 expects column_statistics indexed as file_schema || partition_columns, but the kernel returns them in TableConfiguration.schema() order, so we rebuild by name lookup. This is scoped to the current provider and should be easy to drop when the provider is removed.
but parquet should have no awareness of partition columns, at the very least in the new provider. Partition columns should be 100% handled at the table-provider level.
The only partition column we assign at the parquet level is the file-id column.
At least in my mind, the parquet reader would read the file as whatever schema (physical types and ordering) we push into it, of course sans any partition column. Where these appear would ultimately be decided by the expression we apply from the scan metadata.
Come to think of it, could it be that some of the changes to the parquet schema in KernelScanPlan are used for this deprecated provider?
If so, can we keep that separate? It's just hard to believe that DF would get so much worse in one release 😆.
Agreed, Parquet shouldn't know about Delta partition columns. This remap handles DF's TableSchema/FileScanConfig expectations. DF52 appends partition fields after the file schema (filled from PartitionedFile.partition_values), and Statistics.column_statistics is positional, which requires name-based reordering when source stats are in TableConfiguration.schema() order. In next, the only Parquet partition field is file_id; Delta partition columns are handled above Parquet.
The KernelScanPlan parquet schema changes stay separate. The remapping fixes a latent stats-index vs scan-schema-index mismatch that DF52 exposes.
but parquet should have no awareness of partition columns. at the very least in the new provider. Partition columns should be 100% handled on the table provider level
ParquetOpener does have awareness of DataFusion's partition columns. Before we made that change, filters like part_col = 1 OR file_col = 2 would give wrong results when filter pushdown was enabled. Not sure if Delta partition columns are handled completely separately...
@adriangb - The ParquetOpener partition handling you're referencing (part_col = 1 OR file_col = 2 correctness) applies to DF's built-in partitioning. Next-scan sidesteps this entirely; Delta partition columns are handled outside DF's partition machinery:
- Parquet layer: only sees file_id (provider metadata). No Delta partition columns in parquet_read_schema.
- Value injection: kernel transforms add partition values post-read, bypassing FileScanConfig.partition_values.
- Filter pushdown: partition predicates use the kernel's stats-based file skipping, not ParquetOpener's partition-aware path.
The kernel owns partition semantics and injects values above the scan.
The stats remapping in the legacy provider exists because it does use DF's partition-column machinery and must align with DF52's file_schema + partition_columns index expectations. This will go away once the legacy provider is removed.
| "date": Array( | ||
| ["2021-01-01", "2021-01-02", "2021-01-03", "2021-12-31"], | ||
| ArrowField("date", type=DataType.string_view(), nullable=True), | ||
| ArrowField("date", type=DataType.string(), nullable=True), |
we really want to keep outputting StringViews; why is this change needed?
You're right, this change was an overcorrection and has been reverted. We still output StringView (via schema_force_view_types=true). The base types in parquet_read_schema are only for the pushdown path; user-visible output remains view types.
fn reset_state(&self) -> Arc<RwLock<dyn LazyBatchGenerator>> {
    Arc::new(RwLock::new(TestBatchGenerator::new(self.data.clone())))
}
where is this functionality needed (in DF)? Not sure how we would even reset that, but this just copies it?
@adriangb is right again :) - reset_state is part of DF's LazyBatchGenerator contract, and DF calls it when it needs to rerun a scan from the beginning.
For this test generator, we just clone the backing Vec<RecordBatch>. For real Python Arrow streams (which can't rewind), ArrowStreamBatchGenerator::reset_state returns an ExhaustedStreamGenerator that errors with guidance to buffer input if plan re-execution is required.
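The buffered reset strategy can be modeled without the DF types. This toy generator shows the cheap case, where reset_state just hands back a fresh generator over cloned data (illustrative only; the real trait returns Arc<RwLock<dyn LazyBatchGenerator>>):

```rust
// Toy stand-in for a buffered batch generator.
#[derive(Clone)]
struct BufferedGen {
    data: Vec<&'static str>,
    pos: usize,
}

impl BufferedGen {
    // Yield the next "batch", advancing the cursor.
    fn next_batch(&mut self) -> Option<&'static str> {
        let item = self.data.get(self.pos).copied();
        self.pos += 1;
        item
    }

    // Mirrors reset_state: a fresh generator over the same data, rewound to the start.
    fn reset_state(&self) -> BufferedGen {
        BufferedGen { data: self.data.clone(), pos: 0 }
    }
}
```

A one-shot stream has no equivalent of this clone; its reset_state can only return something that errors, which matches the ExhaustedStreamGenerator behavior described above.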
Align workspace and python bindings on DataFusion 52.1.0 to pick up bugfixes and keep versions consistent. Signed-off-by: Ethan Urbanski <ethan@urbanskitech.com>
Update next-scan planning/execution to handle DF52.1 schema/view-type behavior and keep predicate pushdown stable. Signed-off-by: Ethan Urbanski <ethan@urbanskitech.com>
Make python tests robust to DF52.1 view/base string and binary types, and work around pyarrow sort limitations for view arrays. Signed-off-by: Ethan Urbanski <ethan@urbanskitech.com>
Run cargo fmt to keep formatting consistent. Signed-off-by: Ethan Urbanski <ethan@urbanskitech.com>
roeap
left a comment
Thanks for sticking with this @ethan-tyler!
Just flushing some comments, but the majority looks great!
The one thing I don't quite understand is where the updates to the parquet read schema in next/scan/plan.rs are coming from. It was a lot of effort to avoid such schema gymnastics this time around, as they have been haunting us for years, and I hope not to repeat old mistakes in the new provider.
Are we also using this in code paths that affect the old provider?
Realising I am mostly looking at the diff - I'll dive deeper to understand what is going on.
And thanks again for your patience 😄.
});
// NOTE: `PartitionedFile::with_statistics` appends exact stats for partition columns based
// on `partition_values`, so partition values must be set first.
partitioned_file.partition_values = vec![file_value.clone()];
So by inverting the order of the assignment we get stats auto-assigned? nice 👍
// DataFusion's DefaultPhysicalExprAdapter inserts casts on columns, but does not
// currently rewrite literals to match the physical schema. Parquet predicate
// evaluation/pushdown happens against the physical file schema prior to those casts.
//
// See upstream TODO:
// datafusion-physical-expr-adapter/src/schema_rewriter.rs
// "TODO: ... move the cast from the column to literal expressions ..." (52.1.0).
this is something we can expect in 52.1, right? i.e. the base optimizer that does this?
No, sorry, that comment was confusing; I've adjusted it. The adapter still casts columns but not literals; moving the cast from the column to the literal remains a TODO upstream (there was an earlier attempt). We should keep the workaround until there is an upstream fix.
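The direction of the rewrite matters: instead of casting the column up to the literal's view type (which the pushdown path cannot evaluate against base-typed Parquet data), the view-typed literal is rewritten down to the base type. A toy expression model of that rewrite (illustrative, not DataFusion's Expr type):

```rust
// Minimal expression tree with base and view string literals.
#[derive(Clone, Debug, PartialEq)]
enum Expr {
    Column(&'static str),
    LitUtf8(String),
    LitUtf8View(String),
    Eq(Box<Expr>, Box<Expr>),
}

// Rewrite every view-typed literal in the tree to its base-typed equivalent.
fn normalize_for_pushdown(e: Expr) -> Expr {
    match e {
        Expr::LitUtf8View(v) => Expr::LitUtf8(v),
        Expr::Eq(l, r) => Expr::Eq(
            Box::new(normalize_for_pushdown(*l)),
            Box::new(normalize_for_pushdown(*r)),
        ),
        other => other,
    }
}
```

After this pass, both sides of a comparison reference the base string type, so the pushdown evaluation never sees a mixed Utf8/Utf8View comparison.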
// extended schema with missing column and different data types
let arrow_schema_extended = Arc::new(Schema::new(vec![
    Field::new("id", DataType::Int32, false),
    Field::new("value", DataType::Utf8View, true),
    Field::new("value2", DataType::Utf8View, true),
]));
let plan = get_read_plan(
    &session.state(),
    files_by_store,
    &arrow_schema_extended,
    Some(1),
    &file_id_field,
    None,
)
.await?;
let batches = collect(plan, session.task_ctx()).await?;
let expected = vec![
    "+----+-------+--------+-----------------------------+",
    "| id | value | value2 | __delta_rs_file_id__        |",
    "+----+-------+--------+-----------------------------+",
    "| 1  | a     |        | memory:///test_data.parquet |",
    "+----+-------+--------+-----------------------------+",
];
assert_batches_sorted_eq!(&expected, &batches);
assert!(matches!(
    batches[0].column(1).data_type(),
    DataType::Utf8View
));
assert!(matches!(
    batches[0].column(2).data_type(),
    DataType::Utf8View
));
can we not fulfil this test anymore?
I don't think get_read_plan() should be expected to produce Utf8View types. The next provider forces the Parquet read/predicate schema to base types to keep predicate pushdown stable, and view types are exposed only at the DeltaScan boundary.
I updated this by:
- keeping the missing-column NULL/backfill assertion in the get_read_plan() test, but dropping the Utf8View type assertions there
- adding a boundary-level assertion in KernelScanPlan that result_schema can have view types when enabled, while parquet_read_schema stays base types (crates/core/src/delta_datafusion/table_provider/next/scan/plan.rs)
// Always use base types for the Parquet read schema.
DataType::Utf8View => DataType::Utf8,
DataType::BinaryView => DataType::Binary,
I am so sorry to keep pestering about this - but we especially want them in the parquet read, no?
Why base types in parquet_read_schema are needed:
schema_force_view_types exists for the perf win, but pushdown/pruning correctness blocks it in DF 52.1.0. Utf8 == Utf8View is a hard error at the Arrow kernel level (arrow-rs #6283), and DF's physical expr adapter has known gaps around literal/cast rewriting (workaround at mod.rs:455-469, tests at :944/:1022).
For delta-rs, __delta_rs_file_id__ must be Dictionary<_, Utf8> because Arrow can't dictionary-pack to Utf8View (plan.rs:232-242, arrow-rs #9219). View-typing the read schema without addressing this reintroduces mixed base/view predicates on file-id.
Options I see:
- Keep current behavior: the Parquet read schema stays base for stable pruning/pushdown, and view types are applied at the DeltaScan boundary.
- Enable view types with a safety shim: keep file-id as Dictionary<_, Utf8>, adjust the expr adapter to normalize view literals against base columns, and add a regression test filtering on __delta_rs_file_id__ = Utf8View(...).
- Enable view types end-to-end: after arrow-rs #9220 ships, switch file-id to Dictionary<_, Utf8View> and remove mixed-schema handling.
let parquet_pushdown_enabled = scan_config.enable_parquet_pushdown
    && !config.is_feature_enabled(&TableFeature::RowTracking)
    && !config.is_feature_enabled(&TableFeature::DeletionVectors);
oh yes, this would have been a nice way to mess up data without this check!!
roeap
left a comment
I nailed it down to one line 😄
It may just be that the current upgrade changed some fundamental behaviour of the parquet reader, and then it is what it is, but trying to grok it, I still don't fully understand why this change is needed to upgrade the version.
It may also just be that we were in fact still using the old schema adapter approach by default in the parquet reader, but I think I read in the docs that the new mechanics would already be used by default in 51?
pushdown: TableProviderFilterPushDown::Inexact,
kernel_predicate: None,
parquet_predicate: Some(expr),
parquet_predicate: parquet_pushdown_enabled.then_some(expr),
Do we need to encode this twice? I.e., we also check the parquet-pushdown config when we create the scan, so I think it's fine to still pass the predicate if we can create it, and then make the decision as we set up the scan instead of here.
Agree on a single decision point. plan.rs is currently the source of truth for enable_parquet_pushdown and the DV/RowTracking gates. mod.rs has an extra RowTracking guard that doesn't mirror the config/DV checks. Removing the then_some(...) gate in plan.rs requires moving the full condition to the read-plan setup (keeping the has_selection_vectors guard).
Here are the options I can think of:
- Keep the plan.rs gate, and drop or convert the mod.rs RowTracking check to an invariant
- Always carry the predicate, centralize all gating in get_read_plan, and remove the redundant mod.rs check
result_schema.clone()
};
let parquet_read_schema = config.physical_arrow_schema(
let parquet_read_schema = config.parquet_file_schema(
Sorry for all the noise.
I went through this a bit more comprehensively, and ultimately it's this one-line change and its implications that got me worried. This is a fairly direct translation from delta kernel and might eventually contain additional metadata columns that need to get passed through that relate to specific features, so any transformation on this is worrisome. (We do transform the physical types as an optimization.) In any case this should not be processed based on partition columns; the "physical" specifically refers to being related only to physical data, and partition columns should have no role at this level.
The idea is that the schema we specify here flows through and gets amended. We do need to look into kernel expression evaluation to make fully sure we don't do additional costly transformations there.
Agreed - partition columns shouldn't influence the Parquet/physical read schema. I updated parquet_read_schema to be derived only from scan.physical_schema() and removed the partition-column-dependent wrapping from parquet_file_schema.
The only remaining transformation there is Utf8View/BinaryView -> Utf8/Binary to keep DF predicate pushdown stable, and partition columns are still injected above Parquet via the kernel transform path. This should also be safer if the kernel later adds additional metadata columns to the physical schema, since we're no longer keying off partition columns at this level.
Signed-off-by: Ethan Urbanski <ethan@urbanskitech.com>
Signed-off-by: Ethan Urbanski <ethan@urbanskitech.com>
We're going to clean more up in main
Description
Minimal upgrade to compile and pass existing tests on DF52.
Supersedes #4054.
Changes
Dependencies:
DF52 API migrations:
DF52/Arrow57 scalar compatibility:
Correctness fix:
DF52 enables batch coalescing by default, producing batches with rows from multiple files. Next-scan assumed one file per batch.
Test updates:
Follow Ups (PR Splits):
Verification:
cargo test -p deltalake-core --features datafusion --test integration_datafusion
Related Issue(s)