Skip to content

Commit 6cd5b7f

Browse files
committed
Push SSE history updates to non-owner viewers
Owner dispatch only delivers history_update events to the user who owns the changed history. Watching a shared or pinned non-owned history (e.g. via the multi-history view) silently received nothing under SSE because the dispatch path keys on history ownership and there was no way for a viewer to opt in. Add an additive viewer-subscription channel: the client POSTs the history ids it wants pushes for to /api/events/history-subscriptions, the dispatcher fans the record out to every webapp worker over Kombu, and history_update fanout pushes to subscribed viewers in addition to the owner. Subscriptions are refcounted on the client and replayed on every EventSource onopen so reconnects don't drop them. Per-worker subscription state is purged when a user's last connection on that worker closes; the client's onopen replay re-asserts the desired set. No authorization check on the dispatch path: SSE events only carry history ids, the client follows up with a normal access-controlled REST fetch, and leaking "history changed at T" pings is acceptable. MultipleViewItem now subscribes whenever it renders a history the current user does not own and SSE is enabled; owned histories continue through owner dispatch with zero subscribe round-trip. Tests: integration test exercising subscribe -> mutate -> receive -> unsubscribe -> stop receiving end-to-end, plus vitest for the client refcount and replay logic.
1 parent ee5ebd3 commit 6cd5b7f

10 files changed

Lines changed: 660 additions & 24 deletions

File tree

