Skip to content

Commit 4b6fd8f

Browse files
authored
fix: delete partition fallback batching and add action coalescing (delta-io#4211)
# Description

Some follow-up/hardening changes from the recent partition-only delete work. The DELETE partition-only fallback and add-action evaluation could materialize all actions into a single batch, which breaks on large tables.

Changes:
- DELETE fallback uses batched partition metadata instead of single-batch materialization
- Shared partition-metadata MemTable builder across the scan and DELETE paths
- Snapshot fast path for partition-only column projection
- add_actions coalescing streams directly into BatchCoalescer instead of pre-collecting
- Python docs note the get_add_actions() return-type migration

<!--- For example: - closes #106 --->

# Documentation

<!--- Share links to useful documentation --->

---------

Signed-off-by: Ethan Urbanski <ethan@urbanskitech.com>
1 parent d364f62 commit 4b6fd8f

6 files changed

Lines changed: 401 additions & 103 deletions

File tree

crates/core/src/delta_datafusion/find_files.rs

Lines changed: 138 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,17 @@ pub(crate) struct FindFiles {
4444
pub partition_scan: bool,
4545
}
4646

47+
/// Abstraction over sources that can provide partition metadata add-action batches.
48+
pub(crate) trait PartitionAddActionsProvider {
49+
fn add_actions_partition_batches(&self) -> DeltaResult<Vec<RecordBatch>>;
50+
}
51+
52+
impl PartitionAddActionsProvider for EagerSnapshot {
53+
fn add_actions_partition_batches(&self) -> DeltaResult<Vec<RecordBatch>> {
54+
EagerSnapshot::add_actions_partition_batches(self)
55+
}
56+
}
57+
4758
/// Finds files in a snapshot that match the provided predicate.
4859
#[instrument(
4960
skip_all,
@@ -316,55 +327,78 @@ async fn find_files_scan(
316327
Ok(result)
317328
}
318329

319-
async fn scan_memory_table(snapshot: &EagerSnapshot, predicate: &Expr) -> DeltaResult<Vec<Add>> {
320-
let actions = snapshot
321-
.log_data()
322-
.iter()
323-
.map(|f| f.add_action())
324-
.collect_vec();
325-
326-
// Use add_actions_batches to avoid concatenating all file metadata into a
327-
// single RecordBatch which can exceed the 2GB Arrow 32-bit offset limit
328-
// for tables with a very large number of files.
329-
let all_batches = snapshot.add_actions_batches(true)?;
330-
if all_batches.is_empty() {
331-
return Ok(vec![]);
330+
/// Build a partition-metadata MemTable from snapshot add actions.
331+
///
332+
/// Projects only the `path` column and `partition.*` columns for evaluating
333+
/// partition predicates without materializing full add-action metadata.
334+
/// Returns `None` when the snapshot contains no add actions.
335+
pub(crate) fn add_actions_partition_mem_table(
336+
snapshot: &(impl PartitionAddActionsProvider + ?Sized),
337+
) -> DeltaResult<Option<MemTable>> {
338+
let add_action_batches = snapshot.add_actions_partition_batches()?;
339+
if add_action_batches.is_empty() {
340+
return Ok(None);
332341
}
333342

334-
let first_schema = all_batches[0].schema();
335-
let mut fields = Vec::with_capacity(first_schema.fields().len());
336-
fields.push(Field::new(PATH_COLUMN, DataType::Utf8, false));
343+
let first_schema = add_action_batches[0].schema();
344+
let mut projected_columns = Vec::with_capacity(first_schema.fields().len());
345+
projected_columns.push((
346+
"path".to_string(),
347+
Field::new(PATH_COLUMN, DataType::Utf8, false),
348+
));
337349
for field in first_schema.fields() {
338350
if let Some(name) = field.name().strip_prefix("partition.") {
339-
fields.push(field.as_ref().clone().with_name(name));
351+
projected_columns.push((
352+
field.name().to_string(),
353+
field.as_ref().clone().with_name(name),
354+
));
340355
}
341356
}
342-
let schema = Arc::new(Schema::new(fields));
343-
344-
let mut mem_batches = Vec::with_capacity(all_batches.len());
345-
for batch in &all_batches {
346-
let batch_schema = batch.schema();
347-
let mut arrays = Vec::with_capacity(schema.fields().len());
348-
349-
arrays.push(
350-
batch
351-
.column_by_name("path")
352-
.ok_or(DeltaTableError::Generic(
353-
"Column with name `path` does not exist".to_owned(),
354-
))?
355-
.to_owned(),
356-
);
357357

358-
for field in batch_schema.fields() {
359-
if field.name().strip_prefix("partition.").is_some() {
360-
arrays.push(batch.column_by_name(field.name()).unwrap().to_owned());
361-
}
362-
}
358+
let projected_schema = Arc::new(Schema::new(
359+
projected_columns
360+
.iter()
361+
.map(|(_, field)| field.clone())
362+
.collect::<Vec<_>>(),
363+
));
363364

364-
mem_batches.push(RecordBatch::try_new(schema.clone(), arrays)?);
365+
let mut projected_batches = Vec::with_capacity(add_action_batches.len());
366+
for batch in add_action_batches {
367+
let projected_arrays = projected_columns
368+
.iter()
369+
.map(|(source_column_name, _)| {
370+
batch
371+
.column_by_name(source_column_name)
372+
.cloned()
373+
.ok_or_else(|| {
374+
DeltaTableError::Generic(format!(
375+
"Column with name `{source_column_name}` does not exist"
376+
))
377+
})
378+
})
379+
.collect::<DeltaResult<Vec<_>>>()?;
380+
projected_batches.push(RecordBatch::try_new(
381+
projected_schema.clone(),
382+
projected_arrays,
383+
)?);
365384
}
366385

367-
let mem_table = MemTable::try_new(schema, vec![mem_batches])?;
386+
Ok(Some(MemTable::try_new(
387+
projected_schema,
388+
vec![projected_batches],
389+
)?))
390+
}
391+
392+
async fn scan_memory_table(snapshot: &EagerSnapshot, predicate: &Expr) -> DeltaResult<Vec<Add>> {
393+
let actions = snapshot
394+
.log_data()
395+
.iter()
396+
.map(|f| f.add_action())
397+
.collect_vec();
398+
399+
let Some(mem_table) = add_actions_partition_mem_table(snapshot)? else {
400+
return Ok(vec![]);
401+
};
368402

369403
let ctx = SessionContext::new();
370404
let mut df = ctx.read_table(Arc::new(mem_table))?;
@@ -576,7 +610,7 @@ mod tests {
576610
DeltaTable,
577611
delta_datafusion::create_session,
578612
protocol::SaveMode,
579-
test_utils::{TestResult, open_fs_path},
613+
test_utils::{TestResult, multibatch_add_actions_for_partition, open_fs_path},
580614
writer::test_utils::{get_delta_schema, get_record_batch},
581615
};
582616

@@ -661,4 +695,68 @@ mod tests {
661695
assert!(matches.len() < total_actions);
662696
Ok(())
663697
}
698+
699+
#[tokio::test]
700+
async fn test_scan_memory_table_multibatch_stress_partition_filtering() -> TestResult {
701+
use std::collections::HashSet;
702+
703+
let action_count = 9000usize;
704+
let expected_matches = action_count / 2;
705+
let actions = multibatch_add_actions_for_partition(
706+
action_count,
707+
"modified",
708+
"2021-02-02",
709+
"2021-02-03",
710+
);
711+
712+
let table = DeltaTable::new_in_memory()
713+
.create()
714+
.with_columns(get_delta_schema().fields().cloned())
715+
.with_partition_columns(["modified"])
716+
.with_actions(actions)
717+
.await?;
718+
719+
let snapshot = table.snapshot()?.snapshot();
720+
let add_action_batches = snapshot.add_actions_partition_batches()?;
721+
assert!(
722+
add_action_batches.len() > 1,
723+
"expected multi-batch partition metadata fixture"
724+
);
725+
726+
let predicate = col("modified").eq(lit("2021-02-02"));
727+
let matches = scan_memory_table(snapshot, &predicate).await?;
728+
729+
assert_eq!(matches.len(), expected_matches);
730+
731+
let match_paths = matches
732+
.iter()
733+
.map(|add| add.path.clone())
734+
.collect::<HashSet<_>>();
735+
let expected_paths = (0..action_count)
736+
.filter(|idx| idx % 2 == 0)
737+
.map(|idx| format!("modified=2021-02-02/file-{idx:05}.parquet"))
738+
.collect::<HashSet<_>>();
739+
assert_eq!(match_paths, expected_paths);
740+
Ok(())
741+
}
742+
743+
struct MockPartitionProvider {
744+
batches: Vec<RecordBatch>,
745+
}
746+
747+
impl PartitionAddActionsProvider for MockPartitionProvider {
748+
fn add_actions_partition_batches(&self) -> DeltaResult<Vec<RecordBatch>> {
749+
Ok(self.batches.clone())
750+
}
751+
}
752+
753+
#[test]
754+
fn test_add_actions_partition_mem_table_accepts_generic_provider() -> DeltaResult<()> {
755+
let provider = MockPartitionProvider {
756+
batches: Vec::new(),
757+
};
758+
let mem_table = add_actions_partition_mem_table(&provider)?;
759+
assert!(mem_table.is_none());
760+
Ok(())
761+
}
664762
}

crates/core/src/operations/delete.rs

Lines changed: 80 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,8 @@ use crate::delta_datafusion::logical::{
6666
};
6767
use crate::delta_datafusion::physical::{MetricObserverExec, find_metric_node, get_metric};
6868
use crate::delta_datafusion::{
69-
Expression, create_session, resolve_session_state, scan_files_where_matches,
70-
update_datafusion_session,
69+
Expression, add_actions_partition_mem_table, create_session, resolve_session_state,
70+
scan_files_where_matches, update_datafusion_session,
7171
};
7272
use crate::errors::DeltaResult;
7373
use crate::kernel::transaction::{CommitBuilder, CommitProperties, PROTOCOL};
@@ -534,46 +534,18 @@ async fn find_file_paths_by_partition_predicate_datafusion(
534534
snapshot: &EagerSnapshot,
535535
predicate: &Expr,
536536
) -> DeltaResult<std::collections::HashSet<String>> {
537-
use arrow_array::RecordBatch;
538537
use arrow_array::StringArray;
539-
use arrow_schema::DataType;
540-
use arrow_schema::Field;
541-
use arrow_schema::Schema;
542538
use datafusion::logical_expr::LogicalPlanBuilder;
543539
use datafusion::logical_expr::col;
544540

545541
use crate::delta_datafusion::PATH_COLUMN;
546542
use crate::errors::DeltaTableError;
547-
use datafusion::datasource::{MemTable, provider_as_source};
543+
use datafusion::datasource::provider_as_source;
548544
use datafusion::physical_plan::collect;
549545

550-
let batch = snapshot.add_actions_table(true)?;
551-
let schema = batch.schema();
552-
let mut arrays = Vec::with_capacity(schema.fields().len());
553-
let mut fields = Vec::with_capacity(schema.fields().len());
554-
555-
arrays.push(
556-
batch
557-
.column_by_name("path")
558-
.ok_or(DeltaTableError::Generic(
559-
"Column with name `path` does not exist".to_owned(),
560-
))?
561-
.to_owned(),
562-
);
563-
fields.push(Field::new(PATH_COLUMN, DataType::Utf8, false));
564-
565-
for field in schema.fields() {
566-
if let Some(name) = field.name().strip_prefix("partition.") {
567-
arrays.push(batch.column_by_name(field.name()).unwrap().to_owned());
568-
fields.push(field.as_ref().clone().with_name(name));
569-
}
570-
}
571-
572-
let schema = Arc::new(Schema::new(fields));
573-
let mem_table = MemTable::try_new(
574-
schema.clone(),
575-
vec![vec![RecordBatch::try_new(schema, arrays)?]],
576-
)?;
546+
let Some(mem_table) = add_actions_partition_mem_table(snapshot)? else {
547+
return Ok(std::collections::HashSet::new());
548+
};
577549

578550
let plan = LogicalPlanBuilder::scan(
579551
"partition_predicate",
@@ -1347,6 +1319,80 @@ mod tests {
13471319
Ok(())
13481320
}
13491321

1322+
#[tokio::test]
1323+
async fn test_delete_partition_only_fallback_multibatch_ignores_missing_files()
1324+
-> DeltaResult<()> {
1325+
use chrono::Utc;
1326+
1327+
use crate::DeltaTable;
1328+
use crate::kernel::{
1329+
Action, DataType as DeltaDataType, PrimitiveType, StructField, StructType,
1330+
};
1331+
use crate::test_utils::make_test_add;
1332+
1333+
let table_schema = StructType::try_new(vec![
1334+
StructField::new(
1335+
"dt".to_string(),
1336+
DeltaDataType::Primitive(PrimitiveType::String),
1337+
true,
1338+
),
1339+
StructField::new(
1340+
"hour".to_string(),
1341+
DeltaDataType::Primitive(PrimitiveType::Integer),
1342+
true,
1343+
),
1344+
StructField::new(
1345+
"value".to_string(),
1346+
DeltaDataType::Primitive(PrimitiveType::Integer),
1347+
true,
1348+
),
1349+
])?;
1350+
1351+
let action_count = 9000;
1352+
let expected_removed = action_count / 2;
1353+
let now_ms = Utc::now().timestamp_millis();
1354+
1355+
let adds = (0..action_count)
1356+
.map(|idx| {
1357+
let hour = if idx % 2 == 0 { 10 } else { 20 };
1358+
let path = format!("dt=2025-11-12/hour={hour}/file-{idx:05}.parquet");
1359+
let hour_str = if hour == 10 { "10" } else { "20" };
1360+
Action::Add(make_test_add(
1361+
path,
1362+
&[("dt", "2025-11-12"), ("hour", hour_str)],
1363+
now_ms,
1364+
))
1365+
})
1366+
.collect::<Vec<_>>();
1367+
1368+
let table = DeltaTable::new_in_memory()
1369+
.create()
1370+
.with_columns(table_schema.fields().cloned())
1371+
.with_partition_columns(vec!["dt", "hour"])
1372+
.with_actions(adds)
1373+
.await?;
1374+
1375+
// Intentionally do not write parquet files. This validates that the partition-only
1376+
// DataFusion fallback path can resolve file candidates from metadata only.
1377+
let (table, metrics) = table
1378+
.delete()
1379+
.with_predicate("CAST(hour AS STRING) LIKE '1%'")
1380+
.await?;
1381+
1382+
assert_eq!(metrics.num_added_files, 0);
1383+
assert_eq!(metrics.num_removed_files, expected_removed);
1384+
assert_eq!(metrics.num_deleted_rows, 0);
1385+
assert_eq!(metrics.num_copied_rows, 0);
1386+
1387+
let state = table.snapshot()?;
1388+
assert_eq!(
1389+
state.log_data().num_files(),
1390+
action_count - expected_removed
1391+
);
1392+
1393+
Ok(())
1394+
}
1395+
13501396
#[tokio::test]
13511397
async fn test_delete_on_mixed_columns() {
13521398
// Test predicates that contain non-partition and partition column

0 commit comments

Comments
 (0)