Handle merged schemas in parquet pruning #2170
Conversation
alamb
left a comment
Thank you @thinkharderdev - I think this looks quite good.
The only thing I am not sure about is some rows that appear to show null passing a `c2 = 0` filter. Otherwise good to go from my perspective.
```diff
 use crate::logical_plan::Expr;
 use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
-use crate::physical_plan::file_format::ParquetExec;
+use crate::physical_plan::file_format::{ParquetExec, SchemaAdapter};
```

```rust
fn create_batch(columns: Vec<(&str, ArrayRef)>) -> RecordBatch {
    RecordBatch::try_new(schema, columns).expect("error; creating record batch")
}
```
This looks very similar to / the same as `RecordBatch::try_from_iter`: https://docs.rs/arrow/11.1.0/arrow/record_batch/struct.RecordBatch.html#method.try_from_iter
```rust
let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
let c2_stats = &stats.column_statistics.as_ref().expect("missing c2 stats")[1];
assert_eq!(c1_stats.null_count, Some(1));
assert_eq!(c2_stats.null_count, Some(3));
```
this is cool to fill in the null stats for the missing column 👍
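The mapping being praised here can be sketched roughly as follows (a minimal std-only sketch; `map_stats_to_merged` and its flattened types are illustrative stand-ins, not DataFusion's actual statistics API): a column missing from a file is entirely null in that file, so its null count is the file's row count.

```rust
// Illustrative sketch (not DataFusion's real types) of mapping per-file
// null-count statistics onto a merged table schema: a column the file
// does not contain is all-null in that file.
fn map_stats_to_merged(
    merged: &[&str],              // column names of the merged schema
    file_stats: &[(&str, usize)], // (column name, null_count) from one file
    num_rows: usize,              // number of rows in the file
) -> Vec<usize> {
    merged
        .iter()
        .map(|name| {
            file_stats
                .iter()
                .find(|(n, _)| n == name)
                .map(|(_, nc)| *nc)
                // Column is absent from this file: every row is null.
                .unwrap_or(num_rows)
        })
        .collect()
}
```

With a 3-row file that only contains `c1` (one null), the merged stats for `(c1, c2)` come out as one null for `c1` and three for `c2`, matching the assertions in the quoted test.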
```rust
.statistics()
.columns()
.iter()
.find(|c| c.column_descr().name() == &$column.name)
.unwrap();
let expected = vec![
    "+-----+----+----+",
    "| c1 | c3 | c2 |",
```
🤔 if the filter is `c2 = 0` then none of these rows should pass... so something doesn't look quite right
Yeah, this looked wrong to me as well. What I think is happening is that since the min/max stats aren't set, the pruning predicates aren't applied. In a "real" query where this predicate was pushed down from a filter stage, the output would still get piped into a `FilterExec`. I think we would have to special-case the scenario where we fill in a null column to conform to a merged schema, which may be worth doing. I can double check though and make sure there's not a bug here.
Makes sense -- a comment in the test to explain why it is ok would be helpful for future readers
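The behavior discussed above can be illustrated with a small sketch (std-only, hypothetical names; not DataFusion's actual pruning code): when a column's min/max statistics are absent, the pruning predicate cannot prove a row group empty, so the group is kept and its rows flow on to a later `FilterExec`.

```rust
// Hypothetical sketch of min/max pruning for an equality predicate like
// `c2 = 0`. Pruning may only skip a row group when the statistics *prove*
// no row can match; missing stats (e.g. a null-filled column added to
// conform to a merged schema) mean "cannot prune".
struct ColumnStats {
    min: Option<i64>,
    max: Option<i64>,
}

fn can_prune_eq(stats: &ColumnStats, literal: i64) -> bool {
    match (stats.min, stats.max) {
        // Stats present: prune only if the literal lies outside [min, max].
        (Some(min), Some(max)) => literal < min || literal > max,
        // Stats missing: keep the row group; a downstream filter decides.
        _ => false,
    }
}
```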
```rust
.unwrap();

let expected = vec![
    "+-----+----+",
```
same thing here -- I wouldn't expect null values in `c2` to be returned...
FYI @Cheappie
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Thanks again @thinkharderdev
```rust
.await
.unwrap();

// This does not look correct since the "c2" values in the result do not in fact match the predicate `c2 == 0`
let c1: ArrayRef =
    Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));

let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
```
I might be missing some point, but why are the values in `c2` not materialized if we weren't able to prune them? I wonder how a filter like `c2 eq 1_i64` can be satisfied against a null array?
"+-----+----+",
"| c1 | c2 |",
"+-----+----+",
"| Foo | |",
"| | |",
"| bar | |",
"+-----+----+",
I think the key point is that filtering is also applied after the initial parquet scan / pruning -- the pruning is just a first pass to try and reduce additional work.
So subsequent Filter operations will actually handle filtering out the rows with `c2 = null`.
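That second pass can be sketched in miniature (std-only, with `Option<i64>` standing in for a nullable Int64 array; not DataFusion's real filter kernels): under SQL semantics `NULL = 0` is not true, so a filled-in all-null `c2` column matches no rows.

```rust
// Sketch of the residual filter `c2 = <literal>` applied after the scan:
// returns the indices of rows that pass. Null entries never satisfy an
// equality predicate, so rows from a file that lacked `c2` are dropped
// here even though pruning could not skip their row group.
fn filter_eq(c2: &[Option<i64>], literal: i64) -> Vec<usize> {
    c2.iter()
        .enumerate()
        .filter(|(_, v)| **v == Some(literal))
        .map(|(i, _)| i)
        .collect()
}
```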
Which issue does this PR close?

Closes #2161

Rationale for this change

Adding schema merging to `ParquetFormat` broke pruning, since the existing implementation assumes that each file in the `ListingTable` has the merged schema. In the best case this just prevents pruning row groups, but in certain cases (such as #2161) it can cause runtime errors and possibly incorrect query results.

What changes are included in this PR?

Two changes:
- During the `ListingTable` scan, we need to map file-level statistics to the expected schema of the table's merged schema.

Are there any user-facing changes?

No user-facing changes, but there is an API change: in order to map the statistics to the merged schema, we need to pass the merged schema to `FileFormat::infer_stats`.
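The adaptation the PR describes can be summarized with a small sketch (std-only, hypothetical function; the `SchemaAdapter` added in the PR has a different interface): each column of the merged schema is mapped to its position in a given file's schema, with `None` marking columns the file predates, which must be filled with null arrays and whose statistics report every row as null.

```rust
// Hypothetical sketch of the core schema-adaptation step: for each column
// in the merged table schema, find where (if anywhere) it lives in one
// file's schema. `None` means the file lacks the column, so readers fill
// it with nulls and pruning has no min/max stats for it.
fn map_merged_to_file(merged: &[&str], file: &[&str]) -> Vec<Option<usize>> {
    merged
        .iter()
        .map(|name| file.iter().position(|f| f == name))
        .collect()
}
```

For a merged schema `(c1, c3, c2)` and a file containing only `(c1, c2)`, the mapping is `[Some(0), None, Some(1)]`: `c3` is synthesized as nulls, while `c1` and `c2` are read from the file in their file-local positions.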