@@ -335,6 +335,93 @@ class RocksDBTimestampEncoderOperationsSuite extends SharedSparkSession
335335 }
336336 }
337337
338+ // Diverse set of timestamps that exercise binary lexicographic encoding edge cases,
339+ // reusing the same values from range scan encoder tests in RocksDBStateStoreSuite,
340+ // including large negatives, small negatives, zero, small positives, large positives,
341+ // and powers of 2.
342+ private val diverseTimestamps = Seq (931L , 8000L , 452300L , 4200L , - 1L , 90L , 1L , 2L , 8L ,
343+ - 230L , - 14569L , - 92L , - 7434253L , 35L , 6L , 9L , - 323L , 5L ,
344+ - 32L , - 64L , - 256L , 64L , 32L , 1024L , 4096L , 0L )
345+
346+ /**
347+ * Tests that the given encoder type correctly orders entries by event time when using
348+ * diverse timestamp values that exercise binary lexicographic encoding edge cases.
349+ *
350+ * @param encoderType "prefix" or "postfix"
351+ * @param useMultipleValuesPerKey whether to store multiple values per (key, timestamp)
352+ * @param encoding data encoding format (e.g. "unsaferow")
353+ */
354+ private def testDiverseTimestampOrdering (
355+ encoderType : String ,
356+ useMultipleValuesPerKey : Boolean ,
357+ encoding : String ): Unit = {
358+ val valuesPerKey = if (useMultipleValuesPerKey) 2 else 1
359+
360+ tryWithProviderResource(
361+ newStoreProviderWithTimestampEncoder(
362+ encoderType = encoderType,
363+ useMultipleValuesPerKey = useMultipleValuesPerKey,
364+ dataEncoding = encoding)
365+ ) { provider =>
366+ val store = provider.getStore(0 )
367+
368+ try {
369+ // Insert diverse timestamps in non-sorted order
370+ diverseTimestamps.zipWithIndex.foreach { case (ts, idx) =>
371+ val keyRow = keyAndTimestampToRow(" key1" , 1 , ts)
372+ if (useMultipleValuesPerKey) {
373+ val values = Array (valueToRow(idx * 10 ), valueToRow(idx * 10 + 1 ))
374+ store.putList(keyRow, values)
375+ } else {
376+ store.put(keyRow, valueToRow(idx))
377+ }
378+ }
379+
380+ // For postfix encoder, add a different key to verify prefix scan isolation
381+ if (encoderType == " postfix" ) {
382+ store.put(keyAndTimestampToRow(" key2" , 1 , 500L ), valueToRow(999 ))
383+ }
384+
385+ // Read results back using the appropriate scan method
386+ val iter = encoderType match {
387+ // For prefix encoder, we use iterator
388+ case " prefix" =>
389+ if (useMultipleValuesPerKey) store.iteratorWithMultiValues()
390+ else store.iterator()
391+ // For postfix encoder, we use prefix scan with ("key1", 1) as the prefix key
392+ case " postfix" =>
393+ if (useMultipleValuesPerKey) store.prefixScanWithMultiValues(keyToRow(" key1" , 1 ))
394+ else store.prefixScan(keyToRow(" key1" , 1 ))
395+ }
396+
397+ val results = iter.map(_.key.getLong(2 )).toList
398+ iter.close()
399+
400+ assert(results.length === diverseTimestamps.length * valuesPerKey)
401+
402+ // Verify event times are in ascending order
403+ val distinctEventTimes = results.distinct
404+ assert(distinctEventTimes === diverseTimestamps.sorted,
405+ " Results should be ordered by event time" )
406+ } finally {
407+ store.abort()
408+ }
409+ }
410+ }
411+
412+ // TODO: Address the new state format with Avro and enable the test with Avro encoding
413+ Seq (" unsaferow" ).foreach { encoding =>
414+ Seq (" prefix" , " postfix" ).foreach { encoderType =>
415+ Seq (false , true ).foreach { useMultipleValuesPerKey =>
416+ val multiValueSuffix = if (useMultipleValuesPerKey) " and multiple values" else " "
417+ test(s " Event time as $encoderType: ordering with diverse timestamps " +
418+ s " $multiValueSuffix (encoding = $encoding) " ) {
419+ testDiverseTimestampOrdering(encoderType, useMultipleValuesPerKey, encoding)
420+ }
421+ }
422+ }
423+ }
424+
338425 // Helper methods to create test data
339426 private val keyProjection = UnsafeProjection .create(keySchema)
340427 private val keyAndTimestampProjection = UnsafeProjection .create(
0 commit comments