Skip to content

Commit 27d69f9

Browse files
committed
Manage SSE reconnect on transient 5xx/429 outages
The browser's native EventSource auto-retry gives up once readyState flips to CLOSED — typical for a 4xx/5xx response with no text/event-stream body — and the previous onerror handler did not detect that, leaving the client stranded on the polling fallback for the rest of the session. Take ownership of the reconnect loop on the client: close the source on onerror+CLOSED, schedule a reopen with jittered exponential backoff (a ±50% random factor applied to a base delay that is capped at 30 s, so the worst-case wait is just under 45 s), and reset the counter on a successful onopen. The replay-on-open viewer-subscription path is unchanged. Tested with a Playwright-only integration_selenium test that fails the first SSE request with a 503 via page.route() and asserts the client reconnects and delivers a subsequently-pushed notification.
1 parent 2e70707 commit 27d69f9

3 files changed

Lines changed: 437 additions & 9 deletions

File tree

client/src/composables/useNotificationSSE.test.ts

Lines changed: 160 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,17 @@
99

1010
import flushPromises from "flush-promises";
1111
import { http, HttpResponse } from "msw";
12-
import { afterEach, beforeEach, describe, expect, it } from "vitest";
12+
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
13+
import { type EffectScope, effectScope } from "vue";
1314

1415
import { useServerMock } from "@/api/client/__mocks__";
1516

1617
import {
1718
_resetHistoryViewerSubscriptionsForTest,
19+
_resetSSESharedSourceForTest,
1820
addHistoryViewerSubscription,
1921
removeHistoryViewerSubscription,
22+
useSSE,
2023
} from "./useNotificationSSE";
2124

2225
interface SubscriptionRequest {
@@ -98,3 +101,159 @@ describe("useNotificationSSE viewer subscriptions", () => {
98101
expect(new Set(ids)).toEqual(new Set(["hist-A", "hist-B"]));
99102
});
100103
});
104+
105+
/**
106+
* Reconnect-on-CLOSED tests.
107+
*
108+
* The browser's native ``EventSource`` retries while ``readyState ===
109+
* CONNECTING`` but gives up once it flips to ``CLOSED`` — for example, when a
110+
* 429/5xx response arrives without ``text/event-stream``. The composable must
111+
* notice that flip and schedule a manual reopen with backoff so the client
112+
* doesn't silently drop to polling-only updates for the rest of the session.
113+
*
114+
* We stub ``globalThis.EventSource`` with a fake whose lifecycle the test
115+
* drives directly: this keeps the test off jsdom's ``EventSource`` (which
116+
* doesn't actually open sockets) and gives us a deterministic handle on the
117+
* instance count for the "a *new* EventSource was constructed after backoff"
118+
* assertion.
119+
*/
120+
class FakeEventSource {
    // Numeric ready-state constants, matching the values the composable
    // compares against via ``EventSource.CLOSED`` once this class is
    // installed on ``globalThis``.
    static CONNECTING = 0;
    static OPEN = 1;
    static CLOSED = 2;

    // Every instance constructed since the last ``reset()``, in creation
    // order. Tests read the length to detect that a reopen happened.
    static instances: FakeEventSource[] = [];

    // Tests flip this by hand to simulate the browser's state machine.
    readyState: number = FakeEventSource.CONNECTING;

    // Handler slots the composable assigns; tests invoke them directly.
    onopen: (() => void) | null = null;
    onerror: (() => void) | null = null;
    onmessage: ((e: MessageEvent) => void) | null = null;

    // Listener registration is recorded but otherwise inert.
    addEventListener = vi.fn();
    removeEventListener = vi.fn();

    // Closing only records the call and moves the instance to CLOSED.
    close = vi.fn(() => {
        this.readyState = FakeEventSource.CLOSED;
    });

    constructor(readonly url: string) {
        FakeEventSource.instances.push(this);
    }

