Skip to content

Commit fee90be

Browse files
AdamGSal and alamb authored
Misc minor optimizations to query optimizer performance (#21128)
## Which issue does this PR close? - Closes #. ## Rationale for this change Inspired by @blaginin, trying to find more places that might drag down the optimizer's performance. On my laptop, this improves many of the SQL planner's benchmarks by a fairly consistent 2-5%. ## What changes are included in this PR? A slew of minor optimizations in the logical planner, trying to avoid wasted work or repeated allocations. ## Are these changes tested? Existing tests. ## Are there any user-facing changes? None --------- Signed-off-by: Adam Gutglick <adamgsal@gmail.com> Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 9de1253 commit fee90be

8 files changed

Lines changed: 127 additions & 99 deletions

File tree

datafusion/optimizer/src/analyzer/type_coercion.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
use arrow::compute::can_cast_types;
2121
use datafusion_expr::binary::BinaryTypeCoercer;
2222
use itertools::{Itertools as _, izip};
23-
use std::sync::Arc;
23+
use std::sync::{Arc, LazyLock};
2424

2525
use crate::analyzer::AnalyzerRule;
2626
use crate::utils::NamePreserver;
@@ -91,11 +91,11 @@ impl AnalyzerRule for TypeCoercion {
9191
}
9292

9393
fn analyze(&self, plan: LogicalPlan, config: &ConfigOptions) -> Result<LogicalPlan> {
94-
let empty_schema = DFSchema::empty();
94+
static EMPTY_SCHEMA: LazyLock<DFSchema> = LazyLock::new(DFSchema::empty);
9595

9696
// recurse
9797
let transformed_plan = plan
98-
.transform_up_with_subqueries(|plan| analyze_internal(&empty_schema, plan))?
98+
.transform_up_with_subqueries(|plan| analyze_internal(&EMPTY_SCHEMA, plan))?
9999
.data;
100100

101101
// finish

datafusion/optimizer/src/common_subexpr_eliminate.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -325,11 +325,7 @@ impl CommonSubexprEliminate {
325325
.map(|expr| Some(name_preserver.save(expr)))
326326
.collect::<Vec<_>>()
327327
} else {
328-
new_aggr_expr
329-
.clone()
330-
.into_iter()
331-
.map(|_| None)
332-
.collect::<Vec<_>>()
328+
(0..new_aggr_expr.len()).map(|_| None).collect()
333329
};
334330

335331
let mut agg_exprs = common_exprs

datafusion/optimizer/src/extract_leaf_expressions.rs

Lines changed: 18 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use datafusion_expr::{Expr, ExpressionPlacement, Projection};
3232

3333
use crate::optimizer::ApplyOrder;
3434
use crate::push_down_filter::replace_cols_by_name;
35-
use crate::utils::has_all_column_refs;
35+
use crate::utils::{ColumnReference, has_all_column_refs, schema_columns};
3636
use crate::{OptimizerConfig, OptimizerRule};
3737

3838
/// Prefix for aliases generated by the extraction optimizer passes.
@@ -213,10 +213,11 @@ fn extract_from_plan(
213213
.collect();
214214

215215
// Build per-input column sets for routing expressions to the correct input
216-
let input_column_sets: Vec<std::collections::HashSet<Column>> = input_schemas
217-
.iter()
218-
.map(|schema| schema_columns(schema.as_ref()))
219-
.collect();
216+
let input_column_sets: Vec<std::collections::HashSet<ColumnReference>> =
217+
input_schemas
218+
.iter()
219+
.map(|schema| schema_columns(schema.as_ref()))
220+
.collect();
220221

221222
// Transform expressions via map_expressions with routing
222223
let transformed = plan.map_expressions(|expr| {
@@ -272,7 +273,7 @@ fn extract_from_plan(
272273
/// in both sides of a join).
273274
fn find_owning_input(
274275
expr: &Expr,
275-
input_column_sets: &[std::collections::HashSet<Column>],
276+
input_column_sets: &[std::collections::HashSet<ColumnReference>],
276277
) -> Option<usize> {
277278
let mut found = None;
278279
for (idx, cols) in input_column_sets.iter().enumerate() {
@@ -292,7 +293,7 @@ fn find_owning_input(
292293
fn routing_extract(
293294
expr: Expr,
294295
extractors: &mut [LeafExpressionExtractor],
295-
input_column_sets: &[std::collections::HashSet<Column>],
296+
input_column_sets: &[std::collections::HashSet<ColumnReference>],
296297
) -> Result<Transformed<Expr>> {
297298
expr.transform_down(|e| {
298299
// Skip expressions already aliased with extracted expression pattern
@@ -340,19 +341,6 @@ fn routing_extract(
340341
})
341342
}
342343

343-
/// Returns all columns in the schema (both qualified and unqualified forms)
344-
fn schema_columns(schema: &DFSchema) -> std::collections::HashSet<Column> {
345-
schema
346-
.iter()
347-
.flat_map(|(qualifier, field)| {
348-
[
349-
Column::new(qualifier.cloned(), field.name()),
350-
Column::new_unqualified(field.name()),
351-
]
352-
})
353-
.collect()
354-
}
355-
356344
/// Rewrites extraction pairs and column references from one qualifier
357345
/// space to another.
358346
///
@@ -1072,7 +1060,7 @@ fn route_to_inputs(
10721060
pairs: &[(Expr, String)],
10731061
columns: &IndexSet<Column>,
10741062
node: &LogicalPlan,
1075-
input_column_sets: &[std::collections::HashSet<Column>],
1063+
input_column_sets: &[std::collections::HashSet<ColumnReference>],
10761064
input_schemas: &[Arc<DFSchema>],
10771065
) -> Result<Option<Vec<ExtractionTarget>>> {
10781066
let num_inputs = input_schemas.len();
@@ -1173,7 +1161,7 @@ fn try_push_into_inputs(
11731161
// Build per-input schemas and column sets for routing
11741162
let input_schemas: Vec<Arc<DFSchema>> =
11751163
inputs.iter().map(|i| Arc::clone(i.schema())).collect();
1176-
let input_column_sets: Vec<std::collections::HashSet<Column>> =
1164+
let input_column_sets: Vec<std::collections::HashSet<ColumnReference>> =
11771165
input_schemas.iter().map(|s| schema_columns(s)).collect();
11781166

11791167
// Route pairs and columns to the appropriate inputs
@@ -2436,16 +2424,18 @@ mod tests {
24362424
// Simulate schema_columns output for two sides of a join where both
24372425
// have a "user" column — each set contains the qualified and
24382426
// unqualified form.
2439-
let left_cols: HashSet<Column> = [
2440-
Column::new(Some("test"), "user"),
2441-
Column::new_unqualified("user"),
2427+
let relation = "test".into();
2428+
let left_cols: HashSet<ColumnReference> = [
2429+
ColumnReference::new(Some(&relation), "user"),
2430+
ColumnReference::new_unqualified("user"),
24422431
]
24432432
.into_iter()
24442433
.collect();
24452434

2446-
let right_cols: HashSet<Column> = [
2447-
Column::new(Some("right"), "user"),
2448-
Column::new_unqualified("user"),
2435+
let relation = "right".into();
2436+
let right_cols: HashSet<ColumnReference> = [
2437+
ColumnReference::new(Some(&relation), "user"),
2438+
ColumnReference::new_unqualified("user"),
24492439
]
24502440
.into_iter()
24512441
.collect();

datafusion/optimizer/src/optimize_unions.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,11 @@ impl OptimizerRule for OptimizeUnions {
6464
let inputs = inputs
6565
.into_iter()
6666
.flat_map(extract_plans_from_union)
67-
.map(|plan| coerce_plan_expr_for_schema(plan, &schema))
67+
.map(|plan| Ok(Arc::new(coerce_plan_expr_for_schema(plan, &schema)?)))
6868
.collect::<Result<Vec<_>>>()?;
6969

7070
Ok(Transformed::yes(LogicalPlan::Union(Union {
71-
inputs: inputs.into_iter().map(Arc::new).collect_vec(),
71+
inputs,
7272
schema,
7373
})))
7474
}

datafusion/optimizer/src/push_down_filter.rs

Lines changed: 34 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,9 @@ use datafusion_expr::{
4545

4646
use crate::optimizer::ApplyOrder;
4747
use crate::simplify_expressions::simplify_predicates;
48-
use crate::utils::{has_all_column_refs, is_restrict_null_predicate};
48+
use crate::utils::{
49+
ColumnReference, has_all_column_refs, is_restrict_null_predicate, schema_columns,
50+
};
4951
use crate::{OptimizerConfig, OptimizerRule};
5052
use datafusion_expr::ExpressionPlacement;
5153

@@ -190,11 +192,11 @@ struct ColumnChecker<'a> {
190192
/// schema of left join input
191193
left_schema: &'a DFSchema,
192194
/// columns in left_schema, computed on demand
193-
left_columns: Option<HashSet<Column>>,
195+
left_columns: Option<HashSet<ColumnReference<'a>>>,
194196
/// schema of right join input
195197
right_schema: &'a DFSchema,
196198
/// columns in right_schema, computed on demand
197-
right_columns: Option<HashSet<Column>>,
199+
right_columns: Option<HashSet<ColumnReference<'a>>>,
198200
}
199201

200202
impl<'a> ColumnChecker<'a> {
@@ -224,20 +226,6 @@ impl<'a> ColumnChecker<'a> {
224226
}
225227
}
226228

227-
/// Returns all columns in the schema
228-
fn schema_columns(schema: &DFSchema) -> HashSet<Column> {
229-
schema
230-
.iter()
231-
.flat_map(|(qualifier, field)| {
232-
[
233-
Column::new(qualifier.cloned(), field.name()),
234-
// we need to push down filter using unqualified column as well
235-
Column::new_unqualified(field.name()),
236-
]
237-
})
238-
.collect::<HashSet<_>>()
239-
}
240-
241229
/// Determine whether the predicate can evaluate as the join conditions
242230
fn can_evaluate_as_join_condition(predicate: &Expr) -> Result<bool> {
243231
let mut is_evaluate = true;
@@ -320,10 +308,8 @@ fn can_evaluate_as_join_condition(predicate: &Expr) -> Result<bool> {
320308
/// * do nothing.
321309
fn extract_or_clauses_for_join<'a>(
322310
filters: &'a [Expr],
323-
schema: &'a DFSchema,
311+
schema_cols: &'a HashSet<ColumnReference>,
324312
) -> impl Iterator<Item = Expr> + 'a {
325-
let schema_columns = schema_columns(schema);
326-
327313
// new formed OR clauses and their column references
328314
filters.iter().filter_map(move |expr| {
329315
if let Expr::BinaryExpr(BinaryExpr {
@@ -332,8 +318,8 @@ fn extract_or_clauses_for_join<'a>(
332318
right,
333319
}) = expr
334320
{
335-
let left_expr = extract_or_clause(left.as_ref(), &schema_columns);
336-
let right_expr = extract_or_clause(right.as_ref(), &schema_columns);
321+
let left_expr = extract_or_clause(left.as_ref(), schema_cols);
322+
let right_expr = extract_or_clause(right.as_ref(), schema_cols);
337323

338324
// If nothing can be extracted from any sub clauses, do nothing for this OR clause.
339325
if let (Some(left_expr), Some(right_expr)) = (left_expr, right_expr) {
@@ -355,7 +341,10 @@ fn extract_or_clauses_for_join<'a>(
355341
/// Otherwise, return None.
356342
///
357343
/// For other clause, apply the rule above to extract clause.
358-
fn extract_or_clause(expr: &Expr, schema_columns: &HashSet<Column>) -> Option<Expr> {
344+
fn extract_or_clause(
345+
expr: &Expr,
346+
schema_columns: &HashSet<ColumnReference>,
347+
) -> Option<Expr> {
359348
let mut predicate = None;
360349

361350
match expr {
@@ -421,6 +410,10 @@ fn push_down_all_join(
421410
// 3) should be kept as filter conditions
422411
let left_schema = join.left.schema();
423412
let right_schema = join.right.schema();
413+
414+
let left_schema_columns = schema_columns(left_schema.as_ref());
415+
let right_schema_columns = schema_columns(right_schema.as_ref());
416+
424417
let mut left_push = vec![];
425418
let mut right_push = vec![];
426419
let mut keep_predicates = vec![];
@@ -467,26 +460,38 @@ fn push_down_all_join(
467460
// Extract from OR clause, generate new predicates for both side of join if possible.
468461
// We only track the unpushable predicates above.
469462
if left_preserved {
470-
left_push.extend(extract_or_clauses_for_join(&keep_predicates, left_schema));
471-
left_push.extend(extract_or_clauses_for_join(&join_conditions, left_schema));
463+
left_push.extend(extract_or_clauses_for_join(
464+
&keep_predicates,
465+
&left_schema_columns,
466+
));
467+
left_push.extend(extract_or_clauses_for_join(
468+
&join_conditions,
469+
&left_schema_columns,
470+
));
472471
}
473472
if right_preserved {
474-
right_push.extend(extract_or_clauses_for_join(&keep_predicates, right_schema));
475-
right_push.extend(extract_or_clauses_for_join(&join_conditions, right_schema));
473+
right_push.extend(extract_or_clauses_for_join(
474+
&keep_predicates,
475+
&right_schema_columns,
476+
));
477+
right_push.extend(extract_or_clauses_for_join(
478+
&join_conditions,
479+
&right_schema_columns,
480+
));
476481
}
477482

478483
// For predicates from join filter, we should check with if a join side is preserved
479484
// in term of join filtering.
480485
if on_left_preserved {
481486
left_push.extend(extract_or_clauses_for_join(
482487
&on_filter_join_conditions,
483-
left_schema,
488+
&left_schema_columns,
484489
));
485490
}
486491
if on_right_preserved {
487492
right_push.extend(extract_or_clauses_for_join(
488493
&on_filter_join_conditions,
489-
right_schema,
494+
&right_schema_columns,
490495
));
491496
}
492497

datafusion/optimizer/src/push_down_limit.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,12 @@ impl OptimizerRule for PushDownLimit {
4747
true
4848
}
4949

50+
#[expect(clippy::only_used_in_recursion)]
5051
fn rewrite(
5152
&self,
5253
plan: LogicalPlan,
5354
config: &dyn OptimizerConfig,
5455
) -> Result<Transformed<LogicalPlan>> {
55-
let _ = config.options();
5656
let LogicalPlan::Limit(mut limit) = plan else {
5757
return Ok(Transformed::no(plan));
5858
};

0 commit comments

Comments
 (0)