diff --git a/client/src/api/schema/schema.ts b/client/src/api/schema/schema.ts index 30706ea28737..f02125d0832a 100644 --- a/client/src/api/schema/schema.ts +++ b/client/src/api/schema/schema.ts @@ -1292,6 +1292,31 @@ export interface paths { patch?: never; trace?: never; }; + "/api/events/history-subscriptions": { + parameters: { + query?: never; + header?: never; + path?: never; + cookie?: never; + }; + get?: never; + put?: never; + /** + * Subscribe to history_update SSE events for histories you don't own. + * @description Asks every webapp worker to start routing ``history_update`` events + * for these histories to the requesting user/session, in addition to the + * default owner-routing. Idempotent: re-subscribing to the same id is a + * no-op. Clients re-send the full set after each ``EventSource.onopen`` + * so reconnects don't drop subscriptions. + */ + post: operations["subscribe_history_viewer_api_events_history_subscriptions_post"]; + /** Cancel viewer subscriptions for these histories. */ + delete: operations["unsubscribe_history_viewer_api_events_history_subscriptions_delete"]; + options?: never; + head?: never; + patch?: never; + trace?: never; + }; "/api/events/stream": { parameters: { query?: never; @@ -15433,6 +15458,14 @@ export interface components { */ url: string; }; + /** + * HistoryViewerSubscriptionPayload + * @description REST payload for ``/api/events/history-subscriptions`` endpoints. + */ + HistoryViewerSubscriptionPayload: { + /** History Ids */ + history_ids: string[]; + }; /** * Hyperlink * @description Represents some text with an Hyperlink. @@ -33478,6 +33511,92 @@ export interface operations { }; }; }; + subscribe_history_viewer_api_events_history_subscriptions_post: { + parameters: { + query?: never; + header?: { + /** @description The user ID that will be used to effectively make this API call. Only admins and designated users can make API calls on behalf of other users. */ + "run-as"?: string | null; + }; + path?: never; + cookie?: never; + }; + requestBody: { + content: { + "application/json": components["schemas"]["HistoryViewerSubscriptionPayload"]; + }; + }; + responses: { + /** @description Successful Response */ + 204: { + headers: { + [name: string]: unknown; + }; + content?: never; + }; + /** @description Request Error */ + "4XX": { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MessageExceptionModel"]; + }; + }; + /** @description Server Error */ + "5XX": { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MessageExceptionModel"]; + }; + }; + }; + }; + unsubscribe_history_viewer_api_events_history_subscriptions_delete: { + parameters: { + query?: never; + header?: { + /** @description The user ID that will be used to effectively make this API call. Only admins and designated users can make API calls on behalf of other users. */ + "run-as"?: string | null; + }; + path?: never; + cookie?: never; + }; + requestBody: { + content: { + "application/json": components["schemas"]["HistoryViewerSubscriptionPayload"]; + }; + }; + responses: { + /** @description Successful Response */ + 204: { + headers: { + [name: string]: unknown; + }; + content?: never; + }; + /** @description Request Error */ + "4XX": { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MessageExceptionModel"]; + }; + }; + /** @description Server Error */ + "5XX": { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MessageExceptionModel"]; + }; + }; + }; + }; stream_events_api_events_stream_get: { parameters: { query?: never; diff --git a/client/src/components/History/CurrentHistory/HistoryCounter.test.ts b/client/src/components/History/CurrentHistory/HistoryCounter.test.ts new file mode 100644 index 000000000000..795223d13d19 --- /dev/null +++ b/client/src/components/History/CurrentHistory/HistoryCounter.test.ts @@ -0,0 +1,202 @@ +import { getLocalVue } from "@tests/vitest/helpers"; +import { shallowMount } from "@vue/test-utils"; +import flushPromises from "flush-promises"; +import { createPinia, setActivePinia } from "pinia"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +import type { RegisteredUser } from "@/api"; +import { useServerMock } from "@/api/client/__mocks__"; +import { setSseConnected, setSseHasEverConnected, sseMockFactory } from "@/stores/_testing/sseStoreSupport"; +import { useConfigStore } from "@/stores/configurationStore"; +import { useUserStore } from "@/stores/userStore"; + +import HistoryCounter from "./HistoryCounter.vue"; + +const sseState = vi.hoisted(() => ({ + onEvent: null as ((event: MessageEvent) => void) | null, + connect: vi.fn(), + disconnect: vi.fn(), +})); + +vi.mock("@/composables/useNotificationSSE", () => sseMockFactory(sseState)); + +// userStore wires its localStorage-backed refs through this composable; the +// real watcher hits ``window.localStorage`` which jsdom doesn't expose with a +// usable Storage prototype here. We don't read any of those refs in this +// test, so a ref-returning stub is enough to keep userStore initialization +// happy. +vi.mock("@/composables/userLocalStorageFromHashedId", async () => { + const { ref } = await import("vue"); + return { + useUserLocalStorageFromHashId: (_key: string, initialValue: T) => ref(initialValue), + }; +}); + +const { server, http } = useServerMock(); + +const localVue = getLocalVue(); + +const baseHistory = { + id: "hist-1", + name: "Test history", + user_id: "user-1", + size: 0, + contents_active: { active: 0, deleted: 0, hidden: 0 }, + update_time: new Date().toISOString(), + create_time: new Date().toISOString(), + deleted: false, + archived: false, + purged: false, + published: false, +}; + +function registerConfigHandler(enableSse: boolean): void { + server.use( + http.get("/api/configuration", ({ response }) => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + return response(200).json({ enable_sse_updates: enableSse } as any); + }), + ); +} + +function setEnableSse(enabled: boolean): void { + registerConfigHandler(enabled); + // The store kicks off ``loadConfig`` on creation; ``setConfiguration`` + // makes the value visible synchronously regardless of the network round + // trip so the component reads it on mount. + useConfigStore().setConfiguration({ enable_sse_updates: enabled } as never); + // The refresh button is gated on ``currentUser``; without a logged-in + // user the BButtonGroup that contains it is never rendered. + useUserStore().currentUser = { id: "user-1", email: "u@example.com" } as RegisteredUser; +} + +function mountCounter(props: Partial<{ lastChecked: Date; isWatching: boolean }> = {}) { + return shallowMount(HistoryCounter as unknown as object, { + propsData: { + history: baseHistory, + lastChecked: props.lastChecked ?? new Date(), + isWatching: props.isWatching ?? true, + }, + localVue, + }); +} + +function refreshButton(wrapper: ReturnType) { + return wrapper.get(".history-refresh-button"); +} + +describe("HistoryCounter — refresh button", () => { + beforeEach(() => { + setActivePinia(createPinia()); + sseState.connect.mockClear(); + sseState.disconnect.mockClear(); + // sseMockFactory lazily creates these refs on first call; reset to a + // known state for each test. + Reflect.deleteProperty(sseState, "connected"); + Reflect.deleteProperty(sseState, "hasEverConnected"); + // Re-create refs by invoking the factory once — every component mount + // already triggers this, but doing it explicitly makes the per-test + // state setup obvious. + sseMockFactory(sseState); + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + describe("SSE mode", () => { + beforeEach(() => { + setEnableSse(true); + }); + + it('shows "Refresh history" with a link variant when the connection is healthy', async () => { + setSseConnected(sseState, true); + setSseHasEverConnected(sseState, true); + + const wrapper = mountCounter(); + await flushPromises(); + + const button = refreshButton(wrapper); + expect(button.attributes("title")).toBe("Refresh history"); + expect(button.attributes("variant")).toBe("link"); + }); + + it("does not flag the initial-connect window as a connection loss", async () => { + // EventSource hasn't opened yet — connected=false, hasEverConnected=false. + setSseConnected(sseState, false); + setSseHasEverConnected(sseState, false); + + const wrapper = mountCounter(); + await flushPromises(); + + const button = refreshButton(wrapper); + expect(button.attributes("title")).toBe("Refresh history"); + expect(button.attributes("variant")).toBe("link"); + }); + + it("turns red when the SSE connection is lost after a successful open", async () => { + setSseConnected(sseState, true); + setSseHasEverConnected(sseState, true); + + const wrapper = mountCounter(); + await flushPromises(); + + // Simulate the EventSource onerror path: connection drops after + // it had previously been established. + setSseConnected(sseState, false); + await flushPromises(); + + const button = refreshButton(wrapper); + expect(button.attributes("title")).toBe("Live updates disconnected. Click to refresh."); + expect(button.attributes("variant")).toBe("danger"); + }); + }); + + describe("polling mode", () => { + beforeEach(() => { + setEnableSse(false); + }); + + it("shows the legacy 'Last refreshed …' title with a link variant when fresh", async () => { + const wrapper = mountCounter({ lastChecked: new Date(), isWatching: true }); + await flushPromises(); + + const button = refreshButton(wrapper); + expect(button.attributes("title")).toMatch(/^Last refreshed .+ ago$/); + expect(button.attributes("variant")).toBe("link"); + }); + + it("turns red after 2 minutes of staleness", async () => { + // 3 minutes ago — past the 120000ms cutoff in HistoryCounter. + const stale = new Date(Date.now() - 3 * 60 * 1000); + const wrapper = mountCounter({ lastChecked: stale, isWatching: true }); + await flushPromises(); + + const button = refreshButton(wrapper); + expect(button.attributes("title")).toMatch(/Consider reloading the page\.$/); + expect(button.attributes("variant")).toBe("danger"); + }); + + it("turns red when the resource watcher reports it is no longer watching", async () => { + const wrapper = mountCounter({ lastChecked: new Date(), isWatching: false }); + await flushPromises(); + + const button = refreshButton(wrapper); + expect(button.attributes("variant")).toBe("danger"); + }); + }); + + it("emits reloadContents when the refresh button is clicked", async () => { + setEnableSse(true); + setSseConnected(sseState, true); + setSseHasEverConnected(sseState, true); + + const wrapper = mountCounter(); + await flushPromises(); + await refreshButton(wrapper).trigger("click"); + + expect(wrapper.emitted("reloadContents")).toBeTruthy(); + expect(wrapper.emitted("reloadContents")?.length).toBe(1); + }); +}); diff --git a/client/src/components/History/CurrentHistory/HistoryCounter.vue b/client/src/components/History/CurrentHistory/HistoryCounter.vue index 16331bb94eb4..bf822c56521a 100644 --- a/client/src/components/History/CurrentHistory/HistoryCounter.vue +++ b/client/src/components/History/CurrentHistory/HistoryCounter.vue @@ -11,7 +11,9 @@ import { useRouter } from "vue-router/composables"; import { type HistorySummaryExtended, userOwnsHistory } from "@/api"; import { HistoryFilters } from "@/components/History/HistoryFilters.js"; +import { useConfig } from "@/composables/config"; import { useHistoryContentStats } from "@/composables/historyContentStats"; +import { useSSEConnectionStatus } from "@/composables/useNotificationSSE"; import { useUserStore } from "@/stores/userStore"; import localize from "@/utils/localization"; @@ -39,10 +41,18 @@ const emit = defineEmits(["update:filter-text", "reloadContents"]); const router = useRouter(); const { currentUser, isAnonymous } = storeToRefs(useUserStore()); +const { config } = useConfig(); +const { connected: sseConnected, hasEverConnected: sseHasEverConnected } = useSSEConnectionStatus(); const { historySize, numItemsActive, numItemsDeleted, numItemsHidden } = useHistoryContentStats( toRef(props, "history"), ); +const sseMode = computed(() => config.value?.enable_sse_updates === true); +// Treat the connection as "lost" only after a successful open: the brief +// initial-connect window where ``sseConnected`` is still false isn't a +// real outage and shouldn't go red. +const sseLost = computed(() => sseMode.value && sseHasEverConnected.value && !sseConnected.value); + const reloadButtonLoading = ref(false); const reloadButtonTitle = ref(""); const reloadButtonVariant = ref("link"); @@ -84,6 +94,20 @@ function getCurrentFilterVal(filter: string) { } function updateTime() { + if (sseMode.value) { + // Under SSE the "last checked" timestamp ticks only when the history + // actually changes — a 2-minute idle window is normal and shouldn't + // be presented as staleness. Surface a connection-lost warning + // instead, gated on a previous successful open. + if (sseLost.value) { + reloadButtonTitle.value = "Live updates disconnected. Click to refresh."; + reloadButtonVariant.value = "danger"; + } else { + reloadButtonTitle.value = "Refresh history"; + reloadButtonVariant.value = "link"; + } + return; + } const diffToNow = formatDistanceToNowStrict(props.lastChecked, { addSuffix: true }); const diffToNowSec = Date.now().valueOf() - props.lastChecked.valueOf(); // if history isn't being watched or hasn't been watched/polled for over 2 minutes @@ -104,9 +128,14 @@ async function reloadContents() { }, 1000); } +// Re-render the button as soon as the SSE state flips, instead of waiting up +// to a second for the next setInterval tick — connection-loss feedback should +// be immediate. +watchImmediate([sseMode, sseLost], updateTime); + onMounted(() => { - updateTime(); - // update every second + // The polling-mode title is derived from a wall-clock diff that has no + // reactive dependency, so a 1s tick keeps "Last refreshed Xs ago" fresh. setInterval(updateTime, 1000); }); diff --git a/client/src/components/History/CurrentHistory/HistoryPanel.vue b/client/src/components/History/CurrentHistory/HistoryPanel.vue index edca83c38372..540332b1a24f 100644 --- a/client/src/components/History/CurrentHistory/HistoryPanel.vue +++ b/client/src/components/History/CurrentHistory/HistoryPanel.vue @@ -4,6 +4,7 @@ import { storeToRefs } from "pinia"; import { computed, onMounted, ref, set as VueSet, unref, watch } from "vue"; import { type HistoryItemSummary, type HistorySummaryExtended, userOwnsHistory } from "@/api"; +import { getGalaxyInstance } from "@/app"; import ExpandedItems from "@/components/History/Content/ExpandedItems"; import { HistoryFilters } from "@/components/History/HistoryFilters"; import { deleteContent, updateContentFields } from "@/components/History/model/queries"; @@ -13,6 +14,7 @@ import { useHistoryStore } from "@/stores/historyStore"; import { useUserStore } from "@/stores/userStore"; import { type Alias, getOperatorForAlias } from "@/utils/filtering"; import { setItemDragstart } from "@/utils/setDrag"; +import { refreshHistoryFromPush } from "@/watch/watchHistory"; import { useHistoryDragDrop } from "../../../composables/historyDragDrop"; @@ -328,7 +330,11 @@ function updateContentStats() { } function reloadContents() { - historyStore.startWatchingHistory(); + // ``startWatchingHistory`` is idempotent, so the prior call did nothing + // once SSE/polling was already initialized. Force a refresh through the + // same code path SSE pushes use so the user-initiated click actually + // re-fetches the history and items. + refreshHistoryFromPush(getGalaxyInstance()).catch((err) => console.error("Manual history refresh failed:", err)); } function setInvisible(item: HistoryItemSummary) { diff --git a/client/src/components/History/Multiple/MultipleViewItem.vue b/client/src/components/History/Multiple/MultipleViewItem.vue index 09f183069a15..c4d9991fb0ea 100644 --- a/client/src/components/History/Multiple/MultipleViewItem.vue +++ b/client/src/components/History/Multiple/MultipleViewItem.vue @@ -3,10 +3,14 @@ import { faTimes } from "@fortawesome/free-solid-svg-icons"; import { FontAwesomeIcon } from "@fortawesome/vue-fontawesome"; import { BButton } from "bootstrap-vue"; import { storeToRefs } from "pinia"; -import { computed, ref } from "vue"; +import { computed, ref, watch } from "vue"; +import { userOwnsHistory } from "@/api"; +import { useConfig } from "@/composables/config"; import { useExtendedHistory } from "@/composables/detailedHistory"; +import { addHistoryViewerSubscription, removeHistoryViewerSubscription } from "@/composables/useNotificationSSE"; import { useHistoryStore } from "@/stores/historyStore"; +import { useUserStore } from "@/stores/userStore"; import CollectionPanel from "@/components/History/CurrentCollection/CollectionPanel.vue"; import HistoryNavigation from "@/components/History/CurrentHistory/HistoryNavigation.vue"; @@ -24,6 +28,8 @@ const props = defineProps(); const historyStore = useHistoryStore(); const { currentHistoryId, pinnedHistories } = storeToRefs(historyStore); +const { currentUser } = storeToRefs(useUserStore()); +const { config } = useConfig(); const { history } = useExtendedHistory(props.source.id); @@ -33,6 +39,31 @@ const sameToCurrent = computed(() => { return currentHistoryId.value === props.source.id; }); +// Owner routing covers histories the current user owns; only non-owned +// histories need a viewer subscription. Skip in polling mode — there's no +// SSE channel to push events to and the REST call would still hit the queue. +const needsViewerSubscription = computed(() => { + if (!config.value?.enable_sse_updates) { + return false; + } + if (!history.value || !currentUser.value) { + return false; + } + return !userOwnsHistory(currentUser.value, history.value); +}); + +watch( + [needsViewerSubscription, () => props.source.id], + ([subscribe, id], _previous, onCleanup) => { + if (!subscribe || !id) { + return; + } + addHistoryViewerSubscription(id); + onCleanup(() => removeHistoryViewerSubscription(id)); + }, + { immediate: true }, +); + function onViewCollection(collection: object) { selectedCollections.value = [...selectedCollections.value, collection]; } diff --git a/client/src/composables/useNotificationSSE.test.ts b/client/src/composables/useNotificationSSE.test.ts new file mode 100644 index 000000000000..9675e87963fc --- /dev/null +++ b/client/src/composables/useNotificationSSE.test.ts @@ -0,0 +1,100 @@ +/** + * Unit tests for the viewer-subscription helpers in useNotificationSSE. + * + * The shared EventSource isn't exercised here — these tests focus on the + * client-side bookkeeping: refcounting, dedup, and HTTP shape. Reconnect + * replay is covered by an MSW request log assertion plus an explicit call + * into the (test-only) onopen replay path. + */ + +import flushPromises from "flush-promises"; +import { http, HttpResponse } from "msw"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; + +import { useServerMock } from "@/api/client/__mocks__"; + +import { + _resetHistoryViewerSubscriptionsForTest, + addHistoryViewerSubscription, + removeHistoryViewerSubscription, +} from "./useNotificationSSE"; + +interface SubscriptionRequest { + method: "POST" | "DELETE"; + history_ids: string[]; +} + +const { server } = useServerMock(); + +describe("useNotificationSSE viewer subscriptions", () => { + let requests: SubscriptionRequest[]; + + beforeEach(() => { + _resetHistoryViewerSubscriptionsForTest(); + requests = []; + const recordHandler = async ({ request }: { request: Request }) => { + const body = (await request.json()) as { history_ids: string[] }; + requests.push({ + method: request.method as "POST" | "DELETE", + history_ids: body.history_ids, + }); + return new HttpResponse(null, { status: 204 }); + }; + server.use( + http.post("/api/events/history-subscriptions", recordHandler), + http.delete("/api/events/history-subscriptions", recordHandler), + ); + }); + + afterEach(() => { + _resetHistoryViewerSubscriptionsForTest(); + }); + + it("POSTs once per first subscriber for a given history id", async () => { + addHistoryViewerSubscription("hist-A"); + await flushPromises(); + expect(requests).toHaveLength(1); + expect(requests[0]?.method).toBe("POST"); + expect(requests[0]?.history_ids).toEqual(["hist-A"]); + }); + + it("refcounts duplicate subscriptions — second add is a no-op on the wire", async () => { + addHistoryViewerSubscription("hist-A"); + addHistoryViewerSubscription("hist-A"); + await flushPromises(); + expect(requests.filter((r) => r.method === "POST")).toHaveLength(1); + }); + + it("only DELETEs when the last subscriber for an id releases", async () => { + addHistoryViewerSubscription("hist-A"); + addHistoryViewerSubscription("hist-A"); + await flushPromises(); + const postCount = requests.filter((r) => r.method === "POST").length; + + removeHistoryViewerSubscription("hist-A"); + await flushPromises(); + // First remove still has one outstanding refcount — must not DELETE yet. + expect(requests.filter((r) => r.method === "DELETE")).toHaveLength(0); + expect(requests.filter((r) => r.method === "POST")).toHaveLength(postCount); + + removeHistoryViewerSubscription("hist-A"); + await flushPromises(); + const deletes = requests.filter((r) => r.method === "DELETE"); + expect(deletes).toHaveLength(1); + expect(deletes[0]?.history_ids).toEqual(["hist-A"]); + }); + + it("ignores unsubscribes for ids that were never subscribed", async () => { + removeHistoryViewerSubscription("hist-never"); + await flushPromises(); + expect(requests).toHaveLength(0); + }); + + it("tracks distinct history ids independently", async () => { + addHistoryViewerSubscription("hist-A"); + addHistoryViewerSubscription("hist-B"); + await flushPromises(); + const ids = requests.filter((r) => r.method === "POST").map((r) => r.history_ids[0]); + expect(new Set(ids)).toEqual(new Set(["hist-A", "hist-B"])); + }); +}); diff --git a/client/src/composables/useNotificationSSE.ts b/client/src/composables/useNotificationSSE.ts index 8bdae964f2e7..2d5a561a0549 100644 --- a/client/src/composables/useNotificationSSE.ts +++ b/client/src/composables/useNotificationSSE.ts @@ -1,4 +1,4 @@ -import { onScopeDispose, ref } from "vue"; +import { onScopeDispose, readonly, ref } from "vue"; import { withPrefix } from "@/utils/redirect"; @@ -39,6 +39,10 @@ type Handler = (event: MessageEvent) => void; let sharedSource: EventSource | null = null; const sharedConnected = ref(false); +// True once the SSE connection has succeeded at least once in this session. +// Used by UI to distinguish "still connecting" from "was connected, dropped" +// — only the latter should surface a connection-lost warning. +const sseEverConnected = ref(false); const subscribers: Map> = new Map(); // Track the per-type dispatchers we registered so ``closeSource`` removes the // exact same listeners (``addEventListener`` matches by reference). @@ -70,9 +74,15 @@ function openSourceIfNeeded() { sharedSource.onopen = () => { sharedConnected.value = true; + sseEverConnected.value = true; // Global readiness flag so Selenium tests can distinguish a working // SSE pipeline from the polling fallback. sseGlobals().__galaxy_sse_connected = true; + // Re-assert any viewer subscriptions the user accumulated. The server + // doesn't carry app-level subscription state across reconnects (it + // only knows the user from the cookie), so the client owns the source + // of truth and replays it on every successful open. + replayViewerSubscriptionsOnOpen(); }; sharedSource.onerror = () => { @@ -189,3 +199,95 @@ export function useSSE(onEvent: Handler, eventTypes: readonly SSEEventType[] = S * @deprecated Use `useSSE` instead. This alias exists for backward compatibility. */ export const useNotificationSSE = useSSE; + +/** + * Read-only handle on the shared SSE connection state. ``connected`` flips + * with the EventSource lifecycle; ``hasEverConnected`` latches true on the + * first successful open so callers can ignore the initial-connect window + * when surfacing a "connection lost" warning. + */ +export function useSSEConnectionStatus() { + return { + connected: readonly(sharedConnected), + hasEverConnected: readonly(sseEverConnected), + }; +} + +// --------------------------------------------------------------------------- +// Viewer subscriptions for non-owned histories +// +// Owner routing already covers history_update events for histories the +// current user owns. Watching a non-owned history (e.g. a shared history +// pinned in the multi-history view) requires the client to opt in by POSTing +// the history id to ``/api/events/history-subscriptions``. The server keeps a +// per-worker map and pushes the same history_update events to every viewer +// in that map. The desired set is held module-level so reconnects can +// replay it and so multiple components subscribed to the same id only emit +// one HTTP call. +// --------------------------------------------------------------------------- + +const viewerSubscriptions = new Map(); + +function postViewerSubscription(method: "POST" | "DELETE", historyIds: string[]): Promise { + if (historyIds.length === 0) { + return Promise.resolve(); + } + return fetch(withPrefix("/api/events/history-subscriptions"), { + method, + headers: { "Content-Type": "application/json", Accept: "application/json" }, + credentials: "same-origin", + body: JSON.stringify({ history_ids: historyIds }), + }).then((response) => { + if (!response.ok) { + throw new Error(`history-subscriptions ${method} failed: ${response.status}`); + } + }); +} + +function replayViewerSubscriptionsOnOpen(): void { + const ids = [...viewerSubscriptions.keys()]; + if (ids.length === 0) { + return; + } + postViewerSubscription("POST", ids).catch((err) => + console.error("Failed to replay history viewer subscriptions on reconnect:", err), + ); +} + +/** + * Add a viewer subscription for a history this user does not own. Refcounted + * so two components watching the same history share one server-side + * subscription and the first to mount opens it, last to unmount closes it. + * + * Idempotent per call: re-subscribing an already-tracked id does not POST a + * duplicate. + */ +export function addHistoryViewerSubscription(historyId: string): void { + const previous = viewerSubscriptions.get(historyId) ?? 0; + viewerSubscriptions.set(historyId, previous + 1); + if (previous === 0) { + postViewerSubscription("POST", [historyId]).catch((err) => + console.error(`Failed to subscribe to history ${historyId}:`, err), + ); + } +} + +export function removeHistoryViewerSubscription(historyId: string): void { + const previous = viewerSubscriptions.get(historyId); + if (!previous) { + return; + } + if (previous > 1) { + viewerSubscriptions.set(historyId, previous - 1); + return; + } + viewerSubscriptions.delete(historyId); + postViewerSubscription("DELETE", [historyId]).catch((err) => + console.error(`Failed to unsubscribe from history ${historyId}:`, err), + ); +} + +/** Test-only: drain the desired set so per-test state doesn't leak. */ +export function _resetHistoryViewerSubscriptionsForTest(): void { + viewerSubscriptions.clear(); +} diff --git a/client/src/stores/_testing/sseStoreSupport.ts b/client/src/stores/_testing/sseStoreSupport.ts index b3e5d97fcb89..3313b5a5d154 100644 --- a/client/src/stores/_testing/sseStoreSupport.ts +++ b/client/src/stores/_testing/sseStoreSupport.ts @@ -21,20 +21,28 @@ export interface SSEMockState { connect: ReturnType; disconnect: ReturnType; connected?: Ref; + hasEverConnected?: Ref; } /** Build the factory used with ``vi.mock("@/composables/useNotificationSSE", ...)``. */ export function sseMockFactory(state: SSEMockState) { - // Lazily initialize ``connected`` so existing callers that don't pass it - // still get a working ref. + // Lazily initialize the connection refs so existing callers that don't + // pre-populate them still get working refs. if (!state.connected) { state.connected = ref(false); } + if (!state.hasEverConnected) { + state.hasEverConnected = ref(false); + } return { useSSE: vi.fn((onEvent: (event: MessageEvent) => void) => { state.onEvent = onEvent; return { connect: state.connect, disconnect: state.disconnect, connected: state.connected }; }), + useSSEConnectionStatus: vi.fn(() => ({ + connected: state.connected, + hasEverConnected: state.hasEverConnected, + })), }; } @@ -46,6 +54,25 @@ export function emitSse(state: SSEMockState, type: string, payload: unknown): vo state.onEvent(new MessageEvent(type, { data: JSON.stringify(payload) })); } +/** Set the mocked ``connected`` flag, lazy-initializing the ref if a test reads + * it before any component has mounted. */ +export function setSseConnected(state: SSEMockState, value: boolean): void { + if (!state.connected) { + state.connected = ref(value); + } else { + state.connected.value = value; + } +} + +/** Set the mocked ``hasEverConnected`` latch, lazy-initializing the ref. */ +export function setSseHasEverConnected(state: SSEMockState, value: boolean): void { + if (!state.hasEverConnected) { + state.hasEverConnected = ref(value); + } else { + state.hasEverConnected.value = value; + } +} + /** * Save the current ``document.visibilityState`` descriptor and return a restorer. * Call the restorer in ``afterEach`` to prevent patching from leaking into later tests. diff --git a/client/src/stores/historyStore.ts b/client/src/stores/historyStore.ts index f0f6b73fc4b6..f0c8275242a5 100644 --- a/client/src/stores/historyStore.ts +++ b/client/src/stores/historyStore.ts @@ -18,6 +18,7 @@ import { useResourceWatcher } from "@/composables/resourceWatcher"; import { useSSE } from "@/composables/useNotificationSSE"; import { useUserLocalStorage } from "@/composables/userLocalStorage"; import { useConfigStore } from "@/stores/configurationStore"; +import { useHistoryItemsStore } from "@/stores/historyItemsStore"; import { createAndSelectNewHistory, getCurrentHistoryFromServer, @@ -408,14 +409,37 @@ export const useHistoryStore = defineStore("historyStore", () => { try { const data = JSON.parse(event.data); const changedHistoryIds: string[] = data.history_ids ?? []; - if (currentHistoryId.value && changedHistoryIds.includes(currentHistoryId.value)) { + if (!changedHistoryIds.length) { + return; + } + const currentId = currentHistoryId.value; + if (currentId && changedHistoryIds.includes(currentId)) { // SSE is itself the signal that the history changed — force the // refresh so the update_time short-circuit in watchHistoryOnce // can't suppress the contents fetch. const app = getGalaxyInstance(); refreshHistoryFromPushSuppliedApp(app).catch((err) => - console.error("Error refreshing history from SSE push:", err), + console.error("Error refreshing current history from SSE push:", err), + ); + } + // Also refresh other tracked histories (e.g. visible in the + // multi-history view). Skip ids not in storedHistories — there's + // no rendered panel for them, so refetching wastes a request. + const itemsStore = useHistoryItemsStore(); + for (const id of changedHistoryIds) { + if (id === currentId) { + continue; + } + if (!storedHistories.value[id]) { + continue; + } + const filterText = storedFilterTexts.value[id] ?? ""; + updateContentStats(id).catch((err) => + console.error(`Error updating content stats for history ${id}:`, err), ); + itemsStore + .fetchHistoryItems(id, filterText, 0) + .catch((err) => console.error(`Error fetching items for history ${id}:`, err)); } } catch (e) { console.error("Error handling history SSE event:", e); diff --git a/lib/galaxy/celery/tasks.py b/lib/galaxy/celery/tasks.py index c5f5f3e064ff..8a4f2c1b0aa1 100644 --- a/lib/galaxy/celery/tasks.py +++ b/lib/galaxy/celery/tasks.py @@ -82,10 +82,7 @@ Vault, ) from galaxy.short_term_storage import ShortTermStorageMonitor -from galaxy.structured_app import ( - MinimalManagerApp, - StructuredApp, -) +from galaxy.structured_app import MinimalManagerApp from galaxy.tools import create_tool_from_representation from galaxy.tools.data_fetch import do_fetch from galaxy.util import galaxy_directory @@ -670,7 +667,7 @@ def dispatch_pending_notifications(notification_manager: NotificationManager): @galaxy_task(action="emit queue and SSE observability metrics") -def emit_queue_metrics_task(app: StructuredApp): +def emit_queue_metrics_task(app: MinimalManagerApp): """Sample control-queue depth, SSE connection count, and worker rows → statsd. Resolves the narrow collaborators ``emit_queue_metrics`` needs from the app diff --git a/lib/galaxy/managers/sse.py b/lib/galaxy/managers/sse.py index 0284b1786b6b..70d83cbf653a 100644 --- a/lib/galaxy/managers/sse.py +++ b/lib/galaxy/managers/sse.py @@ -85,6 +85,12 @@ def __init__(self, statsd_client: Optional[VanillaGalaxyStatsdClient] = None) -> self._broadcast_connections: set[asyncio.Queue] = set() self._loop: Optional[asyncio.AbstractEventLoop] = None self._statsd_client = statsd_client + # Viewer subscriptions for non-owned histories. Each worker keeps its + # own copy; the producer fans out subscribe/unsubscribe via Kombu so + # every webapp process learns about it. Maps encoded history_id (str) + # to the set of user_ids / session_ids that have asked for events. + self._history_viewers_user: dict[str, set[int]] = defaultdict(set) + self._history_viewers_session: dict[str, set[int]] = defaultdict(set) def _ensure_loop(self) -> None: """Capture the running asyncio event loop. Must be called from async context.""" @@ -134,10 +140,16 @@ def disconnect( self._connections[user_id].discard(queue) if not self._connections[user_id]: del self._connections[user_id] + # Last connection for this user on this worker — drop their + # viewer subscriptions so the maps don't grow unbounded over + # long-lived processes. The client re-asserts subscriptions + # on every ``onopen`` so the next reconnect restores them. + self._purge_user_viewer_subscriptions(user_id) if galaxy_session_id is not None: self._session_connections[galaxy_session_id].discard(queue) if not self._session_connections[galaxy_session_id]: del self._session_connections[galaxy_session_id] + self._purge_session_viewer_subscriptions(galaxy_session_id) self._broadcast_connections.discard(queue) log.debug( "SSE connection closed for user_id=%s session_id=%s (total=%d)", @@ -167,6 +179,60 @@ def push_broadcast(self, event: SSEEvent) -> None: for queue in list(self._broadcast_connections): self._safe_put(queue, event) + def push_to_history_viewers(self, history_id: str, event: SSEEvent) -> None: + """Thread-safe. Push an event to every user/session that has subscribed + to ``history_id`` as a viewer (i.e. is watching a history they don't + own). Owner routing remains a separate path so the common "user changes + their own history" case never depends on a subscribe round trip. + """ + for user_id in list(self._history_viewers_user.get(history_id, ())): + self.push_to_user(user_id, event) + for session_id in list(self._history_viewers_session.get(history_id, ())): + self.push_to_session(session_id, event) + + # -- Viewer subscription bookkeeping -- + # These mutate per-worker state and are called from the Kombu task thread + # via the ``subscribe_history_viewer`` / ``unsubscribe_history_viewer`` + # control tasks. They are not thread-safe with each other (defaultdict + # mutations are not atomic), but Kombu drains the control queue serially + # so concurrent calls into a single manager don't happen in practice. + + def subscribe_user_viewer(self, user_id: int, history_id: str) -> None: + self._history_viewers_user[history_id].add(user_id) + + def unsubscribe_user_viewer(self, user_id: int, history_id: str) -> None: + viewers = self._history_viewers_user.get(history_id) + if viewers is None: + return + viewers.discard(user_id) + if not viewers: + del self._history_viewers_user[history_id] + + def subscribe_session_viewer(self, session_id: int, history_id: str) -> None: + self._history_viewers_session[history_id].add(session_id) + + def unsubscribe_session_viewer(self, session_id: int, history_id: str) -> None: + viewers = self._history_viewers_session.get(history_id) + if viewers is None: + return + viewers.discard(session_id) + if not viewers: + del self._history_viewers_session[history_id] + + def _purge_user_viewer_subscriptions(self, user_id: int) -> None: + """Drop every viewer subscription this user holds on this worker. + + Called after their last connection on this worker closes. The map of + ``history_id → user_ids`` is small enough that a full scan is fine — + the inverse index isn't worth maintaining at this scale. + """ + for history_id in list(self._history_viewers_user.keys()): + self.unsubscribe_user_viewer(user_id, history_id) + + def _purge_session_viewer_subscriptions(self, session_id: int) -> None: + for history_id in list(self._history_viewers_session.keys()): + self.unsubscribe_session_viewer(session_id, history_id) + def _safe_put(self, queue: asyncio.Queue, event: SSEEvent) -> None: """Cross the thread boundary safely using ``call_soon_threadsafe``.""" if self._loop is None or self._loop.is_closed(): diff --git a/lib/galaxy/managers/sse_dispatch.py b/lib/galaxy/managers/sse_dispatch.py index 9fbfb8a16164..efa2372f438f 100644 --- a/lib/galaxy/managers/sse_dispatch.py +++ b/lib/galaxy/managers/sse_dispatch.py @@ -150,6 +150,39 @@ def history_update( kwargs["session_updates"] = session_updates self._send("history_update", kwargs) + def subscribe_history_viewer( + self, + history_id: str, + user_id: Optional[int] = None, + session_id: Optional[int] = None, + ) -> None: + """Broadcast a viewer-subscription record so every webapp worker can + push history_update events to a user/session watching a history they + don't own. The dispatch is fire-and-forget — workers apply the + record locally on receipt. Multi-worker deployments rely on this to + keep subscriptions consistent regardless of which worker fields the + REST call. + """ + kwargs: dict[str, Any] = {"history_id": history_id} + if user_id is not None: + kwargs["user_id"] = user_id + if session_id is not None: + kwargs["session_id"] = session_id + self._send("subscribe_history_viewer", kwargs) + + def unsubscribe_history_viewer( + self, + history_id: str, + user_id: Optional[int] = None, + session_id: Optional[int] = None, + ) -> None: + kwargs: dict[str, Any] = {"history_id": history_id} + if user_id is not None: + kwargs["user_id"] = user_id + if session_id is not None: + kwargs["session_id"] = session_id + self._send("unsubscribe_history_viewer", kwargs) + def entry_point_update(self, user_id: int, event_id: Optional[str] = None) -> None: """Fan out a wake-up ``entry_point_update`` event for one user. diff --git a/lib/galaxy/queue_worker/__init__.py b/lib/galaxy/queue_worker/__init__.py index 2bbff9e74e85..425dec9dce31 100644 --- a/lib/galaxy/queue_worker/__init__.py +++ b/lib/galaxy/queue_worker/__init__.py @@ -88,6 +88,20 @@ class EntryPointUpdatePayload(TypedDict, total=False): event_id: Optional[str] +class HistoryViewerSubscriptionPayload(TypedDict, total=False): + """Wire contract for the (un)subscribe_history_viewer control-task kwargs. + + Either ``user_id`` or ``session_id`` is set, never both. ``history_id`` is + the encoded string the client used in its REST call — the dispatch path + never needs to decode it because viewer routing only fans out to live + queues, not to anything that touches the database. + """ + + history_id: str + user_id: int + session_id: int + + def send_local_control_task( app: "StructuredApp", task: str, @@ -431,23 +445,79 @@ def history_update(app: "MinimalManagerApp", **kwargs) -> None: stays free of presentation/security concerns. Handles both user-keyed routing (registered users) and galaxy_session-keyed routing (anonymous histories, which have ``user_id IS NULL``). + + After owner routing, also fans out to viewer subscriptions so users + watching histories they don't own (multi-history view of a shared history, + published-history page, etc.) receive the same push. Viewer dispatch is + additive: the owner gets the event from ``push_to_user`` regardless of + whether they happen to be subscribed as a viewer too — duplicate refresh + is idempotent on the client and cheaper than maintaining a dedup index. """ payload = cast(HistoryUpdatePayload, kwargs) sse_manager = app[SSEConnectionManager] event_id = payload.get("event_id") encode = app.security.encode_id + # Build (history_id, encoded_event) once per changed history so the viewer + # fan-out below doesn't redo the JSON serialization. + events_by_encoded_id: dict[str, SSEEvent] = {} + + def event_for(history_id: int) -> SSEEvent: + encoded = encode(history_id) + cached = events_by_encoded_id.get(encoded) + if cached is None: + data = json.dumps({"history_ids": [encoded]}) + cached = SSEEvent(event="history_update", data=data, id=event_id) + events_by_encoded_id[encoded] = cached + return cached + for user_id_str, history_ids in payload.get("user_updates", {}).items(): user_id = int(user_id_str) encoded_ids = [encode(hid) for hid in history_ids] data = json.dumps({"history_ids": encoded_ids}) event = SSEEvent(event="history_update", data=data, id=event_id) sse_manager.push_to_user(user_id, event) + for hid in history_ids: + # Pre-populate the per-history cache so the viewer fan-out reuses + # the encoded id without re-running encode_id. + event_for(hid) for session_id_str, history_ids in payload.get("session_updates", {}).items(): session_id = int(session_id_str) encoded_ids = [encode(hid) for hid in history_ids] data = json.dumps({"history_ids": encoded_ids}) event = SSEEvent(event="history_update", data=data, id=event_id) sse_manager.push_to_session(session_id, event) + for hid in history_ids: + event_for(hid) + # Viewer dispatch — push a single-history event to anyone (user or session) + # subscribed as a viewer of that history on this worker. + for encoded_id, viewer_event in events_by_encoded_id.items(): + sse_manager.push_to_history_viewers(encoded_id, viewer_event) + + +def subscribe_history_viewer(app: "MinimalManagerApp", **kwargs) -> None: + """Apply a viewer subscription on this worker. + + Fired by the producer's broadcast so each webapp process can route future + history_update events to viewers regardless of which worker fielded the + REST call. + """ + payload = cast(HistoryViewerSubscriptionPayload, kwargs) + sse_manager = app[SSEConnectionManager] + history_id = payload["history_id"] + if "user_id" in payload: + sse_manager.subscribe_user_viewer(int(payload["user_id"]), history_id) + if "session_id" in payload: + sse_manager.subscribe_session_viewer(int(payload["session_id"]), history_id) + + +def unsubscribe_history_viewer(app: "MinimalManagerApp", **kwargs) -> None: + payload = cast(HistoryViewerSubscriptionPayload, kwargs) + sse_manager = app[SSEConnectionManager] + history_id = payload["history_id"] + if "user_id" in payload: + sse_manager.unsubscribe_user_viewer(int(payload["user_id"]), history_id) + if "session_id" in payload: + sse_manager.unsubscribe_session_viewer(int(payload["session_id"]), history_id) def entry_point_update(app: "MinimalManagerApp", **kwargs) -> None: @@ -483,6 +553,8 @@ def entry_point_update(app: "MinimalManagerApp", **kwargs) -> None: "notify_broadcast": notify_broadcast, "history_update": history_update, "entry_point_update": entry_point_update, + "subscribe_history_viewer": subscribe_history_viewer, + "unsubscribe_history_viewer": unsubscribe_history_viewer, } diff --git a/lib/galaxy/structured_app/__init__.py b/lib/galaxy/structured_app/__init__.py index 2c8b7c84e526..e30f42bb2525 100644 --- a/lib/galaxy/structured_app/__init__.py +++ b/lib/galaxy/structured_app/__init__.py @@ -122,6 +122,8 @@ class MinimalApp(BasicSharedApp): class MinimalManagerApp(MinimalApp): # Minimal App that is sufficient to run Celery tasks + amqp_internal_connection_obj: Optional[Connection] + execution_timer_factory: "ExecutionTimerFactory" carbon_intensity: float file_sources: ConfiguredFileSources genome_builds: GenomeBuilds @@ -166,7 +168,6 @@ class StructuredApp(MinimalManagerApp): Any. """ - amqp_internal_connection_obj: Optional[Connection] data_managers: "DataManagersInterface" dependency_resolvers_view: DependencyResolversView installed_repository_manager: "InstalledRepositoryManager" @@ -177,7 +178,6 @@ class StructuredApp(MinimalManagerApp): vault: Vault webhooks_registry: WebhooksRegistry queue_worker: Any # 'galaxy.queue_worker.GalaxyQueueWorker' - execution_timer_factory: "ExecutionTimerFactory" data_provider_registry: Any # 'galaxy.visualization.data_providers.registry.DataProviderRegistry' tool_cache: "ToolCache" tool_shed_repository_cache: Optional[ToolShedRepositoryCache] diff --git a/lib/galaxy/webapps/galaxy/api/events.py b/lib/galaxy/webapps/galaxy/api/events.py index 67120591227b..c44032d5fa2b 100644 --- a/lib/galaxy/webapps/galaxy/api/events.py +++ b/lib/galaxy/webapps/galaxy/api/events.py @@ -9,9 +9,12 @@ from typing import Optional from fastapi import ( + Body, Header, Request, + status, ) +from pydantic import BaseModel from starlette.responses import StreamingResponse from galaxy.managers.context import ProvidesUserContext @@ -22,6 +25,13 @@ Router, ) + +class HistoryViewerSubscriptionPayload(BaseModel): + """REST payload for ``/api/events/history-subscriptions`` endpoints.""" + + history_ids: list[str] + + log = logging.getLogger(__name__) router = Router(tags=["events"]) @@ -60,3 +70,33 @@ async def stream_events( "X-Accel-Buffering": "no", }, ) + + @router.post( + "/api/events/history-subscriptions", + summary="Subscribe to history_update SSE events for histories you don't own.", + status_code=status.HTTP_204_NO_CONTENT, + ) + def subscribe_history_viewer( + self, + payload: HistoryViewerSubscriptionPayload = Body(...), + trans: ProvidesUserContext = DependsOnTrans, + ) -> None: + """Asks every webapp worker to start routing ``history_update`` events + for these histories to the requesting user/session, in addition to the + default owner-routing. Idempotent: re-subscribing to the same id is a + no-op. Clients re-send the full set after each ``EventSource.onopen`` + so reconnects don't drop subscriptions. + """ + self.service.subscribe_history_viewer(trans, payload.history_ids) + + @router.delete( + "/api/events/history-subscriptions", + summary="Cancel viewer subscriptions for these histories.", + status_code=status.HTTP_204_NO_CONTENT, + ) + def unsubscribe_history_viewer( + self, + payload: HistoryViewerSubscriptionPayload = Body(...), + trans: ProvidesUserContext = DependsOnTrans, + ) -> None: + self.service.unsubscribe_history_viewer(trans, payload.history_ids) diff --git a/lib/galaxy/webapps/galaxy/services/events.py b/lib/galaxy/webapps/galaxy/services/events.py index 170c4c035efc..690c93a60fa4 100644 --- a/lib/galaxy/webapps/galaxy/services/events.py +++ b/lib/galaxy/webapps/galaxy/services/events.py @@ -17,14 +17,21 @@ IsDisconnected, SSEConnectionManager, ) +from galaxy.managers.sse_dispatch import SSEEventDispatcher from galaxy.webapps.galaxy.services.base import ServiceBase from galaxy.webapps.galaxy.services.notifications import NotificationService class EventsService(ServiceBase): - def __init__(self, sse_manager: SSEConnectionManager, notifications: NotificationService) -> None: + def __init__( + self, + sse_manager: SSEConnectionManager, + notifications: NotificationService, + sse_dispatcher: SSEEventDispatcher, + ) -> None: self.sse_manager = sse_manager self.notifications = notifications + self.sse_dispatcher = sse_dispatcher def open_stream( self, @@ -42,3 +49,32 @@ def open_stream( session_id = user_context.galaxy_session.id if user_context.galaxy_session else None catch_up = self.notifications.build_status_catchup(user_context, last_event_id) return self.sse_manager.stream(is_disconnected, user_id, catch_up=catch_up, galaxy_session_id=session_id) + + def subscribe_history_viewer(self, user_context: ProvidesUserContext, history_ids: list[str]) -> None: + """Register the requesting user/session as a viewer for each history. + + No authorization check on the dispatch path: SSE events only carry + history IDs, and the client follows up with a REST GET that runs the + normal access-controlled fetch — leaking a "history changed at T" ping + is acceptable. Broadcasts via Kombu so every webapp worker tracks the + subscription regardless of which one fielded this request. + """ + for history_id in history_ids: + self._dispatch_subscription(user_context, history_id, subscribe=True) + + def unsubscribe_history_viewer(self, user_context: ProvidesUserContext, history_ids: list[str]) -> None: + for history_id in history_ids: + self._dispatch_subscription(user_context, history_id, subscribe=False) + + def _dispatch_subscription(self, user_context: ProvidesUserContext, history_id: str, subscribe: bool) -> None: + user_id = user_context.user.id if not user_context.anonymous else None + session_id = user_context.galaxy_session.id if user_context.galaxy_session else None + if user_id is None and session_id is None: + # No way to route events back; silently skip. + return + if subscribe: + self.sse_dispatcher.subscribe_history_viewer(history_id=history_id, user_id=user_id, session_id=session_id) + else: + self.sse_dispatcher.unsubscribe_history_viewer( + history_id=history_id, user_id=user_id, session_id=session_id + ) diff --git a/test/integration/test_history_sse.py b/test/integration/test_history_sse.py index f60bfef1b15a..b412859de4a4 100644 --- a/test/integration/test_history_sse.py +++ b/test/integration/test_history_sse.py @@ -4,8 +4,6 @@ from urllib.parse import urljoin from uuid import uuid4 -import requests - from galaxy_test.base.populators import DatasetPopulator from galaxy_test.base.sse import SSELineListener from galaxy_test.driver.integration_util import IntegrationTestCase @@ -28,15 +26,15 @@ def setUp(self): def _events_stream_url(self) -> str: return urljoin(self.url, "api/events/stream") - def _create_history(self, name=None) -> str: - name = name or f"test_history_{uuid4()}" - response = self._post("histories", data={"name": name}, json=True) - self._assert_status_code_is_ok(response) - return response.json()["id"] + def _populator_for_user(self, api_key: str) -> DatasetPopulator: + """Build a populator bound to a separate user so two users can act + concurrently without context-switching the default interactor. + """ + return DatasetPopulator(self._get_interactor(api_key=api_key)) def test_history_update_contains_current_history_id(self): """The history_update event should contain the history's encoded ID.""" - history_id = self._create_history() + history_id = self.dataset_populator.new_history(name=f"test_history_{uuid4()}") api_key = self.galaxy_interactor.api_key assert api_key is not None @@ -53,6 +51,73 @@ def test_history_update_contains_current_history_id(self): finally: listener.stop() + def test_history_update_reaches_viewer_after_subscription(self): + """After User A subscribes as a viewer of User B's history, A's SSE + stream receives ``history_update`` events for that history. Without + the subscription, A would only see events for histories they own. + """ + user_b = self._setup_user(f"{uuid4()}@galaxy.test") + _, user_b_api_key = self._setup_user_get_key(user_b["email"]) + user_b_populator = self._populator_for_user(user_b_api_key) + + # User B creates a history that A will watch as a viewer. + user_b_history_id = user_b_populator.new_history(name="User B History") + + api_key = self.galaxy_interactor.api_key + assert api_key is not None + listener = SSELineListener(self._events_stream_url(), api_key) + listener.start() + try: + # User A subscribes as a viewer for User B's history. + sub_resp = self._post( + "events/history-subscriptions", + data={"history_ids": [user_b_history_id]}, + json=True, + ) + self._assert_status_code_is(sub_resp, 204) + + # User B mutates their own history. A must see the SSE event for it. + user_b_populator.new_dataset(user_b_history_id, content="viewer test", wait=False) + history_events = listener.wait_for_event_where( + "history_update", + lambda e: user_b_history_id in json.loads(e["data"]).get("history_ids", []), + ) + assert any( + user_b_history_id in json.loads(e["data"]).get("history_ids", []) for e in history_events + ), f"Viewer subscription did not deliver history_update: {history_events}" + + # Unsubscribe and confirm subsequent mutations no longer arrive. + unsub_resp = self._delete( + "events/history-subscriptions", + data={"history_ids": [user_b_history_id]}, + json=True, + ) + self._assert_status_code_is(unsub_resp, 204) + + # Drain previously-collected events so a *new* mutation can be + # distinguished. Then mutate B's history again, and prove no + # events for it land within the wait window — driven by waiting + # for an event for A's *own* history (positive assertion to avoid + # sleep-based flakes), then asserting B's id is absent. + user_a_history_id = self.dataset_populator.new_history(name=f"test_history_{uuid4()}") + baseline_count = len(listener.get_events("history_update")) + user_b_populator.new_dataset(user_b_history_id, content="post-unsubscribe", wait=False) + self.dataset_populator.new_dataset(user_a_history_id, wait=False) + listener.wait_for_event_where( + "history_update", + lambda e: user_a_history_id in json.loads(e["data"]).get("history_ids", []), + ) + after_events = listener.get_events("history_update")[baseline_count:] + seen_b_after_unsub = any( + user_b_history_id in json.loads(e["data"]).get("history_ids", []) for e in after_events + ) + assert not seen_b_after_unsub, ( + "User A still received history_update events for User B's history " + f"after unsubscribing: {after_events}" + ) + finally: + listener.stop() + def test_history_update_is_scoped_to_owning_user(self): """User A must only see history_update events for their own histories. @@ -63,8 +128,9 @@ def test_history_update_is_scoped_to_owning_user(self): """ user_b = self._setup_user(f"{uuid4()}@galaxy.test") _, user_b_api_key = self._setup_user_get_key(user_b["email"]) + user_b_populator = self._populator_for_user(user_b_api_key) - user_a_history_id = self._create_history() + user_a_history_id = self.dataset_populator.new_history(name=f"test_history_{uuid4()}") api_key = self.galaxy_interactor.api_key assert api_key is not None @@ -72,19 +138,8 @@ def test_history_update_is_scoped_to_owning_user(self): listener.start() try: # User B creates a history and uploads to it. User A must NOT see this. - create_resp = requests.post( - urljoin(self.url, "api/histories"), - params={"key": user_b_api_key}, - json={"name": "User B History"}, - ) - assert create_resp.status_code == 200 - user_b_history_id = create_resp.json()["id"] - - requests.post( - urljoin(self.url, f"api/histories/{user_b_history_id}/contents"), - params={"key": user_b_api_key}, - json={"from_hda_id": None, "source": "pasted", "content": "user b content"}, - ) + user_b_history_id = user_b_populator.new_history(name="User B History") + user_b_populator.new_dataset(user_b_history_id, content="user b content", wait=False) # User A uploads to their own history — this is what A's stream must observe. self.dataset_populator.new_dataset(user_a_history_id, wait=False) diff --git a/test/integration_selenium/test_entry_point_sse.py b/test/integration_selenium/test_entry_point_sse.py index 0011a7d12dfc..8e00204a5365 100644 --- a/test/integration_selenium/test_entry_point_sse.py +++ b/test/integration_selenium/test_entry_point_sse.py @@ -1,13 +1,22 @@ -"""Playwright E2E test for the interactive-tool entry-point SSE pipeline. - -Verifies that when an interactive-tool entry point transitions to ``configured`` -server-side (the runner's port-routing hook calls ``configure_entry_points``), -a logged-in user's browser receives the ``entry_point_update`` SSE event and -the entry-point store refetches, without the 10 s polling interval. - -This test stubs the server-side runner callback: it creates the Job and entry -point directly and calls ``InteractiveToolManager.configure_entry_points`` on -the live app. That invocation is the exact dispatch site in production. +"""Playwright E2E tests for the SSE pipeline. + +Covers two flows: + +* Interactive-tool entry-point updates: when an entry point transitions to + ``configured`` server-side (the runner's port-routing hook calls + ``configure_entry_points``), the logged-in user's browser receives the + ``entry_point_update`` event and the entry-point store refetches, without + the 10 s polling interval. The server side is stubbed by creating the Job + and entry point directly and calling + ``InteractiveToolManager.configure_entry_points`` on the live app — the + exact dispatch site in production. + +* History updates in the multi-history view: when a history that is *not* + the user's current history is mutated (dataset upload via the API), the + multi-history view panel for that history refreshes via SSE without + switching the current history. Polling is disabled when + ``enable_sse_updates`` is true, so an item appearing in the non-current + panel implies the SSE-driven refresh path fired. """ from uuid import uuid4 @@ -111,3 +120,44 @@ def test_entry_point_update_pushed_via_sse(self): # event-timestamp hook only advances when useSSE's listener fires. self._wait_for_sse_event_after(baseline_ts) self.screenshot("entry_point_sse_after") + + @selenium_test + def test_history_update_in_multi_view_for_non_current_history(self): + """SSE history_update events should refresh non-current panels in the multi-history view. + + Polling is disabled when ``enable_sse_updates`` is true, so the only path + that can surface a new dataset in a non-current history's panel is the + SSE handler in ``historyStore.handleHistorySSEEvent``. + """ + self.home() + first_history_id = self.current_history_id() + # New history via the API populator. The browser's current history is + # tracked separately and must remain the first one — this test + # exercises the *non-current* update path. + second_history_id = self.dataset_populator.new_history(name="multi-view-sse-target") + self.home() + assert ( + self.current_history_id() == first_history_id + ), "Browser current history flipped; cannot assert non-current SSE update path." + + self.open_history_multi_view() + self._wait_for_sse_connected() + baseline_ts = self._last_sse_event_ts() + self.screenshot("history_multi_view_sse_before") + + # Upload to the non-current history. wait=False — terminal dataset + # state doesn't matter here; the SSE event fires on any history + # mutation, and the assertion below only requires the new item to + # appear in the multi-view panel. + self.dataset_populator.new_dataset(second_history_id, content="sse content", wait=False) + + # Prove SSE delivered the event (timestamp only advances on a real push). + self._wait_for_sse_event_after(baseline_ts) + + # And that the multi-history view rendered the new item without a + # current-history switch. With polling disabled, the only way for the + # second history's panel to surface hid=1 is via handleHistorySSEEvent. + item_selector = self.content_item_by_attributes(hid=1, multi_history_panel=True) + self.wait_for_visible(item_selector, wait_type=self.wait_types.JOB_COMPLETION) + assert self.current_history_id() == first_history_id + self.screenshot("history_multi_view_sse_after")