Skip to content

Commit 7b587b4

Browse files
committed
Add Spark 3.5.5 support with new compatibility shims
1 parent 664e681 commit 7b587b4

7 files changed

Lines changed: 285 additions & 0 deletions

File tree

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.shims
21+
22+
import org.apache.spark.paths.SparkPath
23+
import org.apache.spark.sql.catalyst.InternalRow
24+
import org.apache.spark.sql.execution.datasources.PartitionedFile
25+
26+
object ShimBatchReader {
27+
28+
def newPartitionedFile(partitionValues: InternalRow, file: String): PartitionedFile =
29+
PartitionedFile(
30+
partitionValues,
31+
SparkPath.fromPathString(file),
32+
-1, // -1 means we read the entire file
33+
-1,
34+
Array.empty[String],
35+
0,
36+
0,
37+
Map.empty)
38+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.sql.comet.shims
21+
22+
import org.apache.spark.SparkArithmeticException
23+
import org.apache.spark.sql.errors.QueryExecutionErrors.toSQLConf
24+
import org.apache.spark.sql.internal.SQLConf
25+
26+
// TODO: Only the Spark 3.3 version of this class is different from the others.
27+
// Remove this class after dropping Spark 3.3 support.
28+
class ShimCastOverflowException(t: String, from: String, to: String)
29+
extends SparkArithmeticException(
30+
"CAST_OVERFLOW",
31+
Map(
32+
"value" -> t,
33+
"sourceType" -> s""""$from"""",
34+
"targetType" -> s""""$to"""",
35+
"ansiConfig" -> toSQLConf(SQLConf.ANSI_ENABLED.key)),
36+
Array.empty,
37+
"") {}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.sql.comet.shims
21+
22+
import scala.collection.mutable.ArrayBuffer
23+
24+
import org.apache.spark.executor.TaskMetrics
25+
import org.apache.spark.util.AccumulatorV2
26+
27+
object ShimTaskMetrics {
28+
29+
def getTaskAccumulator(taskMetrics: TaskMetrics): Option[AccumulatorV2[_, _]] =
30+
taskMetrics.withExternalAccums(identity[ArrayBuffer[AccumulatorV2[_, _]]](_)).lastOption
31+
}

pom.xml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -585,6 +585,21 @@ under the License.
585585
</properties>
586586
</profile>
587587

588+
<profile>
589+
<id>spark-3.5.5</id>
590+
<properties>
591+
<scala.version>2.12.18</scala.version>
592+
<spark.version>3.5.5</spark.version>
593+
<spark.version.short>3.5</spark.version.short>
594+
<parquet.version>1.13.1</parquet.version>
595+
<slf4j.version>2.0.7</slf4j.version>
596+
<shims.minorVerSrc>spark-3.5.5</shims.minorVerSrc>
597+
<shims.pre35Src>not-needed</shims.pre35Src>
598+
<additional.pre35.test.source>not-needed</additional.pre35.test.source>
599+
<additional.3_5.test.source>spark-3.5</additional.3_5.test.source>
600+
</properties>
601+
</profile>
602+
588603
<profile>
589604
<!-- FIXME: this is WIP. Tests may fail https://github.com/apache/datafusion-comet/issues/551 -->
590605
<id>spark-4.0</id>
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.comet.shims
20+
21+
import org.apache.comet.expressions.CometEvalMode
22+
import org.apache.spark.sql.catalyst.expressions._
23+
import org.apache.spark.sql.types.{DataType, TimestampNTZType}
24+
25+
/**
26+
* `CometExprShim` acts as a shim for for parsing expressions from different Spark versions.
27+
*/
28+
trait CometExprShim {
29+
/**
30+
* Returns a tuple of expressions for the `unhex` function.
31+
*/
32+
protected def unhexSerde(unhex: Unhex): (Expression, Expression) = {
33+
(unhex.child, Literal(unhex.failOnError))
34+
}
35+
36+
protected def isTimestampNTZType(dt: DataType): Boolean = dt match {
37+
case _: TimestampNTZType => true
38+
case _ => false
39+
}
40+
41+
protected def evalMode(c: Cast): CometEvalMode.Value =
42+
CometEvalModeUtil.fromSparkEvalMode(c.evalMode)
43+
}
44+
45+
object CometEvalModeUtil {
46+
def fromSparkEvalMode(evalMode: EvalMode.Value): CometEvalMode.Value = evalMode match {
47+
case EvalMode.LEGACY => CometEvalMode.LEGACY
48+
case EvalMode.TRY => CometEvalMode.TRY
49+
case EvalMode.ANSI => CometEvalMode.ANSI
50+
}
51+
}
52+
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.shims
21+
22+
import org.apache.spark.sql.internal.LegacyBehaviorPolicy
23+
import org.apache.spark.sql.internal.SQLConf
24+
25+
trait ShimSQLConf {
26+
27+
/**
28+
* Spark 3.4 renamed parquetFilterPushDownStringStartWith to
29+
* parquetFilterPushDownStringPredicate
30+
*/
31+
protected def getPushDownStringPredicate(sqlConf: SQLConf): Boolean =
32+
sqlConf.parquetFilterPushDownStringPredicate
33+
34+
protected val LEGACY = LegacyBehaviorPolicy.LEGACY
35+
protected val CORRECTED = LegacyBehaviorPolicy.CORRECTED
36+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.sql.comet.shims
21+
22+
23+
import org.apache.hadoop.fs.Path
24+
25+
import org.apache.spark.sql.SparkSession
26+
import org.apache.spark.sql.catalyst.InternalRow
27+
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
28+
import org.apache.spark.sql.errors.QueryExecutionErrors
29+
import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil}
30+
import org.apache.spark.sql.execution.datasources._
31+
import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
32+
import org.apache.spark.sql.sources.Filter
33+
import org.apache.spark.sql.types.StructType
34+
35+
trait ShimCometScanExec {
36+
def wrapped: FileSourceScanExec
37+
38+
lazy val fileConstantMetadataColumns: Seq[AttributeReference] =
39+
wrapped.fileConstantMetadataColumns
40+
41+
protected def newFileScanRDD(
42+
fsRelation: HadoopFsRelation,
43+
readFunction: PartitionedFile => Iterator[InternalRow],
44+
filePartitions: Seq[FilePartition],
45+
readSchema: StructType,
46+
options: ParquetOptions): FileScanRDD = new FileScanRDD(
47+
fsRelation.sparkSession,
48+
readFunction,
49+
filePartitions,
50+
readSchema,
51+
fileConstantMetadataColumns,
52+
fsRelation.fileFormat.fileConstantMetadataExtractors,
53+
options)
54+
55+
protected def invalidBucketFile(path: String, sparkVersion: String): Throwable =
56+
QueryExecutionErrors.invalidBucketFile(path)
57+
58+
// see SPARK-39634
59+
protected def isNeededForSchema(sparkSchema: StructType): Boolean = false
60+
61+
protected def getPartitionedFile(f: FileStatusWithMetadata, p: PartitionDirectory): PartitionedFile =
62+
PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values)
63+
64+
protected def splitFiles(sparkSession: SparkSession,
65+
file: FileStatusWithMetadata,
66+
filePath: Path,
67+
isSplitable: Boolean,
68+
maxSplitBytes: Long,
69+
partitionValues: InternalRow): Seq[PartitionedFile] =
70+
PartitionedFileUtil.splitFiles(sparkSession, file, filePath, isSplitable, maxSplitBytes, partitionValues)
71+
72+
protected def getPushedDownFilters(relation: HadoopFsRelation , dataFilters: Seq[Expression]): Seq[Filter] = {
73+
val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation)
74+
dataFilters.flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
75+
}
76+
}

0 commit comments

Comments
 (0)