This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit ce428a1

Fix EventsStream raising assertions when it falls behind
Figuring out how to correctly limit updates from this stream without dropping entries is far more complicated than just counting the number of rows being returned. We need to consider each query separately and, if any one query hits the limit, truncate the results from the others. I think this also fixes some potentially long-standing bugs where events or state changes could get missed if we hit the limit on either query.
1 parent 9cbdfb3 commit ce428a1

2 files changed: 129 additions & 30 deletions
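
To make the strategy in the commit message concrete, here is a minimal standalone sketch of the idea, assuming a list of source-query callables; the names and signature are invented for illustration and are not Synapse's storage API. Each source is queried up to a shared row limit; any source that returns a full batch lowers the shared upper bound to its last stream id, and every source's rows are then truncated to that bound, so nothing is dropped.

    # a minimal sketch, assuming hypothetical source-query callables; this is
    # not Synapse's actual storage API
    from typing import Callable, List, Tuple

    Row = Tuple[int, ...]  # (stream_id, ...)

    def bounded_updates(
        sources: List[Callable[[int, int, int], List[Row]]],
        from_token: int,
        current_token: int,
        limit: int,
    ) -> Tuple[List[Row], int, bool]:
        """Fetch from several stream-ordered sources, then clamp every source
        to the smallest stream id at which any one of them hit its row limit."""
        upper_limit = current_token
        limited = False
        batches = []
        for fetch in sources:
            rows = fetch(from_token, upper_limit, limit)
            assert len(rows) <= limit, "source did not honour its row limit"
            if len(rows) == limit:
                # this source was truncated: nothing past its last row is
                # guaranteed complete, so lower the shared upper bound
                upper_limit = rows[-1][0]
                limited = True
            batches.append(rows)
        # drop rows beyond the final upper_limit so all sources are consistent
        merged = sorted(
            row for rows in batches for row in rows if row[0] <= upper_limit
        )
        return merged, upper_limit, limited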

synapse/replication/tcp/streams/events.py (95 additions & 18 deletions)

@@ -15,11 +15,12 @@
 # limitations under the License.
 
 import heapq
-from typing import Iterable, Tuple, Type
+from collections import Iterable
+from typing import List, Tuple, Type
 
 import attr
 
-from ._base import Stream, Token, db_query_to_update_function
+from ._base import Stream, StreamUpdateResult, Token
 
 
 """Handling of the 'events' replication stream
@@ -117,30 +118,106 @@ class EventsStream(Stream):
     def __init__(self, hs):
         self._store = hs.get_datastore()
         super().__init__(
-            self._store.get_current_events_token,
-            db_query_to_update_function(self._update_function),
+            self._store.get_current_events_token, self._update_function,
         )
 
     async def _update_function(
-        self, from_token: Token, current_token: Token, limit: int
-    ) -> Iterable[tuple]:
+        self, from_token: Token, current_token: Token, target_row_count: int
+    ) -> StreamUpdateResult:
+
+        # the events stream merges together three separate sources:
+        #  * new events
+        #  * current_state changes
+        #  * events which were previously outliers, but have now been de-outliered.
+        #
+        # The merge operation is complicated by the fact that we only have a single
+        # "stream token" which is supposed to indicate how far we have got through
+        # all three streams. It's therefore no good to return rows 1-1000 from the
+        # "new events" table if the state_deltas are limited to rows 1-100 by the
+        # target_row_count.
+        #
+        # In other words: we must pick a new upper limit, and must return *all* rows
+        # up to that point for each of the three sources.
+        #
+        # Start by trying to split the target_row_count up. We expect to have a
+        # negligible number of ex-outliers, and a rough approximation based on
+        # recent traffic on sw1v.org shows that there are approximately the same
+        # number of event rows between a given pair of stream ids as there are
+        # state updates, so let's split our target_row_count among those two types.
+        # The target is only an approximation - it doesn't matter if we end up
+        # going a bit over it.
+
+        target_row_count //= 2
+
+        # now we fetch up to that many rows from the events table
+
         event_rows = await self._store.get_all_new_forward_event_rows(
-            from_token, current_token, limit
-        )
-        event_updates = (
-            (row[0], EventsStreamEventRow.TypeId, row[1:]) for row in event_rows
-        )
+            from_token, current_token, target_row_count
+        )  # type: List[Tuple]
+
+        # we rely on get_all_new_forward_event_rows strictly honouring the limit, so
+        # that we know it is safe to just take upper_limit = event_rows[-1][0].
+        assert (
+            len(event_rows) <= target_row_count
+        ), "get_all_new_forward_event_rows did not honour row limit"
+
+        # if we hit the limit on event_updates, there's no point in going beyond the
+        # last stream_id in the batch for the other sources.
+
+        if len(event_rows) == target_row_count:
+            limited = True
+            upper_limit = event_rows[-1][0]  # type: int
+        else:
+            limited = False
+            upper_limit = current_token
+
+        # next up is the state delta table
 
         state_rows = await self._store.get_all_updated_current_state_deltas(
-            from_token, current_token, limit
-        )
-        state_updates = (
-            (row[0], EventsStreamCurrentStateRow.TypeId, row[1:]) for row in state_rows
-        )
+            from_token, upper_limit, target_row_count
+        )  # type: List[Tuple]
+
+        # again, if we've hit the limit there, we'll need to limit the other sources
+        assert len(state_rows) <= target_row_count
+        if len(state_rows) == target_row_count:
+            assert state_rows[-1][0] <= upper_limit
+            upper_limit = state_rows[-1][0]
+            limited = True
+
+        # FIXME: is it a given that there is only one row per stream_id in the
+        # state_deltas table (so that we can be sure that we have got all of the
+        # rows for upper_limit)?
+
+        # finally, fetch the ex-outliers rows. We assume there are few enough of these
+        # not to bother with the limit.
 
-        all_updates = heapq.merge(event_updates, state_updates)
+        ex_outliers_rows = await self._store.get_ex_outlier_stream_rows(
+            from_token, upper_limit
+        )  # type: List[Tuple]
 
-        return all_updates
+        # we now need to turn the raw database rows returned into tuples suitable
+        # for the replication protocol (basically, we add an identifier to
+        # distinguish the row type). At the same time, we can limit the event_rows
+        # to the max stream_id from state_rows.
+
+        event_updates = (
+            (stream_id, (EventsStreamEventRow.TypeId, rest))
+            for (stream_id, *rest) in event_rows
+            if stream_id <= upper_limit
+        )  # type: Iterable[Tuple[int, Tuple]]
+
+        state_updates = (
+            (stream_id, (EventsStreamCurrentStateRow.TypeId, rest))
+            for (stream_id, *rest) in state_rows
+        )  # type: Iterable[Tuple[int, Tuple]]
+
+        ex_outliers_updates = (
+            (stream_id, (EventsStreamEventRow.TypeId, rest))
+            for (stream_id, *rest) in ex_outliers_rows
+        )  # type: Iterable[Tuple[int, Tuple]]
+
+        # we need to return a sorted list, so merge them together.
+        updates = list(heapq.merge(event_updates, state_updates, ex_outliers_updates))
+        return updates, upper_limit, limited
 
     @classmethod
     def parse_row(cls, row):
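
A note on the merge step above: heapq.merge lazily combines several already-sorted iterables into one sorted iterator, which is why each query must return rows in ascending stream order (and why the second file below switches its query from DESC to ASC). Here is a self-contained sketch with made-up row data; the type-id strings merely stand in for EventsStreamEventRow.TypeId and EventsStreamCurrentStateRow.TypeId.

    # illustrative only; row data is invented and the type ids are stand-ins
    import heapq

    EVENT_TYPE_ID = "ev"
    STATE_TYPE_ID = "state"

    # each list is already sorted by stream id, as the storage queries guarantee
    event_rows = [(1, ["event-a"]), (4, ["event-b"]), (6, ["event-c"])]
    state_rows = [(2, ["state-x"]), (4, ["state-y"])]
    ex_outlier_rows = [(3, ["event-d"])]

    # tag each row with its type so the receiving side can tell them apart
    event_updates = ((sid, (EVENT_TYPE_ID, rest)) for sid, rest in event_rows)
    state_updates = ((sid, (STATE_TYPE_ID, rest)) for sid, rest in state_rows)
    ex_outlier_updates = ((sid, (EVENT_TYPE_ID, rest)) for sid, rest in ex_outlier_rows)

    # heapq.merge requires each input to be individually sorted; it then yields
    # a single sequence ordered by stream id without re-sorting everything
    updates = list(heapq.merge(event_updates, state_updates, ex_outlier_updates))
    print(updates)
    # [(1, ('ev', ['event-a'])), (2, ('state', ['state-x'])),
    #  (3, ('ev', ['event-d'])), (4, ('ev', ['event-b'])),
    #  (4, ('state', ['state-y'])), (6, ('ev', ['event-c']))]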

synapse/storage/data_stores/main/events_worker.py (34 additions & 12 deletions)

@@ -973,8 +973,18 @@ def get_current_events_token(self):
         return self._stream_id_gen.get_current_token()
 
     def get_all_new_forward_event_rows(self, last_id, current_id, limit):
-        if last_id == current_id:
-            return defer.succeed([])
+        """Returns new events, for the Events replication stream
+
+        Args:
+            last_id: the last stream_id from the previous batch.
+            current_id: the maximum stream_id to return up to
+            limit: the maximum number of rows to return
+
+        Returns: Deferred[List[Tuple]]
+            a list of events stream rows. Each tuple consists of a stream id as
+            the first element, followed by fields suitable for casting into an
+            EventsStreamRow.
+        """
 
         def get_all_new_forward_event_rows(txn):
             sql = (
@@ -989,13 +999,26 @@ def get_all_new_forward_event_rows(txn):
                 " LIMIT ?"
             )
             txn.execute(sql, (last_id, current_id, limit))
-            new_event_updates = txn.fetchall()
+            return txn.fetchall()
 
-            if len(new_event_updates) == limit:
-                upper_bound = new_event_updates[-1][0]
-            else:
-                upper_bound = current_id
+        return self.db.runInteraction(
+            "get_all_new_forward_event_rows", get_all_new_forward_event_rows
+        )
+
+    def get_ex_outlier_stream_rows(self, last_id, current_id):
+        """Returns de-outliered events, for the Events replication stream
 
+        Args:
+            last_id: the last stream_id from the previous batch.
+            current_id: the maximum stream_id to return up to
+
+        Returns: Deferred[List[Tuple]]
+            a list of events stream rows. Each tuple consists of a stream id as
+            the first element, followed by fields suitable for casting into an
+            EventsStreamRow.
+        """
+
+        def get_ex_outlier_stream_rows_txn(txn):
             sql = (
                 "SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
                 " state_key, redacts, relates_to_id"
@@ -1006,15 +1029,14 @@ def get_all_new_forward_event_rows(txn):
                 " LEFT JOIN event_relations USING (event_id)"
                 " WHERE ? < event_stream_ordering"
                 " AND event_stream_ordering <= ?"
-                " ORDER BY event_stream_ordering DESC"
+                " ORDER BY event_stream_ordering ASC"
             )
-            txn.execute(sql, (last_id, upper_bound))
-            new_event_updates.extend(txn)
 
-            return new_event_updates
+            txn.execute(sql, (last_id, current_id))
+            return txn.fetchall()
 
         return self.db.runInteraction(
-            "get_all_new_forward_event_rows", get_all_new_forward_event_rows
+            "get_ex_outlier_stream_rows", get_ex_outlier_stream_rows_txn
         )
 
     def get_all_new_backfill_event_rows(self, last_id, current_id, limit):
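
The storage changes above depend on two properties that the stream code asserts: each query strictly honours its LIMIT, and rows come back in ascending stream order, so the last row of a full batch is a safe upper bound. A runnable sqlite3 sketch of that query pattern, using a deliberately simplified single-table schema and invented data:

    # a sketch of the query pattern only; the one-column schema and data are
    # stand-ins for Synapse's real events tables
    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE events (event_stream_ordering INTEGER PRIMARY KEY,"
        " event_id TEXT)"
    )
    conn.executemany(
        "INSERT INTO events VALUES (?, ?)",
        [(i, "$event%d" % i) for i in range(1, 11)],
    )

    def get_new_event_rows(last_id, current_id, limit):
        # fetch the (last_id, current_id] window in ascending order, so that
        # rows[-1][0] is a safe upper bound when exactly `limit` rows come back
        sql = (
            "SELECT event_stream_ordering, event_id FROM events"
            " WHERE ? < event_stream_ordering AND event_stream_ordering <= ?"
            " ORDER BY event_stream_ordering ASC LIMIT ?"
        )
        return conn.execute(sql, (last_id, current_id, limit)).fetchall()

    rows = get_new_event_rows(2, 10, 5)
    assert len(rows) <= 5, "query did not honour its row limit"
    print(rows)  # [(3, '$event3'), (4, '$event4'), ..., (7, '$event7')]

    # if the batch is full, the stream has fallen behind: stop at its last row
    upper_bound = rows[-1][0] if len(rows) == 5 else 10
    print(upper_bound)  # 7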
