Skip to content

Commit 548b1bd

Browse files
committed
Address review feedback for SSE notifications
Backend - Narrow SSEEventDispatcher's dependencies. Previous constructor took the whole app and reached for `queue_worker`, `application_stack`, `config.server_name` off it (via send_control_task) — a service- locator pattern where the `MinimalManagerApp` annotation didn't even cover the accessed fields. Inject `queue_worker: Optional[ GalaxyQueueWorker]` and `application_stack: ApplicationStack` directly and publish via `ControlTask(qw).send_task(...)` so the dispatcher is unit-testable without a full app. - Move user-id resolution, notifications-enabled guard, and catch-up wiring out of `api/events.py` and `api/notifications.py`. Controllers are now pure `StreamingResponse` wrappers around `EventsService.open_stream(...)` / `NotificationService.open_stream (...)`, matching the three-layer architecture. - Drop the starlette `Request` leak from `managers/sse.py`. `SSEConnectionManager.stream` now takes an `IsDisconnected` (async callable) so the manager stays framework-agnostic; services pass `request.is_disconnected` in. - Centralise SSE event-id generation. New `make_event_id()` / `parse_event_id()` helpers in `managers/sse.py` use `galaxy.model.orm.now` (timezone-naive UTC, matching the rest of Galaxy's DB timestamps) instead of `datetime.utcnow()` scattered across three callers — previous code risked a tz-aware/naïve mix that would silently drop Last-Event-ID catch-up. - Drop dead `app.sse_connection_manager` attribute and `StructuredApp.sse_connection_manager` field. Nothing reads them; the manager is consumed via Lagom (`depends(SSEConnectionManager)` / `app[SSEConnectionManager]`). - Type-annotate the public control-task surface: `send_control_task`, `ControlTask.send_task`, `all_control_queues_for_declare`, SSE control-task handlers (`notify_users` / `notify_broadcast` / `history_update`), and both new stream endpoints. Tighten `SSEEventDispatcher.history_update` to `dict[str, list[int]]`. Tests - Replace the substring `data` matches with `json.loads(data)["content"] ["subject"]` so a regression in the NotificationResponse envelope (renamed keys, missing fields) actually fails. Confirmed shape against `NotificationResponse.model_dump_json()`. - Assert the *absence* of pre-Last-Event-ID subjects in the catch-up test. Without this, a server that replays everything on every reconnect silently passes. - Delete smoke tests that only checked content-type + 200 (subsumed by the functional tests) and the two "existing polling API still works" tests that didn't exercise SSE at all. - Add `window.__galaxy_sse_last_event_ts` to the `useSSE` composable and gate the selenium bell/notification tests on it advancing past a baseline. The previous 15s wait would silently become a polling test if the poll interval ever dropped — this makes "update came from SSE" a positive assertion. - Vitest: add a shared `_testing/sseStoreSupport.ts` helper for the two store tests to share a `useSSE` mock that captures the `onEvent` callback. Use it to synthesize real `MessageEvent`s and assert store-state changes for `notification_update`, `notification_status`, and `history_update` — the most consequential handlers were previously uncovered. Also save/restore `document.visibilityState` so the patching doesn't leak across tests, drop the dead `mockSseConnected` ref, tighten the idempotency assertion to advance time by one interval, and collapse the repeated orchestration into a `primeStore(...)` helper.
1 parent 4ba4a90 commit 548b1bd

17 files changed

Lines changed: 469 additions & 319 deletions

File tree

client/src/composables/useNotificationSSE.ts

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,15 @@ export const SSE_EVENT_TYPES = [
1414

1515
export type SSEEventType = (typeof SSE_EVENT_TYPES)[number];
1616

17+
interface SSEDebugGlobals {
18+
__galaxy_sse_connected?: boolean;
19+
__galaxy_sse_last_event_ts?: number;
20+
}
21+
22+
function sseGlobals(): SSEDebugGlobals {
23+
return window as unknown as SSEDebugGlobals;
24+
}
25+
1726
/**
1827
* Composable for connecting to the unified SSE event stream.
1928
*
@@ -27,20 +36,28 @@ export function useSSE(onEvent: (event: MessageEvent) => void, eventTypes: reado
2736
const connected = ref(false);
2837
let eventSource: EventSource | null = null;
2938

39+
// Selenium tests watch __galaxy_sse_last_event_ts to prove that an
40+
// observable state change came from an SSE push and not the polling
41+
// fallback (where __galaxy_sse_last_event_ts would never advance).
42+
const trackedOnEvent = (event: MessageEvent) => {
43+
sseGlobals().__galaxy_sse_last_event_ts = Date.now();
44+
onEvent(event);
45+
};
46+
3047
function connect() {
3148
disconnect();
3249
const url = withPrefix("/api/events/stream");
3350
eventSource = new EventSource(url);
3451

3552
for (const eventType of eventTypes) {
36-
eventSource.addEventListener(eventType, onEvent);
53+
eventSource.addEventListener(eventType, trackedOnEvent);
3754
}
3855

3956
eventSource.onopen = () => {
4057
connected.value = true;
4158
// Expose a global readiness flag so Selenium tests can distinguish
4259
// a working SSE pipeline from the polling fallback.
43-
(window as unknown as { __galaxy_sse_connected?: boolean }).__galaxy_sse_connected = true;
60+
sseGlobals().__galaxy_sse_connected = true;
4461
};
4562

4663
eventSource.onerror = () => {
@@ -49,20 +66,20 @@ export function useSSE(onEvent: (event: MessageEvent) => void, eventTypes: reado
4966
// so we must not give up on transient errors here — doing so
5067
// would leave the client with no updates at all.
5168
connected.value = false;
52-
(window as unknown as { __galaxy_sse_connected?: boolean }).__galaxy_sse_connected = false;
69+
sseGlobals().__galaxy_sse_connected = false;
5370
};
5471
}
5572

5673
function disconnect() {
5774
if (eventSource) {
5875
for (const eventType of eventTypes) {
59-
eventSource.removeEventListener(eventType, onEvent);
76+
eventSource.removeEventListener(eventType, trackedOnEvent);
6077
}
6178
eventSource.close();
6279
eventSource = null;
6380
}
6481
connected.value = false;
65-
(window as unknown as { __galaxy_sse_connected?: boolean }).__galaxy_sse_connected = false;
82+
sseGlobals().__galaxy_sse_connected = false;
6683
}
6784

6885
onScopeDispose(() => {
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/**
2+
* Shared test helpers for the SSE-driven stores (historyStore, notificationsStore).
3+
*
4+
* Both stores consume the same `useSSE` composable and need:
5+
* - a mock that captures the onEvent callback so tests can synthesize SSE messages;
6+
* - visibility-state patching without leaking across tests (JSDOM's `document`
7+
* is shared by every test in the same worker, so an unrestored
8+
* `Object.defineProperty` causes silent bleed).
9+
*
10+
* Because ``vi.mock`` is hoisted above module-level variables, tests must
11+
* construct the SSE-mock state via ``vi.hoisted`` and then hand it to
12+
* ``sseMockFactory`` from inside the ``vi.mock`` factory. See the ``.test.ts``
13+
* files in this directory for the pattern.
14+
*/
15+
16+
import { vi } from "vitest";
17+
18+
export interface SSEMockState {
19+
onEvent: ((event: MessageEvent) => void) | null;
20+
connect: ReturnType<typeof vi.fn>;
21+
disconnect: ReturnType<typeof vi.fn>;
22+
}
23+
24+
/** Build the factory used with ``vi.mock("@/composables/useNotificationSSE", ...)``. */
25+
export function sseMockFactory(state: SSEMockState) {
26+
return {
27+
useSSE: vi.fn((onEvent: (event: MessageEvent) => void) => {
28+
state.onEvent = onEvent;
29+
return { connect: state.connect, disconnect: state.disconnect };
30+
}),
31+
};
32+
}
33+
34+
/** Synthesize an SSE message through the captured handler. */
35+
export function emitSse(state: SSEMockState, type: string, payload: unknown): void {
36+
if (!state.onEvent) {
37+
throw new Error("useSSE was not called by the store under test — cannot emit an SSE event");
38+
}
39+
state.onEvent(new MessageEvent(type, { data: JSON.stringify(payload) }));
40+
}
41+
42+
/**
43+
* Save the current ``document.visibilityState`` descriptor and return a restorer.
44+
* Call the restorer in ``afterEach`` to prevent patching from leaking into later tests.
45+
*/
46+
export function useVisibilityPatch(): {
47+
set: (state: "visible" | "hidden") => void;
48+
restore: () => void;
49+
} {
50+
const original = Object.getOwnPropertyDescriptor(document, "visibilityState");
51+
return {
52+
set(state: "visible" | "hidden") {
53+
Object.defineProperty(document, "visibilityState", {
54+
configurable: true,
55+
get: () => state,
56+
});
57+
document.dispatchEvent(new Event("visibilitychange"));
58+
},
59+
restore() {
60+
if (original) {
61+
Object.defineProperty(document, "visibilityState", original);
62+
} else {
63+
delete (document as unknown as Record<string, unknown>).visibilityState;
64+
}
65+
},
66+
};
67+
}
Lines changed: 75 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,23 @@
11
import flushPromises from "flush-promises";
22
import { createPinia, setActivePinia } from "pinia";
33
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
4-
import { ref } from "vue";
54

65
import { useServerMock } from "@/api/client/__mocks__";
76

7+
import { emitSse, sseMockFactory, useVisibilityPatch } from "./_testing/sseStoreSupport";
88
import { useHistoryStore } from "./historyStore";
99

10-
// Capture SSE composable usage — neither test should rely on the real
11-
// EventSource. The `connected` ref stays false by default; the store must
12-
// NOT key polling behavior off it.
13-
const mockSseConnect = vi.fn();
14-
const mockSseDisconnect = vi.fn();
15-
const mockSseConnected = ref(false);
16-
17-
vi.mock("@/composables/useNotificationSSE", () => ({
18-
useSSE: vi.fn(() => ({
19-
connect: mockSseConnect,
20-
disconnect: mockSseDisconnect,
21-
connected: mockSseConnected,
22-
})),
23-
}));
10+
// ``vi.mock`` is hoisted above module-level ``const`` declarations, so the
11+
// capture-state has to be built via ``vi.hoisted`` to be visible to the factory.
12+
const sseState = vi.hoisted(() => {
13+
return {
14+
onEvent: null as ((event: MessageEvent) => void) | null,
15+
connect: vi.fn(),
16+
disconnect: vi.fn(),
17+
};
18+
});
19+
20+
vi.mock("@/composables/useNotificationSSE", () => sseMockFactory(sseState));
2421

2522
// `watchHistory(app)` is the polling handler invoked on the short/long
2623
// interval. We mock it so each invocation is observable without pulling in
@@ -40,31 +37,39 @@ vi.mock("@/app", () => ({
4037

4138
const { server, http } = useServerMock();
4239

43-
function configResponse(enableSse: boolean) {
44-
return { enable_sse_history_updates: enableSse };
45-
}
46-
4740
function registerDefaultHandlers({ enableSse }: { enableSse: boolean }) {
4841
server.use(
4942
http.get("/api/configuration", ({ response }) => {
5043
// eslint-disable-next-line @typescript-eslint/no-explicit-any
51-
return response(200).json(configResponse(enableSse) as any);
44+
return response(200).json({ enable_sse_history_updates: enableSse } as any);
5245
}),
5346
);
5447
}
5548

49+
async function primeStore(startFn: () => void): Promise<void> {
50+
startFn();
51+
// Config load is async; let the watch fire and the initial fetch complete.
52+
await flushPromises();
53+
await vi.runOnlyPendingTimersAsync();
54+
await flushPromises();
55+
}
56+
5657
describe("historyStore — config-driven SSE vs polling", () => {
58+
let visibility: ReturnType<typeof useVisibilityPatch>;
59+
5760
beforeEach(() => {
5861
setActivePinia(createPinia());
59-
mockSseConnect.mockClear();
60-
mockSseDisconnect.mockClear();
61-
mockSseConnected.value = false;
62+
sseState.connect.mockClear();
63+
sseState.disconnect.mockClear();
64+
sseState.onEvent = null;
6265
mockWatchHistory.mockClear();
6366
mockRefreshHistoryFromPush.mockClear();
6467
vi.useFakeTimers();
68+
visibility = useVisibilityPatch();
6569
});
6670

6771
afterEach(() => {
72+
visibility.restore();
6873
vi.useRealTimers();
6974
});
7075

@@ -75,17 +80,9 @@ describe("historyStore — config-driven SSE vs polling", () => {
7580

7681
it("primes the store with one initial load, connects SSE, and does not keep polling", async () => {
7782
const store = useHistoryStore();
83+
await primeStore(() => store.startWatchingHistory());
7884

79-
// `startWatchingHistory` is an exported alias for
80-
// `startWatchingHistoryWithSSE` (see historyStore.ts).
81-
store.startWatchingHistory();
82-
83-
// Config loads async; let the watch fire.
84-
await flushPromises();
85-
await vi.runOnlyPendingTimersAsync();
86-
await flushPromises();
87-
88-
expect(mockSseConnect).toHaveBeenCalledTimes(1);
85+
expect(sseState.connect).toHaveBeenCalledTimes(1);
8986
// One-shot initial fetch so the history panel isn't empty before
9087
// the first SSE event arrives.
9188
expect(mockWatchHistory).toHaveBeenCalledTimes(1);
@@ -99,33 +96,52 @@ describe("historyStore — config-driven SSE vs polling", () => {
9996

10097
it("does not start polling when the tab regains visibility", async () => {
10198
const store = useHistoryStore();
102-
store.startWatchingHistory();
103-
await flushPromises();
104-
await vi.runOnlyPendingTimersAsync();
105-
await flushPromises();
99+
await primeStore(() => store.startWatchingHistory());
106100
expect(mockWatchHistory).toHaveBeenCalledTimes(1);
107101

108102
// Simulate a tab hide/show cycle. `useResourceWatcher` registers
109103
// a `visibilitychange` listener whose handler calls
110104
// `startWatchingResourceIfNeeded` — in SSE mode that would
111105
// silently resume polling. Because we never instantiated the
112106
// watcher, no listener should exist and no poll should fire.
113-
Object.defineProperty(document, "visibilityState", {
114-
configurable: true,
115-
get: () => "hidden",
116-
});
117-
document.dispatchEvent(new Event("visibilitychange"));
118-
Object.defineProperty(document, "visibilityState", {
119-
configurable: true,
120-
get: () => "visible",
121-
});
122-
document.dispatchEvent(new Event("visibilitychange"));
107+
visibility.set("hidden");
108+
visibility.set("visible");
123109

124110
await flushPromises();
125111
vi.advanceTimersByTime(30_000);
126112
await flushPromises();
127113
expect(mockWatchHistory).toHaveBeenCalledTimes(1);
128114
});
115+
116+
it("triggers refreshHistoryFromPush when an SSE event names the current history", async () => {
117+
const store = useHistoryStore();
118+
await primeStore(() => store.startWatchingHistory());
119+
// Drive the store to a known current-history id so the handler has
120+
// something to match against. ``currentHistoryId`` is a computed
121+
// that only returns the stored id when the history is present in
122+
// ``storedHistories``, so the history has to be registered too.
123+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
124+
store.setHistory({ id: "hist-1" } as any);
125+
store.setCurrentHistoryId("hist-1");
126+
127+
mockRefreshHistoryFromPush.mockClear();
128+
emitSse(sseState, "history_update", { history_ids: ["hist-1", "hist-2"] });
129+
await flushPromises();
130+
131+
expect(mockRefreshHistoryFromPush).toHaveBeenCalledTimes(1);
132+
});
133+
134+
it("ignores SSE history events that do not include the current history", async () => {
135+
const store = useHistoryStore();
136+
await primeStore(() => store.startWatchingHistory());
137+
store.setCurrentHistoryId("hist-1");
138+
139+
mockRefreshHistoryFromPush.mockClear();
140+
emitSse(sseState, "history_update", { history_ids: ["hist-2"] });
141+
await flushPromises();
142+
143+
expect(mockRefreshHistoryFromPush).not.toHaveBeenCalled();
144+
});
129145
});
130146

131147
describe("when enable_sse_history_updates is false (polling scenario)", () => {
@@ -135,15 +151,9 @@ describe("historyStore — config-driven SSE vs polling", () => {
135151

136152
it("does not connect SSE and polls on the configured interval", async () => {
137153
const store = useHistoryStore();
154+
await primeStore(() => store.startWatchingHistory());
138155

139-
store.startWatchingHistory();
140-
141-
// Let the config load and the initial watch fire.
142-
await flushPromises();
143-
await vi.runOnlyPendingTimersAsync();
144-
await flushPromises();
145-
146-
expect(mockSseConnect).not.toHaveBeenCalled();
156+
expect(sseState.connect).not.toHaveBeenCalled();
147157

148158
// The resource watcher invokes the handler immediately on start
149159
// and then re-schedules after each completion. Advance past the
@@ -156,24 +166,23 @@ describe("historyStore — config-driven SSE vs polling", () => {
156166
expect(mockWatchHistory.mock.calls.length).toBeGreaterThan(initialCalls);
157167
});
158168

159-
it("calling startWatchingHistory again is idempotent (no second SSE, polling already running)", async () => {
169+
it("calling startWatchingHistory again is idempotent (no second SSE, polling tick count +1 only)", async () => {
160170
const store = useHistoryStore();
161-
162-
store.startWatchingHistory();
163-
await flushPromises();
164-
await vi.runOnlyPendingTimersAsync();
165-
await flushPromises();
171+
await primeStore(() => store.startWatchingHistory());
166172

167173
const pollsAfterFirst = mockWatchHistory.mock.calls.length;
168174

169175
store.startWatchingHistory();
170176
await flushPromises();
171177

172-
expect(mockSseConnect).not.toHaveBeenCalled();
173-
// Calling again does not schedule an additional independent
174-
// polling loop — the handler should not have been fired an
175-
// extra time by the second call alone.
176-
expect(mockWatchHistory.mock.calls.length).toBe(pollsAfterFirst);
178+
expect(sseState.connect).not.toHaveBeenCalled();
179+
// Calling again must not schedule a second independent polling loop.
180+
// Advance past one interval and confirm only one handler tick fires,
181+
// not two.
182+
await vi.advanceTimersByTimeAsync(3000);
183+
await flushPromises();
184+
const deltaAfterSecond = mockWatchHistory.mock.calls.length - pollsAfterFirst;
185+
expect(deltaAfterSecond).toBeLessThanOrEqual(1);
177186
});
178187
});
179188
});

0 commit comments

Comments
 (0)