@@ -34,7 +34,7 @@ import org.apache.spark.broadcast.Broadcast
3434import org .apache .spark .internal .Logging
3535import org .apache .spark .sql .avro .{AvroDeserializer , AvroOptions , AvroSerializer , SchemaConverters }
3636import org .apache .spark .sql .catalyst .InternalRow
37- import org .apache .spark .sql .catalyst .expressions .{BoundReference , JoinedRow , UnsafeProjection , UnsafeRow }
37+ import org .apache .spark .sql .catalyst .expressions .{BoundReference , JoinedRow , Literal , UnsafeProjection , UnsafeRow }
3838import org .apache .spark .sql .catalyst .expressions .codegen .UnsafeRowWriter
3939import org .apache .spark .sql .execution .streaming .checkpointing .CheckpointFileManager
4040import org .apache .spark .sql .execution .streaming .operators .stateful .transformwithstate .StateStoreColumnFamilySchemaUtils
@@ -50,11 +50,6 @@ sealed trait RocksDBKeyStateEncoder {
5050 def decodeKey (keyBytes : Array [Byte ]): UnsafeRow
5151}
5252
53- sealed trait RocksDBKeyWithTimestampStateEncoder extends RocksDBKeyStateEncoder {
54- def encodeKeyWithTimestamp (row : UnsafeRow , timestamp : Long ): Array [Byte ]
55- def decodeKeyWithTimestamp (keyBytes : Array [Byte ]): (UnsafeRow , Long )
56- }
57-
5853sealed trait RocksDBValueStateEncoder {
5954 def supportsMultipleValuesPerKey : Boolean
6055 def encodeValue (row : UnsafeRow ): Array [Byte ]
@@ -1720,53 +1715,102 @@ class NoPrefixKeyStateEncoder(
17201715 }
17211716}
17221717
/**
 * Shared helpers for key state encoders that embed an event-time timestamp into the
 * binary key (as a big-endian prefix or postfix), so that RocksDB's lexicographic
 * byte ordering of keys matches the numeric ordering of timestamps.
 */
object TimestampKeyStateEncoder {
  /** Name of the synthetic column appended to the user key schema to carry event time. */
  val INTERNAL_TIMESTAMP_COLUMN_NAME = "__event_time"

  /** Mask that flips the sign bit of a two's-complement long. */
  val SIGN_MASK_FOR_LONG: Long = 0x8000000000000000L

  /** Returns `keySchema` with the internal non-nullable timestamp column appended last. */
  def finalKeySchema(keySchema: StructType): StructType = {
    StructType(keySchema.fields)
      .add(name = INTERNAL_TIMESTAMP_COLUMN_NAME, dataType = LongType, nullable = false)
  }

  /** Allocates the 8-byte big-endian buffer used to (de)serialize a single long. */
  def getByteBufferForBigEndianLong(): ByteBuffer = {
    ByteBuffer.allocate(8).order(ByteOrder.BIG_ENDIAN)
  }

  /**
   * Serializes `timestamp` into `buff` as a big-endian long with the sign bit flipped,
   * so that unsigned lexicographic byte comparison orders negative timestamps before
   * non-negative ones. Returns the buffer's backing array - NOT a copy, so the caller
   * must consume it before the next call that reuses the same buffer.
   */
  def encodeTimestamp(buff: ByteBuffer, timestamp: Long): Array[Byte] = {
    // Flip the sign bit to ensure correct lexicographical ordering, even for negative
    // timestamps. The sign bit is flipped back when decoding the timestamp.
    val signFlippedTimestamp = timestamp ^ SIGN_MASK_FOR_LONG
    buff.putLong(0, signFlippedTimestamp)
    buff.array()
  }

  /** Reads the 8 bytes of `keyBytes` starting at `startPos` and undoes the sign flip. */
  def decodeTimestamp(buff: ByteBuffer, keyBytes: Array[Byte], startPos: Int): Long = {
    buff.put(0, keyBytes, startPos, 8)
    val signFlippedTimestamp = buff.getLong(0)
    // Flip the sign bit back to get the original timestamp.
    signFlippedTimestamp ^ SIGN_MASK_FOR_LONG
  }
}
1747+
/**
 * RocksDB key encoder which writes the event-time timestamp (the last column of
 * `keySchema`) as an 8-byte big-endian, sign-flipped PREFIX of the encoded key,
 * followed by the remaining key columns encoded via `dataEncoder`. With this layout
 * RocksDB's lexicographic key ordering sorts entries by timestamp first, enabling
 * efficient range scans over event time. Prefix key scan over the non-timestamp
 * columns is therefore not supported.
 *
 * NOTE: this encoder is not thread-safe because it reuses a single ByteBuffer.
 */
