Skip to content

Commit ee8214e

Browse files
committed
Spark: Initial support for 4.1.0
| Cause | Type | Category | Description | Affected Files | |-------|------|----------|-------------|----------------| | - | Feat | Feature | Introduce Spark41Shims and update build configuration to support Spark 4.1. | pom.xml<br>shims/pom.xml<br>shims/spark41/pom.xml<br>shims/spark41/.../META-INF/services/org.apache.gluten.sql.shims.SparkShimProvider<br>shims/spark41/.../spark41/Spark41Shims.scala<br>shims/spark41/.../spark41/SparkShimProvider.scala | | [#51477](apache/spark#51477) | Fix | Compatibility | Use class name instead of class object for streaming call detection to ensure Spark 4.1 compatibility. | gluten-core/.../caller/CallerInfo.scala | | [#50852](apache/spark#50852) | Fix | Compatibility | Add printOutputColumns parameter to generateTreeString methods | shims/spark41/.../GenerateTreeStringShim.scala | | [#51775](apache/spark#51775) | Fix | Compatibility | Remove unused MDC import in FileSourceScanExecShim.scala | shims/spark41/.../FileSourceScanExecShim.scala | | [#51979](apache/spark#51979) | Fix | Compatibility | Add missing StoragePartitionJoinParams import in BatchScanExecShim and AbstractBatchScanExec | shims/spark41/.../v2/AbstractBatchScanExec.scala<br>shims/spark41/.../v2/BatchScanExecShim.scala | | [#51302](apache/spark#51302) | Fix | Compatibility | Remove TimeAdd from ExpressionConverter and ExpressionMappings for test | gluten-substrait/.../ExpressionConverter.scala<br>gluten-substrait/.../ExpressionMappings.scala | | [#50598](apache/spark#50598) | Fix | Compatibility | Adapt to QueryExecution.createSparkPlan interface change | gluten-substrait/.../GlutenImplicits.scala<br>shims/spark\*/.../shims/spark\*/Spark*Shims.scala | | [#52599](apache/spark#52599) | Fix | Compatibility | Adapt to DataSourceV2Relation interface change | backends-velox/.../ArrowConvertorRule.scala | | [#52384](apache/spark#52384) | Fix | Compatibility | Using new interface of ParquetFooterReader | backends-velox/.../ParquetMetadataUtils.scala<br>gluten-ut/spark40/.../parquet/GlutenParquetRowIndexSuite.scala<br>shims/spark*/.../parquet/ParquetFooterReaderShim.scala | | [#52509](apache/spark#52509) | Fix | Build | Update Scala version to 2.13.17 in pom.xml to fix `java.lang.NoSuchMethodError: 'java.lang.String scala.util.hashing.MurmurHash3$.caseClassHash$default$2()'` | pom.xml | | - | Fix | Test | Refactor Spark version checks in VeloxHashJoinSuite to improve readability and maintainability | backends-velox/.../VeloxHashJoinSuite.scala | | [#50849](apache/spark#50849) | Fix | Test | Fix MiscOperatorSuite to support OneRowRelationExec plan Spark 4.1 | backends-velox/.../MiscOperatorSuite.scala | | [#52723](apache/spark#52723) | Fix | Compatibility | Add GeographyVal and GeometryVal support in ColumnarArrayShim | shims/spark41/.../vectorized/ColumnarArrayShim.java | | [#48470](apache/spark#48470) | 4.1.0 | Exclude | Exclude split test in VeloxStringFunctionsSuite | backends-velox/.../VeloxStringFunctionsSuite.scala | | [#51259](apache/spark#51259) | 4.1.0 | Exclude | Only Run ArrowEvalPythonExecSuite tests up to Spark 4.0, we need update ci python to 3.10 | backends-velox/.../python/ArrowEvalPythonExecSuite.scala |
1 parent 90e08d2 commit ee8214e

File tree

39 files changed

+615
-157
lines changed

39 files changed

+615
-157
lines changed

.github/workflows/util/install-spark-resources.sh

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,11 @@ case "$1" in
119119
cd ${INSTALL_DIR} && \
120120
install_spark "4.0.1" "3" "2.12"
121121
;;
122+
4.1)
123+
# Spark-4.x, scala 2.12 // using 2.12 as a hack as 4.0 does not have 2.13 suffix
124+
cd ${INSTALL_DIR} && \
125+
install_spark "4.1.0" "3" "2.12"
126+
;;
122127
*)
123128
echo "Spark version is expected to be specified."
124129
exit 1

