Skip to content

Commit ee5c89c

Browse files
adriangbclaude
andcommitted
feat: add ExpressionPlacement enum for optimizer expression placement decisions
This extracts the ExpressionPlacement enum from PR apache#20036 to provide a mechanism for expressions to indicate where they should be placed in the query plan for optimal execution. Changes: - Add ExpressionPlacement enum with variants: Literal, Column, PlaceAtLeaves, PlaceAtRoot - Add placement() method to Expr, ScalarUDF, ScalarUDFImpl traits - Add placement() method to PhysicalExpr trait and implementations - Implement placement() for GetFieldFunc to return PlaceAtLeaves when accessing struct fields with literal keys - Replace is_expr_trivial() checks with placement() in optimizer and physical-plan projection code Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent f997169 commit ee5c89c

12 files changed

Lines changed: 161 additions & 28 deletions

File tree

datafusion/expr-common/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,10 @@ pub mod dyn_eq;
4040
pub mod groups_accumulator;
4141
pub mod interval_arithmetic;
4242
pub mod operator;
43+
pub mod placement;
4344
pub mod signature;
4445
pub mod sort_properties;
4546
pub mod statistics;
4647
pub mod type_coercion;
48+
49+
pub use placement::ExpressionPlacement;
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Expression placement information for optimization decisions.
19+
20+
/// Describes where an expression should be placed in the query plan for
21+
/// optimal execution. This is used by optimizers to make decisions about
22+
/// expression placement, such as whether to push expressions down through
23+
/// projections.
24+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
25+
pub enum ExpressionPlacement {
26+
/// A constant literal value.
27+
Literal,
28+
/// A simple column reference.
29+
Column,
30+
/// A cheap expression that can be pushed to leaf nodes in the plan.
31+
/// Examples include `get_field` for struct field access.
32+
PlaceAtLeaves,
33+
/// An expensive expression that should stay at the root of the plan.
34+
/// This is the default for most expressions.
35+
PlaceAtRoot,
36+
}

datafusion/expr/src/expr.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use crate::{AggregateUDF, Volatility};
3131
use crate::{ExprSchemable, Operator, Signature, WindowFrame, WindowUDF};
3232

3333
use arrow::datatypes::{DataType, Field, FieldRef};
34+
use datafusion_expr_common::placement::ExpressionPlacement;
3435
use datafusion_common::cse::{HashNode, NormalizeEq, Normalizeable};
3536
use datafusion_common::tree_node::{
3637
Transformed, TransformedResult, TreeNode, TreeNodeContainer, TreeNodeRecursion,
@@ -1536,6 +1537,23 @@ impl Expr {
15361537
}
15371538
}
15381539

