Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 160 additions & 1 deletion client/src/composables/useNotificationSSE.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@

import flushPromises from "flush-promises";
import { http, HttpResponse } from "msw";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { type EffectScope, effectScope } from "vue";

import { useServerMock } from "@/api/client/__mocks__";

import {
_resetHistoryViewerSubscriptionsForTest,
_resetSSESharedSourceForTest,
addHistoryViewerSubscription,
removeHistoryViewerSubscription,
useSSE,
} from "./useNotificationSSE";

interface SubscriptionRequest {
Expand Down Expand Up @@ -98,3 +101,159 @@ describe("useNotificationSSE viewer subscriptions", () => {
expect(new Set(ids)).toEqual(new Set(["hist-A", "hist-B"]));
});
});

/**
* Reconnect-on-CLOSED tests.
*
* The browser's native ``EventSource`` retries while ``readyState ===
* CONNECTING`` but gives up once it flips to ``CLOSED`` — for example, when a
* 429/5xx response arrives without ``text/event-stream``. The composable must
* notice that flip and schedule a manual reopen with backoff so the client
* doesn't silently drop to polling-only updates for the rest of the session.
*
* We stub ``globalThis.EventSource`` with a fake whose lifecycle the test
* drives directly: this keeps the test off jsdom's ``EventSource`` (which
* doesn't actually open sockets) and gives us a deterministic handle on the
* instance count for the "a *new* EventSource was constructed after backoff"
* assertion.
*/
class FakeEventSource {
static CONNECTING = 0;
static OPEN = 1;
static CLOSED = 2;
static instances: FakeEventSource[] = [];

readonly url: string;
readyState: number = FakeEventSource.CONNECTING;
onopen: (() => void) | null = null;
onerror: (() => void) | null = null;
onmessage: ((e: MessageEvent) => void) | null = null;
addEventListener = vi.fn();
removeEventListener = vi.fn();
close = vi.fn(() => {
this.readyState = FakeEventSource.CLOSED;
});

constructor(url: string) {
this.url = url;
FakeEventSource.instances.push(this);
}

static reset() {
FakeEventSource.instances = [];
}
}

describe("useNotificationSSE managed reconnect", () => {
let originalEventSource: typeof EventSource | undefined;
// ``useSSE`` registers an ``onScopeDispose`` cleanup; outside a Vue
// component setup that warns and ``vitest-fail-on-console`` upgrades the
// warning to a test failure. Wrap each test in an explicit scope so the
// disposal hook has somewhere to attach.
let scope: EffectScope;

beforeEach(() => {
FakeEventSource.reset();
originalEventSource = (globalThis as unknown as { EventSource?: typeof EventSource }).EventSource;
(globalThis as unknown as { EventSource: unknown }).EventSource = FakeEventSource;
vi.useFakeTimers();
_resetSSESharedSourceForTest();
scope = effectScope();
});

afterEach(() => {
scope.stop();
vi.useRealTimers();
_resetSSESharedSourceForTest();
if (originalEventSource) {
(globalThis as unknown as { EventSource: typeof EventSource }).EventSource = originalEventSource;
} else {
delete (globalThis as unknown as { EventSource?: unknown }).EventSource;
}
});

it("schedules a reopen when onerror fires with readyState=CLOSED", () => {
scope.run(() => {
const { connect } = useSSE(() => {});
connect();
});
expect(FakeEventSource.instances).toHaveLength(1);

const first = FakeEventSource.instances[0]!;
// Simulate the browser giving up on the native retry.
first.readyState = FakeEventSource.CLOSED;
first.onerror?.();

// Until the backoff fires, no replacement EventSource exists yet.
expect(FakeEventSource.instances).toHaveLength(1);

// The first attempt's backoff is in [500ms, 1500ms); 2000ms is past
// the upper bound regardless of the jitter draw, so a fixed advance
// is deterministic without seeding ``Math.random``.
vi.advanceTimersByTime(2000);
expect(FakeEventSource.instances).toHaveLength(2);
});

it("does not reopen while readyState=CONNECTING (browser is still retrying natively)", () => {
scope.run(() => {
const { connect } = useSSE(() => {});
connect();
});
const first = FakeEventSource.instances[0]!;
first.readyState = FakeEventSource.CONNECTING;
first.onerror?.();

// No manual scheduling while the browser is still trying — would
// otherwise double-up reconnect work and accelerate the retry loop.
// A full minute past any backoff envelope without a second instance
// proves the managed path stayed dormant.
vi.advanceTimersByTime(60_000);
expect(FakeEventSource.instances).toHaveLength(1);
});

it("resets the backoff counter on a successful onopen", () => {
scope.run(() => {
const { connect } = useSSE(() => {});
connect();
});

// Stack five back-to-back failures so the unjittered base delay grows
// to the 30s cap. After this loop the next attempt's envelope is
// [15s, 45s) — well outside the [500ms, 1500ms) envelope a freshly
// reset counter would produce. ``45_001`` clears the worst-case jitter
// draw on each iteration so the timer always fires.
const STACKED = 5;
for (let i = 0; i < STACKED; i++) {
const current = FakeEventSource.instances.at(-1)!;
current.readyState = FakeEventSource.CLOSED;
current.onerror?.();
vi.advanceTimersByTime(45_001);
}
const beforeReset = FakeEventSource.instances.length;
expect(beforeReset).toBe(STACKED + 1);

// Trigger a sixth failure and verify the *capped* envelope: a 2s
// advance is below the 15s lower bound, so no new instance appears.
// Without this assertion the reset test below would pass even if the
// counter never reset, since both paths would fire on a 2s advance.
const stale = FakeEventSource.instances.at(-1)!;
stale.readyState = FakeEventSource.CLOSED;
stale.onerror?.();
vi.advanceTimersByTime(2000);
expect(FakeEventSource.instances).toHaveLength(beforeReset);

// Drain the in-flight capped timer so we have a fresh source whose
// ``onopen`` will reset the counter.
vi.advanceTimersByTime(45_001);
expect(FakeEventSource.instances).toHaveLength(beforeReset + 1);
const reopened = FakeEventSource.instances.at(-1)!;
reopened.onopen?.();

// Counter reset → next failure's delay drops back to [500, 1500); a
// 2s advance must fire it.
reopened.readyState = FakeEventSource.CLOSED;
reopened.onerror?.();
vi.advanceTimersByTime(2000);
expect(FakeEventSource.instances).toHaveLength(beforeReset + 2);
});
});
134 changes: 126 additions & 8 deletions client/src/composables/useNotificationSSE.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,20 @@ export type SSEEventType = (typeof SSE_EVENT_TYPES)[number];
interface SSEDebugGlobals {
__galaxy_sse_connected?: boolean;
__galaxy_sse_last_event_ts?: number;
__galaxy_sse_reconnect_attempts?: number;
}

