Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit c72d26c

Browse files
authored
Refactor EventContext (#12689)
Refactor how the `EventContext` class works, with the intention of reducing the amount of state we fetch from the DB during event processing. The idea here is to get rid of the cached `current_state_ids` and `prev_state_ids` that live in the `EventContext`, and instead defer straight to the database (and its caching). One change that may have a noticeable effect is that we no longer prefill the `get_current_state_ids` cache on a state change. However, that query is relatively light, since it's just a case of reading a table from the DB (unlike fetching state at an event, which is more heavyweight). For deployments with workers this cache isn't even used. Part of #12684
1 parent c997bfb commit c72d26c

13 files changed

Lines changed: 70 additions & 200 deletions

File tree

changelog.d/12689.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Refactor `EventContext` class.

synapse/events/snapshot.py

Lines changed: 32 additions & 145 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,8 @@
1717
from frozendict import frozendict
1818
from typing_extensions import Literal
1919

20-
from twisted.internet.defer import Deferred
21-
2220
from synapse.appservice import ApplicationService
2321
from synapse.events import EventBase
24-
from synapse.logging.context import make_deferred_yieldable, run_in_background
2522
from synapse.types import JsonDict, StateMap
2623

2724
if TYPE_CHECKING:
@@ -61,6 +58,9 @@ class EventContext:
6158
If ``state_group`` is None (ie, the event is an outlier),
6259
``state_group_before_event`` will always also be ``None``.
6360
61+
state_delta_due_to_event: If `state_group` and `state_group_before_event` are not None
62+
then this is the delta of the state between the two groups.
63+
6464
prev_group: If it is known, ``state_group``'s prev_group. Note that this being
6565
None does not necessarily mean that ``state_group`` does not have
6666
a prev_group!
@@ -79,73 +79,47 @@ class EventContext:
7979
app_service: If this event is being sent by a (local) application service, that
8080
app service.
8181
82-
_current_state_ids: The room state map, including this event - ie, the state
83-
in ``state_group``.
84-
85-
(type, state_key) -> event_id
86-
87-
For an outlier, this is {}
88-
89-
Note that this is a private attribute: it should be accessed via
90-
``get_current_state_ids``. _AsyncEventContext impl calculates this
91-
on-demand: it will be None until that happens.
92-
93-
_prev_state_ids: The room state map, excluding this event - ie, the state
94-
in ``state_group_before_event``. For a non-state
95-
event, this will be the same as _current_state_events.
96-
97-
Note that it is a completely different thing to prev_group!
98-
99-
(type, state_key) -> event_id
100-
101-
For an outlier, this is {}
102-
103-
As with _current_state_ids, this is a private attribute. It should be
104-
accessed via get_prev_state_ids.
105-
10682
partial_state: if True, we may be storing this event with a temporary,
10783
incomplete state.
10884
"""
10985

86+
_storage: "Storage"
11087
rejected: Union[Literal[False], str] = False
11188
_state_group: Optional[int] = None
11289
state_group_before_event: Optional[int] = None
90+
_state_delta_due_to_event: Optional[StateMap[str]] = None
11391
prev_group: Optional[int] = None
11492
delta_ids: Optional[StateMap[str]] = None
11593
app_service: Optional[ApplicationService] = None
11694

117-
_current_state_ids: Optional[StateMap[str]] = None
118-
_prev_state_ids: Optional[StateMap[str]] = None
119-
12095
partial_state: bool = False
12196

12297
@staticmethod
12398
def with_state(
99+
storage: "Storage",
124100
state_group: Optional[int],
125101
state_group_before_event: Optional[int],
126-
current_state_ids: Optional[StateMap[str]],
127-
prev_state_ids: Optional[StateMap[str]],
102+
state_delta_due_to_event: Optional[StateMap[str]],
128103
partial_state: bool,
129104
prev_group: Optional[int] = None,
130105
delta_ids: Optional[StateMap[str]] = None,
131106
) -> "EventContext":
132107
return EventContext(
133-
current_state_ids=current_state_ids,
134-
prev_state_ids=prev_state_ids,
108+
storage=storage,
135109
state_group=state_group,
136110
state_group_before_event=state_group_before_event,
111+
state_delta_due_to_event=state_delta_due_to_event,
137112
prev_group=prev_group,
138113
delta_ids=delta_ids,
139114
partial_state=partial_state,
140115
)
141116

142117
@staticmethod
143-
def for_outlier() -> "EventContext":
118+
def for_outlier(
119+
storage: "Storage",
120+
) -> "EventContext":
144121
"""Return an EventContext instance suitable for persisting an outlier event"""
145-
return EventContext(
146-
current_state_ids={},
147-
prev_state_ids={},
148-
)
122+
return EventContext(storage=storage)
149123

150124
async def serialize(self, event: EventBase, store: "DataStore") -> JsonDict:
151125
"""Converts self to a type that can be serialized as JSON, and then
@@ -158,24 +132,14 @@ async def serialize(self, event: EventBase, store: "DataStore") -> JsonDict:
158132
The serialized event.
159133
"""
160134

161-
# We don't serialize the full state dicts, instead they get pulled out
162-
# of the DB on the other side. However, the other side can't figure out
163-
# the prev_state_ids, so if we're a state event we include the event
164-
# id that we replaced in the state.
165-
if event.is_state():
166-
prev_state_ids = await self.get_prev_state_ids()
167-
prev_state_id = prev_state_ids.get((event.type, event.state_key))
168-
else:
169-
prev_state_id = None
170-
171135
return {
172-
"prev_state_id": prev_state_id,
173-
"event_type": event.type,
174-
"event_state_key": event.get_state_key(),
175136
"state_group": self._state_group,
176137
"state_group_before_event": self.state_group_before_event,
177138
"rejected": self.rejected,
178139
"prev_group": self.prev_group,
140+
"state_delta_due_to_event": _encode_state_dict(
141+
self._state_delta_due_to_event
142+
),
179143
"delta_ids": _encode_state_dict(self.delta_ids),
180144
"app_service_id": self.app_service.id if self.app_service else None,
181145
"partial_state": self.partial_state,
@@ -193,16 +157,16 @@ def deserialize(storage: "Storage", input: JsonDict) -> "EventContext":
193157
Returns:
194158
The event context.
195159
"""
196-
context = _AsyncEventContextImpl(
160+
context = EventContext(
197161
# We use the state_group and prev_state_id stuff to pull the
198162
# current_state_ids out of the DB and construct prev_state_ids.
199163
storage=storage,
200-
prev_state_id=input["prev_state_id"],
201-
event_type=input["event_type"],
202-
event_state_key=input["event_state_key"],
203164
state_group=input["state_group"],
204165
state_group_before_event=input["state_group_before_event"],
205166
prev_group=input["prev_group"],
167+
state_delta_due_to_event=_decode_state_dict(
168+
input["state_delta_due_to_event"]
169+
),
206170
delta_ids=_decode_state_dict(input["delta_ids"]),
207171
rejected=input["rejected"],
208172
partial_state=input.get("partial_state", False),
@@ -250,8 +214,15 @@ async def get_current_state_ids(self) -> Optional[StateMap[str]]:
250214
if self.rejected:
251215
raise RuntimeError("Attempt to access state_ids of rejected event")
252216

253-
await self._ensure_fetched()
254-
return self._current_state_ids
217+
assert self._state_delta_due_to_event is not None
218+
219+
prev_state_ids = await self.get_prev_state_ids()
220+
221+
if self._state_delta_due_to_event:
222+
prev_state_ids = dict(prev_state_ids)
223+
prev_state_ids.update(self._state_delta_due_to_event)
224+
225+
return prev_state_ids
255226

256227
async def get_prev_state_ids(self) -> StateMap[str]:
257228
"""
@@ -266,94 +237,10 @@ async def get_prev_state_ids(self) -> StateMap[str]:
266237
Maps a (type, state_key) to the event ID of the state event matching
267238
this tuple.
268239
"""
269-
await self._ensure_fetched()
270-
# There *should* be previous state IDs now.
271-
assert self._prev_state_ids is not None
272-
return self._prev_state_ids
273-
274-
def get_cached_current_state_ids(self) -> Optional[StateMap[str]]:
275-
"""Gets the current state IDs if we have them already cached.
276-
277-
It is an error to access this for a rejected event, since rejected state should
278-
not make it into the room state. This method will raise an exception if
279-
``rejected`` is set.
280-
281-
Returns:
282-
Returns None if we haven't cached the state or if state_group is None
283-
(which happens when the associated event is an outlier).
284-
285-
Otherwise, returns the the current state IDs.
286-
"""
287-
if self.rejected:
288-
raise RuntimeError("Attempt to access state_ids of rejected event")
289-
290-
return self._current_state_ids
291-
292-
async def _ensure_fetched(self) -> None:
293-
return None
294-
295-
296-
@attr.s(slots=True)
297-
class _AsyncEventContextImpl(EventContext):
298-
"""
299-
An implementation of EventContext which fetches _current_state_ids and
300-
_prev_state_ids from the database on demand.
301-
302-
Attributes:
303-
304-
_storage
305-
306-
_fetching_state_deferred: Resolves when *_state_ids have been calculated.
307-
None if we haven't started calculating yet
308-
309-
_event_type: The type of the event the context is associated with.
310-
311-
_event_state_key: The state_key of the event the context is associated with.
312-
313-
_prev_state_id: If the event associated with the context is a state event,
314-
then `_prev_state_id` is the event_id of the state that was replaced.
315-
"""
316-
317-
# This needs to have a default as we're inheriting
318-
_storage: "Storage" = attr.ib(default=None)
319-
_prev_state_id: Optional[str] = attr.ib(default=None)
320-
_event_type: str = attr.ib(default=None)
321-
_event_state_key: Optional[str] = attr.ib(default=None)
322-
_fetching_state_deferred: Optional["Deferred[None]"] = attr.ib(default=None)
323-
324-
async def _ensure_fetched(self) -> None:
325-
if not self._fetching_state_deferred:
326-
self._fetching_state_deferred = run_in_background(self._fill_out_state)
327-
328-
await make_deferred_yieldable(self._fetching_state_deferred)
329-
330-
async def _fill_out_state(self) -> None:
331-
"""Called to populate the _current_state_ids and _prev_state_ids
332-
attributes by loading from the database.
333-
"""
334-
if self.state_group is None:
335-
# No state group means the event is an outlier. Usually the state_ids dicts are also
336-
# pre-set to empty dicts, but they get reset when the context is serialized, so set
337-
# them to empty dicts again here.
338-
self._current_state_ids = {}
339-
self._prev_state_ids = {}
340-
return
341-
342-
current_state_ids = await self._storage.state.get_state_ids_for_group(
343-
self.state_group
240+
assert self.state_group_before_event is not None
241+
return await self._storage.state.get_state_ids_for_group(
242+
self.state_group_before_event
344243
)
345-
# Set this separately so mypy knows current_state_ids is not None.
346-
self._current_state_ids = current_state_ids
347-
if self._event_state_key is not None:
348-
self._prev_state_ids = dict(current_state_ids)
349-
350-
key = (self._event_type, self._event_state_key)
351-
if self._prev_state_id:
352-
self._prev_state_ids[key] = self._prev_state_id
353-
else:
354-
self._prev_state_ids.pop(key, None)
355-
else:
356-
self._prev_state_ids = current_state_ids
357244

358245

359246
def _encode_state_dict(

synapse/handlers/federation.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -659,7 +659,7 @@ async def do_knock(
659659
# in the invitee's sync stream. It is stripped out for all other local users.
660660
event.unsigned["knock_room_state"] = stripped_room_state["knock_state_events"]
661661

662-
context = EventContext.for_outlier()
662+
context = EventContext.for_outlier(self.storage)
663663
stream_id = await self._federation_event_handler.persist_events_and_notify(
664664
event.room_id, [(event, context)]
665665
)
@@ -848,7 +848,7 @@ async def on_invite_request(
848848
)
849849
)
850850

851-
context = EventContext.for_outlier()
851+
context = EventContext.for_outlier(self.storage)
852852
await self._federation_event_handler.persist_events_and_notify(
853853
event.room_id, [(event, context)]
854854
)
@@ -877,7 +877,7 @@ async def do_remotely_reject_invite(
877877

878878
await self.federation_client.send_leave(host_list, event)
879879

880-
context = EventContext.for_outlier()
880+
context = EventContext.for_outlier(self.storage)
881881
stream_id = await self._federation_event_handler.persist_events_and_notify(
882882
event.room_id, [(event, context)]
883883
)

synapse/handlers/federation_event.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1423,7 +1423,7 @@ def prep(event: EventBase) -> Optional[Tuple[EventBase, EventContext]]:
14231423
# we're not bothering about room state, so flag the event as an outlier.
14241424
event.internal_metadata.outlier = True
14251425

1426-
context = EventContext.for_outlier()
1426+
context = EventContext.for_outlier(self._storage)
14271427
try:
14281428
validate_event_for_room_version(room_version_obj, event)
14291429
check_auth_rules_for_event(room_version_obj, event, auth)
@@ -1874,10 +1874,10 @@ async def _update_context_for_auth_events(
18741874
)
18751875

18761876
return EventContext.with_state(
1877+
storage=self._storage,
18771878
state_group=state_group,
18781879
state_group_before_event=context.state_group_before_event,
1879-
current_state_ids=current_state_ids,
1880-
prev_state_ids=prev_state_ids,
1880+
state_delta_due_to_event=state_updates,
18811881
prev_group=prev_group,
18821882
delta_ids=state_updates,
18831883
partial_state=context.partial_state,

synapse/handlers/message.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -757,6 +757,10 @@ async def deduplicate_state_event(
757757
The previous version of the event is returned, if it is found in the
758758
event context. Otherwise, None is returned.
759759
"""
760+
if event.internal_metadata.is_outlier():
761+
# This can happen due to out of band memberships
762+
return None
763+
760764
prev_state_ids = await context.get_prev_state_ids()
761765
prev_event_id = prev_state_ids.get((event.type, event.state_key))
762766
if not prev_event_id:
@@ -1001,7 +1005,7 @@ async def create_new_client_event(
10011005
# after it is created
10021006
if builder.internal_metadata.outlier:
10031007
event.internal_metadata.outlier = True
1004-
context = EventContext.for_outlier()
1008+
context = EventContext.for_outlier(self.storage)
10051009
elif (
10061010
event.type == EventTypes.MSC2716_INSERTION
10071011
and state_event_ids

synapse/push/action_generator.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,5 +40,9 @@ def __init__(self, hs: "HomeServer"):
4040
async def handle_push_actions_for_event(
4141
self, event: EventBase, context: EventContext
4242
) -> None:
43+
if event.internal_metadata.is_outlier():
44+
# This can happen due to out of band memberships
45+
return
46+
4347
with Measure(self.clock, "action_for_event_by_user"):
4448
await self.bulk_evaluator.action_for_event_by_user(event, context)

synapse/state/__init__.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ def __init__(self, hs: "HomeServer"):
130130
self.state_store = hs.get_storage().state
131131
self.hs = hs
132132
self._state_resolution_handler = hs.get_state_resolution_handler()
133+
self._storage = hs.get_storage()
133134

134135
@overload
135136
async def get_current_state(
@@ -361,10 +362,10 @@ async def compute_event_context(
361362

362363
if not event.is_state():
363364
return EventContext.with_state(
365+
storage=self._storage,
364366
state_group_before_event=state_group_before_event,
365367
state_group=state_group_before_event,
366-
current_state_ids=state_ids_before_event,
367-
prev_state_ids=state_ids_before_event,
368+
state_delta_due_to_event={},
368369
prev_group=state_group_before_event_prev_group,
369370
delta_ids=deltas_to_state_group_before_event,
370371
partial_state=partial_state,
@@ -393,10 +394,10 @@ async def compute_event_context(
393394
)
394395

395396
return EventContext.with_state(
397+
storage=self._storage,
396398
state_group=state_group_after_event,
397399
state_group_before_event=state_group_before_event,
398-
current_state_ids=state_ids_after_event,
399-
prev_state_ids=state_ids_before_event,
400+
state_delta_due_to_event=delta_ids,
400401
prev_group=state_group_before_event,
401402
delta_ids=delta_ids,
402403
partial_state=partial_state,

0 commit comments

Comments
 (0)