1540+
/// Returns placement information for this expression.
1541+
///
1542+
/// This is used by optimizers to make decisions about expression placement,
1543+
/// such as whether to push expressions down through projections.
1544+
pub fn placement(&self) -> ExpressionPlacement {
1545+
match self {
1546+
Expr::Column(_) => ExpressionPlacement::Column,
1547+
Expr::Literal(_, _) => ExpressionPlacement::Literal,
1548+
Expr::ScalarFunction(func) => {
1549+
let arg_placements: Vec<_> =
1550+
func.args.iter().map(|arg| arg.placement()).collect();
1551+
func.func.placement(&arg_placements)
1552+
}
1553+
_ => ExpressionPlacement::PlaceAtRoot,
1554+
}
1555+
}
1556+
15391557
/// Return String representation of the variant represented by `self`
15401558
/// Useful for non-rust based bindings
15411559
pub fn variant_name(&self) -> &str {

datafusion/expr/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ pub use datafusion_expr_common::accumulator::Accumulator;
9595
pub use datafusion_expr_common::columnar_value::ColumnarValue;
9696
pub use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};
9797
pub use datafusion_expr_common::operator::Operator;
98+
pub use datafusion_expr_common::placement::ExpressionPlacement;
9899
pub use datafusion_expr_common::signature::{
99100
ArrayFunctionArgument, ArrayFunctionSignature, Coercion, Signature,
100101
TIMEZONE_WILDCARD, TypeSignature, TypeSignatureClass, Volatility,

datafusion/expr/src/udf.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use crate::simplify::{ExprSimplifyResult, SimplifyContext};
2424
use crate::sort_properties::{ExprProperties, SortProperties};
2525
use crate::udf_eq::UdfEq;
2626
use crate::{ColumnarValue, Documentation, Expr, Signature};
27+
use datafusion_expr_common::placement::ExpressionPlacement;
2728
use arrow::datatypes::{DataType, Field, FieldRef};
2829
#[cfg(debug_assertions)]
2930
use datafusion_common::assert_or_internal_err;
@@ -361,6 +362,13 @@ impl ScalarUDF {
361362
pub fn as_async(&self) -> Option<&AsyncScalarUDF> {
362363
self.inner().as_any().downcast_ref::<AsyncScalarUDF>()
363364
}
365+
366+
/// Returns placement information for this function.
367+
///
368+
/// See [`ScalarUDFImpl::placement`] for more details.
369+
pub fn placement(&self, args: &[ExpressionPlacement]) -> ExpressionPlacement {
370+
self.inner.placement(args)
371+
}
364372
}
365373

366374
impl<F> From<F> for ScalarUDF
@@ -964,6 +972,20 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync {
964972
fn documentation(&self) -> Option<&Documentation> {
965973
None
966974
}
975+
976+
/// Returns placement information for this function.
977+
///
978+
/// This is used by optimizers to make decisions about expression placement,
979+
/// such as whether to push expressions down through projections.
980+
///
981+
/// The default implementation returns [`ExpressionPlacement::PlaceAtRoot`],
982+
/// meaning the expression should stay at the root of the plan.
983+
///
984+
/// Override this method to indicate that the function can be pushed down
985+
/// closer to the data source.
986+
fn placement(&self, _args: &[ExpressionPlacement]) -> ExpressionPlacement {
987+
ExpressionPlacement::PlaceAtRoot
988+
}
967989
}
968990

969991
/// ScalarUDF that adds an alias to the underlying function. It is better to
@@ -1091,6 +1113,10 @@ impl ScalarUDFImpl for AliasedScalarUDFImpl {
10911113
fn documentation(&self) -> Option<&Documentation> {
10921114
self.inner.documentation()
10931115
}
1116+
1117+
fn placement(&self, args: &[ExpressionPlacement]) -> ExpressionPlacement {
1118+
self.inner.placement(args)
1119+
}
10941120
}
10951121

10961122
#[cfg(test)]

datafusion/functions/src/core/getfield.rs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ use datafusion_common::{
3333
use datafusion_expr::expr::ScalarFunction;
3434
use datafusion_expr::simplify::ExprSimplifyResult;
3535
use datafusion_expr::{
36-
ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF,
37-
ScalarUDFImpl, Signature, Volatility,
36+
ColumnarValue, Documentation, Expr, ExpressionPlacement, ReturnFieldArgs,
37+
ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
3838
};
3939
use datafusion_macros::user_doc;
4040

@@ -499,6 +499,32 @@ impl ScalarUDFImpl for GetFieldFunc {
499499
fn documentation(&self) -> Option<&Documentation> {
500500
self.doc()
501501
}
502+
503+
fn placement(&self, args: &[ExpressionPlacement]) -> ExpressionPlacement {
504+
// get_field can be pushed to leaves if:
505+
// 1. The base (first arg) is a column or already placeable at leaves
506+
// 2. All field keys (remaining args) are literals
507+
if args.is_empty() {
508+
return ExpressionPlacement::PlaceAtRoot;
509+
}
510+
511+
let base_placement = args[0];
512+
let base_is_pushable = matches!(
513+
base_placement,
514+
ExpressionPlacement::Column | ExpressionPlacement::PlaceAtLeaves
515+
);
516+
517+
let all_keys_are_literals = args
518+
.iter()
519+
.skip(1)
520+
.all(|p| matches!(p, ExpressionPlacement::Literal));
521+
522+
if base_is_pushable && all_keys_are_literals {
523+
ExpressionPlacement::PlaceAtLeaves
524+
} else {
525+
ExpressionPlacement::PlaceAtRoot
526+
}
527+
}
502528
}
503529

504530
#[cfg(test)]

datafusion/optimizer/src/optimize_projections/mod.rs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ use datafusion_common::{
3030
};
3131
use datafusion_expr::expr::Alias;
3232
use datafusion_expr::{
33-
Aggregate, Distinct, EmptyRelation, Expr, Projection, TableScan, Unnest, Window,
34-
logical_plan::LogicalPlan,
33+
Aggregate, Distinct, EmptyRelation, Expr, ExpressionPlacement, Projection, TableScan,
34+
Unnest, Window, logical_plan::LogicalPlan,
3535
};
3636

3737
use crate::optimize_projections::required_indices::RequiredIndices;
@@ -525,14 +525,15 @@ fn merge_consecutive_projections(proj: Projection) -> Result<Transformed<Project
525525
expr.iter()
526526
.for_each(|expr| expr.add_column_ref_counts(&mut column_referral_map));
527527

528-
// If an expression is non-trivial and appears more than once, do not merge
528+
// If an expression is non-trivial (PlaceAtRoot) and appears more than once, do not merge
529529
// them as consecutive projections will benefit from a compute-once approach.
530530
// For details, see: https://github.com/apache/datafusion/issues/8296
531531
if column_referral_map.into_iter().any(|(col, usage)| {
532532
usage > 1
533-
&& !is_expr_trivial(
534-
&prev_projection.expr
535-
[prev_projection.schema.index_of_column(col).unwrap()],
533+
&& matches!(
534+
prev_projection.expr[prev_projection.schema.index_of_column(col).unwrap()]
535+
.placement(),
536+
ExpressionPlacement::PlaceAtRoot
536537
)
537538
}) {
538539
// no change
@@ -586,11 +587,6 @@ fn merge_consecutive_projections(proj: Projection) -> Result<Transformed<Project
586587
}
587588
}
588589

589-
// Check whether `expr` is trivial; i.e. it doesn't imply any computation.
590-
fn is_expr_trivial(expr: &Expr) -> bool {
591-
matches!(expr, Expr::Column(_) | Expr::Literal(_, _))
592-
}
593-
594590
/// Rewrites a projection expression using the projection before it (i.e. its input)
595591
/// This is a subroutine to the `merge_consecutive_projections` function.
596592
///

datafusion/physical-expr-common/src/physical_expr.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use datafusion_common::{
3535
};
3636
use datafusion_expr_common::columnar_value::ColumnarValue;
3737
use datafusion_expr_common::interval_arithmetic::Interval;
38+
use datafusion_expr_common::placement::ExpressionPlacement;
3839
use datafusion_expr_common::sort_properties::ExprProperties;
3940
use datafusion_expr_common::statistics::Distribution;
4041

@@ -430,6 +431,16 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
430431
fn is_volatile_node(&self) -> bool {
431432
false
432433
}
434+
435+
/// Returns placement information for this expression.
436+
///
437+
/// This is used by optimizers to make decisions about expression placement,
438+
/// such as whether to push expressions down through projections.
439+
///
440+
/// The default implementation returns [`ExpressionPlacement::PlaceAtRoot`].
441+
fn placement(&self) -> ExpressionPlacement {
442+
ExpressionPlacement::PlaceAtRoot
443+
}
433444
}
434445

435446
#[deprecated(

datafusion/physical-expr/src/expressions/column.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use arrow::{
3030
use datafusion_common::tree_node::{Transformed, TreeNode};
3131
use datafusion_common::{Result, internal_err, plan_err};
3232
use datafusion_expr::ColumnarValue;
33+
use datafusion_expr_common::placement::ExpressionPlacement;
3334

3435
/// Represents the column at a given index in a RecordBatch
3536
///
@@ -146,6 +147,10 @@ impl PhysicalExpr for Column {
146147
fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147148
write!(f, "{}", self.name)
148149
}
150+
151+
fn placement(&self) -> ExpressionPlacement {
152+
ExpressionPlacement::Column
153+
}
149154
}
150155

151156
impl Column {

datafusion/physical-expr/src/expressions/literal.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use datafusion_common::{Result, ScalarValue};
3333
use datafusion_expr::Expr;
3434
use datafusion_expr_common::columnar_value::ColumnarValue;
3535
use datafusion_expr_common::interval_arithmetic::Interval;
36+
use datafusion_expr_common::placement::ExpressionPlacement;
3637
use datafusion_expr_common::sort_properties::{ExprProperties, SortProperties};
3738

3839
/// Represents a literal value
@@ -134,6 +135,10 @@ impl PhysicalExpr for Literal {
134135
fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135136
std::fmt::Display::fmt(self, f)
136137
}
138+
139+
fn placement(&self) -> ExpressionPlacement {
140+
ExpressionPlacement::Literal
141+
}
137142
}
138143

139144
/// Create a literal expression

0 commit comments

Comments
 (0)