function sseGlobals(): SSEDebugGlobals {
return window as unknown as SSEDebugGlobals;
}

// Full-jitter exponential backoff bounds for managed reconnect. Aligned with
// the retry budget shape used by the polling paths (see
// ``isRetryableApiError`` in ``client/src/utils/simple-error.ts``); 30 s caps
// the delay during sustained 429/5xx so the client doesn't drift to minutes.
const RECONNECT_BASE_MS = 1000;
const RECONNECT_CAP_MS = 30_000;

// ---------------------------------------------------------------------------
// Module-level shared EventSource.
//
Expand All @@ -48,6 +56,15 @@ const subscribers: Map<SSEEventType, Set<Handler>> = new Map();
// exact same listeners (``addEventListener`` matches by reference).
const dispatchers: Map<SSEEventType, Handler> = new Map();

// Managed-reconnect state. We take over from the browser's native auto-retry
// once it flags ``readyState === CLOSED`` so that responses lacking a
// ``text/event-stream`` content type (a 429 / 5xx page, an HTML error page
// from a load balancer, etc.) don't strand the client on the polling
// fallback. ``reconnectAttempts`` is the input to the backoff formula and is
// reset to zero on every successful ``onopen``.
let reconnectAttempts = 0;
let reconnectTimer: ReturnType<typeof setTimeout> | null = null;

