Skip to content

Commit 22d03c1

Browse files
[MINOR]: Do not add unnecessary hash repartition to the physical plan (#7667)
* Do not add unnecessary hash repartition
* Add new test; move the satisfy check inside the add_hash method
* Improve comments
---------
Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>
1 parent bcc2acd commit 22d03c1

File tree

1 file changed

+108
-39
lines changed

1 file changed

+108
-39
lines changed

datafusion/core/src/physical_optimizer/enforce_distribution.rs

Lines changed: 108 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1028,24 +1028,42 @@ fn add_hash_on_top(
10281028
dist_onward: &mut Option<ExecTree>,
10291029
input_idx: usize,
10301030
) -> Result<Arc<dyn ExecutionPlan>> {
1031-
// When there is an existing ordering, we preserve ordering
1032-
// during repartition. This will be un-done in the future
1033-
// If any of the following conditions is true
1034-
// - Preserving ordering is not helpful in terms of satisfying ordering requirements
1035-
// - Usage of order preserving variants is not desirable
1036-
// (determined by flag `config.optimizer.bounded_order_preserving_variants`)
1037-
let should_preserve_ordering = input.output_ordering().is_some();
1038-
// Since hashing benefits from partitioning, add a round-robin repartition
1039-
// before it:
1040-
let mut new_plan = add_roundrobin_on_top(input, n_target, dist_onward, 0)?;
1041-
new_plan = Arc::new(
1042-
RepartitionExec::try_new(new_plan, Partitioning::Hash(hash_exprs, n_target))?
1043-
.with_preserve_order(should_preserve_ordering),
1044-
) as _;
1045-
1046-
// update distribution onward with new operator
1047-
update_distribution_onward(new_plan.clone(), dist_onward, input_idx);
1048-
Ok(new_plan)
1031+
if n_target == input.output_partitioning().partition_count() && n_target == 1 {
1032+
// In this case adding a hash repartition is unnecessary as the hash
1033+
// requirement is implicitly satisfied.
1034+
return Ok(input);
1035+
}
1036+
let satisfied = input
1037+
.output_partitioning()
1038+
.satisfy(Distribution::HashPartitioned(hash_exprs.clone()), || {
1039+
input.equivalence_properties()
1040+
});
1041+
// Add hash repartitioning when:
1042+
// - The hash distribution requirement is not satisfied, or
1043+
// - We can increase parallelism by adding hash partitioning.
1044+
if !satisfied || n_target > input.output_partitioning().partition_count() {
1045+
// When there is an existing ordering, we preserve ordering during
1046+
// repartition. This will be rolled back in the future if any of the
1047+
// following conditions is true:
1048+
// - Preserving ordering is not helpful in terms of satisfying ordering
1049+
// requirements.
1050+
// - Usage of order preserving variants is not desirable (per the flag
1051+
// `config.optimizer.bounded_order_preserving_variants`).
1052+
let should_preserve_ordering = input.output_ordering().is_some();
1053+
// Since hashing benefits from partitioning, add a round-robin repartition
1054+
// before it:
1055+
let mut new_plan = add_roundrobin_on_top(input, n_target, dist_onward, 0)?;
1056+
new_plan = Arc::new(
1057+
RepartitionExec::try_new(new_plan, Partitioning::Hash(hash_exprs, n_target))?
1058+
.with_preserve_order(should_preserve_ordering),
1059+
) as _;
1060+
1061+
// update distribution onward with new operator
1062+
update_distribution_onward(new_plan.clone(), dist_onward, input_idx);
1063+
Ok(new_plan)
1064+
} else {
1065+
Ok(input)
1066+
}
10491067
}
10501068

10511069
/// Adds a `SortPreservingMergeExec` operator on top of input executor:
@@ -1329,27 +1347,23 @@ fn ensure_distribution(
13291347
)?;
13301348
}
13311349

