Skip to content

Commit fe3f018

Browse files
authored
Eagerly construct PagePruningPredicate (apache#4713)
* Eagerly construct PagePruningPredicate * Error propagation * Clippy * Fix handling of always true predicates
1 parent 2f5b25d commit fe3f018

2 files changed

Lines changed: 128 additions & 133 deletions

File tree

datafusion/core/src/physical_plan/file_format/parquet.rs

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ mod page_filter;
6565
mod row_filter;
6666
mod row_groups;
6767

68+
use crate::physical_plan::file_format::parquet::page_filter::PagePruningPredicate;
6869
pub use metrics::ParquetFileMetrics;
6970

7071
use super::get_output_ordering;
@@ -91,6 +92,8 @@ pub struct ParquetExec {
9192
predicate: Option<Arc<Expr>>,
9293
/// Optional predicate for pruning row groups
9394
pruning_predicate: Option<Arc<PruningPredicate>>,
95+
/// Optional predicate for pruning pages
96+
page_pruning_predicate: Option<Arc<PagePruningPredicate>>,
9497
/// Optional hint for the size of the parquet metadata
9598
metadata_size_hint: Option<usize>,
9699
/// Optional user defined parquet file reader factory
@@ -111,13 +114,11 @@ impl ParquetExec {
111114
let predicate_creation_errors =
112115
MetricBuilder::new(&metrics).global_counter("num_predicate_creation_errors");
113116

117+
let file_schema = &base_config.file_schema;
114118
let pruning_predicate = predicate
115119
.clone()
116120
.and_then(|predicate_expr| {
117-
match PruningPredicate::try_new(
118-
predicate_expr,
119-
base_config.file_schema.clone(),
120-
) {
121+
match PruningPredicate::try_new(predicate_expr, file_schema.clone()) {
121122
Ok(pruning_predicate) => Some(Arc::new(pruning_predicate)),
122123
Err(e) => {
123124
debug!("Could not create pruning predicate for: {}", e);
@@ -126,14 +127,18 @@ impl ParquetExec {
126127
}
127128
}
128129
})
129-
.and_then(|pruning_predicate| {
130-
// If the pruning predicate can't prune anything, don't try
131-
if pruning_predicate.allways_true() {
130+
.filter(|p| !p.allways_true());
131+
132+
let page_pruning_predicate = predicate.as_ref().and_then(|predicate_expr| {
133+
match PagePruningPredicate::try_new(predicate_expr, file_schema.clone()) {
134+
Ok(pruning_predicate) => Some(Arc::new(pruning_predicate)),
135+
Err(e) => {
136+
debug!("Could not create page pruning predicate for: {}", e);
137+
predicate_creation_errors.add(1);
132138
None
133-
} else {
134-
Some(pruning_predicate)
135139
}
136-
});
140+
}
141+
});
137142

138143
// Save original predicate
139144
let predicate = predicate.map(Arc::new);
@@ -150,6 +155,7 @@ impl ParquetExec {
150155
metrics,
151156
predicate,
152157
pruning_predicate,
158+
page_pruning_predicate,
153159
metadata_size_hint,
154160
parquet_file_reader_factory: None,
155161
}
@@ -295,6 +301,7 @@ impl ExecutionPlan for ParquetExec {
295301
batch_size: ctx.session_config().batch_size(),
296302
predicate: self.predicate.clone(),
297303
pruning_predicate: self.pruning_predicate.clone(),
304+
page_pruning_predicate: self.page_pruning_predicate.clone(),
298305
table_schema: self.base_config.file_schema.clone(),
299306
metadata_size_hint: self.metadata_size_hint,
300307
metrics: self.metrics.clone(),
@@ -382,6 +389,7 @@ struct ParquetOpener {
382389
batch_size: usize,
383390
predicate: Option<Arc<Expr>>,
384391
pruning_predicate: Option<Arc<PruningPredicate>>,
392+
page_pruning_predicate: Option<Arc<PagePruningPredicate>>,
385393
table_schema: SchemaRef,
386394
metadata_size_hint: Option<usize>,
387395
metrics: ExecutionPlanMetricsSet,
@@ -414,6 +422,7 @@ impl FileOpener for ParquetOpener {
414422
let projection = self.projection.clone();
415423
let predicate = self.predicate.clone();
416424
let pruning_predicate = self.pruning_predicate.clone();
425+
let page_pruning_predicate = self.page_pruning_predicate.clone();
417426
let table_schema = self.table_schema.clone();
418427
let reorder_predicates = self.reorder_filters;
419428
let pushdown_filters = self.pushdown_filters;
@@ -470,20 +479,14 @@ impl FileOpener for ParquetOpener {
470479
// page index pruning: if all data on individual pages can
471480
// be ruled out using page metadata, rows from other columns
472481
// with that range can be skipped as well
473-
if let Some(row_selection) = (enable_page_index && !row_groups.is_empty())
474-
.then(|| {
475-
page_filter::build_page_filter(
476-
pruning_predicate.as_ref().map(|p| p.as_ref()),
477-
builder.schema().clone(),
478-
&row_groups,
479-
file_metadata.as_ref(),
480-
&file_metrics,
481-
)
482-
})
483-
.transpose()?
484-
.flatten()
485-
{
486-
builder = builder.with_row_selection(row_selection);
482+
if enable_page_index && !row_groups.is_empty() {
483+
if let Some(p) = page_pruning_predicate {
484+
let pruned =
485+
p.prune(&row_groups, file_metadata.as_ref(), &file_metrics)?;
486+
if let Some(row_selection) = pruned {
487+
builder = builder.with_row_selection(row_selection);
488+
}
489+
}
487490
}
488491

489492
let stream = builder

datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs

Lines changed: 101 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use arrow::array::{
2424
use arrow::datatypes::DataType;
2525
use arrow::{array::ArrayRef, datatypes::SchemaRef, error::ArrowError};
2626
use datafusion_common::{Column, DataFusionError, Result};
27+
use datafusion_expr::Expr;
2728
use datafusion_optimizer::utils::split_conjunction;
2829
use log::{debug, trace};
2930
use parquet::schema::types::ColumnDescriptor;
@@ -45,8 +46,8 @@ use crate::physical_plan::file_format::parquet::{
4546

4647
use super::metrics::ParquetFileMetrics;
4748

48-
/// Create a RowSelection that may rule out ranges of rows based on
49-
/// parquet page level statistics, if any.
49+
/// A [`PagePruningPredicate`] provides the ability to construct a [`RowSelection`]
50+
/// based on parquet page level statistics, if any
5051
///
5152
/// For example, given a row group with two columns (chunks) for `A`
5253
/// and `B` with the following page level statistics:
@@ -99,94 +100,114 @@ use super::metrics::ParquetFileMetrics;
99100
///
100101
/// So we can entirely skip rows 0->199 and 250->299 as we know they
101102
/// can not contain rows that match the predicate.
102-
pub(crate) fn build_page_filter(
103-
pruning_predicate: Option<&PruningPredicate>,
104-
schema: SchemaRef,
105-
row_groups: &[usize],
106-
file_metadata: &ParquetMetaData,
107-
file_metrics: &ParquetFileMetrics,
108-
) -> Result<Option<RowSelection>> {
109-
// scoped timer updates on drop
110-
let _timer_guard = file_metrics.page_index_eval_time.timer();
111-
let page_index_predicates =
112-
extract_page_index_push_down_predicates(pruning_predicate, schema)?;
103+
#[derive(Debug)]
104+
pub(crate) struct PagePruningPredicate {
105+
predicates: Vec<PruningPredicate>,
106+
}
113107

114-
if page_index_predicates.is_empty() {
115-
return Ok(None);
108+
impl PagePruningPredicate {
109+
/// Create a new [`PagePruningPredicate`]
110+
pub fn try_new(expr: &Expr, schema: SchemaRef) -> Result<Self> {
111+
let predicates = split_conjunction(expr)
112+
.into_iter()
113+
.filter_map(|predicate| match predicate.to_columns() {
114+
Ok(columns) if columns.len() == 1 => {
115+
match PruningPredicate::try_new(predicate.clone(), schema.clone()) {
116+
Ok(p) if !p.allways_true() => Some(Ok(p)),
117+
_ => None,
118+
}
119+
}
120+
_ => None,
121+
})
122+
.collect::<Result<Vec<_>>>()?;
123+
Ok(Self { predicates })
116124
}
117125

118-
let groups = file_metadata.row_groups();
126+
/// Returns a [`RowSelection`] for the given file
127+
pub fn prune(
128+
&self,
129+
row_groups: &[usize],
130+
file_metadata: &ParquetMetaData,
131+
file_metrics: &ParquetFileMetrics,
132+
) -> Result<Option<RowSelection>> {
133+
// scoped timer updates on drop
134+
let _timer_guard = file_metrics.page_index_eval_time.timer();
135+
if self.predicates.is_empty() {
136+
return Ok(None);
137+
}
119138

120-
let file_offset_indexes = file_metadata.offset_indexes();
121-
let file_page_indexes = file_metadata.page_indexes();
122-
if let (Some(file_offset_indexes), Some(file_page_indexes)) =
123-
(file_offset_indexes, file_page_indexes)
124-
{
125-
let mut row_selections = Vec::with_capacity(page_index_predicates.len());
126-
for predicate in page_index_predicates {
127-
// `extract_page_index_push_down_predicates` only return predicate with one col.
128-
// when building `PruningPredicate`, some single column filter like `abs(i) = 1`
129-
// will be rewrite to `lit(true)`, so may have an empty required_columns.
130-
if let Some(&col_id) = predicate.need_input_columns_ids().iter().next() {
131-
let mut selectors = Vec::with_capacity(row_groups.len());
132-
for r in row_groups.iter() {
133-
let rg_offset_indexes = file_offset_indexes.get(*r);
134-
let rg_page_indexes = file_page_indexes.get(*r);
135-
if let (Some(rg_page_indexes), Some(rg_offset_indexes)) =
136-
(rg_page_indexes, rg_offset_indexes)
137-
{
138-
selectors.extend(
139-
prune_pages_in_one_row_group(
140-
&groups[*r],
141-
&predicate,
142-
rg_offset_indexes.get(col_id),
143-
rg_page_indexes.get(col_id),
144-
groups[*r].column(col_id).column_descr(),
145-
file_metrics,
146-
)
147-
.map_err(|e| {
148-
ArrowError::ParquetError(format!(
149-
"Fail in prune_pages_in_one_row_group: {}",
150-
e
151-
))
152-
}),
153-
);
154-
} else {
155-
trace!(
156-
"Did not have enough metadata to prune with page indexes, falling back, falling back to all rows",
157-
);
158-
// fallback select all rows
159-
let all_selected =
160-
vec![RowSelector::select(groups[*r].num_rows() as usize)];
161-
selectors.push(all_selected);
139+
let page_index_predicates = &self.predicates;
140+
let groups = file_metadata.row_groups();
141+
142+
let file_offset_indexes = file_metadata.offset_indexes();
143+
let file_page_indexes = file_metadata.page_indexes();
144+
if let (Some(file_offset_indexes), Some(file_page_indexes)) =
145+
(file_offset_indexes, file_page_indexes)
146+
{
147+
let mut row_selections = Vec::with_capacity(page_index_predicates.len());
148+
for predicate in page_index_predicates {
149+
// `PagePruningPredicate::try_new` only keeps predicates that reference a single column.
150+
// when building `PruningPredicate`, some single column filter like `abs(i) = 1`
151+
// will be rewritten to `lit(true)`, so the predicate may have an empty required_columns.
152+
if let Some(&col_id) = predicate.need_input_columns_ids().iter().next() {
153+
let mut selectors = Vec::with_capacity(row_groups.len());
154+
for r in row_groups.iter() {
155+
let rg_offset_indexes = file_offset_indexes.get(*r);
156+
let rg_page_indexes = file_page_indexes.get(*r);
157+
if let (Some(rg_page_indexes), Some(rg_offset_indexes)) =
158+
(rg_page_indexes, rg_offset_indexes)
159+
{
160+
selectors.extend(
161+
prune_pages_in_one_row_group(
162+
&groups[*r],
163+
predicate,
164+
rg_offset_indexes.get(col_id),
165+
rg_page_indexes.get(col_id),
166+
groups[*r].column(col_id).column_descr(),
167+
file_metrics,
168+
)
169+
.map_err(|e| {
170+
ArrowError::ParquetError(format!(
171+
"Fail in prune_pages_in_one_row_group: {}",
172+
e
173+
))
174+
}),
175+
);
176+
} else {
177+
trace!(
178+
"Did not have enough metadata to prune with page indexes, falling back, falling back to all rows",
179+
);
180+
// fallback select all rows
181+
let all_selected =
182+
vec![RowSelector::select(groups[*r].num_rows() as usize)];
183+
selectors.push(all_selected);
184+
}
162185
}
186+
debug!(
187+
"Use filter and page index create RowSelection {:?} from predicate: {:?}",
188+
&selectors,
189+
predicate.predicate_expr(),
190+
);
191+
row_selections
192+
.push(selectors.into_iter().flatten().collect::<Vec<_>>());
163193
}
164-
debug!(
165-
"Use filter and page index create RowSelection {:?} from predicate: {:?}",
166-
&selectors,
167-
predicate.predicate_expr(),
168-
);
169-
row_selections.push(selectors.into_iter().flatten().collect::<Vec<_>>());
170194
}
195+
let final_selection = combine_multi_col_selection(row_selections);
196+
let total_skip = final_selection.iter().fold(0, |acc, x| {
197+
if x.skip {
198+
acc + x.row_count
199+
} else {
200+
acc
201+
}
202+
});
203+
file_metrics.page_index_rows_filtered.add(total_skip);
204+
Ok(Some(final_selection))
205+
} else {
206+
Ok(None)
171207
}
172-
let final_selection = combine_multi_col_selection(row_selections);
173-
let total_skip =
174-
final_selection.iter().fold(
175-
0,
176-
|acc, x| {
177-
if x.skip {
178-
acc + x.row_count
179-
} else {
180-
acc
181-
}
182-
},
183-
);
184-
file_metrics.page_index_rows_filtered.add(total_skip);
185-
Ok(Some(final_selection))
186-
} else {
187-
Ok(None)
188208
}
189209
}
210+
190211
/// Intersects the [`RowSelector`]s
191212
///
192213
/// For example, given:
@@ -203,35 +224,6 @@ fn combine_multi_col_selection(row_selections: Vec<Vec<RowSelector>>) -> RowSele
203224
.unwrap()
204225
}
205226

206-
// Extract single col pruningPredicate from input predicate for evaluating page Index.
207-
fn extract_page_index_push_down_predicates(
208-
predicate: Option<&PruningPredicate>,
209-
schema: SchemaRef,
210-
) -> Result<Vec<PruningPredicate>> {
211-
let mut one_col_predicates = vec![];
212-
if let Some(predicate) = predicate {
213-
let expr = predicate.logical_expr();
214-
// todo try use CNF rewrite when ready
215-
let predicates = split_conjunction(expr);
216-
let mut one_col_expr = vec![];
217-
predicates
218-
.into_iter()
219-
.try_for_each::<_, Result<()>>(|predicate| {
220-
let columns = predicate.to_columns()?;
221-
if columns.len() == 1 {
222-
one_col_expr.push(predicate);
223-
}
224-
Ok(())
225-
})?;
226-
one_col_predicates = one_col_expr
227-
.into_iter()
228-
.map(|e| PruningPredicate::try_new(e.clone(), schema.clone()))
229-
.collect::<Result<Vec<_>>>()
230-
.unwrap_or_default();
231-
}
232-
Ok(one_col_predicates)
233-
}
234-
235227
fn prune_pages_in_one_row_group(
236228
group: &RowGroupMetaData,
237229
predicate: &PruningPredicate,

0 commit comments

Comments
 (0)