From 27d69f99272b78ce927fb2707b3ccacbb596b206 Mon Sep 17 00:00:00 2001
From: mvdbeek
Date: Sun, 3 May 2026 12:52:23 +0200
Subject: [PATCH] Manage SSE reconnect on transient 5xx/429 outages
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The browser's native EventSource auto-retry gives up once readyState
flips to CLOSED — typical for a 4xx/5xx response with no
text/event-stream body — and the previous onerror handler did not
detect that, leaving the client stranded on the polling fallback for
the rest of the session.

Take ownership of the reconnect loop on the client: close the source on
onerror+CLOSED, schedule a reopen with jittered exponential backoff (a
[0.5, 1.5) factor on a pre-jitter delay capped at 30 s, for a worst-case
delay just under 45 s), and reset the counter on a successful onopen.
The replay-on-open viewer-subscription path is unchanged.

Tested with a Playwright-only integration_selenium test that fails the
first SSE request with a 503 via page.route() and asserts that the
client reconnects and receives a subsequently pushed notification.
---
 .../composables/useNotificationSSE.test.ts    | 161 +++++++++++++++++-
 client/src/composables/useNotificationSSE.ts  | 134 ++++++++++++++-
 .../test_sse_reconnect.py                     | 151 ++++++++++++++++
 3 files changed, 437 insertions(+), 9 deletions(-)
 create mode 100644 test/integration_selenium/test_sse_reconnect.py

diff --git a/client/src/composables/useNotificationSSE.test.ts b/client/src/composables/useNotificationSSE.test.ts
index 9675e87963fc..f291a760fef7 100644
--- a/client/src/composables/useNotificationSSE.test.ts
+++ b/client/src/composables/useNotificationSSE.test.ts
@@ -9,14 +9,17 @@
 import flushPromises from "flush-promises";
 import { http, HttpResponse } from "msw";
-import { afterEach, beforeEach, describe, expect, it } from "vitest";
+import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
+import { type EffectScope, effectScope } from "vue";
 
 import { useServerMock } from "@/api/client/__mocks__";
 
 import {
     _resetHistoryViewerSubscriptionsForTest,
+    _resetSSESharedSourceForTest,
     addHistoryViewerSubscription,
     removeHistoryViewerSubscription,
+    useSSE,
 } from "./useNotificationSSE";
 
 interface SubscriptionRequest {
@@ -98,3 +101,159 @@ describe("useNotificationSSE viewer subscriptions", () => {
         expect(new Set(ids)).toEqual(new Set(["hist-A", "hist-B"]));
     });
 });
+
+/**
+ * Reconnect-on-CLOSED tests.
+ *
+ * The browser's native ``EventSource`` retries while ``readyState ===
+ * CONNECTING`` but gives up once it flips to ``CLOSED`` — for example, when a
+ * 429/5xx response arrives without ``text/event-stream``. The composable must
+ * notice that flip and schedule a manual reopen with backoff so the client
+ * doesn't silently drop to polling-only updates for the rest of the session.
+ *
+ * We stub ``globalThis.EventSource`` with a fake whose lifecycle the test
+ * drives directly: this keeps the test off jsdom's ``EventSource`` (which
+ * doesn't actually open sockets) and gives us a deterministic handle on the
+ * instance count for the "a *new* EventSource was constructed after backoff"
+ * assertion.
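+ *
+ * For reference alongside the envelope math in the tests below, a sketch of
+ * the delay formula from ``scheduleReconnect`` (restated here, not imported):
+ *
+ *   exp   = min(RECONNECT_CAP_MS, RECONNECT_BASE_MS * 2 ** attempts)
+ *   delay = floor(exp * (0.5 + Math.random()))  // factor drawn from [0.5, 1.5)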
+ */
+class FakeEventSource {
+    static CONNECTING = 0;
+    static OPEN = 1;
+    static CLOSED = 2;
+    static instances: FakeEventSource[] = [];
+
+    readonly url: string;
+    readyState: number = FakeEventSource.CONNECTING;
+    onopen: (() => void) | null = null;
+    onerror: (() => void) | null = null;
+    onmessage: ((e: MessageEvent) => void) | null = null;
+    addEventListener = vi.fn();
+    removeEventListener = vi.fn();
+    close = vi.fn(() => {
+        this.readyState = FakeEventSource.CLOSED;
+    });
+
+    constructor(url: string) {
+        this.url = url;
+        FakeEventSource.instances.push(this);
+    }
+
+    static reset() {
+        FakeEventSource.instances = [];
+    }
+}
+
+describe("useNotificationSSE managed reconnect", () => {
+    let originalEventSource: typeof EventSource | undefined;
+    // ``useSSE`` registers an ``onScopeDispose`` cleanup; called outside a
+    // Vue component setup, that triggers a Vue warning, and
+    // ``vitest-fail-on-console`` upgrades the warning to a test failure.
+    // Wrap each test in an explicit scope so the disposal hook has somewhere
+    // to attach.
+    let scope: EffectScope;
+
+    beforeEach(() => {
+        FakeEventSource.reset();
+        originalEventSource = (globalThis as unknown as { EventSource?: typeof EventSource }).EventSource;
+        (globalThis as unknown as { EventSource: unknown }).EventSource = FakeEventSource;
+        vi.useFakeTimers();
+        _resetSSESharedSourceForTest();
+        scope = effectScope();
+    });
+
+    afterEach(() => {
+        scope.stop();
+        vi.useRealTimers();
+        _resetSSESharedSourceForTest();
+        if (originalEventSource) {
+            (globalThis as unknown as { EventSource: typeof EventSource }).EventSource = originalEventSource;
+        } else {
+            delete (globalThis as unknown as { EventSource?: unknown }).EventSource;
+        }
+    });
+
+    it("schedules a reopen when onerror fires with readyState=CLOSED", () => {
+        scope.run(() => {
+            const { connect } = useSSE(() => {});
+            connect();
+        });
+        expect(FakeEventSource.instances).toHaveLength(1);
+
+        const first = FakeEventSource.instances[0]!;
+        // Simulate the browser giving up on the native retry.
+        first.readyState = FakeEventSource.CLOSED;
+        first.onerror?.();
+
+        // Until the backoff fires, no replacement EventSource exists yet.
+        expect(FakeEventSource.instances).toHaveLength(1);
+
+        // The first attempt's backoff is in [500ms, 1500ms); 2000ms is past
+        // the upper bound regardless of the jitter draw, so a fixed advance
+        // is deterministic without seeding ``Math.random``.
+        vi.advanceTimersByTime(2000);
+        expect(FakeEventSource.instances).toHaveLength(2);
+    });
+
+    it("does not reopen while readyState=CONNECTING (browser is still retrying natively)", () => {
+        scope.run(() => {
+            const { connect } = useSSE(() => {});
+            connect();
+        });
+        const first = FakeEventSource.instances[0]!;
+        first.readyState = FakeEventSource.CONNECTING;
+        first.onerror?.();
+
+        // No manual scheduling while the browser is still trying — that
+        // would otherwise double up reconnect work and accelerate the retry
+        // loop. A full minute past any backoff envelope without a second
+        // instance proves the managed path stayed dormant.
+        vi.advanceTimersByTime(60_000);
+        expect(FakeEventSource.instances).toHaveLength(1);
+    });
+
+    it("resets the backoff counter on a successful onopen", () => {
+        scope.run(() => {
+            const { connect } = useSSE(() => {});
+            connect();
+        });
+
+        // Stack five back-to-back failures so the unjittered base delay grows
+        // to the 30s cap. After this loop the next attempt's envelope is
+        // [15s, 45s) — well outside the [500ms, 1500ms) envelope a freshly
+        // reset counter would produce.
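+        // (For reference, the unjittered delays inside this loop are 1s, 2s,
+        // 4s, 8s and 16s; the sixth attempt hits min(30s, 32s) = 30s, whose
+        // jittered worst case is just under 45s.)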
+        // ``45_001`` clears the worst-case jitter draw on each iteration so
+        // the timer always fires.
+        const STACKED = 5;
+        for (let i = 0; i < STACKED; i++) {
+            const current = FakeEventSource.instances.at(-1)!;
+            current.readyState = FakeEventSource.CLOSED;
+            current.onerror?.();
+            vi.advanceTimersByTime(45_001);
+        }
+        const beforeReset = FakeEventSource.instances.length;
+        expect(beforeReset).toBe(STACKED + 1);
+
+        // Trigger a sixth failure and verify the *capped* envelope: a 2s
+        // advance is below the 15s lower bound, so no new instance appears.
+        // Without this assertion the reset assertion below would pass even
+        // if the counter never reset, since both paths would fire on a 2s
+        // advance.
+        const stale = FakeEventSource.instances.at(-1)!;
+        stale.readyState = FakeEventSource.CLOSED;
+        stale.onerror?.();
+        vi.advanceTimersByTime(2000);
+        expect(FakeEventSource.instances).toHaveLength(beforeReset);
+
+        // Drain the in-flight capped timer so we have a fresh source whose
+        // ``onopen`` will reset the counter.
+        vi.advanceTimersByTime(45_001);
+        expect(FakeEventSource.instances).toHaveLength(beforeReset + 1);
+        const reopened = FakeEventSource.instances.at(-1)!;
+        reopened.onopen?.();
+
+        // Counter reset → next failure's delay drops back to [500, 1500); a
+        // 2s advance must fire it.
+        reopened.readyState = FakeEventSource.CLOSED;
+        reopened.onerror?.();
+        vi.advanceTimersByTime(2000);
+        expect(FakeEventSource.instances).toHaveLength(beforeReset + 2);
+    });
+});
diff --git a/client/src/composables/useNotificationSSE.ts b/client/src/composables/useNotificationSSE.ts
index 2d5a561a0549..870cea45f71d 100644
--- a/client/src/composables/useNotificationSSE.ts
+++ b/client/src/composables/useNotificationSSE.ts
@@ -18,12 +18,20 @@ export type SSEEventType = (typeof SSE_EVENT_TYPES)[number];
 interface SSEDebugGlobals {
     __galaxy_sse_connected?: boolean;
     __galaxy_sse_last_event_ts?: number;
+    __galaxy_sse_reconnect_attempts?: number;
 }
 
 function sseGlobals(): SSEDebugGlobals {
     return window as unknown as SSEDebugGlobals;
 }
 
+// Jittered exponential backoff bounds for the managed reconnect. Aligned
+// with the retry budget shape of the polling paths (see
+// ``isRetryableApiError`` in ``client/src/utils/simple-error.ts``); a 30 s
+// cap on the pre-jitter delay keeps sustained-429/5xx retries under a minute.
+const RECONNECT_BASE_MS = 1000;
+const RECONNECT_CAP_MS = 30_000;
+
 // ---------------------------------------------------------------------------
 // Module-level shared EventSource.
 //
@@ -48,6 +56,15 @@ const subscribers: Map<SSEEventType, Set<Handler>> = new Map();
 // exact same listeners (``addEventListener`` matches by reference).
 const dispatchers: Map<SSEEventType, (event: MessageEvent) => void> = new Map();
 
+// Managed-reconnect state. We take over from the browser's native auto-retry
+// once it flags ``readyState === CLOSED`` so that responses lacking a
+// ``text/event-stream`` content type (a 429 / 5xx page, an HTML error page
+// from a load balancer, etc.) don't strand the client on the polling
+// fallback. ``reconnectAttempts`` is the input to the backoff formula and is
+// reset to zero on every successful ``onopen``.
+let reconnectAttempts = 0;
+let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
+
 function openSourceIfNeeded() {
     if (sharedSource) {
         return;
@@ -78,6 +95,14 @@ function openSourceIfNeeded() {
         // Global readiness flag so Selenium tests can distinguish a working
         // SSE pipeline from the polling fallback.
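+        // (Mirrored on ``window`` rather than a module ref because the
+        // Selenium/Playwright tests read it via ``execute_script``.)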
         sseGlobals().__galaxy_sse_connected = true;
+        // The connection is healthy again — drop any pending managed reopen
+        // and zero the backoff so the next failure starts at the base delay
+        // rather than wherever the previous outage left off.
+        reconnectAttempts = 0;
+        if (reconnectTimer !== null) {
+            clearTimeout(reconnectTimer);
+            reconnectTimer = null;
+        }
         // Re-assert any viewer subscriptions the user accumulated. The server
         // doesn't carry app-level subscription state across reconnects (it
         // only knows the user from the cookie), so the client owns the source
@@ -86,12 +111,17 @@ function openSourceIfNeeded() {
     };
 
     sharedSource.onerror = () => {
-        // EventSource auto-reconnects natively; SSE-vs-polling is a
-        // config-level decision (see historyStore / notificationsStore), so
-        // we must not give up on transient errors here — doing so would leave
-        // the client with no updates at all.
         sharedConnected.value = false;
         sseGlobals().__galaxy_sse_connected = false;
+        // The browser auto-retries while ``readyState === CONNECTING``; let
+        // it. Once it flips to ``CLOSED`` (response missing
+        // ``text/event-stream``, repeated network failure giving up, etc.)
+        // the native loop is done and we own the reconnect — otherwise the
+        // client silently drops to polling-only updates for the rest of the
+        // session.
+        if (sharedSource?.readyState === EventSource.CLOSED) {
+            scheduleReconnect();
+        }
     };
 
     // Browser EventSource teardown during a full-page navigation
@@ -118,11 +148,70 @@ function closeSource() {
     sharedSource = null;
     sharedConnected.value = false;
     sseGlobals().__galaxy_sse_connected = false;
+    // Cancel any pending managed reopen — without this, ``pagehide``-driven
+    // teardown could be followed by ``setTimeout`` re-opening a stream we
+    // just deliberately closed.
+    if (reconnectTimer !== null) {
+        clearTimeout(reconnectTimer);
+        reconnectTimer = null;
+    }
+    reconnectAttempts = 0;
     if (typeof window !== "undefined") {
         window.removeEventListener("pagehide", closeSource);
    }
 }
 
+/**
+ * Tear down the EventSource without disturbing the subscriber map so the
+ * scheduled reopen ends up wired to the same handler set. ``closeSource`` is
+ * the right tool when *no* listener wants more events; this is the right tool
+ * when listeners still exist and only the underlying socket needs to cycle.
+ */
+function closeSourceForReconnect() {
+    if (!sharedSource) {
+        return;
+    }
+    for (const [eventType, dispatcher] of dispatchers) {
+        sharedSource.removeEventListener(eventType, dispatcher);
+    }
+    dispatchers.clear();
+    sharedSource.close();
+    sharedSource = null;
+    sharedConnected.value = false;
+    sseGlobals().__galaxy_sse_connected = false;
+}
+
+function scheduleReconnect() {
+    if (reconnectTimer !== null) {
+        // Already armed; the active timer will handle the next attempt.
+        return;
+    }
+    // Jittered exponential backoff: the random factor in [0.5, 1.5) smears
+    // retries from clients hitting the same outage so a recovering server
+    // isn't met with a synchronized stampede.
+    const exp = Math.min(RECONNECT_CAP_MS, RECONNECT_BASE_MS * 2 ** reconnectAttempts);
+    const delay = Math.floor(exp * (0.5 + Math.random()));
+    reconnectAttempts += 1;
+    const globals = sseGlobals();
+    globals.__galaxy_sse_reconnect_attempts = (globals.__galaxy_sse_reconnect_attempts ?? 0) + 1;
+    closeSourceForReconnect();
+    reconnectTimer = setTimeout(() => {
+        reconnectTimer = null;
+        // Subscribers may have all unsubscribed during the outage; if so, the
+        // shared source should stay closed.
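+        // (The ``subscribers`` map keeps an entry per event type even after
+        // its handler set empties, so scan set sizes rather than testing
+        // ``subscribers.size``.)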
+        let hasSubscribers = false;
+        for (const subs of subscribers.values()) {
+            if (subs.size > 0) {
+                hasSubscribers = true;
+                break;
+            }
+        }
+        if (hasSubscribers) {
+            openSourceIfNeeded();
+        }
+    }, delay);
+}
+
 function addSubscriber(onEvent: Handler, eventTypes: readonly SSEEventType[]) {
     for (const eventType of eventTypes) {
         let subs = subscribers.get(eventType);
@@ -157,10 +246,15 @@ function removeSubscriber(onEvent: Handler, eventTypes: readonly SSEEventType[])
 /**
  * Composable for subscribing to events on the shared SSE stream.
  *
- * The browser's EventSource handles reconnection automatically and sends the
- * ``Last-Event-ID`` header so the server can catch up on missed events. Only
- * one EventSource is opened per tab regardless of how many callers invoke
- * this composable; the composable multiplexes dispatch per event type.
+ * Reconnection: the browser's native auto-retry handles the cheap path
+ * (transient network blips while ``readyState === CONNECTING``); once the
+ * source flips to ``CLOSED`` — typically a 4xx/5xx response with no
+ * ``text/event-stream`` body, which most browsers treat as fatal — this
+ * composable takes over with jittered exponential backoff (pre-jitter delay
+ * capped at 30 s). The server emits ``id:`` per event so the ``Last-Event-ID``
+ * header on reconnect lets the server catch up on missed events. Only one
+ * EventSource is opened per tab regardless of how many callers invoke this
+ * composable; the composable multiplexes dispatch per event type.
  *
  * @param onEvent - callback invoked for every matching SSE event
 * @param eventTypes - subset of event types to listen to (defaults to all)
@@ -291,3 +385,27 @@ export function removeHistoryViewerSubscription(historyId: string): void {
 export function _resetHistoryViewerSubscriptionsForTest(): void {
     viewerSubscriptions.clear();
 }
+
+/** Test-only: tear down the shared source and reconnect state. */
+export function _resetSSESharedSourceForTest(): void {
+    if (reconnectTimer !== null) {
+        clearTimeout(reconnectTimer);
+        reconnectTimer = null;
+    }
+    reconnectAttempts = 0;
+    if (sharedSource) {
+        for (const [eventType, dispatcher] of dispatchers) {
+            sharedSource.removeEventListener(eventType, dispatcher);
+        }
+        dispatchers.clear();
+        sharedSource.close();
+        sharedSource = null;
+    }
+    subscribers.clear();
+    sharedConnected.value = false;
+    sseEverConnected.value = false;
+    const globals = sseGlobals();
+    delete globals.__galaxy_sse_connected;
+    delete globals.__galaxy_sse_last_event_ts;
+    delete globals.__galaxy_sse_reconnect_attempts;
+}
diff --git a/test/integration_selenium/test_sse_reconnect.py b/test/integration_selenium/test_sse_reconnect.py
new file mode 100644
index 000000000000..08fcd78502d7
--- /dev/null
+++ b/test/integration_selenium/test_sse_reconnect.py
@@ -0,0 +1,151 @@
+"""Playwright E2E test for the SSE managed-reconnect path.
+
+The browser's native ``EventSource`` retry gives up once it sees a
+non-event-stream response (typical for a 429 or 5xx fronted by a load
+balancer that serves an HTML error page); the composable in
+``client/src/composables/useNotificationSSE.ts`` notices that ``readyState``
+flips to ``CLOSED`` and reopens the stream with jittered exponential backoff.
+This test fails the first request to ``/api/events/stream`` with a 503, then
+lets subsequent requests through, and asserts:
+
+1. The frontend's reconnect-attempts global advances (proving we went through
+   the managed path, not the native loop).
+2. The stream comes back up (``window.__galaxy_sse_connected``).
+3. Events delivered on the *new* stream actually fire the SSE listener — a
+   regression test for the case where the EventSource reconnects but the
+   per-event-type dispatchers were lost.
+
+Failure injection is browser-side via ``page.route()`` so no production
+server code carries a test-only disconnect primitive.
+"""
+
+from uuid import uuid4
+
+from galaxy.util.wait import wait_on
+from galaxy_test.selenium.framework import (
+    managed_history,
+    playwright_only,
+    selenium_test,
+)
+
+from .framework import SeleniumIntegrationTestCase
+
+SSE_CONNECT_TIMEOUT_SECONDS = 30
+SSE_EVENT_TIMEOUT_SECONDS = 15
+
+
+class TestSSEReconnectSeleniumIntegration(SeleniumIntegrationTestCase):
+    ensure_registered = True
+
+    @classmethod
+    def handle_galaxy_config_kwds(cls, config):
+        super().handle_galaxy_config_kwds(config)
+        # Mirrors test_notification_sse.py — the SSE pipeline is only enabled
+        # when both notifications and the SSE flag are on, and Celery is
+        # disabled so notification dispatch goes through the synchronous path
+        # the test asserts on.
+        config["enable_notification_system"] = True
+        config["enable_sse_updates"] = True
+        config["enable_celery_tasks"] = False
+
+    def _wait_for_sse_connected(self) -> None:
+        wait_on(
+            lambda: True if self.execute_script("return window.__galaxy_sse_connected === true") else None,
+            "window.__galaxy_sse_connected === true",
+            timeout=SSE_CONNECT_TIMEOUT_SECONDS,
+        )
+
+    def _last_sse_event_ts(self) -> int:
+        return self.execute_script("return window.__galaxy_sse_last_event_ts || 0") or 0
+
+    def _wait_for_sse_event_after(self, baseline_ts: int) -> None:
+        wait_on(
+            lambda: True if self._last_sse_event_ts() > baseline_ts else None,
+            "window.__galaxy_sse_last_event_ts advanced past baseline",
+            timeout=SSE_EVENT_TIMEOUT_SECONDS,
+        )
+
+    def _reconnect_attempts(self) -> int:
+        return self.execute_script("return window.__galaxy_sse_reconnect_attempts || 0") or 0
+
+    @selenium_test
+    @managed_history
+    @playwright_only("Uses Playwright page.route() to fail the SSE endpoint with a transient 503.")
+    def test_client_reconnects_after_transient_503(self) -> None:
+        """Force a 503 on the first SSE request, then assert reconnect + event delivery."""
+        # Resolve the browser's logged-in user (the Selenium-registered one)
+        # — same caveat as test_notification_sse.py: the API interactor's
+        # default-user key is a different user, so the SSE push must target
+        # the user attached to the browser cookie.
+        user_info = self.api_get("users/current")
+        user_id = user_info["id"]
+
+        # Arm a one-shot 503 BEFORE navigating so the route is in place the
+        # moment the page opens its EventSource. ``served`` is closed over so
+        # only the first matching request is failed; subsequent attempts (the
+        # actual reconnect) are passed through to the real Galaxy endpoint.
+        served = {"v": False}
+
+        def handle_route(route, request):
+            if not served["v"]:
+                served["v"] = True
+                # 503 is in the client-side RETRYABLE_STATUSES; the response
+                # has no text/event-stream body, which makes EventSource flip
+                # to readyState=CLOSED — exactly the path the managed
+                # reconnect handler covers.
+                route.fulfill(
+                    status=503,
+                    headers={"Content-Type": "text/plain"},
+                    body="overloaded",
+                )
+            else:
+                route.continue_()
+
+        self.page.route("**/api/events/stream", handle_route)
+
+        # Navigate to a page that subscribes to the SSE pipeline.
+        self.get("user/notifications")
+
+        # Connection must come up via the managed reconnect path.
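+        # (``_wait_for_sse_connected`` below polls the same window flag the
+        # composable sets in ``onopen``.)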
+        # The first attempt is failed by the route above; the composable's
+        # onerror sees readyState=CLOSED, increments
+        # __galaxy_sse_reconnect_attempts, and schedules a reopen at a
+        # backoff in [500ms, 1500ms). The 30s budget leaves ample time for
+        # navigation + reconnect on a slow CI runner.
+        self._wait_for_sse_connected()
+
+        # Prove the connection arrived via reconnect, not on the first try —
+        # if the route hadn't fired (or the 503 hadn't been treated as a
+        # fatal error by EventSource), the counter would still be 0.
+        attempts_after_reconnect = self._reconnect_attempts()
+        assert (
+            attempts_after_reconnect >= 1
+        ), f"expected at least one managed reconnect after a 503, got {attempts_after_reconnect}"
+        assert served["v"], "the route handler never fired — Playwright did not intercept the SSE request"
+        self.screenshot("sse_reconnect_connected")
+
+        # Now push a notification to the browser-logged-in user. The event
+        # must arrive on the *reconnected* stream (the original was killed
+        # by the 503), exercising the dispatcher re-registration in the
+        # reconnect path.
+        baseline_ts = self._last_sse_event_ts()
+        subject = f"SSE Reconnect Test {uuid4()}"
+        notification_request = {
+            "recipients": {"user_ids": [user_id]},
+            "notification": {
+                "source": "integration_tests",
+                "variant": "info",
+                "category": "message",
+                "content": {
+                    "category": "message",
+                    "subject": subject,
+                    "message": "Delivered after a transient 503 + managed reconnect",
+                },
+            },
+        }
+        response = self._post("notifications", data=notification_request, admin=True, json=True)
+        self._assert_status_code_is_ok(response)
+
+        # The timestamp only advances when the SSE listener fires — this
+        # distinguishes SSE delivery from the polling fallback even in the
+        # reconnect path.
+        self._wait_for_sse_event_after(baseline_ts)
+        self.wait_for_xpath_visible(f'//*[contains(text(), "{subject}")]', timeout=SSE_EVENT_TIMEOUT_SECONDS)
+        self.screenshot("sse_reconnect_event_delivered")
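
Usage for subscribers is unchanged by this patch. A minimal, hypothetical
consumer sketch (the function name and callback are illustrative; only
``useSSE``/``connect`` come from this module):

    // Hypothetical consumer — not part of this commit. Retry/backoff lives
    // inside useSSE, so a subscriber just registers a handler and connects.
    import { useSSE } from "@/composables/useNotificationSSE";

    export function useNotificationPing(onPing: () => void) {
        const { connect } = useSSE(() => {
            // Payload shape is server-defined; notify and let callers re-fetch.
            onPing();
        });
        connect();
    }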