@@ -1028,24 +1028,42 @@ fn add_hash_on_top(
10281028 dist_onward : & mut Option < ExecTree > ,
10291029 input_idx : usize ,
10301030) -> Result < Arc < dyn ExecutionPlan > > {
1031- // When there is an existing ordering, we preserve ordering
1032- // during repartition. This will be un-done in the future
1033- // If any of the following conditions is true
1034- // - Preserving ordering is not helpful in terms of satisfying ordering requirements
1035- // - Usage of order preserving variants is not desirable
1036- // (determined by flag `config.optimizer.bounded_order_preserving_variants`)
1037- let should_preserve_ordering = input. output_ordering ( ) . is_some ( ) ;
1038- // Since hashing benefits from partitioning, add a round-robin repartition
1039- // before it:
1040- let mut new_plan = add_roundrobin_on_top ( input, n_target, dist_onward, 0 ) ?;
1041- new_plan = Arc :: new (
1042- RepartitionExec :: try_new ( new_plan, Partitioning :: Hash ( hash_exprs, n_target) ) ?
1043- . with_preserve_order ( should_preserve_ordering) ,
1044- ) as _ ;
1045-
1046- // update distribution onward with new operator
1047- update_distribution_onward ( new_plan. clone ( ) , dist_onward, input_idx) ;
1048- Ok ( new_plan)
1031+ if n_target == input. output_partitioning ( ) . partition_count ( ) && n_target == 1 {
1032+ // In this case adding a hash repartition is unnecessary as the hash
1033+ // requirement is implicitly satisfied.
1034+ return Ok ( input) ;
1035+ }
1036+ let satisfied = input
1037+ . output_partitioning ( )
1038+ . satisfy ( Distribution :: HashPartitioned ( hash_exprs. clone ( ) ) , || {
1039+ input. equivalence_properties ( )
1040+ } ) ;
1041+ // Add hash repartitioning when:
1042+ // - The hash distribution requirement is not satisfied, or
1043+ // - We can increase parallelism by adding hash partitioning.
1044+ if !satisfied || n_target > input. output_partitioning ( ) . partition_count ( ) {
1045+ // When there is an existing ordering, we preserve ordering during
1046+ // repartition. This will be rolled back in the future if any of the
1047+ // following conditions is true:
1048+ // - Preserving ordering is not helpful in terms of satisfying ordering
1049+ // requirements.
1050+ // - Usage of order preserving variants is not desirable (per the flag
1051+ // `config.optimizer.bounded_order_preserving_variants`).
1052+ let should_preserve_ordering = input. output_ordering ( ) . is_some ( ) ;
1053+ // Since hashing benefits from partitioning, add a round-robin repartition
1054+ // before it:
1055+ let mut new_plan = add_roundrobin_on_top ( input, n_target, dist_onward, 0 ) ?;
1056+ new_plan = Arc :: new (
1057+ RepartitionExec :: try_new ( new_plan, Partitioning :: Hash ( hash_exprs, n_target) ) ?
1058+ . with_preserve_order ( should_preserve_ordering) ,
1059+ ) as _ ;
1060+
1061+ // update distribution onward with new operator
1062+ update_distribution_onward ( new_plan. clone ( ) , dist_onward, input_idx) ;
1063+ Ok ( new_plan)
1064+ } else {
1065+ Ok ( input)
1066+ }
10491067}
10501068
10511069/// Adds a `SortPreservingMergeExec` operator on top of input executor:
@@ -1329,27 +1347,23 @@ fn ensure_distribution(
13291347 ) ?;
13301348 }
13311349
1332- if !child
1333- . output_partitioning ( )
1334- . satisfy ( requirement. clone ( ) , || child. equivalence_properties ( ) )
1335- {
1336- // Satisfy the distribution requirement if it is unmet.
1337- match requirement {
1338- Distribution :: SinglePartition => {
1339- child = add_spm_on_top ( child, dist_onward, child_idx) ;
1340- }
1341- Distribution :: HashPartitioned ( exprs) => {
1342- child = add_hash_on_top (
1343- child,
1344- exprs. to_vec ( ) ,
1345- target_partitions,
1346- dist_onward,
1347- child_idx,
1348- ) ?;
1349- }
1350- Distribution :: UnspecifiedDistribution => { }
1351- } ;
1352- }
1350+ // Satisfy the distribution requirement if it is unmet.
1351+ match requirement {
1352+ Distribution :: SinglePartition => {
1353+ child = add_spm_on_top ( child, dist_onward, child_idx) ;
1354+ }
1355+ Distribution :: HashPartitioned ( exprs) => {
1356+ child = add_hash_on_top (
1357+ child,
1358+ exprs. to_vec ( ) ,
1359+ target_partitions,
1360+ dist_onward,
1361+ child_idx,
1362+ ) ?;
1363+ }
1364+ Distribution :: UnspecifiedDistribution => { }
1365+ } ;
1366+
13531367 // There is an ordering requirement of the operator:
13541368 if let Some ( required_input_ordering) = required_input_ordering {
13551369 let existing_ordering = child. output_ordering ( ) . unwrap_or ( & [ ] ) ;
@@ -4390,4 +4404,59 @@ mod tests {
43904404
43914405 Ok ( ( ) )
43924406 }
4407+
#[test]
fn do_not_add_unnecessary_hash() -> Result<()> {
    // Regression test: when the target partition count is 1, a hash
    // repartition on the aggregate's group-by keys is pointless (a single
    // partition trivially satisfies any hash distribution), so the optimizer
    // must not insert a `RepartitionExec`.
    let schema = schema();
    let sort_key = vec![PhysicalSortExpr {
        expr: col("c", &schema).unwrap(),
        options: SortOptions::default(),
    }];
    let alias = vec![("a".to_string(), "a".to_string())];
    let input = parquet_exec_with_sort(vec![sort_key]);
    // `alias` is moved here — it is not used again, so no clone is needed.
    let physical_plan = aggregate_exec_with_alias(input, alias);

    // The optimized plan must contain no RepartitionExec between the
    // partial and final aggregation stages.
    let expected = &[
        "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
    ];

    // Make sure target partition number is 1. In this case hash repartition is unnecessary.
    // Check both orderings of the two distribution-related optimizer passes.
    assert_optimized!(expected, physical_plan.clone(), true, false, 1, false, 1024);
    assert_optimized!(expected, physical_plan, false, false, 1, false, 1024);

    Ok(())
}
4431+
#[test]
fn do_not_add_unnecessary_hash2() -> Result<()> {
    // Regression test: when an operator's hash distribution requirement is
    // already satisfied by its input (here, the outer aggregate sits on top
    // of an aggregate that is already hash-partitioned on the same key) and
    // parallelism cannot be increased, the optimizer must not add another
    // hash `RepartitionExec`.
    let schema = schema();
    let sort_key = vec![PhysicalSortExpr {
        expr: col("c", &schema).unwrap(),
        options: SortOptions::default(),
    }];
    let alias = vec![("a".to_string(), "a".to_string())];
    let input = parquet_exec_multiple_sorted(vec![sort_key]);
    let aggregate = aggregate_exec_with_alias(input, alias.clone());
    // Last use of `alias` — move it instead of cloning again.
    let physical_plan = aggregate_exec_with_alias(aggregate, alias);

    let expected = &[
        "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
        // Since the hash requirement of this operator is already satisfied,
        // there shouldn't be a hash repartition here.
        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
        "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
        "RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4",
        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
        "RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2",
        "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
    ];

    // Make sure the target partition number is larger than 2 (the partition
    // number at the source), so the inner aggregate still gets repartitioned.
    // Check both orderings of the two distribution-related optimizer passes.
    assert_optimized!(expected, physical_plan.clone(), true, false, 4, false, 1024);
    assert_optimized!(expected, physical_plan, false, false, 4, false, 1024);

    Ok(())
}
43934462}
0 commit comments