Skip to content

Commit ecc8beb

Browse files
authored
feat: expose DV metadata and payloads as Arrow streams (delta-io#4168)
# Description

Adds `DeltaTable.deletion_vectors() -> RecordBatchReader`, which returns one row per data file that has a deletion vector.

Schema: `filepath: utf8`, `selection_vector: list[bool]` (`true` = keep, `false` = deleted).

Reuses the existing DataFusion replay path via `replay_deletion_vectors(...)`. Results are deterministic and sorted by filepath.

**Core changes:**

- `DeletionVectorSelection` struct, `DeltaScan::deletion_vectors()`, and a shared `scan_metadata_stream()` helper to avoid drift between scan paths
- Replaced internal DV `expect(...)` with typed error propagation

**Python binding:**

- `cloned_table_and_state()` to avoid TOCTOU on table + snapshot
- Chunked Arrow batch output with non-nullable list items
- Preserves `without_files` guard behavior

# Related Issue(s)

- Closes delta-io#4159

<!--- For example: - closes #106 --->

# Documentation

<!--- Share links to useful documentation --->

cc @ion-elgreco

---------

Signed-off-by: Ethan Urbanski <ethan@urbanskitech.com>
1 parent 25b4968 commit ecc8beb

8 files changed

Lines changed: 478 additions & 34 deletions

File tree

crates/core/src/delta_datafusion/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -65,7 +65,7 @@ pub use self::session::{
6565
DeltaParserOptions, DeltaRuntimeEnvBuilder, DeltaSessionConfig, DeltaSessionContext,
6666
create_session,
6767
};
68-
pub use self::table_provider::next::DeltaScan as DeltaScanNext;
68+
pub use self::table_provider::next::{DeletionVectorSelection, DeltaScan as DeltaScanNext};
6969
pub(crate) use self::utils::*;
7070
pub use cdf::scan::DeltaCdfTableProvider;
7171
pub(crate) use data_validation::{

crates/core/src/delta_datafusion/table_provider/next/mod.rs

Lines changed: 126 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -39,7 +39,7 @@ use datafusion::{
3939
logical_expr::LogicalPlan,
4040
physical_plan::ExecutionPlan,
4141
};
42-
use delta_kernel::table_configuration::TableConfiguration;
42+
use delta_kernel::{Engine, table_configuration::TableConfiguration};
4343
use serde::{Deserialize, Serialize};
4444
use url::Url;
4545

@@ -49,7 +49,7 @@ use crate::DeltaTableError;
4949
use crate::delta_datafusion::DeltaScanConfig;
5050
use crate::delta_datafusion::engine::DataFusionEngine;
5151
use crate::delta_datafusion::table_provider::TableProviderBuilder;
52-
use crate::kernel::{EagerSnapshot, Snapshot};
52+
use crate::kernel::{EagerSnapshot, SendableScanMetadataStream, Snapshot};
5353

5454
mod scan;
5555

@@ -268,6 +268,15 @@ pub struct DeltaScan {
268268
file_selection: Option<FileSelection>,
269269
}
270270

271+
/// Deletion vector selection for one data file.
272+
#[derive(Debug, Clone, PartialEq)]
273+
pub struct DeletionVectorSelection {
274+
/// Fully-qualified file URI.
275+
pub filepath: String,
276+
/// Row-level keep mask where `true` means keep and `false` means deleted.
277+
pub keep_mask: Vec<bool>,
278+
}
279+
271280
impl DeltaScan {
272281
// create new delta scan
273282
pub fn new(snapshot: impl Into<SnapshotWrapper>, config: DeltaScanConfig) -> Result<Self> {
@@ -302,6 +311,52 @@ impl DeltaScan {
302311
self.file_selection = Some(selection);
303312
self
304313
}
314+
/// Materialize deletion vector keep masks for files in this scan.
315+
///
316+
/// The result is sorted lexicographically by filepath for deterministic ordering and includes
317+
/// only files that have deletion vectors.
318+
///
319+
/// This API materializes all deletion vectors in memory.
320+
pub async fn deletion_vectors(
321+
&self,
322+
session: &dyn Session,
323+
) -> Result<Vec<DeletionVectorSelection>> {
324+
let engine = DataFusionEngine::new_from_session(session);
325+
326+
let scan_plan = KernelScanPlan::try_new(
327+
self.snapshot.snapshot(),
328+
None,
329+
&[],
330+
&self.config,
331+
self.file_skipping_predicate.clone(),
332+
)?;
333+
334+
let stream = self.scan_metadata_stream(&scan_plan, engine.clone());
335+
336+
scan::replay_deletion_vectors(engine, &scan_plan, &self.config, stream).await
337+
}
338+
339+
fn scan_metadata_stream(
340+
&self,
341+
scan_plan: &KernelScanPlan,
342+
engine: Arc<dyn Engine>,
343+
) -> SendableScanMetadataStream {
344+
match &self.snapshot {
345+
SnapshotWrapper::Snapshot(_) => scan_plan.scan.scan_metadata(engine),
346+
SnapshotWrapper::EagerSnapshot(esn) => {
347+
if let Ok(files) = esn.files() {
348+
scan_plan.scan.scan_metadata_from(
349+
engine,
350+
esn.snapshot().version() as u64,
351+
Box::new(files.to_vec().into_iter()),
352+
None,
353+
)
354+
} else {
355+
scan_plan.scan.scan_metadata(engine)
356+
}
357+
}
358+
}
359+
}
305360

306361
pub fn builder() -> TableProviderBuilder {
307362
TableProviderBuilder::new()
@@ -360,21 +415,7 @@ impl TableProvider for DeltaScan {
360415
self.file_skipping_predicate.clone(),
361416
)?;
362417

363-
let stream = match &self.snapshot {
364-
SnapshotWrapper::Snapshot(_) => scan_plan.scan.scan_metadata(engine.clone()),
365-
SnapshotWrapper::EagerSnapshot(esn) => {
366-
if let Ok(files) = esn.files() {
367-
scan_plan.scan.scan_metadata_from(
368-
engine.clone(),
369-
esn.snapshot().version() as u64,
370-
Box::new(files.to_vec().into_iter()),
371-
None,
372-
)
373-
} else {
374-
scan_plan.scan.scan_metadata(engine.clone())
375-
}
376-
}
377-
};
418+
let stream = self.scan_metadata_stream(&scan_plan, engine.clone());
378419

379420
scan::execution_plan(
380421
&self.config,
@@ -416,8 +457,8 @@ mod tests {
416457

417458
use crate::{
418459
assert_batches_sorted_eq,
419-
delta_datafusion::session::create_session,
420-
kernel::Snapshot,
460+
delta_datafusion::{DeltaScanConfig, session::create_session},
461+
kernel::{EagerSnapshot, Snapshot},
421462
test_utils::{TestResult, TestTables},
422463
};
423464

@@ -860,4 +901,70 @@ mod tests {
860901

861902
Ok(())
862903
}
904+
905+
fn expected_dv_small()
906+
-> std::result::Result<Vec<DeletionVectorSelection>, Box<dyn std::error::Error>> {
907+
let filepath = Url::from_file_path(
908+
TestTables::WithDvSmall
909+
.as_path()
910+
.join("part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet"),
911+
)
912+
.map_err(|_| "failed to convert expected file path to URL")?
913+
.to_string();
914+
Ok(vec![DeletionVectorSelection {
915+
filepath,
916+
keep_mask: vec![false, true, true, true, true, true, true, true, true, false],
917+
}])
918+
}
919+
920+
#[tokio::test]
921+
async fn test_deletion_vectors_with_dv_table() -> TestResult {
922+
let log_store = TestTables::WithDvSmall.table_builder()?.build_storage()?;
923+
let snapshot = Snapshot::try_new(&log_store, Default::default(), None).await?;
924+
let provider = DeltaScan::new(snapshot, DeltaScanConfig::default())?;
925+
926+
let session = Arc::new(create_session().into_inner());
927+
let state = session.state_ref().read().clone();
928+
let expected = expected_dv_small()?;
929+
930+
let deletion_vectors = provider.deletion_vectors(&state).await?;
931+
let deletion_vectors_second = provider.deletion_vectors(&state).await?;
932+
933+
assert_eq!(deletion_vectors, expected);
934+
assert_eq!(deletion_vectors_second, expected);
935+
936+
Ok(())
937+
}
938+
939+
#[tokio::test]
940+
async fn test_deletion_vectors_without_dv_table_is_empty() -> TestResult {
941+
let log_store = TestTables::Simple.table_builder()?.build_storage()?;
942+
let snapshot = Snapshot::try_new(&log_store, Default::default(), None).await?;
943+
let provider = DeltaScan::new(snapshot, DeltaScanConfig::default())?;
944+
945+
let session = Arc::new(create_session().into_inner());
946+
let state = session.state_ref().read().clone();
947+
948+
let deletion_vectors = provider.deletion_vectors(&state).await?;
949+
950+
assert!(deletion_vectors.is_empty());
951+
952+
Ok(())
953+
}
954+
955+
#[tokio::test]
956+
async fn test_deletion_vectors_with_eager_snapshot() -> TestResult {
957+
let log_store = TestTables::WithDvSmall.table_builder()?.build_storage()?;
958+
let eager = EagerSnapshot::try_new(&log_store, Default::default(), None).await?;
959+
let provider = DeltaScan::new(eager, DeltaScanConfig::default())?;
960+
961+
let session = Arc::new(create_session().into_inner());
962+
let state = session.state_ref().read().clone();
963+
let expected = expected_dv_small()?;
964+
965+
let deletion_vectors = provider.deletion_vectors(&state).await?;
966+
assert_eq!(deletion_vectors, expected);
967+
968+
Ok(())
969+
}
863970
}

crates/core/src/delta_datafusion/table_provider/next/scan/mod.rs

Lines changed: 47 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -61,6 +61,7 @@ use crate::{
6161
DeltaScanConfig,
6262
engine::{AsObjectStoreUrl as _, to_datafusion_scalar},
6363
file_id::wrap_file_id_value,
64+
table_provider::next::DeletionVectorSelection,
6465
},
6566
};
6667

@@ -131,6 +132,52 @@ pub(super) async fn execution_plan(
131132
.await
132133
}
133134

135+
/// Materialize deletion vector keep masks for every file in the scan that has one.
136+
///
137+
/// Deletion vectors are loaded as a side-effect of consuming [`ScanFileStream`]. We drain the
138+
/// full stream here (discarding file contexts, stats, and partition values) because the DV
139+
/// loading tasks are spawned lazily during stream poll. A dedicated DV-only stream that skips
140+
/// stats parsing is possible but not yet warranted — this path is not latency-sensitive and the
141+
/// file-list is typically small.
142+
///
143+
/// [`ReceiverStreamBuilder::build`] returns a merged stream that includes a JoinSet checker;
144+
/// `.try_collect().await` below will not complete until every spawned DV-loading task has
145+
/// finished, so no results are lost.
146+
pub(super) async fn replay_deletion_vectors(
147+
engine: Arc<dyn Engine>,
148+
scan_plan: &KernelScanPlan,
149+
config: &DeltaScanConfig,
150+
stream: ScanMetadataStream,
151+
) -> Result<Vec<DeletionVectorSelection>> {
152+
let mut stream = ScanFileStream::new(engine, &scan_plan.scan, config.clone(), None, stream);
153+
while stream.try_next().await?.is_some() {}
154+
155+
let dv_stream = stream.dv_stream.build();
156+
// Only files with `dv_info.has_vector()` spawn tasks, so every item should carry a DV.
157+
// Guard with a typed error (instead of panic) in case that invariant drifts.
158+
let dvs: DashMap<_, _> = dv_stream
159+
.and_then(|(url, dv)| {
160+
ready(match dv {
161+
Some(keep_mask) => Ok((url.to_string(), keep_mask)),
162+
None => Err(DeltaTableError::generic(
163+
"Invariant violation: DV task spawned for file without deletion vector",
164+
)),
165+
})
166+
})
167+
.try_collect()
168+
.await?;
169+
170+
let mut vectors: Vec<_> = dvs
171+
.into_iter()
172+
.map(|(filepath, keep_mask)| DeletionVectorSelection {
173+
filepath,
174+
keep_mask,
175+
})
176+
.collect();
177+
vectors.sort_unstable_by(|left, right| left.filepath.cmp(&right.filepath));
178+
Ok(vectors)
179+
}
180+
134181
async fn replay_files(
135182
engine: Arc<dyn Engine>,
136183
scan_plan: &KernelScanPlan,

crates/core/src/delta_datafusion/table_provider/next/scan/replay.rs

Lines changed: 67 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -37,7 +37,7 @@ use pin_project_lite::pin_project;
3737
use url::Url;
3838

3939
use crate::{
40-
DeltaResult,
40+
DeltaResult, DeltaTableError,
4141
delta_datafusion::{DeltaScanConfig, engine::to_datafusion_scalar},
4242
kernel::{
4343
LogicalFileView, ReceiverStreamBuilder, Scan, StructDataExt,
@@ -195,8 +195,11 @@ where
195195
scan_data
196196
};
197197

198-
let mut ctx = ScanContext::new(this.table_root.clone());
199-
ctx = match scan_data.visit_scan_files(ctx, visit_scan_file) {
198+
let ctx = match scan_data
199+
.visit_scan_files(ScanContext::new(this.table_root.clone()), visit_scan_file)
200+
.map_err(|err| DataFusionError::from(DeltaTableError::from(err)))
201+
.and_then(ScanContext::error_or)
202+
{
200203
Ok(ctx) => ctx,
201204
Err(err) => return Poll::Ready(Some(Err(err.into()))),
202205
};
@@ -430,8 +433,6 @@ pub(crate) struct ScanFileContext {
430433
pub file_url: Url,
431434
/// Size of the file on disk.
432435
pub size: u64,
433-
/// Selection vector to filter the data in the file.
434-
// pub selection_vector: Option<Vec<bool>>,
435436
/// Transformations to apply to the data in the file.
436437
pub transform: Option<ExpressionRef>,
437438
/// Statistics about the data in the file.
@@ -461,8 +462,6 @@ struct ScanFileContextInner {
461462
pub file_url: Url,
462463
/// Size of the file on disk.
463464
pub size: u64,
464-
/// Selection vector to filter the data in the file.
465-
// pub selection_vector: Option<Vec<bool>>,
466465
/// Transformations to apply to the data in the file.
467466
pub transform: Option<ExpressionRef>,
468467

@@ -492,6 +491,22 @@ impl ScanContext {
492491
fn parse_path(&self, path: &str) -> DeltaResult<Url, DataFusionError> {
493492
parse_path(&self.table_root, path)
494493
}
494+
495+
fn error_or(self) -> DeltaResult<Self, DataFusionError> {
496+
let ScanContext {
497+
table_root,
498+
files,
499+
errs,
500+
count,
501+
} = self;
502+
errs.error_or(())?;
503+
Ok(ScanContext {
504+
table_root,
505+
files,
506+
errs: DataFusionErrorBuilder::new(),
507+
count,
508+
})
509+
}
495510
}
496511

497512
fn parse_path(url: &Url, path: &str) -> DeltaResult<Url, DataFusionError> {
@@ -519,13 +534,11 @@ fn apply_file_selection(
519534
continue;
520535
}
521536

522-
let keep = parse_path(
537+
let file_url = parse_path(
523538
table_root,
524539
LogicalFileView::new(batch.clone(), idx).path_raw(),
525-
)
526-
.map(|url| file_selection.contains(url.as_str()))
527-
.unwrap_or(false);
528-
selection_vector[idx] = keep;
540+
)?;
541+
selection_vector[idx] = file_selection.contains(file_url.as_str());
529542
}
530543

531544
scan_data.scan_files =
@@ -550,3 +563,45 @@ fn visit_scan_file(ctx: &mut ScanContext, scan_file: ScanFile) {
550563
});
551564
ctx.count += 1;
552565
}
566+
567+
#[cfg(test)]
568+
mod tests {
569+
use std::collections::HashMap;
570+
571+
use delta_kernel::scan::state::{DvInfo, ScanFile};
572+
use url::Url;
573+
574+
use super::{ScanContext, visit_scan_file};
575+
576+
fn scan_file(path: impl Into<String>) -> ScanFile {
577+
ScanFile {
578+
path: path.into(),
579+
size: 1,
580+
modification_time: 0,
581+
stats: None,
582+
dv_info: DvInfo::default(),
583+
transform: None,
584+
partition_values: HashMap::new(),
585+
}
586+
}
587+
588+
#[test]
589+
fn test_scan_context_error_or_returns_error_for_invalid_path() {
590+
let mut ctx = ScanContext::new(Url::parse("mailto:delta@example.com").unwrap());
591+
visit_scan_file(&mut ctx, scan_file("part-000.parquet"));
592+
assert!(ctx.error_or().is_err());
593+
}
594+
595+
#[test]
596+
fn test_scan_context_error_or_keeps_valid_path() {
597+
let mut ctx = ScanContext::new(Url::parse("file:///tmp/delta/").unwrap());
598+
visit_scan_file(&mut ctx, scan_file("part-000.parquet"));
599+
600+
let ctx = ctx.error_or().unwrap();
601+
assert_eq!(ctx.files.len(), 1);
602+
assert_eq!(
603+
ctx.files[0].file_url.as_str(),
604+
"file:///tmp/delta/part-000.parquet"
605+
);
606+
}
607+
}

python/deltalake/_internal.pyi

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -257,6 +257,7 @@ class RawDeltaTable:
257257
ending_timestamp: str | None = None,
258258
allow_out_of_range: bool = False,
259259
) -> RecordBatchReader: ...
260+
def deletion_vectors(self) -> RecordBatchReader: ...
260261
def transaction_version(self, app_id: str) -> int | None: ...
261262
def set_column_metadata(
262263
self,

0 commit comments

Comments
 (0)