Skip to content

Commit ec130f7

Browse files
HuSen8891gandronchik
authored andcommitted
Eliminate multi limit-offset nodes to emptyRelation (apache#2823)
* eliminate multi limit-offset * refine the code * add more test cases
1 parent cc6b297 commit ec130f7

1 file changed

Lines changed: 206 additions & 27 deletions

File tree

datafusion/core/src/optimizer/eliminate_limit.rs

Lines changed: 206 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,20 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! Optimizer rule to replace `LIMIT 0` on a plan with an empty relation.
18+
//! Optimizer rule to replace `LIMIT 0` or
19+
//! `LIMIT whose ancestor LIMIT's skip is greater than or equal to current's fetch`
20+
//! on a plan with an empty relation.
1921
//! This saves time in planning and executing the query.
2022
use crate::error::Result;
2123
use crate::logical_plan::{EmptyRelation, Limit, LogicalPlan};
2224
use crate::optimizer::optimizer::OptimizerRule;
2325

24-
use super::utils;
26+
use super::utils::from_plan;
2527
use crate::optimizer::optimizer::OptimizerConfig;
2628

27-
/// Optimization rule that replaces LIMIT 0 with an [LogicalPlan::EmptyRelation]
29+
/// Optimization rule that replaces LIMIT 0 or
30+
/// LIMIT whose ancestor LIMIT's skip is greater than or equal to current's fetch
31+
/// with an [LogicalPlan::EmptyRelation]
2832
#[derive(Default)]
2933
pub struct EliminateLimit;
3034

@@ -35,35 +39,98 @@ impl EliminateLimit {
3539
}
3640
}
3741

