Skip to content

Commit 801969d

Browse files
authored
Merge pull request #22629 from mvdbeek/improve-sse-reconnect-handling
Manage SSE reconnect on transient 5xx/429 outages
2 parents 058702d + 27d69f9 commit 801969d

3 files changed

Lines changed: 437 additions & 9 deletions

File tree

client/src/composables/useNotificationSSE.test.ts

Lines changed: 160 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,17 @@
99

1010
import flushPromises from "flush-promises";
1111
import { http, HttpResponse } from "msw";
12-
import { afterEach, beforeEach, describe, expect, it } from "vitest";
12+
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
13+
import { type EffectScope, effectScope } from "vue";
1314

1415
import { useServerMock } from "@/api/client/__mocks__";
1516

1617
import {
1718
_resetHistoryViewerSubscriptionsForTest,
19+
_resetSSESharedSourceForTest,
1820
addHistoryViewerSubscription,
1921
removeHistoryViewerSubscription,
22+
useSSE,
2023
} from "./useNotificationSSE";
2124

2225
interface SubscriptionRequest {
@@ -98,3 +101,159 @@ describe("useNotificationSSE viewer subscriptions", () => {
98101
expect(new Set(ids)).toEqual(new Set(["hist-A", "hist-B"]));
99102
});
100103
});
104+
105+
/**
106+
* Reconnect-on-CLOSED tests.
107+
*
108+
* The browser's native ``EventSource`` retries while ``readyState ===
109+
* CONNECTING`` but gives up once it flips to ``CLOSED`` — for example, when a
110+
* 429/5xx response arrives without ``text/event-stream``. The composable must
111+
* notice that flip and schedule a manual reopen with backoff so the client
112+
* doesn't silently drop to polling-only updates for the rest of the session.
113+
*
114+
* We stub ``globalThis.EventSource`` with a fake whose lifecycle the test
115+
* drives directly: this keeps the test off jsdom's ``EventSource`` (which
116+
* doesn't actually open sockets) and gives us a deterministic handle on the
117+
* instance count for the "a *new* EventSource was constructed after backoff"
118+
* assertion.
119+
*/
120+
class FakeEventSource {
121+
static CONNECTING = 0;
122+
static OPEN = 1;
123+
static CLOSED = 2;
124+
static instances: FakeEventSource[] = [];
125+
126+
readonly url: string;
127+
readyState: number = FakeEventSource.CONNECTING;
128+
onopen: (() => void) | null = null;
129+
onerror: (() => void) | null = null;
130+
onmessage: ((e: MessageEvent) => void) | null = null;
131+
addEventListener = vi.fn();
132+
removeEventListener = vi.fn();
133+
close = vi.fn(() => {
134+
this.readyState = FakeEventSource.CLOSED;
135+
});
136+
137+
constructor(url: string) {
138+
this.url = url;
139+
FakeEventSource.instances.push(this);
140+
}
141+
142+
static reset() {
143+
FakeEventSource.instances = [];
144+
}
145+
}
146+
147+
describe("useNotificationSSE managed reconnect", () => {
148+
let originalEventSource: typeof EventSource | undefined;
149+
// ``useSSE`` registers an ``onScopeDispose`` cleanup; outside a Vue
150+
// component setup that warns and ``vitest-fail-on-console`` upgrades the
151+
// warning to a test failure. Wrap each test in an explicit scope so the
152+
// disposal hook has somewhere to attach.
153+
let scope: EffectScope;
154+
155+
beforeEach(() => {
156+
FakeEventSource.reset();
157+
originalEventSource = (globalThis as unknown as { EventSource?: typeof EventSource }).EventSource;
158+
(globalThis as unknown as { EventSource: unknown }).EventSource = FakeEventSource;
159+
vi.useFakeTimers();
160+
_resetSSESharedSourceForTest();
161+
scope = effectScope();
162+
});
163+
164+
afterEach(() => {
165+
scope.stop();
166+
vi.useRealTimers();
167+
_resetSSESharedSourceForTest();
168+
if (originalEventSource) {
169+
(globalThis as unknown as { EventSource: typeof EventSource }).EventSource = originalEventSource;
170+
} else {
171+
delete (globalThis as unknown as { EventSource?: unknown }).EventSource;
172+
}
173+
});
174+
175+
it("schedules a reopen when onerror fires with readyState=CLOSED", () => {
176+
scope.run(() => {
177+
const { connect } = useSSE(() => {});
178+
connect();
179+
});
180+
expect(FakeEventSource.instances).toHaveLength(1);
181+
182+
const first = FakeEventSource.instances[0]!;
183+
// Simulate the browser giving up on the native retry.
184+
first.readyState = FakeEventSource.CLOSED;
185+
first.onerror?.();
186+
187+
// Until the backoff fires, no replacement EventSource exists yet.
188+
expect(FakeEventSource.instances).toHaveLength(1);
189+
190+
// The first attempt's backoff is in [500ms, 1500ms); 2000ms is past
191+
// the upper bound regardless of the jitter draw, so a fixed advance
192+
// is deterministic without seeding ``Math.random``.
193+
vi.advanceTimersByTime(2000);
194+
expect(FakeEventSource.instances).toHaveLength(2);
195+
});
196+
197+
it("does not reopen while readyState=CONNECTING (browser is still retrying natively)", () => {
198+
scope.run(() => {
199+
const { connect } = useSSE(() => {});
200+
connect();
201+
});
202+
const first = FakeEventSource.instances[0]!;
203+
first.readyState = FakeEventSource.CONNECTING;
204+
first.onerror?.();
205+
206+
// No manual scheduling while the browser is still trying — would
207+
// otherwise double-up reconnect work and accelerate the retry loop.
208+
// A full minute past any backoff envelope without a second instance
209+
// proves the managed path stayed dormant.
210+
vi.advanceTimersByTime(60_000);
211+
expect(FakeEventSource.instances).toHaveLength(1);
212+
});
213+
214+
it("resets the backoff counter on a successful onopen", () => {
215+
scope.run(() => {
216+
const { connect } = useSSE(() => {});
217+
connect();
218+
});
219+
220+
// Stack five back-to-back failures so the unjittered base delay grows
221+
// to the 30s cap. After this loop the next attempt's envelope is
222+
// [15s, 45s) — well outside the [500ms, 1500ms) envelope a freshly
223+
// reset counter would produce. ``45_001`` clears the worst-case jitter
224+
// draw on each iteration so the timer always fires.
225+
const STACKED = 5;
226+
for (let i = 0; i < STACKED; i++) {
227+
const current = FakeEventSource.instances.at(-1)!;
228+
current.readyState = FakeEventSource.CLOSED;
229+
current.onerror?.();
230+
vi.advanceTimersByTime(45_001);
231+
}
232+
const beforeReset = FakeEventSource.instances.length;
233+
expect(beforeReset).toBe(STACKED + 1);
234+
235+
// Trigger a sixth failure and verify the *capped* envelope: a 2s
236+
// advance is below the 15s lower bound, so no new instance appears.
237+
// Without this assertion the reset test below would pass even if the
238+
// counter never reset, since both paths would fire on a 2s advance.
239+
const stale = FakeEventSource.instances.at(-1)!;
240+
stale.readyState = FakeEventSource.CLOSED;
241+
stale.onerror?.();
242+
vi.advanceTimersByTime(2000);
243+
expect(FakeEventSource.instances).toHaveLength(beforeReset);
244+
245+
// Drain the in-flight capped timer so we have a fresh source whose
246+
// ``onopen`` will reset the counter.
247+
vi.advanceTimersByTime(45_001);
248+
expect(FakeEventSource.instances).toHaveLength(beforeReset + 1);
249+
const reopened = FakeEventSource.instances.at(-1)!;
250+
reopened.onopen?.();
251+
252+
// Counter reset → next failure's delay drops back to [500, 1500); a
253+
// 2s advance must fire it.
254+
reopened.readyState = FakeEventSource.CLOSED;
255+
reopened.onerror?.();
256+
vi.advanceTimersByTime(2000);
257+
expect(FakeEventSource.instances).toHaveLength(beforeReset + 2);
258+
});
259+
});

