forked from apache/datafusion
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathoptimizer.rs
More file actions
258 lines (240 loc) · 13.3 KB
/
optimizer.rs
File metadata and controls
258 lines (240 loc) · 13.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Physical optimizer traits
use std::fmt::Debug;
use std::sync::Arc;
use crate::aggregate_statistics::AggregateStatistics;
use crate::combine_partial_final_agg::CombinePartialFinalAggregate;
use crate::emit_partial_aggregate_hash::EmitPartialAggregateHash;
use crate::enforce_distribution::EnforceDistribution;
use crate::enforce_sorting::EnforceSorting;
use crate::ensure_coop::EnsureCooperative;
use crate::filter_pushdown::FilterPushdown;
use crate::join_selection::JoinSelection;
use crate::limit_pushdown::LimitPushdown;
use crate::limited_distinct_aggregation::LimitedDistinctAggregation;
use crate::output_requirements::OutputRequirements;
use crate::projection_pushdown::ProjectionPushdown;
use crate::sanity_checker::SanityCheckPlan;
use crate::topk_aggregation::TopKAggregation;
use crate::topk_repartition::TopKRepartition;
use crate::update_aggr_exprs::OptimizeAggregateOrder;
use crate::hash_join_buffering::HashJoinBuffering;
use crate::limit_pushdown_past_window::LimitPushPastWindows;
use crate::pushdown_sort::PushdownSort;
use crate::window_topn::WindowTopN;
use datafusion_common::Result;
use datafusion_common::config::ConfigOptions;
use datafusion_physical_plan::ExecutionPlan;
use datafusion_physical_plan::operator_statistics::StatisticsRegistry;
/// Execution context handed to physical optimizer rules.
///
/// Bundles the [`ConfigOptions`] with an optional [`StatisticsRegistry`], so
/// that rules can reach this extra state through
/// [`PhysicalOptimizerRule::optimize_with_context`] while the signature of
/// [`PhysicalOptimizerRule::optimize`] stays as it is.
pub trait PhysicalOptimizerContext: Send + Sync {
    /// Configuration options that the optimizer rules should honor.
    fn config_options(&self) -> &ConfigOptions;

    /// Statistics registry used for enhanced statistics lookup, if any.
    ///
    /// The provided implementation returns `None`, meaning that no registry
    /// is configured; rules should then fall back to
    /// `ExecutionPlan::partition_statistics()`.
    fn statistics_registry(&self) -> Option<&StatisticsRegistry> {
        None
    }
}
/// Simple context wrapping [`ConfigOptions`] for backward compatibility.
///
/// This struct provides a minimal implementation of [`PhysicalOptimizerContext`]
/// that only supplies configuration options. Used when no statistics registry
/// is available or needed.
pub struct ConfigOnlyContext<'a> {
config: &'a ConfigOptions,
}
impl<'a> ConfigOnlyContext<'a> {
/// Create a new context wrapping the given config options.
pub fn new(config: &'a ConfigOptions) -> Self {
Self { config }
}
}
impl PhysicalOptimizerContext for ConfigOnlyContext<'_> {
    /// Hands back the borrowed configuration options.
    ///
    /// `statistics_registry` is intentionally not overridden, so the trait's
    /// provided implementation is used for it.
    fn config_options(&self) -> &ConfigOptions {
        self.config
    }
}
/// A `PhysicalOptimizerRule` rewrites one [`ExecutionPlan`] into another plan
/// that computes the same results, but in a potentially more efficient way.
///
/// Additional `PhysicalOptimizerRule`s are registered through
/// [`SessionState::add_physical_optimizer_rule`].
///
/// [`SessionState::add_physical_optimizer_rule`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html#method.add_physical_optimizer_rule
pub trait PhysicalOptimizerRule: Debug + std::any::Any {
    /// Rewrites `plan` into its optimized form.
    ///
    /// This is the primary entry point of a rule. Rules that need the
    /// statistics registry should override
    /// [`optimize_with_context`](Self::optimize_with_context) instead.
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Rewrites `plan` with access to the extended optimizer context
    /// (statistics registry, etc.).
    ///
    /// The provided implementation reads the config options from `context`
    /// and delegates to [`optimize`](Self::optimize); override it only when
    /// the rule needs the statistics registry.
    fn optimize_with_context(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        context: &dyn PhysicalOptimizerContext,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let config = context.config_options();
        self.optimize(plan, config)
    }

