Skip to content

Commit bee832d

Browse files
authored
chore: harden scan adapter caching and DV mask edge cases (delta-io#4199)
# Description Follow-up to the DF scan adapter integration. Adds a cached schema adapter path in scan execution to avoid rebuilding adapters for repeated source schemas. Updates scan finalization to use the cached path. Fixes DV mask handling edge cases: preserves the remainder across multi-batch file reads, pads short masks with implicit `true`, and errors when the mask length exceeds the batch row count. # Related Issue(s) <!--- For example: - closes #106 ---> # Documentation <!--- Share links to useful documentation ---> --------- Signed-off-by: Ethan Urbanski <ethan@urbanskitech.com>
1 parent b211db5 commit bee832d

4 files changed

Lines changed: 300 additions & 87 deletions

File tree

crates/core/src/delta_datafusion/table_provider/next/scan/exec.rs

Lines changed: 66 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,18 @@ pub(crate) struct DvMaskResult {
4343
pub should_remove: bool,
4444
}
4545

46+
/// Consume the per-file deletion-vector keep-mask for the current batch.
47+
///
48+
/// The keep-mask is stored once per file and consumed incrementally as parquet
49+
/// batches are produced:
50+
/// - If the mask is shorter than the batch, missing trailing entries are
51+
/// treated as `true` (keep row).
52+
/// - If the mask is longer than the batch, the remainder is preserved for the
53+
/// next batch from the same file.
54+
///
55+
/// This function intentionally does not error when `selection_vector.len()` is
56+
/// greater than `batch_num_rows`; that is expected when one file spans multiple
57+
/// input batches.
4658
pub(crate) fn consume_dv_mask(
4759
selection_vector: &mut Vec<bool>,
4860
batch_num_rows: usize,
@@ -282,6 +294,7 @@ impl ExecutionPlan for DeltaScanExec {
282294
file_id_column: self.file_id_column.clone(),
283295
return_file_ids: self.retain_file_ids,
284296
pending: VecDeque::new(),
297+
schema_adapter: super::SchemaAdapter::new(Arc::clone(&self.scan_plan.result_schema)),
285298
}))
286299
}
287300

@@ -360,10 +373,12 @@ struct DeltaScanStream {
360373
/// Denotes if file ids should be returned as part of the output
361374
return_file_ids: bool,
362375
pending: VecDeque<RecordBatch>,
376+
/// Cached schema adapter for efficient batch adaptation across batches
377+
schema_adapter: super::SchemaAdapter,
363378
}
364379

