Skip to content

Commit 9641322

Browse files
committed
feat: Allow struct field access projections to be pushed down into scans
1 parent 1a48d58 commit 9641322

24 files changed

Lines changed: 1043 additions & 148 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/expr-common/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,10 @@ pub mod dyn_eq;
4141
pub mod groups_accumulator;
4242
pub mod interval_arithmetic;
4343
pub mod operator;
44+
pub mod placement;
4445
pub mod signature;
4546
pub mod sort_properties;
4647
pub mod statistics;
4748
pub mod type_coercion;
49+
50+
pub use placement::ExpressionPlacement;
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Expression placement classification for scalar functions.
19+
//!
20+
//! This module determines where in the query plan expressions should be placed
21+
//! to optimize data flow:
22+
//!
23+
//! - **Leaf placement**: Cheap expressions (field accessors, column references)
24+
//! are pushed down to leaf nodes (near data sources) to reduce data volume early.
25+
//! - **Root placement**: Expensive expressions (computations, aggregates) are kept
26+
//! at root nodes (after filtering) to operate on less data.
27+
28+
/// Classification of expression placement for scalar functions.
29+
///
30+
/// This enum is used by [`ScalarUDFImpl::placement`] to allow
31+
/// functions to make context-dependent decisions about where they should
32+
/// be placed in the query plan based on the nature of their arguments.
33+
///
34+
/// For example, `get_field(struct_col, 'field_name')` is
35+
/// leaf-pushable (static field lookup), but `string_col like '%foo%'`
36+
/// performs expensive per-row computation and should be placed
37+
/// further up the tree so that it can be run after filtering, sorting, etc.
38+
///
39+
/// # Why not pass in expressions directly to decide placement?
40+
///
41+
/// There are two reasons for using this enum instead of passing in the full expressions:
42+
///
43+
/// 1. **ScalarUDFImpl cannot reference PhysicalExpr**: The trait is defined in datafusion-expr,
44+
/// which cannot reference datafusion-physical-expr since the latter depends on the former
45+
/// (it would create a circular dependency).
46+
/// 2. **Simplicity**: Without this enum abstracting away logical / physical distinctions,
47+
/// we would need two distinct methods on ScalarUDFImpl: one for logical expression placement
48+
/// and one for physical expression placement. This would require implementors to duplicate logic
49+
/// and increases complexity for UDF authors.
50+
///
51+
/// [`ScalarUDFImpl::placement`]: https://docs.rs/datafusion-expr/latest/datafusion_expr/trait.ScalarUDFImpl.html#method.placement
52+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
53+
pub enum ExpressionPlacement {
54+
/// Argument is a literal constant value or an expression that can be
55+
/// evaluated to a constant at planning time.
56+
Literal,
57+
/// Argument is a simple column reference.
58+
Column,
59+
/// Argument is a complex expression that can be safely placed at leaf nodes.
60+
/// For example, if `get_field(struct_col, 'field_name')` is implemented as a
61+
/// leaf-pushable expression, then it would return this variant.
62+
/// Then `other_leaf_function(get_field(...), 42)` could also be classified as
63+
/// leaf-pushable using the knowledge that `get_field(...)` is leaf-pushable.
64+
PlaceAtLeafs,
65+
/// Argument is a complex expression that should be placed at root nodes.
66+
/// For example, `min(col1 + col2)` is not leaf-pushable because it requires per-row computation.
67+
PlaceAtRoot,
68+
}

datafusion/expr/src/expr.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use std::sync::Arc;
2727
use crate::expr_fn::binary_expr;
2828
use crate::function::WindowFunctionSimplification;
2929
use crate::logical_plan::Subquery;
30-
use crate::{AggregateUDF, Volatility};
30+
use crate::{AggregateUDF, ExpressionPlacement, Volatility};
3131
use crate::{ExprSchemable, Operator, Signature, WindowFrame, WindowUDF};
3232

3333
use arrow::datatypes::{DataType, Field, FieldRef};
@@ -1933,6 +1933,28 @@ impl Expr {
19331933
}
19341934
}
19351935

