
[SPARK-55129][SS] Introduce new key encoders for timestamp as a first class (UnsafeRow) #53911

Closed
HeartSaVioR wants to merge 20 commits into apache:master from HeartSaVioR:SPARK-55129

Conversation

@HeartSaVioR (Contributor) commented Jan 22, 2026

What changes were proposed in this pull request?

This PR proposes to introduce key encodings which treat "timestamp" as a first-class concept.

Proposed key encodings:

  • TimestampAsPrefixKeyStateEncoder / TimestampAsPrefixKeyStateEncoderSpec
    • Places event time as a prefix, and the key as the remaining part of the serialized format
  • TimestampAsPostfixKeyStateEncoder / TimestampAsPostfixKeyStateEncoderSpec
    • Places the key first, and event time as a postfix of the serialized format

The timestamp type is LongType (long). When serializing the timestamp, we flip the sign bit and store the value in big-endian order, which preserves the natural ordering of the long type across negative, zero, and positive values. The serialization format of the original key is unchanged; e.g. for UnsafeRow, it is the same as the underlying binary format.
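As a minimal sketch of the described scheme (the object and method names below are illustrative, not the PR's actual API): flipping the sign bit and writing the long in big-endian order makes unsigned byte-wise comparison, as done by RocksDB's default bytewise comparator, agree with the natural signed ordering of Long:

```scala
import java.nio.ByteBuffer

object TimestampOrderingSketch {
  // Illustrative sketch only. Flip the sign bit so negative longs sort before
  // non-negative ones, then write big-endian so byte-wise comparison matches
  // numeric comparison.
  val SignMask: Long = 0x8000000000000000L

  def encodeTimestamp(ts: Long): Array[Byte] =
    ByteBuffer.allocate(8).putLong(ts ^ SignMask).array() // ByteBuffer is big-endian by default

  // Unsigned lexicographic comparison, as a bytewise key comparator would do.
  def lexCompare(a: Array[Byte], b: Array[Byte]): Int = {
    var i = 0
    var res = 0
    while (res == 0 && i < a.length && i < b.length) {
      res = (a(i) & 0xFF) - (b(i) & 0xFF)
      i += 1
    }
    if (res != 0) res else a.length - b.length
  }
}
```

With this encoding, `encodeTimestamp(-1L)` sorts before `encodeTimestamp(0L)`, which sorts before `encodeTimestamp(1L)`.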

These encodings are specializations of the prefix and range key encodings:

  • TimestampAsPrefixKeyStateEncoderSpec provides range scan with the timestamp.
  • TimestampAsPostfixKeyStateEncoderSpec provides prefix scan with the key, and additionally range scan with the remaining timestamp. NOTE: the range scan with the timestamp is scoped to a single key.
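The two layouts can be sketched at the byte level (a hedged sketch; the names and helpers here are hypothetical, not the PR's encoder API):

```scala
import java.nio.ByteBuffer

object TimestampKeyLayoutSketch {
  // Illustrative sketch of the two layouts; not the PR's actual encoder API.
  val SignMask: Long = 0x8000000000000000L

  private def tsBytes(ts: Long): Array[Byte] =
    ByteBuffer.allocate(8).putLong(ts ^ SignMask).array()

  // TimestampAsPrefix: [8-byte ordered timestamp][key bytes]
  // -> range scan over the timestamp across all keys.
  def encodeTimestampAsPrefix(keyBytes: Array[Byte], ts: Long): Array[Byte] =
    tsBytes(ts) ++ keyBytes

  // TimestampAsPostfix: [key bytes][8-byte ordered timestamp]
  // -> prefix scan on the key; timestamp ordering holds within one key.
  def encodeTimestampAsPostfix(keyBytes: Array[Byte], ts: Long): Array[Byte] =
    keyBytes ++ tsBytes(ts)

  // The timestamp occupies a fixed 8 bytes, so it can be split off the tail.
  def decodePostfix(encoded: Array[Byte]): (Array[Byte], Long) = {
    val keyLen = encoded.length - 8
    val ts = ByteBuffer.wrap(encoded, keyLen, 8).getLong ^ SignMask
    (encoded.take(keyLen), ts)
  }
}
```

Because the timestamp is a fixed 8 bytes, the postfix layout still supports prefix scan on the raw key bytes, while the prefix layout orders the whole keyspace by timestamp first.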

Compared to the prefix/range key encodings, this eliminates the overhead of combining two UnsafeRows, saving at least 12 bytes per key (8 bytes for the null-tracking bitset and 4 bytes for storing the length of one of the two UnsafeRows). It can also skip projection(s) during deserialization.

To cope with the existing StateStore API, which has no concept of a timestamp at the API layer, we require the caller to project the key row to attach the timestamp manually before calling the StateStore API. Conversely, the key row produced by the StateStore API is also in the form of original row + timestamp, and the caller is responsible for projecting the original row out of the returned row.

Note: the performance is not yet optimal, since projections (array creation and memcpy) happen in multiple places; we will need an API-level change to eliminate these projections.

The change is already big, so this PR only enables the new key encoding with UnsafeRow. Supporting Avro will be follow-up work.

Why are the changes needed?

The existing key encodings are too general, incurring noticeable overhead in additional bytes in the serialized format. The proposed key encodings provide the same functionality with minimal overhead, since they only need to handle a timestamp along with the key.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

New test suite.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: claude-4.5-sonnet

The above was used to create the new test suite. No other parts were generated by an LLM.

@github-actions

JIRA Issue Information

=== Improvement SPARK-55129 ===
Summary: Introduce State Store API and key encoders for event-time as a first class
Assignee: None
Status: Open
Affected: ["4.2.0"]


This comment was automatically generated by GitHub Actions

@HeartSaVioR (Contributor Author)

* The interface for event-time aware state operations, which are based on (key, eventTime, value)
* data structure instead of (key, value).
*/
trait EventTimeAwareStateOperations {
Contributor

We want the APIs at the state store layer to be generic enough for reusability. This implementation is too tied to EventTime, which is an operator-level detail that shouldn't be coupled into the state store layer.

We have a StateStoreRow class that allows passing in extra fields with an UnsafeRow. We use this for Row checksum too. So might be better to extend StateStoreRow and add eventTime field.

Then you can expose new state store APIs that take in StateStoreRow instead of UnsafeRow. Then this would make this implementation generic enough, that it can be reused for other use cases in the future and not tie it too much to event time.

require(keyEncoder.supportEventTime,
"EventTimeAwareStateOperations requires encoder supporting event time!")

override def get(key: UnsafeRow, eventTime: Long): UnsafeRow = {
Contributor

There is a lot of duplication between these functions and the original ones. In most of them, the main difference is which encoder/decoder API is called. Can we make them share common code, with each passing in how it wants to handle the encoder/decoder? This would reduce duplication and ease maintenance.

def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte]
def encodeKey(row: UnsafeRow): Array[Byte]
def decodeKey(keyBytes: Array[Byte]): UnsafeRow
def supportEventTime: Boolean
Contributor

Why not implement another class that extends KeyStateEncoder and adds these 3 methods?

Contributor Author

We could, but that's just like checking a boolean vs. pattern matching to check the type, and using entirely different methods based on the result of the if-statement. I can still do that to avoid forcing existing encoders to implement the new methods, if it's preferable.

IMHO they are naturally different things, though I had to add them here because the KeyStateEncoder abstraction is used widely and I can't easily extend it to accept another kind of encoder.

* @throws UnsupportedOperationException if called on an encoder that doesn't support event time
* as postfix.
*/
def encodeKeyForEventTimeAsPostfix(row: UnsafeRow, eventTime: Long): Array[Byte]
Contributor

This doesn't seem like it should be implemented at this layer. We could have another Encoder class that adds this prefix/postfix after calling the underlying encoder.

override def supportEventTime: Boolean = true

override def encodeKeyWithEventTime(row: UnsafeRow, eventTime: Long): Array[Byte] = {
dataEncoder.encodeKeyForEventTimeAsPrefix(row, eventTime)
Contributor

This class should just take its own implementation of DataEncoder that adds the prefix/postfix, so that we don't need to add these APIs at the DataEncoder level.

Contributor Author

I think that should be the same as for prefix/range scan, right? I thought we were consolidating the actual implementations in DataEncoder.

throw StateStoreErrors.unsupportedOperationException("mergeList", providerName)
}

override def initiateEventTimeAwareStateOperations(
Contributor

The phrasing of this just seems a bit odd/non-conformant in this layer. Could we rename/rephrase it?

def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte]
def encodeKey(row: UnsafeRow): Array[Byte]
def decodeKey(keyBytes: Array[Byte]): UnsafeRow
def supportEventTime: Boolean
Contributor

Same here with all these methods

* Encodes key and event time, ensuring prefix scan with key and also proper sort order with
* event time within the same key in RocksDB.
*
* This method handles the encoding as follows:
Contributor

Why not just expose this in a more generic fashion ?

@HeartSaVioR (Contributor Author) commented Jan 25, 2026

I'll clarify the intention.

I argue event time should have been considered first class when the state store was originally designed. We didn't do this, hence operations with event time are never performant (full scan), even though that is exactly how an operator produces output in append mode and evicts state. I'd rather say I'm trying to fix it.

Note that prefix scan from session window (the reason we introduced prefix scan) is bound to the case of "event time as first class", and the usages of range scan are mostly against event time.

The only exception to the above is TWS, where we separate out the data and the timer; the data won't be coupled with event time since the timer will handle it. Someone may argue this is a better design since it separates the concerns, but I believe it doesn't perform as well as the proposal.

So for me, the attempt to generalize here by removing the concept of event time and replacing it with a long type or similar goes against the direction of the proposal. If we had a case where data should be ordered by an integer type, should we do the same and expose an API, etc.? I don't think there is sufficient motivation for that. The motivation for event time as first class is that it is one of the core concepts of the streaming engine, and we have been ignoring it in the state store.

@HeartSaVioR (Contributor Author)

Now this PR is on top of another PR: #54083

@HeartSaVioR force-pushed the SPARK-55129 branch 3 times, most recently from f6058b9 to 64d18bb, on February 2, 2026 12:20
@HeartSaVioR changed the title from "[SPARK-55129][SS] Introduce State Store API and key encoder for event-time as a first class (UnsafeRow)" to "[SPARK-55129][SS] Introduce State Store API and key encoder for timestamp as a first class (UnsafeRow)" on Feb 2, 2026
@HeartSaVioR force-pushed the SPARK-55129 branch 2 times, most recently from 0940a5f to cacd73d, on February 11, 2026 06:49
@HeartSaVioR changed the title from "[SPARK-55129][SS] Introduce State Store API and key encoder for timestamp as a first class (UnsafeRow)" to "[SPARK-55129][SS] Introduce new key encoders for timestamp as a first class (UnsafeRow)" on Feb 11, 2026
}
}


Contributor Author

Suggested change

rocksDB.merge(keyEncoder.encodeKey(key), valueEncoder.encodeValue(value), colFamilyName)
}


Contributor Author

Suggested change

colFamilyName)
}


Contributor Author

Suggested change

private val AVRO_ENCODER_LIFETIME_HOURS = 1L
private val DEFAULT_SCHEMA_IDS = StateSchemaInfo(0, 0)

type KeyValueEncoder = (RocksDBKeyStateEncoder, RocksDBValueStateEncoder, Short)
Contributor Author

Suggested change
type KeyValueEncoder = (RocksDBKeyStateEncoder, RocksDBValueStateEncoder, Short)

private val DEFAULT_SCHEMA_IDS = StateSchemaInfo(0, 0)

type KeyValueEncoder = (RocksDBKeyStateEncoder, RocksDBValueStateEncoder, Short)

Contributor Author

Suggested change

}
StructType(remainingSchema)
case _ =>
throw unsupportedOperationForKeyStateEncoder("createAvroEnc")
Contributor

Can we improve the passed arg/error message ?

Contributor Author

def unsupportedOperationForKeyStateEncoder(
      operation: String
  ): UnsupportedOperationException = {
    new UnsupportedOperationException(
      s"Method $operation not supported for encoder spec type " +
        s"${keyStateEncoderSpec.getClass.getSimpleName}")
  }

I feel like this should be sufficient as long as this isn't a user-facing error?

}
}

object TimestampKeyStateEncoder {
Contributor

Let's add some high-level comments here?


val SIGN_MASK_FOR_LONG: Long = 0x8000000000000000L

def finalKeySchema(keySchema: StructType): StructType = {
Contributor

Does it need to be public? If so, can we rename the function?

Contributor Author

Will rename.

}

/**
* FIXME: doc...
Contributor

pending ?

Contributor Author

Yeah yet to get there. Will update.

rowWithTimestamp
}

// TODO: Revisit this to support delete range if needed.
Contributor

could we add a SPARK JIRA for this then ?

Contributor Author

Will do.

}

/**
* FIXME: doc...
Contributor

pending ?

Contributor Author

Yeah yet to get there. Will update.

}

private val keySchemaWithoutTimestampAttrs = DataTypeUtils.toAttributes(
StructType(keySchema.dropRight(1)))
Contributor

The implementations of the functions in both encoders seem similar. Is there any common code we can reuse across both?

Contributor Author

Will try to dedup. Maybe a singleton can provide the projection instances.

new StateStoreIterator(iter, rocksDbIter.closeIfNeeded)
}

override def prefixScanWithMultiValues(
Contributor

Is this related to the encoder change ? where do we plan to use this ?

Contributor Author

This will be used in the next PR. I can move it to the next one or a separate PR if you'd prefer.

Contributor Author

Moved this out.

* timestamp ordering.
*
* The encoder expects the provided key schema to have [original key fields..., timestamp field].
* The key has to be conformed to this schema when putting/getting from the state store. The schema
Contributor

nit: has to conform to this schema

Contributor

still pending ?

import org.apache.spark.util.Utils

@ExtendedSQLTest
class RocksDBTimestampEncoderOperationsSuite extends SharedSparkSession
Contributor

nit: can we add a comment to explain what the suite is doing ?


private def newDir(): String = Utils.createTempDir().getCanonicalPath

// TODO: Address the new state format with Avro and enable the test with Avro encoding
Contributor

Can we add a JIRA ticket for avro support ?

Contributor Author


test(
s"Event time as postfix: prefix scan operations (encoding = $encoding)"
) {
Contributor

nit: move to line above ?

Contributor Author

Cursor does this all the time. Will change.

// Prefix scan should find all entries with the same key across different event times

// Insert in non-sorted order to verify that prefix scan returns them sorted by time
store.put(keyAndTimestampToRow("key1", 1, 2000L), valueToRow(102))
Contributor

Can we make sure we have enough coverage for different sort orders ? i.e. numbers with different binary lexicographic encodings to make sure results are as expected ? we could use values from other encoder tests ?

Contributor Author

Done, fd6d1aa

@anishshri-db (Contributor) left a comment

lgtm pending nits and green CI

@HeartSaVioR (Contributor Author)

https://github.com/HeartSaVioR/spark/actions/runs/22030638407/job/63655143981
The CI only failed on the docker integration module, which is unrelated.

@HeartSaVioR (Contributor Author)

Thanks! Merging to master.

rpnkv pushed a commit to rpnkv/spark that referenced this pull request Feb 18, 2026
… class (UnsafeRow)


Closes apache#53911 from HeartSaVioR/SPARK-55129.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
HeartSaVioR added a commit that referenced this pull request Mar 19, 2026
…encoders

### What changes were proposed in this pull request?

This PR proposes to support Avro for timestamp key encoders, TimestampAsPrefixKeyStateEncoder(Spec) and TimestampAsPostfixKeyStateEncoder(Spec).

This PR does not cover enabling Avro for the stream-stream join operators; that would expand the scope more than expected (e.g. schema evolution) and warrants a separate effort.

### Why are the changes needed?

To resolve the TODO we left in #53911

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UTs were expanded from covering UnsafeRow only to covering both UnsafeRow and Avro.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude 4.6 opus

Closes #54844 from HeartSaVioR/SPARK-55145.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
terana pushed a commit to terana/spark that referenced this pull request Mar 23, 2026
…encoders
