Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions synapse/app/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from synapse.storage.engines import create_engine
from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.manhole import manhole
from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
Expand Down Expand Up @@ -187,7 +187,11 @@ def start(config_options):
ps.start_listening(config.worker_listeners)

def run():
with LoggingContext("run"):
# make sure that we run the reactor with the sentinel log context,
# otherwise other PreserveLoggingContext instances will get confused
# and complain when they see the logcontext arbitrarily swapping
# between the sentinel and `run` logcontexts.
with PreserveLoggingContext():
logger.info("Running")
change_resource_limit(config.soft_file_limit)
if config.gc_thresholds:
Expand Down
8 changes: 6 additions & 2 deletions synapse/app/client_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from synapse.storage.engines import create_engine
from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.manhole import manhole
from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
Expand Down Expand Up @@ -193,7 +193,11 @@ def start(config_options):
ss.start_listening(config.worker_listeners)

def run():
with LoggingContext("run"):
# make sure that we run the reactor with the sentinel log context,
# otherwise other PreserveLoggingContext instances will get confused
# and complain when they see the logcontext arbitrarily swapping
# between the sentinel and `run` logcontexts.
with PreserveLoggingContext():
logger.info("Running")
change_resource_limit(config.soft_file_limit)
if config.gc_thresholds:
Expand Down
8 changes: 6 additions & 2 deletions synapse/app/federation_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from synapse.storage.engines import create_engine
from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.manhole import manhole
from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
Expand Down Expand Up @@ -184,7 +184,11 @@ def start(config_options):
ss.start_listening(config.worker_listeners)

def run():
with LoggingContext("run"):
# make sure that we run the reactor with the sentinel log context,
# otherwise other PreserveLoggingContext instances will get confused
# and complain when they see the logcontext arbitrarily swapping
# between the sentinel and `run` logcontexts.
with PreserveLoggingContext():
logger.info("Running")
change_resource_limit(config.soft_file_limit)
if config.gc_thresholds:
Expand Down
8 changes: 6 additions & 2 deletions synapse/app/federation_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from synapse.storage.presence import UserPresenceState
from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.manhole import manhole
from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
Expand Down Expand Up @@ -193,7 +193,11 @@ def start(config_options):
ps.start_listening(config.worker_listeners)

def run():
with LoggingContext("run"):
# make sure that we run the reactor with the sentinel log context,
# otherwise other PreserveLoggingContext instances will get confused
# and complain when they see the logcontext arbitrarily swapping
# between the sentinel and `run` logcontexts.
with PreserveLoggingContext():
logger.info("Running")
change_resource_limit(config.soft_file_limit)
if config.gc_thresholds:
Expand Down
9 changes: 7 additions & 2 deletions synapse/app/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
)
from synapse.config.homeserver import HomeServerConfig
from synapse.crypto import context_factory
from synapse.util.logcontext import LoggingContext
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.metrics import register_memory_metrics, get_metrics_for
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX
Expand Down Expand Up @@ -456,7 +456,12 @@ def phone_stats_home():
def in_thread():
# Uncomment to enable tracing of log context changes.
# sys.settrace(logcontext_tracer)
with LoggingContext("run"):

# make sure that we run the reactor with the sentinel log context,
# otherwise other PreserveLoggingContext instances will get confused
# and complain when they see the logcontext arbitrarily swapping
# between the sentinel and `run` logcontexts.
with PreserveLoggingContext():
change_resource_limit(hs.config.soft_file_limit)
if hs.config.gc_thresholds:
gc.set_threshold(*hs.config.gc_thresholds)
Expand Down
8 changes: 6 additions & 2 deletions synapse/app/media_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from synapse.storage.media_repository import MediaRepositoryStore
from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.manhole import manhole
from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
Expand Down Expand Up @@ -190,7 +190,11 @@ def start(config_options):
ss.start_listening(config.worker_listeners)

def run():
with LoggingContext("run"):
# make sure that we run the reactor with the sentinel log context,
# otherwise other PreserveLoggingContext instances will get confused
# and complain when they see the logcontext arbitrarily swapping
# between the sentinel and `run` logcontexts.
with PreserveLoggingContext():
logger.info("Running")
change_resource_limit(config.soft_file_limit)
if config.gc_thresholds:
Expand Down
9 changes: 7 additions & 2 deletions synapse/app/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
from synapse.storage import DataStore
from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.logcontext import LoggingContext, preserve_fn, \
PreserveLoggingContext
from synapse.util.manhole import manhole
from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
Expand Down Expand Up @@ -275,7 +276,11 @@ def start(config_options):
ps.start_listening(config.worker_listeners)

def run():
with LoggingContext("run"):
# make sure that we run the reactor with the sentinel log context,
# otherwise other PreserveLoggingContext instances will get confused
# and complain when they see the logcontext arbitrarily swapping
# between the sentinel and `run` logcontexts.
with PreserveLoggingContext():
logger.info("Running")
change_resource_limit(config.soft_file_limit)
if config.gc_thresholds:
Expand Down
9 changes: 7 additions & 2 deletions synapse/app/synchrotron.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@
from synapse.storage.roommember import RoomMemberStore
from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.logcontext import LoggingContext, preserve_fn, \
PreserveLoggingContext
from synapse.util.manhole import manhole
from synapse.util.rlimit import change_resource_limit
from synapse.util.stringutils import random_string
Expand Down Expand Up @@ -496,7 +497,11 @@ def start(config_options):
ss.start_listening(config.worker_listeners)

