Skip to content

Commit d425cdb

Browse files
committed
feat(python): add deletion_vectors API via DataFusion replay
Expose DeltaTable.deletion_vectors() in Python backed by a new core replay API. - Add DeltaScan::deletion_vectors() with deterministic filepath ordering and named DeletionVectorSelection output. - Reuse shared scan metadata stream setup and document replay/drain semantics. - Build Arrow RecordBatchReader output in Python with filepath URI + selection_vector list[bool], chunked batching, non-null list items, and without_files guard. - Tighten concurrency by cloning table+state under one lock and using one SessionContext state for registration + scan. - Strengthen core/Python tests for exact DV values, determinism, eager snapshot path, URI contract, empty results, and error path. - Replace an internal DV invariant expect() with typed error propagation instead of panic. Signed-off-by: Ethan Urbanski <ethan@urbanskitech.com>
1 parent 4ed4108 commit d425cdb

8 files changed

Lines changed: 463 additions & 26 deletions

File tree

crates/core/src/delta_datafusion/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ pub use self::session::{
6565
DeltaParserOptions, DeltaRuntimeEnvBuilder, DeltaSessionConfig, DeltaSessionContext,
6666
create_session,
6767
};
68-
pub use self::table_provider::next::DeltaScan as DeltaScanNext;
68+
pub use self::table_provider::next::{DeletionVectorSelection, DeltaScan as DeltaScanNext};
6969
pub(crate) use self::utils::*;
7070
pub use cdf::scan::DeltaCdfTableProvider;
7171
pub(crate) use data_validation::{

crates/core/src/delta_datafusion/table_provider/next/mod.rs

Lines changed: 128 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,15 @@ use datafusion::{
3939
logical_expr::LogicalPlan,
4040
physical_plan::ExecutionPlan,
4141
};
42-
use delta_kernel::table_configuration::TableConfiguration;
42+
use delta_kernel::{Engine, table_configuration::TableConfiguration};
4343
use serde::{Deserialize, Serialize};
4444

4545
pub use self::scan::DeltaScanExec;
4646
pub(crate) use self::scan::KernelScanPlan;
4747
use crate::delta_datafusion::DeltaScanConfig;
4848
use crate::delta_datafusion::engine::DataFusionEngine;
4949
use crate::delta_datafusion::table_provider::TableProviderBuilder;
50-
use crate::kernel::{EagerSnapshot, Snapshot};
50+
use crate::kernel::{EagerSnapshot, SendableScanMetadataStream, Snapshot};
5151

5252
mod scan;
5353

@@ -111,6 +111,15 @@ pub struct DeltaScan {
111111
file_skipping_predicate: Option<Vec<Expr>>,
112112
}
113113

114+
/// Deletion vector selection for one data file.
115+
#[derive(Debug, Clone, PartialEq)]
116+
pub struct DeletionVectorSelection {
117+
/// Fully-qualified file URI.
118+
pub filepath: String,
119+
/// Row-level keep mask where `true` means keep and `false` means deleted.
120+
pub keep_mask: Vec<bool>,
121+
}
122+
114123
impl DeltaScan {
115124
// create new delta scan
116125
pub fn new(snapshot: impl Into<SnapshotWrapper>, config: DeltaScanConfig) -> Result<Self> {
@@ -140,6 +149,53 @@ impl DeltaScan {
140149
self
141150
}
142151

152+
/// Materialize deletion vector keep masks for files in this scan.
153+
///
154+
/// The result is sorted lexicographically by filepath for deterministic ordering and includes
155+
/// only files that have deletion vectors.
156+
///
157+
/// This API materializes all deletion vectors in memory.
158+
pub async fn deletion_vectors(
159+
&self,
160+
session: &dyn Session,
161+
) -> Result<Vec<DeletionVectorSelection>> {
162+
let engine = DataFusionEngine::new_from_session(session);
163+
164+
let scan_plan = KernelScanPlan::try_new(
165+
self.snapshot.snapshot(),
166+
None,
167+
&[],
168+
&self.config,
169+
self.file_skipping_predicate.clone(),
170+
)?;
171+
172+
let stream = self.scan_metadata_stream(&scan_plan, engine.clone());
173+
174+
scan::replay_deletion_vectors(engine, &scan_plan, &self.config, stream).await
175+
}
176+
177+
fn scan_metadata_stream(
178+
&self,
179+
scan_plan: &KernelScanPlan,
180+
engine: Arc<dyn Engine>,
181+
) -> SendableScanMetadataStream {
182+
match &self.snapshot {
183+
SnapshotWrapper::Snapshot(_) => scan_plan.scan.scan_metadata(engine),
184+
SnapshotWrapper::EagerSnapshot(esn) => {
185+
if let Ok(files) = esn.files() {
186+
scan_plan.scan.scan_metadata_from(
187+
engine,
188+
esn.snapshot().version() as u64,
189+
Box::new(files.to_vec().into_iter()),
190+
None,
191+
)
192+
} else {
193+
scan_plan.scan.scan_metadata(engine)
194+
}
195+
}
196+
}
197+
}
198+
143199
pub fn builder() -> TableProviderBuilder {
144200
TableProviderBuilder::new()
145201
}
@@ -197,21 +253,7 @@ impl TableProvider for DeltaScan {
197253
self.file_skipping_predicate.clone(),
198254
)?;
199255

200-
let stream = match &self.snapshot {
201-
SnapshotWrapper::Snapshot(_) => scan_plan.scan.scan_metadata(engine.clone()),
202-
SnapshotWrapper::EagerSnapshot(esn) => {
203-
if let Ok(files) = esn.files() {
204-
scan_plan.scan.scan_metadata_from(
205-
engine.clone(),
206-
esn.snapshot().version() as u64,
207-
Box::new(files.to_vec().into_iter()),
208-
None,
209-
)
210-
} else {
211-
scan_plan.scan.scan_metadata(engine.clone())
212-
}
213-
}
214-
};
256+
let stream = self.scan_metadata_stream(&scan_plan, engine.clone());
215257

216258
scan::execution_plan(&self.config, session, scan_plan, stream, engine, limit).await
217259
}
@@ -236,11 +278,12 @@ mod tests {
236278
physical_plan::{ExecutionPlanVisitor, collect_partitioned, visit_execution_plan},
237279
};
238280
use datafusion_datasource::source::DataSourceExec;
281+
use url::Url;
239282

240283
use crate::{
241284
assert_batches_sorted_eq,
242-
delta_datafusion::session::create_session,
243-
kernel::Snapshot,
285+
delta_datafusion::{DeltaScanConfig, session::create_session},
286+
kernel::{EagerSnapshot, Snapshot},
244287
test_utils::{TestResult, TestTables},
245288
};
246289

@@ -358,4 +401,70 @@ mod tests {
358401

359402
Ok(())
360403
}
404+
405+
fn expected_dv_small()
406+
-> std::result::Result<Vec<DeletionVectorSelection>, Box<dyn std::error::Error>> {
407+
let filepath = Url::from_file_path(
408+
TestTables::WithDvSmall
409+
.as_path()
410+
.join("part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet"),
411+
)
412+
.map_err(|_| "failed to convert expected file path to URL")?
413+
.to_string();
414+
Ok(vec![DeletionVectorSelection {
415+
filepath,
416+
keep_mask: vec![false, true, true, true, true, true, true, true, true, false],
417+
}])
418+
}
419+
420+
#[tokio::test]
421+
async fn test_deletion_vectors_with_dv_table() -> TestResult {
422+
let log_store = TestTables::WithDvSmall.table_builder()?.build_storage()?;
423+
let snapshot = Snapshot::try_new(&log_store, Default::default(), None).await?;
424+
let provider = DeltaScan::new(snapshot, DeltaScanConfig::default())?;
425+
426+
let session = Arc::new(create_session().into_inner());
427+
let state = session.state_ref().read().clone();
428+
let expected = expected_dv_small()?;
429+
430+
let deletion_vectors = provider.deletion_vectors(&state).await?;
431+
let deletion_vectors_second = provider.deletion_vectors(&state).await?;
432+
433+
assert_eq!(deletion_vectors, expected);
434+
assert_eq!(deletion_vectors_second, expected);
435+
436+
Ok(())
437+
}
438+
439+
#[tokio::test]
440+
async fn test_deletion_vectors_without_dv_table_is_empty() -> TestResult {
441+
let log_store = TestTables::Simple.table_builder()?.build_storage()?;
442+
let snapshot = Snapshot::try_new(&log_store, Default::default(), None).await?;
443+
let provider = DeltaScan::new(snapshot, DeltaScanConfig::default())?;
444+
445+
let session = Arc::new(create_session().into_inner());
446+
let state = session.state_ref().read().clone();
447+
448+
let deletion_vectors = provider.deletion_vectors(&state).await?;
449+
450+
assert!(deletion_vectors.is_empty());
451+
452+
Ok(())
453+
}
454+
455+
#[tokio::test]
456+
async fn test_deletion_vectors_with_eager_snapshot() -> TestResult {
457+
let log_store = TestTables::WithDvSmall.table_builder()?.build_storage()?;
458+
let eager = EagerSnapshot::try_new(&log_store, Default::default(), None).await?;
459+
let provider = DeltaScan::new(eager, DeltaScanConfig::default())?;
460+
461+
let session = Arc::new(create_session().into_inner());
462+
let state = session.state_ref().read().clone();
463+
let expected = expected_dv_small()?;
464+
465+
let deletion_vectors = provider.deletion_vectors(&state).await?;
466+
assert_eq!(deletion_vectors, expected);
467+
468+
Ok(())
469+
}
361470
}

crates/core/src/delta_datafusion/table_provider/next/scan/mod.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ use crate::{
6060
DeltaScanConfig,
6161
engine::{AsObjectStoreUrl as _, to_datafusion_scalar},
6262
file_id::wrap_file_id_value,
63+
table_provider::next::DeletionVectorSelection,
6364
},
6465
};
6566

