Skip to content

Commit 038e247

Browse files
committed
Push SSE history updates to non-owner viewers
Owner dispatch only delivers history_update events to the user who owns the changed history. Watching a shared or pinned non-owned history (e.g. via the multi-history view) silently received nothing under SSE because the dispatch path keys on history ownership and there was no way for a viewer to opt in. Add an additive viewer-subscription channel: the client POSTs the history ids it wants pushes for to /api/events/history-subscriptions, the dispatcher fans the record out to every webapp worker over Kombu, and history_update fanout pushes to subscribed viewers in addition to the owner. Subscriptions are refcounted on the client and replayed on every EventSource onopen so reconnects don't drop them. Per-worker subscription state is purged when a user's last connection on that worker closes; the client's onopen replay re-asserts the desired set. No authorization check on the dispatch path: SSE events only carry history ids, the client follows up with a normal access-controlled REST fetch, and leaking "history changed at T" pings is acceptable. MultipleViewItem now subscribes whenever it renders a history the current user does not own and SSE is enabled; owned histories continue through owner dispatch with zero subscribe round-trip. Tests: integration test exercising subscribe -> mutate -> receive -> unsubscribe -> stop receiving end-to-end, plus vitest for the client refcount and replay logic.
1 parent 802f369 commit 038e247

9 files changed

Lines changed: 546 additions & 24 deletions

File tree

