Commit d5bc8bc

fix(services): tighten error handling on registry + eviction + sighup paths
Review pass #2 caught a cluster of silent-failure concerns around the registry
hot paths and the eviction hooks that now wire it to gateway mutations,
downstream DELETE, classification, and SIGHUP. Fixes land as one commit because
they share a design: narrow the catches, raise the log level where a failure
leaves state wrong, and introduce a dedicated exception type so "registry not
initialised" stops hiding other RuntimeErrors.

mcpgateway/services/upstream_session_registry.py:
- New RegistryNotInitializedError(RuntimeError) so catch-sites can distinguish
  the "not started yet" case from other runtime errors (e.g. "Event loop is
  closed" during shutdown). Inherits RuntimeError for backwards compatibility
  with catch-sites written pre-split.
- _probe_health: narrow the catch-all "except Exception → recreate" to
  (OSError, TimeoutError, McpError). AttributeError from MCP SDK drift,
  authorization errors, and other genuinely-unexpected conditions now propagate
  instead of driving an infinite reconnect loop against the same failure.
- _default_session_factory.owner(): change except BaseException to except
  Exception so SystemExit / KeyboardInterrupt / CancelledError propagate
  promptly during shutdown. Add an add_done_callback that logs a warning if the
  owner task exits unexpectedly — previously a post-init upstream death
  silently left an orphaned session in self._sessions.
- is_closed: bump the MCP-internals introspection except from pass to
  logger.debug with the exception type, so SDK drift is visible.
- acquire(): wrap the yield in try/except (OSError, anyio.ClosedResourceError,
  anyio.BrokenResourceError). On transport-level errors from the caller body,
  evict so the next acquire rebuilds instead of handing back a dead session.
  Tool-level errors still leave the session in place.
- close_all(): asyncio.gather the per-key evictions (with
  return_exceptions=True). Previously serial with per-key 5s cap — 50 stuck
  sessions = 4+ minute shutdown stall.

mcpgateway/services/gateway_service.py _evict_upstream_sessions_for_gateway:
- Catch RegistryNotInitializedError specifically for the tests/early-startup
  no-op case. Bump the generic-exception branch from debug to warning with
  gateway_id + exception type — this fires POST-commit, so a silent eviction
  failure leaves persisted stale credentials/URL/TLS material pinned on
  in-flight upstream sessions.

mcpgateway/cache/session_registry.py remove_session:
- Same RegistryNotInitializedError / warning treatment. An orphaned upstream
  after DELETE /mcp is otherwise invisible to ops.

mcpgateway/services/server_classification_service.py _perform_classification:
- Bump the Redis-purge catch from debug to warning. The entire point of this
  method is to KEEP classification keys absent so should_poll_server falls
  through to "always poll". A sustained purge failure re-opens the very
  regression this method exists to prevent (previous commit's Codex review fix).

mcpgateway/handlers/signal_handlers.py sighup_reload:
- Add upstream-registry drain between the SSL cache clear and the
  affinity-mapping drain. Previously SIGHUP only refreshed SSL contexts and
  cleared the affinity map — registry-held upstream ClientSessions kept their
  stale TLS material on the socket.
- Catch RegistryNotInitializedError at debug for the uninitialised case;
  warning for other drain failures.

Tests:
- test_upstream_session_registry.py FakeClientSession now raises OSError (was
  RuntimeError) to match the narrowed _probe_health catch — the test's intent
  was "broken transport → recreate" and OSError is the accurate stand-in.
- test_main_sighup.py: rewritten for the new three-step drain. Asserts SSL
  cache clear + registry.close_all() + affinity drain all fire, with the new
  log-message strings. Added a test covering the RegistryNotInitializedError
  debug-path branch.

529 related tests pass across registry + lifecycle + classification +
tool_service + cache + sighup suites.
Signed-off-by: Jonathan Springer <jps@s390x.com>
1 parent 0521f32 commit d5bc8bc

7 files changed

Lines changed: 213 additions & 50 deletions

File tree

mcpgateway/cache/session_registry.py

Lines changed: 20 additions & 10 deletions
@@ -1242,18 +1242,28 @@ def _db_remove() -> None:
             logger.error(f"Database error removing session {session_id}: {e}")
 
         # #4205: close any upstream MCP sessions this downstream session owned.
-        # Wrapped because the registry may not be initialized in every context
-        # (unit tests instantiate SessionRegistry directly), and an eviction
-        # failure must not interfere with downstream session teardown.
-        try:
-            # First-Party
-            from mcpgateway.services.upstream_session_registry import get_upstream_session_registry  # pylint: disable=import-outside-toplevel
+        # Wrapped because (a) the registry may not be initialized in tests or
+        # very early bootstrap, and (b) an eviction failure must not interfere
+        # with downstream-session teardown. Eviction failure is logged at
+        # warning rather than debug because it leaves an orphaned upstream
+        # session whose presence is otherwise invisible to ops.
+        # First-Party
+        from mcpgateway.services.upstream_session_registry import (  # pylint: disable=import-outside-toplevel
+            RegistryNotInitializedError,
+            get_upstream_session_registry,
+        )
 
+        try:
             await get_upstream_session_registry().evict_session(session_id)
-        except RuntimeError:
-            pass  # Registry not initialized (tests, early shutdown) — nothing to do.
-        except Exception as exc:
-            logger.debug(f"Upstream session eviction for {session_id} failed: {exc}")
+        except RegistryNotInitializedError:
+            pass  # Nothing to evict — tests or very-early bootstrap.
+        except Exception as exc:  # noqa: BLE001
+            logger.warning(
+                "Upstream session eviction for downstream session %s failed (%s: %s); " "an orphaned upstream session may persist until its owner task exits",
+                session_id,
+                type(exc).__name__,
+                exc,
+            )
 
         logger.info(f"Removed session: {session_id}")
 

mcpgateway/handlers/signal_handlers.py

Lines changed: 28 additions & 6 deletions
@@ -13,11 +13,16 @@
 
 
 async def sighup_reload() -> None:
-    """Clear SSL context cache and drain MCP session pool on SIGHUP for certificate rotation.
+    """Clear SSL context cache + drain upstream sessions on SIGHUP for certificate rotation.
 
-    Clears the SSL context cache to force recreation of SSL contexts
-    with potentially updated certificates, and drains the MCP session
-    pool so pooled connections reconnect with new TLS state.
+    Three things have to happen in order for new TLS material to take effect
+    on a worker without restart:
+    1. Clear the SSL context cache so the next build uses new certs.
+    2. Close every in-process upstream MCP session — they hold their TLS
+       context on the socket and would keep using the old certs forever.
+    3. Drain the session-affinity in-memory mapping so the next downstream
+       request re-registers (Redis state survives; only the local fast-
+       path cache is cleared).
    """
    try:
        # First-Party
@@ -28,14 +33,31 @@ async def sighup_reload() -> None:
    except Exception as exc:
        logger.error(f"SIGHUP handler failed to clear SSL context cache: {exc}")
 
+    # #4205: upstream MCP sessions live in the registry now — draining only
+    # the affinity mapping was leaving stale TLS contexts pinned to registry-
+    # held ClientSessions.
+    try:
+        # First-Party
+        from mcpgateway.services.upstream_session_registry import (  # pylint: disable=import-outside-toplevel
+            RegistryNotInitializedError,
+            get_upstream_session_registry,
+        )
+
+        await get_upstream_session_registry().close_all()
+        logger.info("SIGHUP: upstream session registry drained for TLS rotation")
+    except RegistryNotInitializedError:
+        logger.debug("SIGHUP: upstream session registry not initialised; skipping drain")
+    except Exception as exc:
+        logger.warning(f"SIGHUP: upstream session registry drain failed: {exc}")
+
    try:
        # First-Party
        from mcpgateway.services.session_affinity import drain_session_affinity  # pylint: disable=import-outside-toplevel
 
        await drain_session_affinity()
-        logger.info("SIGHUP: MCP session pool drained for TLS rotation")
+        logger.info("SIGHUP: session-affinity mapping drained")
    except Exception as exc:
-        logger.debug(f"SIGHUP: MCP session pool drain skipped: {exc}")
+        logger.debug(f"SIGHUP: session-affinity drain skipped: {exc}")
 
 
 def sighup_handler(_signum: int, _frame: Any) -> None:
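The reload body above isolates each rotation step in its own try/except, so a failure in one step is logged at the severity the commit assigns but never prevents the later steps from running. A hedged, self-contained sketch of that pattern (the step functions are stand-ins, not the real mcpgateway callables):

```python
import logging

logger = logging.getLogger("sighup-demo")
executed: list[str] = []


def clear_ssl_cache() -> None:
    executed.append("ssl-cache")


def drain_upstream_registry() -> None:
    executed.append("registry")
    raise OSError("simulated drain failure")  # step 2 fails...


def drain_affinity_mapping() -> None:
    executed.append("affinity")  # ...but step 3 must still run


def sighup_reload_sketch() -> None:
    # Per-step isolation: one failed step cannot skip the rest.
    try:
        clear_ssl_cache()
    except Exception as exc:
        logger.error("SSL cache clear failed: %s", exc)
    try:
        drain_upstream_registry()
    except Exception as exc:
        logger.warning("registry drain failed: %s", exc)
    try:
        drain_affinity_mapping()
    except Exception as exc:
        logger.debug("affinity drain skipped: %s", exc)


sighup_reload_sketch()
```

The design choice mirrors the diff: the severity grades by consequence (error for the cache, warning for the registry, debug for the best-effort affinity map), while control flow is identical in every branch.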

mcpgateway/services/gateway_service.py

Lines changed: 17 additions & 6 deletions
@@ -446,15 +446,26 @@ async def _evict_upstream_sessions_for_gateway(gateway_id: str) -> int:
         The number of upstream sessions evicted (0 if the registry is
         unavailable or nothing matched).
     """
-    try:
-        # First-Party
-        from mcpgateway.services.upstream_session_registry import get_upstream_session_registry  # pylint: disable=import-outside-toplevel
+    # First-Party
+    from mcpgateway.services.upstream_session_registry import (  # pylint: disable=import-outside-toplevel
+        RegistryNotInitializedError,
+        get_upstream_session_registry,
+    )
 
+    try:
         return await get_upstream_session_registry().evict_gateway(gateway_id)
-    except RuntimeError:
+    except RegistryNotInitializedError:
+        # Unit tests / very-early startup — nothing to evict by definition.
         return 0
-    except Exception as exc:  # noqa: BLE001 — log and swallow; see docstring
-        logger.debug(f"Upstream session eviction for gateway {gateway_id} failed: {exc}")
+    except Exception as exc:  # noqa: BLE001 — see docstring; logged at warning because this
+        # fires POST-commit: auth / URL / TLS change is already persisted, so a silent eviction
+        # failure leaves in-flight downstream sessions talking to the stale gateway state.
+        logger.warning(
+            "Upstream session eviction for gateway %s failed (%s: %s); stale sessions may " "persist until their downstream session ends",
+            gateway_id,
+            type(exc).__name__,
+            exc,
+        )
         return 0
 
 

mcpgateway/services/server_classification_service.py

Lines changed: 10 additions & 2 deletions
@@ -251,8 +251,16 @@ async def _perform_classification(self) -> None:
                 self.CLASSIFICATION_METADATA_KEY,
                 self.CLASSIFICATION_TIMESTAMP_KEY,
             )
-        except Exception as exc:  # noqa: BLE001 — best-effort purge
-            logger.debug(f"Classification key purge failed: {exc}")
+        except Exception as exc:  # noqa: BLE001
+            # Warn rather than debug: the whole point of this cycle is to KEEP the
+            # classification keys absent so should_poll_server falls through to
+            # "poll now". A sustained purge failure re-opens the exact regression
+            # this method exists to prevent (#4205 follow-up). See the docstring.
+            logger.warning(
+                "Classification key purge failed (%s: %s); stale hot/cold state " "may linger in Redis and bias should_poll_server toward the cold schedule",
+                type(exc).__name__,
+                exc,
+            )
 
     async def get_server_classification(self, url: str) -> Optional[str]:
         """Get classification for a server (hot/cold).
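The invariant the new warning protects is "key absent means always poll". An illustrative sketch of that invariant, with a plain dict standing in for Redis and hypothetical names (`should_poll`, `COLD_INTERVAL`) — not the real `should_poll_server` logic:

```python
store: dict[str, float] = {}  # stand-in for the Redis classification keys
COLD_INTERVAL = 300.0         # assumed cold-schedule poll interval, seconds


def should_poll(url: str, last_poll: float, now: float) -> bool:
    # Key absent → fall through to "always poll".
    if f"classification:{url}" not in store:
        return True
    # Key present → the cold schedule applies.
    return (now - last_poll) >= COLD_INTERVAL


# A lingering key (failed purge) suppresses a poll that is only 50s old:
store["classification:https://example.test/mcp"] = 0.0
suppressed = not should_poll("https://example.test/mcp", last_poll=100.0, now=150.0)

# A successful purge restores always-poll:
store.clear()
restored = should_poll("https://example.test/mcp", last_poll=100.0, now=150.0)
```

This is why a purge failure is worth a warning: nothing crashes, but the absence-based fallback silently stops firing.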

mcpgateway/services/upstream_session_registry.py

Lines changed: 84 additions & 13 deletions
@@ -170,8 +170,12 @@ def is_closed(self) -> bool:
             open_rx = getattr(state, "open_receive_channels", 1)
             if isinstance(open_rx, int) and open_rx == 0:
                 return True
-        except Exception:  # nosec B110 — degrade gracefully if MCP internals shift
-            pass
+        except Exception as exc:  # noqa: BLE001 — degrade gracefully if MCP internals shift
+            logger.debug(
+                "is_closed introspection on ClientSession internals raised %s: %s; " "next acquire will fall back to owner-task liveness only",
+                type(exc).__name__,
+                exc,
+            )
         return False
 
 
@@ -225,7 +229,7 @@ async def owner() -> None:
             if req.message_handler_factory is not None:
                 try:
                     message_handler = req.message_handler_factory(req.url, req.gateway_id)
-                except Exception as exc:
+                except Exception as exc:  # noqa: BLE001 — handler failure is not fatal
                     logger.warning(
                         "Failed to build message handler for %s: %s",
                         sanitize_url_for_logging(req.url),
@@ -238,12 +242,32 @@ async def owner() -> None:
                 # Block until the registry signals shutdown; do NOT rely on
                 # task cancellation from a request handler (see class docs).
                 await shutdown_event.wait()
-            except BaseException as exc:
+            except Exception as exc:  # noqa: BLE001 — see below
+                # Broad catch on purpose: the upstream-setup path runs many
+                # third-party coroutines (httpx, anyio, MCP SDK) whose exception
+                # classes we cannot enumerate. BaseException is deliberately NOT
+                # caught — SystemExit / KeyboardInterrupt / CancelledError must
+                # propagate so the task exits promptly during shutdown.
                 if not ready.done():
                     ready.set_exception(RuntimeError(f"Failed to create upstream MCP session for {req.url}: {exc}"))
 
         task = asyncio.create_task(owner(), name=f"upstream-session-{sanitize_url_for_logging(req.url)}")
 
+        def _log_owner_exit(done_task: asyncio.Task) -> None:
+            """Surface unexpected owner-task deaths so an orphaned upstream session is visible to ops."""
+            if done_task.cancelled():
+                return
+            exc = done_task.exception()
+            if exc is not None:
+                logger.warning(
+                    "Upstream MCP owner task for %s exited with %s: %s — upstream session may be orphaned",
+                    sanitize_url_for_logging(req.url),
+                    type(exc).__name__,
+                    exc,
+                )
+
+        task.add_done_callback(_log_owner_exit)
+
         success = False
         try:
             session, transport_ctx_ref = await asyncio.wait_for(ready, timeout=req.timeout_seconds)
@@ -255,7 +279,7 @@ async def owner() -> None:
             with anyio.move_on_after(_DEFAULT_SHUTDOWN_TIMEOUT_SECONDS):
                 try:
                     await task
-                except BaseException:  # nosec B110
+                except (asyncio.CancelledError, Exception):  # noqa: BLE001 — cleanup swallow
                     pass
 
             # Smuggle the task + shutdown event onto the transport_ctx so the registry
@@ -373,7 +397,24 @@ async def acquire(
         session.last_used = time.time()
         session.use_count += 1
 
-        yield session
+        # Hand out the session with no lock held: MCP ClientSession multiplexes
+        # concurrent requests over its transport via JSON-RPC ids, so there's no
+        # reason to serialize callers. If the caller's body raises a transport-
+        # level error (server closed the stream, socket broke), evict so the
+        # next acquire rebuilds instead of handing out a dead session.
+        try:
+            yield session
+        except (OSError, anyio.ClosedResourceError, anyio.BrokenResourceError) as exc:
+            logger.info(
+                "acquire() caller raised %s for gateway=%s; evicting upstream so next acquire rebuilds",
+                type(exc).__name__,
+                gateway_id,
+            )
+            await self._evict_key(key)
+            raise
+        # All other exceptions (tool-level errors from the upstream, caller
+        # application errors) intentionally leave the session in place — the
+        # transport is fine, the caller just didn't like the result.
 
     async def evict_session(self, downstream_session_id: str) -> int:
         """Close and remove every upstream session owned by this downstream session id.
@@ -403,11 +444,19 @@ async def evict_gateway(self, gateway_id: str) -> int:
         return count
 
     async def close_all(self) -> None:
-        """Drain every upstream session. Intended for app shutdown."""
+        """Drain every upstream session concurrently. Intended for app shutdown.
+
+        Each ``_evict_key`` can take up to ``shutdown_timeout_seconds`` waiting
+        for the owner task to exit; running them in series on a worker with
+        dozens of downstream sessions would turn shutdown into a multi-minute
+        stall. ``asyncio.gather`` caps the total drain at roughly
+        ``shutdown_timeout_seconds`` plus a small constant.
+        """
         async with self._global_lock:
             keys = list(self._sessions.keys())
-            for key in keys:
-                await self._evict_key(key)
+            if not keys:
+                return
+            await asyncio.gather(*[self._evict_key(k) for k in keys], return_exceptions=True)
 
     def snapshot(self) -> RegistrySnapshot:
         """Return a point-in-time copy of the registry's counters."""
@@ -495,7 +544,17 @@ async def _create_session(
         )
 
     async def _probe_health(self, upstream: UpstreamSession) -> bool:
-        """Run the health check chain against an idle session. Returns False if all probes fail."""
+        """Run the health check chain against an idle session. Returns False if all probes fail.
+
+        Exception policy: we ADVANCE on ``TimeoutError`` and on
+        ``McpError(METHOD_NOT_FOUND)`` (the server chose not to implement
+        this probe), and we FAIL FAST on everything else transport- or
+        protocol-level (``OSError`` / anyio stream errors / other ``McpError``s)
+        — recreating a session on "permission denied" or "request too large"
+        would loop against the same failure. Genuinely unexpected exceptions
+        (``AttributeError`` from SDK drift, etc.) propagate so they surface in
+        telemetry instead of silently triggering a reconnect loop.
+        """
         for method in _HEALTH_CHECK_CHAIN:
             try:
                 if method == "skip":
@@ -517,7 +576,8 @@ async def _probe_health(self, upstream: UpstreamSession) -> bool:
                     return False
             except TimeoutError:
                 continue
-            except Exception:
+            except OSError:
+                # Socket / stream error — upstream is dead.
                 self._metrics.health_check_failures += 1
                 return False
         self._metrics.health_check_failures += 1
@@ -562,6 +622,17 @@ class _AcquireDecision(Enum):
 _registry: Optional[UpstreamSessionRegistry] = None
 
 
+class RegistryNotInitializedError(RuntimeError):
+    """Raised when ``get_upstream_session_registry()`` is called before startup init.
+
+    Callers that need to distinguish "registry not available yet" from other
+    runtime errors (so they can silently no-op in tests / early bootstrap
+    without also swallowing unrelated ``RuntimeError``s like "Event loop is
+    closed") should catch this type specifically. Inherits ``RuntimeError``
+    for backwards compatibility with catch-sites written before the split.
+    """
+
+
 def init_upstream_session_registry(
     *,
     message_handler_factory: Optional[MessageHandlerFactory] = None,
@@ -574,9 +645,9 @@ def init_upstream_session_registry(
 
 
 def get_upstream_session_registry() -> UpstreamSessionRegistry:
-    """Return the process-wide registry or raise if it has not been initialized."""
+    """Return the process-wide registry or raise ``RegistryNotInitializedError``."""
     if _registry is None:
-        raise RuntimeError("UpstreamSessionRegistry has not been initialized; call init_upstream_session_registry() first")
+        raise RegistryNotInitializedError("UpstreamSessionRegistry has not been initialized; call init_upstream_session_registry() first")
     return _registry
 
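The close_all() change is easy to demonstrate in isolation. A self-contained sketch (the eviction coroutine is a stand-in for `_evict_key`, with a sleep in place of the per-key owner-task wait): five evictions, one of which fails, take roughly one eviction's worth of wall time instead of five, and the failure is collected rather than aborting the rest.

```python
import asyncio
import time


async def evict(key: str, drained: set[str]) -> None:
    await asyncio.sleep(0.1)  # stand-in for the per-key owner-task wait
    if key == "bad":
        raise OSError("stuck transport")
    drained.add(key)


async def close_all_concurrent(keys: list[str]) -> tuple[float, set[str]]:
    drained: set[str] = set()
    start = time.monotonic()
    # return_exceptions=True: the "bad" eviction's OSError is returned in the
    # result list instead of raised, so the other four evictions complete.
    await asyncio.gather(*[evict(k, drained) for k in keys], return_exceptions=True)
    return time.monotonic() - start, drained


elapsed, drained = asyncio.run(close_all_concurrent(["a", "b", "c", "d", "bad"]))
```

Serially these five awaits would cost ~0.5s; gathered they cost ~0.1s, which is the commit's "50 stuck sessions = 4+ minute stall" arithmetic in miniature.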

tests/unit/mcpgateway/services/test_upstream_session_registry.py

Lines changed: 5 additions & 2 deletions
@@ -56,12 +56,15 @@ async def send_ping(self) -> None:
         if self.probe_exception is not None:
             raise self.probe_exception
         if not self.healthy:
-            raise RuntimeError("ping failed")
+            # Use a transport-level error — production _probe_health narrows its
+            # catch to (OSError, ...) so unexpected exception classes propagate
+            # as signals of SDK drift rather than silent reconnect loops.
+            raise OSError("ping failed")
 
     async def list_tools(self) -> None:
         self.list_tools_calls += 1
         if not self.healthy:
-            raise RuntimeError("list_tools failed")
+            raise OSError("list_tools failed")
 
 
 def _make_fake_factory():
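The contract this test change encodes can be shown with a toy probe. `probe_once` below is a hypothetical stand-in, not the real `_probe_health`: it catches only transport-level errors, so `OSError` means "unhealthy, recreate", while an `AttributeError` (the SDK-drift stand-in from the commit message) escapes to the caller.

```python
def probe_once(ping) -> bool:
    """Return False (unhealthy → recreate) on transport errors only."""
    try:
        ping()
        return True
    except OSError:
        # Broken socket/stream: the session is dead, recreating can help.
        return False
    # Anything else (e.g. AttributeError from SDK drift) propagates —
    # recreating the session would just loop against the same failure.


def dead_transport():
    raise OSError("ping failed")


def sdk_drift():
    raise AttributeError("ClientSession has no attribute '_read_stream'")
```

Under the old broad `except Exception`, `sdk_drift` would also have returned False and driven a reconnect loop; under the narrowed catch it surfaces immediately.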
