|
36 | 36 | from twisted.internet.interfaces import IAddress, IConnector |
37 | 37 | from twisted.python.failure import Failure |
38 | 38 |
|
39 | | -from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable |
| 39 | +from synapse.logging.context import ( |
| 40 | + PreserveLoggingContext, |
| 41 | + current_context, |
| 42 | + make_deferred_yieldable, |
| 43 | +) |
40 | 44 | from synapse.metrics import SERVER_NAME_LABEL |
41 | 45 | from synapse.metrics.background_process_metrics import ( |
42 | 46 | BackgroundProcessLoggingContext, |
@@ -117,15 +121,33 @@ class RedisSubscriber(SubscriberProtocol): |
117 | 121 | def __init__(self, *args: Any, **kwargs: Any): |
118 | 122 | super().__init__(*args, **kwargs) |
119 | 123 |
|
| 124 | + # Capture the current context so we can use it later when `server_name` is set. |
| 125 | + self._sentinel_context = current_context() |
120 | 126 | # a logcontext which we use for processing incoming commands. We declare it as a |
121 | 127 | # background process so that the CPU stats get reported to prometheus. |
122 | | - with PreserveLoggingContext(): |
123 | | - # thanks to `PreserveLoggingContext()`, the new logcontext is guaranteed to |
124 | | - # capture the sentinel context as its containing context and won't prevent |
125 | | - # GC of / unintentionally reactivate what would be the current context. |
126 | | - self._logging_context = BackgroundProcessLoggingContext( |
127 | | - name="replication_command_handler", server_name=self.server_name |
128 | | - ) |
| 128 | + self._logging_context: Optional[BackgroundProcessLoggingContext] = None |
| 129 | + |
| 130 | + def _get_logging_context(self) -> BackgroundProcessLoggingContext: |
| 131 | + """ |
| 132 | + We lazily create the logging context so that `self.server_name` is set and |
| 133 | + available. See `RedisDirectTcpReplicationClientFactory.buildProtocol` for more |
| 134 | + details on why we set `self.server_name` after the fact instead of in the |
| 135 | + constructor. |
| 136 | + """ |
| 137 | + assert self.server_name is not None, ( |
| 138 | + "self.server_name must be set before using _get_logging_context()" |
| 139 | + ) |
| 140 | + if self._logging_context is None: |
| 141 | + # a logcontext which we use for processing incoming commands. We declare it as a |
| 142 | + # background process so that the CPU stats get reported to prometheus. |
| 143 | + with PreserveLoggingContext(self._sentinel_context): |
| 144 | + # thanks to `PreserveLoggingContext()`, the new logcontext is guaranteed to |
| 145 | + # capture the sentinel context as its containing context and won't prevent |
| 146 | + # GC of / unintentionally reactivate what would be the current context. |
| 147 | + self._logging_context = BackgroundProcessLoggingContext( |
| 148 | + name="replication_command_handler", server_name=self.server_name |
| 149 | + ) |
| 150 | + return self._logging_context |
129 | 151 |
|
130 | 152 | def connectionMade(self) -> None: |
131 | 153 | logger.info("Connected to redis") |
@@ -159,7 +181,7 @@ async def _send_subscribe(self) -> None: |
159 | 181 |
|
160 | 182 | def messageReceived(self, pattern: str, channel: str, message: str) -> None: |
161 | 183 | """Received a message from redis.""" |
162 | | - with PreserveLoggingContext(self._logging_context): |
| 184 | + with PreserveLoggingContext(self._get_logging_context()): |
163 | 185 | self._parse_and_dispatch_message(message) |
164 | 186 |
|
165 | 187 | def _parse_and_dispatch_message(self, message: str) -> None: |
@@ -218,7 +240,7 @@ def connectionLost(self, reason: Failure) -> None: # type: ignore[override] |
218 | 240 |
|
219 | 241 | # mark the logging context as finished by triggering `__exit__()` |
220 | 242 | with PreserveLoggingContext(): |
221 | | - with self._logging_context: |
| 243 | + with self._get_logging_context(): |
222 | 244 | pass |
223 | 245 | # the sentinel context is now active, which may not be correct. |
224 | 246 | # PreserveLoggingContext() will restore the correct logging context. |
|
0 commit comments