Skip to content

Commit 4b87cfd

Browse files
nuno-fariaxudong963
authored andcommitted
perf: Push down join key filters for LEFT/RIGHT/ANTI joins (apache#19918)
- Closes apache#19917. Reduce the number of rows retrieved by pushing down more filters when possible. Example: ```sql create table t1 (k int, v int); create table t2 (k int, v int); -- k=1 is pushed to t1 and t2 explain select * from t1 left join t2 on t1.k = t2.k where t1.k = 1; +---------------+------------------------------------------------------------+ | plan_type | plan | +---------------+------------------------------------------------------------+ | physical_plan | ┌───────────────────────────┐ | | | │ HashJoinExec │ | | | │ -------------------- │ | | | │ join_type: Left ├──────────────┐ | | | │ on: (k = k) │ │ | | | └─────────────┬─────────────┘ │ | | | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ | | | │ RepartitionExec ││ RepartitionExec │ | | | │ -------------------- ││ -------------------- │ | | | │ partition_count(in->out): ││ partition_count(in->out): │ | | | │ 1 -> 12 ││ 1 -> 12 │ | | | │ ││ │ | | | │ partitioning_scheme: ││ partitioning_scheme: │ | | | │ Hash([k@0], 12) ││ Hash([k@0], 12) │ | | | └─────────────┬─────────────┘└─────────────┬─────────────┘ | | | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ | | | │ FilterExec ││ FilterExec │ | | | │ -------------------- ││ -------------------- │ | | | │ predicate: k = 1 ││ predicate: k = 1 │ | | | └─────────────┬─────────────┘└─────────────┬─────────────┘ | | | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ | | | │ DataSourceExec ││ DataSourceExec │ | | | │ -------------------- ││ -------------------- │ | | | │ bytes: 0 ││ bytes: 0 │ | | | │ format: memory ││ format: memory │ | | | │ rows: 0 ││ rows: 0 │ | | | └───────────────────────────┘└───────────────────────────┘ | | | | +---------------+------------------------------------------------------------+ ``` - Changed `push_down_all_join` to push down inferred predicates independently of `left_preserved`/`right_preserved` semantics. - Added unit tests. Yes. No. --------- Co-authored-by: xudong.w <wxd963996380@gmail.com>
1 parent 9522508 commit 4b87cfd

4 files changed

Lines changed: 378 additions & 49 deletions

File tree

datafusion/optimizer/src/push_down_filter.rs

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -452,11 +452,11 @@ fn push_down_all_join(
452452
}
453453
}
454454

455-
// For infer predicates, if they can not push through join, just drop them
455+
// Push predicates inferred from the join expression
456456
for predicate in inferred_join_predicates {
457-
if left_preserved && checker.is_left_only(&predicate) {
457+
if checker.is_left_only(&predicate) {
458458
left_push.push(predicate);
459-
} else if right_preserved && checker.is_right_only(&predicate) {
459+
} else if checker.is_right_only(&predicate) {
460460
right_push.push(predicate);
461461
}
462462
}
@@ -2788,8 +2788,7 @@ mod tests {
27882788
)
27892789
}
27902790

2791-
/// post-left-join predicate on a column common to both sides is only pushed to the left side
2792-
/// i.e. - not duplicated to the right side
2791+
/// post-left-join predicate on a column common to both sides is pushed to both sides
27932792
#[test]
27942793
fn filter_using_left_join_on_common() -> Result<()> {
27952794
let table_scan = test_table_scan()?;
@@ -2817,20 +2816,19 @@ mod tests {
28172816
TableScan: test2
28182817
",
28192818
);
2820-
// filter sent to left side of the join, not the right
2819+
// filter sent to left side of the join and to the right
28212820
assert_optimized_plan_equal!(
28222821
plan,
28232822
@r"
28242823
Left Join: Using test.a = test2.a
28252824
TableScan: test, full_filters=[test.a <= Int64(1)]
28262825
Projection: test2.a
2827-
TableScan: test2
2826+
TableScan: test2, full_filters=[test2.a <= Int64(1)]
28282827
"
28292828
)
28302829
}
28312830

