Commit 3c22ecd
[SPARK-55129][SS] Introduce new key encoders for timestamp as a first class (UnsafeRow)
### What changes were proposed in this pull request?
This PR proposes to introduce key encodings which include "timestamp" as the first class.
Proposed key encodings:
* `TimestampAsPrefixKeyStateEncoder` / `TimestampAsPrefixKeyStateEncoderSpec`
* Place event time as a prefix, and key as remaining part of serialized format
* `TimestampAsPostfixKeyStateEncoder` / `TimestampAsPostfixKeyStateEncoderSpec`
* Place key first, and event time as a postfix of serialized format
The type of timestamp is LongType (long) - when serializing the timestamp, we flip the sign byte and store the value as "big endian". This ensures the natural ordering of long type, across positive, 0, and negative values. The serialization format of the original key is the same, e.g. for UnsafeRow, same as underlying binary format.
These encodings are specification of prefix and range key encodings:
* `TimestampAsPrefixKeyStateEncoderSpec` provides the range scan with timestamp.
* `TimestampAsPostfixKeyStateEncoderSpec` provides the prefix scan with the key, additionally provides the range scan with the remaining timestamp. NOTE: The range scan with timestamp is only scoped to the same key.
Compared to the prefix/range key encoding, this can eliminate the overhead of combining two UnsafeRows, minimum 12 bytes in each key in overall (8 bytes of null-tracking bitset, 4 bytes of storing length for one of two UnsafeRows). It can also skip projection(s) from deserialization as well.
To cope with the existing StateStore API which does not have a concept of timestamp on API layer, we require the caller to project the key row to attach the timestamp manually before calling the StateStore API. Flipping the coin, the key row being produced by the StateStore API will be also the form of original row + timestamp and caller is responsible to project the original row from the returned row.
Note: the performance is not optimal since there are multiple places of projections (array creation and memcpy), and we will need to introduce API level of change to eliminate these projections.
The change is big already, so this PR only enables the new key encoding with UnsafeRow. Supporting Avro will be a follow up work.
### Why are the changes needed?
The existing key encodings are too general to serve the same with noticeable overheads, in terms of additional bytes on serialized format. The proposed key encodings will do the same with minimized overhead, given the fact it only needs to handle timestamp along with the key.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New test suite.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: claude-4.5-sonnet
The above is used for creating a new test suite. All other parts aren't generated by LLM.
Closes apache#53911 from HeartSaVioR/SPARK-55129.
Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>1 parent 6228e74 commit 3c22ecd
File tree
3 files changed
+725
-2
lines changed- sql/core/src
- main/scala/org/apache/spark/sql/execution/streaming/state
- test/scala/org/apache/spark/sql/execution/streaming/state
3 files changed
+725
-2
lines changedLines changed: 204 additions & 1 deletion
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
34 | 34 | | |
35 | 35 | | |
36 | 36 | | |
37 | | - | |
| 37 | + | |
38 | 38 | | |
| 39 | + | |
39 | 40 | | |
40 | 41 | | |
41 | 42 | | |
| |||
846 | 847 | | |
847 | 848 | | |
848 | 849 | | |
| 850 | + | |
| 851 | + | |
849 | 852 | | |
850 | 853 | | |
851 | 854 | | |
| |||
1713 | 1716 | | |
1714 | 1717 | | |
1715 | 1718 | | |
| 1719 | + | |
| 1720 | + | |
| 1721 | + | |
| 1722 | + | |
| 1723 | + | |
| 1724 | + | |
| 1725 | + | |
| 1726 | + | |
| 1727 | + | |
| 1728 | + | |
| 1729 | + | |
| 1730 | + | |
| 1731 | + | |
| 1732 | + | |
| 1733 | + | |
| 1734 | + | |
| 1735 | + | |
| 1736 | + | |
| 1737 | + | |
| 1738 | + | |
| 1739 | + | |
| 1740 | + | |
| 1741 | + | |
| 1742 | + | |
| 1743 | + | |
| 1744 | + | |
| 1745 | + | |
| 1746 | + | |
| 1747 | + | |
| 1748 | + | |
| 1749 | + | |
| 1750 | + | |
| 1751 | + | |
| 1752 | + | |
| 1753 | + | |
| 1754 | + | |
| 1755 | + | |
| 1756 | + | |
| 1757 | + | |
| 1758 | + | |
| 1759 | + | |
| 1760 | + | |
| 1761 | + | |
| 1762 | + | |
| 1763 | + | |
| 1764 | + | |
| 1765 | + | |
| 1766 | + | |
| 1767 | + | |
| 1768 | + | |
| 1769 | + | |
| 1770 | + | |
| 1771 | + | |
| 1772 | + | |
| 1773 | + | |
| 1774 | + | |
| 1775 | + | |
| 1776 | + | |
| 1777 | + | |
| 1778 | + | |
| 1779 | + | |
| 1780 | + | |
| 1781 | + | |
| 1782 | + | |
| 1783 | + | |
| 1784 | + | |
| 1785 | + | |
| 1786 | + | |
| 1787 | + | |
| 1788 | + | |
| 1789 | + | |
| 1790 | + | |
| 1791 | + | |
| 1792 | + | |
| 1793 | + | |
| 1794 | + | |
| 1795 | + | |
| 1796 | + | |
| 1797 | + | |
| 1798 | + | |
| 1799 | + | |
| 1800 | + | |
| 1801 | + | |
| 1802 | + | |
| 1803 | + | |
| 1804 | + | |
| 1805 | + | |
| 1806 | + | |
| 1807 | + | |
| 1808 | + | |
| 1809 | + | |
| 1810 | + | |
| 1811 | + | |
| 1812 | + | |
| 1813 | + | |
| 1814 | + | |
| 1815 | + | |
| 1816 | + | |
| 1817 | + | |
| 1818 | + | |
| 1819 | + | |
| 1820 | + | |
| 1821 | + | |
| 1822 | + | |
| 1823 | + | |
| 1824 | + | |
| 1825 | + | |
| 1826 | + | |
| 1827 | + | |
| 1828 | + | |
| 1829 | + | |
| 1830 | + | |
| 1831 | + | |
| 1832 | + | |
| 1833 | + | |
| 1834 | + | |
| 1835 | + | |
| 1836 | + | |
| 1837 | + | |
| 1838 | + | |
| 1839 | + | |
| 1840 | + | |
| 1841 | + | |
| 1842 | + | |
| 1843 | + | |
| 1844 | + | |
| 1845 | + | |
| 1846 | + | |
| 1847 | + | |
| 1848 | + | |
| 1849 | + | |
| 1850 | + | |
| 1851 | + | |
| 1852 | + | |
| 1853 | + | |
| 1854 | + | |
| 1855 | + | |
| 1856 | + | |
| 1857 | + | |
| 1858 | + | |
| 1859 | + | |
| 1860 | + | |
| 1861 | + | |
| 1862 | + | |
| 1863 | + | |
| 1864 | + | |
| 1865 | + | |
| 1866 | + | |
| 1867 | + | |
| 1868 | + | |
| 1869 | + | |
| 1870 | + | |
| 1871 | + | |
| 1872 | + | |
| 1873 | + | |
| 1874 | + | |
| 1875 | + | |
| 1876 | + | |
| 1877 | + | |
| 1878 | + | |
| 1879 | + | |
| 1880 | + | |
| 1881 | + | |
| 1882 | + | |
| 1883 | + | |
| 1884 | + | |
| 1885 | + | |
| 1886 | + | |
| 1887 | + | |
| 1888 | + | |
| 1889 | + | |
| 1890 | + | |
| 1891 | + | |
| 1892 | + | |
| 1893 | + | |
| 1894 | + | |
| 1895 | + | |
| 1896 | + | |
| 1897 | + | |
| 1898 | + | |
| 1899 | + | |
| 1900 | + | |
| 1901 | + | |
| 1902 | + | |
| 1903 | + | |
| 1904 | + | |
| 1905 | + | |
| 1906 | + | |
| 1907 | + | |
| 1908 | + | |
| 1909 | + | |
| 1910 | + | |
| 1911 | + | |
| 1912 | + | |
| 1913 | + | |
| 1914 | + | |
| 1915 | + | |
| 1916 | + | |
| 1917 | + | |
| 1918 | + | |
1716 | 1919 | | |
1717 | 1920 | | |
1718 | 1921 | | |
| |||
Lines changed: 30 additions & 1 deletion
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
638 | 638 | | |
639 | 639 | | |
640 | 640 | | |
| 641 | + | |
| 642 | + | |
| 643 | + | |
| 644 | + | |
| 645 | + | |
| 646 | + | |
| 647 | + | |
| 648 | + | |
| 649 | + | |
| 650 | + | |
| 651 | + | |
| 652 | + | |
| 653 | + | |
| 654 | + | |
| 655 | + | |
| 656 | + | |
| 657 | + | |
| 658 | + | |
| 659 | + | |
| 660 | + | |
| 661 | + | |
| 662 | + | |
| 663 | + | |
| 664 | + | |
| 665 | + | |
| 666 | + | |
| 667 | + | |
| 668 | + | |
| 669 | + | |
| 670 | + | |
641 | 671 | | |
642 | 672 | | |
643 | 673 | | |
| |||
1081 | 1111 | | |
1082 | 1112 | | |
1083 | 1113 | | |
1084 | | - | |
1085 | 1114 | | |
1086 | 1115 | | |
1087 | 1116 | | |
| |||
0 commit comments