Skip to content

Commit 79ba12a

Browse files
PetarVasiljevic-DB authored and cloud-fan committed
[SPARK-52929][SQL] Support MySQL and SQLServer connector for DSv2 Join pushdown
### What changes were proposed in this pull request? Similar to #51594, I am enabling the join pushdown for MySQL and SQLServer connectors. Additionally, I am moving `JoinPushdownAliasGenerator` to `JdbcSQLQueryBuilder` and using it there as well to generate a new subquery alias used for the full join subquery (previously, subqueries were generated only for the left and right sides, but now the whole thing needs to be subqueried). This is needed for these 2 dialects. H2, Oracle and Postgres didn't hit this issue, but they also still work with this, so there is no need for specialization. ### Why are the changes needed? MySQL and SQLServer connectors don't support join pushdown. ### Does this PR introduce _any_ user-facing change? No, since the flag `spark.sql.optimizer.datasourceV2JoinPushdown` is disabled. If enabled, the join will be pushed down directly to the scan node. ### How was this patch tested? New tests. ### Was this patch authored or co-authored using generative AI tooling? Closes #51637 from PetarVasiljevic-DB/support_join_for_mysql. Authored-by: Petar Vasiljevic <petar.vasiljevic@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent e824f88 commit 79ba12a

File tree

7 files changed

+118
-13
lines changed

7 files changed

+118
-13
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.jdbc.v2.join
19+
20+
import java.sql.Connection
21+
import java.util.Locale
22+
23+
import org.apache.spark.sql.jdbc.{DockerJDBCIntegrationSuite, JdbcDialect, MsSQLServerDatabaseOnDocker, MsSqlServerDialect}
24+
import org.apache.spark.sql.jdbc.v2.JDBCV2JoinPushdownIntegrationSuiteBase
25+
import org.apache.spark.tags.DockerTest
26+
27+
/**
28+
* To run this test suite for a specific version (e.g., 2022-CU15-ubuntu-22.04):
29+
* {{{
30+
* ENABLE_DOCKER_INTEGRATION_TESTS=1
31+
* MSSQLSERVER_DOCKER_IMAGE_NAME=mcr.microsoft.com/mssql/server:2022-CU15-ubuntu-22.04
32+
* ./build/sbt -Pdocker-integration-tests "testOnly *v2*MsSqlServerIntegrationSuite"
33+
* }}}
34+
*/
35+
@DockerTest
36+
class MsSqlServerJoinPushdownIntegrationSuite
37+
extends DockerJDBCIntegrationSuite
38+
with JDBCV2JoinPushdownIntegrationSuiteBase {
39+
override val db = new MsSQLServerDatabaseOnDocker
40+
41+
override val url = db.getJdbcUrl(dockerIp, externalPort)
42+
43+
override val jdbcDialect: JdbcDialect = MsSqlServerDialect()
44+
45+
override def caseConvert(identifier: String): String = identifier.toUpperCase(Locale.ROOT)
46+
47+
// This method comes from DockerJDBCIntegrationSuite
48+
override def dataPreparation(connection: Connection): Unit = {
49+
super.dataPreparation()
50+
}
51+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.jdbc.v2.join
19+
20+
import java.sql.Connection
21+
import java.util.Locale
22+
23+
import org.apache.spark.sql.jdbc.{DockerJDBCIntegrationSuite, JdbcDialect, MySQLDatabaseOnDocker, MySQLDialect}
24+
import org.apache.spark.sql.jdbc.v2.JDBCV2JoinPushdownIntegrationSuiteBase
25+
import org.apache.spark.tags.DockerTest
26+
27+
/**
28+
* To run this test suite for a specific version (e.g., mysql:9.2.0):
29+
* {{{
30+
* ENABLE_DOCKER_INTEGRATION_TESTS=1 MYSQL_DOCKER_IMAGE_NAME=mysql:9.2.0
31+
* ./build/sbt -Pdocker-integration-tests "testOnly *v2*MySQLIntegrationSuite"
32+
* }}}
33+
*/
34+
@DockerTest
35+
class MySQLJoinPushdownIntegrationSuite
36+
extends DockerJDBCIntegrationSuite
37+
with JDBCV2JoinPushdownIntegrationSuiteBase {
38+
override val db = new MySQLDatabaseOnDocker
39+
40+
override val url = db.getJdbcUrl(dockerIp, externalPort)
41+
42+
override val jdbcDialect: JdbcDialect = MySQLDialect()
43+
44+
override def caseConvert(identifier: String): String = identifier.toUpperCase(Locale.ROOT)
45+
46+
// This method comes from DockerJDBCIntegrationSuite
47+
override def dataPreparation(connection: Connection): Unit = {
48+
super.dataPreparation()
49+
}
50+
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import org.apache.spark.sql.connector.read.{ScanBuilder, SupportsPushDownAggrega
2828
import org.apache.spark.sql.execution.datasources.PartitioningUtils
2929
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCPartition, JDBCRDD, JDBCRelation}
3030
import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo
31-
import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcSQLQueryBuilder}
31+
import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcSQLQueryBuilder, JoinPushdownAliasGenerator}
3232
import org.apache.spark.sql.types.StructType
3333