client/src/components/History/Multiple/MultipleViewItem.vue

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,17 @@ import { faTimes } from "@fortawesome/free-solid-svg-icons";
33
import { FontAwesomeIcon } from "@fortawesome/vue-fontawesome";
44
import { BButton } from "bootstrap-vue";
55
import { storeToRefs } from "pinia";
6-
import { computed, ref } from "vue";
6+
import { computed, ref, watch } from "vue";
77
8+
import { userOwnsHistory } from "@/api";
89
import { useExtendedHistory } from "@/composables/detailedHistory";
10+
import {
11+
addHistoryViewerSubscription,
12+
removeHistoryViewerSubscription,
13+
} from "@/composables/useNotificationSSE";
14+
import { useConfigStore } from "@/stores/configurationStore";
915
import { useHistoryStore } from "@/stores/historyStore";
16+
import { useUserStore } from "@/stores/userStore";
1017
1118
import CollectionPanel from "@/components/History/CurrentCollection/CollectionPanel.vue";
1219
import HistoryNavigation from "@/components/History/CurrentHistory/HistoryNavigation.vue";
@@ -24,6 +31,8 @@ const props = defineProps<Props>();
2431
2532
const historyStore = useHistoryStore();
2633
const { currentHistoryId, pinnedHistories } = storeToRefs(historyStore);
34+
const { currentUser } = storeToRefs(useUserStore());
35+
const configStore = useConfigStore();
2736
2837
const { history } = useExtendedHistory(props.source.id);
2938
@@ -33,6 +42,31 @@ const sameToCurrent = computed(() => {
3342
return currentHistoryId.value === props.source.id;
3443
});
3544
45+
// Owner routing covers histories the current user owns; only non-owned
46+
// histories need a viewer subscription. Skip in polling mode — there's no
47+
// SSE channel to push events to and the REST call would still hit the queue.
48+
const needsViewerSubscription = computed(() => {
49+
if (!configStore.config?.enable_sse_updates) {
50+
return false;
51+
}
52+
if (!history.value || !currentUser.value) {
53+
return false;
54+
}
55+
return !userOwnsHistory(currentUser.value, history.value);
56+
});
57+
58+
watch(
59+
[needsViewerSubscription, () => props.source.id],
60+
([subscribe, id], _previous, onCleanup) => {
61+
if (!subscribe || !id) {
62+
return;
63+
}
64+
addHistoryViewerSubscription(id);
65+
onCleanup(() => removeHistoryViewerSubscription(id));
66+
},
67+
{ immediate: true },
68+
);
69+
3670
function onViewCollection(collection: object) {
3771
selectedCollections.value = [...selectedCollections.value, collection];
3872
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/**
2+
* Unit tests for the viewer-subscription helpers in useNotificationSSE.
3+
*
4+
* The shared EventSource isn't exercised here — these tests focus on the
5+
* client-side bookkeeping: refcounting, dedup, and HTTP shape. Reconnect
6+
* replay is covered by an MSW request log assertion plus an explicit call
7+
* into the (test-only) onopen replay path.
8+
*/
9+
10+
import { http, HttpResponse } from "msw";
11+
import { afterEach, beforeEach, describe, expect, it } from "vitest";
12+
13+
import { useServerMock } from "@/api/client/__mocks__";
14+
15+
import {
16+
_resetHistoryViewerSubscriptionsForTest,
17+
addHistoryViewerSubscription,
18+
removeHistoryViewerSubscription,
19+
} from "./useNotificationSSE";
20+
21+
interface SubscriptionRequest {
22+
method: "POST" | "DELETE";
23+
history_ids: string[];
24+
}
25+
26+
const { server } = useServerMock();
27+
28+
describe("useNotificationSSE viewer subscriptions", () => {
29+
let requests: SubscriptionRequest[];
30+
31+
beforeEach(() => {
32+
_resetHistoryViewerSubscriptionsForTest();
33+
requests = [];
34+
const recordHandler = async ({ request }: { request: Request }) => {
35+
const body = (await request.json()) as { history_ids: string[] };
36+
requests.push({
37+
method: request.method as "POST" | "DELETE",
38+
history_ids: body.history_ids,
39+
});
40+
return new HttpResponse(null, { status: 204 });
41+
};
42+
server.use(
43+
http.post("/api/events/history-subscriptions", recordHandler),
44+
http.delete("/api/events/history-subscriptions", recordHandler),
45+
);
46+
});
47+
48+
afterEach(() => {
49+
_resetHistoryViewerSubscriptionsForTest();
50+
});
51+
52+
it("POSTs once per first subscriber for a given history id", async () => {
53+
addHistoryViewerSubscription("hist-A");
54+
await Promise.resolve();
55+
expect(requests).toHaveLength(1);
56+
expect(requests[0]?.method).toBe("POST");
57+
expect(requests[0]?.history_ids).toEqual(["hist-A"]);
58+
});
59+
60+
it("refcounts duplicate subscriptions — second add is a no-op on the wire", async () => {
61+
addHistoryViewerSubscription("hist-A");
62+
addHistoryViewerSubscription("hist-A");
63+
await Promise.resolve();
64+
expect(requests.filter((r) => r.method === "POST")).toHaveLength(1);
65+
});
66+
67+
it("only DELETEs when the last subscriber for an id releases", async () => {
68+
addHistoryViewerSubscription("hist-A");
69+
addHistoryViewerSubscription("hist-A");
70+
await Promise.resolve();
71+
const postCount = requests.filter((r) => r.method === "POST").length;
72+
73+
removeHistoryViewerSubscription("hist-A");
74+
await Promise.resolve();
75+
// First remove still has one outstanding refcount — must not DELETE yet.
76+
expect(requests.filter((r) => r.method === "DELETE")).toHaveLength(0);
77+
expect(requests.filter((r) => r.method === "POST")).toHaveLength(postCount);
78+
79+
removeHistoryViewerSubscription("hist-A");
80+
await Promise.resolve();
81+
const deletes = requests.filter((r) => r.method === "DELETE");
82+
expect(deletes).toHaveLength(1);
83+
expect(deletes[0]?.history_ids).toEqual(["hist-A"]);
84+
});
85+
86+
it("ignores unsubscribes for ids that were never subscribed", async () => {
87+
removeHistoryViewerSubscription("hist-never");
88+
await Promise.resolve();
89+
expect(requests).toHaveLength(0);
90+
});
91+
92+
it("tracks distinct history ids independently", async () => {
93+
addHistoryViewerSubscription("hist-A");
94+
addHistoryViewerSubscription("hist-B");
95+
await Promise.resolve();
96+
const ids = requests.filter((r) => r.method === "POST").map((r) => r.history_ids[0]);
97+
expect(new Set(ids)).toEqual(new Set(["hist-A", "hist-B"]));
98+
});
99+
});

client/src/composables/useNotificationSSE.ts

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@ function openSourceIfNeeded() {
7878
// Global readiness flag so Selenium tests can distinguish a working
7979
// SSE pipeline from the polling fallback.
8080
sseGlobals().__galaxy_sse_connected = true;
81+
// Re-assert any viewer subscriptions the user accumulated. The server
82+
// doesn't carry app-level subscription state across reconnects (it
83+
// only knows the user from the cookie), so the client owns the source
84+
// of truth and replays it on every successful open.
85+
replayViewerSubscriptionsOnOpen();
8186
};
8287

8388
sharedSource.onerror = () => {
@@ -207,3 +212,82 @@ export function useSSEConnectionStatus() {
207212
hasEverConnected: readonly(sseEverConnected),
208213
};
209214
}
215+
216+
// ---------------------------------------------------------------------------
217+
// Viewer subscriptions for non-owned histories
218+
//
219+
// Owner routing already covers history_update events for histories the
220+
// current user owns. Watching a non-owned history (e.g. a shared history
221+
// pinned in the multi-history view) requires the client to opt in by POSTing
222+
// the history id to ``/api/events/history-subscriptions``. The server keeps a
223+
// per-worker map and pushes the same history_update events to every viewer
224+
// in that map. The desired set is held module-level so reconnects can
225+
// replay it and so multiple components subscribed to the same id only emit
226+
// one HTTP call.
227+
// ---------------------------------------------------------------------------
228+
229+
const viewerSubscriptions = new Map<string, number>();
230+
231+
function postViewerSubscription(method: "POST" | "DELETE", historyIds: string[]): Promise<void> {
232+
if (historyIds.length === 0) {
233+
return Promise.resolve();
234+
}
235+
return fetch(withPrefix("/api/events/history-subscriptions"), {
236+
method,
237+
headers: { "Content-Type": "application/json", Accept: "application/json" },
238+
credentials: "same-origin",
239+
body: JSON.stringify({ history_ids: historyIds }),
240+
}).then((response) => {
241+
if (!response.ok) {
242+
throw new Error(`history-subscriptions ${method} failed: ${response.status}`);
243+
}
244+
});
245+
}
246+
247+
function replayViewerSubscriptionsOnOpen(): void {
248+
const ids = [...viewerSubscriptions.keys()];
249+
if (ids.length === 0) {
250+
return;
251+
}
252+
postViewerSubscription("POST", ids).catch((err) =>
253+
console.error("Failed to replay history viewer subscriptions on reconnect:", err),
254+
);
255+
}
256+
257+
/**
258+
* Add a viewer subscription for a history this user does not own. Refcounted
259+
* so two components watching the same history share one server-side
260+
* subscription and the first to mount opens it, last to unmount closes it.
261+
*
262+
* Idempotent per call: re-subscribing an already-tracked id does not POST a
263+
* duplicate.
264+
*/
265+
export function addHistoryViewerSubscription(historyId: string): void {
266+
const previous = viewerSubscriptions.get(historyId) ?? 0;
267+
viewerSubscriptions.set(historyId, previous + 1);
268+
if (previous === 0) {
269+
postViewerSubscription("POST", [historyId]).catch((err) =>
270+
console.error(`Failed to subscribe to history ${historyId}:`, err),
271+
);
272+
}
273+
}
274+
275+
export function removeHistoryViewerSubscription(historyId: string): void {
276+
const previous = viewerSubscriptions.get(historyId);
277+
if (!previous) {
278+
return;
279+
}
280+
if (previous > 1) {
281+
viewerSubscriptions.set(historyId, previous - 1);
282+
return;
283+
}
284+
viewerSubscriptions.delete(historyId);
285+
postViewerSubscription("DELETE", [historyId]).catch((err) =>
286+
console.error(`Failed to unsubscribe from history ${historyId}:`, err),
287+
);
288+
}
289+
290+
/** Test-only: drain the desired set so per-test state doesn't leak. */
291+
export function _resetHistoryViewerSubscriptionsForTest(): void {
292+
viewerSubscriptions.clear();
293+
}

lib/galaxy/managers/sse.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,12 @@ def __init__(self, statsd_client: Optional[VanillaGalaxyStatsdClient] = None) ->
8585
self._broadcast_connections: set[asyncio.Queue] = set()
8686
self._loop: Optional[asyncio.AbstractEventLoop] = None
8787
self._statsd_client = statsd_client
88+
# Viewer subscriptions for non-owned histories. Each worker keeps its
89+
# own copy; the producer fans out subscribe/unsubscribe via Kombu so
90+
# every webapp process learns about it. Maps encoded history_id (str)
91+
# to the set of user_ids / session_ids that have asked for events.
92+
self._history_viewers_user: dict[str, set[int]] = defaultdict(set)
93+
self._history_viewers_session: dict[str, set[int]] = defaultdict(set)
8894

8995
def _ensure_loop(self) -> None:
9096
"""Capture the running asyncio event loop. Must be called from async context."""
@@ -134,10 +140,16 @@ def disconnect(
134140
self._connections[user_id].discard(queue)
135141
if not self._connections[user_id]:
136142
del self._connections[user_id]
143+
# Last connection for this user on this worker — drop their
144+
# viewer subscriptions so the maps don't grow unbounded over
145+
# long-lived processes. The client re-asserts subscriptions
146+
# on every ``onopen`` so the next reconnect restores them.
147+
self._purge_user_viewer_subscriptions(user_id)
137148
if galaxy_session_id is not None:
138149
self._session_connections[galaxy_session_id].discard(queue)
139150
if not self._session_connections[galaxy_session_id]:
140151
del self._session_connections[galaxy_session_id]
152+
self._purge_session_viewer_subscriptions(galaxy_session_id)
141153
self._broadcast_connections.discard(queue)
142154
log.debug(
143155
"SSE connection closed for user_id=%s session_id=%s (total=%d)",
@@ -167,6 +179,60 @@ def push_broadcast(self, event: SSEEvent) -> None:
167179
for queue in list(self._broadcast_connections):
168180
self._safe_put(queue, event)
169181

182+
def push_to_history_viewers(self, history_id: str, event: SSEEvent) -> None:
183+
"""Thread-safe. Push an event to every user/session that has subscribed
184+
to ``history_id`` as a viewer (i.e. is watching a history they don't
185+
own). Owner routing remains a separate path so the common "user changes
186+
their own history" case never depends on a subscribe round trip.
187+
"""
188+
for user_id in list(self._history_viewers_user.get(history_id, ())):
189+
self.push_to_user(user_id, event)
190+
for session_id in list(self._history_viewers_session.get(history_id, ())):
191+
self.push_to_session(session_id, event)
192+
193+
# -- Viewer subscription bookkeeping --
194+
# These mutate per-worker state and are called from the Kombu task thread
195+
# via the ``subscribe_history_viewer`` / ``unsubscribe_history_viewer``
196+
# control tasks. They are not thread-safe with each other (defaultdict
197+
# mutations are not atomic), but Kombu drains the control queue serially
198+
# so concurrent calls into a single manager don't happen in practice.
199+
200+
def subscribe_user_viewer(self, user_id: int, history_id: str) -> None:
201+
self._history_viewers_user[history_id].add(user_id)
202+
203+
def unsubscribe_user_viewer(self, user_id: int, history_id: str) -> None:
204+
viewers = self._history_viewers_user.get(history_id)
205+
if viewers is None:
206+
return
207+
viewers.discard(user_id)
208+
if not viewers:
209+
del self._history_viewers_user[history_id]
210+
211+
def subscribe_session_viewer(self, session_id: int, history_id: str) -> None:
212+
self._history_viewers_session[history_id].add(session_id)
213+
214+
def unsubscribe_session_viewer(self, session_id: int, history_id: str) -> None:
215+
viewers = self._history_viewers_session.get(history_id)
216+
if viewers is None:
217+
return
218+
viewers.discard(session_id)
219+
if not viewers:
220+
del self._history_viewers_session[history_id]
221+
222+
def _purge_user_viewer_subscriptions(self, user_id: int) -> None:
223+
"""Drop every viewer subscription this user holds on this worker.
224+
225+
Called after their last connection on this worker closes. The map of
226+
``history_id → user_ids`` is small enough that a full scan is fine —
227+
the inverse index isn't worth maintaining at this scale.
228+
"""
229+
for history_id in list(self._history_viewers_user.keys()):
230+
self.unsubscribe_user_viewer(user_id, history_id)
231+
232+
def _purge_session_viewer_subscriptions(self, session_id: int) -> None:
233+
for history_id in list(self._history_viewers_session.keys()):
234+
self.unsubscribe_session_viewer(session_id, history_id)
235+
170236
def _safe_put(self, queue: asyncio.Queue, event: SSEEvent) -> None:
171237
"""Cross the thread boundary safely using ``call_soon_threadsafe``."""
172238
if self._loop is None or self._loop.is_closed():

lib/galaxy/managers/sse_dispatch.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,39 @@ def history_update(
150150
kwargs["session_updates"] = session_updates
151151
self._send("history_update", kwargs)
152152

153+
def subscribe_history_viewer(
154+
self,
155+
history_id: str,
156+
user_id: Optional[int] = None,
157+
session_id: Optional[int] = None,
158+
) -> None:
159+
"""Broadcast a viewer-subscription record so every webapp worker can
160+
push history_update events to a user/session watching a history they
161+
don't own. The dispatch is fire-and-forget — workers apply the
162+
record locally on receipt. Multi-worker deployments rely on this to
163+
keep subscriptions consistent regardless of which worker fields the
164+
REST call.
165+
"""
166+
kwargs: dict[str, Any] = {"history_id": history_id}
167+
if user_id is not None:
168+
kwargs["user_id"] = user_id
169+
if session_id is not None:
170+
kwargs["session_id"] = session_id
171+
self._send("subscribe_history_viewer", kwargs)
172+
173+
def unsubscribe_history_viewer(
174+
self,
175+
history_id: str,
176+
user_id: Optional[int] = None,
177+
session_id: Optional[int] = None,
178+
) -> None:
179+
kwargs: dict[str, Any] = {"history_id": history_id}
180+
if user_id is not None:
181+
kwargs["user_id"] = user_id
182+
if session_id is not None:
183+
kwargs["session_id"] = session_id
184+
self._send("unsubscribe_history_viewer", kwargs)
185+
153186
def entry_point_update(self, user_id: int, event_id: Optional[str] = None) -> None:
154187
"""Fan out a wake-up ``entry_point_update`` event for one user.
155188

0 commit comments

Comments
 (0)