1936+
/// Returns the placement classification of this expression.
1937+
///
1938+
/// This tells us if optimizers should preferentially
1939+
/// move this expression towards the leaves of the execution plan
1940+
/// tree (for cheap expressions or expressions that reduce the data size)
1941+
/// or towards the root of the execution plan tree (for expensive expressions
1942+
/// that should be run after filtering or parallelization, or expressions that increase the data size).
1943+
pub fn placement(&self) -> ExpressionPlacement {
1944+
match self {
1945+
Expr::Column(_) => ExpressionPlacement::Column,
1946+
Expr::Literal(_, _) => ExpressionPlacement::Literal,
1947+
Expr::ScalarFunction(func) => {
1948+
// Classify each argument's placement for context-aware decision making
1949+
let arg_placements: Vec<ExpressionPlacement> =
1950+
func.args.iter().map(|arg| arg.placement()).collect();
1951+
1952+
func.func.placement_with_args(&arg_placements)
1953+
}
1954+
_ => ExpressionPlacement::PlaceAtRoot,
1955+
}
1956+
}
1957+
19361958
/// Return all references to columns in this expression.
19371959
///
19381960
/// # Example

datafusion/expr/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ pub use datafusion_doc::{
9292
DocSection, Documentation, DocumentationBuilder, aggregate_doc_sections,
9393
scalar_doc_sections, window_doc_sections,
9494
};
95+
pub use datafusion_expr_common::ExpressionPlacement;
9596
pub use datafusion_expr_common::accumulator::Accumulator;
9697
pub use datafusion_expr_common::columnar_value::ColumnarValue;
9798
pub use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};

datafusion/expr/src/udf.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use datafusion_common::config::ConfigOptions;
3131
use datafusion_common::{ExprSchema, Result, ScalarValue, not_impl_err};
3232
use datafusion_expr_common::dyn_eq::{DynEq, DynHash};
3333
use datafusion_expr_common::interval_arithmetic::Interval;
34+
use datafusion_expr_common::placement::ExpressionPlacement;
3435
use std::any::Any;
3536
use std::cmp::Ordering;
3637
use std::fmt::Debug;
@@ -123,6 +124,22 @@ impl ScalarUDF {
123124
Self { inner: fun }
124125
}
125126