def run():
with LoggingContext("run"):
# make sure that we run the reactor with the sentinel log context,
# otherwise other PreserveLoggingContext instances will get confused
# and complain when they see the logcontext arbitrarily swapping
# between the sentinel and `run` logcontexts.
with PreserveLoggingContext():
logger.info("Running")
change_resource_limit(config.soft_file_limit)
if config.gc_thresholds:
Expand Down
5 changes: 3 additions & 2 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -933,8 +933,9 @@ def do_invite_join(self, target_hosts, room_id, joinee, content):
# lots of requests for missing prev_events which we do actually
# have. Hence we fire off the deferred, but don't wait for it.

synapse.util.logcontext.reset_context_after_deferred(
self._handle_queued_pdus(room_queue))
synapse.util.logcontext.preserve_fn(self._handle_queued_pdus)(
room_queue
)

defer.returnValue(True)

Expand Down
15 changes: 3 additions & 12 deletions synapse/storage/background_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import synapse.util.async

from ._base import SQLBaseStore
from . import engines
Expand Down Expand Up @@ -84,24 +85,14 @@ def __init__(self, hs):
self._background_update_performance = {}
self._background_update_queue = []
self._background_update_handlers = {}
self._background_update_timer = None

@defer.inlineCallbacks
def start_doing_background_updates(self):
assert self._background_update_timer is None, \
"background updates already running"

logger.info("Starting background schema updates")

while True:
sleep = defer.Deferred()
self._background_update_timer = self._clock.call_later(
self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback, None
)
try:
yield sleep
finally:
self._background_update_timer = None
yield synapse.util.async.sleep(
self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.)

try:
result = yield self.do_next_background_update(
Expand Down
61 changes: 29 additions & 32 deletions synapse/util/logcontext.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,47 +308,44 @@ def preserve_context_over_deferred(deferred, context=None):
return d


def reset_context_after_deferred(deferred):
"""If the deferred is incomplete, add a callback which will reset the
context.

This is useful when you want to fire off a deferred, but don't want to
wait for it to complete. (The deferred will restore the current log context
when it completes, so if you don't do anything, it will leak log context.)

(If this feels asymmetric, consider it this way: we are effectively forking
a new thread of execution. We are probably currently within a
``with LoggingContext()`` block, which is supposed to have a single entry
and exit point. But by spawning off another deferred, we are effectively
adding a new exit point.)
def preserve_fn(f):
"""Wraps a function, to ensure that the current context is restored after
return from the function, and that the sentinel context is set once the
deferred returned by the funtion completes.

Args:
deferred (defer.Deferred): deferred
Useful for wrapping functions that return a deferred which you don't yield
on.
"""
def reset_context(result):
LoggingContext.set_current_context(LoggingContext.sentinel)
return result

if not deferred.called:
deferred.addBoth(reset_context)


def preserve_fn(f):
"""Ensures that function is called with correct context and that context is
restored after return. Useful for wrapping functions that return a deferred
which you don't yield on.
"""
# XXX: why is this here rather than inside g? surely we want to preserve
# the context from the time the function was called, not when it was
# wrapped?
current = LoggingContext.current_context()

def g(*args, **kwargs):
with PreserveLoggingContext(current):
res = f(*args, **kwargs)
if isinstance(res, defer.Deferred):
return preserve_context_over_deferred(
res, context=LoggingContext.sentinel
)
else:
return res
res = f(*args, **kwargs)
if isinstance(res, defer.Deferred) and not res.called:
# The function will have reset the context before returning, so
# we need to restore it now.
LoggingContext.set_current_context(current)

# The original context will be restored when the deferred
# completes, but there is nothing waiting for it, so it will
# get leaked into the reactor or some other function which
# wasn't expecting it. We therefore need to reset the context
# here.
#
# (If this feels asymmetric, consider it this way: we are
# effectively forking a new thread of execution. We are
# probably currently within a ``with LoggingContext()`` block,
# which is supposed to have a single entry and exit point. But
# by spawning off another deferred, we are effectively
# adding a new exit point.)
res.addBoth(reset_context)
return res
return g


Expand Down
61 changes: 61 additions & 0 deletions tests/util/test_log_context.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import twisted.python.failure
from twisted.internet import defer
from twisted.internet import reactor
from .. import unittest

from synapse.util.async import sleep
from synapse.util import logcontext
from synapse.util.logcontext import LoggingContext


Expand Down Expand Up @@ -33,3 +35,62 @@ def competing_callback():
context_one.test_key = "one"
yield sleep(0)
self._check_test_key("one")

def _test_preserve_fn(self, function):
sentinel_context = LoggingContext.current_context()

callback_completed = [False]

@defer.inlineCallbacks
def cb():
context_one.test_key = "one"
yield function()
self._check_test_key("one")

callback_completed[0] = True

with LoggingContext() as context_one:
context_one.test_key = "one"

# fire off function, but don't wait on it.
logcontext.preserve_fn(cb)()

self._check_test_key("one")

# now wait for the function under test to have run, and check that
# the logcontext is left in a sane state.
d2 = defer.Deferred()

def check_logcontext():
if not callback_completed[0]:
reactor.callLater(0.01, check_logcontext)
return

# make sure that the context was reset before it got thrown back
# into the reactor
try:
self.assertIs(LoggingContext.current_context(),
sentinel_context)
d2.callback(None)
except BaseException:
d2.errback(twisted.python.failure.Failure())

reactor.callLater(0.01, check_logcontext)

# test is done once d2 finishes
return d2

def test_preserve_fn_with_blocking_fn(self):
@defer.inlineCallbacks
def blocking_function():
yield sleep(0)

return self._test_preserve_fn(blocking_function)

def test_preserve_fn_with_non_blocking_fn(self):
@defer.inlineCallbacks
def nonblocking_function():
with logcontext.PreserveLoggingContext():
yield defer.succeed(None)

return self._test_preserve_fn(nonblocking_function)