Skip to content

Commit e4d4bcb

Browse files
authored
Merge pull request #22513 from mvdbeek/sse-notifications
Server-Sent Events for history + notification updates
2 parents 882f22a + 014a4bf commit e4d4bcb

77 files changed

Lines changed: 5205 additions & 302 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/integration_selenium.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ env:
1919
YARN_INSTALL_OPTS: --frozen-lockfile
2020
GALAXY_CONFIG_SQLALCHEMY_WARN_20: '1'
2121
GALAXY_DEPENDENCIES_INSTALL_WEASYPRINT: '1'
22+
# TEMP: shake down SSE/notification system across full UI surface — revert before merge
23+
GALAXY_CONFIG_OVERRIDE_ENABLE_NOTIFICATION_SYSTEM: '1'
24+
GALAXY_CONFIG_OVERRIDE_ENABLE_SSE_HISTORY_UPDATES: '1'
25+
GALAXY_CONFIG_OVERRIDE_ENABLE_SSE_ENTRY_POINT_UPDATES: '1'
2226
concurrency:
2327
group: ${{ github.workflow }}-${{ github.ref }}
2428
cancel-in-progress: true

.github/workflows/playwright.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ env:
2020
GALAXY_TEST_SELENIUM_HEADLESS: 1
2121
YARN_INSTALL_OPTS: --frozen-lockfile
2222
GALAXY_CONFIG_SQLALCHEMY_WARN_20: '1'
23+
# TEMP: shake down SSE/notification system across full UI surface — revert before merge
24+
GALAXY_CONFIG_OVERRIDE_ENABLE_NOTIFICATION_SYSTEM: '1'
25+
GALAXY_CONFIG_OVERRIDE_ENABLE_SSE_HISTORY_UPDATES: '1'
26+
GALAXY_CONFIG_OVERRIDE_ENABLE_SSE_ENTRY_POINT_UPDATES: '1'
2327
concurrency:
2428
group: ${{ github.workflow }}-${{ github.ref }}
2529
cancel-in-progress: true

.github/workflows/selenium.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ env:
1919
GALAXY_TEST_SKIP_FLAKEY_TESTS_ON_ERROR: 1
2020
YARN_INSTALL_OPTS: --frozen-lockfile
2121
GALAXY_CONFIG_SQLALCHEMY_WARN_20: '1'
22+
# TEMP: shake down SSE/notification system across full UI surface — revert before merge
23+
GALAXY_CONFIG_OVERRIDE_ENABLE_NOTIFICATION_SYSTEM: '1'
24+
GALAXY_CONFIG_OVERRIDE_ENABLE_SSE_HISTORY_UPDATES: '1'
25+
GALAXY_CONFIG_OVERRIDE_ENABLE_SSE_ENTRY_POINT_UPDATES: '1'
2226
concurrency:
2327
group: ${{ github.workflow }}-${{ github.ref }}
2428
cancel-in-progress: true

client/src/api/client/pendingRequestsMiddleware.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ export const pendingRequestsMiddleware: Middleware = {
1818
return new Request(request, { headers });
1919
}
2020
const shared = getPendingAbortSignal();
21+
// Combine with any signal the caller may have set so we don't silently
22+
// drop their cancellation semantics.
2123
const signal = typeof AbortSignal.any === "function" ? AbortSignal.any([request.signal, shared]) : shared;
2224
return new Request(request, { signal });
2325
},