42+
/// Ancestor indicates the current ancestor in the LogicalPlan tree
43+
/// when traversing down related to "eliminate limit".
44+
enum Ancestor {
45+
/// Limit
46+
FromLimit { skip: Option<usize> },
47+
/// Other nodes that don't affect the adjustment of "Limit"
48+
NotRelevant,
49+
}
50+
51+
/// replaces LIMIT 0 with an [LogicalPlan::EmptyRelation]
52+
/// replaces LIMIT node whose ancestor LIMIT's skip is greater than or equal to current's fetch
53+
/// with an [LogicalPlan::EmptyRelation]
54+
fn eliminate_limit(
55+
_optimizer: &EliminateLimit,
56+
ancestor: &Ancestor,
57+
plan: &LogicalPlan,
58+
_optimizer_config: &OptimizerConfig,
59+
) -> Result<LogicalPlan> {
60+
match plan {
61+
LogicalPlan::Limit(Limit {
62+
skip, fetch, input, ..
63+
}) => {
64+
let ancestor_skip = match ancestor {
65+
Ancestor::FromLimit { skip, .. } => skip.unwrap_or(0),
66+
_ => 0,
67+
};
68+
// If ancestor's skip is equal or greater than current's fetch,
69+
// replaces with an [LogicalPlan::EmptyRelation].
70+
// For such query, the inner query(select * from xxx limit 5) should be optimized as an EmptyRelation:
71+
// select * from (select * from xxx limit 5) a limit 2 offset 5;
72+
match fetch {
73+
Some(fetch) => {
74+
if *fetch == 0 || ancestor_skip >= *fetch {
75+
return Ok(LogicalPlan::EmptyRelation(EmptyRelation {
76+
produce_one_row: false,
77+
schema: input.schema().clone(),
78+
}));
79+
}
80+
}
81+
None => {}
82+
}
83+
84+
let expr = plan.expressions();
85+
86+
// apply the optimization to all inputs of the plan
87+
let inputs = plan.inputs();
88+
let new_inputs = inputs
89+
.iter()
90+
.map(|plan| {
91+
eliminate_limit(
92+
_optimizer,
93+
&Ancestor::FromLimit { skip: *skip },
94+
plan,
95+
_optimizer_config,
96+
)
97+
})
98+
.collect::<Result<Vec<_>>>()?;
99+
100+
from_plan(plan, &expr, &new_inputs)
101+
}
102+
// Rest: recurse and find possible LIMIT 0/Multi LIMIT OFFSET nodes
103+
_ => {
104+
// For those plans(projection/sort/..) which do not affect the output rows of sub-plans, we still use ancestor;
105+
// otherwise, use NotRelevant instead.
106+
let ancestor = match plan {
107+
LogicalPlan::Projection { .. } | LogicalPlan::Sort { .. } => ancestor,
108+
_ => &Ancestor::NotRelevant,
109+
};
110+
111+
let expr = plan.expressions();
112+
113+
// apply the optimization to all inputs of the plan
114+
let inputs = plan.inputs();
115+
let new_inputs = inputs
116+
.iter()
117+
.map(|plan| {
118+
eliminate_limit(_optimizer, ancestor, plan, _optimizer_config)
119+
})
120+
.collect::<Result<Vec<_>>>()?;
121+
122+
from_plan(plan, &expr, &new_inputs)
123+
}
124+
}
125+
}
126+
38127
impl OptimizerRule for EliminateLimit {
39128
fn optimize(
40129
&self,
41130
plan: &LogicalPlan,
42131
optimizer_config: &OptimizerConfig,
43132
) -> Result<LogicalPlan> {
44-
match plan {
45-
LogicalPlan::Limit(Limit {
46-
fetch: Some(0),
47-
input,
48-
..
49-
}) => Ok(LogicalPlan::EmptyRelation(EmptyRelation {
50-
produce_one_row: false,
51-
schema: input.schema().clone(),
52-
})),
53-
// Rest: recurse and find possible LIMIT 0 nodes
54-
_ => {
55-
let expr = plan.expressions();
56-
57-
// apply the optimization to all inputs of the plan
58-
let inputs = plan.inputs();
59-
let new_inputs = inputs
60-
.iter()
61-
.map(|plan| self.optimize(plan, optimizer_config))
62-
.collect::<Result<Vec<_>>>()?;
63-
64-
utils::from_plan(plan, &expr, &new_inputs)
65-
}
66-
}
133+
eliminate_limit(self, &Ancestor::NotRelevant, plan, optimizer_config)
67134
}
68135

69136
fn name(&self) -> &str {
@@ -74,9 +141,10 @@ impl OptimizerRule for EliminateLimit {
74141
#[cfg(test)]
75142
mod tests {
76143
use super::*;
77-
use crate::logical_plan::LogicalPlanBuilder;
78144
use crate::logical_plan::{col, sum};
145+
use crate::logical_plan::{JoinType, LogicalPlanBuilder};
79146
use crate::test::*;
147+
use datafusion_common::Column;
80148

81149
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
82150
let rule = EliminateLimit::new();
@@ -129,4 +197,115 @@ mod tests {
129197
\n TableScan: test projection=None";
130198
assert_optimized_plan_eq(&plan, expected);
131199
}
200+
201+
#[test]
202+
fn limit_fetch_with_ancestor_limit_skip() {
203+
let table_scan = test_table_scan().unwrap();
204+
let plan = LogicalPlanBuilder::from(table_scan)
205+
.aggregate(vec![col("a")], vec![sum(col("b"))])
206+
.unwrap()
207+
.limit(None, Some(2))
208+
.unwrap()
209+
.limit(Some(2), None)
210+
.unwrap()
211+
.build()
212+
.unwrap();
213+
214+
// No aggregate / scan / limit
215+
let expected = "Limit: skip=2, fetch=None\
216+
\n EmptyRelation";
217+
assert_optimized_plan_eq(&plan, expected);
218+
}
219+
220+
#[test]
221+
fn multi_limit_offset_sort_eliminate() {
222+
let table_scan = test_table_scan().unwrap();
223+
let plan = LogicalPlanBuilder::from(table_scan)
224+
.aggregate(vec![col("a")], vec![sum(col("b"))])
225+
.unwrap()
226+
.limit(None, Some(2))
227+
.unwrap()
228+
.sort(vec![col("a")])
229+
.unwrap()
230+
.limit(Some(2), Some(1))
231+
.unwrap()
232+
.build()
233+
.unwrap();
234+
235+
let expected = "Limit: skip=2, fetch=1\
236+
\n Sort: #test.a\
237+
\n EmptyRelation";
238+
assert_optimized_plan_eq(&plan, expected);
239+
}
240+
241+
#[test]
242+
fn limit_fetch_with_ancestor_limit_fetch() {
243+
let table_scan = test_table_scan().unwrap();
244+
let plan = LogicalPlanBuilder::from(table_scan)
245+
.aggregate(vec![col("a")], vec![sum(col("b"))])
246+
.unwrap()
247+
.limit(None, Some(2))
248+
.unwrap()
249+
.sort(vec![col("a")])
250+
.unwrap()
251+
.limit(None, Some(1))
252+
.unwrap()
253+
.build()
254+
.unwrap();
255+
256+
let expected = "Limit: skip=None, fetch=1\
257+
\n Sort: #test.a\
258+
\n Limit: skip=None, fetch=2\
259+
\n Aggregate: groupBy=[[#test.a]], aggr=[[SUM(#test.b)]]\
260+
\n TableScan: test projection=None";
261+
assert_optimized_plan_eq(&plan, expected);
262+
}
263+
264+
#[test]
265+
fn limit_with_ancestor_limit() {
266+
let table_scan = test_table_scan().unwrap();
267+
let plan = LogicalPlanBuilder::from(table_scan)
268+
.aggregate(vec![col("a")], vec![sum(col("b"))])
269+
.unwrap()
270+
.limit(Some(2), Some(1))
271+
.unwrap()
272+
.sort(vec![col("a")])
273+
.unwrap()
274+
.limit(Some(3), Some(1))
275+
.unwrap()
276+
.build()
277+
.unwrap();
278+
279+
let expected = "Limit: skip=3, fetch=1\
280+
\n Sort: #test.a\
281+
\n EmptyRelation";
282+
assert_optimized_plan_eq(&plan, expected);
283+
}
284+
285+
#[test]
286+
fn limit_join_with_ancestor_limit() {
287+
let table_scan = test_table_scan().unwrap();
288+
let table_scan_inner = test_table_scan_with_name("test1").unwrap();
289+
let plan = LogicalPlanBuilder::from(table_scan)
290+
.limit(Some(2), Some(1))
291+
.unwrap()
292+
.join_using(
293+
&table_scan_inner,
294+
JoinType::Inner,
295+
vec![Column::from_name("a".to_string())],
296+
)
297+
.unwrap()
298+
.limit(Some(3), Some(1))
299+
.unwrap()
300+
.build()
301+
.unwrap();
302+
303+
let expected = "Limit: skip=3, fetch=1\
304+
\n Inner Join: Using #test.a = #test1.a\
305+
\n Limit: skip=2, fetch=1\
306+
\n TableScan: test projection=None\
307+
\n TableScan: test1 projection=None";
308+
309+
assert_optimized_plan_eq(&plan, expected);
310+
}
132311
}

0 commit comments

Comments
 (0)