2832-
/// post-right-join predicate on a column common to both sides is only pushed to the right side
2833-
/// i.e. - not duplicated to the left side.
2831+
/// post-right-join predicate on a column common to both sides is pushed to both sides
28342832
#[test]
28352833
fn filter_using_right_join_on_common() -> Result<()> {
28362834
let table_scan = test_table_scan()?;
@@ -2858,12 +2856,12 @@ mod tests {
28582856
TableScan: test2
28592857
",
28602858
);
2861-
// filter sent to right side of join, not duplicated to the left
2859+
// filter sent to right side of join, sent to the left as well
28622860
assert_optimized_plan_equal!(
28632861
plan,
28642862
@r"
28652863
Right Join: Using test.a = test2.a
2866-
TableScan: test
2864+
TableScan: test, full_filters=[test.a <= Int64(1)]
28672865
Projection: test2.a
28682866
TableScan: test2, full_filters=[test2.a <= Int64(1)]
28692867
"
@@ -3045,7 +3043,7 @@ mod tests {
30453043
Projection: test.a, test.b, test.c
30463044
TableScan: test
30473045
Projection: test2.a, test2.b, test2.c
3048-
TableScan: test2, full_filters=[test2.c > UInt32(4)]
3046+
TableScan: test2, full_filters=[test2.a > UInt32(1), test2.c > UInt32(4)]
30493047
"
30503048
)
30513049
}

datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -89,20 +89,22 @@ logical_plan
8989
02)--Projection: t2.a AS a2, t2.b
9090
03)----RightSemi Join: t1.d = t2.d, t1.c = t2.c
9191
04)------SubqueryAlias: t1
92-
05)--------TableScan: annotated_data projection=[c, d]
93-
06)------SubqueryAlias: t2
94-
07)--------Filter: annotated_data.d = Int32(3)
95-
08)----------TableScan: annotated_data projection=[a, b, c, d], partial_filters=[annotated_data.d = Int32(3)]
92+
05)--------Filter: annotated_data.d = Int32(3)
93+
06)----------TableScan: annotated_data projection=[c, d], partial_filters=[annotated_data.d = Int32(3)]
94+
07)------SubqueryAlias: t2
95+
08)--------Filter: annotated_data.d = Int32(3)
96+
09)----------TableScan: annotated_data projection=[a, b, c, d], partial_filters=[annotated_data.d = Int32(3)]
9697
physical_plan
9798
01)SortPreservingMergeExec: [a2@0 ASC NULLS LAST, b@1 ASC NULLS LAST], fetch=10
9899
02)--ProjectionExec: expr=[a@0 as a2, b@1 as b]
99-
03)----CoalesceBatchesExec: target_batch_size=8192, fetch=10
100-
04)------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(d@1, d@3), (c@0, c@2)], projection=[a@0, b@1]
101-
05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c, d], file_type=csv, has_header=true
102-
06)--------CoalesceBatchesExec: target_batch_size=8192
103-
07)----------FilterExec: d@3 = 3
104-
08)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
105-
09)--------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], file_type=csv, has_header=true
100+
03)----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(d@1, d@3), (c@0, c@2)], projection=[a@0, b@1]
101+
04)------CoalescePartitionsExec
102+
05)--------FilterExec: d@1 = 3
103+
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
104+
07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c, d], file_type=csv, has_header=true
105+
08)------FilterExec: d@3 = 3
106+
09)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
107+
10)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], file_type=csv, has_header=true
106108

107109
# preserve_right_semi_join
108110
query II nosort

