Skip to content

Commit 2dc2c5b

Browse files
committed
Unify SSE: single shared EventSource, drop /api/notifications/stream
The three SSE-driven stores (history, notifications, entry points) each called useSSE() in their own setup(), producing three separate EventSource connections to /api/events/stream. HTTP/1.1 caps per-origin connections at six, so SSE alone consumed half the budget — and under iframe-heavy flows (scratchbook) it starved the pool and hung the tab. useSSE now multiplexes every subscriber through a single module-scoped EventSource with a per-event-type dispatch registry. Refcounted open/close so the socket is only created on first subscribe and closed when the last subscriber goes away. Existing store call sites are unchanged. While here: drop the dead /api/notifications/stream route. The generic /api/events/stream endpoint (lib/galaxy/webapps/galaxy/api/events.py) already serves notifications alongside history_update and entry_point_update events via the same SSEConnectionManager, and no frontend code referenced the notification-only path. Delete the route, its service method, and the auto-generated schema entries.
1 parent 08f1fb4 commit 2dc2c5b

4 files changed

Lines changed: 134 additions & 173 deletions

File tree

client/src/api/schema/schema.ts

Lines changed: 0 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -3995,32 +3995,6 @@ export interface paths {
39953995
patch?: never;
39963996
trace?: never;
39973997
};
3998-
"/api/notifications/stream": {
3999-
parameters: {
4000-
query?: never;
4001-
header?: never;
4002-
path?: never;
4003-
cookie?: never;
4004-
};
4005-
/**
4006-
* Server-Sent Events stream for real-time notification updates.
4007-
* @description Opens a Server-Sent Events (SSE) connection that pushes notification updates in real-time.
4008-
*
4009-
* On reconnect, the browser sends the ``Last-Event-ID`` header automatically.
4010-
* Any notifications created since that timestamp are delivered as a catch-up
4011-
* ``notification_status`` event before the stream begins.
4012-
*
4013-
* Anonymous users receive only broadcast events.
4014-
*/
4015-
get: operations["stream_notifications_api_notifications_stream_get"];
4016-
put?: never;
4017-
post?: never;
4018-
delete?: never;
4019-
options?: never;
4020-
head?: never;
4021-
patch?: never;
4022-
trace?: never;
4023-
};
40243998
"/api/notifications/{notification_id}": {
40253999
parameters: {
40264000
query?: never;
@@ -42485,46 +42459,6 @@ export interface operations {
4248542459
};
4248642460
};
4248742461
};
42488-
stream_notifications_api_notifications_stream_get: {
42489-
parameters: {
42490-
query?: never;
42491-
header?: {
42492-
"Last-Event-ID"?: string | null;
42493-
/** @description The user ID that will be used to effectively make this API call. Only admins and designated users can make API calls on behalf of other users. */
42494-
"run-as"?: string | null;
42495-
};
42496-
path?: never;
42497-
cookie?: never;
42498-
};
42499-
requestBody?: never;
42500-
responses: {
42501-
/** @description Successful Response */
42502-
200: {
42503-
headers: {
42504-
[name: string]: unknown;
42505-
};
42506-
content?: never;
42507-
};
42508-
/** @description Request Error */
42509-
"4XX": {
42510-
headers: {
42511-
[name: string]: unknown;
42512-
};
42513-
content: {
42514-
"application/json": components["schemas"]["MessageExceptionModel"];
42515-
};
42516-
};
42517-
/** @description Server Error */
42518-
"5XX": {
42519-
headers: {
42520-
[name: string]: unknown;
42521-
};
42522-
content: {
42523-
"application/json": components["schemas"]["MessageExceptionModel"];
42524-
};
42525-
};
42526-
};
42527-
};
4252842462
show_notification_api_notifications__notification_id__get: {
4252942463
parameters: {
4253042464
query?: never;

client/src/composables/useNotificationSSE.ts

Lines changed: 134 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -24,88 +24,165 @@ function sseGlobals(): SSEDebugGlobals {
2424
return window as unknown as SSEDebugGlobals;
2525
}
2626

27-
/**
28-
* Composable for connecting to the unified SSE event stream.
29-
*
30-
* The browser's EventSource handles reconnection automatically and
31-
* sends the Last-Event-ID header so the server can catch up on missed events.
32-
*
33-
* @param onEvent - callback invoked for every SSE event
34-
* @param eventTypes - subset of event types to listen to (defaults to all)
35-
*/
36-
export function useSSE(onEvent: (event: MessageEvent) => void, eventTypes: readonly SSEEventType[] = SSE_EVENT_TYPES) {
37-
const connected = ref(false);
38-
let eventSource: EventSource | null = null;
39-
40-
// Selenium tests watch __galaxy_sse_last_event_ts to prove that an
41-
// observable state change came from an SSE push and not the polling
42-
// fallback (where __galaxy_sse_last_event_ts would never advance).
43-
const trackedOnEvent = (event: MessageEvent) => {
44-
sseGlobals().__galaxy_sse_last_event_ts = Date.now();
45-
onEvent(event);
27+
// ---------------------------------------------------------------------------
28+
// Module-level shared EventSource.
29+
//
30+
// Every call to ``useSSE`` registers its handler against this one socket so
31+
// the tab opens a single ``/api/events/stream`` connection no matter how many
32+
// stores listen. HTTP/1.1 caps simultaneous connections per origin at six;
33+
// before this consolidation we burned three slots on SSE alone (history,
34+
// notifications, entry points), which is what starved the scratchbook iframe
35+
// flow — see the fix in ``client/src/entry/analysis/App.vue``.
36+
// ---------------------------------------------------------------------------
37+
38+
type Handler = (event: MessageEvent) => void;
39+
40+
let sharedSource: EventSource | null = null;
41+
const sharedConnected = ref(false);
42+
const subscribers: Map<SSEEventType, Set<Handler>> = new Map();
43+
// Track the per-type dispatchers we registered so ``closeSource`` removes the
44+
// exact same listeners (``addEventListener`` matches by reference).
45+
const dispatchers: Map<SSEEventType, Handler> = new Map();
46+
47+
function openSourceIfNeeded() {
48+
if (sharedSource) {
49+
return;
50+
}
51+
sharedSource = new EventSource(withPrefix("/api/events/stream"));
52+
53+
for (const eventType of SSE_EVENT_TYPES) {
54+
const dispatcher: Handler = (event) => {
55+
// Selenium tests watch ``__galaxy_sse_last_event_ts`` to prove that
56+
// an observable state change came from an SSE push and not the
57+
// polling fallback (where the global would never advance).
58+
sseGlobals().__galaxy_sse_last_event_ts = Date.now();
59+
const subs = subscribers.get(eventType);
60+
if (!subs) {
61+
return;
62+
}
63+
for (const handler of subs) {
64+
handler(event);
65+
}
66+
};
67+
dispatchers.set(eventType, dispatcher);
68+
sharedSource.addEventListener(eventType, dispatcher);
69+
}
70+
71+
sharedSource.onopen = () => {
72+
sharedConnected.value = true;
73+
// Global readiness flag so Selenium tests can distinguish a working
74+
// SSE pipeline from the polling fallback.
75+
sseGlobals().__galaxy_sse_connected = true;
76+
};
77+
78+
sharedSource.onerror = () => {
79+
// EventSource auto-reconnects natively; SSE-vs-polling is a
80+
// config-level decision (see historyStore / notificationsStore), so
81+
// we must not give up on transient errors here — doing so would leave
82+
// the client with no updates at all.
83+
sharedConnected.value = false;
84+
sseGlobals().__galaxy_sse_connected = false;
4685
};
4786

4887
// Browser EventSource teardown during a full-page navigation
4988
// (``window.location.href = …``) is not guaranteed to happen before the
5089
// browser issues requests for the new page — we've seen Chrome keep the
5190
// stream alive long enough that a login/register POST reload races the
5291
// close, and the new page then loads with a stale auth view. Force a
53-
// synchronous ``eventSource.close()`` during ``pagehide`` (fires for both
54-
// reloads and tab-close, unlike ``beforeunload``) to close that window.
55-
// The listener is registered only while a connection is live so composables
56-
// that never ``connect()`` don't leave dangling listeners behind.
57-
const onPageHide = () => disconnect();
92+
// synchronous ``close()`` during ``pagehide`` (fires for both reloads and
93+
// tab-close, unlike ``beforeunload``) to close that window.
94+
if (typeof window !== "undefined") {
95+
window.addEventListener("pagehide", closeSource);
96+
}
97+
}
5898

59-
function connect() {
60-
disconnect();
61-
const url = withPrefix("/api/events/stream");
62-
eventSource = new EventSource(url);
99+
function closeSource() {
100+
if (!sharedSource) {
101+
return;
102+
}
103+
for (const [eventType, dispatcher] of dispatchers) {
104+
sharedSource.removeEventListener(eventType, dispatcher);
105+
}
106+
dispatchers.clear();
107+
sharedSource.close();
108+
sharedSource = null;
109+
sharedConnected.value = false;
110+
sseGlobals().__galaxy_sse_connected = false;
111+
if (typeof window !== "undefined") {
112+
window.removeEventListener("pagehide", closeSource);
113+
}
114+
}
63115

64-
for (const eventType of eventTypes) {
65-
eventSource.addEventListener(eventType, trackedOnEvent);
116+
function addSubscriber(onEvent: Handler, eventTypes: readonly SSEEventType[]) {
117+
for (const eventType of eventTypes) {
118+
let subs = subscribers.get(eventType);
119+
if (!subs) {
120+
subs = new Set();
121+
subscribers.set(eventType, subs);
66122
}
123+
subs.add(onEvent);
124+
}
125+
}
67126

68-
eventSource.onopen = () => {
69-
connected.value = true;
70-
// Expose a global readiness flag so Selenium tests can distinguish
71-
// a working SSE pipeline from the polling fallback.
72-
sseGlobals().__galaxy_sse_connected = true;
73-
};
127+
function removeSubscriber(onEvent: Handler, eventTypes: readonly SSEEventType[]): boolean {
128+
let anyRemaining = false;
129+
for (const eventType of eventTypes) {
130+
const subs = subscribers.get(eventType);
131+
if (subs) {
132+
subs.delete(onEvent);
133+
if (subs.size === 0) {
134+
subscribers.delete(eventType);
135+
}
136+
}
137+
}
138+
for (const subs of subscribers.values()) {
139+
if (subs.size > 0) {
140+
anyRemaining = true;
141+
break;
142+
}
143+
}
144+
return anyRemaining;
145+
}
74146

75-
eventSource.onerror = () => {
76-
// EventSource auto-reconnects natively; SSE-vs-polling is a
77-
// config-level decision (see historyStore / notificationsStore),
78-
// so we must not give up on transient errors here — doing so
79-
// would leave the client with no updates at all.
80-
connected.value = false;
81-
sseGlobals().__galaxy_sse_connected = false;
82-
};
147+
/**
148+
* Composable for subscribing to events on the shared SSE stream.
149+
*
150+
* The browser's EventSource handles reconnection automatically and sends the
151+
* ``Last-Event-ID`` header so the server can catch up on missed events. Only
152+
* one EventSource is opened per tab regardless of how many callers invoke
153+
* this composable; the composable multiplexes dispatch per event type.
154+
*
155+
* @param onEvent - callback invoked for every matching SSE event
156+
* @param eventTypes - subset of event types to listen to (defaults to all)
157+
*/
158+
export function useSSE(onEvent: Handler, eventTypes: readonly SSEEventType[] = SSE_EVENT_TYPES) {
159+
let connected_: boolean = false;
83160

84-
if (typeof window !== "undefined") {
85-
window.addEventListener("pagehide", onPageHide);
161+
function connect() {
162+
if (connected_) {
163+
return;
86164
}
165+
connected_ = true;
166+
addSubscriber(onEvent, eventTypes);
167+
openSourceIfNeeded();
87168
}
88169

89170
function disconnect() {
90-
if (eventSource) {
91-
for (const eventType of eventTypes) {
92-
eventSource.removeEventListener(eventType, trackedOnEvent);
93-
}
94-
eventSource.close();
95-
eventSource = null;
171+
if (!connected_) {
172+
return;
96173
}
97-
if (typeof window !== "undefined") {
98-
window.removeEventListener("pagehide", onPageHide);
174+
connected_ = false;
175+
const anyRemaining = removeSubscriber(onEvent, eventTypes);
176+
if (!anyRemaining) {
177+
closeSource();
99178
}
100-
connected.value = false;
101-
sseGlobals().__galaxy_sse_connected = false;
102179
}
103180

104181
onScopeDispose(() => {
105182
disconnect();
106183
});
107184

108-
return { connect, disconnect, connected };
185+
return { connect, disconnect, connected: sharedConnected };
109186
}
110187

111188
/**

lib/galaxy/webapps/galaxy/api/notifications.py

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,10 @@
1010

1111
from fastapi import (
1212
Body,
13-
Header,
1413
Query,
15-
Request,
1614
Response,
1715
status,
1816
)
19-
from starlette.responses import StreamingResponse
2017

2118
from galaxy.managers.context import ProvidesUserContext
2219
from galaxy.schema.notifications import (
@@ -55,35 +52,6 @@
5552
class FastAPINotifications:
5653
service: NotificationService = depends(NotificationService)
5754

58-
@router.get(
59-
"/api/notifications/stream",
60-
summary="Server-Sent Events stream for real-time notification updates.",
61-
response_class=StreamingResponse,
62-
)
63-
async def stream_notifications(
64-
self,
65-
request: Request,
66-
trans: ProvidesUserContext = DependsOnTrans,
67-
last_event_id: Optional[str] = Header(None, alias="Last-Event-ID"),
68-
) -> StreamingResponse:
69-
"""Opens a Server-Sent Events (SSE) connection that pushes notification updates in real-time.
70-
71-
On reconnect, the browser sends the ``Last-Event-ID`` header automatically.
72-
Any notifications created since that timestamp are delivered as a catch-up
73-
``notification_status`` event before the stream begins.
74-
75-
Anonymous users receive only broadcast events.
76-
"""
77-
return StreamingResponse(
78-
self.service.open_stream(trans, last_event_id, request.is_disconnected),
79-
media_type="text/event-stream",
80-
headers={
81-
"Cache-Control": "no-cache",
82-
"Connection": "keep-alive",
83-
"X-Accel-Buffering": "no",
84-
},
85-
)
86-
8755
@router.get(
8856
"/api/notifications/status",
8957
summary="Returns the current status summary of the user's notifications since a particular date.",

lib/galaxy/webapps/galaxy/services/notifications.py

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
from collections.abc import AsyncIterator
21
from datetime import datetime
32
from typing import (
43
NoReturn,
@@ -15,7 +14,6 @@
1514
from galaxy.managers.context import ProvidesUserContext
1615
from galaxy.managers.notification import NotificationManager
1716
from galaxy.managers.sse import (
18-
IsDisconnected,
1917
make_event_id,
2018
parse_event_id,
2119
SSEConnectionManager,
@@ -50,22 +48,6 @@ def __init__(self, notification_manager: NotificationManager, sse_manager: SSECo
5048
self.notification_manager = notification_manager
5149
self.sse_manager = sse_manager
5250

53-
def open_stream(
54-
self,
55-
user_context: ProvidesUserContext,
56-
last_event_id: Optional[str],
57-
is_disconnected: IsDisconnected,
58-
) -> AsyncIterator[str]:
59-
"""Open an SSE notification stream for ``user_context``.
60-
61-
Enforces the notifications-enabled guard, builds the optional catch-up,
62-
and resolves the user id so the controller stays a thin wrapper.
63-
"""
64-
self.notification_manager.ensure_notifications_enabled()
65-
user_id = user_context.user.id if not user_context.anonymous else None
66-
catch_up = self.build_status_catchup(user_context, last_event_id)
67-
return self.sse_manager.stream(is_disconnected, user_id, catch_up=catch_up)
68-
6951
def send_notification(
7052
self, sender_context: ProvidesUserContext, payload: NotificationCreateRequestBody
7153
) -> Union[NotificationCreatedResponse, AsyncTaskResultSummary]:

0 commit comments

Comments
 (0)