Skip to content

Commit 3878699

Browse files
Speed up device deletion (#18602)
This is to handle the case of deleting lots of "bot" devices at once. Reviewable commit-by-commit --------- Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
1 parent b35c648 commit 3878699

7 files changed

Lines changed: 225 additions & 84 deletions

File tree

changelog.d/18602.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Speed up bulk device deletion.

synapse/handlers/auth.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@
7676
LoginTokenLookupResult,
7777
LoginTokenReused,
7878
)
79-
from synapse.types import JsonDict, Requester, UserID
79+
from synapse.types import JsonDict, Requester, StrCollection, UserID
8080
from synapse.util import stringutils as stringutils
8181
from synapse.util.async_helpers import delay_cancellation, maybe_awaitable
8282
from synapse.util.msisdn import phone_number_to_msisdn
@@ -1547,6 +1547,31 @@ async def delete_access_tokens_for_user(
15471547
user_id, (token_id for _, token_id, _ in tokens_and_devices)
15481548
)
15491549

1550+
async def delete_access_tokens_for_devices(
1551+
self,
1552+
user_id: str,
1553+
device_ids: StrCollection,
1554+
) -> None:
1555+
"""Invalidate access tokens for the devices
1556+
1557+
Args:
1558+
user_id: ID of user the tokens belong to
1559+
device_ids: ID of device the tokens are associated with.
1560+
If None, tokens associated with any device (or no device) will
1561+
be deleted
1562+
"""
1563+
tokens_and_devices = await self.store.user_delete_access_tokens_for_devices(
1564+
user_id,
1565+
device_ids,
1566+
)
1567+
1568+
# see if any modules want to know about this
1569+
if self.password_auth_provider.on_logged_out_callbacks:
1570+
for token, _, device_id in tokens_and_devices:
1571+
await self.password_auth_provider.on_logged_out(
1572+
user_id=user_id, device_id=device_id, access_token=token
1573+
)
1574+
15501575
async def add_threepid(
15511576
self, user_id: str, medium: str, address: str, validated_at: int
15521577
) -> None:

synapse/handlers/device.py

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -671,12 +671,12 @@ async def delete_all_devices_for_user(
671671
except_device_id: optional device id which should not be deleted
672672
"""
673673
device_map = await self.store.get_devices_by_user(user_id)
674-
device_ids = list(device_map)
675674
if except_device_id is not None:
676-
device_ids = [d for d in device_ids if d != except_device_id]
677-
await self.delete_devices(user_id, device_ids)
675+
device_map.pop(except_device_id, None)
676+
user_device_ids = device_map.keys()
677+
await self.delete_devices(user_id, user_device_ids)
678678

679-
async def delete_devices(self, user_id: str, device_ids: List[str]) -> None:
679+
async def delete_devices(self, user_id: str, device_ids: StrCollection) -> None:
680680
"""Delete several devices
681681
682682
Args:
@@ -695,24 +695,24 @@ async def delete_devices(self, user_id: str, device_ids: List[str]) -> None:
695695
else:
696696
raise
697697

698-
# Delete data specific to each device. Not optimised as it is not
699-
# considered as part of a critical path.
700-
for device_id in device_ids:
701-
await self._auth_handler.delete_access_tokens_for_user(
702-
user_id, device_id=device_id
703-
)
704-
await self.store.delete_e2e_keys_by_device(
705-
user_id=user_id, device_id=device_id
706-
)
707-
708-
if self.hs.config.experimental.msc3890_enabled:
698+
# Delete data specific to each device. Not optimised as its an
699+
# experimental MSC.
700+
if self.hs.config.experimental.msc3890_enabled:
701+
for device_id in device_ids:
709702
# Remove any local notification settings for this device in accordance
710703
# with MSC3890.
711704
await self._account_data_handler.remove_account_data_for_user(
712705
user_id,
713706
f"org.matrix.msc3890.local_notification_settings.{device_id}",
714707
)
715708

709+
# If we're deleting a lot of devices, a bunch of them may not have any
710+
# to-device messages queued up. We filter those out to avoid scheduling
711+
# unnecessary tasks.
712+
devices_with_messages = await self.store.get_devices_with_messages(
713+
user_id, device_ids
714+
)
715+
for device_id in devices_with_messages:
716716
# Delete device messages asynchronously and in batches using the task scheduler
717717
# We specify an upper stream id to avoid deleting non delivered messages
718718
# if an user re-uses a device ID.
@@ -726,6 +726,10 @@ async def delete_devices(self, user_id: str, device_ids: List[str]) -> None:
726726
},
727727
)
728728

