Skip to content

Commit 0a9f995

Browse files
authored
refactor: migrate merge target scan to DeltaScanNext (delta-io#4266)
# Description

Migrate the merge target scan to `DeltaScanNext`. Route the merge predicate into file skipping instead of scan filters, and match rewritten files against full file IDs.

# Related Issue(s)

- delta-io#4239

<!--- For example: - closes #106 --->

# Documentation

<!--- Share links to useful documentation --->

---------

Signed-off-by: Ethan Urbanski <ethan@urbanskitech.com>
1 parent f77572e commit 0a9f995

3 files changed

Lines changed: 705 additions & 73 deletions

File tree

crates/core/src/delta_datafusion/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,14 @@ pub(crate) use data_validation::{
7171
DataValidationExec, constraints_to_exprs, generated_columns_to_exprs, validation_predicates,
7272
};
7373
pub(crate) use find_files::*;
74+
pub(crate) use table_provider::next::normalize_path_as_file_id;
7475
pub use table_provider::{
7576
DeltaScan, DeltaScanConfig, DeltaScanConfigBuilder, DeltaTableProvider, TableProviderBuilder,
7677
next::DeltaScanExec,
7778
};
7879
pub(crate) use table_provider::{
79-
DeltaScanBuilder, next::FILE_ID_COLUMN_DEFAULT, update_datafusion_session,
80+
DeltaScanBuilder, next::FILE_ID_COLUMN_DEFAULT, resolve_file_column_name,
81+
update_datafusion_session,
8082
};
8183

8284
pub(crate) const PATH_COLUMN: &str = "__delta_rs_path";

crates/core/src/delta_datafusion/table_provider.rs

Lines changed: 65 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,41 @@ pub(crate) mod next;
6565

6666
const PATH_COLUMN: &str = "__delta_rs_path";
6767

68+
pub(crate) fn resolve_file_column_name(
69+
input_schema: &Schema,
70+
file_column_name: Option<&str>,
71+
) -> DeltaResult<String> {
72+
let column_names: HashSet<&str> = input_schema
73+
.fields()
74+
.iter()
75+
.map(|field| field.name().as_str())
76+
.collect();
77+
78+
match file_column_name {
79+
Some(name) => {
80+
if column_names.contains(name) {
81+
return Err(DeltaTableError::Generic(format!(
82+
"Unable to add file path column since column with name {name} exists"
83+
)));
84+
}
85+
86+
Ok(name.to_owned())
87+
}
88+
None => {
89+
let prefix = PATH_COLUMN;
90+
let mut idx = 0;
91+
let mut name = prefix.to_owned();
92+
93+
while column_names.contains(name.as_str()) {
94+
idx += 1;
95+
name = format!("{prefix}_{idx}");
96+
}
97+
98+
Ok(name)
99+
}
100+
}
101+
}
102+
68103
#[derive(Debug, Clone)]
69104
/// Used to specify if additional metadata columns are exposed to the user
70105
pub struct DeltaScanConfigBuilder {
@@ -138,35 +173,10 @@ impl DeltaScanConfigBuilder {
138173
/// Build a DeltaScanConfig and ensure no column name conflicts occur during downstream processing
139174
pub fn build(&self, snapshot: &EagerSnapshot) -> DeltaResult<DeltaScanConfig> {
140175
let file_column_name = if self.include_file_column {
141-
let input_schema = snapshot.input_schema();
142-
let mut column_names: HashSet<&String> = HashSet::new();
143-
for field in input_schema.fields.iter() {
144-
column_names.insert(field.name());
145-
}
146-
147-
match &self.file_column_name {
148-
Some(name) => {
149-
if column_names.contains(name) {
150-
return Err(DeltaTableError::Generic(format!(
151-
"Unable to add file path column since column with name {name} exists"
152-
)));
153-
}
154-
155-
Some(name.to_owned())
156-
}
157-
None => {
158-
let prefix = PATH_COLUMN;
159-
let mut idx = 0;
160-
let mut name = prefix.to_owned();
161-
162-
while column_names.contains(&name) {
163-
idx += 1;
164-
name = format!("{prefix}_{idx}");
165-
}
166-
167-
Some(name)
168-
}
169-
}
176+
Some(resolve_file_column_name(
177+
snapshot.input_schema().as_ref(),
178+
self.file_column_name.as_deref(),
179+
)?)
170180
} else {
171181
None
172182
};
@@ -1325,6 +1335,32 @@ mod tests {
13251335
Arc::new(SessionStateBuilder::new().with_config(config).build())
13261336
}
13271337

1338+
/// With no explicit name requested, the resolver must skip every candidate
/// that is already a field name and return the first unused suffixed name.
#[test]
fn test_resolve_file_column_name_avoids_collisions() {
    // Both the default name and its first suffixed fallback are already taken.
    let first_fallback = format!("{PATH_COLUMN}_1");
    let fields = vec![
        Field::new("id", ArrowDataType::Int64, false),
        Field::new(PATH_COLUMN, ArrowDataType::Utf8, true),
        Field::new(first_fallback, ArrowDataType::Utf8, true),
    ];
    let schema = Schema::new(fields);

    let expected = format!("{PATH_COLUMN}_2");
    let actual = resolve_file_column_name(&schema, None).unwrap();
    assert_eq!(actual, expected);
}
1349+
1350+
/// An explicitly requested name that matches an existing field must be
/// rejected with an error naming the offending column.
#[test]
fn test_resolve_file_column_name_rejects_explicit_collision() {
    let fields = vec![
        Field::new("id", ArrowDataType::Int64, false),
        Field::new("file_col", ArrowDataType::Utf8, true),
    ];
    let schema = Schema::new(fields);

    let message = resolve_file_column_name(&schema, Some("file_col"))
        .unwrap_err()
        .to_string();
    assert!(
        message.contains("Unable to add file path column since column with name file_col exists")
    );
}
1363+
13281364
#[tokio::test]
13291365
async fn test_insert_into_simple() {
13301366
let table = create_test_table().await.unwrap();

0 commit comments

Comments
 (0)