
Commit 1612ba1

fix: align stats to view typed schema + harden parquet predicate pushdown (delta-io#4144)
# Description

Fix next-scan Parquet predicate pushdown when `parquet_read_schema` uses Arrow view types (`Utf8View` / `BinaryView`). DataFusion may replace constant columns with literals derived from `PartitionedFile.statistics`. If those scalars are base-typed while the scan schema is view-typed, DataFusion errors:

```
expected Utf8View but found Utf8
```

**Changes:**

- Align per-file `PartitionedFile.statistics` scalar types to the Parquet read schema, so DataFusion's constant-column/literal replacement no longer produces base-typed arrays under a view-typed schema (sketched below).
- Build Parquet pushdown predicates via the DataFusion session state (`create_physical_expr`) to get canonical type coercion/rewrites.
- Add focused unit tests for stats-alignment behavior (string/binary view conversions, dictionary inner types, length-mismatch policy).

**Notes:** Scoped to next-scan, predicate pushdown, and stats typing only. No write-path changes.

cc @roeap

---------

Signed-off-by: Ethan Urbanski <ethan@urbanskitech.com>
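The stats-alignment change itself lands in a file not excerpted below, so here is a minimal sketch of the idea, assuming hypothetical helper names (`align_column_stats_to_schema` and `cast_precision` are illustrative, not delta-rs APIs): each per-file min/max scalar is cast to the data type of the matching field in the Parquet read schema, so a `Utf8` statistic becomes `Utf8View` under a view-typed scan schema.

```rust
// Hypothetical sketch, not the PR's implementation: align per-file
// ColumnStatistics scalars to the Parquet read schema's field types.
use arrow_schema::{DataType, Schema};
use datafusion::common::{ColumnStatistics, Result, stats::Precision};
use datafusion::scalar::ScalarValue;

fn align_column_stats_to_schema(
    schema: &Schema,
    stats: Vec<ColumnStatistics>,
) -> Result<Vec<ColumnStatistics>> {
    // One possible length-mismatch policy: leave the stats untouched.
    if schema.fields().len() != stats.len() {
        return Ok(stats);
    }
    schema
        .fields()
        .iter()
        .zip(stats)
        .map(|(field, mut col)| {
            col.min_value = cast_precision(col.min_value, field.data_type())?;
            col.max_value = cast_precision(col.max_value, field.data_type())?;
            Ok(col)
        })
        .collect()
}

fn cast_precision(
    p: Precision<ScalarValue>,
    target: &DataType,
) -> Result<Precision<ScalarValue>> {
    Ok(match p {
        // ScalarValue::cast_to uses Arrow's cast kernel, e.g. Utf8 -> Utf8View.
        Precision::Exact(v) => Precision::Exact(v.cast_to(target)?),
        Precision::Inexact(v) => Precision::Inexact(v.cast_to(target)?),
        Precision::Absent => Precision::Absent,
    })
}
```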
1 parent 5121983 commit 1612ba1

2 files changed

Lines changed: 426 additions & 5 deletions

File tree

  • crates/core/src/delta_datafusion/table_provider/next/scan

crates/core/src/delta_datafusion/table_provider/next/scan/mod.rs

Lines changed: 262 additions & 4 deletions
```diff
@@ -24,12 +24,13 @@ use chrono::{TimeZone as _, Utc};
 use dashmap::DashMap;
 use datafusion::{
     catalog::Session,
-    common::{ColumnStatistics, HashMap, Result, Statistics, plan_err, stats::Precision},
+    common::{
+        ColumnStatistics, HashMap, Result, Statistics, ToDFSchema, plan_err, stats::Precision,
+    },
     config::TableParquetOptions,
     datasource::physical_plan::{ParquetSource, parquet::CachedParquetFileReaderFactory},
     error::DataFusionError,
     execution::object_store::ObjectStoreUrl,
-    physical_expr::planner::logical2physical,
     physical_plan::{
         ExecutionPlan,
         empty::EmptyExec,
```
```diff
@@ -293,6 +294,10 @@ type FilesByStore = (ObjectStoreUrl, Vec<(PartitionedFile, Option<Vec<bool>>)>);
 async fn get_read_plan(
     state: &dyn Session,
     files_by_store: impl IntoIterator<Item = FilesByStore>,
+    // Schema of physical file columns to read from Parquet (no Delta partitions, no file-id).
+    //
+    // This is also the schema used for Parquet pruning/pushdown. It may include view types
+    // (e.g. Utf8View/BinaryView) depending on `DeltaScanConfig`.
     parquet_read_schema: &SchemaRef,
     limit: Option<usize>,
     file_id_field: &FieldRef,
```
```diff
@@ -308,6 +313,7 @@
     let mut full_read_schema = SchemaBuilder::from(parquet_read_schema.as_ref().clone());
     full_read_schema.push(file_id_field.as_ref().clone().with_nullable(true));
     let full_read_schema = Arc::new(full_read_schema.finish());
+    let full_read_df_schema = full_read_schema.clone().to_dfschema()?;
 
     for (store_url, files) in files_by_store.into_iter() {
         let reader_factory = Arc::new(CachedParquetFileReaderFactory::new(
```
```diff
@@ -332,7 +338,7 @@
         if !has_selection_vectors && let Some(pred) = predicate {
             // Predicate pushdown can reference the synthetic file-id partition column.
             // Use the full read schema (data columns + file-id) when planning.
-            let physical = logical2physical(pred, full_read_schema.as_ref());
+            let physical = state.create_physical_expr(pred.clone(), &full_read_df_schema)?;
             file_source = file_source
                 .with_predicate(physical)
                 .with_pushdown_filters(true);
```
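As context for the `create_physical_expr` switch above, a standalone sketch (assumed setup, not code from this PR): planning the predicate through the session state applies DataFusion's type coercion, which, per the commit description, is what lets a plain `Utf8` literal be compared against a `Utf8View` column.

```rust
// Standalone sketch (assumed setup, not code from this PR).
use arrow_schema::{DataType, Field, Schema};
use datafusion::common::{Result, ToDFSchema};
use datafusion::prelude::{SessionContext, col, lit};

fn main() -> Result<()> {
    let schema = Schema::new(vec![Field::new("name", DataType::Utf8View, true)]);
    let df_schema = schema.to_dfschema()?;
    let state = SessionContext::new().state();
    // create_physical_expr runs DataFusion's coercion rules, rewriting the
    // Utf8 literal "bob" to be comparable with the Utf8View column.
    let physical = state.create_physical_expr(col("name").eq(lit("bob")), &df_schema)?;
    println!("{physical}");
    Ok(())
}
```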
```diff
@@ -405,6 +411,7 @@ fn cast_record_batch(batch: RecordBatch, target_schema: &SchemaRef) -> Result<Re
         }
         return Ok(batch);
     }
+
     let options = CastOptions {
         safe: true,
         ..Default::default()
```
420427

421428
#[cfg(test)]
422429
mod tests {
423-
use arrow_array::{Int32Array, RecordBatch, StringArray, StructArray};
430+
use arrow_array::{
431+
BinaryArray, BinaryViewArray, Int32Array, RecordBatch, StringArray, StructArray,
432+
};
424433
use arrow_schema::{DataType, Field, Fields, Schema};
425434
use datafusion::{
426435
physical_plan::collect,
```diff
@@ -834,6 +843,255 @@ mod tests {
         ];
         assert_batches_sorted_eq!(&expected, &batches);
 
+        Ok(())
+    }
+    #[tokio::test]
+    async fn test_predicate_pushdown_allows_view_literal_against_base_parquet_file() -> TestResult {
+        use datafusion::scalar::ScalarValue;
+
+        let store = Arc::new(InMemory::new());
+        let store_url = Url::parse("memory:///")?;
+        let session = Arc::new(create_session().into_inner());
+        session
+            .runtime_env()
+            .register_object_store(&store_url, store.clone());
+
+        // Write a Parquet file with base types, but read it with a view-typed schema.
+        let file_schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, true),
+        ]));
+        let parquet_read_schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8View, true),
+        ]));
+        let data = RecordBatch::try_new(
+            file_schema.clone(),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2, 3])),
+                Arc::new(StringArray::from(vec![
+                    Some("alice"),
+                    Some("bob"),
+                    Some("charlie"),
+                ])),
+            ],
+        )?;
+
+        let mut buffer = Vec::new();
+        let mut arrow_writer = ArrowWriter::try_new(&mut buffer, file_schema.clone(), None)?;
+        arrow_writer.write(&data)?;
+        arrow_writer.close()?;
+
+        let path = Path::from("test_view_literal.parquet");
+        store.put(&path, buffer.into()).await?;
+        let mut file: PartitionedFile = store.head(&path).await?.into();
+        file.partition_values
+            .push(wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
+                "memory:///test_view_literal.parquet".to_string(),
+            ))));
+
+        let files_by_store = vec![(
+            store_url.as_object_store_url(),
+            vec![(file, None::<Vec<bool>>)],
+        )];
+
+        let file_id_field = Arc::new(Field::new(
+            FILE_ID_COLUMN_DEFAULT,
+            DataType::Dictionary(DataType::UInt16.into(), DataType::Utf8.into()),
+            false,
+        ));
+
+        let predicate = col("name").eq(lit(ScalarValue::Utf8View(Some("bob".to_string()))));
+        let plan = get_read_plan(
+            &session.state(),
+            files_by_store,
+            &parquet_read_schema,
+            None,
+            &file_id_field,
+            Some(&predicate),
+        )
+        .await?;
+        let batches = collect(plan, session.task_ctx()).await?;
+
+        let expected = vec![
+            "+----+------+-------------------------------------+",
+            "| id | name | __delta_rs_file_id__                |",
+            "+----+------+-------------------------------------+",
+            "| 2  | bob  | memory:///test_view_literal.parquet |",
+            "+----+------+-------------------------------------+",
+        ];
+        assert_batches_sorted_eq!(&expected, &batches);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_predicate_pushdown_allows_sql_literal_against_view_schema() -> TestResult {
+        use datafusion::scalar::ScalarValue;
+
+        let store = Arc::new(InMemory::new());
+        let store_url = Url::parse("memory:///")?;
+        let session = Arc::new(create_session().into_inner());
+        session
+            .runtime_env()
+            .register_object_store(&store_url, store.clone());
+
+        // Write a Parquet file with base types, but read it with a view-typed schema.
+        let file_schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, true),
+        ]));
+        let parquet_read_schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8View, true),
+        ]));
+        let data = RecordBatch::try_new(
+            file_schema.clone(),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2, 3])),
+                Arc::new(StringArray::from(vec![
+                    Some("alice"),
+                    Some("bob"),
+                    Some("charlie"),
+                ])),
+            ],
+        )?;
+
+        let mut buffer = Vec::new();
+        let mut arrow_writer = ArrowWriter::try_new(&mut buffer, file_schema.clone(), None)?;
+        arrow_writer.write(&data)?;
+        arrow_writer.close()?;
+
+        let path = Path::from("test_sql_literal.parquet");
+        store.put(&path, buffer.into()).await?;
+        let mut file: PartitionedFile = store.head(&path).await?.into();
+        file.partition_values
+            .push(wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
+                "memory:///test_sql_literal.parquet".to_string(),
+            ))));
+
+        let files_by_store = vec![(
+            store_url.as_object_store_url(),
+            vec![(file, None::<Vec<bool>>)],
+        )];
+
+        let file_id_field = Arc::new(Field::new(
+            FILE_ID_COLUMN_DEFAULT,
+            DataType::Dictionary(DataType::UInt16.into(), DataType::Utf8.into()),
+            false,
+        ));
+
+        let predicate = col("name").eq(lit("bob"));
+        let plan = get_read_plan(
+            &session.state(),
+            files_by_store,
+            &parquet_read_schema,
+            None,
+            &file_id_field,
+            Some(&predicate),
+        )
+        .await?;
+        let batches = collect(plan, session.task_ctx()).await?;
+
+        let expected = vec![
+            "+----+------+------------------------------------+",
+            "| id | name | __delta_rs_file_id__               |",
+            "+----+------+------------------------------------+",
+            "| 2  | bob  | memory:///test_sql_literal.parquet |",
+            "+----+------+------------------------------------+",
+        ];
+        assert_batches_sorted_eq!(&expected, &batches);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_predicate_pushdown_allows_binaryview_literal_against_base_parquet_file()
+    -> TestResult {
+        use datafusion::scalar::ScalarValue;
+
+        let store = Arc::new(InMemory::new());
+        let store_url = Url::parse("memory:///")?;
+        let session = Arc::new(create_session().into_inner());
+        session
+            .runtime_env()
+            .register_object_store(&store_url, store.clone());
+
+        let file_schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("data", DataType::Binary, true),
+        ]));
+        let parquet_read_schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("data", DataType::BinaryView, true),
+        ]));
+        let data = RecordBatch::try_new(
+            file_schema.clone(),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2, 3])),
+                Arc::new(BinaryArray::from_opt_vec(vec![
+                    Some(b"aaa".as_slice()),
+                    Some(b"bbb".as_slice()),
+                    Some(b"ccc".as_slice()),
+                ])),
+            ],
+        )?;
+
+        let mut buffer = Vec::new();
+        let mut arrow_writer = ArrowWriter::try_new(&mut buffer, file_schema.clone(), None)?;
+        arrow_writer.write(&data)?;
+        arrow_writer.close()?;
+
+        let path = Path::from("test_binary_view.parquet");
+        store.put(&path, buffer.into()).await?;
+        let mut file: PartitionedFile = store.head(&path).await?.into();
+        file.partition_values
+            .push(wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
+                "memory:///test_binary_view.parquet".to_string(),
+            ))));
+
+        let files_by_store = vec![(
+            store_url.as_object_store_url(),
+            vec![(file, None::<Vec<bool>>)],
+        )];
+
+        let file_id_field = Arc::new(Field::new(
+            FILE_ID_COLUMN_DEFAULT,
+            DataType::Dictionary(DataType::UInt16.into(), DataType::Utf8.into()),
+            false,
+        ));
+
+        let predicate = col("data").eq(lit(ScalarValue::BinaryView(Some(b"bbb".to_vec()))));
+        let plan = get_read_plan(
+            &session.state(),
+            files_by_store,
+            &parquet_read_schema,
+            None,
+            &file_id_field,
+            Some(&predicate),
+        )
+        .await?;
+        let batches = collect(plan, session.task_ctx()).await?;
+
+        assert_eq!(batches.len(), 1);
+        assert_eq!(batches[0].num_rows(), 1);
+        let id_col = batches[0]
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(id_col.value(0), 2);
+
+        let data_col = batches[0]
+            .column(1)
+            .as_any()
+            .downcast_ref::<BinaryViewArray>()
+            .unwrap();
+        assert_eq!(data_col.value(0), b"bbb");
+
+        assert_eq!(batches[0].num_columns(), 3);
+        assert_eq!(batches[0].schema().field(2).name(), FILE_ID_COLUMN_DEFAULT);
+
         Ok(())
     }
 }
```