3434
case class JDBCScanBuilder(
@@ -335,11 +335,3 @@ case class JDBCScanBuilder(
335335
}
336336

337337
}
338-
339-
object JoinPushdownAliasGenerator {
340-
private val subQueryId = new java.util.concurrent.atomic.AtomicLong()
341-
342-
def getSubqueryQualifier: String = {
343-
"join_subquery_" + subQueryId.getAndIncrement()
344-
}
345-
}

sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcSQLQueryBuilder.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ class JdbcSQLQueryBuilder(dialect: JdbcDialect, options: JDBCOptions) {
200200
|$joinType
201201
|(${right.build()}) $rightSideQualifier
202202
|ON $joinCondition
203-
|)""".stripMargin
203+
|) ${JoinPushdownAliasGenerator.getSubqueryQualifier}""".stripMargin
204204
)
205205

206206
this
@@ -224,3 +224,11 @@ class JdbcSQLQueryBuilder(dialect: JdbcDialect, options: JDBCOptions) {
224224
s" $whereClause $groupByClause $orderByClause $limitClause $offsetClause"
225225
}
226226
}
227+
228+
object JoinPushdownAliasGenerator {
229+
private val subQueryId = new java.util.concurrent.atomic.AtomicLong()
230+
231+
def getSubqueryQualifier: String = {
232+
"join_subquery_" + subQueryId.getAndIncrement()
233+
}
234+
}

sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ private case class MsSqlServerDialect() extends JdbcDialect with NoLegacyJDBCErr
265265
val limitClause = dialect.getLimitClause(limit)
266266

267267
options.prepareQuery +
268-
s"SELECT $limitClause $columnList FROM ${options.tableOrQuery}" +
268+
s"SELECT $limitClause $columnList FROM $tableOrQuery" +
269269
s" $whereClause $groupByClause $orderByClause"
270270
}
271271
}
@@ -286,6 +286,8 @@ private case class MsSqlServerDialect() extends JdbcDialect with NoLegacyJDBCErr
286286
}
287287

288288
override def supportsLimit: Boolean = true
289+
290+
override def supportsJoin: Boolean = true
289291
}
290292

291293
private object MsSqlServerDialect {

sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,7 @@ private case class MySQLDialect() extends JdbcDialect with SQLConfHelper with No
430430
}
431431

432432
options.prepareQuery +
433-
s"SELECT $hintClause$columnList FROM ${options.tableOrQuery} $tableSampleClause" +
433+
s"SELECT $hintClause$columnList FROM $tableOrQuery $tableSampleClause" +
434434
s" $whereClause $groupByClause $orderByClause $limitOrOffsetStmt"
435435
}
436436
}
@@ -443,4 +443,6 @@ private case class MySQLDialect() extends JdbcDialect with SQLConfHelper with No
443443
override def supportsOffset: Boolean = true
444444

445445
override def supportsHint: Boolean = true
446+
447+
override def supportsJoin: Boolean = true
446448
}

sql/core/src/test/scala/org/apache/spark/sql/jdbc/v2/JDBCV2JoinPushdownIntegrationSuiteBase.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase
109109
def schemaPreparation(): Unit = {
110110
withConnection {conn =>
111111
conn
112-
.prepareStatement(s"CREATE SCHEMA IF NOT EXISTS ${quoteSchemaName(namespace)}")
112+
.prepareStatement(s"CREATE SCHEMA ${quoteSchemaName(namespace)}")
113113
.executeUpdate()
114114
}
115115
}

0 commit comments

Comments
 (0)