Skip to content

Commit 53d20b2

Browse files
committed
fix: support to_json on Spark 4.0
In Spark 4.0, StructsToJson is a RuntimeReplaceable whose replacement is Invoke(Literal(StructsToJsonEvaluator), "evaluate", ...). Comet's serde saw only the post-replacement Invoke and fell back with "invoke is not supported".

Add a Spark 4.0-only case in CometExprShim that detects this specific Invoke shape, reconstructs StructsToJson from the evaluator's options and timeZoneId together with the Invoke's single child argument, and recurses through exprToProtoInternal so that the support-level checks still apply.

Re-enable the four to_json tests that were skipped on Spark 4.0.

Closes #3920
1 parent 29bcb75 commit 53d20b2

3 files changed

Lines changed: 15 additions & 9 deletions

File tree

spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@
2020
package org.apache.comet.shims
2121

2222
import org.apache.spark.sql.catalyst.expressions._
23-
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
23+
import org.apache.spark.sql.catalyst.expressions.json.StructsToJsonEvaluator
24+
import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke}
2425
import org.apache.spark.sql.internal.SQLConf
2526
import org.apache.spark.sql.internal.types.StringTypeWithCollation
2627
import org.apache.spark.sql.types.{BinaryType, BooleanType, DataTypes, StringType}
@@ -114,6 +115,19 @@ trait CometExprShim extends CommonStringExprs {
114115
case k: KnownNotContainsNull =>
115116
exprToProtoInternal(k.child, inputs, binding)
116117

118+
// In Spark 4.0, StructsToJson is a RuntimeReplaceable whose replacement is
119+
// Invoke(Literal(StructsToJsonEvaluator), "evaluate", ...). Reconstruct the
120+
// original StructsToJson and recurse so support-level checks apply.
121+
case i: Invoke =>
122+
(i.targetObject, i.functionName, i.arguments) match {
123+
case (Literal(evaluator: StructsToJsonEvaluator, _), "evaluate", Seq(child)) =>
124+
exprToProtoInternal(
125+
StructsToJson(evaluator.options, child, evaluator.timeZoneId),
126+
inputs,
127+
binding)
128+
case _ => None
129+
}
130+
117131
case _ => None
118132
}
119133
}

spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2271,8 +2271,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
22712271
}
22722272

22732273
test("to_json") {
2274-
// TODO fix for Spark 4.0.0
2275-
assume(!isSpark40Plus)
22762274
withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[StructsToJson]) -> "true") {
22772275
Seq(true, false).foreach { dictionaryEnabled =>
22782276
withParquetTable(
@@ -2298,8 +2296,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
22982296
}
22992297

23002298
test("to_json escaping of field names and string values") {
2301-
// TODO fix for Spark 4.0.0
2302-
assume(!isSpark40Plus)
23032299
withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[StructsToJson]) -> "true") {
23042300
val gen = new DataGenerator(new Random(42))
23052301
val chars = "\\'\"abc\t\r\n\f\b"
@@ -2329,8 +2325,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
23292325
}
23302326

23312327
test("to_json unicode") {
2332-
// TODO fix for Spark 4.0.0
2333-
assume(!isSpark40Plus)
23342328
withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[StructsToJson]) -> "true") {
23352329
Seq(true, false).foreach { dictionaryEnabled =>
23362330
withParquetTable(

spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.expressions.{JsonToStructs, StructsToJson}
3030
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
3131
import org.apache.spark.sql.functions._
3232

33-
import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus
3433
import org.apache.comet.serde.CometStructsToJson
3534
import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions}
3635

@@ -48,7 +47,6 @@ class CometJsonExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe
4847
}
4948

5049
test("to_json - all supported types") {
51-
assume(!isSpark40Plus)
5250
withTempDir { dir =>
5351
val path = new Path(dir.toURI.toString, "test.parquet")
5452
val filename = path.toString

0 commit comments

Comments
 (0)