|
19 | 19 | # |
20 | 20 | import logging |
21 | 21 | from itertools import chain |
22 | | -from typing import TYPE_CHECKING, Any, Dict, Final, List, Mapping, Optional, Set, Tuple |
| 22 | +from typing import ( |
| 23 | + TYPE_CHECKING, |
| 24 | + Any, |
| 25 | + Dict, |
| 26 | + Final, |
| 27 | + List, |
| 28 | + Mapping, |
| 29 | + Optional, |
| 30 | + Sequence, |
| 31 | + Set, |
| 32 | + Tuple, |
| 33 | +) |
23 | 34 |
|
24 | 35 | import attr |
25 | 36 | from immutabledict import immutabledict |
|
33 | 44 | from synapse.storage.databases.main.stream import CurrentStateDeltaMembership |
34 | 45 | from synapse.storage.roommember import MemberSummary |
35 | 46 | from synapse.types import ( |
| 47 | + DeviceListUpdates, |
36 | 48 | JsonDict, |
37 | 49 | PersistedEventPosition, |
38 | 50 | Requester, |
@@ -343,6 +355,7 @@ def __init__(self, hs: "HomeServer"): |
343 | 355 | self.notifier = hs.get_notifier() |
344 | 356 | self.event_sources = hs.get_event_sources() |
345 | 357 | self.relations_handler = hs.get_relations_handler() |
| 358 | + self.device_handler = hs.get_device_handler() |
346 | 359 | self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync |
347 | 360 |
|
348 | 361 | async def wait_for_sync_for_user( |
@@ -371,10 +384,6 @@ async def wait_for_sync_for_user( |
371 | 384 | # auth_blocking will occur) |
372 | 385 | await self.auth_blocking.check_auth_blocking(requester=requester) |
373 | 386 |
|
374 | | - # TODO: If the To-Device extension is enabled and we have a `from_token`, delete |
375 | | - # any to-device messages before that token (since we now know that the device |
376 | | - # has received them). (see sync v2 for how to do this) |
377 | | - |
378 | 387 | # If we're working with a user-provided token, we need to make sure to wait for |
379 | 388 | # this worker to catch up with the token so we don't skip past any incoming |
380 | 389 | # events or future events if the user is nefariously, manually modifying the |
@@ -617,7 +626,9 @@ async def handle_room(room_id: str) -> None: |
617 | 626 | await concurrently_execute(handle_room, relevant_room_map, 10) |
618 | 627 |
|
619 | 628 | extensions = await self.get_extensions_response( |
620 | | - sync_config=sync_config, to_token=to_token |
| 629 | + sync_config=sync_config, |
| 630 | + from_token=from_token, |
| 631 | + to_token=to_token, |
621 | 632 | ) |
622 | 633 |
|
623 | 634 | return SlidingSyncResult( |
@@ -1776,48 +1787,64 @@ async def get_extensions_response( |
1776 | 1787 | self, |
1777 | 1788 | sync_config: SlidingSyncConfig, |
1778 | 1789 | to_token: StreamToken, |
| 1790 | + from_token: Optional[StreamToken], |
1779 | 1791 | ) -> SlidingSyncResult.Extensions: |
1780 | 1792 | """Handle extension requests. |
1781 | 1793 |
|
1782 | 1794 | Args: |
1783 | 1795 | sync_config: Sync configuration |
1784 | 1796 | to_token: The point in the stream to sync up to. |
| 1797 | + from_token: The point in the stream to sync from. |
1785 | 1798 | """ |
1786 | 1799 |
|
1787 | 1800 | if sync_config.extensions is None: |
1788 | 1801 | return SlidingSyncResult.Extensions() |
1789 | 1802 |
|
1790 | 1803 | to_device_response = None |
1791 | | - if sync_config.extensions.to_device: |
1792 | | - to_device_response = await self.get_to_device_extensions_response( |
| 1804 | + if sync_config.extensions.to_device is not None: |
| 1805 | + to_device_response = await self.get_to_device_extension_response( |
1793 | 1806 | sync_config=sync_config, |
1794 | 1807 | to_device_request=sync_config.extensions.to_device, |
1795 | 1808 | to_token=to_token, |
1796 | 1809 | ) |
1797 | 1810 |
|
1798 | | - return SlidingSyncResult.Extensions(to_device=to_device_response) |
| 1811 | + e2ee_response = None |
| 1812 | + if sync_config.extensions.e2ee is not None: |
| 1813 | + e2ee_response = await self.get_e2ee_extension_response( |
| 1814 | + sync_config=sync_config, |
| 1815 | + e2ee_request=sync_config.extensions.e2ee, |
| 1816 | + to_token=to_token, |
| 1817 | + from_token=from_token, |
| 1818 | + ) |
1799 | 1819 |
|
1800 | | - async def get_to_device_extensions_response( |
| 1820 | + return SlidingSyncResult.Extensions( |
| 1821 | + to_device=to_device_response, |
| 1822 | + e2ee=e2ee_response, |
| 1823 | + ) |
| 1824 | + |
| 1825 | + async def get_to_device_extension_response( |
1801 | 1826 | self, |
1802 | 1827 | sync_config: SlidingSyncConfig, |
1803 | 1828 | to_device_request: SlidingSyncConfig.Extensions.ToDeviceExtension, |
1804 | 1829 | to_token: StreamToken, |
1805 | | - ) -> SlidingSyncResult.Extensions.ToDeviceExtension: |
| 1830 | + ) -> Optional[SlidingSyncResult.Extensions.ToDeviceExtension]: |
1806 | 1831 | """Handle to-device extension (MSC3885) |
1807 | 1832 |
|
1808 | 1833 | Args: |
1809 | 1834 | sync_config: Sync configuration |
1810 | 1835 | to_device_request: The to-device extension from the request |
1811 | 1836 | to_token: The point in the stream to sync up to. |
1812 | 1837 | """ |
1813 | | - |
1814 | 1838 | user_id = sync_config.user.to_string() |
1815 | 1839 | device_id = sync_config.device_id |
1816 | 1840 |
|
| 1841 | + # Skip if the extension is not enabled |
| 1842 | + if not to_device_request.enabled: |
| 1843 | + return None |
| 1844 | + |
1817 | 1845 | # Check that this request has a valid device ID (not all requests have |
1818 | | - # to belong to a device, and so device_id is None), and that the |
1819 | | - # extension is enabled. |
1820 | | - if device_id is None or not to_device_request.enabled: |
| 1846 | + # to belong to a device, and so device_id is None) |
| 1847 | + if device_id is None: |
1821 | 1848 | return SlidingSyncResult.Extensions.ToDeviceExtension( |
1822 | 1849 | next_batch=f"{to_token.to_device_key}", |
1823 | 1850 | events=[], |
@@ -1868,3 +1895,53 @@ async def get_to_device_extensions_response( |
1868 | 1895 | next_batch=f"{stream_id}", |
1869 | 1896 | events=messages, |
1870 | 1897 | ) |
| 1898 | + |
| 1899 | + async def get_e2ee_extension_response( |
| 1900 | + self, |
| 1901 | + sync_config: SlidingSyncConfig, |
| 1902 | + e2ee_request: SlidingSyncConfig.Extensions.E2eeExtension, |
| 1903 | + to_token: StreamToken, |
| 1904 | + from_token: Optional[StreamToken], |
| 1905 | + ) -> Optional[SlidingSyncResult.Extensions.E2eeExtension]: |
| 1906 | + """Handle E2EE device extension (MSC3884) |
| 1907 | +
|
| 1908 | + Args: |
| 1909 | + sync_config: Sync configuration |
| 1910 | + e2ee_request: The e2ee extension from the request |
| 1911 | + to_token: The point in the stream to sync up to. |
| 1912 | + from_token: The point in the stream to sync from. |
| 1913 | + """ |
| 1914 | + user_id = sync_config.user.to_string() |
| 1915 | + device_id = sync_config.device_id |
| 1916 | + |
| 1917 | + # Skip if the extension is not enabled |
| 1918 | + if not e2ee_request.enabled: |
| 1919 | + return None |
| 1920 | + |
| 1921 | + device_list_updates: Optional[DeviceListUpdates] = None |
| 1922 | + if from_token is not None: |
| 1923 | + # TODO: This should take into account the `from_token` and `to_token` |
| 1924 | + device_list_updates = await self.device_handler.get_user_ids_changed( |
| 1925 | + user_id=user_id, |
| 1926 | + from_token=from_token, |
| 1927 | + ) |
| 1928 | + |
| 1929 | + device_one_time_keys_count: Mapping[str, int] = {} |
| 1930 | + device_unused_fallback_key_types: Sequence[str] = [] |
| 1931 | + if device_id: |
| 1932 | + # TODO: We should have a way to let clients differentiate between the states of: |
| 1933 | + # * no change in OTK count since the provided since token |
| 1934 | + # * the server has zero OTKs left for this device |
| 1935 | + # Spec issue: https://github.com/matrix-org/matrix-doc/issues/3298 |
| 1936 | + device_one_time_keys_count = await self.store.count_e2e_one_time_keys( |
| 1937 | + user_id, device_id |
| 1938 | + ) |
| 1939 | + device_unused_fallback_key_types = ( |
| 1940 | + await self.store.get_e2e_unused_fallback_key_types(user_id, device_id) |
| 1941 | + ) |
| 1942 | + |
| 1943 | + return SlidingSyncResult.Extensions.E2eeExtension( |
| 1944 | + device_list_updates=device_list_updates, |
| 1945 | + device_one_time_keys_count=device_one_time_keys_count, |
| 1946 | + device_unused_fallback_key_types=device_unused_fallback_key_types, |
| 1947 | + ) |
0 commit comments