Skip to content

Commit 82f27e9

Browse files
adriangbclaude
andauthored
refactor: extract pushdown test utilities to shared module (apache#20010)
## Motivation Working on apache#19538 I found that this refactor was necessary to be able to test more advanced projection expression pushdown functionality. ## Summary Move TestSource, TestOpener, TestScanBuilder, OptimizationTest and related utilities from `filter_pushdown/util.rs` to a new shared `pushdown_utils.rs` module. This allows these utilities to be reused by other pushdown tests like projection_pushdown. Key changes: - Extract test utilities to `datafusion/core/tests/physical_optimizer/pushdown_utils.rs` - Update TestOpener and TestSource to use `ProjectionExprs` instead of `Vec<usize>` for projections - Enable support for complex projection expressions (e.g., `get_field`) - Update imports in filter_pushdown/mod.rs and mod.rs ## Test Plan - Verify tests compile without errors - CI should pass on all test suites - No functional changes to test behavior 🤖 Generated with Claude Code --------- Co-authored-by: Claude Haiku 4.5 <noreply@anthropic.com>
1 parent 52deee5 commit 82f27e9

3 files changed

Lines changed: 37 additions & 11 deletions

File tree

datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs renamed to datafusion/core/tests/physical_optimizer/filter_pushdown.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,15 +64,13 @@ use datafusion_physical_plan::{
6464
sorts::sort::SortExec,
6565
};
6666

67+
use super::pushdown_utils::{
68+
OptimizationTest, TestNode, TestScanBuilder, TestSource, format_plan_for_test,
69+
};
6770
use datafusion_physical_plan::union::UnionExec;
6871
use futures::StreamExt;
6972
use object_store::{ObjectStore, memory::InMemory};
7073
use regex::Regex;
71-
use util::{OptimizationTest, TestNode, TestScanBuilder, format_plan_for_test};
72-
73-
use crate::physical_optimizer::filter_pushdown::util::TestSource;
74-
75-
mod util;
7674

7775
#[test]
7876
fn test_pushdown_into_scan() {

datafusion/core/tests/physical_optimizer/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ mod combine_partial_final_agg;
2424
mod enforce_distribution;
2525
mod enforce_sorting;
2626
mod enforce_sorting_monotonicity;
27-
#[expect(clippy::needless_pass_by_value)]
2827
mod filter_pushdown;
2928
mod join_selection;
3029
#[expect(clippy::needless_pass_by_value)]
@@ -38,3 +37,5 @@ mod sanity_checker;
3837
#[expect(clippy::needless_pass_by_value)]
3938
mod test_utils;
4039
mod window_optimize;
40+
41+
mod pushdown_utils;

datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs renamed to datafusion/core/tests/physical_optimizer/pushdown_utils.rs

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use datafusion_datasource::{
2424
file_scan_config::FileScanConfigBuilder, file_stream::FileOpenFuture,
2525
file_stream::FileOpener, source::DataSourceExec,
2626
};
27+
use datafusion_physical_expr::projection::ProjectionExprs;
2728
use datafusion_physical_expr_common::physical_expr::fmt_sql;
2829
use datafusion_physical_optimizer::PhysicalOptimizerRule;
2930
use datafusion_physical_plan::filter::batch_filter;
@@ -50,7 +51,7 @@ use std::{
5051
pub struct TestOpener {
5152
batches: Vec<RecordBatch>,
5253
batch_size: Option<usize>,
53-
projection: Option<Vec<usize>>,
54+
projection: Option<ProjectionExprs>,
5455
predicate: Option<Arc<dyn PhysicalExpr>>,
5556
}
5657

@@ -60,6 +61,7 @@ impl FileOpener for TestOpener {
6061
if self.batches.is_empty() {
6162
return Ok((async { Ok(TestStream::new(vec![]).boxed()) }).boxed());
6263
}
64+
let schema = self.batches[0].schema();
6365
if let Some(batch_size) = self.batch_size {
6466
let batch = concat_batches(&batches[0].schema(), &batches)?;
6567
let mut new_batches = Vec::new();
@@ -83,9 +85,10 @@ impl FileOpener for TestOpener {
8385
batches = new_batches;
8486

8587
if let Some(projection) = &self.projection {
88+
let projector = projection.make_projector(&schema)?;
8689
batches = batches
8790
.into_iter()
88-
.map(|batch| batch.project(projection).unwrap())
91+
.map(|batch| projector.project_batch(&batch).unwrap())
8992
.collect();
9093
}
9194

@@ -103,14 +106,13 @@ pub struct TestSource {
103106
batch_size: Option<usize>,
104107
batches: Vec<RecordBatch>,
105108
metrics: ExecutionPlanMetricsSet,
106-
projection: Option<Vec<usize>>,
109+
projection: Option<ProjectionExprs>,
107110
table_schema: datafusion_datasource::TableSchema,
108111
}
109112

110113
impl TestSource {
111114
pub fn new(schema: SchemaRef, support: bool, batches: Vec<RecordBatch>) -> Self {
112-
let table_schema =
113-
datafusion_datasource::TableSchema::new(Arc::clone(&schema), vec![]);
115+
let table_schema = datafusion_datasource::TableSchema::new(schema, vec![]);
114116
Self {
115117
support,
116118
metrics: ExecutionPlanMetricsSet::new(),
@@ -210,6 +212,30 @@ impl FileSource for TestSource {
210212
}
211213
}
212214

215+
fn try_pushdown_projection(
216+
&self,
217+
projection: &ProjectionExprs,
218+
) -> Result<Option<Arc<dyn FileSource>>> {
219+
if let Some(existing_projection) = &self.projection {
220+
// Combine existing projection with new projection
221+
let combined_projection = existing_projection.try_merge(projection)?;
222+
Ok(Some(Arc::new(TestSource {
223+
projection: Some(combined_projection),
224+
table_schema: self.table_schema.clone(),
225+
..self.clone()
226+
})))
227+
} else {
228+
Ok(Some(Arc::new(TestSource {
229+
projection: Some(projection.clone()),
230+
..self.clone()
231+
})))
232+
}
233+
}
234+
235+
fn projection(&self) -> Option<&ProjectionExprs> {
236+
self.projection.as_ref()
237+
}
238+
213239
fn table_schema(&self) -> &datafusion_datasource::TableSchema {
214240
&self.table_schema
215241
}
@@ -332,6 +358,7 @@ pub struct OptimizationTest {
332358
}
333359

334360
impl OptimizationTest {
361+
#[expect(clippy::needless_pass_by_value)]
335362
pub fn new<O>(
336363
input_plan: Arc<dyn ExecutionPlan>,
337364
opt: O,

0 commit comments

Comments
 (0)