.github/workflows/velox_backend_x86.yml

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1482,3 +1482,109 @@ jobs:
14821482
**/target/*.log
14831483
**/gluten-ut/**/hs_err_*.log
14841484
**/gluten-ut/**/core.*
1485+
1486+
spark-test-spark41:
1487+
needs: build-native-lib-centos-7
1488+
runs-on: ubuntu-22.04
1489+
env:
1490+
SPARK_TESTING: true
1491+
container: apache/gluten:centos-8-jdk17
1492+
steps:
1493+
- uses: actions/checkout@v2
1494+
- name: Download All Artifacts
1495+
uses: actions/download-artifact@v4
1496+
with:
1497+
name: velox-native-lib-centos-7-${{github.sha}}
1498+
path: ./cpp/build/releases
1499+
- name: Download Arrow Jars
1500+
uses: actions/download-artifact@v4
1501+
with:
1502+
name: arrow-jars-centos-7-${{github.sha}}
1503+
path: /root/.m2/repository/org/apache/arrow/
1504+
- name: Prepare
1505+
run: |
1506+
dnf module -y install python39 && \
1507+
alternatives --set python3 /usr/bin/python3.9 && \
1508+
pip3 install setuptools==77.0.3 && \
1509+
pip3 install pyspark==3.5.5 cython && \
1510+
pip3 install pandas==2.2.3 pyarrow==20.0.0
1511+
- name: Prepare Spark Resources for Spark 4.1.0 #TODO remove after image update
1512+
run: |
1513+
rm -rf /opt/shims/spark41
1514+
bash .github/workflows/util/install-spark-resources.sh 4.1
1515+
mv /opt/shims/spark41/spark_home/assembly/target/scala-2.12 /opt/shims/spark41/spark_home/assembly/target/scala-2.13
1516+
- name: Build and Run unit test for Spark 4.1.0 with scala-2.13 (other tests)
1517+
run: |
1518+
cd $GITHUB_WORKSPACE/
1519+
export SPARK_SCALA_VERSION=2.13
1520+
yum install -y java-17-openjdk-devel
1521+
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk
1522+
export PATH=$JAVA_HOME/bin:$PATH
1523+
java -version
1524+
$MVN_CMD clean test -Pspark-4.1 -Pscala-2.13 -Pjava-17 -Pbackends-velox \
1525+
-Pspark-ut -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \
1526+
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.EnhancedFeaturesTest,org.apache.gluten.tags.SkipTest
1527+
- name: Upload test report
1528+
if: always()
1529+
uses: actions/upload-artifact@v4
1530+
with:
1531+
name: ${{ github.job }}-report
1532+
path: '**/surefire-reports/TEST-*.xml'
1533+
- name: Upload unit tests log files
1534+
if: ${{ !success() }}
1535+
uses: actions/upload-artifact@v4
1536+
with:
1537+
name: ${{ github.job }}-test-log
1538+
path: |
1539+
**/target/*.log
1540+
**/gluten-ut/**/hs_err_*.log
1541+
**/gluten-ut/**/core.*
1542+
1543+
spark-test-spark41-slow:
1544+
needs: build-native-lib-centos-7
1545+
runs-on: ubuntu-22.04
1546+
env:
1547+
SPARK_TESTING: true
1548+
container: apache/gluten:centos-8-jdk17
1549+
steps:
1550+
- uses: actions/checkout@v2
1551+
- name: Download All Artifacts
1552+
uses: actions/download-artifact@v4
1553+
with:
1554+
name: velox-native-lib-centos-7-${{github.sha}}
1555+
path: ./cpp/build/releases
1556+
- name: Download Arrow Jars
1557+
uses: actions/download-artifact@v4
1558+
with:
1559+
name: arrow-jars-centos-7-${{github.sha}}
1560+
path: /root/.m2/repository/org/apache/arrow/
1561+
- name: Prepare Spark Resources for Spark 4.1.0 #TODO remove after image update
1562+
run: |
1563+
rm -rf /opt/shims/spark41
1564+
bash .github/workflows/util/install-spark-resources.sh 4.1
1565+
mv /opt/shims/spark41/spark_home/assembly/target/scala-2.12 /opt/shims/spark41/spark_home/assembly/target/scala-2.13
1566+
- name: Build and Run unit test for Spark 4.0 (slow tests)
1567+
run: |
1568+
cd $GITHUB_WORKSPACE/
1569+
yum install -y java-17-openjdk-devel
1570+
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk
1571+
export PATH=$JAVA_HOME/bin:$PATH
1572+
java -version
1573+
$MVN_CMD clean test -Pspark-4.1 -Pscala-2.13 -Pjava-17 -Pbackends-velox -Pspark-ut \
1574+
-DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \
1575+
-DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
1576+
- name: Upload test report
1577+
if: always()
1578+
uses: actions/upload-artifact@v4
1579+
with:
1580+
name: ${{ github.job }}-report
1581+
path: '**/surefire-reports/TEST-*.xml'
1582+
- name: Upload unit tests log files
1583+
if: ${{ !success() }}
1584+
uses: actions/upload-artifact@v4
1585+
with:
1586+
name: ${{ github.job }}-test-log
1587+
path: |
1588+
**/target/*.log
1589+
**/gluten-ut/**/hs_err_*.log
1590+
**/gluten-ut/**/core.*