client/src/composables/useNotificationSSE.ts

Lines changed: 126 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,20 @@ export type SSEEventType = (typeof SSE_EVENT_TYPES)[number];
1818
interface SSEDebugGlobals {
1919
__galaxy_sse_connected?: boolean;
2020
__galaxy_sse_last_event_ts?: number;
21+
__galaxy_sse_reconnect_attempts?: number;
2122
}
2223

2324
function sseGlobals(): SSEDebugGlobals {
2425
return window as unknown as SSEDebugGlobals;
2526
}
2627

28+
// Full-jitter exponential backoff bounds for managed reconnect. Aligned with
29+
// the retry budget shape used by the polling paths (see
30+
// ``isRetryableApiError`` in ``client/src/utils/simple-error.ts``); 30 s caps
31+
// the delay during sustained 429/5xx so the client doesn't drift to minutes.
32+
const RECONNECT_BASE_MS = 1000;
33+
const RECONNECT_CAP_MS = 30_000;
34+
2735
// ---------------------------------------------------------------------------
2836
// Module-level shared EventSource.
2937
//
@@ -48,6 +56,15 @@ const subscribers: Map<SSEEventType, Set<Handler>> = new Map();
4856
// exact same listeners (``addEventListener`` matches by reference).
4957
const dispatchers: Map<SSEEventType, Handler> = new Map();
5058

