Commit cc9913b

[SPARK-55129][SS] Introduce new key encoders for timestamp as a first class (UnsafeRow)
### What changes were proposed in this pull request?

This PR proposes to introduce key encodings that treat the timestamp as a first-class concept.

Proposed key encodings:

* `TimestampAsPrefixKeyStateEncoder` / `TimestampAsPrefixKeyStateEncoderSpec`
  * Places the event time as a prefix, with the key as the remaining part of the serialized format.
* `TimestampAsPostfixKeyStateEncoder` / `TimestampAsPostfixKeyStateEncoderSpec`
  * Places the key first, with the event time as a postfix of the serialized format.

The timestamp type is `LongType` (long). When serializing the timestamp, we flip the sign bit and store the value in big-endian byte order. This preserves the natural ordering of longs across negative values, zero, and positive values. The serialization format of the original key is unchanged; e.g. for `UnsafeRow`, it is the same as the underlying binary format.

These encodings are specializations of the prefix and range key encodings:

* `TimestampAsPrefixKeyStateEncoderSpec` provides range scan by timestamp.
* `TimestampAsPostfixKeyStateEncoderSpec` provides prefix scan by key, and additionally range scan by the remaining timestamp.

NOTE: The range scan with timestamp is only scoped to the same key.

Compared to the prefix/range key encodings, this eliminates the overhead of combining two `UnsafeRow`s, which is at least 12 bytes per key overall (8 bytes of null-tracking bitset, plus 4 bytes storing the length of one of the two `UnsafeRow`s). It can also skip projection(s) during deserialization.

To cope with the existing StateStore API, which has no concept of timestamp at the API layer, the caller is required to project the key row to attach the timestamp manually before calling the StateStore API. Conversely, key rows produced by the StateStore API are also in the form of original row + timestamp, and the caller is responsible for projecting the original row out of the returned row.

Note: performance is not yet optimal, since projections (array creation and memcpy) happen in multiple places, and eliminating them will require an API-level change. As the change is already large, this PR only enables the new key encodings with `UnsafeRow`. Supporting Avro will be follow-up work.

### Why are the changes needed?

The existing key encodings are too general for this use case and carry noticeable overhead in the form of additional bytes in the serialized format. The proposed key encodings achieve the same result with minimal overhead, since they only need to handle a timestamp alongside the key.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test suite.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: claude-4.5-sonnet

The above was used to create a new test suite. All other parts were not generated by an LLM.

Closes #53911 from HeartSaVioR/SPARK-55129.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
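The sign-flip-and-big-endian trick described above can be illustrated outside of Spark. The following standalone Java sketch (not part of this commit; class and method names are illustrative) mirrors the encoder's `SIGN_MASK_FOR_LONG` approach and checks that unsigned byte-wise comparison of the encoded form agrees with signed long ordering:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

public class SignFlipDemo {
    private static final long SIGN_MASK = 0x8000000000000000L;

    // Encode a long as 8 big-endian bytes with the sign bit flipped, so that
    // unsigned lexicographic byte comparison matches signed long ordering.
    static byte[] encode(long ts) {
        return ByteBuffer.allocate(8).order(ByteOrder.BIG_ENDIAN)
            .putLong(0, ts ^ SIGN_MASK).array();
    }

    // Flip the sign bit back to recover the original value.
    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN).getLong(0) ^ SIGN_MASK;
    }

    public static void main(String[] args) {
        long[] values = {Long.MIN_VALUE, -1L, 0L, 1L, Long.MAX_VALUE};
        for (int i = 0; i + 1 < values.length; i++) {
            // Encoded bytes compare (unsigned, lexicographically) in the same order.
            assert Arrays.compareUnsigned(encode(values[i]), encode(values[i + 1])) < 0;
        }
        for (long v : values) {
            assert decode(encode(v)) == v; // round trip
        }
    }
}
```

Without the sign flip, a plain big-endian encoding would sort all negative longs after all positive ones, because their most significant bit is set.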
1 parent 54a6a68 commit cc9913b

File tree

3 files changed: +725, -2 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala

Lines changed: 204 additions & 1 deletion
```diff
@@ -34,8 +34,9 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.avro.{AvroDeserializer, AvroOptions, AvroSerializer, SchemaConverters}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow, Literal, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
+import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.execution.streaming.checkpointing.CheckpointFileManager
 import org.apache.spark.sql.execution.streaming.operators.stateful.transformwithstate.StateStoreColumnFamilySchemaUtils
 import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider.{SCHEMA_ID_PREFIX_BYTES, STATE_ENCODING_NUM_VERSION_BYTES, STATE_ENCODING_VERSION}
@@ -846,6 +847,8 @@ class AvroStateEncoder(
       }
     }
     StructType(remainingSchema)
+  case _ =>
+    throw unsupportedOperationForKeyStateEncoder("createAvroEnc")
 }
 
 // Handle suffix key schema for prefix scan case
@@ -1713,6 +1716,206 @@ class NoPrefixKeyStateEncoder(
   }
 }
 
+/**
+ * The singleton instance to provide utility-like methods for key state encoders which include
+ * timestamp, specifically [[TimestampAsPrefixKeyStateEncoder]] and
+ * [[TimestampAsPostfixKeyStateEncoder]].
+ */
+object TimestampKeyStateEncoder {
+  private val INTERNAL_TIMESTAMP_COLUMN_NAME = "__event_time"
+
+  def keySchemaWithTimestamp(keySchema: StructType): StructType = {
+    StructType(keySchema.fields)
+      .add(name = INTERNAL_TIMESTAMP_COLUMN_NAME, dataType = LongType, nullable = false)
+  }
+
+  def getAttachTimestampProjection(keyWithoutTimestampSchema: StructType): UnsafeProjection = {
+    val refs = keyWithoutTimestampSchema.zipWithIndex.map(x =>
+      BoundReference(x._2, x._1.dataType, x._1.nullable))
+    UnsafeProjection.create(
+      refs :+ Literal(0L), // placeholder for timestamp
+      DataTypeUtils.toAttributes(StructType(keyWithoutTimestampSchema)))
+  }
+
+  def getDetachTimestampProjection(keyWithTimestampSchema: StructType): UnsafeProjection = {
+    val refs = keyWithTimestampSchema.zipWithIndex.dropRight(1).map(x =>
+      BoundReference(x._2, x._1.dataType, x._1.nullable))
+    UnsafeProjection.create(refs)
+  }
+
+  def attachTimestamp(
+      attachTimestampProjection: UnsafeProjection,
+      keyWithTimestampSchema: StructType,
+      key: UnsafeRow,
+      timestamp: Long): UnsafeRow = {
+    val rowWithTimestamp = attachTimestampProjection(key)
+    rowWithTimestamp.setLong(keyWithTimestampSchema.length - 1, timestamp)
+    rowWithTimestamp
+  }
+
+  def extractTimestamp(key: UnsafeRow): Long = {
+    key.getLong(key.numFields - 1)
+  }
+}
+
+/**
+ * The abstract base class for key state encoders which include timestamp, specifically
+ * [[TimestampAsPrefixKeyStateEncoder]] and [[TimestampAsPostfixKeyStateEncoder]].
+ */
+abstract class TimestampKeyStateEncoder(
+    dataEncoder: RocksDBDataEncoder,
+    keySchema: StructType)
+  extends RocksDBKeyStateEncoder with Logging {
+
+  protected val detachTimestampProjection: UnsafeProjection =
+    TimestampKeyStateEncoder.getDetachTimestampProjection(keySchema)
+
+  protected val attachTimestampProjection: UnsafeProjection =
+    TimestampKeyStateEncoder.getAttachTimestampProjection(
+      StructType(keySchema.fields.dropRight(1)))
+
+  protected def decodeKey(keyBytes: Array[Byte], startPos: Int): UnsafeRow = {
+    val rowBytesLength = keyBytes.length - 8
+    val rowBytes = new Array[Byte](rowBytesLength)
+    Platform.copyMemory(
+      keyBytes, Platform.BYTE_ARRAY_OFFSET + startPos,
+      rowBytes, Platform.BYTE_ARRAY_OFFSET,
+      rowBytesLength
+    )
+    // The encoded row does not include the timestamp (it's stored separately),
+    // so decode with keySchema.length - 1 fields.
+    dataEncoder.decodeToUnsafeRow(rowBytes, keySchema.length - 1)
+  }
+
+  // NOTE: We reuse the ByteBuffer to avoid allocating a new one for every encoding/decoding,
+  // which means the encoder is not thread-safe. Built-in operators do not access the encoder in
+  // multiple threads, but if we are concerned about thread-safety in the future, we can maintain
+  // the thread-local of ByteBuffer to retain the reusability of the instance while avoiding
+  // thread-safety issue. We do not use position - we always put/get at offset 0.
+  private val buffForBigEndianLong = ByteBuffer.allocate(8).order(ByteOrder.BIG_ENDIAN)
+
+  private val SIGN_MASK_FOR_LONG: Long = 0x8000000000000000L
+
+  protected def encodeTimestamp(timestamp: Long): Array[Byte] = {
+    // Flip the sign bit to ensure correct lexicographical ordering, even for negative timestamps.
+    // We should flip the sign bit back when decoding the timestamp.
+    val signFlippedTimestamp = timestamp ^ SIGN_MASK_FOR_LONG
+    buffForBigEndianLong.putLong(0, signFlippedTimestamp)
+    buffForBigEndianLong.array()
+  }
+
+  protected def decodeTimestamp(keyBytes: Array[Byte], startPos: Int): Long = {
+    buffForBigEndianLong.put(0, keyBytes, startPos, 8)
+    val signFlippedTimestamp = buffForBigEndianLong.getLong(0)
+    // Flip the sign bit back to get the original timestamp.
+    signFlippedTimestamp ^ SIGN_MASK_FOR_LONG
+  }
+
+  protected def attachTimestamp(key: UnsafeRow, timestamp: Long): UnsafeRow = {
+    TimestampKeyStateEncoder.attachTimestamp(attachTimestampProjection, keySchema, key, timestamp)
+  }
+
+  protected def detachTimestamp(key: UnsafeRow): UnsafeRow = {
+    detachTimestampProjection(key)
+  }
+
+  def extractTimestamp(key: UnsafeRow): Long = {
+    TimestampKeyStateEncoder.extractTimestamp(key)
+  }
+}
+
+/**
+ * Encodes row with timestamp as prefix of the key, so that they can be scanned based on
+ * timestamp ordering.
+ *
+ * The encoder expects the provided key schema to have [original key fields..., timestamp field].
+ * The key has to conform to this schema when putting/getting from the state store. The schema
+ * needs to be built via calling [[TimestampKeyStateEncoder.keySchemaWithTimestamp()]].
+ */
+class TimestampAsPrefixKeyStateEncoder(
+    dataEncoder: RocksDBDataEncoder,
+    keySchema: StructType,
+    useColumnFamilies: Boolean = false)
+  extends TimestampKeyStateEncoder(dataEncoder, keySchema) with Logging {
+
+  override def supportPrefixKeyScan: Boolean = false
+
+  override def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte] = {
+    throw new IllegalStateException("This encoder doesn't support key without event time!")
+  }
+
+  override def encodeKey(row: UnsafeRow): Array[Byte] = {
+    val prefix = dataEncoder.encodeKey(detachTimestamp(row))
+    val timestamp = extractTimestamp(row)
+
+    val byteArray = new Array[Byte](prefix.length + 8)
+    Platform.copyMemory(
+      encodeTimestamp(timestamp), Platform.BYTE_ARRAY_OFFSET,
+      byteArray, Platform.BYTE_ARRAY_OFFSET, 8)
+    Platform.copyMemory(prefix, Platform.BYTE_ARRAY_OFFSET,
+      byteArray, Platform.BYTE_ARRAY_OFFSET + 8, prefix.length)
+
+    byteArray
+  }
+
+  override def decodeKey(keyBytes: Array[Byte]): UnsafeRow = {
+    val timestamp = decodeTimestamp(keyBytes, 0)
+    val row = decodeKey(keyBytes, 8)
+    attachTimestamp(row, timestamp)
+  }
+
+  // TODO: [SPARK-55491] Revisit this to support delete range if needed.
+  override def supportsDeleteRange: Boolean = false
+}
+
+/**
+ * Encodes row with timestamp as postfix of the key, so that prefix scan with the keys
+ * having the same key but different timestamps is supported. In addition, timestamp is stored
+ * in sort order to support timestamp ordered iteration in the result of prefix scan.
+ *
+ * The encoder expects the provided key schema to have [original key fields..., timestamp field].
+ * The key has to conform to this schema when putting/getting from the state store. The schema
+ * needs to be built via calling [[TimestampKeyStateEncoder.keySchemaWithTimestamp()]].
+ */
+class TimestampAsPostfixKeyStateEncoder(
+    dataEncoder: RocksDBDataEncoder,
+    keySchema: StructType,
+    useColumnFamilies: Boolean = false)
+  extends TimestampKeyStateEncoder(dataEncoder, keySchema) with Logging {
+
+  override def supportPrefixKeyScan: Boolean = true
+
+  override def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte] = {
+    dataEncoder.encodeKey(prefixKey)
+  }
+
+  override def encodeKey(row: UnsafeRow): Array[Byte] = {
+    val prefix = dataEncoder.encodeKey(detachTimestamp(row))
+    val timestamp = extractTimestamp(row)
+
+    val byteArray = new Array[Byte](prefix.length + 8)
+
+    Platform.copyMemory(prefix, Platform.BYTE_ARRAY_OFFSET,
+      byteArray, Platform.BYTE_ARRAY_OFFSET, prefix.length)
+    Platform.copyMemory(
+      encodeTimestamp(timestamp), Platform.BYTE_ARRAY_OFFSET,
+      byteArray, Platform.BYTE_ARRAY_OFFSET + prefix.length,
+      8
+    )
+
+    byteArray
+  }
+
+  override def decodeKey(keyBytes: Array[Byte]): UnsafeRow = {
+    val row = decodeKey(keyBytes, 0)
+    val rowBytesLength = keyBytes.length - 8
+    val timestamp = decodeTimestamp(keyBytes, rowBytesLength)
+    attachTimestamp(row, timestamp)
+  }
+
+  override def supportsDeleteRange: Boolean = false
+}
+
 /**
  * Supports encoding multiple values per key in RocksDB.
  * A single value is encoded in the format below, where first value is number of bytes
```
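As a rough illustration of the two layouts added above, the following standalone Java sketch (not from the commit; it ignores the `UnsafeRow` framing and any version/schema-ID prefix bytes the real encoders deal with) shows why the prefix layout yields global timestamp ordering, while the postfix layout groups entries by key and orders by timestamp within each key:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

public class KeyLayoutDemo {
    private static final long SIGN_MASK = 0x8000000000000000L;

    // Sign-flipped big-endian timestamp bytes, same idea as the real encoders.
    static byte[] tsBytes(long ts) {
        return ByteBuffer.allocate(8).order(ByteOrder.BIG_ENDIAN)
            .putLong(0, ts ^ SIGN_MASK).array();
    }

    static byte[] concat(byte[] a, byte[] b) {
        byte[] out = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, out, a.length, b.length);
        return out;
    }

    // Timestamp-as-prefix: [8-byte timestamp][key bytes] -> global timestamp ordering.
    static byte[] prefixLayout(byte[] key, long ts) { return concat(tsBytes(ts), key); }

    // Timestamp-as-postfix: [key bytes][8-byte timestamp] -> prefix scan on key,
    // timestamp ordering within the same key.
    static byte[] postfixLayout(byte[] key, long ts) { return concat(key, tsBytes(ts)); }

    public static void main(String[] args) {
        byte[] keyA = "keyA".getBytes();
        byte[] keyB = "keyB".getBytes();

        // Prefix layout: the earlier timestamp sorts first even across different keys.
        assert Arrays.compareUnsigned(prefixLayout(keyB, 10L), prefixLayout(keyA, 20L)) < 0;

        // Postfix layout: same key with different timestamps shares a common byte prefix...
        byte[] e1 = postfixLayout(keyA, 10L);
        byte[] e2 = postfixLayout(keyA, 20L);
        assert Arrays.equals(Arrays.copyOf(e1, keyA.length), Arrays.copyOf(e2, keyA.length));
        // ...and sorts by timestamp within that key.
        assert Arrays.compareUnsigned(e1, e2) < 0;
    }
}
```

RocksDB iterates keys in lexicographic byte order, which is why arranging the bytes this way is enough to get the scan semantics described in the class docs.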

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala

Lines changed: 30 additions & 1 deletion
```diff
@@ -638,6 +638,36 @@ case class RangeKeyScanStateEncoderSpec(
   }
 }
 
+/** The encoder specification for [[TimestampAsPrefixKeyStateEncoder]]. */
+case class TimestampAsPrefixKeyStateEncoderSpec(keySchema: StructType)
+  extends KeyStateEncoderSpec {
+
+  override def toEncoder(
+      dataEncoder: RocksDBDataEncoder,
+      useColumnFamilies: Boolean): RocksDBKeyStateEncoder = {
+    new TimestampAsPrefixKeyStateEncoder(dataEncoder, keySchema, useColumnFamilies)
+  }
+
+  override def jsonValue: JValue = {
+    "keyStateEncoderType" -> JString("TimestampAsPrefixKeyStateEncoderSpec")
+  }
+}
+
+/** The encoder specification for [[TimestampAsPostfixKeyStateEncoder]]. */
+case class TimestampAsPostfixKeyStateEncoderSpec(keySchema: StructType)
+  extends KeyStateEncoderSpec {
+
+  override def toEncoder(
+      dataEncoder: RocksDBDataEncoder,
+      useColumnFamilies: Boolean): RocksDBKeyStateEncoder = {
+    new TimestampAsPostfixKeyStateEncoder(dataEncoder, keySchema, useColumnFamilies)
+  }
+
+  override def jsonValue: JValue = {
+    "keyStateEncoderType" -> JString("TimestampAsPostfixKeyStateEncoderSpec")
+  }
+}
+
 /**
  * Trait representing a provider that provide [[StateStore]] instances representing
  * versions of state data.
@@ -1081,7 +1111,6 @@ class UnsafeRowPair(var key: UnsafeRow = null, var value: UnsafeRow = null) {
   }
 }
 
-
 /**
  * Companion object to [[StateStore]] that provides helper methods to create and retrieve stores
  * by their unique ids. In addition, when a SparkContext is active (i.e. SparkEnv.get is not null),
```