class TimestampAsPrefixKeyStateEncoder(
    dataEncoder: RocksDBDataEncoder,
    keySchema: StructType,
    useColumnFamilies: Boolean = false)
  extends RocksDBKeyStateEncoder with Logging {

  import TimestampKeyStateEncoder._
  import org.apache.spark.sql.catalyst.types.DataTypeUtils

  // keySchema includes the event time column as the last field; drop it to work with
  // the user key columns only.
  private val keySchemaWithoutTimestampWithIdx: Seq[(StructField, Int)] =
    keySchema.zipWithIndex.dropRight(1)

  // Projects an incoming full key row (user columns + timestamp) down to user columns.
  private val keyWithoutTimestampProjection: UnsafeProjection = {
    val refs = keySchemaWithoutTimestampWithIdx.map { case (field, idx) =>
      BoundReference(idx, field.dataType, field.nullable)
    }
    UnsafeProjection.create(refs)
  }

  private val keySchemaWithoutTimestampAttrs = DataTypeUtils.toAttributes(
    StructType(keySchema.dropRight(1)))

  // Rebuilds a full key row (user columns + timestamp) from a decoded user-key row.
  // Only the user key columns are bound; Literal(0L) stands in for the timestamp column
  // and is overwritten via setLong() in decodeKey(). Binding only N-1 references also
  // keeps the output arity equal to keySchema.length (the previous form appended the
  // literal to references over the FULL schema, yielding one extra output column).
  private val keyWithTimestampProjection: UnsafeProjection = {
    val refs = keySchemaWithoutTimestampWithIdx.map { case (field, idx) =>
      BoundReference(idx, field.dataType, field.nullable)
    }
    UnsafeProjection.create(
      refs :+ Literal(0L), // placeholder for timestamp, overwritten after projection
      keySchemaWithoutTimestampAttrs)
  }

  // The timestamp always occupies the last field of the full key row.
  private def extractTimestamp(key: UnsafeRow): Long = key.getLong(key.numFields - 1)

  override def supportPrefixKeyScan: Boolean = false

  override def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte] = {
    throw new IllegalStateException("This encoder doesn't support key without event time!")
  }

  // NOTE: We reuse the ByteBuffer to avoid allocating a new one for every
  // encoding/decoding, which means the encoder is not thread-safe. Built-in operators
  // do not access the encoder in multiple threads, but if we are concerned about
  // thread-safety in the future, we can maintain the thread-local of ByteBuffer to
  // retain the reusability of the instance while avoiding thread-safety issue. We do
  // not use position - we always put/get at offset 0.
  private val buffForBigEndianLong = getByteBufferForBigEndianLong()

  override def encodeKey(row: UnsafeRow): Array[Byte] = {
    val prefix = dataEncoder.encodeKey(keyWithoutTimestampProjection(row))
    val timestamp = extractTimestamp(row)

    // Layout: [8-byte big-endian sign-flipped timestamp][encoded user key columns]
    val byteArray = new Array[Byte](prefix.length + 8)
    Platform.copyMemory(
      encodeTimestamp(buffForBigEndianLong, timestamp), Platform.BYTE_ARRAY_OFFSET,
      byteArray, Platform.BYTE_ARRAY_OFFSET, 8)
    Platform.copyMemory(prefix, Platform.BYTE_ARRAY_OFFSET,
      byteArray, Platform.BYTE_ARRAY_OFFSET + 8, prefix.length)

    byteArray
  }

  override def decodeKey(keyBytes: Array[Byte]): UnsafeRow = {
    val timestamp = decodeTimestamp(buffForBigEndianLong, keyBytes, 0)

    // The remainder of the key (after the 8-byte timestamp) is the encoded user key.
    val rowBytesLength = keyBytes.length - 8
    val rowBytes = new Array[Byte](rowBytesLength)
    Platform.copyMemory(
      keyBytes, Platform.BYTE_ARRAY_OFFSET + 8,
      rowBytes, Platform.BYTE_ARRAY_OFFSET, rowBytesLength
    )
    // NOTE(review): rowBytes encodes only the user key columns, yet the row is decoded
    // declaring keySchema.length fields - confirm decodeToUnsafeRow tolerates the extra
    // field slot for the timestamp column.
    val row = dataEncoder.decodeToUnsafeRow(rowBytes, keySchema.length)

    val rowWithTimestamp = keyWithTimestampProjection(row)
    rowWithTimestamp.setLong(keySchema.length - 1, timestamp)
    rowWithTimestamp
  }

  // TODO: Revisit this to support delete range if needed.
  override def supportsDeleteRange: Boolean = false
}
17891832
/**
 * RocksDB key encoder which writes the user key columns (encoded via `dataEncoder`)
 * first, followed by the event-time timestamp (the last column of `keySchema`) as an
 * 8-byte big-endian, sign-flipped POSTFIX. Because the user key columns lead the
 * encoded bytes, prefix key scan over the non-timestamp columns is supported; entries
 * sharing the same user key are ordered by timestamp.
 *
 * NOTE: this encoder is not thread-safe because it reuses a single ByteBuffer.
 */
