@@ -48,6 +48,7 @@ use datafusion_physical_optimizer::{
 use datafusion_physical_plan::{
     aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
     coalesce_batches::CoalesceBatchesExec,
+    coalesce_partitions::CoalescePartitionsExec,
     filter::FilterExec,
     repartition::RepartitionExec,
     sorts::sort::SortExec,
@@ -267,7 +268,7 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() {
         format_plan_for_test(&plan),
         @r"
     - SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false], filter=[e@4 IS NULL OR e@4 < bb]
-      - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)], filter=[d@0 >= aa AND d@0 <= ab]
+      - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
         - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
         - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ d@0 >= aa AND d@0 <= ab ] AND DynamicFilterPhysicalExpr [ e@1 IS NULL OR e@1 < bb ]
     "
@@ -890,7 +891,7 @@ async fn test_hashjoin_dynamic_filter_pushdown() {
             None,
             &JoinType::Inner,
             None,
-            PartitionMode::Partitioned,
+            PartitionMode::CollectLeft,
             datafusion_common::NullEquality::NullEqualsNothing,
         )
         .unwrap(),
@@ -902,12 +903,12 @@ async fn test_hashjoin_dynamic_filter_pushdown() {
902903 @r"
903904 OptimizationTest:
904905 input:
905- - HashJoinExec: mode=Partitioned , join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
906+ - HashJoinExec: mode=CollectLeft , join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
906907 - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
907908 - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true
908909 output:
909910 Ok:
910- - HashJoinExec: mode=Partitioned , join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
911+ - HashJoinExec: mode=CollectLeft , join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
911912 - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
912913 - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
913914 " ,
@@ -936,13 +937,233 @@ async fn test_hashjoin_dynamic_filter_pushdown() {
     insta::assert_snapshot!(
         format!("{}", format_plan_for_test(&plan)),
         @r"
-    - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)], filter=[a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb]
+    - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
       - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
       - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ]
     "
     );
 }
 
+#[tokio::test]
+async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
+    use datafusion_common::JoinType;
+    use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
+
+    // Rough plan we're trying to recreate:
+    // COPY (select i as k from generate_series(1, 10000000) as t(i))
+    // TO 'test_files/scratch/push_down_filter/t1.parquet'
+    // STORED AS PARQUET;
+    // COPY (select i as k, i as v from generate_series(1, 10000000) as t(i))
+    // TO 'test_files/scratch/push_down_filter/t2.parquet'
+    // STORED AS PARQUET;
+    // create external table t1 stored as parquet location 'test_files/scratch/push_down_filter/t1.parquet';
+    // create external table t2 stored as parquet location 'test_files/scratch/push_down_filter/t2.parquet';
+    // explain
+    // select *
+    // from t1
+    // join t2 on t1.k = t2.k;
+    // +---------------+------------------------------------------------------------+
+    // | plan_type     | plan                                                       |
+    // +---------------+------------------------------------------------------------+
+    // | physical_plan | ┌───────────────────────────┐                              |
+    // |               | │    CoalesceBatchesExec    │                              |
+    // |               | │    --------------------   │                              |
+    // |               | │     target_batch_size:    │                              |
+    // |               | │           8192            │                              |
+    // |               | └─────────────┬─────────────┘                              |
+    // |               | ┌─────────────┴─────────────┐                              |
+    // |               | │       HashJoinExec        │                              |
+    // |               | │    --------------------   ├──────────────┐               |
+    // |               | │        on: (k = k)        │              │               |
+    // |               | └─────────────┬─────────────┘              │               |
+    // |               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
+    // |               | │    CoalesceBatchesExec    ││    CoalesceBatchesExec    │ |
+    // |               | │    --------------------   ││    --------------------   │ |
+    // |               | │     target_batch_size:    ││     target_batch_size:    │ |
+    // |               | │           8192            ││           8192            │ |
+    // |               | └─────────────┬─────────────┘└─────────────┬─────────────┘ |
+    // |               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
+    // |               | │      RepartitionExec      ││      RepartitionExec      │ |
+    // |               | │    --------------------   ││    --------------------   │ |
+    // |               | │ partition_count(in->out): ││ partition_count(in->out): │ |
+    // |               | │         12 -> 12          ││         12 -> 12          │ |
+    // |               | │                           ││                           │ |
+    // |               | │    partitioning_scheme:   ││    partitioning_scheme:   │ |
+    // |               | │      Hash([k@0], 12)      ││      Hash([k@0], 12)      │ |
+    // |               | └─────────────┬─────────────┘└─────────────┬─────────────┘ |
+    // |               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
+    // |               | │      DataSourceExec       ││      DataSourceExec       │ |
+    // |               | │    --------------------   ││    --------------------   │ |
+    // |               | │         files: 12         ││         files: 12         │ |
+    // |               | │      format: parquet      ││      format: parquet      │ |
+    // |               | │                           ││      predicate: true      │ |
+    // |               | └───────────────────────────┘└───────────────────────────┘ |
+    // |               |                                                            |
+    // +---------------+------------------------------------------------------------+
+
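+    // The test below rebuilds that shape by hand: each side is
+    // scan -> hash RepartitionExec -> CoalesceBatchesExec, joined with
+    // PartitionMode::Partitioned, so the dynamic filter has to travel through
+    // the repartitioning operators to reach the probe side scan.
+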
+    // Create build side with limited values
+    let build_batches = vec![record_batch!(
+        ("a", Utf8, ["aa", "ab"]),
+        ("b", Utf8, ["ba", "bb"]),
+        ("c", Float64, [1.0, 2.0]) // Extra column not used in join
+    )
+    .unwrap()];
+    let build_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("b", DataType::Utf8, false),
+        Field::new("c", DataType::Float64, false),
+    ]));
+    let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
+        .with_support(true)
+        .with_batches(build_batches)
+        .build();
+
+    // Create probe side with more values
+    let probe_batches = vec![record_batch!(
+        ("a", Utf8, ["aa", "ab", "ac", "ad"]),
+        ("b", Utf8, ["ba", "bb", "bc", "bd"]),
+        ("e", Float64, [1.0, 2.0, 3.0, 4.0]) // Extra column not used in join
+    )
+    .unwrap()];
+    let probe_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("b", DataType::Utf8, false),
+        Field::new("e", DataType::Float64, false),
+    ]));
+    let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
+        .with_support(true)
+        .with_batches(probe_batches)
+        .build();
+
+    // Create RepartitionExec nodes for both sides with hash partitioning on join keys
+    let partition_count = 12;
+
+    // Build side: DataSource -> RepartitionExec (Hash) -> CoalesceBatchesExec
+    let build_hash_exprs = vec![
+        col("a", &build_side_schema).unwrap(),
+        col("b", &build_side_schema).unwrap(),
+    ];
+    let build_repartition = Arc::new(
+        RepartitionExec::try_new(
+            build_scan,
+            Partitioning::Hash(build_hash_exprs, partition_count),
+        )
+        .unwrap(),
+    );
+    let build_coalesce = Arc::new(CoalesceBatchesExec::new(build_repartition, 8192));
+
+    // Probe side: DataSource -> RepartitionExec (Hash) -> CoalesceBatchesExec
+    let probe_hash_exprs = vec![
+        col("a", &probe_side_schema).unwrap(),
+        col("b", &probe_side_schema).unwrap(),
+    ];
+    let probe_repartition = Arc::new(
+        RepartitionExec::try_new(
+            probe_scan,
+            Partitioning::Hash(probe_hash_exprs, partition_count),
+        )
+        .unwrap(),
+    );
+    let probe_coalesce = Arc::new(CoalesceBatchesExec::new(probe_repartition, 8192));
+
+    // Create HashJoinExec with partitioned inputs
+    let on = vec![
+        (
+            col("a", &build_side_schema).unwrap(),
+            col("a", &probe_side_schema).unwrap(),
+        ),
+        (
+            col("b", &build_side_schema).unwrap(),
+            col("b", &probe_side_schema).unwrap(),
+        ),
+    ];
+    let hash_join = Arc::new(
+        HashJoinExec::try_new(
+            build_coalesce,
+            probe_coalesce,
+            on,
+            None,
+            &JoinType::Inner,
+            None,
+            PartitionMode::Partitioned,
+            datafusion_common::NullEquality::NullEqualsNothing,
+        )
+        .unwrap(),
+    );
+
+    // Top-level CoalesceBatchesExec
+    let cb =
+        Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc<dyn ExecutionPlan>;
+    // Top-level CoalescePartitionsExec
+    let plan = Arc::new(CoalescePartitionsExec::new(cb)) as Arc<dyn ExecutionPlan>;
+
+    // expect the predicate to be pushed down into the probe side DataSource
+    insta::assert_snapshot!(
+        OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - CoalescePartitionsExec
+          - CoalesceBatchesExec: target_batch_size=8192
+            - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
+              - CoalesceBatchesExec: target_batch_size=8192
+                - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
+                  - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+              - CoalesceBatchesExec: target_batch_size=8192
+                - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
+                  - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - CoalescePartitionsExec
+            - CoalesceBatchesExec: target_batch_size=8192
+              - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
+                - CoalesceBatchesExec: target_batch_size=8192
+                  - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
+                    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+                - CoalesceBatchesExec: target_batch_size=8192
+                  - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
+                    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
+    "
+    );
+
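+    // At this point the pushed-down predicate is still the placeholder `true`:
+    // the concrete join-key bounds only become available once the build side
+    // has been consumed at runtime, as the snapshot after execution shows.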
+    // Actually apply the optimization to the plan and execute to see the filter in action
+    let mut config = ConfigOptions::default();
+    config.execution.parquet.pushdown_filters = true;
+    config.optimizer.enable_dynamic_filter_pushdown = true;
+    let plan = FilterPushdown::new_post_optimization()
+        .optimize(plan, &config)
+        .unwrap();
+    let config = SessionConfig::new().with_batch_size(10);
+    let session_ctx = SessionContext::new_with_config(config);
+    session_ctx.register_object_store(
+        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
+        Arc::new(InMemory::new()),
+    );
+    let state = session_ctx.state();
+    let task_ctx = state.task_ctx();
+    let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap();
+    // Iterate one batch
+    if let Some(batch_result) = stream.next().await {
+        batch_result.unwrap();
+    }
+
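+    // With PartitionMode::Partitioned, the build side reports min/max bounds
+    // per partition, so the filter pushed into the probe scan below is the OR
+    // of one tight range per build-side key pair rather than a single range.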
+    // Now check what our filter looks like
+    insta::assert_snapshot!(
+        format!("{}", format_plan_for_test(&plan)),
+        @r"
+    - CoalescePartitionsExec
+      - CoalesceBatchesExec: target_batch_size=8192
+        - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
+          - CoalesceBatchesExec: target_batch_size=8192
+            - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
+              - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+          - CoalesceBatchesExec: target_batch_size=8192
+            - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
+              - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb OR a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba ]
+    "
+    );
+}
+
 #[tokio::test]
 async fn test_nested_hashjoin_dynamic_filter_pushdown() {
     use datafusion_common::JoinType;
@@ -1082,9 +1303,9 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() {
     insta::assert_snapshot!(
         format!("{}", format_plan_for_test(&plan)),
         @r"
-    - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)], filter=[b@0 >= aa AND b@0 <= ab]
+    - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)]
       - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true
-      - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)], filter=[d@0 >= ca AND d@0 <= ce]
+      - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
         - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ b@0 >= aa AND b@0 <= ab ]
         - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ d@0 >= ca AND d@0 <= ce ]
     "