-
Notifications
You must be signed in to change notification settings - Fork 522
Fix looping calls not getting GCed. #19416
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Fix memory leak caused by not cleaning up stopped looping calls. Introduced in v1.140.0. | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,10 +15,12 @@ | |
|
|
||
|
|
||
| import logging | ||
| from functools import wraps | ||
| from typing import ( | ||
| Any, | ||
| Callable, | ||
| ) | ||
| from weakref import WeakSet | ||
|
|
||
| from typing_extensions import ParamSpec | ||
| from zope.interface import implementer | ||
|
|
@@ -86,7 +88,7 @@ def __init__(self, reactor: ISynapseThreadlessReactor, server_name: str) -> None | |
| self._delayed_call_id: int = 0 | ||
| """Unique ID used to track delayed calls""" | ||
|
|
||
| self._looping_calls: list[LoopingCall] = [] | ||
| self._looping_calls: WeakSet[LoopingCall] = WeakSet() | ||
| """List of active looping calls""" | ||
|
|
||
| self._call_id_to_delayed_call: dict[int, IDelayedCall] = {} | ||
|
|
@@ -193,6 +195,7 @@ def _looping_call_common( | |
| if now: | ||
| looping_call_context_string = "looping_call_now" | ||
|
|
||
| @wraps(f) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Neat. TIL about |
||
| def wrapped_f(*args: P.args, **kwargs: P.kwargs) -> Deferred: | ||
| clock_debug_logger.debug( | ||
| "%s(%s): Executing callback", looping_call_context_string, instance_id | ||
|
|
@@ -240,7 +243,7 @@ def wrapped_f(*args: P.args, **kwargs: P.kwargs) -> Deferred: | |
| with context.PreserveLoggingContext(): | ||
| d = call.start(duration.as_secs(), now=now) | ||
| d.addErrback(log_failure, "Looping call died", consumeErrors=False) | ||
| self._looping_calls.append(call) | ||
| self._looping_calls.add(call) | ||
|
|
||
| clock_debug_logger.debug( | ||
| "%s(%s): Scheduled looping call every %sms later", | ||
|
|
@@ -302,6 +305,7 @@ def call_later( | |
| if self._is_shutdown: | ||
| raise Exception("Cannot start delayed call. Clock has been shutdown") | ||
|
|
||
| @wraps(callback) | ||
| def wrapped_callback(*args: Any, **kwargs: Any) -> None: | ||
| clock_debug_logger.debug("call_later(%s): Executing callback", call_id) | ||
|
|
||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,77 @@ | ||
| # | ||
| # This file is licensed under the Affero General Public License (AGPL) version 3. | ||
| # | ||
| # Copyright (C) 2025 Element Creations Ltd | ||
| # | ||
| # This program is free software: you can redistribute it and/or modify | ||
| # it under the terms of the GNU Affero General Public License as | ||
| # published by the Free Software Foundation, either version 3 of the | ||
| # License, or (at your option) any later version. | ||
| # | ||
| # See the GNU Affero General Public License for more details: | ||
| # <https://www.gnu.org/licenses/agpl-3.0.html>. | ||
| # | ||
| # | ||
|
|
||
| import weakref | ||
|
|
||
| from synapse.util.duration import Duration | ||
|
|
||
| from tests.unittest import HomeserverTestCase | ||
|
|
||
|
|
||
| class ClockTestCase(HomeserverTestCase): | ||
| def test_looping_calls_are_gced(self) -> None: | ||
| """Test that looping calls are garbage collected after being stopped. | ||
|
|
||
| The `Clock` tracks looping calls so to allow stopping of all looping | ||
| calls via the clock. | ||
| """ | ||
| clock = self.hs.get_clock() | ||
|
|
||
| # Create a new looping call, and take a weakref to it. | ||
| call = clock.looping_call(lambda: None, Duration(seconds=1)) | ||
|
|
||
| weak_call = weakref.ref(call) | ||
|
|
||
| # Stop the looping call. It should get garbage collected after this. | ||
| call.stop() | ||
|
|
||
| # Delete our strong reference to the call (otherwise it won't get garbage collected). | ||
| del call | ||
|
|
||
| # Check that the call has been garbage collected. | ||
| self.assertIsNone(weak_call()) | ||
|
|
||
| def test_looping_calls_stopped_on_clock_shutdown(self) -> None: | ||
| """Test that looping calls are stopped when the clock is shut down.""" | ||
| clock = self.hs.get_clock() | ||
|
|
||
| was_called = False | ||
|
|
||
| def on_call() -> None: | ||
| nonlocal was_called | ||
| was_called = True | ||
|
|
||
| # Create a new looping call. | ||
| call = clock.looping_call(on_call, Duration(seconds=1)) | ||
| weak_call = weakref.ref(call) | ||
| del call # Remove our strong reference to the call. | ||
|
|
||
| # The call should still exist. | ||
| self.assertIsNotNone(weak_call()) | ||
|
|
||
| # Advance the clock to trigger the call. | ||
| self.reactor.advance(2) | ||
| self.assertTrue(was_called) | ||
|
|
||
| # Shut down the clock, which should stop the looping call. | ||
| clock.shutdown() | ||
|
|
||
| # The call should have been garbage collected. | ||
| self.assertIsNone(weak_call()) | ||
|
|
||
| # Advance the clock again; the call should not be called again. | ||
| was_called = False | ||
| self.reactor.advance(2) | ||
| self.assertFalse(was_called) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How did you stumble upon this?
Investigation from seeing some memory graph?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know it started with seeing the matrix.org
client_readerstake a long time to do GC, resulting in them appearing down frequently, https://github.com/matrix-org/internal-config/issues/1677Then looking into why their memory usage was so high and kept growing.
Also, looking at this issue from the other day: #19392
I'm not sure how he jumped from there to looking at
looping_calls.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The basic steps were:
objgraphin the venv and runobjgraph.show_most_common_types(), this came back with a list ofcell,function(etc)... and thenLoopingCall.Clock._looping_calls. One can also use objgraph to generate graphs of references of objects to see what is holding on to the references, which also would have pointed toClock._looping_calls.