Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -695,17 +695,24 @@ async def delete_devices(self, user_id: str, device_ids: StrCollection) -> None:
else:
raise

# Delete data specific to each device. Not optimised as it is not
# considered as part of a critical path.
for device_id in device_ids:
if self.hs.config.experimental.msc3890_enabled:
# Delete data specific to each device. Not optimised as its an
# experimental MSC.
if self.hs.config.experimental.msc3890_enabled:
for device_id in device_ids:
# Remove any local notification settings for this device in accordance
# with MSC3890.
await self._account_data_handler.remove_account_data_for_user(
user_id,
f"org.matrix.msc3890.local_notification_settings.{device_id}",
)

# If we're deleting a lot of devices, a bunch of them may not have any
# to-device messages queued up. We filter those out to avoid scheduling
# unnecessary tasks.
devices_with_messages = await self.store.get_devices_with_messages(
user_id, device_ids
)
for device_id in devices_with_messages:
# Delete device messages asynchronously and in batches using the task scheduler
# We specify an upper stream id to avoid deleting non delivered messages
# if an user re-uses a device ID.
Expand Down
37 changes: 36 additions & 1 deletion synapse/storage/databases/main/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@
make_in_list_sql_clause,
)
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.types import JsonDict
from synapse.types import JsonDict, StrCollection
from synapse.util import Duration, json_encoder
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.iterutils import batch_iter
from synapse.util.stringutils import parse_and_validate_server_name

if TYPE_CHECKING:
Expand Down Expand Up @@ -1027,6 +1028,40 @@ def _delete_old_federation_inbox_rows_txn(txn: LoggingTransaction) -> bool:
# loop first time we run this.
self._clock.sleep(1)

async def get_devices_with_messages(
    self, user_id: str, device_ids: StrCollection
) -> StrCollection:
    """Return the subset of `device_ids` that have messages in the device inbox.

    Args:
        user_id: The user whose devices are being checked.
        device_ids: Candidate device IDs to look up.

    Returns:
        The device IDs (drawn from `device_ids`) that have at least one
        queued to-device message.
    """

    def _lookup_txn(
        txn: LoggingTransaction,
        chunk: StrCollection,
    ) -> StrCollection:
        clause, args = make_in_list_sql_clause(
            self.database_engine, "device_id", chunk
        )
        sql = f"""
            SELECT DISTINCT device_id FROM device_inbox
            WHERE {clause} AND user_id = ?
        """
        args.append(user_id)
        txn.execute(sql, args)
        return {row[0] for row in txn}

    matched: Set[str] = set()
    # Chunk the lookup so the SQL IN clause stays a manageable size.
    for chunk in batch_iter(device_ids, 1000):
        found = await self.db_pool.runInteraction(
            "get_devices_with_messages",
            _lookup_txn,
            chunk,
            # We don't need to run in a transaction as its a single query
            db_autocommit=True,
        )
        matched.update(found)

    return matched


class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
Expand Down