127+
/// Returns the placement classification of this function given its arguments' placement.
128+
///
129+
/// This allows functions to make context-dependent decisions about where they should
130+
/// be placed in the query plan. For example, `get_field(struct_col, 'field_name')` is
131+
/// leaf-pushable (static field lookup), but `string_col like '%foo%'`
132+
/// performs expensive per-row computation and should be placed
133+
/// further up the tree so that it can be run after filtering, sorting, etc.
134+
///
135+
/// See [`ScalarUDFImpl::placement`] for more details.
136+
pub fn placement_with_args(
137+
&self,
138+
args: &[ExpressionPlacement],
139+
) -> ExpressionPlacement {
140+
self.inner.placement(args)
141+
}
142+
126143
/// Return the underlying [`ScalarUDFImpl`] trait object for this function
127144
pub fn inner(&self) -> &Arc<dyn ScalarUDFImpl> {
128145
&self.inner
@@ -885,6 +902,37 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync {
885902
fn documentation(&self) -> Option<&Documentation> {
886903
None
887904
}
905+
906+
/// Returns the placement classification of this function given its arguments' placement.
907+
///
908+
/// This method allows functions to make context-dependent decisions about
909+
/// where they should be placed in the query plan. The default implementation
910+
/// returns [`ExpressionPlacement::PlaceAtRoot`] (conservative default).
911+
///
912+
/// Leaf-pushable functions are lightweight accessor functions like `get_field`
913+
/// (struct field access) that simply access nested data within a column
914+
/// without significant computation.
915+
/// These can be pushed down to leaf nodes near data sources to reduce data volume early in the plan.
916+
///
917+
/// [`ExpressionPlacement::PlaceAtRoot`] represents expressions that should be kept after filtering,
918+
/// such as expensive computations or aggregates that benefit from operating
919+
/// on fewer rows.
920+
///
921+
/// # Example
922+
///
923+
/// - `get_field(struct_col, 'field_name')` with a literal key is leaf-pushable as it
924+
/// performs metadata-only (cheap) extraction of a sub-array from a struct column.
925+
/// Thus, it can be placed near the data source to reduce data volume early.
926+
/// - `string_col like '%foo%'` performs expensive per-row computation and should be placed
927+
/// further up the tree so that it can be run after filtering, sorting, etc.
928+
///
929+
/// # Arguments
930+
///
931+
/// * `args` - Classification of each argument's placement, collected from the expression tree
932+
/// by the caller.
933+
fn placement(&self, _args: &[ExpressionPlacement]) -> ExpressionPlacement {
934+
ExpressionPlacement::PlaceAtRoot
935+
}
888936
}
889937

890938
/// ScalarUDF that adds an alias to the underlying function. It is better to
@@ -1012,6 +1060,10 @@ impl ScalarUDFImpl for AliasedScalarUDFImpl {
10121060
fn documentation(&self) -> Option<&Documentation> {
10131061
self.inner.documentation()
10141062
}
1063+
1064+
fn placement(&self, args: &[ExpressionPlacement]) -> ExpressionPlacement {
1065+
self.inner.placement(args)
1066+
}
10151067
}
10161068

10171069
#[cfg(test)]

datafusion/functions/src/core/getfield.rs

Lines changed: 109 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ use datafusion_common::{
3333
use datafusion_expr::expr::ScalarFunction;
3434
use datafusion_expr::simplify::ExprSimplifyResult;
3535
use datafusion_expr::{
36-
ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF,
37-
ScalarUDFImpl, Signature, Volatility,
36+
ColumnarValue, Documentation, Expr, ExpressionPlacement, ReturnFieldArgs,
37+
ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
3838
};
3939
use datafusion_macros::user_doc;
4040

@@ -499,6 +499,37 @@ impl ScalarUDFImpl for GetFieldFunc {
499499
fn documentation(&self) -> Option<&Documentation> {
500500
self.doc()
501501
}
502+
503+
fn placement(&self, args: &[ExpressionPlacement]) -> ExpressionPlacement {
504+
// get_field is leaf-pushable if:
505+
// 1. The struct/map argument is a Column or PlaceAtLeafs (not Literal or PlaceAtRoot)
506+
// 2. All key arguments are literals (static field access, not dynamic per-row lookup)
507+
//
508+
// Literal base is not considered leaf-pushable because it would be constant-folded anyway.
509+
if args.is_empty() {
510+
return ExpressionPlacement::PlaceAtRoot;
511+
}
512+
513+
// Check if the base (struct/map) argument is Column or PlaceAtLeafs
514+
if !matches!(
515+
args[0],
516+
ExpressionPlacement::Column | ExpressionPlacement::PlaceAtLeafs
517+
) {
518+
return ExpressionPlacement::PlaceAtRoot;
519+
}
520+
521+
// All key arguments (after the first) must be literals for static field access
522+
let keys_literal = args
523+
.iter()
524+
.skip(1)
525+
.all(|a| *a == ExpressionPlacement::Literal);
526+
527+
if keys_literal {
528+
ExpressionPlacement::PlaceAtLeafs
529+
} else {
530+
ExpressionPlacement::PlaceAtRoot
531+
}
532+
}
502533
}
503534

504535
#[cfg(test)]
@@ -542,4 +573,80 @@ mod tests {
542573

543574
Ok(())
544575
}
576+
577+
#[test]
578+
fn test_placement_with_args_literal_key() {
579+
let func = GetFieldFunc::new();
580+
581+
// get_field(col, 'literal') -> leaf-pushable (static field access)
582+
let args = vec![ExpressionPlacement::Column, ExpressionPlacement::Literal];
583+
assert_eq!(func.placement(&args), ExpressionPlacement::PlaceAtLeafs);
584+
585+
// get_field(col, 'a', 'b') -> leaf-pushable (nested static field access)
586+
let args = vec![
587+
ExpressionPlacement::Column,
588+
ExpressionPlacement::Literal,
589+
ExpressionPlacement::Literal,
590+
];
591+
assert_eq!(func.placement(&args), ExpressionPlacement::PlaceAtLeafs);
592+
593+
// get_field(get_field(col, 'a'), 'b') represented as PlaceAtLeafs for base
594+
let args = vec![
595+
ExpressionPlacement::PlaceAtLeafs,
596+
ExpressionPlacement::Literal,
597+
];
598+
assert_eq!(func.placement(&args), ExpressionPlacement::PlaceAtLeafs);
599+
}
600+
601+
#[test]
602+
fn test_placement_with_args_column_key() {
603+
let func = GetFieldFunc::new();
604+
605+
// get_field(col, other_col) -> NOT leaf-pushable (dynamic per-row lookup)
606+
let args = vec![ExpressionPlacement::Column, ExpressionPlacement::Column];
607+
assert_eq!(func.placement(&args), ExpressionPlacement::PlaceAtRoot);
608+
609+
// get_field(col, 'a', other_col) -> NOT leaf-pushable (dynamic nested lookup)
610+
let args = vec![
611+
ExpressionPlacement::Column,
612+
ExpressionPlacement::Literal,
613+
ExpressionPlacement::Column,
614+
];
615+
assert_eq!(func.placement(&args), ExpressionPlacement::PlaceAtRoot);
616+
}
617+
618+
#[test]
619+
fn test_placement_with_args_root() {
620+
let func = GetFieldFunc::new();
621+
622+
// get_field(root_expr, 'literal') -> NOT leaf-pushable
623+
let args = vec![
624+
ExpressionPlacement::PlaceAtRoot,
625+
ExpressionPlacement::Literal,
626+
];
627+
assert_eq!(func.placement(&args), ExpressionPlacement::PlaceAtRoot);
628+
629+
// get_field(col, root_expr) -> NOT leaf-pushable
630+
let args = vec![
631+
ExpressionPlacement::Column,
632+
ExpressionPlacement::PlaceAtRoot,
633+
];
634+
assert_eq!(func.placement(&args), ExpressionPlacement::PlaceAtRoot);
635+
}
636+
637+
#[test]
638+
fn test_placement_with_args_edge_cases() {
639+
let func = GetFieldFunc::new();
640+
641+
// Empty args -> NOT leaf-pushable
642+
assert_eq!(func.placement(&[]), ExpressionPlacement::PlaceAtRoot);
643+
644+
// Just base, no key -> PlaceAtLeafs (not a valid call but should handle gracefully)
645+
let args = vec![ExpressionPlacement::Column];
646+
assert_eq!(func.placement(&args), ExpressionPlacement::PlaceAtLeafs);
647+
648+
// Literal base with literal key -> NOT leaf-pushable (would be constant-folded)
649+
let args = vec![ExpressionPlacement::Literal, ExpressionPlacement::Literal];
650+
assert_eq!(func.placement(&args), ExpressionPlacement::PlaceAtRoot);
651+
}
545652
}

datafusion/optimizer/src/optimize_projections/mod.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ use datafusion_common::{
3030
};
3131
use datafusion_expr::expr::Alias;
3232
use datafusion_expr::{
33-
Aggregate, Distinct, EmptyRelation, Expr, Projection, TableScan, Unnest, Window,
34-
logical_plan::LogicalPlan,
33+
Aggregate, Distinct, EmptyRelation, Expr, ExpressionPlacement, Projection, TableScan,
34+
Unnest, Window, logical_plan::LogicalPlan,
3535
};
3636

3737
use crate::optimize_projections::required_indices::RequiredIndices;
@@ -530,9 +530,11 @@ fn merge_consecutive_projections(proj: Projection) -> Result<Transformed<Project
530530
// For details, see: https://github.com/apache/datafusion/issues/8296
531531
if column_referral_map.into_iter().any(|(col, usage)| {
532532
usage > 1
533-
&& !is_expr_trivial(
534-
&prev_projection.expr
535-
[prev_projection.schema.index_of_column(col).unwrap()],
533+
&& matches!(
534+
prev_projection.expr
535+
[prev_projection.schema.index_of_column(col).unwrap()]
536+
.placement(),
537+
ExpressionPlacement::PlaceAtRoot
536538
)
537539
}) {
538540
// no change
@@ -586,11 +588,6 @@ fn merge_consecutive_projections(proj: Projection) -> Result<Transformed<Project
586588
}
587589
}
588590

589-
// Check whether `expr` is trivial; i.e. it doesn't imply any computation.
590-
fn is_expr_trivial(expr: &Expr) -> bool {
591-
matches!(expr, Expr::Column(_) | Expr::Literal(_, _))
592-
}
593-
594591
/// Rewrites a projection expression using the projection before it (i.e. its input)
595592
/// This is a subroutine to the `merge_consecutive_projections` function.
596593
///

0 commit comments

Comments
 (0)