Skip to content

Commit 255f271

Browse files
tobixdev ethan-tyler
authored and committed
Update new scan path with stable schema for filters
Signed-off-by: Tobias Schwarzinger <tobias.schwarzinger@tuwien.ac.at>
1 parent 0943fe2 commit 255f271

1 file changed

Lines changed: 40 additions & 7 deletions

File tree

  • crates/core/src/delta_datafusion/table_provider/next/scan

crates/core/src/delta_datafusion/table_provider/next/scan/plan.rs

Lines changed: 40 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -131,6 +131,7 @@ impl KernelScanPlan {
131131
let missing_columns: Vec<_> = columns_in_filters
132132
.difference(&columns_in_scan)
133133
.cloned()
134+
.sorted() // Prevent non-deterministic ordering from HashSet
134135
.collect();
135136

136137
let file_id_field = config.file_id_field();
@@ -518,20 +519,19 @@ fn rewrite_expression(expr: Expr, config: &TableConfiguration) -> Result<Expr> {
518519

519520
#[cfg(test)]
520521
mod tests {
522+
use super::*;
523+
use crate::{
524+
delta_datafusion::create_session,
525+
test_utils::{TestResult, open_fs_path},
526+
};
527+
use datafusion::logical_expr::and;
521528
use datafusion::{
522529
assert_batches_sorted_eq,
523530
physical_plan::collect,
524531
prelude::{col, lit},
525532
scalar::ScalarValue,
526533
};
527534

528-
use crate::{
529-
delta_datafusion::create_session,
530-
test_utils::{TestResult, open_fs_path},
531-
};
532-
533-
use super::*;
534-
535535
fn schema_has_view_types(schema: &Schema) -> bool {
536536
schema
537537
.fields()
@@ -860,4 +860,37 @@ mod tests {
860860
&crate::delta_datafusion::file_id::file_id_data_type()
861861
);
862862
}
863+
864+
/// The scan in this test projects only one column (index 4), while the filter references the
865+
/// `state` and `county` columns. The scan plan must therefore add the columns required by the
866+
/// filter to the physical schema; this test asserts that they are added in a deterministic order.
867+
#[tokio::test]
868+
async fn test_scan_with_projection_has_stable_schema_for_filters() {
869+
let mut table = open_fs_path("../test/tests/data/COVID-19_NYT");
870+
table.load().await.unwrap();
871+
872+
let snapshot = table.snapshot().unwrap().snapshot().snapshot();
873+
let filter = and(
874+
col("state").eq(lit("Louisiana")),
875+
col("county").eq(lit("Cameron")),
876+
);
877+
let scan_plan = KernelScanPlan::try_new(
878+
snapshot,
879+
Some(&vec![4]),
880+
&[filter],
881+
&DeltaScanConfig::default(),
882+
None,
883+
)
884+
.unwrap();
885+
886+
let expected_schema = snapshot
887+
.schema()
888+
.project(&vec!["cases", "county", "state"])
889+
.unwrap();
890+
// Compare the string representations, because the schema equality check is order-insensitive
// and would not detect a differing column order.
891+
assert_eq!(
892+
scan_plan.scan.physical_schema().to_string(),
893+
expected_schema.to_string()
894+
)
895+
}
863896
}

0 commit comments

Comments
 (0)