729+
await self._auth_handler.delete_access_tokens_for_devices(
730+
user_id, device_ids=device_ids
731+
)
732+
729733
# Pushers are deleted after `delete_access_tokens_for_user` is called so that
730734
# modules using `on_logged_out` hook can use them if needed.
731735
await self.hs.get_pusherpool().remove_pushers_by_devices(user_id, device_ids)
@@ -819,10 +823,11 @@ async def notify_device_update(
819823
# This should only happen if there are no updates, so we bail.
820824
return
821825

822-
for device_id in device_ids:
823-
logger.debug(
824-
"Notifying about update %r/%r, ID: %r", user_id, device_id, position
825-
)
826+
if logger.isEnabledFor(logging.DEBUG):
827+
for device_id in device_ids:
828+
logger.debug(
829+
"Notifying about update %r/%r, ID: %r", user_id, device_id, position
830+
)
826831

827832
# specify the user ID too since the user should always get their own device list
828833
# updates, even if they aren't in any rooms.
@@ -922,9 +927,6 @@ async def rehydrate_device(
922927
# can't call self.delete_device because that will clobber the
923928
# access token so call the storage layer directly
924929
await self.store.delete_devices(user_id, [old_device_id])
925-
await self.store.delete_e2e_keys_by_device(
926-
user_id=user_id, device_id=old_device_id
927-
)
928930

929931
# tell everyone that the old device is gone and that the dehydrated
930932
# device has a new display name
@@ -946,7 +948,6 @@ async def delete_dehydrated_device(self, user_id: str, device_id: str) -> None:
946948
raise errors.NotFoundError()
947949

948950
await self.delete_devices(user_id, [device_id])
949-
await self.store.delete_e2e_keys_by_device(user_id=user_id, device_id=device_id)
950951

951952
@wrap_as_background_process("_handle_new_device_update_async")
952953
async def _handle_new_device_update_async(self) -> None:

synapse/storage/databases/main/deviceinbox.py

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,11 @@
5252
make_in_list_sql_clause,
5353
)
5454
from synapse.storage.util.id_generators import MultiWriterIdGenerator
55-
from synapse.types import JsonDict
55+
from synapse.types import JsonDict, StrCollection
5656
from synapse.util import Duration, json_encoder
5757
from synapse.util.caches.expiringcache import ExpiringCache
5858
from synapse.util.caches.stream_change_cache import StreamChangeCache
59+
from synapse.util.iterutils import batch_iter
5960
from synapse.util.stringutils import parse_and_validate_server_name
6061

6162
if TYPE_CHECKING:
@@ -1027,6 +1028,40 @@ def _delete_old_federation_inbox_rows_txn(txn: LoggingTransaction) -> bool:
10271028
# loop first time we run this.
10281029
self._clock.sleep(1)
10291030

1031+
async def get_devices_with_messages(
1032+
self, user_id: str, device_ids: StrCollection
1033+
) -> StrCollection:
1034+
"""Get the matching device IDs that have messages in the device inbox."""
1035+
1036+
def get_devices_with_messages_txn(
1037+
txn: LoggingTransaction,
1038+
batch_device_ids: StrCollection,
1039+
) -> StrCollection:
1040+
clause, args = make_in_list_sql_clause(
1041+
self.database_engine, "device_id", batch_device_ids
1042+
)
1043+
sql = f"""
1044+
SELECT DISTINCT device_id FROM device_inbox
1045+
WHERE {clause} AND user_id = ?
1046+
"""
1047+
args.append(user_id)
1048+
txn.execute(sql, args)
1049+
return {row[0] for row in txn}
1050+
1051+
results: Set[str] = set()
1052+
for batch_device_ids in batch_iter(device_ids, 1000):
1053+
batch_results = await self.db_pool.runInteraction(
1054+
"get_devices_with_messages",
1055+
get_devices_with_messages_txn,
1056+
batch_device_ids,
1057+
# We don't need to run in a transaction as it's a single query
1058+
db_autocommit=True,
1059+
)
1060+
1061+
results.update(batch_results)
1062+
1063+
return results
1064+
10301065

10311066
class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
10321067
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"

synapse/storage/databases/main/devices.py

Lines changed: 58 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ def count_devices_by_users_txn(
282282
"count_devices_by_users", count_devices_by_users_txn, user_ids
283283
)
284284

285-
@cached()
285+
@cached(tree=True)
286286
async def get_device(
287287
self, user_id: str, device_id: str
288288
) -> Optional[Mapping[str, Any]]:
@@ -1861,7 +1861,7 @@ async def store_device(
18611861
)
18621862
raise StoreError(500, "Problem storing device.")
18631863

1864-
async def delete_devices(self, user_id: str, device_ids: List[str]) -> None:
1864+
async def delete_devices(self, user_id: str, device_ids: StrCollection) -> None:
18651865
"""Deletes several devices.
18661866
18671867
Args:
@@ -1885,11 +1885,49 @@ def _delete_devices_txn(txn: LoggingTransaction, device_ids: List[str]) -> None:
18851885
values=device_ids,
18861886
keyvalues={"user_id": user_id},
18871887
)
1888-
self._invalidate_cache_and_stream_bulk(
1889-
txn, self.get_device, [(user_id, device_id) for device_id in device_ids]
1888+
1889+
# Also delete associated e2e keys.
1890+
self.db_pool.simple_delete_many_txn(
1891+
txn,
1892+
table="e2e_device_keys_json",
1893+
keyvalues={"user_id": user_id},
1894+
column="device_id",
1895+
values=device_ids,
1896+
)
1897+
self.db_pool.simple_delete_many_txn(
1898+
txn,
1899+
table="e2e_one_time_keys_json",
1900+
keyvalues={"user_id": user_id},
1901+
column="device_id",
1902+
values=device_ids,
1903+
)
1904+
self.db_pool.simple_delete_many_txn(
1905+
txn,
1906+
table="dehydrated_devices",
1907+
keyvalues={"user_id": user_id},
1908+
column="device_id",
1909+
values=device_ids,
1910+
)
1911+
self.db_pool.simple_delete_many_txn(
1912+
txn,
1913+
table="e2e_fallback_keys_json",
1914+
keyvalues={"user_id": user_id},
1915+
column="device_id",
1916+
values=device_ids,
18901917
)
18911918

1892-
for batch in batch_iter(device_ids, 100):
1919+
# We're bulk deleting potentially many devices at once, so
1920+
# let's not invalidate the cache for each device individually.
1921+
# Instead, we will invalidate the cache for the user as a whole.
1922+
self._invalidate_cache_and_stream(txn, self.get_device, (user_id,))
1923+
self._invalidate_cache_and_stream(
1924+
txn, self.count_e2e_one_time_keys, (user_id,)
1925+
)
1926+
self._invalidate_cache_and_stream(
1927+
txn, self.get_e2e_unused_fallback_key_types, (user_id,)
1928+
)
1929+
1930+
for batch in batch_iter(device_ids, 1000):
18931931
await self.db_pool.runInteraction(
18941932
"delete_devices", _delete_devices_txn, batch
18951933
)
@@ -2061,32 +2099,36 @@ async def add_device_change_to_streams(
20612099
context = get_active_span_text_map()
20622100

20632101
def add_device_changes_txn(
2064-
txn: LoggingTransaction, stream_ids: List[int]
2102+
txn: LoggingTransaction,
2103+
batch_device_ids: StrCollection,
2104+
stream_ids: List[int],
20652105
) -> None:
20662106
self._add_device_change_to_stream_txn(
20672107
txn,
20682108
user_id,
2069-
device_ids,
2109+
batch_device_ids,
20702110
stream_ids,
20712111
)
20722112

20732113
self._add_device_outbound_room_poke_txn(
20742114
txn,
20752115
user_id,
2076-
device_ids,
2116+
batch_device_ids,
20772117
room_ids,
20782118
stream_ids,
20792119
context,
20802120
)
20812121

2082-
async with self._device_list_id_gen.get_next_mult(
2083-
len(device_ids)
2084-
) as stream_ids:
2085-
await self.db_pool.runInteraction(
2086-
"add_device_change_to_stream",
2087-
add_device_changes_txn,
2088-
stream_ids,
2089-
)
2122+
for batch_device_ids in batch_iter(device_ids, 1000):
2123+
async with self._device_list_id_gen.get_next_mult(
2124+
len(device_ids)
2125+
) as stream_ids:
2126+
await self.db_pool.runInteraction(
2127+
"add_device_change_to_stream",
2128+
add_device_changes_txn,
2129+
batch_device_ids,
2130+
stream_ids,
2131+
)
20902132

20912133
return stream_ids[-1]
20922134

synapse/storage/databases/main/end_to_end_keys.py

Lines changed: 2 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -593,7 +593,7 @@ def _add_e2e_one_time_keys_txn(
593593
txn, self.count_e2e_one_time_keys, (user_id, device_id)
594594
)
595595

596-
@cached(max_entries=10000)
596+
@cached(max_entries=10000, tree=True)
597597
async def count_e2e_one_time_keys(
598598
self, user_id: str, device_id: str
599599
) -> Mapping[str, int]:
@@ -808,7 +808,7 @@ def _set_e2e_fallback_keys_txn(
808808
},
809809
)
810810

811-
@cached(max_entries=10000)
811+
@cached(max_entries=10000, tree=True)
812812
async def get_e2e_unused_fallback_key_types(
813813
self, user_id: str, device_id: str
814814
) -> Sequence[str]:
@@ -1632,46 +1632,6 @@ def _set_e2e_device_keys_txn(
16321632
log_kv({"message": "Device keys stored."})
16331633
return True
16341634

1635-
async def delete_e2e_keys_by_device(self, user_id: str, device_id: str) -> None:
1636-
def delete_e2e_keys_by_device_txn(txn: LoggingTransaction) -> None:
1637-
log_kv(
1638-
{
1639-
"message": "Deleting keys for device",
1640-
"device_id": device_id,
1641-
"user_id": user_id,
1642-
}
1643-
)
1644-
self.db_pool.simple_delete_txn(
1645-
txn,
1646-
table="e2e_device_keys_json",
1647-
keyvalues={"user_id": user_id, "device_id": device_id},
1648-
)
1649-
self.db_pool.simple_delete_txn(
1650-
txn,
1651-
table="e2e_one_time_keys_json",
1652-
keyvalues={"user_id": user_id, "device_id": device_id},
1653-
)
1654-
self._invalidate_cache_and_stream(
1655-
txn, self.count_e2e_one_time_keys, (user_id, device_id)
1656-
)
1657-
self.db_pool.simple_delete_txn(
1658-
txn,
1659-
table="dehydrated_devices",
1660-
keyvalues={"user_id": user_id, "device_id": device_id},
1661-
)
1662-
self.db_pool.simple_delete_txn(
1663-
txn,
1664-
table="e2e_fallback_keys_json",
1665-
keyvalues={"user_id": user_id, "device_id": device_id},
1666-
)
1667-
self._invalidate_cache_and_stream(
1668-
txn, self.get_e2e_unused_fallback_key_types, (user_id, device_id)
1669-
)
1670-
1671-
await self.db_pool.runInteraction(
1672-
"delete_e2e_keys_by_device", delete_e2e_keys_by_device_txn
1673-
)
1674-
16751635
def _set_e2e_cross_signing_key_txn(
16761636
self,
16771637
txn: LoggingTransaction,

0 commit comments

Comments
 (0)