@@ -126,6 +127,52 @@ pub(super) async fn execution_plan(
126127
.await
127128
}
128129

130+
/// Materialize deletion vector keep masks for every file in the scan that has one.
131+
///
132+
/// Deletion vectors are loaded as a side-effect of consuming [`ScanFileStream`]. We drain the
133+
/// full stream here (discarding file contexts, stats, and partition values) because the DV
134+
/// loading tasks are spawned lazily during stream poll. A dedicated DV-only stream that skips
135+
/// stats parsing is possible but not yet warranted — this path is not latency-sensitive and the
136+
/// file-list is typically small.
137+
///
138+
/// [`ReceiverStreamBuilder::build`] returns a merged stream that includes a JoinSet checker;
139+
/// `.try_collect().await` below will not complete until every spawned DV-loading task has
140+
/// finished, so no results are lost.
141+
pub(super) async fn replay_deletion_vectors(
142+
engine: Arc<dyn Engine>,
143+
scan_plan: &KernelScanPlan,
144+
config: &DeltaScanConfig,
145+
stream: ScanMetadataStream,
146+
) -> Result<Vec<DeletionVectorSelection>> {
147+
let mut stream = ScanFileStream::new(engine, &scan_plan.scan, config.clone(), stream);
148+
while stream.try_next().await?.is_some() {}
149+
150+
let dv_stream = stream.dv_stream.build();
151+
// Only files with `dv_info.has_vector()` spawn tasks, so every item should carry a DV.
152+
// Guard with a typed error (instead of panic) in case that invariant drifts.
153+
let dvs: DashMap<_, _> = dv_stream
154+
.and_then(|(url, dv)| {
155+
ready(match dv {
156+
Some(keep_mask) => Ok((url.to_string(), keep_mask)),
157+
None => Err(DeltaTableError::generic(
158+
"Invariant violation: DV task spawned for file without deletion vector",
159+
)),
160+
})
161+
})
162+
.try_collect()
163+
.await?;
164+
165+
let mut vectors: Vec<_> = dvs
166+
.into_iter()
167+
.map(|(filepath, keep_mask)| DeletionVectorSelection {
168+
filepath,
169+
keep_mask,
170+
})
171+
.collect();
172+
vectors.sort_unstable_by(|left, right| left.filepath.cmp(&right.filepath));
173+
Ok(vectors)
174+
}
175+
129176
async fn replay_files(
130177
engine: Arc<dyn Engine>,
131178
scan_plan: &KernelScanPlan,

crates/core/src/delta_datafusion/table_provider/next/scan/replay.rs

Lines changed: 62 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,10 @@ where
186186
Ok(ctx) => ctx,
187187
Err(err) => return Poll::Ready(Some(Err(err.into()))),
188188
};
189+
ctx = match ctx.error_or() {
190+
Ok(ctx) => ctx,
191+
Err(err) => return Poll::Ready(Some(Err(err.into()))),
192+
};
189193