    /** Forget all previously constructed instances. */
    static reset() {
        FakeEventSource.instances = [];
    }
}
146+
147+
describe("useNotificationSSE managed reconnect", () => {
    // Narrow view of ``globalThis`` that lets the suite swap the constructor.
    const host = globalThis as unknown as { EventSource?: unknown };
    let savedEventSource: unknown;
    // ``useSSE`` registers an ``onScopeDispose`` cleanup; outside a Vue
    // component setup that warns, and ``vitest-fail-on-console`` turns the
    // warning into a failure. Each test therefore runs inside its own
    // explicit effect scope so the disposal hook has an owner.
    let scope: EffectScope;

    /** Subscribe a no-op handler inside the test scope and open the stream. */
    function connectInScope() {
        scope.run(() => {
            const { connect } = useSSE(() => {});
            connect();
        });
    }

    /** Most recently constructed fake source. */
    function latestSource(): FakeEventSource {
        return FakeEventSource.instances.at(-1)!;
    }

    /** Put ``source`` into ``state`` and deliver an error event to it. */
    function errorWithState(source: FakeEventSource, state: number) {
        source.readyState = state;
        source.onerror?.();
    }

    beforeEach(() => {
        FakeEventSource.reset();
        savedEventSource = host.EventSource;
        host.EventSource = FakeEventSource;
        vi.useFakeTimers();
        _resetSSESharedSourceForTest();
        scope = effectScope();
    });

    afterEach(() => {
        scope.stop();
        vi.useRealTimers();
        _resetSSESharedSourceForTest();
        if (savedEventSource) {
            host.EventSource = savedEventSource;
        } else {
            delete host.EventSource;
        }
    });

    it("schedules a reopen when onerror fires with readyState=CLOSED", () => {
        connectInScope();
        expect(FakeEventSource.instances).toHaveLength(1);

        // Simulate the browser giving up on the native retry.
        errorWithState(FakeEventSource.instances[0]!, FakeEventSource.CLOSED);

        // The replacement must not exist before the backoff timer fires.
        expect(FakeEventSource.instances).toHaveLength(1);

        // First-attempt delay lies in [500ms, 1500ms), so a 2000ms advance
        // clears it for any jitter draw — no need to seed ``Math.random``.
        vi.advanceTimersByTime(2000);
        expect(FakeEventSource.instances).toHaveLength(2);
    });

    it("does not reopen while readyState=CONNECTING (browser is still retrying natively)", () => {
        connectInScope();
        errorWithState(FakeEventSource.instances[0]!, FakeEventSource.CONNECTING);

        // While the browser is still retrying, the managed path must stay
        // dormant; scheduling here would double up reconnect work. A full
        // minute exceeds every possible backoff delay, so a still-single
        // instance proves nothing was scheduled.
        vi.advanceTimersByTime(60_000);
        expect(FakeEventSource.instances).toHaveLength(1);
    });

    it("resets the backoff counter on a successful onopen", () => {
        connectInScope();

        // Five consecutive failures push the pre-jitter delay to the 30s
        // cap, so the *next* attempt's envelope is [15s, 45s) — far from the
        // [500ms, 1500ms) envelope of a freshly reset counter. Advancing
        // ``45_001`` each round clears the worst-case jitter draw, so every
        // round's timer is guaranteed to fire.
        const STACKED = 5;
        for (let round = 0; round < STACKED; round++) {
            errorWithState(latestSource(), FakeEventSource.CLOSED);
            vi.advanceTimersByTime(45_001);
        }
        const beforeReset = FakeEventSource.instances.length;
        expect(beforeReset).toBe(STACKED + 1);

        // Sixth failure: confirm the envelope really is the capped one. A 2s
        // advance is below its 15s lower bound, so nothing new may appear.
        // Without this check the final assertion could pass even if the
        // backoff had never grown, because an un-grown delay also fires
        // within 2s.
        errorWithState(latestSource(), FakeEventSource.CLOSED);
        vi.advanceTimersByTime(2000);
        expect(FakeEventSource.instances).toHaveLength(beforeReset);

        // Let the capped timer run out to obtain a fresh source, then open
        // it so the composable resets its counter.
        vi.advanceTimersByTime(45_001);
        expect(FakeEventSource.instances).toHaveLength(beforeReset + 1);
        const reopened = latestSource();
        reopened.onopen?.();

        // With the counter reset, the next failure's delay is back in
        // [500, 1500), so a 2s advance must produce another instance.
        errorWithState(reopened, FakeEventSource.CLOSED);
        vi.advanceTimersByTime(2000);
        expect(FakeEventSource.instances).toHaveLength(beforeReset + 2);
    });
});

