Skip to content

Commit ffb794b

Browse files
ethan-tylerrtyler
andauthored
fix(datafusion): resolve DML predicates against execution scan schema (delta-io#4127)
# Description After the DataFusion 52 upgrade, DELETE and UPDATE operations fail with: `Invalid comparison operation: Utf8 == Utf8View` Error: `Arrow error: Invalid comparison operation: Utf8 == Utf8View` This occurs when using string equality predicates like table.delete().with_predicate("name = 'foo'"). ## Root Cause: With DataFusion 52, Parquet string/binary columns may be represented as view types (`Utf8View`/`BinaryView`) when `datafusion.execution.parquet.schema_force_view_types` is enabled (default `true`). DELETE/UPDATE were resolving predicates and SET expressions against `snapshot.arrow_schema()` (base `Utf8`/`Binary`), but executing against the scan/provider schema (which can include view types). This mismatch caused filter evaluation errors like `Utf8 == Utf8View` (and similarly `Binary == BinaryView`). ## Fix: Resolve predicates against DeltaScanConfig::table_schema() instead of snapshot.arrow_schema(). This ensures predicate literals are coerced to match the actual execution schema (view types when enabled, base types when disabled). ## Changes: - delete.rs: Use DeltaScanConfig::table_schema() for predicate resolution - update.rs: Use DeltaScanConfig::table_schema() for both SET expressions and WHERE predicates - expr.rs: Add ScalarValue::Dictionary support to fmt_expr_to_sql (partition columns are dictionary-encoded in execution schema) ## Tests: - test_delete_string_equality_utf8view_regression_4125 - test_update_string_equality_non_partition - test_delete_partition_string_predicate_dictionary_formatting - test_delete_binary_equality_non_partition - test_delete_custom_session_schema_force_view_types_disabled - Dictionary scalar formatting unit tests in expr.rs # Related Issue(s) - Fixes delta-io#4125 <!--- For example: - closes #106 ---> # Documentation <!--- Share links to useful documentation ---> --------- Signed-off-by: Ethan Urbanski <ethan@urbanskitech.com> Co-authored-by: R. Tyler Croy <rtyler@brokenco.de>
1 parent 5c91cbf commit ffb794b

4 files changed

Lines changed: 287 additions & 6 deletions

File tree

crates/core/src/delta_datafusion/expr.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,13 @@ struct ScalarValueFormat<'a> {
495495
impl fmt::Display for ScalarValueFormat<'_> {
496496
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
497497
match self.scalar {
498+
ScalarValue::Dictionary(_key_type, inner) => write!(
499+
f,
500+
"{}",
501+
ScalarValueFormat {
502+
scalar: inner.as_ref()
503+
}
504+
)?,
498505
ScalarValue::Boolean(e) => format_option!(f, e)?,
499506
ScalarValue::Float32(e) => format_option!(f, e)?,
500507
ScalarValue::Float64(e) => format_option!(f, e)?,
@@ -761,6 +768,24 @@ mod test {
761768
expected: "_binary = decode('aa00ff', 'hex')".to_string(),
762769
override_expected_expr: Some(col("_binary").eq(decode(lit("aa00ff"), lit("hex")))),
763770
},
771+
ParseTest {
772+
expr: col("id").eq(lit(ScalarValue::Dictionary(
773+
Box::new(ArrowDataType::UInt16),
774+
Box::new(ScalarValue::Utf8(Some("A".into()))),
775+
))),
776+
expected: "id = 'A'".to_string(),
777+
// Parsing canonicalizes away dictionary wrappers.
778+
override_expected_expr: Some(col("id").eq(lit(ScalarValue::Utf8(Some("A".into()))))),
779+
},
780+
ParseTest {
781+
expr: col("value").eq(lit(ScalarValue::Dictionary(
782+
Box::new(ArrowDataType::UInt16),
783+
Box::new(ScalarValue::Int32(Some(3))),
784+
))),
785+
expected: "value = 3".to_string(),
786+
// Parsing canonicalizes away dictionary wrappers.
787+
override_expected_expr: Some(col("value").eq(lit(ScalarValue::Int64(Some(3))))),
788+
},
764789
simple!(
765790
col("value").between(lit(20_i64), lit(30_i64)),
766791
"value BETWEEN 20 AND 30".to_string()

crates/core/src/operations/delete.rs

Lines changed: 205 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ use uuid::Uuid;
5353
use super::Operation;
5454
use super::cdc::should_write_cdc;
5555
use crate::DeltaTable;
56+
use crate::delta_datafusion::DeltaScanConfig;
5657
use crate::delta_datafusion::expr::fmt_expr_to_sql;
5758
use crate::delta_datafusion::logical::{
5859
LogicalPlanBuilderExt as _, LogicalPlanExt, MetricObserver,
@@ -200,7 +201,13 @@ impl std::future::IntoFuture for DeleteBuilder {
200201

201202
let predicate = this
202203
.predicate
203-
.map(|p| p.resolve(session.as_ref(), snapshot.arrow_schema().to_dfschema_ref()?))
204+
.map(|p| {
205+
let scan_config = DeltaScanConfig::new_from_session(session.as_ref());
206+
let predicate_schema = scan_config
207+
.table_schema(snapshot.table_configuration())?
208+
.to_dfschema_ref()?;
209+
p.resolve(session.as_ref(), predicate_schema)
210+
})
204211
.transpose()?;
205212

206213
let operation = DeltaOperation::Delete {
@@ -440,6 +447,7 @@ mod tests {
440447
use arrow::datatypes::{Field, Schema};
441448
use arrow::record_batch::RecordBatch;
442449
use arrow_array::ArrayRef;
450+
use arrow_array::BinaryArray;
443451
use arrow_array::StringArray;
444452
use arrow_array::StructArray;
445453
use arrow_buffer::NullBuffer;
@@ -449,6 +457,7 @@ mod tests {
449457
use datafusion::physical_plan::ExecutionPlan;
450458
use datafusion::physical_plan::collect;
451459
use datafusion::prelude::*;
460+
use datafusion::scalar::ScalarValue;
452461
use delta_kernel::engine::arrow_conversion::TryIntoKernel;
453462
use delta_kernel::schema::PrimitiveType;
454463
use delta_kernel::schema::StructType;
@@ -913,6 +922,201 @@ mod tests {
913922
assert_batches_sorted_eq!(&expected, &actual);
914923
}
915924

925+
#[tokio::test]
926+
async fn test_delete_partition_string_predicate_dictionary_formatting() -> DeltaResult<()> {
927+
// Regression test: resolving predicates against the execution scan schema can
928+
// dictionary-encode partition columns, so fmt_expr_to_sql must support
929+
// ScalarValue::Dictionary scalars.
930+
let schema = get_arrow_schema(&None);
931+
let table = setup_table(Some(vec!["id"])).await;
932+
933+
let batch = RecordBatch::try_new(
934+
Arc::clone(&schema),
935+
vec![
936+
Arc::new(arrow::array::StringArray::from(vec!["A", "B", "A", "C"])),
937+
Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3, 4])),
938+
Arc::new(arrow::array::StringArray::from(vec![
939+
"2021-02-02",
940+
"2021-02-02",
941+
"2021-02-02",
942+
"2021-02-02",
943+
])),
944+
],
945+
)?;
946+
947+
let table = write_batch(table, batch).await;
948+
949+
let (table, _metrics) = table.delete().with_predicate("id = 'A'").await?;
950+
951+
let last_commit = table.last_commit().await.unwrap();
952+
let parameters = last_commit.operation_parameters.clone().unwrap();
953+
assert_eq!(parameters["predicate"], json!("id = 'A'"));
954+
955+
let expected = [
956+
"+----+-------+------------+",
957+
"| id | value | modified |",
958+
"+----+-------+------------+",
959+
"| B | 2 | 2021-02-02 |",
960+
"| C | 4 | 2021-02-02 |",
961+
"+----+-------+------------+",
962+
];
963+
let actual = get_data(&table).await;
964+
assert_batches_sorted_eq!(&expected, &actual);
965+
Ok(())
966+
}
967+
968+
#[tokio::test]
969+
async fn test_delete_binary_equality_non_partition() -> DeltaResult<()> {
970+
// Regression test: DF52 view types can cause predicate evaluation to compare
971+
// Binary against BinaryView for equality predicates.
972+
let schema = Arc::new(Schema::new(vec![
973+
Field::new("data", DataType::Binary, true),
974+
Field::new("int32", DataType::Int32, true),
975+
]));
976+
977+
let batch = RecordBatch::try_new(
978+
schema,
979+
vec![
980+
Arc::new(BinaryArray::from_opt_vec(vec![
981+
Some(b"aaa".as_slice()),
982+
Some(b"bbb".as_slice()),
983+
Some(b"ccc".as_slice()),
984+
])) as ArrayRef,
985+
Arc::new(Int32Array::from(vec![0, 1, 2])) as ArrayRef,
986+
],
987+
)?;
988+
989+
let table = DeltaTable::new_in_memory().write(vec![batch]).await?;
990+
991+
let (table, _metrics) = table
992+
.delete()
993+
.with_predicate(col("data").eq(lit(ScalarValue::Binary(Some(b"bbb".to_vec())))))
994+
.await?;
995+
996+
let last_commit = table.last_commit().await.unwrap();
997+
let parameters = last_commit.operation_parameters.clone().unwrap();
998+
assert_eq!(
999+
parameters["predicate"],
1000+
json!("data = decode('626262', 'hex')")
1001+
);
1002+
1003+
let mut values = Vec::new();
1004+
for batch in get_data(&table).await {
1005+
let int32 = batch
1006+
.column(1)
1007+
.as_any()
1008+
.downcast_ref::<Int32Array>()
1009+
.unwrap();
1010+
for idx in 0..batch.num_rows() {
1011+
values.push(int32.value(idx));
1012+
}
1013+
}
1014+
values.sort();
1015+
assert_eq!(values, vec![0, 2]);
1016+
Ok(())
1017+
}
1018+
1019+
#[tokio::test]
1020+
async fn test_delete_string_equality_utf8view_regression_4125() -> DeltaResult<()> {
1021+
// Direct regression test for GitHub issue #4125:
1022+
// https://github.com/delta-io/delta-rs/issues/4125
1023+
//
1024+
// Non-partition string column with default session (view types enabled in DF52+).
1025+
// Was failing with: Invalid comparison operation: Utf8 == Utf8View
1026+
let schema = Arc::new(Schema::new(vec![
1027+
Field::new("name", DataType::Utf8, false),
1028+
Field::new("value", DataType::Int32, false),
1029+
]));
1030+
1031+
let batch = RecordBatch::try_new(
1032+
Arc::clone(&schema),
1033+
vec![
1034+
Arc::new(StringArray::from(vec!["alice", "bob", "charlie"])),
1035+
Arc::new(Int32Array::from(vec![1, 2, 3])),
1036+
],
1037+
)?;
1038+
1039+
let table = DeltaTable::new_in_memory().write(vec![batch]).await?;
1040+
1041+
// This was the failing operation in issue #4125
1042+
let (table, _metrics) = table.delete().with_predicate("name = 'bob'").await?;
1043+
1044+
let last_commit = table.last_commit().await.unwrap();
1045+
let parameters = last_commit.operation_parameters.clone().unwrap();
1046+
assert_eq!(parameters["predicate"], json!("name = 'bob'"));
1047+
1048+
let expected = [
1049+
"+---------+-------+",
1050+
"| name | value |",
1051+
"+---------+-------+",
1052+
"| alice | 1 |",
1053+
"| charlie | 3 |",
1054+
"+---------+-------+",
1055+
];
1056+
let actual = get_data(&table).await;
1057+
assert_batches_sorted_eq!(&expected, &actual);
1058+
Ok(())
1059+
}
1060+
1061+
#[tokio::test]
1062+
async fn test_delete_custom_session_schema_force_view_types_disabled() -> DeltaResult<()> {
1063+
// Integration guardrail: user-supplied DataFusion sessions may disable view types.
1064+
// Predicate resolution must respect the provided session's config.
1065+
let schema = Arc::new(Schema::new(vec![
1066+
Field::new("name", DataType::Utf8, false),
1067+
Field::new("value", DataType::Int32, false),
1068+
]));
1069+
1070+
let batch = RecordBatch::try_new(
1071+
Arc::clone(&schema),
1072+
vec![
1073+
Arc::new(StringArray::from(vec!["alice", "bob", "charlie"])),
1074+
Arc::new(Int32Array::from(vec![1, 2, 3])),
1075+
],
1076+
)?;
1077+
1078+
let table = DeltaTable::new_in_memory().write(vec![batch]).await?;
1079+
1080+
let config: datafusion::prelude::SessionConfig =
1081+
crate::delta_datafusion::DeltaSessionConfig::default().into();
1082+
let config = config.set_bool(
1083+
"datafusion.execution.parquet.schema_force_view_types",
1084+
false,
1085+
);
1086+
let runtime_env = datafusion::execution::runtime_env::RuntimeEnvBuilder::new()
1087+
.build_arc()
1088+
.unwrap();
1089+
let state = datafusion::execution::SessionStateBuilder::new()
1090+
.with_default_features()
1091+
.with_config(config)
1092+
.with_runtime_env(runtime_env)
1093+
.with_query_planner(crate::delta_datafusion::planner::DeltaPlanner::new())
1094+
.build();
1095+
let session = Arc::new(state);
1096+
1097+
let (table, _metrics) = table
1098+
.delete()
1099+
.with_session_state(session)
1100+
.with_predicate("name = 'bob'")
1101+
.await?;
1102+
1103+
let last_commit = table.last_commit().await.unwrap();
1104+
let parameters = last_commit.operation_parameters.clone().unwrap();
1105+
assert_eq!(parameters["predicate"], json!("name = 'bob'"));
1106+
1107+
let expected = [
1108+
"+---------+-------+",
1109+
"| name | value |",
1110+
"+---------+-------+",
1111+
"| alice | 1 |",
1112+
"| charlie | 3 |",
1113+
"+---------+-------+",
1114+
];
1115+
let actual = get_data(&table).await;
1116+
assert_batches_sorted_eq!(&expected, &actual);
1117+
Ok(())
1118+
}
1119+
9161120
#[tokio::test]
9171121
async fn test_failure_nondeterministic_query() {
9181122
// Deletion requires a deterministic predicate

crates/core/src/operations/update.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@ use super::{
4646
CustomExecuteHandler, Operation,
4747
write::execution::{write_execution_plan, write_execution_plan_cdc},
4848
};
49-
use crate::delta_datafusion::{Expression, scan_files_where_matches, update_datafusion_session};
49+
use crate::delta_datafusion::{
50+
DeltaScanConfig, Expression, scan_files_where_matches, update_datafusion_session,
51+
};
5052
use crate::kernel::resolve_snapshot;
5153
use crate::logstore::LogStoreRef;
5254
use crate::operations::cdc::*;
@@ -272,7 +274,10 @@ async fn execute(
272274
let exec_start = Instant::now();
273275
let mut metrics = UpdateMetrics::default();
274276

275-
let schema = snapshot.arrow_schema().to_dfschema_ref()?;
277+
let scan_config = DeltaScanConfig::new_from_session(session);
278+
let schema = scan_config
279+
.table_schema(snapshot.table_configuration())?
280+
.to_dfschema_ref()?;
276281
let updates: HashMap<_, _> = updates
277282
.into_iter()
278283
.map(|(key, expr)| expr.resolve(session, schema.clone()).map(|e| (key.name, e)))
@@ -442,7 +447,13 @@ impl std::future::IntoFuture for UpdateBuilder {
442447

443448
let predicate = this
444449
.predicate
445-
.map(|p| p.resolve(session.as_ref(), snapshot.arrow_schema().to_dfschema_ref()?))
450+
.map(|p| {
451+
let scan_config = DeltaScanConfig::new_from_session(session.as_ref());
452+
let predicate_schema = scan_config
453+
.table_schema(snapshot.table_configuration())?
454+
.to_dfschema_ref()?;
455+
p.resolve(session.as_ref(), predicate_schema)
456+
})
446457
.transpose()?;
447458

448459
let predicate = predicate.unwrap_or(lit(true));

crates/core/src/operations/update/tests.rs

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@ use super::*;
22

33
use crate::kernel::{Action, PrimitiveType, StructField, StructType};
44
use crate::kernel::{DataType as DeltaDataType, ProtocolInner};
5-
use crate::writer::test_utils::datafusion::get_data;
6-
use crate::writer::test_utils::datafusion::write_batch;
5+
use crate::writer::test_utils::datafusion::{get_data, write_batch};
76
use crate::writer::test_utils::{
87
get_arrow_schema, get_delta_schema, get_record_batch, setup_table_with_configuration,
98
};
@@ -124,6 +123,48 @@ async fn test_update_predicate_left_in_data() -> DeltaResult<()> {
124123
Ok(())
125124
}
126125

126+
#[tokio::test]
127+
async fn test_update_string_equality_non_partition() -> DeltaResult<()> {
128+
// Regression test: DF52 view types can cause predicate evaluation to compare
129+
// Utf8 against Utf8View for string equality predicates.
130+
let schema = Arc::new(Schema::new(vec![
131+
Field::new("utf8", DataType::Utf8, true),
132+
Field::new("int32", DataType::Int32, true),
133+
]));
134+
135+
let batch = RecordBatch::try_new(
136+
schema,
137+
vec![
138+
Arc::new(StringArray::from(vec![Some("0"), Some("1"), Some("2")])),
139+
Arc::new(Int32Array::from(vec![0, 1, 2])),
140+
],
141+
)?;
142+
143+
let table = DeltaTable::new_in_memory().write(vec![batch]).await?;
144+
145+
let (table, _metrics) = table
146+
.update()
147+
.with_predicate("utf8 = '1'")
148+
.with_update(
149+
"utf8",
150+
"CASE WHEN utf8 = '1' THEN 'hello world' ELSE utf8 END",
151+
)
152+
.await?;
153+
154+
let expected = [
155+
"+-------------+-------+",
156+
"| utf8 | int32 |",
157+
"+-------------+-------+",
158+
"| 0 | 0 |",
159+
"| 2 | 2 |",
160+
"| hello world | 1 |",
161+
"+-------------+-------+",
162+
];
163+
let actual = get_data(&table).await;
164+
assert_batches_sorted_eq!(&expected, &actual);
165+
Ok(())
166+
}
167+
127168
#[tokio::test]
128169
async fn test_update_no_predicate() {
129170
let schema = get_arrow_schema(&None);

0 commit comments

Comments
 (0)