Skip to content

Commit 064237a

Browse files
Prune sliding_sync_connection_required_state table (#19306)
When we change the `required_state` config for a room in sliding sync, we insert a new entry into the `sliding_sync_connection_required_state` table. As the sliding sync connection advances we can accrue a lot of stale entries, so let's clear those out. This is a sort of follow-on from #19211. --------- Co-authored-by: Eric Eastwood <erice@element.io>
1 parent a0e6a05 commit 064237a

3 files changed

Lines changed: 287 additions & 3 deletions

File tree

changelog.d/19306.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Prune stale entries from `sliding_sync_connection_required_state` table.

synapse/storage/databases/main/sliding_sync.py

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,9 @@ def _get_and_clear_connection_positions_txn(
450450

451451
# Now that we have seen the client has received and used the connection
452452
# position, we can delete all the other connection positions.
453+
#
454+
# Note: the rest of the code here assumes this is the only remaining
455+
# connection position.
453456
sql = """
454457
DELETE FROM sliding_sync_connection_positions
455458
WHERE connection_key = ? AND connection_position != ?
@@ -485,9 +488,10 @@ def _get_and_clear_connection_positions_txn(
485488
),
486489
)
487490

488-
required_state_map: dict[int, dict[str, set[str]]] = {}
491+
# Map from required_state_id -> event type -> set of state keys.
492+
stored_required_state_id_maps: dict[int, dict[str, set[str]]] = {}
489493
for row in rows:
490-
state = required_state_map[row[0]] = {}
494+
state = stored_required_state_id_maps[row[0]] = {}
491495
for event_type, state_key in db_to_json(row[1]):
492496
state.setdefault(event_type, set()).add(state_key)
493497

@@ -512,7 +516,44 @@ def _get_and_clear_connection_positions_txn(
512516
) in room_config_rows:
513517
room_configs[room_id] = RoomSyncConfig(
514518
timeline_limit=timeline_limit,
515-
required_state_map=required_state_map[required_state_id],
519+
required_state_map=stored_required_state_id_maps[required_state_id],
520+
)
521+
522+
# Clean up any `required_state_id`s that are no longer used by any
523+
# connection position on this connection.
524+
#
525+
# We store the required state config per-connection per-room. Since this
526+
# can be a lot of data, we deduplicate the required state JSON and store
527+
# it separately, with multiple rooms referencing the same `required_state_id`.
528+
# Over time as the required state configs change, some `required_state_id`s
529+
# may no longer be referenced by any room config, so we need
530+
# to clean them up.
531+
#
532+
# We do this by noting that we have pulled out *all* rows from
533+
# `sliding_sync_connection_required_state` for this connection above. We
534+
# have also pulled out all referenced `required_state_id`s for *this*
535+
# connection position, which is the only connection position that
536+
# remains (we deleted the others above).
537+
#
538+
# Thus we can compute the unused `required_state_id`s by looking for any
539+
# `required_state_id`s that are not referenced by the remaining connection
540+
# position.
541+
used_required_state_ids = {
542+
required_state_id for _, _, required_state_id in room_config_rows
543+
}
544+
545+
unused_required_state_ids = (
546+
stored_required_state_id_maps.keys() - used_required_state_ids
547+
)
548+
if unused_required_state_ids:
549+
self.db_pool.simple_delete_many_batch_txn(
550+
txn,
551+
table="sliding_sync_connection_required_state",
552+
keys=("connection_key", "required_state_id"),
553+
values=[
554+
(connection_key, required_state_id)
555+
for required_state_id in unused_required_state_ids
556+
],
516557
)
517558

518559
# Now look up the per-room stream data.

tests/storage/test_sliding_sync_tables.py

Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3120,6 +3120,248 @@ def test_lazy_loading_room_members_last_seen_ts(self) -> None:
31203120
# The timestamp for user1 should be updated.
31213121
self.assertGreater(lazy_member_entries[user1_id], prev_timestamp)
31223122

3123+
def test_pruning_sliding_sync_connection_required_state(self) -> None:
    """Test that we prune old entries from
    `sliding_sync_connection_required_state`.
    """

    user1_id = self.register_user("user1", "pass")
    user1_tok = self.login(user1_id, "pass")

    room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
    self.helper.send_state(
        room_id, EventTypes.Name, {"name": "A room"}, tok=user1_tok
    )

    # Do an initial sync, this will pull down the above room and thus cause
    # us to store a single required state entry for the room.
    sync_body = {
        "lists": {
            "foo-list": {
                "ranges": [[0, 1]],
                "required_state": [
                    [EventTypes.Member, StateValues.LAZY],
                ],
                "timeline_limit": 1,
            }
        }
    }
    _, from_token = self.do_sync(sync_body, tok=user1_tok)

    # Check that we have an entry in sliding_sync_connection_required_state
    connection_pos1 = self.get_success(
        SlidingSyncStreamToken.from_string(self.store, from_token)
    ).connection_position

    connection_key = self.get_success(
        self.store.db_pool.simple_select_one_onecol(
            table="sliding_sync_connection_positions",
            keyvalues={"connection_position": connection_pos1},
            retcol="connection_key",
        )
    )

    required_state_entries = self.get_success(
        self.store.db_pool.simple_select_list(
            table="sliding_sync_connection_required_state",
            keyvalues={"connection_key": connection_key},
            retcols=("required_state_id", "required_state"),
        )
    )

    # We expect a single entry here for the one room ID.
    self.assertEqual(len(required_state_entries), 1)
    first_required_state_id = required_state_entries[0][0]

    # Update the sync body to request more required state, so that we get
    # another entry in the table.
    sync_body = {
        "lists": {
            "foo-list": {
                "ranges": [[0, 1]],
                "required_state": [
                    [EventTypes.Name, ""],
                    [EventTypes.Member, StateValues.LAZY],
                ],
                "timeline_limit": 1,
            }
        }
    }

    # We need to send a message to cause the room to come down the next
    # sync. This shouldn't be necessary, but we don't currently implement
    # immediately sending down the room when required_state is updated,
    # see https://github.com/element-hq/synapse/issues/18844
    self.helper.send(room_id, "msg1", tok=user1_tok)

    _, from_token = self.do_sync(sync_body, since=from_token, tok=user1_tok)

    required_state_entries = self.get_success(
        self.store.db_pool.simple_select_list(
            table="sliding_sync_connection_required_state",
            keyvalues={"connection_key": connection_key},
            retcols=("required_state_id", "required_state"),
        )
    )

    # We expect two entries here, one for old state and one for new state.
    # The old entry doesn't get pruned yet as the previous from_token could
    # still be used.
    self.assertEqual(len(required_state_entries), 2)

    # Sync again with the latest token. This time we expect the old
    # entry to be pruned.
    self.do_sync(sync_body, since=from_token, tok=user1_tok)

    required_state_entries = self.get_success(
        self.store.db_pool.simple_select_list(
            table="sliding_sync_connection_required_state",
            keyvalues={"connection_key": connection_key},
            retcols=("required_state_id", "required_state"),
        )
    )

    self.assertEqual(len(required_state_entries), 1)

    # Double check that we have pruned the old entry.
    self.assertNotEqual(required_state_entries[0][0], first_required_state_id)
def test_pruning_sliding_sync_connection_required_state_forks(self) -> None:
    """Test that we prune entries in
    `sliding_sync_connection_required_state` for forked positions.
    """

    user1_id = self.register_user("user1", "pass")
    user1_tok = self.login(user1_id, "pass")

    room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
    self.helper.send_state(
        room_id, EventTypes.Name, {"name": "A room"}, tok=user1_tok
    )

    # Do an initial sync, this will pull down the above room and thus cause
    # us to store a single required state entry for the room.
    sync_body = {
        "lists": {
            "foo-list": {
                "ranges": [[0, 1]],
                "required_state": [
                    [EventTypes.Member, StateValues.LAZY],
                ],
                "timeline_limit": 1,
            }
        }
    }
    _, from_token = self.do_sync(sync_body, tok=user1_tok)

    # Check that we have an entry in sliding_sync_connection_required_state
    connection_pos1 = self.get_success(
        SlidingSyncStreamToken.from_string(self.store, from_token)
    ).connection_position

    connection_key = self.get_success(
        self.store.db_pool.simple_select_one_onecol(
            table="sliding_sync_connection_positions",
            keyvalues={"connection_position": connection_pos1},
            retcol="connection_key",
        )
    )

    required_state_entries = self.get_success(
        self.store.db_pool.simple_select_list(
            table="sliding_sync_connection_required_state",
            keyvalues={"connection_key": connection_key},
            retcols=("required_state_id", "required_state"),
        )
    )

    # We expect a single entry here for the one room ID.
    self.assertEqual(len(required_state_entries), 1)
    first_required_state_id = required_state_entries[0][0]

    # Update the sync body to request more required state, so that we get
    # another entry in the table.
    sync_body = {
        "lists": {
            "foo-list": {
                "ranges": [[0, 1]],
                "required_state": [
                    [EventTypes.Name, ""],
                    [EventTypes.Member, StateValues.LAZY],
                ],
                "timeline_limit": 1,
            }
        }
    }

    # We need to send a message to cause the room to come down the next
    # sync. This shouldn't be necessary, but we don't currently implement
    # immediately sending down the room when required_state is updated,
    # see https://github.com/element-hq/synapse/issues/18844
    self.helper.send(room_id, "msg1", tok=user1_tok)

    _, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)

    required_state_entries = self.get_success(
        self.store.db_pool.simple_select_list(
            table="sliding_sync_connection_required_state",
            keyvalues={"connection_key": connection_key},
            retcols=("required_state_id", "required_state"),
        )
    )

    # We expect two entries here, one for old state and one for new state.
    # The old entry doesn't get pruned yet as the previous from_token could
    # still be used.
    self.assertEqual(len(required_state_entries), 2)
    second_required_state_id = sorted(required_state_entries)[1][0]

    # We sync again, but with the old token, creating a fork in the
    # connection positions. We change the sync body again so that the
    # `required_state` doesn't get deduplicated.
    sync_body = {
        "lists": {
            "foo-list": {
                "ranges": [[0, 1]],
                "required_state": [
                    [EventTypes.Topic, ""],
                    [EventTypes.Member, StateValues.LAZY],
                ],
                "timeline_limit": 1,
            }
        }
    }
    _, from_token = self.do_sync(sync_body, since=from_token, tok=user1_tok)

    # There should now be three entries, one for each of the required_state.
    required_state_entries = self.get_success(
        self.store.db_pool.simple_select_list(
            table="sliding_sync_connection_required_state",
            keyvalues={"connection_key": connection_key},
            retcols=("required_state_id", "required_state"),
        )
    )

    self.assertEqual(len(required_state_entries), 3)

    # Sync again with the latest token. This should prune all except the
    # latest entry in `sliding_sync_connection_required_state`.
    _, from_token = self.do_sync(sync_body, since=from_token, tok=user1_tok)

    required_state_entries = self.get_success(
        self.store.db_pool.simple_select_list(
            table="sliding_sync_connection_required_state",
            keyvalues={"connection_key": connection_key},
            retcols=("required_state_id", "required_state"),
        )
    )

    self.assertEqual(len(required_state_entries), 1)

    # Double check that we have pruned the old entry.
    self.assertNotEqual(required_state_entries[0][0], first_required_state_id)
    self.assertNotEqual(required_state_entries[0][0], second_required_state_id)
3364+
31233365

31243366
class SlidingSyncTablesBackgroundUpdatesTestCase(SlidingSyncTablesTestCaseBase):
31253367
"""

0 commit comments

Comments (0)