client/src/composables/useNotificationSSE.ts

Lines changed: 126 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,20 @@ export type SSEEventType = (typeof SSE_EVENT_TYPES)[number];
1818
interface SSEDebugGlobals {
    /**
     * Readiness flag mirrored onto ``window``: set to ``true`` when the
     * stream opens and ``false`` on error or teardown, so browser tests can
     * tell a working SSE pipeline from the polling fallback.
     */
    __galaxy_sse_connected?: boolean;

    /**
     * Presumably the timestamp of the most recently received event; the code
     * that writes it is outside this excerpt — confirm before relying on it.
     */
    __galaxy_sse_last_event_ts?: number;

    /**
     * Running count of managed reconnects scheduled in this page. Unlike the
     * module's backoff counter it is not zeroed on a successful open; only
     * the test-only reset removes it.
     */
    __galaxy_sse_reconnect_attempts?: number;
}
2223

2324
/**
 * Expose ``window`` through the {@link SSEDebugGlobals} shape so the debug
 * flags can be read and written without widening the global ``Window`` type.
 *
 * @returns the ``window`` object, typed as the debug-flag holder
 */
function sseGlobals(): SSEDebugGlobals {
    const debugView = window as unknown as SSEDebugGlobals;
    return debugView;
}
2627

28+
// Jittered exponential backoff bounds for managed reconnect. Aligned with
29+
// the retry budget shape used by the polling paths (see
30+
// ``isRetryableApiError`` in ``client/src/utils/simple-error.ts``); 30 s caps
31+
// the pre-jitter delay during sustained 429/5xx so the client doesn't drift to minutes.
32+
const RECONNECT_BASE_MS = 1000;
33+
const RECONNECT_CAP_MS = 30_000;
34+
2735
// ---------------------------------------------------------------------------
2836
// Module-level shared EventSource.
2937
//
@@ -48,6 +56,15 @@ const subscribers: Map<SSEEventType, Set<Handler>> = new Map();
4856
// exact same listeners (``addEventListener`` matches by reference).
4957
const dispatchers: Map<SSEEventType, Handler> = new Map();
5058

