Problem statement
The projection pushdown optimizer rule / implementations generally only push down a projection if it "narrows" the schema (i.e. has fewer output expressions than the input has fields) and the output expressions are all columns or literals:
datafusion/datafusion/physical-plan/src/filter.rs, lines 470 to 471 in d68b629:

// If the projection does not narrow the schema, we should not try to push it down:
if projection.expr().len() < projection.input().schema().fields().len() {

datafusion/datafusion/physical-plan/src/repartition/mod.rs, lines 1045 to 1055 in d68b629:

// If the projection does not narrow the schema, we should not try to push it down.
if projection.expr().len() >= projection.input().schema().fields().len() {
    return Ok(None);
}

// If pushdown is not beneficial or applicable, break it.
if projection.benefits_from_input_partitioning()[0]
    || !all_columns(projection.expr())
{
    return Ok(None);
}

datafusion/datafusion/physical-plan/src/projection.rs, lines 255 to 268 in d68b629:

fn benefits_from_input_partitioning(&self) -> Vec<bool> {
    let all_simple_exprs = self
        .projector
        .projection()
        .as_ref()
        .iter()
        .all(|proj_expr| {
            proj_expr.expr.as_any().is::<Column>()
                || proj_expr.expr.as_any().is::<Literal>()
        });
    // If expressions are all either column_expr or Literal, then all computations in this
    // projection are reorder or rename, and the projection would not benefit from the
    // repartition; benefits_from_input_partitioning will return false.
    vec![!all_simple_exprs]
}
This is problematic with a plan like:
copy (
select 1 as id, named_struct('large_string_field', 'big text!', 'small_int_field', 2) as large_struct
)
TO 'struct.parquet';
create external table t stored as parquet location 'struct.parquet';
explain format indent
select large_struct['small_int_field'] * 2 from t where id = 1;
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: get_field(t.large_struct, Utf8("small_int_field")) * Int64(2) |
| | Filter: t.id = Int64(1) |
| | TableScan: t projection=[id, large_struct], partial_filters=[t.id = Int64(1)] |
| physical_plan | ProjectionExec: expr=[get_field(large_struct@0, small_int_field) * 2 as t.large_struct[small_int_field] * Int64(2)] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: id@0 = 1, projection=[large_struct@1] |
| | RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1 |
| | DataSourceExec: file_groups={1 group: [[Users/adrian/GitHub/datafusion/struct.parquet]]}, projection=[id, large_struct], file_type=parquet, predicate=id@0 = 1 |
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
The problem here is that get_field(large_struct@0, small_int_field) is not getting pushed down into Parquet, forcing the entire struct column (including the large_string_field) to get read into memory (pending #11745 the Parquet machinery will be able to push down struct field access).
The reasoning for the current status quo is that evaluating an expression has a computational cost, thus we certainly do not want to push it below filters, probably not even below a CoalesceBatchesExec so that the computation can be done on larger batches.
With RepartitionExec the logic is that computation benefits from re-partitioning, so it makes sense to do it after a repartition.
However, since the field access expression is essentially negative cost to evaluate in a Parquet scan, the optimal query plan would look like:
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: get_field(t.large_struct, Utf8("small_int_field")) * Int64(2) |
| | Filter: t.id = Int64(1) |
| | TableScan: t projection=[id, large_struct], partial_filters=[t.id = Int64(1)] |
| physical_plan | ProjectionExec: expr=[large_struct[small_int_field] * 2 as t.large_struct[small_int_field] * Int64(2)] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: id@0 = 1, projection=[large_struct[small_int_field]@1] |
| | RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1 |
| | DataSourceExec: file_groups={1 group: [[Users/adrian/GitHub/datafusion/struct.parquet]]}, projection=[id, large_struct[small_int_field]], file_type=parquet, predicate=id@0 = 1 |
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
(Essentially push large_struct[small_int_field] down into the scan and pass it up as a column).
Solutions considered
Giving expressions the ability to determine their cost relative to a column selection
The idea would be to add a method to PhysicalExpr and ScalarFunctionUDF along the lines of should_push_down() -> bool or something. This would default to false and we set it to true only for Column and get_field(). We could also make it is_column_field_access() -> bool or column_field_path() -> Option<(Column, Vec<FieldPath>)> which would be closer to the field path proposal below.
I think we will also need to add functionality to "split" a ProjectionExec in two: get_field(large_struct, small_int_field) * 2 becomes col(0) * 2 (retained) and get_field(large_struct, small_int_field) (pushed down). I think this could be added as a method on ProjectionExec or ProjectionExprs.
Cost based pushdown / optimizer
This is a generalization of should_push_down(), consider something like pushdown_cost() -> f64.
A full-blown cost based optimizer would probably handle things this way, but given that there is no cost based optimizer in DataFusion and no long-term plan for adding one, this seems like unnecessary complexity at this point.
First class field path
Currently we have a Column expression that does not handle sub-fields/selections at all.
We could expand Column to have some sort of path: Option<Vec<String>> (maybe Vec<FieldPath>?).
This is enticing if it made the system in general more aware of nested structures and if it could represent field access for both structs and json/variant. But it feels like that last point especially might be a bit sticky: do we want to be able to represent struct[list_field][0]? If not do we end up with a mix of both worlds i.e. get_field(column_path(struct, [list_field]), 0)?
There is also the issue that for structs the structure is known ahead of time, but for variant it can vary row by row, i.e. it can't be validated.
Finally each one of these different path accesses is evaluated differently, both in how they are optimized at the scan level and how they are evaluated against in-memory data. So having Vec<FieldPath> is not enough. We'd need to retain a reference to how to evaluate that path as well (i.e. a reference to get_field or similar).
So the structure would end up looking something like:
enum FieldPath {
    Field(String),
    Index(usize),
}

struct ColumnPath {
    column: Column,
    path: Vec<FieldPath>,
    evaluate: Arc<dyn FieldPathEvaluator>, // ?
}
And we would have to have an optimizer rule that converts get_field or variant_get into these sorts of expressions.