Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/19306.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Prune stale entries from `sliding_sync_connection_required_state` table.
47 changes: 44 additions & 3 deletions synapse/storage/databases/main/sliding_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,9 @@ def _get_and_clear_connection_positions_txn(

# Now that we have seen the client has received and used the connection
# position, we can delete all the other connection positions.
#
# Note: the rest of the code here assumes this is the only remaining
# connection position.
sql = """
DELETE FROM sliding_sync_connection_positions
WHERE connection_key = ? AND connection_position != ?
Expand Down Expand Up @@ -485,9 +488,10 @@ def _get_and_clear_connection_positions_txn(
),
)

required_state_map: dict[int, dict[str, set[str]]] = {}
# Map from required_state_id -> event type -> set of state keys.
stored_required_state_id_maps: dict[int, dict[str, set[str]]] = {}
for row in rows:
state = required_state_map[row[0]] = {}
state = stored_required_state_id_maps[row[0]] = {}
for event_type, state_key in db_to_json(row[1]):
state.setdefault(event_type, set()).add(state_key)

Expand All @@ -512,7 +516,44 @@ def _get_and_clear_connection_positions_txn(
) in room_config_rows:
room_configs[room_id] = RoomSyncConfig(
timeline_limit=timeline_limit,
required_state_map=required_state_map[required_state_id],
required_state_map=stored_required_state_id_maps[required_state_id],
)

# Clean up any required state IDs that are no longer used by any
# connection position on this connection.
#
# We store the required state config per-connection per-room. Since this
# can be a lot of data, we deduplicate the required state JSON and store
# it separately, with multiple rooms referencing the same required state
# ID. Over time as the required state configs change, some required
# state IDs may no longer be referenced by any room config, so we need
# to clean them up.
#
# We do this by noting that we have pulled out *all* rows from
# `sliding_sync_connection_required_state` for this connection above. We
# have also pulled out all referenced required state IDs for *this*
# connection position, which is the only connection position that
# remains (we deleted the others above).
#
# Thus we can compute the unused required state IDs by looking for any
# required state IDs that are not referenced by the remaining connection
# position.
Comment thread
erikjohnston marked this conversation as resolved.
Outdated
used_required_state_ids = {
required_state_id for _, _, required_state_id in room_config_rows
}

unused_required_state_ids = (
stored_required_state_id_maps.keys() - used_required_state_ids
)
if unused_required_state_ids:
self.db_pool.simple_delete_many_batch_txn(
txn,
table="sliding_sync_connection_required_state",
keys=("connection_key", "required_state_id"),
values=[
(connection_key, required_state_id)
for required_state_id in unused_required_state_ids
],
)
Comment thread
erikjohnston marked this conversation as resolved.

# Now look up the per-room stream data.
Expand Down
105 changes: 105 additions & 0 deletions tests/storage/test_sliding_sync_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -3120,6 +3120,111 @@ def test_lazy_loading_room_members_last_seen_ts(self) -> None:
# The timestamp for user1 should be updated.
self.assertGreater(lazy_member_entries[user1_id], prev_timestamp)

def test_pruning_sliding_sync_connection_required_state(self) -> None:
"""Test that we prune old entries from
`sliding_sync_connection_required_state`.
"""

user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")

room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
self.helper.send_state(
room_id, EventTypes.Name, {"name": "A room"}, tok=user1_tok
)

# Do an initial sync, this will pull down the above room and thus cause
# us to store a single required state entry for the room.
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [
[EventTypes.Member, StateValues.LAZY],
],
"timeline_limit": 1,
}
}
}
_, from_token = self.do_sync(sync_body, tok=user1_tok)

# Check that we have an entry in sliding_sync_connection_required_state
connection_pos1 = self.get_success(
SlidingSyncStreamToken.from_string(self.store, from_token)
).connection_position

connection_key = self.get_success(
self.store.db_pool.simple_select_one_onecol(
table="sliding_sync_connection_positions",
keyvalues={"connection_position": connection_pos1},
retcol="connection_key",
)
)

required_state_entries = self.get_success(
self.store.db_pool.simple_select_list(
table="sliding_sync_connection_required_state",
keyvalues={"connection_key": connection_key},
retcols=("required_state_id", "required_state"),
)
)

# We expect a single entry here for the one room ID.
self.assertEqual(len(required_state_entries), 1)
first_required_state_id = required_state_entries[0][0]

# Update the sync body to request more required state, so that we get
# another entry in the table.
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 1]],
"required_state": [
[EventTypes.Name, ""],
[EventTypes.Member, StateValues.LAZY],
],
"timeline_limit": 1,
}
}
}

# We need to send a message to cause the room to come down the next
# sync. This shouldn't be necessary, but we don't currently implement
# immediately sending down the room when required_state is updated.
Comment thread
erikjohnston marked this conversation as resolved.
Outdated
self.helper.send(room_id, "msg1", tok=user1_tok)

_, from_token = self.do_sync(sync_body, since=from_token, tok=user1_tok)
Comment thread
erikjohnston marked this conversation as resolved.

required_state_entries = self.get_success(
self.store.db_pool.simple_select_list(
table="sliding_sync_connection_required_state",
keyvalues={"connection_key": connection_key},
retcols=("required_state_id", "required_state"),
)
)

# We expect two entries here, one for old state and one for new state.
# The old entry doesn't get pruned yet as the previous from_token could
# still be used.
self.assertEqual(len(required_state_entries), 2)

# Sync again with the latest token. This time we expect the old
# entry to be pruned.
self.do_sync(sync_body, since=from_token, tok=user1_tok)

required_state_entries = self.get_success(
self.store.db_pool.simple_select_list(
table="sliding_sync_connection_required_state",
keyvalues={"connection_key": connection_key},
retcols=("required_state_id", "required_state"),
)
)

self.assertEqual(len(required_state_entries), 1)

# Double check that we have pruned the old entry.
self.assertNotEqual(required_state_entries[0][0], first_required_state_id)


class SlidingSyncTablesBackgroundUpdatesTestCase(SlidingSyncTablesTestCaseBase):
"""
Expand Down
Loading