365380
impl DeltaScanStream {
366-
fn batch_project(&self, batch: RecordBatch) -> Result<Vec<RecordBatch>> {
381+
fn batch_project(&mut self, batch: RecordBatch) -> Result<Vec<RecordBatch>> {
367382
let _timer = self.baseline_metrics.elapsed_compute().timer();
368383

369384
if batch.num_rows() == 0 {
@@ -377,18 +392,34 @@ impl DeltaScanStream {
377392

378393
file_runs
379394
.into_iter()
380-
.map(|(file_id, slice)| self.batch_project_single_file(slice, file_id, file_id_idx))
395+
.map(|(file_id, slice)| {
396+
Self::batch_project_single_file_inner(
397+
&self.scan_plan,
398+
&self.kernel_type,
399+
&self.transforms,
400+
&self.selection_vectors,
401+
self.return_file_ids,
402+
&mut self.schema_adapter,
403+
slice,
404+
file_id,
405+
file_id_idx,
406+
)
407+
})
381408
.collect::<Result<Vec<_>>>()
382409
}
383410

384-
fn batch_project_single_file(
385-
&self,
411+
fn batch_project_single_file_inner(
412+
scan_plan: &KernelScanPlan,
413+
kernel_type: &KernelDataType,
414+
transforms: &HashMap<String, ExpressionRef>,
415+
selection_vectors: &DashMap<String, Vec<bool>>,
416+
return_file_ids: bool,
417+
schema_adapter: &mut super::SchemaAdapter,
386418
batch: RecordBatch,
387419
file_id: String,
388420
file_id_idx: usize,
389421
) -> Result<RecordBatch> {
390-
let dv_result = if let Some(mut selection_vector) = self.selection_vectors.get_mut(&file_id)
391-
{
422+
let dv_result = if let Some(mut selection_vector) = selection_vectors.get_mut(&file_id) {
392423
consume_dv_mask(&mut selection_vector, batch.num_rows())
393424
} else {
394425
DvMaskResult {
@@ -398,7 +429,7 @@ impl DeltaScanStream {
398429
};
399430

400431
if dv_result.should_remove {
401-
self.selection_vectors.remove(&file_id);
432+
selection_vectors.remove(&file_id);
402433
}
403434

404435
let mut batch = if let Some(selection) = dv_result.selection {
@@ -410,12 +441,12 @@ impl DeltaScanStream {
410441
let file_id_field = batch.schema_ref().field(file_id_idx).clone();
411442
let file_id_col = batch.remove_column(file_id_idx);
412443

413-
let result = if let Some(transform) = self.transforms.get(&file_id) {
444+
let result = if let Some(transform) = transforms.get(&file_id) {
414445
let evaluator = ARROW_HANDLER
415446
.new_expression_evaluator(
416-
self.scan_plan.scan.physical_schema().clone(),
447+
scan_plan.scan.physical_schema().clone(),
417448
transform.clone(),
418-
self.kernel_type.clone(),
449+
kernel_type.clone(),
419450
)
420451
.map_err(|e| DataFusionError::External(Box::new(e)))?;
421452

@@ -426,14 +457,15 @@ impl DeltaScanStream {
426457
batch
427458
};
428459

429-
if self.return_file_ids {
460+
if return_file_ids {
430461
super::finalize_transformed_batch(
431462
result,
432-
&self.scan_plan,
463+
scan_plan,
433464
Some((file_id_col, Arc::new(file_id_field))),
465+
schema_adapter,
434466
)
435467
} else {
436-
super::finalize_transformed_batch(result, &self.scan_plan, None)
468+
super::finalize_transformed_batch(result, scan_plan, None, schema_adapter)
437469
}
438470
}
439471
}
@@ -1162,6 +1194,7 @@ mod tests {
11621194
futures::stream::iter(input_batches.into_iter().map(Ok)),
11631195
));
11641196

1197+
let schema_adapter = super::super::SchemaAdapter::new(Arc::clone(&scan_plan.result_schema));
11651198
DeltaScanStream {
11661199
scan_plan,
11671200
kernel_type,
@@ -1172,6 +1205,7 @@ mod tests {
11721205
file_id_column: FILE_ID_COLUMN_DEFAULT.to_string(),
11731206
return_file_ids,
11741207
pending: VecDeque::new(),
1208+
schema_adapter,
11751209
}
11761210
}
11771211

@@ -1194,7 +1228,7 @@ mod tests {
11941228
assert!(selection_vectors.contains_key("f1"));
11951229
assert!(selection_vectors.contains_key("f2"));
11961230

1197-
let stream = test_scan_stream(
1231+
let mut stream = test_scan_stream(
11981232
Arc::clone(&scan_plan),
11991233
kernel_type,
12001234
selection_vectors,
@@ -1265,7 +1299,7 @@ mod tests {
12651299
false,
12661300
)?;
12671301

1268-
let stream = test_scan_stream(
1302+
let mut stream = test_scan_stream(
12691303
Arc::clone(&scan_plan),
12701304
kernel_type,
12711305
selection_vectors,
@@ -1478,4 +1512,21 @@ mod tests {
14781512
}
14791513
);
14801514
}
1515+
1516+
#[test]
1517+
fn test_dv_long_mask_retains_remainder_for_next_batch() {
1518+
use super::{DvMaskResult, consume_dv_mask};
1519+
1520+
let mut sv = vec![true, false, false, true];
1521+
let result = consume_dv_mask(&mut sv, 2);
1522+
1523+
assert_eq!(
1524+
result,
1525+
DvMaskResult {
1526+
selection: Some(vec![true, false]),
1527+
should_remove: false,
1528+
}
1529+
);
1530+
assert_eq!(sv, vec![false, true]);
1531+
}
14811532
}

crates/core/src/delta_datafusion/table_provider/next/scan/exec_meta.rs

Lines changed: 109 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,17 @@ use datafusion::physical_plan::execution_plan::{
2727
Boundedness, CardinalityEffect, EmissionType, PlanProperties,
2828
};
2929
use datafusion::physical_plan::filter_pushdown::{FilterDescription, FilterPushdownPhase};
30-
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
30+
use datafusion::physical_plan::metrics::{
31+
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
32+
};
3133
use datafusion::physical_plan::{
3234
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, Statistics,
3335
};
3436
use delta_kernel::schema::{Schema as KernelSchema, SchemaRef as KernelSchemaRef};
3537
use delta_kernel::{EvaluationHandler, ExpressionRef};
3638
use futures::stream::Stream;
3739
use itertools::Itertools as _;
40+
use tracing::debug;
3841

3942
use crate::delta_datafusion::table_provider::next::KernelScanPlan;
4043
use crate::kernel::ARROW_HANDLER;
@@ -191,9 +194,12 @@ impl ExecutionPlan for DeltaScanMetaExec {
191194
scan_plan: Arc::clone(&self.scan_plan),
192195
input: self.input[partition].clone(),
193196
baseline_metrics: BaselineMetrics::new(&self.metrics, partition),
197+
dv_short_mask_padded_files_total: MetricBuilder::new(&self.metrics)
198+
.counter("dv_short_mask_padded_files_total", partition),
194199
transforms: Arc::clone(&self.transforms),
195200
selection_vectors: Arc::clone(&self.selection_vectors),
196201
file_id_field: self.file_id_field.clone(),
202+
schema_adapter: super::SchemaAdapter::new(Arc::clone(&self.scan_plan.result_schema)),
197203
}))
198204
}
199205

@@ -258,17 +264,21 @@ struct DeltaScanMetaStream {
258264
input: VecDeque<(String, usize)>,
259265
/// Execution metrics
260266
baseline_metrics: BaselineMetrics,
267+
/// Count of file batches where short deletion-vector masks required padding.
268+
dv_short_mask_padded_files_total: Count,
261269
/// Transforms to be applied to data read from individual files
262270
transforms: Arc<HashMap<String, ExpressionRef>>,
263271
/// Selection vectors to be applied to data read from individual files
264272
selection_vectors: Arc<DashMap<String, Vec<bool>>>,
265273
/// Column name for the file id
266274
file_id_field: Option<FieldRef>,
275+
/// Cached schema adapter for efficient batch adaptation across batches
276+
schema_adapter: super::SchemaAdapter,
267277
}
268278

269279
impl DeltaScanMetaStream {
270280
/// Apply the per-file transformation to a RecordBatch.
271-
fn batch_project(&self, file_id: String, row_count: usize) -> Result<RecordBatch> {
281+
fn batch_project(&mut self, file_id: String, row_count: usize) -> Result<RecordBatch> {
272282
static EMPTY_SCHEMA: LazyLock<SchemaRef> =
273283
LazyLock::new(|| Arc::new(Schema::new(Fields::empty())));
274284
static EMPTY_KERNEL_SCHEMA: LazyLock<KernelSchemaRef> =
@@ -283,17 +293,19 @@ impl DeltaScanMetaStream {
283293
)?;
284294

285295
let batch = if let Some(selection) = self.selection_vectors.get(&file_id) {
286-
if selection.len() != batch.num_rows() {
287-
return Err(internal_datafusion_err!(
288-
"Selection vector length ({}) does not match row count ({}) for file '{}'. \
289-
This indicates a bug in deletion vector processing.",
290-
selection.len(),
291-
batch.num_rows(),
292-
file_id
293-
));
296+
let mask_len = selection.len();
297+
let (batch, padded_rows) = apply_selection_vector(batch, selection.value(), &file_id)?;
298+
if padded_rows > 0 {
299+
self.dv_short_mask_padded_files_total.add(1);
300+
debug!(
301+
file_id = file_id.as_str(),
302+
mask_len,
303+
row_count,
304+
padded_rows,
305+
"Padded short deletion-vector keep-mask in metadata scan"
306+
);
294307
}
295-
let filter = BooleanArray::from_iter(selection.iter());
296-
filter_record_batch(&batch, &filter)?
308+
batch
297309
} else {
298310
batch
299311
};
@@ -343,13 +355,46 @@ impl DeltaScanMetaStream {
343355
result,
344356
&self.scan_plan,
345357
Some((Arc::new(file_id_array), file_id_field.clone())),
358+
&mut self.schema_adapter,
346359
)
347360
} else {
348-
super::finalize_transformed_batch(result, &self.scan_plan, None)
361+
super::finalize_transformed_batch(
362+
result,
363+
&self.scan_plan,
364+
None,
365+
&mut self.schema_adapter,
366+
)
349367
}
350368
}
351369
}
352370

371+
fn apply_selection_vector(
372+
batch: RecordBatch,
373+
selection: &[bool],
374+
file_id: &str,
375+
) -> Result<(RecordBatch, usize)> {
376+
if selection.len() > batch.num_rows() {
377+
return Err(internal_datafusion_err!(
378+
"Selection vector length ({}) exceeds row count ({}) for file '{}'. \
379+
This indicates a bug in deletion vector processing.",
380+
selection.len(),
381+
batch.num_rows(),
382+
file_id
383+
));
384+
}
385+
386+
let n_rows_to_pad = batch.num_rows() - selection.len();
387+
// Delta Kernel may emit short keep-masks; missing trailing entries are
388+
// implicitly `true` (row is kept).
389+
let filter = BooleanArray::from_iter(
390+
selection
391+
.iter()
392+
.copied()
393+
.chain(std::iter::repeat_n(true, n_rows_to_pad)),
394+
);
395+
Ok((filter_record_batch(&batch, &filter)?, n_rows_to_pad))
396+
}
397+
353398
impl Stream for DeltaScanMetaStream {
354399
type Item = Result<RecordBatch>;
355400

@@ -377,6 +422,9 @@ impl RecordBatchStream for DeltaScanMetaStream {
377422
mod tests {
378423
use std::sync::Arc;
379424

425+
use arrow::array::RecordBatch;
426+
use arrow_array::RecordBatchOptions;
427+
use arrow_schema::{Fields, Schema};
380428
use datafusion::{
381429
physical_plan::collect_partitioned,
382430
prelude::{col, lit},
@@ -389,6 +437,54 @@ mod tests {
389437
test_utils::{TestResult, open_fs_path},
390438
};
391439

440+
#[test]
441+
fn test_apply_selection_vector_short_mask_pads_with_true() {
442+
let batch = RecordBatch::try_new_with_options(
443+
Arc::new(Schema::new(Fields::empty())),
444+
vec![],
445+
&RecordBatchOptions::new().with_row_count(Some(4)),
446+
)
447+
.unwrap();
448+
449+
// Short masks are valid: missing trailing entries are implicitly true.
450+
let (filtered, padded_rows) =
451+
apply_selection_vector(batch, &[false, true], "file:///f.parquet").unwrap();
452+
assert_eq!(filtered.num_rows(), 3);
453+
assert_eq!(padded_rows, 2);
454+
}
455+
456+
#[test]
457+
fn test_apply_selection_vector_no_padding_when_lengths_match() {
458+
let batch = RecordBatch::try_new_with_options(
459+
Arc::new(Schema::new(Fields::empty())),
460+
vec![],
461+
&RecordBatchOptions::new().with_row_count(Some(2)),
462+
)
463+
.unwrap();
464+
465+
let (filtered, padded_rows) =
466+
apply_selection_vector(batch, &[true, false], "file:///f.parquet").unwrap();
467+
assert_eq!(filtered.num_rows(), 1);
468+
assert_eq!(padded_rows, 0);
469+
}
470+
471+
#[test]
472+
fn test_apply_selection_vector_longer_than_batch_errors() {
473+
let batch = RecordBatch::try_new_with_options(
474+
Arc::new(Schema::new(Fields::empty())),
475+
vec![],
476+
&RecordBatchOptions::new().with_row_count(Some(2)),
477+
)
478+
.unwrap();
479+
480+
let err = apply_selection_vector(batch, &[true, true, true], "file:///f.parquet")
481+
.expect_err("selection vector longer than row count must error");
482+
assert!(
483+
err.to_string().contains("Selection vector length"),
484+
"unexpected error: {err}"
485+
);
486+
}
487+
392488
#[tokio::test]
393489
async fn test_meta_only_scan() -> TestResult {
394490
let table = open_fs_path("../../dat/v0.0.3/reader_tests/generated/multi_partitioned/delta");

0 commit comments

Comments
 (0)