@@ -1039,6 +1039,34 @@ async def _handle_received_pdu(self, origin: str, pdu: EventBase) -> None:
10391039 pdu .room_id , room_version , lock , origin , pdu
10401040 )
10411041
1042+ async def _get_next_valid_staged_event_for_room (
1043+ self , room_id : str , room_version : RoomVersion
1044+ ) -> Optional [Tuple [str , EventBase ]]:
1045+ """Return the first non-spam event from staging queue.
1046+ """
1047+
1048+ while True :
1049+ # We need to do this check outside the lock to avoid a race between
1050+ # a new event being inserted by another instance and it attempting
1051+ # to acquire the lock.
1052+ next = await self .store .get_next_staged_event_for_room (
1053+ room_id , room_version
1054+ )
1055+
1056+ if next is None :
1057+ return None
1058+
1059+ origin , event = next
1060+
1061+ if await self ._spam_checker .should_drop_federated_event (event ):
1062+ logger .warning (
1063+ "Staged federated event contains spam, dropping %s" ,
1064+ event .event_id ,
1065+ )
1066+ continue
1067+
1068+ return next
1069+
10421070 @wrap_as_background_process ("_process_incoming_pdus_in_room_inner" )
10431071 async def _process_incoming_pdus_in_room_inner (
10441072 self ,
@@ -1116,31 +1144,15 @@ async def _process_incoming_pdus_in_room_inner(
11161144 (self ._clock .time_msec () - received_ts ) / 1000
11171145 )
11181146
1119- while True :
1120- # We need to do this check outside the lock to avoid a race between
1121- # a new event being inserted by another instance and it attempting
1122- # to acquire the lock.
1123- next = await self .store .get_next_staged_event_for_room (
1124- room_id , room_version
1125- )
1126-
1127- if next is None :
1128- break
1129-
1130- origin , event = next
1131-
1132- if await self ._spam_checker .should_drop_federated_event (event ):
1133- logger .warning (
1134- "Staged federated event contains spam, dropping %s" ,
1135- event .event_id ,
1136- )
1137- continue
1138-
1139- break
1147+ next = await self ._get_next_valid_staged_event_for_room (
1148+ room_id , room_version
1149+ )
11401150
11411151 if not next :
11421152 break
11431153
1154+ origin , event = next
1155+
11441156 # Prune the event queue if it's getting large.
11451157 #
11461158 # We do this *after* handling the first event as the common case is
0 commit comments