59+
// Managed-reconnect state. We take over from the browser's native auto-retry
60+
// once it flags ``readyState === CLOSED`` so that responses lacking a
61+
// ``text/event-stream`` content type (a 429 / 5xx page, an HTML error page
62+
// from a load balancer, etc.) don't strand the client on the polling
63+
// fallback. ``reconnectAttempts`` is the input to the backoff formula and is
64+
// reset to zero on every successful ``onopen``.
65+
let reconnectAttempts = 0;
66+
let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
67+
5168
function openSourceIfNeeded() {
5269
if (sharedSource) {
5370
return;
@@ -78,6 +95,14 @@ function openSourceIfNeeded() {
7895
// Global readiness flag so Selenium tests can distinguish a working
7996
// SSE pipeline from the polling fallback.
8097
sseGlobals().__galaxy_sse_connected = true;
98+
// The connection is healthy again — drop any pending managed reopen
99+
// and zero the backoff so the next failure starts at the base delay
100+
// rather than wherever the previous outage left off.
101+
reconnectAttempts = 0;
102+
if (reconnectTimer !== null) {
103+
clearTimeout(reconnectTimer);
104+
reconnectTimer = null;
105+
}
81106
// Re-assert any viewer subscriptions the user accumulated. The server
82107
// doesn't carry app-level subscription state across reconnects (it
83108
// only knows the user from the cookie), so the client owns the source
@@ -86,12 +111,17 @@ function openSourceIfNeeded() {
86111
};
87112

88113
sharedSource.onerror = () => {
89-
// EventSource auto-reconnects natively; SSE-vs-polling is a
90-
// config-level decision (see historyStore / notificationsStore), so
91-
// we must not give up on transient errors here — doing so would leave
92-
// the client with no updates at all.
93114
sharedConnected.value = false;
94115
sseGlobals().__galaxy_sse_connected = false;
116+
// The browser auto-retries while ``readyState === CONNECTING``; let
117+
// it. Once it flips to ``CLOSED`` (response missing
118+
// ``text/event-stream``, repeated network failure giving up, etc.)
119+
// the native loop is done and we own the reconnect — otherwise the
120+
// client silently drops to polling-only updates for the rest of the
121+
// session.
122+
if (sharedSource?.readyState === EventSource.CLOSED) {
123+
scheduleReconnect();
124+
}
95125
};
96126

97127
// Browser EventSource teardown during a full-page navigation
@@ -118,11 +148,70 @@ function closeSource() {
118148
sharedSource = null;
119149
sharedConnected.value = false;
120150
sseGlobals().__galaxy_sse_connected = false;
151+
// Cancel any pending managed reopen — without this, ``pagehide``-driven
152+
// teardown could be followed by ``setTimeout`` re-opening a stream we
153+
// just deliberately closed.
154+
if (reconnectTimer !== null) {
155+
clearTimeout(reconnectTimer);
156+
reconnectTimer = null;
157+
}
158+
reconnectAttempts = 0;
121159
if (typeof window !== "undefined") {
122160
window.removeEventListener("pagehide", closeSource);
123161
}
124162
}
125163

164+
/**
 * Cycle only the underlying EventSource: detach every per-type dispatcher,
 * close the socket, and mark the stream disconnected. The ``subscribers`` map
 * and the reconnect timer/counter are deliberately left alone, so a later
 * reopen can wire the same handlers back up. Use ``closeSource`` instead when
 * no listener wants further events.
 *
 * Does nothing when no shared source is currently open.
 */
function closeSourceForReconnect() {
    const source = sharedSource;
    if (!source) {
        return;
    }
    // Listeners are matched by reference, so remove exactly the dispatcher
    // functions that were registered, then forget them.
    dispatchers.forEach((dispatcher, eventType) => {
        source.removeEventListener(eventType, dispatcher);
    });
    dispatchers.clear();

    source.close();
    sharedSource = null;

    // Mirror the disconnected state to both the reactive flag and the
    // window-level debug flag.
    sharedConnected.value = false;
    sseGlobals().__galaxy_sse_connected = false;
}
183+
184+
/**
 * Arm the managed-reconnect timer, unless one is already pending.
 *
 * The delay is ``min(RECONNECT_CAP_MS, RECONNECT_BASE_MS * 2^attempts)``
 * multiplied by a random factor in [0.5, 1.5). Because the factor is applied
 * after the cap, the delay can exceed ``RECONNECT_CAP_MS`` by up to 50%.
 * The current source is torn down immediately; when the timer fires, a new
 * one is opened only if at least one subscriber is still registered.
 */
function scheduleReconnect() {
    // A pending timer already owns the next attempt; don't stack another.
    if (reconnectTimer !== null) {
        return;
    }

    // The random factor spreads out clients that hit the same outage, so a
    // recovering server isn't met by a synchronized wave of reconnects.
    const cappedBase = Math.min(RECONNECT_CAP_MS, RECONNECT_BASE_MS * 2 ** reconnectAttempts);
    const delay = Math.floor(cappedBase * (0.5 + Math.random()));
    reconnectAttempts += 1;

    // Bump the window-level counter as well; it is cumulative for the page.
    const debugFlags = sseGlobals();
    debugFlags.__galaxy_sse_reconnect_attempts = (debugFlags.__galaxy_sse_reconnect_attempts ?? 0) + 1;

    closeSourceForReconnect();

    reconnectTimer = setTimeout(() => {
        reconnectTimer = null;
        // Every subscriber may have left during the outage; in that case the
        // shared source stays closed.
        const anyoneListening = Array.from(subscribers.values()).some((subs) => subs.size > 0);
        if (anyoneListening) {
            openSourceIfNeeded();
        }
    }, delay);
}
214+
126215
function addSubscriber(onEvent: Handler, eventTypes: readonly SSEEventType[]) {
127216
for (const eventType of eventTypes) {
128217
let subs = subscribers.get(eventType);
@@ -157,10 +246,15 @@ function removeSubscriber(onEvent: Handler, eventTypes: readonly SSEEventType[])
157246
/**
158247
* Composable for subscribing to events on the shared SSE stream.
159248
*
160-
* The browser's EventSource handles reconnection automatically and sends the
161-
* ``Last-Event-ID`` header so the server can catch up on missed events. Only
162-
* one EventSource is opened per tab regardless of how many callers invoke
163-
* this composable; the composable multiplexes dispatch per event type.
249+
* Reconnection: the browser's native auto-retry handles the cheap path
250+
* (transient network blips while ``readyState === CONNECTING``); once the
251+
* source flips to ``CLOSED`` — typically a 4xx/5xx response with no
252+
* ``text/event-stream`` body, which most browsers treat as fatal — this
253+
* composable takes over with jittered exponential backoff (pre-jitter delay capped at 30 s).
254+
* The server emits ``id:`` per event so the ``Last-Event-ID`` header on
255+
* reconnect lets the server catch up on missed events. Only one EventSource
256+
* is opened per tab regardless of how many callers invoke this composable;
257+
* the composable multiplexes dispatch per event type.
164258
*
165259
* @param onEvent - callback invoked for every matching SSE event
166260
* @param eventTypes - subset of event types to listen to (defaults to all)
@@ -291,3 +385,27 @@ export function removeHistoryViewerSubscription(historyId: string): void {
291385
export function _resetHistoryViewerSubscriptionsForTest(): void {
292386
viewerSubscriptions.clear();
293387
}
388+
389+
/**
 * Test-only: return the module to a pristine state. Cancels any pending
 * managed reopen, zeroes the backoff counter, detaches and closes the shared
 * source if one is open, empties the subscriber map, clears both connection
 * refs, and removes the debug flags from ``window``.
 */
export function _resetSSESharedSourceForTest(): void {
    // Reconnect bookkeeping.
    if (reconnectTimer !== null) {
        clearTimeout(reconnectTimer);
        reconnectTimer = null;
    }
    reconnectAttempts = 0;

    // Shared EventSource and the dispatchers registered on it.
    const source = sharedSource;
    if (source) {
        dispatchers.forEach((dispatcher, eventType) => {
            source.removeEventListener(eventType, dispatcher);
        });
        dispatchers.clear();
        source.close();
        sharedSource = null;
    }

    // Subscriber registry and reactive connection state.
    subscribers.clear();
    sharedConnected.value = false;
    sseEverConnected.value = false;

    // Window-level debug flags.
    const debugFlags = sseGlobals();
    delete debugFlags.__galaxy_sse_connected;
    delete debugFlags.__galaxy_sse_last_event_ts;
    delete debugFlags.__galaxy_sse_reconnect_attempts;
}

0 commit comments

Comments
 (0)