@@ -533,8 +533,8 @@ class CometExecSuite extends CometTestBase {
533533 Seq (" struct" , " array" ).foreach { dataType =>
534534 val df =
535535 sql(s """ SELECT f.date_id, f.product_id, f.units_sold, f.store_id FROM fact_stats f
536- |JOIN dim_stats s
537- |ON $dataType(f.store_id) = $dataType(s.store_id) WHERE s.country = 'DE'
536+ |JOIN dim_stats s
537+ |ON $dataType(f.store_id) = $dataType(s.store_id) WHERE s.country = 'DE'
538538 """ .stripMargin)
539539 checkSparkAnswer(df)
540540 }
@@ -553,8 +553,8 @@ class CometExecSuite extends CometTestBase {
553553 Seq (" struct" , " array" ).foreach { dataType =>
554554 val df =
555555 sql(s """ SELECT f.date_id, f.product_id, f.units_sold, f.store_id FROM fact_stats f
556- |JOIN dim_stats s
557- |ON $dataType(f.store_id) = $dataType(s.store_id) WHERE s.country = 'DE'
556+ |JOIN dim_stats s
557+ |ON $dataType(f.store_id) = $dataType(s.store_id) WHERE s.country = 'DE'
558558 """ .stripMargin)
559559 val (_, cometPlan) = checkSparkAnswer(df)
560560
@@ -570,6 +570,178 @@ class CometExecSuite extends CometTestBase {
570570 }
571571 }
572572
test("non-AQE DPP with two separate broadcast joins") {
  withTempDir { tempDir =>
    val basePath = s"${tempDir.getAbsolutePath}/data"
    // Write the doubly-partitioned fact table plus two dimension tables with
    // Comet execution disabled so the input files come from vanilla Spark.
    withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "false") {
      spark
        .range(100)
        .selectExpr(
          "cast(id % 5 as int) as store_id",
          "cast(id % 3 as int) as region_id",
          "cast(id as int) as amount")
        .write
        .partitionBy("store_id", "region_id")
        .parquet(s"$basePath/fact")
      spark
        .range(5)
        .selectExpr("cast(id as int) as store_id", "cast(id as string) as store_name")
        .write
        .parquet(s"$basePath/store_dim")
      spark
        .range(3)
        .selectExpr("cast(id as int) as region_id", "cast(id as string) as region_name")
        .write
        .parquet(s"$basePath/region_dim")
    }

    // AQE is disabled so DPP subqueries are planned statically (non-AQE path).
    withSQLConf(
      SQLConf.USE_V1_SOURCE_LIST.key -> "parquet",
      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
      spark.read.parquet(s"$basePath/fact").createOrReplaceTempView("fact_two_joins")
      spark.read.parquet(s"$basePath/store_dim").createOrReplaceTempView("store_dim")
      spark.read.parquet(s"$basePath/region_dim").createOrReplaceTempView("region_dim")

      // Two independent broadcast joins, each filtering a different
      // partition column of the fact table.
      val query = spark.sql("""SELECT f.amount, s.store_name, r.region_name
          |FROM fact_two_joins f
          |JOIN store_dim s ON f.store_id = s.store_id
          |JOIN region_dim r ON f.region_id = r.region_id
          |WHERE s.store_name = '1' AND r.region_name = '2'""".stripMargin)
      val (_, cometPlan) = checkSparkAnswer(query)

      // The fact-table scan must run natively ...
      val scans = cometPlan.collect { case s: CometNativeScanExec => s }
      assert(scans.nonEmpty, "Expected CometNativeScanExec in plan")

      // ... and at least one native scan must carry a dynamic pruning filter.
      val prunedScans =
        scans.filter(_.partitionFilters.exists(_.isInstanceOf[DynamicPruningExpression]))
      assert(
        prunedScans.nonEmpty,
        "Expected at least one CometNativeScanExec with DynamicPruningExpression")
    }
  }
}
623+
test("non-AQE DPP fallback when broadcast exchange is not Comet") {
  withTempDir { tmp =>
    val dataPath = s"${tmp.getAbsolutePath}/data"
    // Produce the partitioned fact table and the dimension table with Comet
    // execution disabled.
    withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "false") {
      spark
        .range(100)
        .selectExpr("cast(id % 10 as int) as store_id", "cast(id as int) as amount")
        .write
        .partitionBy("store_id")
        .parquet(s"$dataPath/fact")
      spark
        .range(10)
        .selectExpr("cast(id as int) as store_id", "cast(id as string) as country")
        .write
        .parquet(s"$dataPath/dim")
    }

    // Disable Comet broadcast exchange so SubqueryBroadcastExec wraps a Spark
    // BroadcastExchangeExec. convertSubqueryBroadcasts should skip it (child isn't
    // CometNativeExec). Query should still produce correct results via Spark's
    // standard path.
    withSQLConf(
      SQLConf.USE_V1_SOURCE_LIST.key -> "parquet",
      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
      CometConf.COMET_EXEC_BROADCAST_EXCHANGE_ENABLED.key -> "false",
      CometConf.COMET_EXEC_BROADCAST_HASH_JOIN_ENABLED.key -> "false") {
      spark.read.parquet(s"$dataPath/fact").createOrReplaceTempView("fact_fallback")
      spark.read.parquet(s"$dataPath/dim").createOrReplaceTempView("dim_fallback")

      val joined = spark.sql("""SELECT f.amount, f.store_id
          |FROM fact_fallback f JOIN dim_fallback d
          |ON f.store_id = d.store_id
          |WHERE d.country = 'DE'""".stripMargin)
      // Only correctness is checked here; no Comet-specific plan shape is required.
      checkSparkAnswer(joined)
    }
  }
}
660+
test("non-AQE DPP with empty broadcast result") {
  withTempDir { workDir =>
    val root = s"${workDir.getAbsolutePath}/data"
    // Build the inputs with Comet execution turned off.
    withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "false") {
      spark
        .range(100)
        .selectExpr("cast(id % 10 as int) as store_id", "cast(id as int) as amount")
        .write
        .partitionBy("store_id")
        .parquet(s"$root/fact")
      spark
        .range(10)
        .selectExpr("cast(id as int) as store_id", "cast(id as string) as country")
        .write
        .parquet(s"$root/dim")
    }

    withSQLConf(
      SQLConf.USE_V1_SOURCE_LIST.key -> "parquet",
      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
      spark.read.parquet(s"$root/fact").createOrReplaceTempView("fact_empty")
      spark.read.parquet(s"$root/dim").createOrReplaceTempView("dim_empty")

      // Filter on dim that matches nothing -- DPP prunes all partitions
      val pruned = spark.sql("""SELECT f.amount, f.store_id
          |FROM fact_empty f JOIN dim_empty d
          |ON f.store_id = d.store_id
          |WHERE d.country = 'NONEXISTENT'""".stripMargin)
      val rows = pruned.collect()
      assert(rows.isEmpty, s"Expected empty result but got ${rows.length} rows")
      // Also compare against Spark's own answer for the same query.
      checkSparkAnswer(pruned)
    }
  }
}
695+
test("non-AQE DPP resolves both outer and inner partition filters") {
  // CometNativeScanExec.partitionFilters and CometScanExec.partitionFilters contain
  // different InSubqueryExec instances. Both must be resolved for partition selection
  // to work correctly. This test verifies correct results, which requires both sets
  // of filters to be resolved.
  withTempDir { folder =>
    val location = s"${folder.getAbsolutePath}/data"
    // Create the partitioned fact table and the dimension table with Comet
    // execution disabled.
    withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "false") {
      spark
        .range(100)
        .selectExpr(
          "cast(id % 10 as int) as store_id",
          "cast(id as int) as date_id",
          "cast(id as int) as amount")
        .write
        .partitionBy("store_id")
        .parquet(s"$location/fact")
      spark
        .range(10)
        .selectExpr("cast(id as int) as store_id", "cast(id as string) as country")
        .write
        .parquet(s"$location/dim")
    }

    // Restrict DPP to broadcast reuse so the pruning subquery piggybacks on the
    // join's broadcast exchange.
    withSQLConf(
      SQLConf.USE_V1_SOURCE_LIST.key -> "parquet",
      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
      spark.read.parquet(s"$location/fact").createOrReplaceTempView("fact_dual")
      spark.read.parquet(s"$location/dim").createOrReplaceTempView("dim_dual")

      val joined = spark.sql("""SELECT f.date_id, f.store_id
          |FROM fact_dual f JOIN dim_dual d
          |ON f.store_id = d.store_id
          |WHERE d.country = 'DE'""".stripMargin)
      val (_, cometPlan) = checkSparkAnswer(joined)

      // Verify native scan is used
      val allScans = cometPlan.collect { case s: CometNativeScanExec => s }
      assert(allScans.nonEmpty, "Expected CometNativeScanExec in plan")

      // Verify DPP is present
      val withDpp =
        allScans.filter(_.partitionFilters.exists(_.isInstanceOf[DynamicPruningExpression]))
      assert(withDpp.nonEmpty, "Expected DPP filter on native scan")
    }
  }
}
744+
573745 test(" ShuffleQueryStageExec could be direct child node of CometBroadcastExchangeExec" ) {
574746 withSQLConf(CometConf .COMET_SHUFFLE_MODE .key -> " jvm" ) {
575747 val table = " src"
0 commit comments