Skip to content

Commit 282ff6a

Browse files
committed
[SPARK-55145][SS] Support Avro for timestamp based RocksDB state key encoders
### What changes were proposed in this pull request? This PR proposes to support Avro for timestamp key encoders, TimestampAsPrefixKeyStateEncoder(Spec) and TimestampAsPostfixKeyStateEncoder(Spec). This PR does not cover the scope of enabling Avro for stream-stream join operators; that would expand the scope more than expected (e.g., schema evolution) and warrants a separate effort. ### Why are the changes needed? To resolve a TODO we left in #53911 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing UTs expanded from UnsafeRow only to UnsafeRow and Avro. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude 4.6 opus Closes #54844 from HeartSaVioR/SPARK-55145. Authored-by: Jungtaek Lim <[email protected]> Signed-off-by: Jungtaek Lim <[email protected]>
1 parent 61fb41f commit 282ff6a

File tree

3 files changed

+36
-18
lines changed

3 files changed

+36
-18
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -624,6 +624,8 @@ class UnsafeRowDataEncoder(
624624
decodeToUnsafeRow(bytes, reusedKeyRow)
625625
case PrefixKeyScanStateEncoderSpec(_, numColsPrefixKey) =>
626626
decodeToUnsafeRow(bytes, numFields = numColsPrefixKey)
627+
case _: TimestampAsPrefixKeyStateEncoderSpec | _: TimestampAsPostfixKeyStateEncoderSpec =>
628+
decodeToUnsafeRow(bytes, numFields = keySchema.length - 1)
627629
case _ => throw unsupportedOperationForKeyStateEncoder("decodeKey")
628630
}
629631
}
@@ -748,8 +750,15 @@ class AvroStateEncoder(
748750
)
749751

750752
// Avro schema used by the avro encoders
751-
private lazy val keyAvroType: Schema = SchemaConverters.toAvroTypeWithDefaults(keySchema)
752-
private lazy val keyProj = UnsafeProjection.create(keySchema)
753+
// For timestamp specs, the key part excludes the timestamp column (always the last field).
754+
private lazy val effectiveKeySchema: StructType = keyStateEncoderSpec match {
755+
case TimestampAsPrefixKeyStateEncoderSpec(s) => StructType(s.dropRight(1))
756+
case TimestampAsPostfixKeyStateEncoderSpec(s) => StructType(s.dropRight(1))
757+
case _ => keySchema
758+
}
759+
private lazy val keyAvroType: Schema =
760+
SchemaConverters.toAvroTypeWithDefaults(effectiveKeySchema)
761+
private lazy val keyProj = UnsafeProjection.create(effectiveKeySchema)
753762

754763
private lazy val valueAvroType: Schema = SchemaConverters.toAvroTypeWithDefaults(valueSchema)
755764
private lazy val valueProj = UnsafeProjection.create(valueSchema)
@@ -847,8 +856,10 @@ class AvroStateEncoder(
847856
}
848857
}
849858
StructType(remainingSchema)
850-
case _ =>
851-
throw unsupportedOperationForKeyStateEncoder("createAvroEnc")
859+
case TimestampAsPrefixKeyStateEncoderSpec(schema) =>
860+
StructType(schema.dropRight(1))
861+
case TimestampAsPostfixKeyStateEncoderSpec(schema) =>
862+
StructType(schema.dropRight(1))
852863
}
853864

