Skip to content

Commit fa0010e

Browse files
committed
fix: match rewritten files using raw log paths
Signed-off-by: Ethan Urbanski <ethan@urbanskitech.com>
1 parent 2e4ca5b commit fa0010e

2 files changed

Lines changed: 210 additions & 7 deletions

File tree

crates/core/src/operations/merge/mod.rs

Lines changed: 143 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -1547,10 +1547,9 @@ async fn execute(
15471547
let table_root = snapshot.table_configuration().table_root().clone();
15481548

15491549
for action in snapshot.log_data() {
1550-
let rel_path = action.path();
1551-
let rel_path_str = rel_path.as_ref();
1550+
let log_path = action.path_raw();
15521551

1553-
if should_remove_rewritten_file(&survivors, rel_path_str, &table_root)? {
1552+
if should_remove_rewritten_file(&survivors, log_path, &table_root)? {
15541553
metrics.num_target_files_removed += 1;
15551554
actions.push(action.remove_action(true).into());
15561555
}
@@ -1745,15 +1744,16 @@ fn get_metric_any_or(
17451744

17461745
fn should_remove_rewritten_file(
17471746
survivors: &barrier::BarrierSurvivorSet,
1748-
rel_path: &str,
1747+
log_path: &str,
17491748
table_root: &url::Url,
17501749
) -> DeltaResult<bool> {
1751-
if survivors.contains(rel_path) {
1750+
if survivors.contains(log_path) {
17521751
return Ok(true);
17531752
}
17541753

1755-
// Compare against normalized file IDs.
1756-
let full_id = normalize_path_as_file_id(rel_path, table_root, "merge remove")?;
1754+
// Compare against normalized file IDs built from the raw log path so percent-encoded
1755+
// partition values map back to the same canonical file ID used by DeltaScanNext.
1756+
let full_id = normalize_path_as_file_id(log_path, table_root, "merge remove")?;
17571757
Ok(survivors.contains(full_id.as_str()))
17581758
}
17591759
impl std::future::IntoFuture for MergeBuilder {
@@ -2153,6 +2153,142 @@ mod tests {
21532153
);
21542154
}
21552155

2156+
async fn assert_merge_encoded_partition_value_removes_original_file(
2157+
partition_value: &str,
2158+
expected_raw_encoded_segment: &str,
2159+
) {
2160+
let schema = get_arrow_schema(&None);
2161+
let table = setup_table(Some(vec!["modified"])).await;
2162+
2163+
let make_source = || {
2164+
let ctx = SessionContext::new();
2165+
let batch = RecordBatch::try_new(
2166+
Arc::clone(&schema),
2167+
vec![
2168+
Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C"])),
2169+
Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3])),
2170+
Arc::new(arrow::array::StringArray::from(vec![
2171+
partition_value,
2172+
partition_value,
2173+
partition_value,
2174+
])),
2175+
],
2176+
)
2177+
.unwrap();
2178+
ctx.read_batch(batch).unwrap()
2179+
};
2180+
2181+
let predicate = col("target.modified")
2182+
.eq(lit(partition_value))
2183+
.and(col("target.id").eq(col("source.id")));
2184+
2185+
let (table, first_metrics) = table
2186+
.merge(make_source(), predicate.clone())
2187+
.with_source_alias("source")
2188+
.with_target_alias("target")
2189+
.when_matched_update(|update| {
2190+
update
2191+
.update("value", col("source.value"))
2192+
.update("modified", col("source.modified"))
2193+
})
2194+
.unwrap()
2195+
.when_not_matched_insert(|insert| {
2196+
insert
2197+
.set("id", col("source.id"))
2198+
.set("value", col("source.value"))
2199+
.set("modified", col("source.modified"))
2200+
})
2201+
.unwrap()
2202+
.await
2203+
.unwrap();
2204+
2205+
assert_eq!(first_metrics.num_target_rows_inserted, 3);
2206+
assert_eq!(first_metrics.num_target_files_removed, 0);
2207+
assert_eq!(table.snapshot().unwrap().log_data().num_files(), 1);
2208+
2209+
let original_file = table
2210+
.snapshot()
2211+
.unwrap()
2212+
.log_data()
2213+
.into_iter()
2214+
.next()
2215+
.unwrap();
2216+
let original_path = original_file.path().to_string();
2217+
let original_path_raw = original_file.path_raw().to_string();
2218+
assert!(
2219+
original_path_raw.contains(expected_raw_encoded_segment),
2220+
"expected raw encoded path to contain {expected_raw_encoded_segment}, got {original_path_raw}"
2221+
);
2222+
2223+
let (table, second_metrics) = table
2224+
.merge(make_source(), predicate)
2225+
.with_source_alias("source")
2226+
.with_target_alias("target")
2227+
.when_matched_update(|update| {
2228+
update
2229+
.update("value", col("source.value"))
2230+
.update("modified", col("source.modified"))
2231+
})
2232+
.unwrap()
2233+
.when_not_matched_insert(|insert| {
2234+
insert
2235+
.set("id", col("source.id"))
2236+
.set("value", col("source.value"))
2237+
.set("modified", col("source.modified"))
2238+
})
2239+
.unwrap()
2240+
.await
2241+
.unwrap();
2242+
2243+
assert_eq!(second_metrics.num_target_rows_updated, 3);
2244+
assert_eq!(second_metrics.num_target_files_removed, 1);
2245+
assert_eq!(table.snapshot().unwrap().log_data().num_files(), 1);
2246+
2247+
let snapshot_bytes = table
2248+
.log_store
2249+
.read_commit_entry(2)
2250+
.await
2251+
.unwrap()
2252+
.expect("failed to get snapshot bytes");
2253+
let actions = crate::logstore::get_actions(2, &snapshot_bytes).unwrap();
2254+
let removed_paths: Vec<_> = actions
2255+
.iter()
2256+
.filter_map(|action| match action {
2257+
Action::Remove(remove) => Some(remove.path.clone()),
2258+
_ => None,
2259+
})
2260+
.collect();
2261+
assert_eq!(removed_paths, vec![original_path]);
2262+
2263+
let expected = vec![
2264+
"+----+-------+------------+".to_string(),
2265+
"| id | value | modified |".to_string(),
2266+
"+----+-------+------------+".to_string(),
2267+
format!("| A | 1 | {partition_value} |"),
2268+
format!("| B | 2 | {partition_value} |"),
2269+
format!("| C | 3 | {partition_value} |"),
2270+
"+----+-------+------------+".to_string(),
2271+
];
2272+
let expected_refs: Vec<_> = expected.iter().map(String::as_str).collect();
2273+
let actual = get_data(&table).await;
2274+
assert_batches_sorted_eq!(&expected_refs, &actual);
2275+
}
2276+
2277+
#[tokio::test]
2278+
async fn test_merge_partition_value_with_space_removes_original_file() {
2279+
assert_merge_encoded_partition_value_removes_original_file("2021 02 01", "%2520").await;
2280+
}
2281+
2282+
#[tokio::test]
2283+
async fn test_merge_partition_value_with_slash_removes_original_file() {
2284+
assert_merge_encoded_partition_value_removes_original_file("2021/02/01", "%252F").await;
2285+
}
2286+
2287+
#[tokio::test]
2288+
async fn test_merge_partition_value_with_percent_removes_original_file() {
2289+
assert_merge_encoded_partition_value_removes_original_file("2021%02%01", "%2525").await;
2290+
}
2291+
21562292
// TODO(ion): property keys are not passed through or translated as table features.. fix this as well
21572293
#[tokio::test]
21582294
async fn test_merge_when_delta_table_is_append_only() {

python/tests/test_merge.py

Lines changed: 67 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1980,6 +1980,73 @@ def test_merge_timestamps_partitioned_2344(tmp_path: pathlib.Path, timezone, pre
19801980
assert last_action["operationParameters"].get("predicate") == predicate
19811981

19821982

1983+
@pytest.mark.pyarrow
1984+
@pytest.mark.parametrize("streaming", (True, False))
1985+
def test_merge_partition_value_with_space_is_idempotent_4352(
1986+
tmp_path: pathlib.Path, streaming: bool
1987+
):
1988+
import pyarrow as pa
1989+
1990+
source = pa.table(
1991+
{
1992+
"group": pa.array(["foo bar", "foo bar", "foo bar"], type=pa.string()),
1993+
"region": pa.array(["A", "B", "C"], type=pa.string()),
1994+
"ts": pa.array(
1995+
[datetime.datetime(2024, 1, 1)] * 3, type=pa.timestamp("us")
1996+
),
1997+
"val": pa.array([1.0, 2.0, 3.0], type=pa.float64()),
1998+
}
1999+
)
2000+
2001+
DeltaTable.create(
2002+
tmp_path, schema=source.schema, partition_by=["group"], mode="overwrite"
2003+
)
2004+
dt = DeltaTable(tmp_path)
2005+
predicate = (
2006+
"target.`group` = source.`group` "
2007+
"AND target.region = source.region "
2008+
"AND target.ts = source.ts"
2009+
)
2010+
2011+
first_metrics = (
2012+
dt.merge(
2013+
source=source,
2014+
predicate=predicate,
2015+
source_alias="source",
2016+
target_alias="target",
2017+
streamed_exec=streaming,
2018+
)
2019+
.when_matched_update_all()
2020+
.when_not_matched_insert_all()
2021+
.execute()
2022+
)
2023+
dt.update_incremental()
2024+
2025+
second_metrics = (
2026+
dt.merge(
2027+
source=source,
2028+
predicate=predicate,
2029+
source_alias="source",
2030+
target_alias="target",
2031+
streamed_exec=streaming,
2032+
)
2033+
.when_matched_update_all()
2034+
.when_not_matched_insert_all()
2035+
.execute()
2036+
)
2037+
dt.update_incremental()
2038+
2039+
result = dt.to_pyarrow_table().sort_by([("region", "ascending")])
2040+
expected = source.sort_by([("region", "ascending")])
2041+
2042+
assert first_metrics["num_target_rows_inserted"] == 3
2043+
assert first_metrics["num_target_files_removed"] == 0
2044+
assert second_metrics["num_target_rows_updated"] == 3
2045+
assert second_metrics["num_target_files_removed"] == 1
2046+
assert len(dt.file_uris()) == 1
2047+
assert result == expected
2048+
2049+
19832050
def test_merge_partitioned_schema_evolution_with_existing_string_partition_4292(
19842051
tmp_path: pathlib.Path,
19852052
):

0 commit comments

Comments (0)