Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit fd4c975

Browse files
authored
Merge pull request #7190 from matrix-org/rav/one_bg_update_at_a_time
Only run one background update at a time
2 parents bae3274 + fcc2de7 commit fd4c975

6 files changed

Lines changed: 99 additions & 72 deletions

File tree

changelog.d/7190.misc

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1 @@
1+
Only run one background database update at a time.

synapse/storage/background_updates.py

Lines changed: 56 additions & 58 deletions
Original file line number | Diff line number | Diff line change
@@ -90,8 +90,10 @@ def __init__(self, hs, database):
9090
self._clock = hs.get_clock()
9191
self.db = database
9292

93+
# if a background update is currently running, its name.
94+
self._current_background_update = None # type: Optional[str]
95+
9396
self._background_update_performance = {}
94-
self._background_update_queue = []
9597
self._background_update_handlers = {}
9698
self._all_done = False
9799

@@ -111,34 +113,33 @@ async def run_background_updates(self, sleep=True):
111113
except Exception:
112114
logger.exception("Error doing update")
113115
else:
114-
if result is None:
116+
if result:
115117
logger.info(
116118
"No more background updates to do."
117119
" Unscheduling background update task."
118120
)
119121
self._all_done = True
120122
return None
121123

122-
@defer.inlineCallbacks
123-
def has_completed_background_updates(self):
124+
async def has_completed_background_updates(self) -> bool:
124125
"""Check if all the background updates have completed
125126
126127
Returns:
127-
Deferred[bool]: True if all background updates have completed
128+
True if all background updates have completed
128129
"""
129130
# if we've previously determined that there is nothing left to do, that
130131
# is easy
131132
if self._all_done:
132133
return True
133134

134-
# obviously, if we have things in our queue, we're not done.
135-
if self._background_update_queue:
135+
# obviously, if we are currently processing an update, we're not done.
136+
if self._current_background_update:
136137
return False
137138