854865
// Handle suffix key schema for prefix scan case
@@ -1005,6 +1016,11 @@ class AvroStateEncoder(
10051016
StateSchemaIdRow(currentKeySchemaId, avroRow))
10061017
case PrefixKeyScanStateEncoderSpec(_, _) =>
10071018
encodeUnsafeRowToAvro(row, avroEncoder.keySerializer, prefixKeyAvroType, out)
1019+
case _: TimestampAsPrefixKeyStateEncoderSpec | _: TimestampAsPostfixKeyStateEncoderSpec =>
1020+
val avroRow =
1021+
encodeUnsafeRowToAvro(row, avroEncoder.keySerializer, keyAvroType, out)
1022+
encodeWithStateSchemaId(
1023+
StateSchemaIdRow(currentKeySchemaId, avroRow))
10081024
case _ => throw unsupportedOperationForKeyStateEncoder("encodeKey")
10091025
}
10101026
prependVersionByte(keyBytes)
@@ -1179,6 +1195,10 @@ class AvroStateEncoder(
11791195
case PrefixKeyScanStateEncoderSpec(_, _) =>
11801196
decodeFromAvroToUnsafeRow(
11811197
bytes, avroEncoder.keyDeserializer, prefixKeyAvroType, prefixKeyProj)
1198+
case _: TimestampAsPrefixKeyStateEncoderSpec | _: TimestampAsPostfixKeyStateEncoderSpec =>
1199+
val schemaIdRow = decodeStateSchemaIdRow(bytes)
1200+
decodeFromAvroToUnsafeRow(
1201+
schemaIdRow.bytes, avroEncoder.keyDeserializer, keyAvroType, keyProj)
11821202
case _ => throw unsupportedOperationForKeyStateEncoder("decodeKey")
11831203
}
11841204
}
@@ -1782,9 +1802,7 @@ abstract class TimestampKeyStateEncoder(
17821802
rowBytes, Platform.BYTE_ARRAY_OFFSET,
17831803
rowBytesLength
17841804
)
1785-
// The encoded row does not include the timestamp (it's stored separately),
1786-
// so decode with keySchema.length - 1 fields.
1787-
dataEncoder.decodeToUnsafeRow(rowBytes, keySchema.length - 1)
1805+
dataEncoder.decodeKey(rowBytes)
17881806
}
17891807

17901808
// NOTE: We reuse the ByteBuffer to avoid allocating a new one for every encoding/decoding,

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -673,7 +673,10 @@ case class RangeKeyScanStateEncoderSpec(
673673
}
674674
}
675675

676-
/** The encoder specification for [[TimestampAsPrefixKeyStateEncoder]]. */
676+
/**
677+
* The encoder specification for [[TimestampAsPrefixKeyStateEncoder]].
678+
* The encoder expects the provided key schema to have [original key fields..., timestamp field].
679+
*/
677680
case class TimestampAsPrefixKeyStateEncoderSpec(keySchema: StructType)
678681
extends KeyStateEncoderSpec {
679682

@@ -688,7 +691,10 @@ case class TimestampAsPrefixKeyStateEncoderSpec(keySchema: StructType)
688691
}
689692
}
690693

691-
/** The encoder specification for [[TimestampAsPostfixKeyStateEncoder]]. */
694+
/**
695+
* The encoder specification for [[TimestampAsPostfixKeyStateEncoder]].
696+
* The encoder expects the provided key schema to have [original key fields..., timestamp field].
697+
*/
692698
case class TimestampAsPostfixKeyStateEncoderSpec(keySchema: StructType)
693699
extends KeyStateEncoderSpec {
694700

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBTimestampEncoderOperationsSuite.scala

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,7 @@ class RocksDBTimestampEncoderOperationsSuite extends SharedSparkSession
6868

6969
private def newDir(): String = Utils.createTempDir().getCanonicalPath
7070

71-
// TODO: [SPARK-55145] Address the new state format with Avro and enable the test with Avro
72-
// encoding
73-
Seq("unsaferow").foreach { encoding =>
71+
Seq("unsaferow", "avro").foreach { encoding =>
7472
Seq("prefix", "postfix").foreach { encoderType =>
7573
test(s"Event time as $encoderType: basic put and get operations (encoding = $encoding)") {
7674
tryWithProviderResource(
@@ -223,9 +221,7 @@ class RocksDBTimestampEncoderOperationsSuite extends SharedSparkSession
223221
}
224222
}
225223

226-
// TODO: [SPARK-55145] Address the new state format with Avro and enable the test with Avro
227-
// encoding
228-
Seq("unsaferow").foreach { encoding =>
224+
Seq("unsaferow", "avro").foreach { encoding =>
229225
test(s"Event time as prefix: iterator operations (encoding = $encoding)") {
230226
tryWithProviderResource(
231227
newStoreProviderWithTimestampEncoder(
@@ -558,9 +554,7 @@ class RocksDBTimestampEncoderOperationsSuite extends SharedSparkSession
558554
}
559555
}
560556

561-
// TODO: [SPARK-55145] Address the new state format with Avro and enable the test with Avro
562-
// encoding
563-
Seq("unsaferow").foreach { encoding =>
557+
Seq("unsaferow", "avro").foreach { encoding =>
564558
Seq("prefix", "postfix").foreach { encoderType =>
565559
Seq(false, true).foreach { useMultipleValuesPerKey =>
566560
val multiValueSuffix = if (useMultipleValuesPerKey) " and multiple values" else ""

0 commit comments

Comments (0)