Commit d6cffd7

optimizer: Add configuration to disable join reordering (#21072)
## Which issue does this PR close?

- Closes #.

## Rationale for this change

DataFusion now has heuristics for join reordering based on plan statistics: https://github.com/apache/datafusion/blob/9885f4bfe88ae9e4df96466d7246b80244b8054a/datafusion/physical-optimizer/src/join_selection.rs#L59

This PR adds an option to disable them. If it is set to false, join reordering does not happen, and the join order always matches the order in which the joins appear in the query.

```sql
set datafusion.optimizer.join_reordering = false;
-- always t1 on the left, t2 on the right regardless of t1/t2 statistics
select * from t1 join t2 on t1.v1=t2.v1;
```

The reasons are:
- Heuristics can fail; this configuration makes it possible to hand-tune join orders.
- Some tests assume a particular join order; if the join order can be kept fixed, they are easier to maintain.

## What changes are included in this PR?

1. Added a configuration option.
2. Propagated the config into the join swapping utilities, and disabled join reordering when the option is set to false.

## Are these changes tested?

sqllogictests

## Are there any user-facing changes?

No
1 parent 895d852 commit d6cffd7

5 files changed

Lines changed: 128 additions & 38 deletions

datafusion/common/src/config.rs

Lines changed: 6 additions & 0 deletions
@@ -1088,6 +1088,12 @@ config_namespace! {
         /// process to reorder the join keys
         pub top_down_join_key_reordering: bool, default = true

+        /// When set to true, the physical plan optimizer may swap join inputs
+        /// based on statistics. When set to false, statistics-driven join
+        /// input reordering is disabled and the original join order in the
+        /// query is used.
+        pub join_reordering: bool, default = true
+
         /// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin.
         /// HashJoin can work more efficiently than SortMergeJoin but consumes more memory
         pub prefer_hash_join: bool, default = true
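
As a rough illustration of what this option gates, the following standalone Rust sketch models a statistics-driven swap decision: compare byte sizes when both are known, otherwise fall back to row counts, and never swap when the option is off. `InputStats` and `should_swap` are hypothetical names invented for this sketch, not DataFusion APIs, and the strict `>` comparison is an assumption rather than something this diff shows.

```rust
/// Hypothetical stand-in for the statistics a join input may report.
/// This is not a DataFusion type; it exists only for this sketch.
#[derive(Clone, Copy, Debug, Default)]
pub struct InputStats {
    /// Estimated in-memory size in bytes, if known.
    pub total_byte_size: Option<usize>,
    /// Estimated row count, if known.
    pub num_rows: Option<usize>,
}

/// Returns true when the two inputs should be swapped so that the smaller
/// input ends up on the left (build) side.
///
/// `join_reordering` models `datafusion.optimizer.join_reordering`.
pub fn should_swap(left: InputStats, right: InputStats, join_reordering: bool) -> bool {
    // Option disabled: always keep the order written in the query.
    if !join_reordering {
        return false;
    }
    match (left.total_byte_size, right.total_byte_size) {
        // Both byte sizes known: swap when the left side is larger
        // (assumption: strict comparison).
        (Some(l), Some(r)) => l > r,
        // Otherwise fall back to row counts; with no usable statistics,
        // do not swap.
        _ => match (left.num_rows, right.num_rows) {
            (Some(l), Some(r)) => l > r,
            _ => false,
        },
    }
}
```

With `join_reordering` set to false, the sketch returns false whatever the statistics say, which mirrors the documented promise that the original join order in the query is used.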

datafusion/physical-optimizer/src/join_selection.rs

Lines changed: 57 additions & 38 deletions
@@ -55,11 +55,27 @@ impl JoinSelection {

 // TODO: We need some performance test for Right Semi/Right Join swap to Left Semi/Left Join in case that the right side is smaller but not much smaller.
 // TODO: In PrestoSQL, the optimizer flips join sides only if one side is much smaller than the other by more than SIZE_DIFFERENCE_THRESHOLD times, by default is 8 times.
-/// Checks statistics for join swap.
+/// Checks whether join inputs should be swapped using available statistics.
+///
+/// It follows these steps:
+/// 1. Compare the in-memory sizes of both sides, and place the smaller side on
+///    the left (build) side.
+/// 2. If in-memory byte sizes are unavailable, fall back to row counts.
+/// 3. Do not reorder the join if neither statistic is available, or if
+///    `datafusion.optimizer.join_reordering` is disabled.
+///
+///
+/// Used configurations inside arg `config`
+/// - `config.optimizer.join_reordering`: allows or forbids statistics-driven join swapping
 pub(crate) fn should_swap_join_order(
     left: &dyn ExecutionPlan,
     right: &dyn ExecutionPlan,
+    config: &ConfigOptions,
 ) -> Result<bool> {
+    if !config.optimizer.join_reordering {
+        return Ok(false);
+    }
+
     // Get the left and right table's total bytes
     // If both the left and right tables contain total_byte_size statistics,
     // use `total_byte_size` to determine `should_swap_join_order`, else use `num_rows`
@@ -133,17 +149,8 @@ impl PhysicalOptimizerRule for JoinSelection {
         // do not modify join sides.
         // - We will also swap left and right sides for cross joins so that the left
         // side is the small side.
-        let config = &config.optimizer;
-        let collect_threshold_byte_size = config.hash_join_single_partition_threshold;
-        let collect_threshold_num_rows = config.hash_join_single_partition_threshold_rows;
         new_plan
-            .transform_up(|plan| {
-                statistical_join_selection_subrule(
-                    plan,
-                    collect_threshold_byte_size,
-                    collect_threshold_num_rows,
-                )
-            })
+            .transform_up(|plan| statistical_join_selection_subrule(plan, config))
             .data()
     }

@@ -162,34 +169,39 @@ impl PhysicalOptimizerRule for JoinSelection {
 /// `CollectLeft` mode is applicable. Otherwise, it will try to swap the join sides.
 /// When the `ignore_threshold` is false, this function will also check left
 /// and right sizes in bytes or rows.
+///
+/// Used configurations inside arg `config`
+/// - `config.optimizer.hash_join_single_partition_threshold`: byte threshold for `CollectLeft`
+/// - `config.optimizer.hash_join_single_partition_threshold_rows`: row threshold for `CollectLeft`
+/// - `config.optimizer.join_reordering`: allows or forbids input swapping
 pub(crate) fn try_collect_left(
     hash_join: &HashJoinExec,
     ignore_threshold: bool,
-    threshold_byte_size: usize,
-    threshold_num_rows: usize,
+    config: &ConfigOptions,
 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
     let left = hash_join.left();
     let right = hash_join.right();
+    let optimizer_config = &config.optimizer;

     let left_can_collect = ignore_threshold
         || supports_collect_by_thresholds(
             &**left,
-            threshold_byte_size,
-            threshold_num_rows,
+            optimizer_config.hash_join_single_partition_threshold,
+            optimizer_config.hash_join_single_partition_threshold_rows,
         );
     let right_can_collect = ignore_threshold
         || supports_collect_by_thresholds(
             &**right,
-            threshold_byte_size,
-            threshold_num_rows,
+            optimizer_config.hash_join_single_partition_threshold,
+            optimizer_config.hash_join_single_partition_threshold_rows,
         );

     match (left_can_collect, right_can_collect) {
         (true, true) => {
             // Don't swap null-aware anti joins as they have specific side requirements
             if hash_join.join_type().supports_swap()
                 && !hash_join.null_aware
-                && should_swap_join_order(&**left, &**right)?
+                && should_swap_join_order(&**left, &**right, config)?
             {
                 Ok(Some(hash_join.swap_inputs(PartitionMode::CollectLeft)?))
             } else {
@@ -209,7 +221,10 @@ pub(crate) fn try_collect_left(
         ))),
         (false, true) => {
             // Don't swap null-aware anti joins as they have specific side requirements
-            if hash_join.join_type().supports_swap() && !hash_join.null_aware {
+            if optimizer_config.join_reordering
+                && hash_join.join_type().supports_swap()
+                && !hash_join.null_aware
+            {
                 hash_join.swap_inputs(PartitionMode::CollectLeft).map(Some)
             } else {
                 Ok(None)
@@ -224,15 +239,19 @@ pub(crate) fn try_collect_left(
 /// Checks if the join order should be swapped based on the join type and input statistics.
 /// If swapping is optimal and supported, creates a swapped partitioned hash join; otherwise,
 /// creates a standard partitioned hash join.
+///
+/// Used configurations inside arg `config`
+/// - `config.optimizer.join_reordering`: allows or forbids statistics-driven join swapping
 pub(crate) fn partitioned_hash_join(
     hash_join: &HashJoinExec,
+    config: &ConfigOptions,
 ) -> Result<Arc<dyn ExecutionPlan>> {
     let left = hash_join.left();
     let right = hash_join.right();
     // Don't swap null-aware anti joins as they have specific side requirements
     if hash_join.join_type().supports_swap()
         && !hash_join.null_aware
-        && should_swap_join_order(&**left, &**right)?
+        && should_swap_join_order(&**left, &**right, config)?
     {
         hash_join.swap_inputs(PartitionMode::Partitioned)
     } else {
@@ -256,28 +275,28 @@ pub(crate) fn partitioned_hash_join(
 }

 /// This subrule tries to modify a given plan so that it can
-/// optimize hash and cross joins in the plan according to available statistical information.
+/// optimize hash and cross joins in the plan according to available statistical
+/// information.
+///
+/// Used configurations inside arg `config`
+/// - `config.optimizer.hash_join_single_partition_threshold`: byte threshold for `CollectLeft`
+/// - `config.optimizer.hash_join_single_partition_threshold_rows`: row threshold for `CollectLeft`
+/// - `config.optimizer.join_reordering`: allows or forbids input swapping
 fn statistical_join_selection_subrule(
     plan: Arc<dyn ExecutionPlan>,
-    collect_threshold_byte_size: usize,
-    collect_threshold_num_rows: usize,
+    config: &ConfigOptions,
 ) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
     let transformed =
         if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
             match hash_join.partition_mode() {
-                PartitionMode::Auto => try_collect_left(
-                    hash_join,
-                    false,
-                    collect_threshold_byte_size,
-                    collect_threshold_num_rows,
-                )?
-                .map_or_else(
-                    || partitioned_hash_join(hash_join).map(Some),
-                    |v| Ok(Some(v)),
-                )?,
-                PartitionMode::CollectLeft => try_collect_left(hash_join, true, 0, 0)?
+                PartitionMode::Auto => try_collect_left(hash_join, false, config)?
+                    .map_or_else(
+                        || partitioned_hash_join(hash_join, config).map(Some),
+                        |v| Ok(Some(v)),
+                    )?,
+                PartitionMode::CollectLeft => try_collect_left(hash_join, true, config)?
                     .map_or_else(
-                        || partitioned_hash_join(hash_join).map(Some),
+                        || partitioned_hash_join(hash_join, config).map(Some),
                         |v| Ok(Some(v)),
                     )?,
                 PartitionMode::Partitioned => {
@@ -286,7 +305,7 @@ fn statistical_join_selection_subrule(
                     // Don't swap null-aware anti joins as they have specific side requirements
                     if hash_join.join_type().supports_swap()
                         && !hash_join.null_aware
-                        && should_swap_join_order(&**left, &**right)?
+                        && should_swap_join_order(&**left, &**right, config)?
                     {
                         hash_join
                             .swap_inputs(PartitionMode::Partitioned)
@@ -299,7 +318,7 @@ fn statistical_join_selection_subrule(
         } else if let Some(cross_join) = plan.as_any().downcast_ref::<CrossJoinExec>() {
             let left = cross_join.left();
             let right = cross_join.right();
-            if should_swap_join_order(&**left, &**right)? {
+            if should_swap_join_order(&**left, &**right, config)? {
                 cross_join.swap_inputs().map(Some)?
             } else {
                 None
@@ -308,7 +327,7 @@ fn statistical_join_selection_subrule(
             let left = nl_join.left();
             let right = nl_join.right();
             if nl_join.join_type().supports_swap()
-                && should_swap_join_order(&**left, &**right)?
+                && should_swap_join_order(&**left, &**right, config)?
             {
                 nl_join.swap_inputs().map(Some)?
             } else {
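
The way `try_collect_left` combines the collect thresholds with the new option can be summarized as a small decision table. The sketch below is a standalone model, not DataFusion code: `CollectDecision` and `collect_left_decision` are hypothetical names, and the `(true, false)` and `(false, false)` arms are not visible in this diff, so their behavior here (keep the query order, and report that `CollectLeft` does not apply) is an assumption.

```rust
/// Modeled outcome of the `CollectLeft` decision for one hash join.
/// Hypothetical type used only in this sketch.
#[derive(Debug, PartialEq, Eq)]
pub enum CollectDecision {
    /// Use `CollectLeft` with the inputs in query order.
    CollectLeftKeepOrder,
    /// Use `CollectLeft` after swapping the two inputs.
    CollectLeftSwapped,
    /// `CollectLeft` does not apply; the caller falls back to a
    /// partitioned hash join.
    NotApplicable,
}

/// Models the four-way match in `try_collect_left`.
///
/// - `swappable`: the join type supports swapping and the join is not null-aware.
/// - `stats_prefer_swap`: what the statistics comparison alone would say.
/// - `join_reordering`: models `datafusion.optimizer.join_reordering`.
pub fn collect_left_decision(
    left_can_collect: bool,
    right_can_collect: bool,
    swappable: bool,
    stats_prefer_swap: bool,
    join_reordering: bool,
) -> CollectDecision {
    match (left_can_collect, right_can_collect) {
        // Both sides fit: swap only if allowed and the statistics favor it.
        (true, true) => {
            if swappable && join_reordering && stats_prefer_swap {
                CollectDecision::CollectLeftSwapped
            } else {
                CollectDecision::CollectLeftKeepOrder
            }
        }
        // Assumption: only the left side fits, so keep the query order.
        (true, false) => CollectDecision::CollectLeftKeepOrder,
        // Only the right side fits: swapping is now gated on the option.
        (false, true) => {
            if join_reordering && swappable {
                CollectDecision::CollectLeftSwapped
            } else {
                CollectDecision::NotApplicable
            }
        }
        // Assumption: neither side fits, so `CollectLeft` is not used.
        (false, false) => CollectDecision::NotApplicable,
    }
}
```

The case worth noticing is `(false, true)`: before this change it swapped whenever the join was swappable, whereas with `join_reordering` set to false it now leaves the plan for the partitioned fallback, which applies the same option again.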

datafusion/sqllogictest/test_files/information_schema.slt

Lines changed: 2 additions & 0 deletions
@@ -312,6 +312,7 @@ datafusion.optimizer.hash_join_inlist_pushdown_max_distinct_values 150
 datafusion.optimizer.hash_join_inlist_pushdown_max_size 131072
 datafusion.optimizer.hash_join_single_partition_threshold 1048576
 datafusion.optimizer.hash_join_single_partition_threshold_rows 131072
+datafusion.optimizer.join_reordering true
 datafusion.optimizer.max_passes 3
 datafusion.optimizer.prefer_existing_sort false
 datafusion.optimizer.prefer_existing_union false
@@ -452,6 +453,7 @@ datafusion.optimizer.hash_join_inlist_pushdown_max_distinct_values 150 Maximum n
 datafusion.optimizer.hash_join_inlist_pushdown_max_size 131072 Maximum size in bytes for the build side of a hash join to be pushed down as an InList expression for dynamic filtering. Build sides larger than this will use hash table lookups instead. Set to 0 to always use hash table lookups. InList pushdown can be more efficient for small build sides because it can result in better statistics pruning as well as use any bloom filters present on the scan side. InList expressions are also more transparent and easier to serialize over the network in distributed uses of DataFusion. On the other hand InList pushdown requires making a copy of the data and thus adds some overhead to the build side and uses more memory. This setting is per-partition, so we may end up using `hash_join_inlist_pushdown_max_size` * `target_partitions` memory. The default is 128kB per partition. This should allow point lookup joins (e.g. joining on a unique primary key) to use InList pushdown in most cases but avoids excessive memory usage or overhead for larger joins.
 datafusion.optimizer.hash_join_single_partition_threshold 1048576 The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition
 datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition
+datafusion.optimizer.join_reordering true When set to true, the physical plan optimizer may swap join inputs based on statistics. When set to false, statistics-driven join input reordering is disabled and the original join order in the query is used.
 datafusion.optimizer.max_passes 3 Number of times that the optimizer will attempt to optimize the plan
 datafusion.optimizer.prefer_existing_sort false When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
 datafusion.optimizer.prefer_existing_union false When set to true, the optimizer will not attempt to convert Union to Interleave

datafusion/sqllogictest/test_files/joins.slt

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1596,6 +1596,37 @@ physical_plan
15961596
05)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
15971597
06)--------DataSourceExec: partitions=1, partition_sizes=[1]
15981598

1599+
# don't reorder join based on stats, and use the order in the query
1600+
1601+
statement ok
1602+
set datafusion.optimizer.join_reordering = false;
1603+
1604+
statement ok
1605+
set datafusion.explain.physical_plan_only = true;
1606+
1607+
query TT
1608+
EXPLAIN
1609+
SELECT join_t1.t1_id, join_t2.t2_id, join_t1.t1_name
1610+
FROM join_t1
1611+
INNER JOIN join_t2
1612+
ON join_t1.t1_id + cast(11 as INT UNSIGNED) = join_t2.t2_id
1613+
----
1614+
physical_plan
1615+
01)ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]
1616+
02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t1.t1_id + UInt32(11)@2, t2_id@0)], projection=[t1_id@0, t1_name@1, t2_id@3]
1617+
03)----CoalescePartitionsExec
1618+
04)------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as join_t1.t1_id + UInt32(11)]
1619+
05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
1620+
06)----------DataSourceExec: partitions=1, partition_sizes=[1]
1621+
07)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
1622+
08)------DataSourceExec: partitions=1, partition_sizes=[1]
1623+
1624+
statement ok
1625+
set datafusion.optimizer.join_reordering = true;
1626+
1627+
statement ok
1628+
set datafusion.explain.physical_plan_only = false;
1629+
15991630
statement ok
16001631
set datafusion.optimizer.repartition_joins = true;
16011632

@@ -2053,6 +2084,37 @@ ORDER BY 1
 33 11
 44 11

+# don't reorder join based on stats, and use the order in the query
+
+statement ok
+set datafusion.optimizer.join_reordering = false;
+
+statement ok
+set datafusion.explain.physical_plan_only = true;
+
+query TT
+EXPLAIN
+SELECT join_t1.t1_id, join_t2.t2_id
+FROM join_t1
+INNER JOIN join_t2 ON join_t1.t1_id > join_t2.t2_id
+WHERE join_t1.t1_id > 10 AND join_t2.t2_int > 1
+----
+physical_plan
+01)NestedLoopJoinExec: join_type=Inner, filter=t1_id@0 > t2_id@1
+02)--CoalescePartitionsExec
+03)----FilterExec: t1_id@0 > 10
+04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+05)--------DataSourceExec: partitions=1, partition_sizes=[1]
+06)--FilterExec: t2_int@1 > 1, projection=[t2_id@0]
+07)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+08)------DataSourceExec: partitions=1, partition_sizes=[1]
+
+statement ok
+set datafusion.optimizer.join_reordering = true;
+
+statement ok
+set datafusion.explain.physical_plan_only = false;
+
 # Left as inner table nested loop join

 query TT