1332-
if !child
1333-
.output_partitioning()
1334-
.satisfy(requirement.clone(), || child.equivalence_properties())
1335-
{
1336-
// Satisfy the distribution requirement if it is unmet.
1337-
match requirement {
1338-
Distribution::SinglePartition => {
1339-
child = add_spm_on_top(child, dist_onward, child_idx);
1340-
}
1341-
Distribution::HashPartitioned(exprs) => {
1342-
child = add_hash_on_top(
1343-
child,
1344-
exprs.to_vec(),
1345-
target_partitions,
1346-
dist_onward,
1347-
child_idx,
1348-
)?;
1349-
}
1350-
Distribution::UnspecifiedDistribution => {}
1351-
};
1352-
}
1350+
// Satisfy the distribution requirement if it is unmet.
1351+
match requirement {
1352+
Distribution::SinglePartition => {
1353+
child = add_spm_on_top(child, dist_onward, child_idx);
1354+
}
1355+
Distribution::HashPartitioned(exprs) => {
1356+
child = add_hash_on_top(
1357+
child,
1358+
exprs.to_vec(),
1359+
target_partitions,
1360+
dist_onward,
1361+
child_idx,
1362+
)?;
1363+
}
1364+
Distribution::UnspecifiedDistribution => {}
1365+
};
1366+
13531367
// There is an ordering requirement of the operator:
13541368
if let Some(required_input_ordering) = required_input_ordering {
13551369
let existing_ordering = child.output_ordering().unwrap_or(&[]);
@@ -4390,4 +4404,59 @@ mod tests {
43904404

43914405
Ok(())
43924406
}
4407+
4408+
#[test]
4409+
fn do_not_add_unnecessary_hash() -> Result<()> {
4410+
let schema = schema();
4411+
let sort_key = vec![PhysicalSortExpr {
4412+
expr: col("c", &schema).unwrap(),
4413+
options: SortOptions::default(),
4414+
}];
4415+
let alias = vec![("a".to_string(), "a".to_string())];
4416+
let input = parquet_exec_with_sort(vec![sort_key]);
4417+
let physical_plan = aggregate_exec_with_alias(input, alias.clone());
4418+
4419+
let expected = &[
4420+
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
4421+
"AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
4422+
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
4423+
];
4424+
4425+
// Make sure the target partition number is 1. In this case, a hash repartition is unnecessary.
4426+
assert_optimized!(expected, physical_plan.clone(), true, false, 1, false, 1024);
4427+
assert_optimized!(expected, physical_plan, false, false, 1, false, 1024);
4428+
4429+
Ok(())
4430+
}
4431+
4432+
#[test]
4433+
fn do_not_add_unnecessary_hash2() -> Result<()> {
4434+
let schema = schema();
4435+
let sort_key = vec![PhysicalSortExpr {
4436+
expr: col("c", &schema).unwrap(),
4437+
options: SortOptions::default(),
4438+
}];
4439+
let alias = vec![("a".to_string(), "a".to_string())];
4440+
let input = parquet_exec_multiple_sorted(vec![sort_key]);
4441+
let aggregate = aggregate_exec_with_alias(input, alias.clone());
4442+
let physical_plan = aggregate_exec_with_alias(aggregate, alias.clone());
4443+
4444+
let expected = &[
4445+
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
4446+
// Since the hash requirement of this operator is satisfied, there shouldn't be
4447+
// a hash repartition here
4448+
"AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
4449+
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
4450+
"RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4",
4451+
"AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
4452+
"RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2",
4453+
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
4454+
];
4455+
4456+
// Make sure the target partition number is larger than 2 (e.g., the partition number at the source).
4457+
assert_optimized!(expected, physical_plan.clone(), true, false, 4, false, 1024);
4458+
assert_optimized!(expected, physical_plan, false, false, 4, false, 1024);
4459+
4460+
Ok(())
4461+
}
43934462
}

0 commit comments

Comments
 (0)