backends-velox/src/main/scala/org/apache/gluten/extension/ArrowConvertorRule.scala

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,24 @@ import java.nio.charset.StandardCharsets
3838

3939
import scala.collection.convert.ImplicitConversions.`map AsScala`
4040

41+
/**
42+
* Extracts a CSVTable from a DataSourceV2Relation.
43+
*
44+
* Only the table variable of DataSourceV2Relation is accessed to improve compatibility across
45+
* different Spark versions.
46+
* @since Spark
47+
* 4.1
48+
*/
49+
private object CSVTableExtractor {
50+
def unapply(relation: DataSourceV2Relation): Option[(DataSourceV2Relation, CSVTable)] = {
51+
relation.table match {
52+
case t: CSVTable =>
53+
Some((relation, t))
54+
case _ => None
55+
}
56+
}
57+
}
58+
4159
@Experimental
4260
case class ArrowConvertorRule(session: SparkSession) extends Rule[LogicalPlan] {
4361
override def apply(plan: LogicalPlan): LogicalPlan = {
@@ -56,25 +74,15 @@ case class ArrowConvertorRule(session: SparkSession) extends Rule[LogicalPlan] {
5674
l.copy(relation = r.copy(fileFormat = new ArrowCSVFileFormat(csvOptions))(session))
5775
case _ => l
5876
}
59-
case d @ DataSourceV2Relation(
60-
t @ CSVTable(
61-
name,
62-
sparkSession,
63-
options,
64-
paths,
65-
userSpecifiedSchema,
66-
fallbackFileFormat),
67-
_,
68-
_,
69-
_,
70-
_) if validate(session, t.dataSchema, options.asCaseSensitiveMap().toMap) =>
77+
case CSVTableExtractor(d, t)
78+
if validate(session, t.dataSchema, t.options.asCaseSensitiveMap().toMap) =>
7179
d.copy(table = ArrowCSVTable(
72-
"arrow" + name,
73-
sparkSession,
74-
options,
75-
paths,
76-
userSpecifiedSchema,
77-
fallbackFileFormat))
80+
"arrow" + t.name,
81+
t.sparkSession,
82+
t.options,
83+
t.paths,
84+
t.userSpecifiedSchema,
85+
t.fallbackFileFormat))
7886
case r =>
7987
r
8088
}

backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import org.apache.gluten.sql.shims.SparkShimLoader
2121

2222
import org.apache.spark.internal.Logging
2323
import org.apache.spark.sql.execution.datasources.DataSourceUtils
24-
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFooterReader, ParquetOptions}
24+
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFooterReaderShim, ParquetOptions}
2525

