Skip to content

Commit 4ede214

Browse files
andygrove and comphead authored
chore: Add config for enabling SMJ with join condition (apache#937)
* Add config for enabling SMJ with join condition
* Update common/src/main/scala/org/apache/comet/CometConf.scala
  Co-authored-by: Oleks V <comphead@users.noreply.github.com>
* Update docs/source/user-guide/configs.md
  Co-authored-by: Oleks V <comphead@users.noreply.github.com>
* enable config in stability suite

---------

Co-authored-by: Oleks V <comphead@users.noreply.github.com>
1 parent e98ca3c commit 4ede214

6 files changed

Lines changed: 17 additions & 0 deletions

File tree

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,12 @@ object CometConf extends ShimCometConf {
145145
val COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED: ConfigEntry[Boolean] =
146146
createExecEnabledConfig("takeOrderedAndProject", defaultValue = true)
147147

148+
val COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED: ConfigEntry[Boolean] =
149+
conf("spark.comet.exec.sortMergeJoinWithJoinFilter.enabled")
150+
.doc("Experimental support for Sort Merge Join with filter")
151+
.booleanConf
152+
.createWithDefault(false)
153+
148154
val COMET_EXPR_STDDEV_ENABLED: ConfigEntry[Boolean] =
149155
createExecEnabledConfig(
150156
"stddev",

docs/source/user-guide/configs.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ Comet provides the following configuration settings.
5454
| spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | true |
5555
| spark.comet.exec.sort.enabled | Whether to enable sort by default. | true |
5656
| spark.comet.exec.sortMergeJoin.enabled | Whether to enable sortMergeJoin by default. | true |
57+
| spark.comet.exec.sortMergeJoinWithJoinFilter.enabled | Experimental support for Sort Merge Join with filter | false |
5758
| spark.comet.exec.stddev.enabled | Whether to enable stddev by default. stddev is slower than Spark's implementation. | true |
5859
| spark.comet.exec.takeOrderedAndProject.enabled | Whether to enable takeOrderedAndProject by default. | true |
5960
| spark.comet.exec.union.enabled | Whether to enable union by default. | true |

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2961,6 +2961,13 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
29612961
}
29622962
}
29632963

2964+
if (join.condition.isDefined &&
2965+
!CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED
2966+
.get(conf)) {
2967+
withInfo(join, join.condition.get)
2968+
return None
2969+
}
2970+
29642971
val condition = join.condition.map { cond =>
29652972
val condProto = exprToProto(cond, join.left.output ++ join.right.output)
29662973
if (condProto.isEmpty) {

spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,7 @@ class CometJoinSuite extends CometTestBase {
340340

341341
test("SortMergeJoin with join filter") {
342342
withSQLConf(
343+
CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key -> "true",
343344
SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
344345
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
345346
withParquetTable((0 until 10).map(i => (i, i % 5)), "tbl_a") {

spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ abstract class CometTestBase
8080
conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
8181
conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
8282
conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "2g")
83+
conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key, "true")
8384
conf
8485
}
8586

spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,7 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBa
262262
CometConf.COMET_EXEC_ENABLED.key -> "true",
263263
CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "false",
264264
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
265+
CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key -> "true",
265266
CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true", // needed for v1.4/q9, v1.4/q44, v2.7.0/q6, v2.7.0/q64
266267
"spark.sql.readSideCharPadding" -> "false",
267268
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10MB") {

0 commit comments

Comments
 (0)