datafusion/sqllogictest/test_files/joins.slt

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2106,7 +2106,7 @@ SELECT join_t1.t1_id, join_t2.t2_id
21062106
FROM (select t1_id from join_t1 where join_t1.t1_id > 22) as join_t1
21072107
RIGHT JOIN (select t2_id from join_t2 where join_t2.t2_id > 11) as join_t2
21082108
ON join_t1.t1_id < join_t2.t2_id
2109-
ORDER BY 1, 2
2109+
ORDER BY 1, 2
21102110
----
21112111
33 44
21122112
33 55
@@ -4025,7 +4025,7 @@ query TT
40254025
explain select t1_id, t1_name, i from join_t1 t1 cross join lateral (select * from unnest(generate_series(1, t1_int))) as series(i);
40264026
----
40274027
logical_plan
4028-
01)Cross Join:
4028+
01)Cross Join:
40294029
02)--SubqueryAlias: t1
40304030
03)----TableScan: join_t1 projection=[t1_id, t1_name]
40314031
04)--SubqueryAlias: series
@@ -4617,7 +4617,7 @@ query TT
46174617
explain SELECT * FROM person a NATURAL JOIN lineitem b;
46184618
----
46194619
logical_plan
4620-
01)Cross Join:
4620+
01)Cross Join:
46214621
02)--SubqueryAlias: a
46224622
03)----TableScan: person projection=[id, age, state]
46234623
04)--SubqueryAlias: b
@@ -4664,7 +4664,7 @@ query TT
46644664
explain SELECT j1_string, j2_string FROM j1, LATERAL (SELECT * FROM j2 WHERE j1_id < j2_id) AS j2;
46654665
----
46664666
logical_plan
4667-
01)Cross Join:
4667+
01)Cross Join:
46684668
02)--TableScan: j1 projection=[j1_string]
46694669
03)--SubqueryAlias: j2
46704670
04)----Projection: j2.j2_string
@@ -4677,7 +4677,7 @@ query TT
46774677
explain SELECT * FROM j1 JOIN (j2 JOIN j3 ON(j2_id = j3_id - 2)) ON(j1_id = j2_id), LATERAL (SELECT * FROM j3 WHERE j3_string = j2_string) as j4
46784678
----
46794679
logical_plan
4680-
01)Cross Join:
4680+
01)Cross Join:
46814681
02)--Inner Join: CAST(j2.j2_id AS Int64) = CAST(j3.j3_id AS Int64) - Int64(2)
46824682
03)----Inner Join: j1.j1_id = j2.j2_id
46834683
04)------TableScan: j1 projection=[j1_string, j1_id]
@@ -4693,11 +4693,11 @@ query TT
46934693
explain SELECT * FROM j1, LATERAL (SELECT * FROM j1, LATERAL (SELECT * FROM j2 WHERE j1_id = j2_id) as j2) as j2;
46944694
----
46954695
logical_plan
4696-
01)Cross Join:
4696+
01)Cross Join:
46974697
02)--TableScan: j1 projection=[j1_string, j1_id]
46984698
03)--SubqueryAlias: j2
46994699
04)----Subquery:
4700-
05)------Cross Join:
4700+
05)------Cross Join:
47014701
06)--------TableScan: j1 projection=[j1_string, j1_id]
47024702
07)--------SubqueryAlias: j2
47034703
08)----------Subquery:
@@ -4709,7 +4709,7 @@ query TT
47094709
explain SELECT j1_string, j2_string FROM j1 LEFT JOIN LATERAL (SELECT * FROM j2 WHERE j1_id < j2_id) AS j2 ON(true);
47104710
----
47114711
logical_plan
4712-
01)Left Join:
4712+
01)Left Join:
47134713
02)--TableScan: j1 projection=[j1_string]
47144714
03)--SubqueryAlias: j2
47154715
04)----Projection: j2.j2_string
@@ -4722,9 +4722,9 @@ query TT
47224722
explain SELECT * FROM j1, (j2 LEFT JOIN LATERAL (SELECT * FROM j3 WHERE j1_id + j2_id = j3_id) AS j3 ON(true));
47234723
----
47244724
logical_plan
4725-
01)Cross Join:
4725+
01)Cross Join:
47264726
02)--TableScan: j1 projection=[j1_string, j1_id]
4727-
03)--Left Join:
4727+
03)--Left Join:
47284728
04)----TableScan: j2 projection=[j2_string, j2_id]
47294729
05)----SubqueryAlias: j3
47304730
06)------Subquery:
@@ -4736,7 +4736,7 @@ query TT
47364736
explain SELECT * FROM j1, LATERAL (SELECT 1) AS j2;
47374737
----
47384738
logical_plan
4739-
01)Cross Join:
4739+
01)Cross Join:
47404740
02)--TableScan: j1 projection=[j1_string, j1_id]
47414741
03)--SubqueryAlias: j2
47424742
04)----Projection: Int64(1)
@@ -5089,7 +5089,7 @@ FULL JOIN t2 ON k1 = k2
50895089

50905090
# LEFT MARK JOIN
50915091
query TT
5092-
EXPLAIN
5092+
EXPLAIN
50935093
SELECT *
50945094
FROM t2
50955095
WHERE k2 > 0
@@ -5148,12 +5148,11 @@ LEFT ANTI JOIN t2 ON k1 = k2
51485148
WHERE k1 < 0
51495149
----
51505150
physical_plan
5151-
01)CoalesceBatchesExec: target_batch_size=3
5152-
02)--HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k2@0, k1@0)]
5151+
01)HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k2@0, k1@0)]
5152+
02)--FilterExec: k2@0 < 0
51535153
03)----DataSourceExec: partitions=1, partition_sizes=[0]
5154-
04)----CoalesceBatchesExec: target_batch_size=3
5155-
05)------FilterExec: k1@0 < 0
5156-
06)--------DataSourceExec: partitions=1, partition_sizes=[10000]
5154+
04)--FilterExec: k1@0 < 0
5155+
05)----DataSourceExec: partitions=1, partition_sizes=[10000]
51575156

51585157
query II
51595158
SELECT *
@@ -5168,14 +5167,14 @@ CREATE OR REPLACE TABLE t1(b INT, c INT, d INT);
51685167

51695168
statement ok
51705169
INSERT INTO t1 VALUES
5171-
(10, 5, 3),
5172-
( 1, 7, 8),
5173-
( 2, 9, 7),
5174-
( 3, 8,10),
5175-
( 5, 6, 6),
5176-
( 0, 4, 9),
5177-
( 4, 8, 7),
5178-
(100,6, 5);
5170+
(10, 5, 3),
5171+
( 1, 7, 8),
5172+
( 2, 9, 7),
5173+
( 3, 8,10),
5174+
( 5, 6, 6),
5175+
( 0, 4, 9),
5176+
( 4, 8, 7),
5177+
(100,6, 5);
51795178

51805179
query I rowsort
51815180
SELECT c

0 commit comments

Comments
 (0)