diff --git a/client/src/composables/useNotificationSSE.test.ts b/client/src/composables/useNotificationSSE.test.ts
index 9675e87963fc..f291a760fef7 100644
--- a/client/src/composables/useNotificationSSE.test.ts
+++ b/client/src/composables/useNotificationSSE.test.ts
@@ -9,14 +9,17 @@
 import flushPromises from "flush-promises";
 import { http, HttpResponse } from "msw";
-import { afterEach, beforeEach, describe, expect, it } from "vitest";
+import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
+import { type EffectScope, effectScope } from "vue";

 import { useServerMock } from "@/api/client/__mocks__";

 import {
     _resetHistoryViewerSubscriptionsForTest,
+    _resetSSESharedSourceForTest,
     addHistoryViewerSubscription,
     removeHistoryViewerSubscription,
+    useSSE,
 } from "./useNotificationSSE";

 interface SubscriptionRequest {
@@ -98,3 +101,159 @@ describe("useNotificationSSE viewer subscriptions", () => {
         expect(new Set(ids)).toEqual(new Set(["hist-A", "hist-B"]));
     });
 });
+
+/**
+ * Reconnect-on-CLOSED tests.
+ *
+ * The browser's native ``EventSource`` retries while ``readyState ===
+ * CONNECTING`` but gives up once it flips to ``CLOSED`` — for example, when a
+ * 429/5xx response arrives without ``text/event-stream``. The composable must
+ * notice that flip and schedule a manual reopen with backoff so the client
+ * doesn't silently drop to polling-only updates for the rest of the session.
+ *
+ * We stub ``globalThis.EventSource`` with a fake whose lifecycle the test
+ * drives directly: this keeps the test off jsdom's ``EventSource`` (which
+ * doesn't actually open sockets) and gives us a deterministic handle on the
+ * instance count for the "a *new* EventSource was constructed after backoff"
+ * assertion.
+ */
+class FakeEventSource {
+    static CONNECTING = 0;
+    static OPEN = 1;
+    static CLOSED = 2;
+    static instances: FakeEventSource[] = [];
+
+    readonly url: string;
+    readyState: number = FakeEventSource.CONNECTING;
+    onopen: (() => void) | null = null;
+    onerror: (() => void) | null = null;
+    onmessage: ((e: MessageEvent) => void) | null = null;
+    addEventListener = vi.fn();
+    removeEventListener = vi.fn();
+    close = vi.fn(() => {
+        this.readyState = FakeEventSource.CLOSED;
+    });
+
+    constructor(url: string) {
+        this.url = url;
+        FakeEventSource.instances.push(this);
+    }
+
+    static reset() {
+        FakeEventSource.instances = [];
+    }
+}
+
+describe("useNotificationSSE managed reconnect", () => {
+    let originalEventSource: typeof EventSource | undefined;
+    // ``useSSE`` registers an ``onScopeDispose`` cleanup; outside a Vue
+    // component setup Vue warns about that, and ``vitest-fail-on-console``
+    // upgrades the warning to a test failure. Wrap each test in an explicit
+    // scope so the disposal hook has somewhere to attach.
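+    // (Stopping that scope in ``afterEach`` then runs the ``onScopeDispose``
+    // cleanup deterministically at the end of each test.)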
+    let scope: EffectScope;
+
+    beforeEach(() => {
+        FakeEventSource.reset();
+        originalEventSource = (globalThis as unknown as { EventSource?: typeof EventSource }).EventSource;
+        (globalThis as unknown as { EventSource: unknown }).EventSource = FakeEventSource;
+        vi.useFakeTimers();
+        _resetSSESharedSourceForTest();
+        scope = effectScope();
+    });
+
+    afterEach(() => {
+        scope.stop();
+        vi.useRealTimers();
+        _resetSSESharedSourceForTest();
+        if (originalEventSource) {
+            (globalThis as unknown as { EventSource: typeof EventSource }).EventSource = originalEventSource;
+        } else {
+            delete (globalThis as unknown as { EventSource?: unknown }).EventSource;
+        }
+    });
+
+    it("schedules a reopen when onerror fires with readyState=CLOSED", () => {
+        scope.run(() => {
+            const { connect } = useSSE(() => {});
+            connect();
+        });
+        expect(FakeEventSource.instances).toHaveLength(1);
+
+        const first = FakeEventSource.instances[0]!;
+        // Simulate the browser giving up on the native retry.
+        first.readyState = FakeEventSource.CLOSED;
+        first.onerror?.();
+
+        // Until the backoff fires, no replacement EventSource exists yet.
+        expect(FakeEventSource.instances).toHaveLength(1);
+
+        // The first attempt's backoff is in [500ms, 1500ms); 2000ms is past
+        // the upper bound regardless of the jitter draw, so a fixed advance
+        // is deterministic without seeding ``Math.random``.
+        vi.advanceTimersByTime(2000);
+        expect(FakeEventSource.instances).toHaveLength(2);
+    });
+
+    it("does not reopen while readyState=CONNECTING (browser is still retrying natively)", () => {
+        scope.run(() => {
+            const { connect } = useSSE(() => {});
+            connect();
+        });
+        const first = FakeEventSource.instances[0]!;
+        first.readyState = FakeEventSource.CONNECTING;
+        first.onerror?.();
+
+        // No manual scheduling while the browser is still trying — it would
+        // otherwise double-up reconnect work and accelerate the retry loop.
+        // A full minute past any backoff envelope without a second instance
+        // proves the managed path stayed dormant.
+        vi.advanceTimersByTime(60_000);
+        expect(FakeEventSource.instances).toHaveLength(1);
+    });
+
+    it("resets the backoff counter on a successful onopen", () => {
+        scope.run(() => {
+            const { connect } = useSSE(() => {});
+            connect();
+        });
+
+        // Stack five back-to-back failures so the next attempt's unjittered
+        // delay reaches the 30s cap. After this loop the sixth attempt's
+        // envelope is [15s, 45s) — well outside the [500ms, 1500ms) envelope
+        // a freshly reset counter would produce. ``45_001`` clears the
+        // worst-case jitter draw on each iteration so the timer always fires.
+        const STACKED = 5;
+        for (let i = 0; i < STACKED; i++) {
+            const current = FakeEventSource.instances.at(-1)!;
+            current.readyState = FakeEventSource.CLOSED;
+            current.onerror?.();
+            vi.advanceTimersByTime(45_001);
+        }
+        const beforeReset = FakeEventSource.instances.length;
+        expect(beforeReset).toBe(STACKED + 1);
+
+        // Trigger a sixth failure and verify the *capped* envelope: a 2s
+        // advance is below the 15s lower bound, so no new instance appears.
+        // Without this assertion the final check below would pass even if
+        // the counter had never accumulated at all (e.g. if it were reset on
+        // every error instead of only on open), since both paths would then
+        // fire on a 2s advance.
+        const stale = FakeEventSource.instances.at(-1)!;
+        stale.readyState = FakeEventSource.CLOSED;
+        stale.onerror?.();
+        vi.advanceTimersByTime(2000);
+        expect(FakeEventSource.instances).toHaveLength(beforeReset);
+
+        // Drain the in-flight capped timer so we have a fresh source whose
+        // ``onopen`` will reset the counter.
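+        // (45_001 sits just past the capped envelope's exclusive upper
+        // bound of 45_000 = 30_000 * 1.5.)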
+        vi.advanceTimersByTime(45_001);
+        expect(FakeEventSource.instances).toHaveLength(beforeReset + 1);
+        const reopened = FakeEventSource.instances.at(-1)!;
+        reopened.onopen?.();
+
+        // Counter reset → next failure's delay drops back to [500ms,
+        // 1500ms); a 2s advance must fire it.
+        reopened.readyState = FakeEventSource.CLOSED;
+        reopened.onerror?.();
+        vi.advanceTimersByTime(2000);
+        expect(FakeEventSource.instances).toHaveLength(beforeReset + 2);
+    });
+});
diff --git a/client/src/composables/useNotificationSSE.ts b/client/src/composables/useNotificationSSE.ts
index 2d5a561a0549..870cea45f71d 100644
--- a/client/src/composables/useNotificationSSE.ts
+++ b/client/src/composables/useNotificationSSE.ts
@@ -18,12 +18,20 @@ export type SSEEventType = (typeof SSE_EVENT_TYPES)[number];
 interface SSEDebugGlobals {
     __galaxy_sse_connected?: boolean;
     __galaxy_sse_last_event_ts?: number;
+    __galaxy_sse_reconnect_attempts?: number;
 }

 function sseGlobals(): SSEDebugGlobals {
     return window as unknown as SSEDebugGlobals;
 }

+// Jittered exponential backoff bounds for managed reconnect. Aligned with
+// the retry budget shape used by the polling paths (see
+// ``isRetryableApiError`` in ``client/src/utils/simple-error.ts``); 30 s caps
+// the pre-jitter delay during sustained 429/5xx so the client doesn't drift
+// to minutes.
+const RECONNECT_BASE_MS = 1000;
+const RECONNECT_CAP_MS = 30_000;
+
 // ---------------------------------------------------------------------------
 // Module-level shared EventSource.
 //
@@ -48,6 +56,15 @@ const subscribers: Map<SSEEventType, Set<Handler>> = new Map();
 // exact same listeners (``addEventListener`` matches by reference).
 const dispatchers: Map<SSEEventType, (event: MessageEvent) => void> = new Map();

+// Managed-reconnect state. We take over from the browser's native auto-retry
+// once it flags ``readyState === CLOSED`` so that responses lacking a
+// ``text/event-stream`` content type (a 429 / 5xx page, an HTML error page
+// from a load balancer, etc.) don't strand the client on the polling
+// fallback. ``reconnectAttempts`` is the input to the backoff formula and is
+// reset to zero on every successful ``onopen``.
+let reconnectAttempts = 0;
+let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
+
 function openSourceIfNeeded() {
     if (sharedSource) {
         return;
@@ -78,6 +95,14 @@ function openSourceIfNeeded() {
         // Global readiness flag so Selenium tests can distinguish a working
         // SSE pipeline from the polling fallback.
         sseGlobals().__galaxy_sse_connected = true;
+        // The connection is healthy again — drop any pending managed reopen
+        // and zero the backoff so the next failure starts at the base delay
+        // rather than wherever the previous outage left off.
+        reconnectAttempts = 0;
+        if (reconnectTimer !== null) {
+            clearTimeout(reconnectTimer);
+            reconnectTimer = null;
+        }
         // Re-assert any viewer subscriptions the user accumulated. The server
         // doesn't carry app-level subscription state across reconnects (it
         // only knows the user from the cookie), so the client owns the source
@@ -86,12 +111,17 @@
     };

     sharedSource.onerror = () => {
-        // EventSource auto-reconnects natively; SSE-vs-polling is a
-        // config-level decision (see historyStore / notificationsStore), so
-        // we must not give up on transient errors here — doing so would leave
-        // the client with no updates at all.
         sharedConnected.value = false;
         sseGlobals().__galaxy_sse_connected = false;
+        // The browser auto-retries while ``readyState === CONNECTING``; let
+        // it.
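+        // (While CONNECTING, retry pacing belongs to the browser, seeded by
+        // any ``retry:`` hint the server includes in the stream.)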
+        // Once it flips to ``CLOSED`` (response missing
+        // ``text/event-stream``, repeated network failure giving up, etc.)
+        // the native loop is done and we own the reconnect — otherwise the
+        // client silently drops to polling-only updates for the rest of the
+        // session.
+        if (sharedSource?.readyState === EventSource.CLOSED) {
+            scheduleReconnect();
+        }
     };

     // Browser EventSource teardown during a full-page navigation
@@ -118,11 +148,70 @@ function closeSource() {
     sharedSource = null;
     sharedConnected.value = false;
     sseGlobals().__galaxy_sse_connected = false;
+    // Cancel any pending managed reopen — without this, ``pagehide``-driven
+    // teardown could be followed by ``setTimeout`` re-opening a stream we
+    // just deliberately closed.
+    if (reconnectTimer !== null) {
+        clearTimeout(reconnectTimer);
+        reconnectTimer = null;
+    }
+    reconnectAttempts = 0;
     if (typeof window !== "undefined") {
         window.removeEventListener("pagehide", closeSource);
     }
 }

+/**
+ * Tear down the EventSource without disturbing the subscriber map so the
+ * scheduled reopen ends up wired to the same handler set. ``closeSource`` is
+ * the right tool when *no* listener wants more events; this is the right tool
+ * when listeners still exist and only the underlying socket needs to cycle.
+ */
+function closeSourceForReconnect() {
+    if (!sharedSource) {
+        return;
+    }
+    for (const [eventType, dispatcher] of dispatchers) {
+        sharedSource.removeEventListener(eventType, dispatcher);
+    }
+    dispatchers.clear();
+    sharedSource.close();
+    sharedSource = null;
+    sharedConnected.value = false;
+    sseGlobals().__galaxy_sse_connected = false;
+}
+
+function scheduleReconnect() {
+    if (reconnectTimer !== null) {
+        // Already armed; the active timer will handle the next attempt.
+        return;
+    }
+    // Jittered exponential backoff: the random factor in [0.5, 1.5) smears
+    // retries from clients hitting the same outage so a recovering server
+    // isn't met with a synchronized stampede.
+    const exp = Math.min(RECONNECT_CAP_MS, RECONNECT_BASE_MS * 2 ** reconnectAttempts);
+    const delay = Math.floor(exp * (0.5 + Math.random()));
+    reconnectAttempts += 1;
+    const globals = sseGlobals();
+    globals.__galaxy_sse_reconnect_attempts = (globals.__galaxy_sse_reconnect_attempts ?? 0) + 1;
+    closeSourceForReconnect();
+    reconnectTimer = setTimeout(() => {
+        reconnectTimer = null;
+        // Subscribers may have all unsubscribed during the outage; if so, the
+        // shared source should stay closed.
+        let hasSubscribers = false;
+        for (const subs of subscribers.values()) {
+            if (subs.size > 0) {
+                hasSubscribers = true;
+                break;
+            }
+        }
+        if (hasSubscribers) {
+            openSourceIfNeeded();
+        }
+    }, delay);
+}
+
 function addSubscriber(onEvent: Handler, eventTypes: readonly SSEEventType[]) {
     for (const eventType of eventTypes) {
         let subs = subscribers.get(eventType);
@@ -157,10 +246,15 @@ function removeSubscriber(onEvent: Handler, eventTypes: readonly SSEEventType[])
 /**
  * Composable for subscribing to events on the shared SSE stream.
  *
- * The browser's EventSource handles reconnection automatically and sends the
- * ``Last-Event-ID`` header so the server can catch up on missed events. Only
- * one EventSource is opened per tab regardless of how many callers invoke
- * this composable; the composable multiplexes dispatch per event type.
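+ * A minimal usage sketch, mirroring the unit tests (``handleEvent`` is a
+ * placeholder; call sites normally run inside component setup so the
+ * ``onScopeDispose`` cleanup has a scope to attach to):
+ *
+ *     const { connect } = useSSE((event) => handleEvent(event));
+ *     connect();
+ *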
+ * Reconnection: the browser's native auto-retry handles the cheap path
+ * (transient network blips while ``readyState === CONNECTING``); once the
+ * source flips to ``CLOSED`` — typically a 4xx/5xx response with no
+ * ``text/event-stream`` body, which most browsers treat as fatal — this
+ * composable takes over with jittered exponential backoff whose pre-jitter
+ * delay is capped at 30 s. The server emits ``id:`` per event so the
+ * ``Last-Event-ID`` header on reconnect lets the server catch up on missed
+ * events. Only one EventSource is opened per tab regardless of how many
+ * callers invoke this composable; the composable multiplexes dispatch per
+ * event type.
  *
  * @param onEvent - callback invoked for every matching SSE event
  * @param eventTypes - subset of event types to listen to (defaults to all)
@@ -291,3 +385,27 @@ export function removeHistoryViewerSubscription(historyId: string): void {
 export function _resetHistoryViewerSubscriptionsForTest(): void {
     viewerSubscriptions.clear();
 }
+
+/** Test-only: tear down the shared source and reconnect state. */
+export function _resetSSESharedSourceForTest(): void {
+    if (reconnectTimer !== null) {
+        clearTimeout(reconnectTimer);
+        reconnectTimer = null;
+    }
+    reconnectAttempts = 0;
+    if (sharedSource) {
+        for (const [eventType, dispatcher] of dispatchers) {
+            sharedSource.removeEventListener(eventType, dispatcher);
+        }
+        dispatchers.clear();
+        sharedSource.close();
+        sharedSource = null;
+    }
+    subscribers.clear();
+    sharedConnected.value = false;
+    sseEverConnected.value = false;
+    const globals = sseGlobals();
+    delete globals.__galaxy_sse_connected;
+    delete globals.__galaxy_sse_last_event_ts;
+    delete globals.__galaxy_sse_reconnect_attempts;
+}
diff --git a/test/integration_selenium/test_sse_reconnect.py b/test/integration_selenium/test_sse_reconnect.py
new file mode 100644
index 000000000000..08fcd78502d7
--- /dev/null
+++ b/test/integration_selenium/test_sse_reconnect.py
@@ -0,0 +1,151 @@
+"""Playwright E2E test for the SSE managed-reconnect path.
+
+The browser's native ``EventSource`` retry gives up once it sees a
+non-event-stream response (typical for a 429, or a 5xx fronted by a load
+balancer that serves an HTML error page); the composable in
+``client/src/composables/useNotificationSSE.ts`` notices that ``readyState``
+flips to ``CLOSED`` and reopens the stream with jittered exponential
+backoff. This test fails the first request to ``/api/events/stream`` with a
+503, then lets subsequent requests through, and asserts:
+
+1. The frontend's reconnect-attempts global advances (proving we went through
+   the managed path, not the native loop).
+2. The stream comes back up (``window.__galaxy_sse_connected``).
+3. Events delivered on the *new* stream actually fire the SSE listener — a
+   regression test for the case where the EventSource reconnects but the
+   per-event-type dispatchers were lost.
+
+Failure injection is browser-side via ``page.route()`` so no production server
+code carries a test-only disconnect primitive.
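+
+For concreteness, the backoff arithmetic the waits below rely on (base 1 s
+doubled per attempt, pre-jitter delay capped at 30 s, jitter multiplier drawn
+from [0.5, 1.5)): attempt 0 fires within [0.5 s, 1.5 s) of the error,
+attempt 1 within [1 s, 3 s), and a capped attempt within [15 s, 45 s).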
+""" + +from uuid import uuid4 + +from galaxy.util.wait import wait_on +from galaxy_test.selenium.framework import ( + managed_history, + playwright_only, + selenium_test, +) +from .framework import SeleniumIntegrationTestCase + +SSE_CONNECT_TIMEOUT_SECONDS = 30 +SSE_EVENT_TIMEOUT_SECONDS = 15 + + +class TestSSEReconnectSeleniumIntegration(SeleniumIntegrationTestCase): + ensure_registered = True + + @classmethod + def handle_galaxy_config_kwds(cls, config): + super().handle_galaxy_config_kwds(config) + # Mirrors test_notification_sse.py — the SSE pipeline is only enabled + # when both notifications and the SSE flag are on, and Celery is + # disabled so notification dispatch goes through the synchronous path + # the test asserts on. + config["enable_notification_system"] = True + config["enable_sse_updates"] = True + config["enable_celery_tasks"] = False + + def _wait_for_sse_connected(self) -> None: + wait_on( + lambda: True if self.execute_script("return window.__galaxy_sse_connected === true") else None, + "window.__galaxy_sse_connected === true", + timeout=SSE_CONNECT_TIMEOUT_SECONDS, + ) + + def _last_sse_event_ts(self) -> int: + return self.execute_script("return window.__galaxy_sse_last_event_ts || 0") or 0 + + def _wait_for_sse_event_after(self, baseline_ts: int) -> None: + wait_on( + lambda: True if self._last_sse_event_ts() > baseline_ts else None, + "window.__galaxy_sse_last_event_ts advanced past baseline", + timeout=SSE_EVENT_TIMEOUT_SECONDS, + ) + + def _reconnect_attempts(self) -> int: + return self.execute_script("return window.__galaxy_sse_reconnect_attempts || 0") or 0 + + @selenium_test + @managed_history + @playwright_only("Uses Playwright page.route() to fail the SSE endpoint with a transient 503.") + def test_client_reconnects_after_transient_503(self) -> None: + """Force a 503 on the first SSE request, then assert reconnect + event delivery.""" + # Resolve the browser's logged-in user (the Selenium-registered one) + # — same caveat as test_notification_sse.py: the API interactor's + # default-user key is a different user, so the SSE push must target + # the user attached to the browser cookie. + user_info = self.api_get("users/current") + user_id = user_info["id"] + + # Arm a one-shot 503 BEFORE navigating so the route is in place the + # moment the page opens its EventSource. ``served`` is closed over so + # only the first matching request is intercepted; subsequent attempts + # (the actual reconnect) hit the real Galaxy endpoint. + served = {"v": False} + + def handle_route(route, request): + if not served["v"]: + served["v"] = True + # 503 is in client-side RETRYABLE_STATUSES; the response has + # no text/event-stream body, which makes EventSource flip to + # readyState=CLOSED — exactly the path the managed reconnect + # handler covers. + route.fulfill( + status=503, + headers={"Content-Type": "text/plain"}, + body="overloaded", + ) + else: + route.continue_() + + self.page.route("**/api/events/stream", handle_route) + + # Navigate to a page that subscribes to the SSE pipeline. + self.get("user/notifications") + + # Connection must come up via the managed reconnect path. The first + # attempt is failed by the route above; the composable's onerror sees + # readyState=CLOSED, increments __galaxy_sse_reconnect_attempts, and + # schedules a reopen at backoff in [500ms, 1500ms). 30s budget covers + # ample time for navigation + reconnect on a slow CI runner. 
+        self._wait_for_sse_connected()
+
+        # Prove the connection arrived via reconnect, not on the first try —
+        # if the route hadn't fired (or 503 hadn't been treated as a fatal
+        # error by EventSource), the counter would still be 0.
+        attempts_after_reconnect = self._reconnect_attempts()
+        assert (
+            attempts_after_reconnect >= 1
+        ), f"expected at least one managed reconnect after a 503, got {attempts_after_reconnect}"
+        assert served["v"], "the route handler never fired — Playwright did not intercept the SSE request"
+        self.screenshot("sse_reconnect_connected")
+
+        # Now push a notification to the browser-logged-in user. The event
+        # must arrive on the *reconnected* stream (the original was killed
+        # by the 503), exercising the dispatcher re-registration in the
+        # reconnect path.
+        baseline_ts = self._last_sse_event_ts()
+        subject = f"SSE Reconnect Test {uuid4()}"
+        notification_request = {
+            "recipients": {"user_ids": [user_id]},
+            "notification": {
+                "source": "integration_tests",
+                "variant": "info",
+                "category": "message",
+                "content": {
+                    "category": "message",
+                    "subject": subject,
+                    "message": "Delivered after a transient 503 + managed reconnect",
+                },
+            },
+        }
+        response = self._post("notifications", data=notification_request, admin=True, json=True)
+        self._assert_status_code_is_ok(response)
+
+        # The timestamp only advances when the SSE listener fires — this
+        # distinguishes SSE delivery from the polling fallback even on the
+        # reconnect path.
+        self._wait_for_sse_event_after(baseline_ts)
+        self.wait_for_xpath_visible(f'//*[contains(text(), "{subject}")]', timeout=SSE_EVENT_TIMEOUT_SECONDS)
+        self.screenshot("sse_reconnect_event_delivered")