Skip to content

Commit 20cecfa

Browse files
committed
Support row group limit pruning
1 parent c6f7363 commit 20cecfa

9 files changed

Lines changed: 896 additions & 24 deletions

File tree

datafusion/core/tests/parquet/mod.rs

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,11 @@ impl TestOutput {
182182
.map(|(_pruned, matched)| matched)
183183
}
184184

185+
/// The number of row_groups fully matched by statistics
186+
fn row_groups_fully_matched_statistics(&self) -> Option<usize> {
187+
self.metric_value("row_groups_fully_matched_statistics")
188+
}
189+
185190
/// The number of row_groups pruned by statistics
186191
fn row_groups_pruned_statistics(&self) -> Option<usize> {
187192
self.pruning_metric("row_groups_pruned_statistics")
@@ -219,6 +224,11 @@ impl TestOutput {
219224
.map(|(pruned, _matched)| pruned)
220225
}
221226

227+
/// The number of row groups pruned by limit pruning
228+
fn limit_pruned_row_groups(&self) -> Option<usize> {
229+
self.metric_value("limit_pruned_row_groups")
230+
}
231+
222232
fn description(&self) -> String {
223233
format!(
224234
"Input:\n{}\nQuery:\n{}\nOutput:\n{}\nMetrics:\n{}",
@@ -232,20 +242,41 @@ impl TestOutput {
232242
/// and the appropriate scenario
233243
impl ContextWithParquet {
234244
async fn new(scenario: Scenario, unit: Unit) -> Self {
235-
Self::with_config(scenario, unit, SessionConfig::new()).await
245+
Self::with_config(scenario, unit, SessionConfig::new(), None, None).await
246+
}
247+
248+
/// Set custom schema and batches for the test
249+
pub async fn with_custom_data(
250+
scenario: Scenario,
251+
unit: Unit,
252+
schema: Arc<Schema>,
253+
batches: Vec<RecordBatch>,
254+
) -> Self {
255+
Self::with_config(
256+
scenario,
257+
unit,
258+
SessionConfig::new(),
259+
Some(schema),
260+
Some(batches),
261+
)
262+
.await
236263
}
237264

238265
async fn with_config(
239266
scenario: Scenario,
240267
unit: Unit,
241268
mut config: SessionConfig,
269+
custom_schema: Option<Arc<Schema>>,
270+
custom_batches: Option<Vec<RecordBatch>>,
242271
) -> Self {
243272
// Use a single partition for deterministic results no matter how many CPUs the host has
244273
config = config.with_target_partitions(1);
245274
let file = match unit {
246275
Unit::RowGroup(row_per_group) => {
247276
config = config.with_parquet_bloom_filter_pruning(true);
248-
make_test_file_rg(scenario, row_per_group).await
277+
config.options_mut().execution.parquet.pushdown_filters = true;
278+
make_test_file_rg(scenario, row_per_group, custom_schema, custom_batches)
279+
.await
249280
}
250281
Unit::Page(row_per_page) => {
251282
config = config.with_parquet_page_index_pruning(true);
@@ -1075,7 +1106,12 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
10751106
}
10761107

10771108
/// Create a test parquet file with various data types
1078-
async fn make_test_file_rg(scenario: Scenario, row_per_group: usize) -> NamedTempFile {
1109+
async fn make_test_file_rg(
1110+
scenario: Scenario,
1111+
row_per_group: usize,
1112+
custom_schema: Option<Arc<Schema>>,
1113+
custom_batches: Option<Vec<RecordBatch>>,
1114+
) -> NamedTempFile {
10791115
let mut output_file = tempfile::Builder::new()
10801116
.prefix("parquet_pruning")
10811117
.suffix(".parquet")
@@ -1088,8 +1124,14 @@ async fn make_test_file_rg(scenario: Scenario, row_per_group: usize) -> NamedTem
10881124
.set_statistics_enabled(EnabledStatistics::Page)
10891125
.build();
10901126

1091-
let batches = create_data_batch(scenario);
1092-
let schema = batches[0].schema();
1127+
let (batches, schema) =
1128+
if let (Some(schema), Some(batches)) = (custom_schema, custom_batches) {
1129+
(batches, schema)
1130+
} else {
1131+
let batches = create_data_batch(scenario);
1132+
let schema = batches[0].schema();
1133+
(batches, schema)
1134+
};
10931135

10941136
let mut writer = ArrowWriter::try_new(&mut output_file, schema, Some(props)).unwrap();
10951137

0 commit comments

Comments (0)