190194
// Spawn tasks to read the deletion vectors from disk.
191195
for file in &ctx.files {
@@ -416,8 +420,6 @@ pub(crate) struct ScanFileContext {
416420
pub file_url: Url,
417421
/// Size of the file on disk.
418422
pub size: u64,
419-
/// Selection vector to filter the data in the file.
420-
// pub selection_vector: Option<Vec<bool>>,
421423
/// Transformations to apply to the data in the file.
422424
pub transform: Option<ExpressionRef>,
423425
/// Statistics about the data in the file.
@@ -447,8 +449,6 @@ struct ScanFileContextInner {
447449
pub file_url: Url,
448450
/// Size of the file on disk.
449451
pub size: u64,
450-
/// Selection vector to filter the data in the file.
451-
// pub selection_vector: Option<Vec<bool>>,
452452
/// Transformations to apply to the data in the file.
453453
pub transform: Option<ExpressionRef>,
454454

@@ -478,6 +478,22 @@ impl ScanContext {
478478
fn parse_path(&self, path: &str) -> DeltaResult<Url, DataFusionError> {
479479
parse_path(&self.table_root, path)
480480
}
481+
482+
fn error_or(self) -> DeltaResult<Self, DataFusionError> {
483+
let ScanContext {
484+
table_root,
485+
files,
486+
errs,
487+
count,
488+
} = self;
489+
errs.error_or(())?;
490+
Ok(ScanContext {
491+
table_root,
492+
files,
493+
errs: DataFusionErrorBuilder::new(),
494+
count,
495+
})
496+
}
481497
}
482498

483499
fn parse_path(url: &Url, path: &str) -> DeltaResult<Url, DataFusionError> {
@@ -505,3 +521,45 @@ fn visit_scan_file(ctx: &mut ScanContext, scan_file: ScanFile) {
505521
});
506522
ctx.count += 1;
507523
}
524+
525+
#[cfg(test)]
526+
mod tests {
527+
use std::collections::HashMap;
528+
529+
use delta_kernel::scan::state::{DvInfo, ScanFile};
530+
use url::Url;
531+
532+
use super::{ScanContext, visit_scan_file};
533+
534+
fn scan_file(path: impl Into<String>) -> ScanFile {
535+
ScanFile {
536+
path: path.into(),
537+
size: 1,
538+
modification_time: 0,
539+
stats: None,
540+
dv_info: DvInfo::default(),
541+
transform: None,
542+
partition_values: HashMap::new(),
543+
}
544+
}
545+
546+
#[test]
547+
fn test_scan_context_error_or_returns_error_for_invalid_path() {
548+
let mut ctx = ScanContext::new(Url::parse("mailto:delta@example.com").unwrap());
549+
visit_scan_file(&mut ctx, scan_file("part-000.parquet"));
550+
assert!(ctx.error_or().is_err());
551+
}
552+
553+
#[test]
554+
fn test_scan_context_error_or_keeps_valid_path() {
555+
let mut ctx = ScanContext::new(Url::parse("file:///tmp/delta/").unwrap());
556+
visit_scan_file(&mut ctx, scan_file("part-000.parquet"));
557+
558+
let ctx = ctx.error_or().unwrap();
559+
assert_eq!(ctx.files.len(), 1);
560+
assert_eq!(
561+
ctx.files[0].file_url.as_str(),
562+
"file:///tmp/delta/part-000.parquet"
563+
);
564+
}
565+
}

python/deltalake/_internal.pyi

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,7 @@ class RawDeltaTable:
257257
ending_timestamp: str | None = None,
258258
allow_out_of_range: bool = False,
259259
) -> RecordBatchReader: ...
260+
def deletion_vectors(self) -> RecordBatchReader: ...
260261
def transaction_version(self, app_id: str) -> int | None: ...
261262
def set_column_metadata(
262263
self,

python/deltalake/table.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,21 @@ def load_cdf(
413413
allow_out_of_range=allow_out_of_range,
414414
)
415415

416+
def deletion_vectors(self) -> RecordBatchReader:
417+
"""
418+
Return deletion vectors for data files in this table.
419+
420+
Returns:
421+
RecordBatchReader: A reader with two columns:
422+
- filepath (utf8): fully-qualified file URI.
423+
- selection_vector (list[bool]): row keep mask where True means keep and False means deleted.
424+
425+
Notes:
426+
Only files that have deletion vectors are returned.
427+
Deletion vectors are materialized in memory before being exposed as record batches.
428+
"""
429+
return self._table.deletion_vectors()
430+
416431
@property
417432
def table_uri(self) -> str:
418433
return self._table.table_uri()

0 commit comments

Comments
 (0)