Skip to content

Commit b1cb138

Browse files
committed
feat: add hidden datafusion bench bridge
Signed-off-by: Ethan Urbanski <ethan@urbanskitech.com>
1 parent a04c9a4 commit b1cb138

4 files changed

Lines changed: 218 additions & 1 deletion

File tree

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
#![doc(hidden)]
2+
3+
//! Diagnostic-only, unstable bridge for file-selection internals.
4+
//! This is not a supported public API.
5+
6+
use datafusion::catalog::Session;
7+
use datafusion::common::{HashSet, Result as DataFusionResult};
8+
use datafusion::datasource::MemTable;
9+
use datafusion::logical_expr::{Expr, LogicalPlan};
10+
11+
use crate::errors::DeltaResult;
12+
use crate::kernel::{Add, EagerSnapshot};
13+
use crate::logstore::LogStoreRef;
14+
15+
use super::DeltaSessionExt as _;
16+
17+
/// Wrapper result for the internal file-selection entrypoint.
18+
pub struct FindFilesResult {
19+
pub candidates: Vec<Add>,
20+
pub partition_scan: bool,
21+
}
22+
23+
impl From<super::FindFiles> for FindFilesResult {
    /// Moves the candidate list and the partition-scan flag out of the
    /// internal result into the bridge type.
    fn from(result: super::FindFiles) -> Self {
        let super::FindFiles {
            candidates,
            partition_scan,
            ..
        } = result;

        Self {
            candidates,
            partition_scan,
        }
    }
}
31+
32+
/// Minimal wrapper around the internal matched-files scan plan.
33+
pub struct MatchedFilesScan(super::MatchedFilesScan);
34+
35+
impl MatchedFilesScan {
36+
pub fn scan(&self) -> &LogicalPlan {
37+
self.0.scan()
38+
}
39+
40+
pub fn files_set(&self) -> HashSet<String> {
41+
self.0.files_set()
42+
}
43+
44+
pub fn predicate(&self) -> &Expr {
45+
&self.0.predicate
46+
}
47+
}
48+
49+
/// Prepares `session` for use with `log_store` before any bridged call.
///
/// Runs the module's `update_datafusion_session` helper (passing `None` as
/// its last argument) and then makes sure the log store is registered on the
/// session. The first failing step is returned to the caller.
fn prepare_session(session: &dyn Session, log_store: &LogStoreRef) -> DeltaResult<()> {
    // Step 1: let the parent module update the session for this log store.
    super::update_datafusion_session(session, log_store.as_ref(), None)?;

    // Step 2: register the log store on the session via the extension trait.
    session.ensure_log_store_registered(log_store.as_ref())?;

    Ok(())
}
54+
55+
pub async fn find_files(
56+
snapshot: &EagerSnapshot,
57+
log_store: LogStoreRef,
58+
session: &dyn Session,
59+
predicate: Option<Expr>,
60+
) -> DeltaResult<FindFilesResult> {
61+
prepare_session(session, &log_store)?;
62+
super::find_files(snapshot, log_store, session, predicate)
63+
.await
64+
.map(Into::into)
65+
}
66+
67+
pub async fn find_files_scan(
68+
snapshot: &EagerSnapshot,
69+
log_store: LogStoreRef,
70+
session: &dyn Session,
71+
predicate: Expr,
72+
) -> DeltaResult<Vec<Add>> {
73+
prepare_session(session, &log_store)?;
74+
super::find_files_scan(snapshot, log_store, session, predicate).await
75+
}
76+
77+
pub async fn scan_files_where_matches(
78+
session: &dyn Session,
79+
snapshot: &EagerSnapshot,
80+
log_store: LogStoreRef,
81+
predicate: Expr,
82+
) -> DataFusionResult<Option<MatchedFilesScan>> {
83+
prepare_session(session, &log_store)?;
84+
super::scan_files_where_matches(session, snapshot, predicate)
85+
.await
86+
.map(|scan| scan.map(MatchedFilesScan))
87+
}
88+
89+
pub fn add_actions_partition_mem_table(snapshot: &EagerSnapshot) -> DeltaResult<Option<MemTable>> {
90+
super::add_actions_partition_mem_table(snapshot)
91+
}