client/src/api/schema/schema.ts

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1292,6 +1292,33 @@ export interface paths {
12921292
patch?: never;
12931293
trace?: never;
12941294
};
1295+
"/api/events/stream": {
1296+
parameters: {
1297+
query?: never;
1298+
header?: never;
1299+
path?: never;
1300+
cookie?: never;
1301+
};
1302+
/**
1303+
* Server-Sent Events stream for real-time updates.
1304+
* @description Opens a Server-Sent Events (SSE) connection that pushes real-time
1305+
* updates for notifications, history changes, and other events.
1306+
*
1307+
* On reconnect, the browser sends the ``Last-Event-ID`` header automatically.
1308+
* If the notification system is enabled, any notifications created since that
1309+
* timestamp are delivered as a catch-up ``notification_status`` event.
1310+
*
1311+
* Anonymous users receive only broadcast events.
1312+
*/
1313+
get: operations["stream_events_api_events_stream_get"];
1314+
put?: never;
1315+
post?: never;
1316+
delete?: never;
1317+
options?: never;
1318+
head?: never;
1319+
patch?: never;
1320+
trace?: never;
1321+
};
12951322
"/api/exports": {
12961323
parameters: {
12971324
query?: never;
@@ -33451,6 +33478,46 @@ export interface operations {
3345133478
};
3345233479
};
3345333480
};
33481+
stream_events_api_events_stream_get: {
33482+
parameters: {
33483+
query?: never;
33484+
header?: {
33485+
"Last-Event-ID"?: string | null;
33486+
/** @description The user ID that will be used to effectively make this API call. Only admins and designated users can make API calls on behalf of other users. */
33487+
"run-as"?: string | null;
33488+
};
33489+
path?: never;
33490+
cookie?: never;
33491+
};
33492+
requestBody?: never;
33493+
responses: {
33494+
/** @description Successful Response */
33495+
200: {
33496+
headers: {
33497+
[name: string]: unknown;
33498+
};
33499+
content?: never;
33500+
};
33501+
/** @description Request Error */
33502+
"4XX": {
33503+
headers: {
33504+
[name: string]: unknown;
33505+
};
33506+
content: {
33507+
"application/json": components["schemas"]["MessageExceptionModel"];
33508+
};
33509+
};
33510+
/** @description Server Error */
33511+
"5XX": {
33512+
headers: {
33513+
[name: string]: unknown;
33514+
};
33515+
content: {
33516+
"application/json": components["schemas"]["MessageExceptionModel"];
33517+
};
33518+
};
33519+
};
33520+
};
3345433521
index_api_exports_get: {
3345533522
parameters: {
3345633523
query?: {

client/src/composables/useAuthNavigation.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@ import { useNotificationsStore } from "@/stores/notificationsStore";
1616
* will use a fresh signal and is not affected.
1717
*/
1818
export function discardActiveConnectionsBeforeAuthNavigation() {
19-
// Stop polling watchers first so they can't kick off new fetches, then
20-
// abort any requests still in flight via the shared AbortController.
19+
// Order: close SSE streams first (synchronous TCP close), then stop the
20+
// polling watchers so they can't kick off new fetches, then abort any
21+
// requests still in flight via the shared AbortController.
2122
useHistoryStore().stopWatchingHistory();
2223
useEntryPointStore().stopWatchingEntryPoints();
2324
useNotificationsStore().stopWatchingNotifications();
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
import { onScopeDispose, ref } from "vue";
2+
3+
import { withPrefix } from "@/utils/redirect";
4+
5+
/**
6+
* All SSE event types the server may emit.
7+
*/
8+
export const SSE_EVENT_TYPES = [
9+
"notification_update",
10+
"broadcast_update",
11+
"notification_status",
12+
"history_update",
13+
"entry_point_update",
14+
] as const;
15+
16+
export type SSEEventType = (typeof SSE_EVENT_TYPES)[number];
17+
18+
interface SSEDebugGlobals {
19+
__galaxy_sse_connected?: boolean;
20+
__galaxy_sse_last_event_ts?: number;
21+
}
22+
23+
function sseGlobals(): SSEDebugGlobals {
24+
return window as unknown as SSEDebugGlobals;
25+
}
26+
27+
// ---------------------------------------------------------------------------
28+
// Module-level shared EventSource.
29+
//
30+
// Every call to ``useSSE`` registers its handler against this one socket so
31+
// the tab opens a single ``/api/events/stream`` connection no matter how many
32+
// stores listen. HTTP/1.1 caps simultaneous connections per origin at six;
33+
// before this consolidation we burned three slots on SSE alone (history,
34+
// notifications, entry points), which is what starved the scratchbook iframe
35+
// flow — see the fix in ``client/src/entry/analysis/App.vue``.
36+
// ---------------------------------------------------------------------------
37+
38+
type Handler = (event: MessageEvent) => void;
39+
40+
let sharedSource: EventSource | null = null;
41+
const sharedConnected = ref(false);
42+
const subscribers: Map<SSEEventType, Set<Handler>> = new Map();
43+
// Track the per-type dispatchers we registered so ``closeSource`` removes the
44+
// exact same listeners (``addEventListener`` matches by reference).
45+
const dispatchers: Map<SSEEventType, Handler> = new Map();
46+
47+
function openSourceIfNeeded() {
48+
if (sharedSource) {
49+
return;
50+
}
51+
sharedSource = new EventSource(withPrefix("/api/events/stream"));
52+
53+
for (const eventType of SSE_EVENT_TYPES) {
54+
const dispatcher: Handler = (event) => {
55+
// Selenium tests watch ``__galaxy_sse_last_event_ts`` to prove that
56+
// an observable state change came from an SSE push and not the
57+
// polling fallback (where the global would never advance).
58+
sseGlobals().__galaxy_sse_last_event_ts = Date.now();
59+
const subs = subscribers.get(eventType);
60+
if (!subs) {
61+
return;
62+
}
63+
for (const handler of subs) {
64+
handler(event);
65+
}
66+
};
67+
dispatchers.set(eventType, dispatcher);
68+
sharedSource.addEventListener(eventType, dispatcher);
69+
}
70+
71+
sharedSource.onopen = () => {
72+
sharedConnected.value = true;
73+
// Global readiness flag so Selenium tests can distinguish a working
74+
// SSE pipeline from the polling fallback.
75+
sseGlobals().__galaxy_sse_connected = true;
76+
};
77+
78+
sharedSource.onerror = () => {
79+
// EventSource auto-reconnects natively; SSE-vs-polling is a
80+
// config-level decision (see historyStore / notificationsStore), so
81+
// we must not give up on transient errors here — doing so would leave
82+
// the client with no updates at all.
83+
sharedConnected.value = false;
84+
sseGlobals().__galaxy_sse_connected = false;
85+
};
86+
87+
// Browser EventSource teardown during a full-page navigation
88+
// (``window.location.href = …``) is not guaranteed to happen before the
89+
// browser issues requests for the new page — we've seen Chrome keep the
90+
// stream alive long enough that a login/register POST reload races the
91+
// close, and the new page then loads with a stale auth view. Force a
92+
// synchronous ``close()`` during ``pagehide`` (fires for both reloads and
93+
// tab-close, unlike ``beforeunload``) to close that window.
94+
if (typeof window !== "undefined") {
95+
window.addEventListener("pagehide", closeSource);
96+
}
97+
}
98+
99+
function closeSource() {
100+
if (!sharedSource) {
101+
return;
102+
}
103+
for (const [eventType, dispatcher] of dispatchers) {
104+
sharedSource.removeEventListener(eventType, dispatcher);
105+
}
106+
dispatchers.clear();
107+
sharedSource.close();
108+
sharedSource = null;
109+
sharedConnected.value = false;
110+
sseGlobals().__galaxy_sse_connected = false;
111+
if (typeof window !== "undefined") {
112+
window.removeEventListener("pagehide", closeSource);
113+
}
114+
}
115+
116+
function addSubscriber(onEvent: Handler, eventTypes: readonly SSEEventType[]) {
117+
for (const eventType of eventTypes) {
118+
let subs = subscribers.get(eventType);
119+
if (!subs) {
120+
subs = new Set();
121+
subscribers.set(eventType, subs);
122+
}
123+
subs.add(onEvent);
124+
}
125+
}
126+
127+
function removeSubscriber(onEvent: Handler, eventTypes: readonly SSEEventType[]): boolean {
128+
let anyRemaining = false;
129+
for (const eventType of eventTypes) {
130+
const subs = subscribers.get(eventType);
131+
if (subs) {
132+
subs.delete(onEvent);
133+
if (subs.size === 0) {
134+
subscribers.delete(eventType);
135+
}
136+
}
137+
}
138+
for (const subs of subscribers.values()) {
139+
if (subs.size > 0) {
140+
anyRemaining = true;
141+
break;
142+
}
143+
}
144+
return anyRemaining;
145+
}
146+
147+
/**
148+
* Composable for subscribing to events on the shared SSE stream.
149+
*
150+
* The browser's EventSource handles reconnection automatically and sends the
151+
* ``Last-Event-ID`` header so the server can catch up on missed events. Only
152+
* one EventSource is opened per tab regardless of how many callers invoke
153+
* this composable; the composable multiplexes dispatch per event type.
154+
*
155+
* @param onEvent - callback invoked for every matching SSE event
156+
* @param eventTypes - subset of event types to listen to (defaults to all)
157+
*/
158+
export function useSSE(onEvent: Handler, eventTypes: readonly SSEEventType[] = SSE_EVENT_TYPES) {
159+
let connected_: boolean = false;
160+
161+
function connect() {
162+
if (connected_) {
163+
return;
164+
}
165+
connected_ = true;
166+
addSubscriber(onEvent, eventTypes);
167+
openSourceIfNeeded();
168+
}
169+
170+
function disconnect() {
171+
if (!connected_) {
172+
return;
173+
}
174+
connected_ = false;
175+
const anyRemaining = removeSubscriber(onEvent, eventTypes);
176+
if (!anyRemaining) {
177+
closeSource();
178+
}
179+
}
180+
181+
onScopeDispose(() => {
182+
disconnect();
183+
});
184+
185+
return { connect, disconnect, connected: sharedConnected };
186+
}
187+
188+
/**
189+
* @deprecated Use `useSSE` instead. This alias exists for backward compatibility.
190+
*/
191+
export const useNotificationSSE = useSSE;

client/src/entry/analysis/App.vue

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
</template>
4949
<script>
5050
import { storeToRefs } from "pinia";
51-
import { ref, watch } from "vue";
51+
import { computed, ref, watch } from "vue";
5252
import { useRoute } from "vue-router/composables";
5353
5454
import { getGalaxyInstance } from "@/app";
@@ -105,9 +105,29 @@ export default {
105105
const uploadModal = ref(null);
106106
setGlobalUploadModal(uploadModal);
107107
108-
const embedded = useRouteQueryBool("embed");
108+
// Treat any iframe context as embedded: scratchbook pops dataset
109+
// displays into ``WinBox`` iframes that hit the same routes without
110+
// an ``embed`` query param, and each one would otherwise open its own
111+
// SSE + polling traffic, quickly saturating the HTTP/1.1 per-origin
112+
// connection pool (e.g. ``test_scratchbook_window_persistence`` hangs
113+
// indefinitely after two windows are open).
114+
const inIframe = (() => {
115+
if (typeof window === "undefined") {
116+
return false;
117+
}
118+
try {
119+
return window.top !== window.self;
120+
} catch {
121+
// Cross-origin access throws — that's definitely an iframe.
122+
return true;
123+
}
124+
})();
125+
const embeddedQuery = useRouteQueryBool("embed");
126+
const embedded = computed(() => embeddedQuery.value || inIframe);
109127
const historyStore = useHistoryStore();
110-
historyStore.startWatchingHistory();
128+
if (!embedded.value) {
129+
historyStore.startWatchingHistory();
130+
}
111131
112132
watch(
113133
() => embedded.value,

0 commit comments

Comments
 (0)