Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 6c1f7c7

Browse files
authored
Fix limit logic for AccountDataStream (#7384)
Make sure that the AccountDataStream presents complete updates, in the right order. This is much the same fix as #7337 and #7358, but applied to a different stream.
1 parent 34a43f0 commit 6c1f7c7

4 files changed

Lines changed: 217 additions & 31 deletions

File tree

changelog.d/7384.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix a bug where account data updates might not be sent over replication to worker processes after the stream falls behind.

synapse/replication/tcp/streams/_base.py

Lines changed: 56 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,27 @@
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
1616

17+
import heapq
1718
import logging
1819
from collections import namedtuple
19-
from typing import Any, Awaitable, Callable, List, Optional, Tuple
20+
from typing import (
21+
TYPE_CHECKING,
22+
Any,
23+
Awaitable,
24+
Callable,
25+
List,
26+
Optional,
27+
Tuple,
28+
TypeVar,
29+
)
2030

2131
import attr
2232

2333
from synapse.replication.http.streams import ReplicationGetStreamUpdates
2434

35+
if TYPE_CHECKING:
36+
import synapse.server
37+
2538
logger = logging.getLogger(__name__)
2639

2740
# the number of rows to request from an update_function.
@@ -37,7 +50,7 @@
3750
# parsing with Stream.parse_row (which turns it into a `ROW_TYPE`). Normally it's
3851
# just a row from a database query, though this is dependent on the stream in question.
3952
#
40-
StreamRow = Tuple
53+
StreamRow = TypeVar("StreamRow", bound=Tuple)
4154

4255
# The type returned by the update_function of a stream, as well as get_updates(),
4356
# get_updates_since, etc.
@@ -533,32 +546,63 @@ class AccountDataStream(Stream):
533546
"""
534547

535548
AccountDataStreamRow = namedtuple(
536-
"AccountDataStream", ("user_id", "room_id", "data_type") # str # str # str
549+
"AccountDataStream",
550+
("user_id", "room_id", "data_type"), # str # Optional[str] # str
537551
)
538552

539553
NAME = "account_data"
540554
ROW_TYPE = AccountDataStreamRow
541555

542-
def __init__(self, hs):
556+
def __init__(self, hs: "synapse.server.HomeServer"):
543557
self.store = hs.get_datastore()
544558
super().__init__(
545559
hs.get_instance_name(),
546560
current_token_without_instance(self.store.get_max_account_data_stream_id),
547-
db_query_to_update_function(self._update_function),
561+
self._update_function,
562+
)
563+
564+
async def _update_function(
565+
self, instance_name: str, from_token: int, to_token: int, limit: int
566+
) -> StreamUpdateResult:
567+
limited = False
568+
global_results = await self.store.get_updated_global_account_data(
569+
from_token, to_token, limit
548570
)
549571

550-
async def _update_function(self, from_token, to_token, limit):
551-
global_results, room_results = await self.store.get_all_updated_account_data(
552-
from_token, from_token, to_token, limit
572+
# if the global results hit the limit, we'll need to limit the room results to
573+
# the same stream token.
574+
if len(global_results) >= limit:
575+
to_token = global_results[-1][0]
576+
limited = True
577+
578+
room_results = await self.store.get_updated_room_account_data(
579+
from_token, to_token, limit
553580
)
554581

555-
results = list(room_results)
556-
results.extend(
557-
(stream_id, user_id, None, account_data_type)
582+
# likewise, if the room results hit the limit, limit the global results to
583+
# the same stream token.
584+
if len(room_results) >= limit:
585+
to_token = room_results[-1][0]
586+
limited = True
587+
588+
# convert the global results to the right format, and limit them to the to_token
589+
# at the same time
590+
global_rows = (
591+
(stream_id, (user_id, None, account_data_type))
558592
for stream_id, user_id, account_data_type in global_results
593+
if stream_id <= to_token
594+
)
595+
596+
# we know that the room_results are already limited to `to_token` so no need
597+
# for a check on `stream_id` here.
598+
room_rows = (
599+
(stream_id, (user_id, room_id, account_data_type))
600+
for stream_id, user_id, room_id, account_data_type in room_results
559601
)
560602

561-
return results
603+
# we need to return a sorted list, so merge them together.
604+
updates = list(heapq.merge(room_rows, global_rows))
605+
return updates, to_token, limited
562606

563607

564608
class GroupServerStream(Stream):

synapse/storage/data_stores/main/account_data.py

Lines changed: 43 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import abc
1818
import logging
19+
from typing import List, Tuple
1920

2021
from canonicaljson import json
2122

@@ -175,41 +176,64 @@ def get_account_data_for_room_and_type_txn(txn):
175176
"get_account_data_for_room_and_type", get_account_data_for_room_and_type_txn
176177
)
177178

178-
def get_all_updated_account_data(
179-
self, last_global_id, last_room_id, current_id, limit
180-
):
181-
"""Get all the client account_data that has changed on the server
179+
async def get_updated_global_account_data(
180+
self, last_id: int, current_id: int, limit: int
181+
) -> List[Tuple[int, str, str]]:
182+
"""Get the global account_data that has changed, for the account_data stream
183+
182184
Args:
183-
last_global_id(int): The position to fetch from for top level data
184-
last_room_id(int): The position to fetch from for per room data
185-
current_id(int): The position to fetch up to.
185+
last_id: the last stream_id from the previous batch.
186+
current_id: the maximum stream_id to return up to
187+
limit: the maximum number of rows to return
188+
186189
Returns:
187-
A deferred pair of lists of tuples of stream_id int, user_id string,
188-
room_id string, and type string.
190+
A list of tuples of stream_id int, user_id string,
191+
and type string.
189192
"""
190-
if last_room_id == current_id and last_global_id == current_id:
191-
return defer.succeed(([], []))
193+
if last_id == current_id:
194+
return []
192195

193-
def get_updated_account_data_txn(txn):
196+
def get_updated_global_account_data_txn(txn):
194197
sql = (
195198
"SELECT stream_id, user_id, account_data_type"
196199
" FROM account_data WHERE ? < stream_id AND stream_id <= ?"
197200
" ORDER BY stream_id ASC LIMIT ?"
198201
)
199-
txn.execute(sql, (last_global_id, current_id, limit))
200-
global_results = txn.fetchall()
202+
txn.execute(sql, (last_id, current_id, limit))
203+
return txn.fetchall()
204+
205+
return await self.db.runInteraction(
206+
"get_updated_global_account_data", get_updated_global_account_data_txn
207+
)
208+
209+
async def get_updated_room_account_data(
210+
self, last_id: int, current_id: int, limit: int
211+
) -> List[Tuple[int, str, str, str]]:
212+
"""Get the room account_data that has changed, for the account_data stream
201213
214+
Args:
215+
last_id: the last stream_id from the previous batch.
216+
current_id: the maximum stream_id to return up to
217+
limit: the maximum number of rows to return
218+
219+
Returns:
220+
A list of tuples of stream_id int, user_id string,
221+
room_id string and type string.
222+
"""
223+
if last_id == current_id:
224+
return []
225+
226+
def get_updated_room_account_data_txn(txn):
202227
sql = (
203228
"SELECT stream_id, user_id, room_id, account_data_type"
204229
" FROM room_account_data WHERE ? < stream_id AND stream_id <= ?"
205230
" ORDER BY stream_id ASC LIMIT ?"
206231
)
207-
txn.execute(sql, (last_room_id, current_id, limit))
208-
room_results = txn.fetchall()
209-
return global_results, room_results
232+
txn.execute(sql, (last_id, current_id, limit))
233+
return txn.fetchall()
210234

211-
return self.db.runInteraction(
212-
"get_all_updated_account_data_txn", get_updated_account_data_txn
235+
return await self.db.runInteraction(
236+
"get_updated_room_account_data", get_updated_room_account_data_txn
213237
)
214238

215239
def get_updated_account_data_for_user(self, user_id, stream_id):
tests/replication/tcp/streams/test_account_data.py

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
# -*- coding: utf-8 -*-
2+
# Copyright 2020 The Matrix.org Foundation C.I.C.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
from synapse.replication.tcp.streams._base import (
17+
_STREAM_UPDATE_TARGET_ROW_COUNT,
18+
AccountDataStream,
19+
)
20+
21+
from tests.replication._base import BaseStreamTestCase
22+
23+
24+
class AccountDataStreamTestCase(BaseStreamTestCase):
25+
def test_update_function_room_account_data_limit(self):
26+
"""Test replication with many room account data updates
27+
"""
28+
store = self.hs.get_datastore()
29+
30+
# generate lots of account data updates
31+
updates = []
32+
for i in range(_STREAM_UPDATE_TARGET_ROW_COUNT + 5):
33+
update = "m.test_type.%i" % (i,)
34+
self.get_success(
35+
store.add_account_data_to_room("test_user", "test_room", update, {})
36+
)
37+
updates.append(update)
38+
39+
# also one global update
40+
self.get_success(store.add_account_data_for_user("test_user", "m.global", {}))
41+
42+
# tell the notifier to catch up to avoid duplicate rows.
43+
# workaround for https://github.com/matrix-org/synapse/issues/7360
44+
# FIXME remove this when the above is fixed
45+
self.replicate()
46+
47+
# check we're testing what we think we are: no rows should yet have been
48+
# received
49+
self.assertEqual([], self.test_handler.received_rdata_rows)
50+
51+
# now reconnect to pull the updates
52+
self.reconnect()
53+
self.replicate()
54+
55+
# we should have received all the expected rows in the right order
56+
received_rows = self.test_handler.received_rdata_rows
57+
58+
for t in updates:
59+
(stream_name, token, row) = received_rows.pop(0)
60+
self.assertEqual(stream_name, AccountDataStream.NAME)
61+
self.assertIsInstance(row, AccountDataStream.AccountDataStreamRow)
62+
self.assertEqual(row.data_type, t)
63+
self.assertEqual(row.room_id, "test_room")
64+
65+
(stream_name, token, row) = received_rows.pop(0)
66+
self.assertIsInstance(row, AccountDataStream.AccountDataStreamRow)
67+
self.assertEqual(row.data_type, "m.global")
68+
self.assertIsNone(row.room_id)
69+
70+
self.assertEqual([], received_rows)
71+
72+
def test_update_function_global_account_data_limit(self):
73+
"""Test replication with many global account data updates
74+
"""
75+
store = self.hs.get_datastore()
76+
77+
# generate lots of account data updates
78+
updates = []
79+
for i in range(_STREAM_UPDATE_TARGET_ROW_COUNT + 5):
80+
update = "m.test_type.%i" % (i,)
81+
self.get_success(store.add_account_data_for_user("test_user", update, {}))
82+
updates.append(update)
83+
84+
# also one per-room update
85+
self.get_success(
86+
store.add_account_data_to_room("test_user", "test_room", "m.per_room", {})
87+
)
88+
89+
# tell the notifier to catch up to avoid duplicate rows.
90+
# workaround for https://github.com/matrix-org/synapse/issues/7360
91+
# FIXME remove this when the above is fixed
92+
self.replicate()
93+
94+
# check we're testing what we think we are: no rows should yet have been
95+
# received
96+
self.assertEqual([], self.test_handler.received_rdata_rows)
97+
98+
# now reconnect to pull the updates
99+
self.reconnect()
100+
self.replicate()
101+
102+
# we should have received all the expected rows in the right order
103+
received_rows = self.test_handler.received_rdata_rows
104+
105+
for t in updates:
106+
(stream_name, token, row) = received_rows.pop(0)
107+
self.assertEqual(stream_name, AccountDataStream.NAME)
108+
self.assertIsInstance(row, AccountDataStream.AccountDataStreamRow)
109+
self.assertEqual(row.data_type, t)
110+
self.assertIsNone(row.room_id)
111+
112+
(stream_name, token, row) = received_rows.pop(0)
113+
self.assertIsInstance(row, AccountDataStream.AccountDataStreamRow)
114+
self.assertEqual(row.data_type, "m.per_room")
115+
self.assertEqual(row.room_id, "test_room")
116+
117+
self.assertEqual([], received_rows)

0 commit comments

Comments
 (0)