59+
// Managed-reconnect state. We take over from the browser's native auto-retry
60+
// once it flags ``readyState === CLOSED`` so that responses lacking a
61+
// ``text/event-stream`` content type (a 429 / 5xx page, an HTML error page
62+
// from a load balancer, etc.) don't strand the client on the polling
63+
// fallback. ``reconnectAttempts`` is the input to the backoff formula and is
64+
// reset to zero on every successful ``onopen``.
65+
let reconnectAttempts = 0;
66+
let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
67+
5168
function openSourceIfNeeded() {
5269
if (sharedSource) {
5370
return;
@@ -78,6 +95,14 @@ function openSourceIfNeeded() {
7895
// Global readiness flag so Selenium tests can distinguish a working
7996
// SSE pipeline from the polling fallback.
8097
sseGlobals().__galaxy_sse_connected = true;
98+
// The connection is healthy again — drop any pending managed reopen
99+
// and zero the backoff so the next failure starts at the base delay
100+
// rather than wherever the previous outage left off.
101+
reconnectAttempts = 0;
102+
if (reconnectTimer !== null) {
103+
clearTimeout(reconnectTimer);
104+
reconnectTimer = null;
105+
}
81106
// Re-assert any viewer subscriptions the user accumulated. The server
82107
// doesn't carry app-level subscription state across reconnects (it
83108
// only knows the user from the cookie), so the client owns the source
@@ -86,12 +111,17 @@ function openSourceIfNeeded() {
86111
};
87112

88113
sharedSource.onerror = () => {
89-
// EventSource auto-reconnects natively; SSE-vs-polling is a
90-
// config-level decision (see historyStore / notificationsStore), so
91-
// we must not give up on transient errors here — doing so would leave
92-
// the client with no updates at all.
93114
sharedConnected.value = false;
94115
sseGlobals().__galaxy_sse_connected = false;
116+
// The browser auto-retries while ``readyState === CONNECTING``; let
117+
// it. Once it flips to ``CLOSED`` (response missing
118+
// ``text/event-stream``, repeated network failure giving up, etc.)
119+
// the native loop is done and we own the reconnect — otherwise the
120+
// client silently drops to polling-only updates for the rest of the
121+
// session.
122+
if (sharedSource?.readyState === EventSource.CLOSED) {
123+
scheduleReconnect();
124+
}
95125
};
96126

97127
// Browser EventSource teardown during a full-page navigation
@@ -118,11 +148,70 @@ function closeSource() {
118148
sharedSource = null;
119149
sharedConnected.value = false;
120150
sseGlobals().__galaxy_sse_connected = false;
151+
// Cancel any pending managed reopen — without this, ``pagehide``-driven
152+
// teardown could be followed by ``setTimeout`` re-opening a stream we
153+
// just deliberately closed.
154+
if (reconnectTimer !== null) {
155+
clearTimeout(reconnectTimer);
156+
reconnectTimer = null;
157+
}
158+
reconnectAttempts = 0;
121159
if (typeof window !== "undefined") {
122160
window.removeEventListener("pagehide", closeSource);
123161
}
124162
}
125163

164+
/**
165+
* Tear down the EventSource without disturbing the subscriber map so the
166+
* scheduled reopen ends up wired to the same handler set. ``closeSource`` is
167+
* the right tool when *no* listener wants more events; this is the right tool
168+
* when listeners still exist and only the underlying socket needs to cycle.
169+
*/
170+
function closeSourceForReconnect() {
171+
if (!sharedSource) {
172+
return;
173+
}
174+
for (const [eventType, dispatcher] of dispatchers) {
175+
sharedSource.removeEventListener(eventType, dispatcher);
176+
}
177+
dispatchers.clear();
178+
sharedSource.close();
179+
sharedSource = null;
180+
sharedConnected.value = false;
181+
sseGlobals().__galaxy_sse_connected = false;
182+
}
183+
184+
function scheduleReconnect() {
185+
if (reconnectTimer !== null) {
186+
// Already armed; the active timer will handle the next attempt.
187+
return;
188+
}
189+
// Full-jitter exponential backoff: the random factor in [0.5, 1.5)
190+
// smears retries from clients hitting the same outage so a recovering
191+
// server isn't met with a synchronized stampede.
192+
const exp = Math.min(RECONNECT_CAP_MS, RECONNECT_BASE_MS * 2 ** reconnectAttempts);
193+
const delay = Math.floor(exp * (0.5 + Math.random()));
194+
reconnectAttempts += 1;
195+
const globals = sseGlobals();
196+
globals.__galaxy_sse_reconnect_attempts = (globals.__galaxy_sse_reconnect_attempts ?? 0) + 1;
197+
closeSourceForReconnect();
198+
reconnectTimer = setTimeout(() => {
199+
reconnectTimer = null;
200+
// Subscribers may have all unsubscribed during the outage; if so, the
201+
// shared source should stay closed.
202+
let hasSubscribers = false;
203+
for (const subs of subscribers.values()) {
204+
if (subs.size > 0) {
205+
hasSubscribers = true;
206+
break;
207+
}
208+
}
209+
if (hasSubscribers) {
210+
openSourceIfNeeded();
211+
}
212+
}, delay);
213+
}
214+
126215
function addSubscriber(onEvent: Handler, eventTypes: readonly SSEEventType[]) {
127216
for (const eventType of eventTypes) {
128217
let subs = subscribers.get(eventType);
@@ -157,10 +246,15 @@ function removeSubscriber(onEvent: Handler, eventTypes: readonly SSEEventType[])
157246
/**
158247
* Composable for subscribing to events on the shared SSE stream.
159248
*
160-
* The browser's EventSource handles reconnection automatically and sends the
161-
* ``Last-Event-ID`` header so the server can catch up on missed events. Only
162-
* one EventSource is opened per tab regardless of how many callers invoke
163-
* this composable; the composable multiplexes dispatch per event type.
249+
* Reconnection: the browser's native auto-retry handles the cheap path
250+
* (transient network blips while ``readyState === CONNECTING``); once the
251+
* source flips to ``CLOSED`` — typically a 4xx/5xx response with no
252+
* ``text/event-stream`` body, which most browsers treat as fatal — this
253+
* composable takes over with full-jitter exponential backoff capped at 30 s.
254+
* The server emits ``id:`` per event so the ``Last-Event-ID`` header on
255+
* reconnect lets the server catch up on missed events. Only one EventSource
256+
* is opened per tab regardless of how many callers invoke this composable;
257+
* the composable multiplexes dispatch per event type.
164258
*
165259
* @param onEvent - callback invoked for every matching SSE event
166260
* @param eventTypes - subset of event types to listen to (defaults to all)
@@ -291,3 +385,27 @@ export function removeHistoryViewerSubscription(historyId: string): void {
291385
export function _resetHistoryViewerSubscriptionsForTest(): void {
292386
viewerSubscriptions.clear();
293387
}
388+
389+
/** Test-only: tear down the shared source and reconnect state. */
390+
export function _resetSSESharedSourceForTest(): void {
391+
if (reconnectTimer !== null) {
392+
clearTimeout(reconnectTimer);
393+
reconnectTimer = null;
394+
}
395+
reconnectAttempts = 0;
396+
if (sharedSource) {
397+
for (const [eventType, dispatcher] of dispatchers) {
398+
sharedSource.removeEventListener(eventType, dispatcher);
399+
}
400+
dispatchers.clear();
401+
sharedSource.close();
402+
sharedSource = null;
403+
}
404+
subscribers.clear();
405+
sharedConnected.value = false;
406+
sseEverConnected.value = false;
407+
const globals = sseGlobals();
408+
delete globals.__galaxy_sse_connected;
409+
delete globals.__galaxy_sse_last_event_ts;
410+
delete globals.__galaxy_sse_reconnect_attempts;
411+
}

0 commit comments

Comments
 (0)