function openSourceIfNeeded() {
if (sharedSource) {
return;
Expand Down Expand Up @@ -78,6 +95,14 @@ function openSourceIfNeeded() {
// Global readiness flag so Selenium tests can distinguish a working
// SSE pipeline from the polling fallback.
sseGlobals().__galaxy_sse_connected = true;
// The connection is healthy again — drop any pending managed reopen
// and zero the backoff so the next failure starts at the base delay
// rather than wherever the previous outage left off.
reconnectAttempts = 0;
if (reconnectTimer !== null) {
clearTimeout(reconnectTimer);
reconnectTimer = null;
}
// Re-assert any viewer subscriptions the user accumulated. The server
// doesn't carry app-level subscription state across reconnects (it
// only knows the user from the cookie), so the client owns the source
Expand All @@ -86,12 +111,17 @@ function openSourceIfNeeded() {
};

sharedSource.onerror = () => {
// EventSource auto-reconnects natively; SSE-vs-polling is a
// config-level decision (see historyStore / notificationsStore), so
// we must not give up on transient errors here — doing so would leave
// the client with no updates at all.
sharedConnected.value = false;
sseGlobals().__galaxy_sse_connected = false;
// The browser auto-retries while ``readyState === CONNECTING``; let
// it. Once it flips to ``CLOSED`` (response missing
// ``text/event-stream``, repeated network failure giving up, etc.)
// the native loop is done and we own the reconnect — otherwise the
// client silently drops to polling-only updates for the rest of the
// session.
if (sharedSource?.readyState === EventSource.CLOSED) {
scheduleReconnect();
}
};

// Browser EventSource teardown during a full-page navigation
Expand All @@ -118,11 +148,70 @@ function closeSource() {
sharedSource = null;
sharedConnected.value = false;
sseGlobals().__galaxy_sse_connected = false;
// Cancel any pending managed reopen — without this, ``pagehide``-driven
// teardown could be followed by ``setTimeout`` re-opening a stream we
// just deliberately closed.
if (reconnectTimer !== null) {
clearTimeout(reconnectTimer);
reconnectTimer = null;
}
reconnectAttempts = 0;
if (typeof window !== "undefined") {
window.removeEventListener("pagehide", closeSource);
}
}

/**
* Tear down the EventSource without disturbing the subscriber map so the
* scheduled reopen ends up wired to the same handler set. ``closeSource`` is
* the right tool when *no* listener wants more events; this is the right tool
* when listeners still exist and only the underlying socket needs to cycle.
*/
function closeSourceForReconnect() {
if (!sharedSource) {
return;
}
for (const [eventType, dispatcher] of dispatchers) {
sharedSource.removeEventListener(eventType, dispatcher);
}
dispatchers.clear();
sharedSource.close();
sharedSource = null;
sharedConnected.value = false;
sseGlobals().__galaxy_sse_connected = false;
}

function scheduleReconnect() {
if (reconnectTimer !== null) {
// Already armed; the active timer will handle the next attempt.
return;
}
// Full-jitter exponential backoff: the random factor in [0.5, 1.5)
// smears retries from clients hitting the same outage so a recovering
// server isn't met with a synchronized stampede.
const exp = Math.min(RECONNECT_CAP_MS, RECONNECT_BASE_MS * 2 ** reconnectAttempts);
const delay = Math.floor(exp * (0.5 + Math.random()));
reconnectAttempts += 1;
const globals = sseGlobals();
globals.__galaxy_sse_reconnect_attempts = (globals.__galaxy_sse_reconnect_attempts ?? 0) + 1;
closeSourceForReconnect();
reconnectTimer = setTimeout(() => {
reconnectTimer = null;
// Subscribers may have all unsubscribed during the outage; if so, the
// shared source should stay closed.
let hasSubscribers = false;
for (const subs of subscribers.values()) {
if (subs.size > 0) {
hasSubscribers = true;
break;
}
}
if (hasSubscribers) {
openSourceIfNeeded();
}
}, delay);
}

function addSubscriber(onEvent: Handler, eventTypes: readonly SSEEventType[]) {
for (const eventType of eventTypes) {
let subs = subscribers.get(eventType);
Expand Down Expand Up @@ -157,10 +246,15 @@ function removeSubscriber(onEvent: Handler, eventTypes: readonly SSEEventType[])
/**
* Composable for subscribing to events on the shared SSE stream.
*
* The browser's EventSource handles reconnection automatically and sends the
* ``Last-Event-ID`` header so the server can catch up on missed events. Only
* one EventSource is opened per tab regardless of how many callers invoke
* this composable; the composable multiplexes dispatch per event type.
* Reconnection: the browser's native auto-retry handles the cheap path
* (transient network blips while ``readyState === CONNECTING``); once the
* source flips to ``CLOSED`` — typically a 4xx/5xx response with no
* ``text/event-stream`` body, which most browsers treat as fatal — this
* composable takes over with full-jitter exponential backoff capped at 30 s.
* The server emits ``id:`` per event so the ``Last-Event-ID`` header on
* reconnect lets the server catch up on missed events. Only one EventSource
* is opened per tab regardless of how many callers invoke this composable;
* the composable multiplexes dispatch per event type.
*
* @param onEvent - callback invoked for every matching SSE event
* @param eventTypes - subset of event types to listen to (defaults to all)
Expand Down Expand Up @@ -291,3 +385,27 @@ export function removeHistoryViewerSubscription(historyId: string): void {
export function _resetHistoryViewerSubscriptionsForTest(): void {
viewerSubscriptions.clear();
}

/** Test-only: tear down the shared source and reconnect state. */
export function _resetSSESharedSourceForTest(): void {
if (reconnectTimer !== null) {
clearTimeout(reconnectTimer);
reconnectTimer = null;
}
reconnectAttempts = 0;
if (sharedSource) {
for (const [eventType, dispatcher] of dispatchers) {
sharedSource.removeEventListener(eventType, dispatcher);
}
dispatchers.clear();
sharedSource.close();
sharedSource = null;
}
subscribers.clear();
sharedConnected.value = false;
sseEverConnected.value = false;
const globals = sseGlobals();
delete globals.__galaxy_sse_connected;
delete globals.__galaxy_sse_last_event_ts;
delete globals.__galaxy_sse_reconnect_attempts;
}
Loading
Loading