    /// Human-readable name of this optimizer rule.
    fn name(&self) -> &str;

    /// Whether the physical planner should validate that this rule does not
    /// change the schema of the plan after rewriting.
    ///
    /// Some optimization rules may change the nullable properties of the
    /// schema; such rules should disable the schema check.
    fn schema_check(&self) -> bool;
}
/// Physical optimizer driven by a list of [`PhysicalOptimizerRule`]s.
#[derive(Debug, Clone)]
pub struct PhysicalOptimizer {
    /// Every rule this optimizer applies; each one is shared behind an `Arc`.
    pub rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
}
impl Default for PhysicalOptimizer {
fn default() -> Self {
Self::new()
}
}
impl PhysicalOptimizer {
/// Create a new optimizer using the recommended list of rules.
///
/// The rules are stored in exactly the order listed in the body below and
/// are handed to [`Self::with_rules`] unchanged.
pub fn new() -> Self {
// NOTEs:
// - The order of rules in this list is important, as it determines the
// order in which they are applied.
// - Adding a new rule here is expensive as it will be applied to all
// queries, and will likely increase the optimization time. Please extend
// existing rules when possible, rather than adding a new rule.
let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
// If there is an output requirement of the query, make sure that
// this information is not lost across different rules during optimization.
Arc::new(OutputRequirements::new_add_mode()),
// NOTE(review): no rationale is recorded here for the position of
// AggregateStatistics — confirm its ordering constraints before moving it.
Arc::new(AggregateStatistics::new()),
// Statistics-based join selection will change the Auto mode to a real join implementation,
// like collect left, or hash join, or future sort merge join, which will influence the
// EnforceDistribution and EnforceSorting rules as they decide whether to add additional
// repartitioning and local sorting steps to meet distribution and ordering requirements.
// Therefore, it should run before EnforceDistribution and EnforceSorting.
Arc::new(JoinSelection::new()),
// The LimitedDistinctAggregation rule should be applied before the EnforceDistribution rule,
// as that rule may inject other operations in between the different AggregateExecs.
// Applying the rule early means only directly-connected AggregateExecs must be examined.
Arc::new(LimitedDistinctAggregation::new()),
// The FilterPushdown rule tries to push down filters as far as it can.
// For example, it will push down filtering from a `FilterExec` to `DataSourceExec`.
// Note that this does not push down dynamic filters (such as those created by a `SortExec` operator in TopK mode),
// those are handled by the later `FilterPushdown::new_post_optimization()` entry near the end of this list.
// See `FilterPushdownPhase` for more details.
Arc::new(FilterPushdown::new()),
// The EnforceDistribution rule is for adding essential repartitioning to satisfy distribution
// requirements. Please make sure that the whole plan tree is determined before this rule.
// This rule increases parallelism if doing so is beneficial to the physical plan; i.e. at
// least one of the operators in the plan benefits from increased parallelism.
Arc::new(EnforceDistribution::new()),
// The CombinePartialFinalAggregate rule should be applied after the EnforceDistribution rule
Arc::new(CombinePartialFinalAggregate::new()),
// The EnforceSorting rule is for adding essential local sorting to satisfy the required
// ordering. Please make sure that the whole plan tree is determined before this rule.
// Note that one should always run this rule after running the EnforceDistribution rule
// as the latter may break local sorting requirements.
Arc::new(EnforceSorting::new()),
// Run once after the local sorting requirement is changed
Arc::new(OptimizeAggregateOrder::new()),
// WindowTopN: replaces Filter(rn<=K) → Window(ROW_NUMBER) → Sort
// with Window(ROW_NUMBER) → PartitionedTopKExec(fetch=K).
// Must run after EnforceSorting (which inserts SortExec) and before
// ProjectionPushdown (which embeds projections into FilterExec).
Arc::new(WindowTopN::new()),
// TODO: `try_embed_to_hash_join` in the ProjectionPushdown rule would be blocked by
// CoalesceBatches, so this rule is added before CoalesceBatches. Maybe optimize it in the future.
// NOTE(review): no CoalesceBatches rule appears anywhere in this list, so this TODO
// may be stale — confirm whether this early ProjectionPushdown pass is still required.
Arc::new(ProjectionPushdown::new()),
// Remove the ancillary output requirement operator since we are done with the planning
// phase.
Arc::new(OutputRequirements::new_remove_mode()),
// The aggregation limiter will try to find situations where the accumulator count
// is not tied to the cardinality, i.e. when the output of the aggregation is passed
// into an `order by max(x) limit y`. In this case it will copy the limit value down
// to the aggregation, allowing it to use only y number of accumulators.
Arc::new(TopKAggregation::new()),
// Tries to push limits down through window functions, growing as appropriate
// This can possibly be combined with [LimitPushdown]
// It needs to come after [EnforceSorting]
Arc::new(LimitPushPastWindows::new()),
// The HashJoinBuffering rule adds a BufferExec node with the configured capacity
// in the probe side of hash joins. That way, the probe side gets eagerly polled before
// the build side is completely finished.
Arc::new(HashJoinBuffering::new()),
// The LimitPushdown rule tries to push limits down as far as possible,
// replacing operators with fetching variants, or adding limits
// past operators that support limit pushdown.
Arc::new(LimitPushdown::new()),
// TopKRepartition pushes TopK (Sort with fetch) below Hash
// repartition when the partition key is a prefix of the sort key.
// This reduces data volume before a hash shuffle. It must run
// after LimitPushdown so that the TopK already exists on the SortExec.
Arc::new(TopKRepartition::new()),
// The ProjectionPushdown rule tries to push projections towards
// the sources in the execution plan. As a result of this process,
// a projection can disappear if it reaches the source providers, and
// sequential projections can merge into one. Even if these two cases
// are not present, the load of executors such as join or union will be
// reduced by narrowing their input tables.
// (This is the second ProjectionPushdown pass in this list.)
Arc::new(ProjectionPushdown::new()),
// PushdownSort: Detect sorts that can be pushed down to data sources.
Arc::new(PushdownSort::new()),
// Enable the precomputed-hash output on any Partial AggregateExec
// whose consumer is a Hash RepartitionExec over the same group
// columns. Runs after CombinePartialFinalAggregate (which may fuse
// Partial+Final into Single) and ProjectionPushdown (which settles
// the group-column positions).
Arc::new(EmitPartialAggregateHash::new()),
// NOTE(review): no rationale is recorded here for EnsureCooperative; presumably it
// must see the near-final plan since it runs this late — confirm against the
// `ensure_coop` module documentation.
Arc::new(EnsureCooperative::new()),
// This FilterPushdown handles dynamic filters that may have references to the source ExecutionPlan.
// Therefore, it should be run at the end of the optimization process since any changes to the plan may break the dynamic filter's references.
// See `FilterPushdownPhase` for more details.
Arc::new(FilterPushdown::new_post_optimization()),
// The SanityCheckPlan rule checks whether the order and
// distribution requirements of each node in the plan
// are satisfied. It will also reject non-runnable query
// plans that use pipeline-breaking operators on infinite
// input(s). The rule generates a diagnostic error
// message for invalid plans. It makes no changes to the
// given query plan; i.e. it only acts as a final
// gatekeeping rule.
Arc::new(SanityCheckPlan::new()),
];
Self::with_rules(rules)
}
/// Create a new optimizer with the given rules.
///
/// `rules` is stored as-is: nothing is added, removed, or reordered.
pub fn with_rules(rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>) -> Self {
Self { rules }
}
}