crates/core/src/delta_datafusion/find_files.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ fn join_batches_with_add_actions(
262262
matching_files = field::Empty
263263
)
264264
)]
265-
async fn find_files_scan(
265+
pub(in crate::delta_datafusion) async fn find_files_scan(
266266
snapshot: &EagerSnapshot,
267267
log_store: LogStoreRef,
268268
session: &dyn Session,

crates/core/src/delta_datafusion/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ pub(crate) use table_provider::{
8383

8484
pub(crate) const PATH_COLUMN: &str = "__delta_rs_path";
8585

86+
#[doc(hidden)]
87+
pub mod bench_support;
8688
pub mod cdf;
8789
mod data_validation;
8890
pub mod engine;
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
#![cfg(feature = "datafusion")]
2+
3+
use std::sync::Arc;
4+
5+
use arrow_array::{Int32Array, RecordBatch, StringArray};
6+
use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema};
7+
use datafusion::datasource::TableProvider;
8+
use datafusion::physical_plan::collect;
9+
use datafusion::prelude::{SessionContext, col, lit};
10+
use deltalake_core::DeltaTable;
11+
use deltalake_core::delta_datafusion::bench_support;
12+
use deltalake_core::kernel::{DataType, PrimitiveType, StructField};
13+
use deltalake_core::protocol::SaveMode;
14+
use deltalake_test::TestResult;
15+
16+
fn batch(ids: Vec<i32>, parts: Vec<&str>) -> TestResult<RecordBatch> {
17+
Ok(RecordBatch::try_new(
18+
Arc::new(ArrowSchema::new(vec![
19+
ArrowField::new("id", ArrowDataType::Int32, false),
20+
ArrowField::new("part", ArrowDataType::Utf8, false),
21+
])),
22+
vec![
23+
Arc::new(Int32Array::from(ids)),
24+
Arc::new(StringArray::from(parts)),
25+
],
26+
)?)
27+
}
28+
29+
#[tokio::test]
30+
async fn test_out_of_crate_bridge_exposes_file_selection_paths() -> TestResult {
31+
let table = DeltaTable::new_in_memory()
32+
.create()
33+
.with_columns(vec![
34+
StructField::new(
35+
"id".to_string(),
36+
DataType::Primitive(PrimitiveType::Integer),
37+
false,
38+
),
39+
StructField::new(
40+
"part".to_string(),
41+
DataType::Primitive(PrimitiveType::String),
42+
false,
43+
),
44+
])
45+
.with_partition_columns(["part"])
46+
.await?;
47+
let table = table
48+
.write(vec![batch(vec![1, 2], vec!["a", "a"])?])
49+
.with_save_mode(SaveMode::Append)
50+
.await?;
51+
let table = table
52+
.write(vec![batch(vec![100, 101], vec!["b", "b"])?])
53+
.with_save_mode(SaveMode::Append)
54+
.await?;
55+
56+
let snapshot = table.snapshot()?.snapshot().clone();
57+
let log_store = table.log_store();
58+
let session = SessionContext::new().state();
59+
60+
let partition_result = bench_support::find_files(
61+
&snapshot,
62+
log_store.clone(),
63+
&session,
64+
Some(col("part").eq(lit("a"))),
65+
)
66+
.await?;
67+
assert!(partition_result.partition_scan);
68+
assert_eq!(partition_result.candidates.len(), 1);
69+
assert!(
70+
partition_result.candidates[0].path.contains("part=a/"),
71+
"expected partition-only path to match partition file, got {}",
72+
partition_result.candidates[0].path
73+
);
74+
75+
let mem_table = bench_support::add_actions_partition_mem_table(&snapshot)?
76+
.expect("partition mem table should exist");
77+
let field_names = mem_table
78+
.schema()
79+
.fields()
80+
.iter()
81+
.map(|field| field.name().clone())
82+
.collect::<Vec<_>>();
83+
assert_eq!(field_names, vec!["__delta_rs_path", "part"]);
84+
85+
let data_predicate = col("id").gt(lit(50i32));
86+
let data_result = bench_support::find_files(
87+
&snapshot,
88+
log_store.clone(),
89+
&session,
90+
Some(data_predicate.clone()),
91+
)
92+
.await?;
93+
assert!(!data_result.partition_scan);
94+
assert_eq!(data_result.candidates.len(), 1);
95+
assert!(
96+
data_result.candidates[0].path.contains("part=b/"),
97+
"expected data predicate to match partition b file, got {}",
98+
data_result.candidates[0].path
99+
);
100+
101+
let direct_scan = bench_support::find_files_scan(
102+
&snapshot,
103+
log_store.clone(),
104+
&session,
105+
data_predicate.clone(),
106+
)
107+
.await?;
108+
assert_eq!(direct_scan.len(), 1);
109+
assert_eq!(direct_scan[0].path, data_result.candidates[0].path);
110+
111+
let matched_scan =
112+
bench_support::scan_files_where_matches(&session, &snapshot, log_store, data_predicate)
113+
.await?
114+
.expect("matched file scan should exist");
115+
assert_eq!(matched_scan.files_set().len(), 1);
116+
assert_eq!(matched_scan.predicate(), &col("id").gt(lit(50i32)));
117+
118+
let plan = session.create_physical_plan(matched_scan.scan()).await?;
119+
let batches = collect(plan, session.task_ctx()).await?;
120+
let row_count = batches.iter().map(|batch| batch.num_rows()).sum::<usize>();
121+
assert_eq!(row_count, 2);
122+
123+
Ok(())
124+
}

0 commit comments

Comments
 (0)