2626
import org.apache.hadoop.conf.Configuration
2727
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}
@@ -135,7 +135,7 @@ object ParquetMetadataUtils extends Logging {
135135
parquetOptions: ParquetOptions): Option[String] = {
136136
val footer =
137137
try {
138-
ParquetFooterReader.readFooter(conf, fileStatus, ParquetMetadataConverter.NO_FILTER)
138+
ParquetFooterReaderShim.readFooter(conf, fileStatus, ParquetMetadataConverter.NO_FILTER)
139139
} catch {
140140
case e: Exception if ExceptionUtils.hasCause(e, classOf[ParquetCryptoRuntimeException]) =>
141141
return Some("Encrypted Parquet footer detected.")

backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -753,7 +753,11 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
753753
val df = sql("SELECT 1")
754754
checkAnswer(df, Row(1))
755755
val plan = df.queryExecution.executedPlan
756-
assert(plan.find(_.isInstanceOf[RDDScanExec]).isDefined)
756+
if (isSparkVersionGE("4.1")) {
757+
assert(plan.find(_.getClass.getSimpleName == "OneRowRelationExec").isDefined)
758+
} else {
759+
assert(plan.find(_.isInstanceOf[RDDScanExec]).isDefined)
760+
}
757761
assert(plan.find(_.isInstanceOf[ProjectExecTransformer]).isDefined)
758762
assert(plan.find(_.isInstanceOf[RowToVeloxColumnarExec]).isDefined)
759763
}

backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,12 +92,9 @@ class VeloxHashJoinSuite extends VeloxWholeStageTransformerSuite {
9292

9393
// The computing is combined into one single whole stage transformer.
9494
val wholeStages = plan.collect { case wst: WholeStageTransformer => wst }
95-
if (SparkShimLoader.getSparkVersion.startsWith("3.2.")) {
95+
if (isSparkVersionLE("3.2")) {
9696
assert(wholeStages.length == 1)
97-
} else if (
98-
SparkShimLoader.getSparkVersion.startsWith("3.5.") ||
99-
SparkShimLoader.getSparkVersion.startsWith("4.0.")
100-
) {
97+
} else if (isSparkVersionGE("3.5")) {
10198
assert(wholeStages.length == 5)
10299
} else {
103100
assert(wholeStages.length == 3)

backends-velox/src/test/scala/org/apache/gluten/execution/VeloxStringFunctionsSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -544,7 +544,8 @@ class VeloxStringFunctionsSuite extends VeloxWholeStageTransformerSuite {
544544
s"from $LINEITEM_TABLE limit 5") { _ => }
545545
}
546546

547-
testWithMinSparkVersion("split", "3.4") {
547+
// TODO: fix on spark-4.1
548+
testWithSpecifiedSparkVersion("split", "3.4", "3.5") {
548549
runQueryAndCompare(
549550
s"select l_orderkey, l_comment, split(l_comment, '') " +
550551
s"from $LINEITEM_TABLE limit 5") {

backends-velox/src/test/scala/org/apache/gluten/execution/python/ArrowEvalPythonExecSuite.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ class ArrowEvalPythonExecSuite extends WholeStageTransformerSuite {
3939
.set("spark.executor.cores", "1")
4040
}
4141

42-
test("arrow_udf test: without projection") {
42+
// TODO: fix on spark-4.1
43+
testWithMaxSparkVersion("arrow_udf test: without projection", "4.0") {
4344
lazy val base =
4445
Seq(("1", 1), ("1", 2), ("2", 1), ("2", 2), ("3", 1), ("3", 2), ("0", 1), ("3", 0))
4546
.toDF("a", "b")
@@ -59,7 +60,8 @@ class ArrowEvalPythonExecSuite extends WholeStageTransformerSuite {
5960
checkAnswer(df2, expected)
6061
}
6162

62-
test("arrow_udf test: with unrelated projection") {
63+
// TODO: fix on spark-4.1
64+
testWithMaxSparkVersion("arrow_udf test: with unrelated projection", "4.0") {
6365
lazy val base =
6466
Seq(("1", 1), ("1", 2), ("2", 1), ("2", 2), ("3", 1), ("3", 2), ("0", 1), ("3", 0))
6567
.toDF("a", "b")

gluten-core/src/main/scala/org/apache/gluten/extension/caller/CallerInfo.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ package org.apache.gluten.extension.caller
1818

1919
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
2020
import org.apache.spark.sql.execution.columnar.InMemoryRelation
21-
import org.apache.spark.sql.execution.streaming.StreamExecution
21+
import org.apache.spark.util.SparkVersionUtil
2222

2323
/**
2424
* Helper API that stores information about the call site of the columnar rule. Specific columnar
@@ -70,7 +70,12 @@ object CallerInfo {
7070
}
7171

7272
private def inStreamingCall(stack: Seq[StackTraceElement]): Boolean = {
73-
stack.exists(_.getClassName.equals(StreamExecution.getClass.getName.split('$').head))
73+
val streamName = if (SparkVersionUtil.gteSpark41) {
74+
"org.apache.spark.sql.execution.streaming.runtime.StreamExecution"
75+
} else {
76+
"org.apache.spark.sql.execution.streaming.StreamExecution"
77+
}
78+
stack.exists(_.getClassName.equals(streamName))
7479
}
7580

7681
private def inBloomFilterStatFunctionCall(stack: Seq[StackTraceElement]): Boolean = {

gluten-core/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ object SparkVersionUtil {
2525
val gteSpark33: Boolean = comparedWithSpark33 >= 0
2626
val gteSpark35: Boolean = comparedWithSpark35 >= 0
2727
val gteSpark40: Boolean = compareMajorMinorVersion((4, 0)) >= 0
28+
val gteSpark41: Boolean = compareMajorMinorVersion((4, 1)) >= 0
2829

2930
// Returns X. X < 0 if one < other, x == 0 if one == other, x > 0 if one > other.
3031
def compareMajorMinorVersion(other: (Int, Int)): Int = {

0 commit comments

Comments
 (0)