Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 49 additions & 53 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -852,18 +852,14 @@ async def _resolve_state_at_missing_prevs(
# note that if any of the missing prevs share missing state or
# auth events, the requests to fetch those events are deduped
# by the get_pdu_cache in federation_client.
remote_state = await self._get_state_after_missing_prev_event(
dest, room_id, p
remote_state_map = (
await self._get_state_ids_after_missing_prev_event(
dest, room_id, p
)
)

remote_state_map = {
(x.type, x.state_key): x.event_id for x in remote_state
}
state_maps.append(remote_state_map)

for x in remote_state:
event_map[x.event_id] = x
Comment thread
erikjohnston marked this conversation as resolved.

room_version = await self._store.get_room_version_id(room_id)
state_map = await self._state_resolution_handler.resolve_events_with_store(
room_id,
Expand All @@ -886,12 +882,12 @@ async def _resolve_state_at_missing_prevs(
)
return state_map

async def _get_state_after_missing_prev_event(
async def _get_state_ids_after_missing_prev_event(
self,
destination: str,
room_id: str,
event_id: str,
) -> List[EventBase]:
) -> StateMap[str]:
"""Requests all of the room state at a given event from a remote homeserver.

Args:
Expand All @@ -900,7 +896,7 @@ async def _get_state_after_missing_prev_event(
event_id: The id of the event we want the state at.

Returns:
A list of events in the state, including the event itself
The event ids of the state *after* the given event.
"""
(
state_event_ids,
Expand All @@ -919,15 +915,13 @@ async def _get_state_after_missing_prev_event(
desired_events = set(state_event_ids)
Comment thread
erikjohnston marked this conversation as resolved.
desired_events.add(event_id)
logger.debug("Fetching %i events from cache/store", len(desired_events))
fetched_events = await self._store.get_events(
desired_events, allow_rejected=True
)
have_events = await self._store.have_seen_events(room_id, desired_events)

missing_desired_events = desired_events - fetched_events.keys()
missing_desired_events = desired_events - have_events
logger.debug(
"We are missing %i events (got %i)",
len(missing_desired_events),
len(fetched_events),
len(have_events),
)

# We probably won't need most of the auth events, so let's just check which
Expand All @@ -938,7 +932,7 @@ async def _get_state_after_missing_prev_event(
# already have a bunch of the state events. It would be nice if the
# federation api gave us a way of finding out which we actually need.

missing_auth_events = set(auth_event_ids) - fetched_events.keys()
missing_auth_events = set(auth_event_ids) - have_events
missing_auth_events.difference_update(
await self._store.have_seen_events(room_id, missing_auth_events)
)
Expand All @@ -964,62 +958,64 @@ async def _get_state_after_missing_prev_event(
destination=destination, room_id=room_id, event_ids=missing_events
)

# we need to make sure we re-load from the database to get the rejected
# state correct.
fetched_events.update(
await self._store.get_events(missing_desired_events, allow_rejected=True)
)

# check for events which were in the wrong room.
#
# this can happen if a remote server claims that the state or
# auth_events at an event in room A are actually events in room B
# We now need to fill out the state map, which involves fetching the
# type and state key for each event ID in the state.
state_map = {}

bad_events = [
(event_id, event.room_id)
for event_id, event in fetched_events.items()
if event.room_id != room_id
]
event_metadata = await self._store.get_metadata_for_events(state_event_ids)
for state_event_id, metadata in event_metadata.items():
if metadata.room_id != room_id:
# This is a bogus situation, but since we may only discover it a long time
# after it happened, we try our best to carry on, by just omitting the
# bad events from the returned state set.
#
# This can happen if a remote server claims that the state or
# auth_events at an event in room A are actually events in room B
logger.warning(
"Remote server %s claims event %s in room %s is an auth/state "
"event in room %s",
destination,
state_event_id,
metadata.room_id,
room_id,
)
continue

for bad_event_id, bad_room_id in bad_events:
# This is a bogus situation, but since we may only discover it a long time
# after it happened, we try our best to carry on, by just omitting the
# bad events from the returned state set.
logger.warning(
"Remote server %s claims event %s in room %s is an auth/state "
"event in room %s",
destination,
bad_event_id,
bad_room_id,
room_id,
)
if metadata.state_key is None:
logger.warning(
"Remote server gave us non-state event in state: %s", state_event_id
)
continue

del fetched_events[bad_event_id]
state_map[(metadata.event_type, metadata.state_key)] = state_event_id

# if we couldn't get the prev event in question, that's a problem.
remote_event = fetched_events.get(event_id)
remote_event = await self._store.get_event(
event_id,
allow_none=True,
allow_rejected=True,
redact_behaviour=EventRedactBehaviour.as_is,
)
if not remote_event:
raise Exception("Unable to get missing prev_event %s" % (event_id,))

# missing state at that event is a warning, not a blocker
# XXX: this doesn't sound right? it means that we'll end up with incomplete
# state.
failed_to_fetch = desired_events - fetched_events.keys()
failed_to_fetch = desired_events - event_metadata.keys()
if failed_to_fetch:
logger.warning(
"Failed to fetch missing state events for %s %s",
event_id,
failed_to_fetch,
)

remote_state = [
fetched_events[e_id] for e_id in state_event_ids if e_id in fetched_events
]

if remote_event.is_state() and remote_event.rejected_reason is None:
remote_state.append(remote_event)
state_map[
(remote_event.type, remote_event.state_key)
] = remote_event.event_id

return remote_state
return state_map

async def _get_state_and_persist(
self, destination: str, room_id: str, event_id: str
Expand Down