client/src/api/schema/schema.ts

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1292,6 +1292,31 @@ export interface paths {
12921292
patch?: never;
12931293
trace?: never;
12941294
};
1295+
"/api/events/history-subscriptions": {
1296+
parameters: {
1297+
query?: never;
1298+
header?: never;
1299+
path?: never;
1300+
cookie?: never;
1301+
};
1302+
get?: never;
1303+
put?: never;
1304+
/**
1305+
* Subscribe to history_update SSE events for histories you don't own.
1306+
* @description Asks every webapp worker to start routing ``history_update`` events
1307+
* for these histories to the requesting user/session, in addition to the
1308+
* default owner-routing. Idempotent: re-subscribing to the same id is a
1309+
* no-op. Clients re-send the full set after each ``EventSource.onopen``
1310+
* so reconnects don't drop subscriptions.
1311+
*/
1312+
post: operations["subscribe_history_viewer_api_events_history_subscriptions_post"];
1313+
/** Cancel viewer subscriptions for these histories. */
1314+
delete: operations["unsubscribe_history_viewer_api_events_history_subscriptions_delete"];
1315+
options?: never;
1316+
head?: never;
1317+
patch?: never;
1318+
trace?: never;
1319+
};
12951320
"/api/events/stream": {
12961321
parameters: {
12971322
query?: never;
@@ -15433,6 +15458,14 @@ export interface components {
1543315458
*/
1543415459
url: string;
1543515460
};
15461+
/**
15462+
* HistoryViewerSubscriptionPayload
15463+
* @description REST payload for ``/api/events/history-subscriptions`` endpoints.
15464+
*/
15465+
HistoryViewerSubscriptionPayload: {
15466+
/** History Ids */
15467+
history_ids: string[];
15468+
};
1543615469
/**
1543715470
* Hyperlink
1543815471
* @description Represents some text with an Hyperlink.
@@ -33478,6 +33511,92 @@ export interface operations {
3347833511
};
3347933512
};
3348033513
};
33514+
subscribe_history_viewer_api_events_history_subscriptions_post: {
33515+
parameters: {
33516+
query?: never;
33517+
header?: {
33518+
/** @description The user ID that will be used to effectively make this API call. Only admins and designated users can make API calls on behalf of other users. */
33519+
"run-as"?: string | null;
33520+
};
33521+
path?: never;
33522+
cookie?: never;
33523+
};
33524+
requestBody: {
33525+
content: {
33526+
"application/json": components["schemas"]["HistoryViewerSubscriptionPayload"];
33527+
};
33528+
};
33529+
responses: {
33530+
/** @description Successful Response */
33531+
204: {
33532+
headers: {
33533+
[name: string]: unknown;
33534+
};
33535+
content?: never;
33536+
};
33537+
/** @description Request Error */
33538+
"4XX": {
33539+
headers: {
33540+
[name: string]: unknown;
33541+
};
33542+
content: {
33543+
"application/json": components["schemas"]["MessageExceptionModel"];
33544+
};
33545+
};
33546+
/** @description Server Error */
33547+
"5XX": {
33548+
headers: {
33549+
[name: string]: unknown;
33550+
};
33551+
content: {
33552+
"application/json": components["schemas"]["MessageExceptionModel"];
33553+
};
33554+
};
33555+
};
33556+
};
33557+
unsubscribe_history_viewer_api_events_history_subscriptions_delete: {
33558+
parameters: {
33559+
query?: never;
33560+
header?: {
33561+
/** @description The user ID that will be used to effectively make this API call. Only admins and designated users can make API calls on behalf of other users. */
33562+
"run-as"?: string | null;
33563+
};
33564+
path?: never;
33565+
cookie?: never;
33566+
};
33567+
requestBody: {
33568+
content: {
33569+
"application/json": components["schemas"]["HistoryViewerSubscriptionPayload"];
33570+
};
33571+
};
33572+
responses: {
33573+
/** @description Successful Response */
33574+
204: {
33575+
headers: {
33576+
[name: string]: unknown;
33577+
};
33578+
content?: never;
33579+
};
33580+
/** @description Request Error */
33581+
"4XX": {
33582+
headers: {
33583+
[name: string]: unknown;
33584+
};
33585+
content: {
33586+
"application/json": components["schemas"]["MessageExceptionModel"];
33587+
};
33588+
};
33589+
/** @description Server Error */
33590+
"5XX": {
33591+
headers: {
33592+
[name: string]: unknown;
33593+
};
33594+
content: {
33595+
"application/json": components["schemas"]["MessageExceptionModel"];
33596+
};
33597+
};
33598+
};
33599+
};
3348133600
stream_events_api_events_stream_get: {
3348233601
parameters: {
3348333602
query?: never;

client/src/components/History/Multiple/MultipleViewItem.vue

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,14 @@ import { faTimes } from "@fortawesome/free-solid-svg-icons";
33
import { FontAwesomeIcon } from "@fortawesome/vue-fontawesome";
44
import { BButton } from "bootstrap-vue";
55
import { storeToRefs } from "pinia";
6-
import { computed, ref } from "vue";
6+
import { computed, ref, watch } from "vue";
77
8+
import { userOwnsHistory } from "@/api";
9+
import { useConfig } from "@/composables/config";
810
import { useExtendedHistory } from "@/composables/detailedHistory";
11+
import { addHistoryViewerSubscription, removeHistoryViewerSubscription } from "@/composables/useNotificationSSE";
912
import { useHistoryStore } from "@/stores/historyStore";
13+
import { useUserStore } from "@/stores/userStore";
1014
1115
import CollectionPanel from "@/components/History/CurrentCollection/CollectionPanel.vue";
1216
import HistoryNavigation from "@/components/History/CurrentHistory/HistoryNavigation.vue";
@@ -24,6 +28,8 @@ const props = defineProps<Props>();
2428
2529
const historyStore = useHistoryStore();
2630
const { currentHistoryId, pinnedHistories } = storeToRefs(historyStore);
31+
const { currentUser } = storeToRefs(useUserStore());
32+
const { config } = useConfig();
2733
2834
const { history } = useExtendedHistory(props.source.id);
2935
@@ -33,6 +39,31 @@ const sameToCurrent = computed(() => {
3339
return currentHistoryId.value === props.source.id;
3440
});
3541
42+
// Owner routing covers histories the current user owns; only non-owned
43+
// histories need a viewer subscription. Skip in polling mode — there's no
44+
// SSE channel to push events to and the REST call would still hit the queue.
45+
const needsViewerSubscription = computed(() => {
46+
if (!config.value?.enable_sse_updates) {
47+
return false;
48+
}
49+
if (!history.value || !currentUser.value) {
50+
return false;
51+
}
52+
return !userOwnsHistory(currentUser.value, history.value);
53+
});
54+
55+
watch(
56+
[needsViewerSubscription, () => props.source.id],
57+
([subscribe, id], _previous, onCleanup) => {
58+
if (!subscribe || !id) {
59+
return;
60+
}
61+
addHistoryViewerSubscription(id);
62+
onCleanup(() => removeHistoryViewerSubscription(id));
63+
},
64+
{ immediate: true },
65+
);
66+
3667
function onViewCollection(collection: object) {
3768
selectedCollections.value = [...selectedCollections.value, collection];
3869
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/**
2+
* Unit tests for the viewer-subscription helpers in useNotificationSSE.
3+
*
4+
* The shared EventSource isn't exercised here — these tests focus on the
5+
* client-side bookkeeping: refcounting, dedup, and HTTP shape. Reconnect
6+
* replay is covered by an MSW request log assertion plus an explicit call
7+
* into the (test-only) onopen replay path.
8+
*/
9+
10+
import flushPromises from "flush-promises";
11+
import { http, HttpResponse } from "msw";
12+
import { afterEach, beforeEach, describe, expect, it } from "vitest";
13+
14+
import { useServerMock } from "@/api/client/__mocks__";
15+
16+
import {
17+
_resetHistoryViewerSubscriptionsForTest,
18+
addHistoryViewerSubscription,
19+
removeHistoryViewerSubscription,
20+
} from "./useNotificationSSE";
21+
22+
interface SubscriptionRequest {
23+
method: "POST" | "DELETE";
24+
history_ids: string[];
25+
}
26+
27+
const { server } = useServerMock();
28+
29+
describe("useNotificationSSE viewer subscriptions", () => {
30+
let requests: SubscriptionRequest[];
31+
32+
beforeEach(() => {
33+
_resetHistoryViewerSubscriptionsForTest();
34+
requests = [];
35+
const recordHandler = async ({ request }: { request: Request }) => {
36+
const body = (await request.json()) as { history_ids: string[] };
37+
requests.push({
38+
method: request.method as "POST" | "DELETE",
39+
history_ids: body.history_ids,
40+
});
41+
return new HttpResponse(null, { status: 204 });
42+
};
43+
server.use(
44+
http.post("/api/events/history-subscriptions", recordHandler),
45+
http.delete("/api/events/history-subscriptions", recordHandler),
46+
);
47+
});
48+
49+
afterEach(() => {
50+
_resetHistoryViewerSubscriptionsForTest();
51+
});
52+
53+
it("POSTs once per first subscriber for a given history id", async () => {
54+
addHistoryViewerSubscription("hist-A");
55+
await flushPromises();
56+
expect(requests).toHaveLength(1);
57+
expect(requests[0]?.method).toBe("POST");
58+
expect(requests[0]?.history_ids).toEqual(["hist-A"]);
59+
});
60+
61+
it("refcounts duplicate subscriptions — second add is a no-op on the wire", async () => {
62+
addHistoryViewerSubscription("hist-A");
63+
addHistoryViewerSubscription("hist-A");
64+
await flushPromises();
65+
expect(requests.filter((r) => r.method === "POST")).toHaveLength(1);
66+
});
67+
68+
it("only DELETEs when the last subscriber for an id releases", async () => {
69+
addHistoryViewerSubscription("hist-A");
70+
addHistoryViewerSubscription("hist-A");
71+
await flushPromises();
72+
const postCount = requests.filter((r) => r.method === "POST").length;
73+
74+
removeHistoryViewerSubscription("hist-A");
75+
await flushPromises();
76+
// First remove still has one outstanding refcount — must not DELETE yet.
77+
expect(requests.filter((r) => r.method === "DELETE")).toHaveLength(0);
78+
expect(requests.filter((r) => r.method === "POST")).toHaveLength(postCount);
79+
80+
removeHistoryViewerSubscription("hist-A");
81+
await flushPromises();
82+
const deletes = requests.filter((r) => r.method === "DELETE");
83+
expect(deletes).toHaveLength(1);
84+
expect(deletes[0]?.history_ids).toEqual(["hist-A"]);
85+
});
86+
87+
it("ignores unsubscribes for ids that were never subscribed", async () => {
88+
removeHistoryViewerSubscription("hist-never");
89+
await flushPromises();
90+
expect(requests).toHaveLength(0);
91+
});
92+
93+
it("tracks distinct history ids independently", async () => {
94+
addHistoryViewerSubscription("hist-A");
95+
addHistoryViewerSubscription("hist-B");
96+
await flushPromises();
97+
const ids = requests.filter((r) => r.method === "POST").map((r) => r.history_ids[0]);
98+
expect(new Set(ids)).toEqual(new Set(["hist-A", "hist-B"]));
99+
});
100+
});

client/src/composables/useNotificationSSE.ts

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@ function openSourceIfNeeded() {
7878
// Global readiness flag so Selenium tests can distinguish a working
7979
// SSE pipeline from the polling fallback.
8080
sseGlobals().__galaxy_sse_connected = true;
81+
// Re-assert any viewer subscriptions the user accumulated. The server
82+
// doesn't carry app-level subscription state across reconnects (it
83+
// only knows the user from the cookie), so the client owns the source
84+
// of truth and replays it on every successful open.
85+
replayViewerSubscriptionsOnOpen();
8186
};
8287

8388
sharedSource.onerror = () => {
@@ -207,3 +212,82 @@ export function useSSEConnectionStatus() {
207212
hasEverConnected: readonly(sseEverConnected),
208213
};
209214
}
215+
216+
// ---------------------------------------------------------------------------
217+
// Viewer subscriptions for non-owned histories
218+
//
219+
// Owner routing already covers history_update events for histories the
220+
// current user owns. Watching a non-owned history (e.g. a shared history
221+
// pinned in the multi-history view) requires the client to opt in by POSTing
222+
// the history id to ``/api/events/history-subscriptions``. The server keeps a
223+
// per-worker map and pushes the same history_update events to every viewer
224+
// in that map. The desired set is held module-level so reconnects can
225+
// replay it and so multiple components subscribed to the same id only emit
226+
// one HTTP call.
227+
// ---------------------------------------------------------------------------
228+
229+
const viewerSubscriptions = new Map<string, number>();
230+
231+
function postViewerSubscription(method: "POST" | "DELETE", historyIds: string[]): Promise<void> {
232+
if (historyIds.length === 0) {
233+
return Promise.resolve();
234+
}
235+
return fetch(withPrefix("/api/events/history-subscriptions"), {
236+
method,
237+
headers: { "Content-Type": "application/json", Accept: "application/json" },
238+
credentials: "same-origin",
239+
body: JSON.stringify({ history_ids: historyIds }),
240+
}).then((response) => {
241+
if (!response.ok) {
242+
throw new Error(`history-subscriptions ${method} failed: ${response.status}`);
243+
}
244+
});
245+
}
246+
247+
function replayViewerSubscriptionsOnOpen(): void {
248+
const ids = [...viewerSubscriptions.keys()];
249+
if (ids.length === 0) {
250+
return;
251+
}
252+
postViewerSubscription("POST", ids).catch((err) =>
253+
console.error("Failed to replay history viewer subscriptions on reconnect:", err),
254+
);
255+
}
256+
257+
/**
258+
* Add a viewer subscription for a history this user does not own. Refcounted
259+
* so two components watching the same history share one server-side
260+
* subscription and the first to mount opens it, last to unmount closes it.
261+
*
262+
* Idempotent per call: re-subscribing an already-tracked id does not POST a
263+
* duplicate.
264+
*/
265+
export function addHistoryViewerSubscription(historyId: string): void {
266+
const previous = viewerSubscriptions.get(historyId) ?? 0;
267+
viewerSubscriptions.set(historyId, previous + 1);
268+
if (previous === 0) {
269+
postViewerSubscription("POST", [historyId]).catch((err) =>
270+
console.error(`Failed to subscribe to history ${historyId}:`, err),
271+
);
272+
}
273+
}
274+
275+
export function removeHistoryViewerSubscription(historyId: string): void {
276+
const previous = viewerSubscriptions.get(historyId);
277+
if (!previous) {
278+
return;
279+
}
280+
if (previous > 1) {
281+
viewerSubscriptions.set(historyId, previous - 1);
282+
return;
283+
}
284+
viewerSubscriptions.delete(historyId);
285+
postViewerSubscription("DELETE", [historyId]).catch((err) =>
286+
console.error(`Failed to unsubscribe from history ${historyId}:`, err),
287+
);
288+
}
289+
290+
/** Test-only: drain the desired set so per-test state doesn't leak. */
291+
export function _resetHistoryViewerSubscriptionsForTest(): void {
292+
viewerSubscriptions.clear();
293+
}

0 commit comments

Comments
 (0)