138139
# otherwise, check if there are updates to be run. This is important,
139140
# as we may be running on a worker which doesn't perform the bg updates
140141
# itself, but still wants to wait for them to happen.
141-
updates = yield self.db.simple_select_onecol(
142+
updates = await self.db.simple_select_onecol(
142143
"background_updates",
143144
keyvalues=None,
144145
retcol="1",
@@ -153,11 +154,10 @@ def has_completed_background_updates(self):
153154
async def has_completed_background_update(self, update_name) -> bool:
154155
"""Check if the given background update has finished running.
155156
"""
156-
157157
if self._all_done:
158158
return True
159159

160-
if update_name in self._background_update_queue:
160+
if update_name == self._current_background_update:
161161
return False
162162

163163
update_exists = await self.db.simple_select_one_onecol(
@@ -170,9 +170,7 @@ async def has_completed_background_update(self, update_name) -> bool:
170170

171171
return not update_exists
172172

173-
async def do_next_background_update(
174-
self, desired_duration_ms: float
175-
) -> Optional[int]:
173+
async def do_next_background_update(self, desired_duration_ms: float) -> bool:
176174
"""Does some amount of work on the next queued background update
177175
178176
Returns once some amount of work is done.
@@ -181,33 +179,51 @@ async def do_next_background_update(
181179
desired_duration_ms(float): How long we want to spend
182180
updating.
183181
Returns:
184-
None if there is no more work to do, otherwise an int
182+
True if we have finished running all the background updates, otherwise False
185183
"""
186-
if not self._background_update_queue:
187-
updates = await self.db.simple_select_list(
188-
"background_updates",
189-
keyvalues=None,
190-
retcols=("update_name", "depends_on"),
184+
185+
def get_background_updates_txn(txn):
186+
txn.execute(
187+
"""
188+
SELECT update_name, depends_on FROM background_updates
189+
ORDER BY ordering, update_name
190+
"""
191191
)
192-
in_flight = {update["update_name"] for update in updates}
193-
for update in updates:
194-
if update["depends_on"] not in in_flight:
195-
self._background_update_queue.append(update["update_name"])
192+
return self.db.cursor_to_dict(txn)
196193

197-
if not self._background_update_queue:
198-
# no work left to do
199-
return None
194+
if not self._current_background_update:
195+
all_pending_updates = await self.db.runInteraction(
196+
"background_updates", get_background_updates_txn,
197+
)
198+
if not all_pending_updates:
199+
# no work left to do
200+
return True
201+
202+
# find the first update which isn't dependent on another one in the queue.
203+
pending = {update["update_name"] for update in all_pending_updates}
204+
for upd in all_pending_updates:
205+
depends_on = upd["depends_on"]
206+
if not depends_on or depends_on not in pending:
207+
break
208+
logger.info(
209+
"Not starting on bg update %s until %s is done",
210+
upd["update_name"],
211+
depends_on,
212+
)
213+
else:
214+
# if we get to the end of that for loop, there is a problem
215+
raise Exception(
216+
"Unable to find a background update which doesn't depend on "
217+
"another: dependency cycle?"
218+
)
200219

201-
# pop from the front, and add back to the back
202-
update_name = self._background_update_queue.pop(0)
203-
self._background_update_queue.append(update_name)
220+
self._current_background_update = upd["update_name"]
204221

205-
res = await self._do_background_update(update_name, desired_duration_ms)
206-
return res
222+
await self._do_background_update(desired_duration_ms)
223+
return False
207224

208-
async def _do_background_update(
209-
self, update_name: str, desired_duration_ms: float
210-
) -> int:
225+
async def _do_background_update(self, desired_duration_ms: float) -> int:
226+
update_name = self._current_background_update
211227
logger.info("Starting update batch on background update '%s'", update_name)
212228

213229
update_handler = self._background_update_handlers[update_name]
@@ -400,27 +416,6 @@ def updater(progress, batch_size):
400416

401417
self.register_background_update_handler(update_name, updater)
402418

403-
def start_background_update(self, update_name, progress):
404-
"""Starts a background update running.
405-
406-
Args:
407-
update_name: The update to set running.
408-
progress: The initial state of the progress of the update.
409-
410-
Returns:
411-
A deferred that completes once the task has been added to the
412-
queue.
413-
"""
414-
# Clear the background update queue so that we will pick up the new
415-
# task on the next iteration of do_background_update.
416-
self._background_update_queue = []
417-
progress_json = json.dumps(progress)
418-
419-
return self.db.simple_insert(
420-
"background_updates",
421-
{"update_name": update_name, "progress_json": progress_json},
422-
)
423-
424419
def _end_background_update(self, update_name):
425420
"""Removes a completed background update task from the queue.
426421
@@ -429,9 +424,12 @@ def _end_background_update(self, update_name):
429424
Returns:
430425
A deferred that completes once the task is removed.
431426
"""
432-
self._background_update_queue = [
433-
name for name in self._background_update_queue if name != update_name
434-
]
427+
if update_name != self._current_background_update:
428+
raise Exception(
429+
"Cannot end background update %s which isn't currently running"
430+
% update_name
431+
)
432+
self._current_background_update = None
435433
return self.db.simple_delete_one(
436434
"background_updates", keyvalues={"update_name": update_name}
437435
)

synapse/storage/prepare_database.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,7 @@
2929

3030
# Remember to update this number every time a change is made to database
3131
# schema files, so the users will be informed on server restarts.
32-
SCHEMA_VERSION = 57
32+
SCHEMA_VERSION = 58
3333

3434
dir_path = os.path.abspath(os.path.dirname(__file__))
3535

Lines changed: 19 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,19 @@
1+
/* Copyright 2020 The Matrix.org Foundation C.I.C
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
/* add an "ordering" column to background_updates, which can be used to sort them
17+
to achieve some level of consistency. */
18+
19+
ALTER TABLE background_updates ADD COLUMN ordering INT NOT NULL DEFAULT 0;

tests/storage/test_background_update.py

Lines changed: 15 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,9 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
1111
def prepare(self, reactor, clock, homeserver):
1212
self.updates = self.hs.get_datastore().db.updates # type: BackgroundUpdater
1313
# the base test class should have run the real bg updates for us
14-
self.assertTrue(self.updates.has_completed_background_updates())
14+
self.assertTrue(
15+
self.get_success(self.updates.has_completed_background_updates())
16+
)
1517

1618
self.update_handler = Mock()
1719
self.updates.register_background_update_handler(
@@ -25,12 +27,20 @@ def test_do_background_update(self):
2527
# the target runtime for each bg update
2628
target_background_update_duration_ms = 50000
2729

30+
store = self.hs.get_datastore()
31+
self.get_success(
32+
store.db.simple_insert(
33+
"background_updates",
34+
values={"update_name": "test_update", "progress_json": '{"my_key": 1}'},
35+
)
36+
)
37+
2838
# first step: make a bit of progress
2939
@defer.inlineCallbacks
3040
def update(progress, count):
3141
yield self.clock.sleep((count * duration_ms) / 1000)
3242
progress = {"my_key": progress["my_key"] + 1}
33-
yield self.hs.get_datastore().db.runInteraction(
43+
yield store.db.runInteraction(
3444
"update_progress",
3545
self.updates._background_update_progress_txn,
3646
"test_update",
@@ -39,18 +49,14 @@ def update(progress, count):
3949
return count
4050

4151
self.update_handler.side_effect = update
42-
43-
self.get_success(
44-
self.updates.start_background_update("test_update", {"my_key": 1})
45-
)
4652
self.update_handler.reset_mock()
4753
res = self.get_success(
4854
self.updates.do_next_background_update(
4955
target_background_update_duration_ms
5056
),
5157
by=0.1,
5258
)
53-
self.assertIsNotNone(res)
59+
self.assertFalse(res)
5460

5561
# on the first call, we should get run with the default background update size
5662
self.update_handler.assert_called_once_with(
@@ -73,13 +79,13 @@ def update(progress, count):
7379
result = self.get_success(
7480
self.updates.do_next_background_update(target_background_update_duration_ms)
7581
)
76-
self.assertIsNotNone(result)
82+
self.assertFalse(result)
7783
self.update_handler.assert_called_once()
7884

7985
# third step: we don't expect to be called any more
8086
self.update_handler.reset_mock()
8187
result = self.get_success(
8288
self.updates.do_next_background_update(target_background_update_duration_ms)
8389
)
84-
self.assertIsNone(result)
90+
self.assertTrue(result)
8591
self.assertFalse(self.update_handler.called)

tests/unittest.py

Lines changed: 7 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -40,6 +40,7 @@
4040
from synapse.http.site import SynapseRequest, SynapseSite
4141
from synapse.logging.context import (
4242
SENTINEL_CONTEXT,
43+
LoggingContext,
4344
current_context,
4445
set_current_context,
4546
)
@@ -419,15 +420,17 @@ def setup_test_homeserver(self, *args, **kwargs):
419420
config_obj.parse_config_dict(config, "", "")
420421
kwargs["config"] = config_obj
421422

423+
async def run_bg_updates():
424+
with LoggingContext("run_bg_updates", request="run_bg_updates-1"):
425+
while not await stor.db.updates.has_completed_background_updates():
426+
await stor.db.updates.do_next_background_update(1)
427+
422428
hs = setup_test_homeserver(self.addCleanup, *args, **kwargs)
423429
stor = hs.get_datastore()
424430

425431
# Run the database background updates, when running against "master".
426432
if hs.__class__.__name__ == "TestHomeServer":
427-
while not self.get_success(
428-
stor.db.updates.has_completed_background_updates()
429-
):
430-
self.get_success(stor.db.updates.do_next_background_update(1))
433+
self.get_success(run_bg_updates())
431434

432435
return hs
433436

0 commit comments

Comments
 (0)