class TimestampAsPostfixKeyStateEncoder(
    dataEncoder: RocksDBDataEncoder,
    keySchema: StructType,
    useColumnFamilies: Boolean = false)
  extends RocksDBKeyStateEncoder with Logging {

  import TimestampKeyStateEncoder._
  import org.apache.spark.sql.catalyst.types.DataTypeUtils

  // keySchema includes the event time column as the last field; drop it to work with
  // the user key columns only.
  private val keySchemaWithoutTimestampWithIdx: Seq[(StructField, Int)] =
    keySchema.zipWithIndex.dropRight(1)

  // Projects an incoming full key row (user columns + timestamp) down to user columns.
  private val keyWithoutTimestampProjection: UnsafeProjection = {
    val refs = keySchemaWithoutTimestampWithIdx.map { case (field, idx) =>
      BoundReference(idx, field.dataType, field.nullable)
    }
    UnsafeProjection.create(refs)
  }

  private val keySchemaWithoutTimestampAttrs = DataTypeUtils.toAttributes(
    StructType(keySchema.dropRight(1)))

  // Rebuilds a full key row (user columns + timestamp) from a decoded user-key row.
  // Only the user key columns are bound; Literal(0L) stands in for the timestamp column
  // and is overwritten via setLong() in decodeKey(). The previous form bound a
  // reference to the timestamp index against an input schema that does not contain it,
  // reading an uninitialized field that was then overwritten - the literal placeholder
  // produces the same final row without the out-of-schema read, and matches
  // TimestampAsPrefixKeyStateEncoder.
  private val keyWithTimestampProjection: UnsafeProjection = {
    val refs = keySchemaWithoutTimestampWithIdx.map { case (field, idx) =>
      BoundReference(idx, field.dataType, field.nullable)
    }
    UnsafeProjection.create(
      refs :+ Literal(0L), // placeholder for timestamp, overwritten after projection
      keySchemaWithoutTimestampAttrs)
  }

  // The timestamp always occupies the last field of the full key row.
  private def extractTimestamp(key: UnsafeRow): Long = key.getLong(key.numFields - 1)

  override def supportPrefixKeyScan: Boolean = true

  override def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte] = {
    dataEncoder.encodeKey(prefixKey)
  }

  // NOTE: We reuse the ByteBuffer to avoid allocating a new one for every
  // encoding/decoding, which means the encoder is not thread-safe. Built-in operators
  // do not access the encoder in multiple threads, but if we are concerned about
  // thread-safety in the future, we can maintain the thread-local of ByteBuffer to
  // retain the reusability of the instance while avoiding thread-safety issue. We do
  // not use position - we always put/get at offset 0.
  private val buffForBigEndianLong = getByteBufferForBigEndianLong()

  override def encodeKey(row: UnsafeRow): Array[Byte] = {
    val prefix = dataEncoder.encodeKey(keyWithoutTimestampProjection(row))
    val timestamp = extractTimestamp(row)

    // Layout: [encoded user key columns][8-byte big-endian sign-flipped timestamp]
    val byteArray = new Array[Byte](prefix.length + 8)
    Platform.copyMemory(prefix, Platform.BYTE_ARRAY_OFFSET,
      byteArray, Platform.BYTE_ARRAY_OFFSET, prefix.length)
    Platform.copyMemory(
      encodeTimestamp(buffForBigEndianLong, timestamp), Platform.BYTE_ARRAY_OFFSET,
      byteArray, Platform.BYTE_ARRAY_OFFSET + prefix.length,
      8
    )

    byteArray
  }

  override def decodeKey(keyBytes: Array[Byte]): UnsafeRow = {
    // The leading bytes (before the trailing 8-byte timestamp) are the encoded user key.
    val rowBytesLength = keyBytes.length - 8
    val rowBytes = new Array[Byte](rowBytesLength)
    Platform.copyMemory(
      keyBytes, Platform.BYTE_ARRAY_OFFSET,
      rowBytes, Platform.BYTE_ARRAY_OFFSET, rowBytesLength
    )
    // NOTE(review): rowBytes encodes only the user key columns, yet the row is decoded
    // declaring keySchema.length fields - confirm decodeToUnsafeRow tolerates the extra
    // field slot for the timestamp column.
    val row = dataEncoder.decodeToUnsafeRow(rowBytes, keySchema.length)

    val timestamp = decodeTimestamp(buffForBigEndianLong, keyBytes, rowBytesLength)

    val rowWithTimestamp = keyWithTimestampProjection(row)
    rowWithTimestamp.setLong(keySchema.length - 1, timestamp)
    rowWithTimestamp
  }

  // TODO: Revisit this to support delete range if needed.
  override def supportsDeleteRange: Boolean = false
}
18581917
18591918/**
0 commit comments