Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/19306.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Prune stale entries from `sliding_sync_connection_required_state` table.
47 changes: 44 additions & 3 deletions synapse/storage/databases/main/sliding_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,9 @@ def _get_and_clear_connection_positions_txn(

# Now that we have seen the client has received and used the connection
# position, we can delete all the other connection positions.
#
# Note: the rest of the code here assumes this is the only remaining
# connection position.
sql = """
DELETE FROM sliding_sync_connection_positions
WHERE connection_key = ? AND connection_position != ?
Expand Down Expand Up @@ -485,9 +488,10 @@ def _get_and_clear_connection_positions_txn(
),
)

required_state_map: dict[int, dict[str, set[str]]] = {}
# Map from required_state_id -> event type -> set of state keys.
stored_required_state_id_maps: dict[int, dict[str, set[str]]] = {}
for row in rows:
state = required_state_map[row[0]] = {}
state = stored_required_state_id_maps[row[0]] = {}
for event_type, state_key in db_to_json(row[1]):
state.setdefault(event_type, set()).add(state_key)

Expand All @@ -512,7 +516,44 @@ def _get_and_clear_connection_positions_txn(
) in room_config_rows:
room_configs[room_id] = RoomSyncConfig(
timeline_limit=timeline_limit,
required_state_map=required_state_map[required_state_id],
required_state_map=stored_required_state_id_maps[required_state_id],
)

# Clean up any `required_state_id`s that are no longer used by any
# connection position on this connection.
#
# We store the required state config per-connection per-room. Since this
# can be a lot of data, we deduplicate the required state JSON and store
# it separately, with multiple rooms referencing the same `required_state_id`.
# Over time as the required state configs change, some `required_state_id`s
# may no longer be referenced by any room config, so we need
# to clean them up.
#
# We do this by noting that we have pulled out *all* rows from
# `sliding_sync_connection_required_state` for this connection above. We
# have also pulled out all referenced `required_state_id`s for *this*
# connection position, which is the only connection position that
# remains (we deleted the others above).
#
# Thus we can compute the unused `required_state_id`s by looking for any
# `required_state_id`s that are not referenced by the remaining connection
# position.
used_required_state_ids = {
required_state_id for _, _, required_state_id in room_config_rows
}

unused_required_state_ids = (
stored_required_state_id_maps.keys() - used_required_state_ids
)
if unused_required_state_ids:
self.db_pool.simple_delete_many_batch_txn(
txn,
table="sliding_sync_connection_required_state",
keys=("connection_key", "required_state_id"),
values=[
(connection_key, required_state_id)
for required_state_id in unused_required_state_ids
],
)
Comment thread
erikjohnston marked this conversation as resolved.

# Now look up the per-room stream data.
Expand Down
242 changes: 242 additions & 0 deletions tests/storage/test_sliding_sync_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -3120,6 +3120,248 @@ def test_lazy_loading_room_members_last_seen_ts(self) -> None:
# The timestamp for user1 should be updated.
self.assertGreater(lazy_member_entries[user1_id], prev_timestamp)

def test_pruning_sliding_sync_connection_required_state(self) -> None:
    """Check that stale rows in `sliding_sync_connection_required_state`
    are removed once no remaining connection position references them.
    """

    user1_id = self.register_user("user1", "pass")
    user1_tok = self.login(user1_id, "pass")

    room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
    self.helper.send_state(
        room_id, EventTypes.Name, {"name": "A room"}, tok=user1_tok
    )

    # Initial sync: pulls the room down, which stores a single
    # required-state row for it.
    sync_body = {
        "lists": {
            "foo-list": {
                "ranges": [[0, 1]],
                "required_state": [
                    [EventTypes.Member, StateValues.LAZY],
                ],
                "timeline_limit": 1,
            }
        }
    }
    _, from_token = self.do_sync(sync_body, tok=user1_tok)

    # Resolve the sync token to its connection key so we can inspect the
    # per-connection required-state table directly.
    initial_position = self.get_success(
        SlidingSyncStreamToken.from_string(self.store, from_token)
    ).connection_position

    connection_key = self.get_success(
        self.store.db_pool.simple_select_one_onecol(
            table="sliding_sync_connection_positions",
            keyvalues={"connection_position": initial_position},
            retcol="connection_key",
        )
    )

    def fetch_required_state_rows() -> list:
        """Pull all required-state rows stored for this connection."""
        return self.get_success(
            self.store.db_pool.simple_select_list(
                table="sliding_sync_connection_required_state",
                keyvalues={"connection_key": connection_key},
                retcols=("required_state_id", "required_state"),
            )
        )

    rows = fetch_required_state_rows()

    # One room, one required-state config: exactly one row expected.
    self.assertEqual(len(rows), 1)
    first_required_state_id = rows[0][0]

    # Request additional required state so a second row gets written.
    sync_body = {
        "lists": {
            "foo-list": {
                "ranges": [[0, 1]],
                "required_state": [
                    [EventTypes.Name, ""],
                    [EventTypes.Member, StateValues.LAZY],
                ],
                "timeline_limit": 1,
            }
        }
    }

    # A new event is needed for the room to come down the next sync. This
    # shouldn't be necessary, but we don't currently implement immediately
    # sending down the room when required_state is updated, see
    # https://github.com/element-hq/synapse/issues/18844
    self.helper.send(room_id, "msg1", tok=user1_tok)

    _, from_token = self.do_sync(sync_body, since=from_token, tok=user1_tok)

    rows = fetch_required_state_rows()

    # Both the old and the new config rows should still be present: the
    # previous from_token could still be used, so the old row is not yet
    # prunable.
    self.assertEqual(len(rows), 2)

    # Syncing with the newest token acknowledges it, after which the stale
    # row should be pruned.
    self.do_sync(sync_body, since=from_token, tok=user1_tok)

    rows = fetch_required_state_rows()
    self.assertEqual(len(rows), 1)

    # The surviving row must be the new config, not the original one.
    self.assertNotEqual(rows[0][0], first_required_state_id)

def test_pruning_sliding_sync_connection_required_state_forks(self) -> None:
    """Check that rows in `sliding_sync_connection_required_state` are
    pruned correctly when the connection positions fork.
    """

    user1_id = self.register_user("user1", "pass")
    user1_tok = self.login(user1_id, "pass")

    room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
    self.helper.send_state(
        room_id, EventTypes.Name, {"name": "A room"}, tok=user1_tok
    )

    # Initial sync: pulls the room down, which stores a single
    # required-state row for it.
    sync_body = {
        "lists": {
            "foo-list": {
                "ranges": [[0, 1]],
                "required_state": [
                    [EventTypes.Member, StateValues.LAZY],
                ],
                "timeline_limit": 1,
            }
        }
    }
    _, from_token = self.do_sync(sync_body, tok=user1_tok)

    # Resolve the sync token to its connection key so we can inspect the
    # per-connection required-state table directly.
    initial_position = self.get_success(
        SlidingSyncStreamToken.from_string(self.store, from_token)
    ).connection_position

    connection_key = self.get_success(
        self.store.db_pool.simple_select_one_onecol(
            table="sliding_sync_connection_positions",
            keyvalues={"connection_position": initial_position},
            retcol="connection_key",
        )
    )

    def fetch_required_state_rows() -> list:
        """Pull all required-state rows stored for this connection."""
        return self.get_success(
            self.store.db_pool.simple_select_list(
                table="sliding_sync_connection_required_state",
                keyvalues={"connection_key": connection_key},
                retcols=("required_state_id", "required_state"),
            )
        )

    rows = fetch_required_state_rows()

    # One room, one required-state config: exactly one row expected.
    self.assertEqual(len(rows), 1)
    first_required_state_id = rows[0][0]

    # Request additional required state so a second row gets written.
    sync_body = {
        "lists": {
            "foo-list": {
                "ranges": [[0, 1]],
                "required_state": [
                    [EventTypes.Name, ""],
                    [EventTypes.Member, StateValues.LAZY],
                ],
                "timeline_limit": 1,
            }
        }
    }

    # A new event is needed for the room to come down the next sync. This
    # shouldn't be necessary, but we don't currently implement immediately
    # sending down the room when required_state is updated, see
    # https://github.com/element-hq/synapse/issues/18844
    self.helper.send(room_id, "msg1", tok=user1_tok)

    # Deliberately discard the returned token here so the next sync can
    # fork from the original position.
    _, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)

    rows = fetch_required_state_rows()

    # Both the old and the new config rows should still be present: the
    # previous from_token could still be used, so the old row is not yet
    # prunable.
    self.assertEqual(len(rows), 2)
    second_required_state_id = sorted(rows)[1][0]

    # Sync again from the *old* token, forking the connection positions.
    # The required_state is changed once more so it doesn't get
    # deduplicated against an existing row.
    sync_body = {
        "lists": {
            "foo-list": {
                "ranges": [[0, 1]],
                "required_state": [
                    [EventTypes.Topic, ""],
                    [EventTypes.Member, StateValues.LAZY],
                ],
                "timeline_limit": 1,
            }
        }
    }
    _, from_token = self.do_sync(sync_body, since=from_token, tok=user1_tok)

    # Three rows now: one per distinct required_state config seen so far.
    rows = fetch_required_state_rows()
    self.assertEqual(len(rows), 3)

    # Acknowledge the latest token; everything except the newest config
    # should be pruned from `sliding_sync_connection_required_state`.
    _, from_token = self.do_sync(sync_body, since=from_token, tok=user1_tok)

    rows = fetch_required_state_rows()
    self.assertEqual(len(rows), 1)

    # The surviving row must be the newest config, not either earlier one.
    self.assertNotEqual(rows[0][0], first_required_state_id)
    self.assertNotEqual(rows[0][0], second_required_state_id)


class SlidingSyncTablesBackgroundUpdatesTestCase(SlidingSyncTablesTestCaseBase):
"""
Expand Down