Skip to content
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
06cdb3a
Replace notification polling with Server-Sent Events (SSE)
mvdbeek Apr 16, 2026
2ba3863
Fix SSE control-queue routing for Celery + post-fork workers
mvdbeek Apr 16, 2026
276ffab
Fix history SSE fallback switching and forced refresh
mvdbeek Apr 16, 2026
0e19f5f
Split SSEEventDispatcher into its own module to drop inline imports
mvdbeek Apr 16, 2026
c364fef
Add pg_notify to history audit triggers for existing installs
mvdbeek Apr 17, 2026
5d1a1c7
Drive history/notifications SSE from config, not runtime socket state
mvdbeek Apr 17, 2026
973dba5
Skip response buffering for non-HTML in galaxy-dev-server plugin
mvdbeek Apr 17, 2026
3cf8a51
Align history-audit trigger SQL between migration and runtime installer
mvdbeek Apr 17, 2026
8d12bff
Address review feedback for SSE notifications
mvdbeek Apr 17, 2026
2515eda
Ruff
mvdbeek Apr 17, 2026
079d4ce
Add category discriminator to notification content fixtures
mvdbeek Apr 17, 2026
5ae948c
Fix mypy errors on sse-notifications branch
mvdbeek Apr 19, 2026
6e5eada
Add SSE entry-point channel, dispatch observability, declare-queue cache
mvdbeek Apr 20, 2026
53f3624
Expose enable_sse_entry_point_updates through /api/configuration
mvdbeek Apr 20, 2026
7384f19
Fix SSE selenium tests for Playwright backend and user identity
mvdbeek Apr 21, 2026
da97ee6
Add types-cachetools to typecheck deps
mvdbeek Apr 21, 2026
f053d80
Swap Playwright text= selector for xpath in notification SSE test
mvdbeek Apr 23, 2026
f8cb87a
Apply black formatting to SSE test
mvdbeek Apr 23, 2026
cc289e9
TEMP: enable notification/SSE flags across all UI test workflows
mvdbeek Apr 23, 2026
4b90e4f
Route history_update SSE events for anonymous sessions
mvdbeek Apr 23, 2026
daba096
Close EventSource on pagehide so full-page navigation doesn't race th…
mvdbeek Apr 23, 2026
01c714a
Guarantee playwright.stop() runs even when browser.close() raises
mvdbeek Apr 23, 2026
1ae724f
Cancel background traffic before login/register so session cookie isn…
mvdbeek Apr 24, 2026
cb1ef59
Tear down selenium driver when setUp fails before login completes
mvdbeek Apr 24, 2026
35c6077
Extend auth-navigation abort to GalaxyApi (openapi-fetch) requests
mvdbeek Apr 24, 2026
6463c24
Skip SSE/polling watchers in iframed Galaxy instances
mvdbeek Apr 24, 2026
3408a8e
Apply prettier to pendingRequestsMiddleware
mvdbeek Apr 24, 2026
39ddf33
Unify SSE: single shared EventSource, drop /api/notifications/stream
mvdbeek Apr 24, 2026
b0cf4f7
Fix failing SSE integration tests
mvdbeek Apr 24, 2026
ce884dc
Elect single history-audit monitor; add standalone daemon
mvdbeek Apr 24, 2026
01c88db
Prune Kombu SQLAlchemy transport on a schedule
mvdbeek Apr 24, 2026
004dbbd
Relocate queue_metrics; narrow its deps
mvdbeek Apr 24, 2026
ecfdcd2
Inject ControlTask factory + queues provider into SSEEventDispatcher
mvdbeek Apr 24, 2026
78ab002
Address arch-review polish items
mvdbeek Apr 24, 2026
b3b75b5
Fix CI: Callable import + ControlTaskLike Protocol
mvdbeek Apr 24, 2026
aea682c
Fix CI: widen control_task_factory return to Any
mvdbeek Apr 24, 2026
1b73dc6
Reorder NotificationManager import (isort)
mvdbeek Apr 24, 2026
859fe25
Apply black formatting
mvdbeek Apr 24, 2026
502a6e2
Hoist non-justified inline imports
mvdbeek Apr 28, 2026
5def1f8
Add shared id-decode helpers to IntegrationInstance
mvdbeek Apr 28, 2026
bbb4cdc
Collapse SSE feature flags into single enable_sse_updates
mvdbeek Apr 28, 2026
5d89d81
Always pass update_time cursor on SSE-driven history refresh
mvdbeek Apr 28, 2026
db07cc1
Route share notifications through NotificationService
mvdbeek Apr 28, 2026
f16e155
Gate notifications SSE on enable_sse_updates
mvdbeek Apr 28, 2026
af2f401
Add admin docs for SSE-driven real-time updates
mvdbeek Apr 28, 2026
a999aec
Fix return type of send_internal_notification
mvdbeek Apr 28, 2026
014a4bf
Enable SSE updates in notification SSE selenium test
mvdbeek Apr 28, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/integration_selenium.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ env:
YARN_INSTALL_OPTS: --frozen-lockfile
GALAXY_CONFIG_SQLALCHEMY_WARN_20: '1'
GALAXY_DEPENDENCIES_INSTALL_WEASYPRINT: '1'
# TEMP: shake down SSE/notification system across full UI surface — revert before merge
GALAXY_CONFIG_OVERRIDE_ENABLE_NOTIFICATION_SYSTEM: '1'
GALAXY_CONFIG_OVERRIDE_ENABLE_SSE_HISTORY_UPDATES: '1'
GALAXY_CONFIG_OVERRIDE_ENABLE_SSE_ENTRY_POINT_UPDATES: '1'
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/playwright.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ env:
GALAXY_TEST_SELENIUM_HEADLESS: 1
YARN_INSTALL_OPTS: --frozen-lockfile
GALAXY_CONFIG_SQLALCHEMY_WARN_20: '1'
# TEMP: shake down SSE/notification system across full UI surface — revert before merge
GALAXY_CONFIG_OVERRIDE_ENABLE_NOTIFICATION_SYSTEM: '1'
GALAXY_CONFIG_OVERRIDE_ENABLE_SSE_HISTORY_UPDATES: '1'
GALAXY_CONFIG_OVERRIDE_ENABLE_SSE_ENTRY_POINT_UPDATES: '1'
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/selenium.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ env:
GALAXY_TEST_SKIP_FLAKEY_TESTS_ON_ERROR: 1
YARN_INSTALL_OPTS: --frozen-lockfile
GALAXY_CONFIG_SQLALCHEMY_WARN_20: '1'
# TEMP: shake down SSE/notification system across full UI surface — revert before merge
GALAXY_CONFIG_OVERRIDE_ENABLE_NOTIFICATION_SYSTEM: '1'
GALAXY_CONFIG_OVERRIDE_ENABLE_SSE_HISTORY_UPDATES: '1'
GALAXY_CONFIG_OVERRIDE_ENABLE_SSE_ENTRY_POINT_UPDATES: '1'
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
Expand Down
2 changes: 2 additions & 0 deletions client/src/api/client/pendingRequestsMiddleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ export const pendingRequestsMiddleware: Middleware = {
return new Request(request, { headers });
}
const shared = getPendingAbortSignal();
// Combine with any signal the caller may have set so we don't silently
// drop their cancellation semantics.
const signal = typeof AbortSignal.any === "function" ? AbortSignal.any([request.signal, shared]) : shared;
return new Request(request, { signal });
},
Expand Down
67 changes: 67 additions & 0 deletions client/src/api/schema/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1292,6 +1292,33 @@ export interface paths {
patch?: never;
trace?: never;
};
"/api/events/stream": {
parameters: {
query?: never;
header?: never;
path?: never;
cookie?: never;
};
/**
* Server-Sent Events stream for real-time updates.
* @description Opens a Server-Sent Events (SSE) connection that pushes real-time
* updates for notifications, history changes, and other events.
*
* On reconnect, the browser sends the ``Last-Event-ID`` header automatically.
* If the notification system is enabled, any notifications created since that
* timestamp are delivered as a catch-up ``notification_status`` event.
*
* Anonymous users receive only broadcast events.
*/
get: operations["stream_events_api_events_stream_get"];
put?: never;
post?: never;
delete?: never;
options?: never;
head?: never;
patch?: never;
trace?: never;
};
"/api/exports": {
parameters: {
query?: never;
Expand Down Expand Up @@ -33209,6 +33236,46 @@ export interface operations {
};
};
};
stream_events_api_events_stream_get: {
parameters: {
query?: never;
header?: {
"Last-Event-ID"?: string | null;
/** @description The user ID that will be used to effectively make this API call. Only admins and designated users can make API calls on behalf of other users. */
"run-as"?: string | null;
};
path?: never;
cookie?: never;
};
requestBody?: never;
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content?: never;
};
/** @description Request Error */
"4XX": {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["MessageExceptionModel"];
};
};
/** @description Server Error */
"5XX": {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["MessageExceptionModel"];
};
};
};
};
index_api_exports_get: {
parameters: {
query?: {
Expand Down
5 changes: 3 additions & 2 deletions client/src/composables/useAuthNavigation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ import { useNotificationsStore } from "@/stores/notificationsStore";
* will use a fresh signal and is not affected.
*/
export function discardActiveConnectionsBeforeAuthNavigation() {
// Stop polling watchers first so they can't kick off new fetches, then
// abort any requests still in flight via the shared AbortController.
// Order: close SSE streams first (synchronous TCP close), then stop the
// polling watchers so they can't kick off new fetches, then abort any
// requests still in flight via the shared AbortController.
useHistoryStore().stopWatchingHistory();
useEntryPointStore().stopWatchingEntryPoints();
useNotificationsStore().stopWatchingNotifications();
Expand Down
191 changes: 191 additions & 0 deletions client/src/composables/useNotificationSSE.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
import { onScopeDispose, ref } from "vue";

import { withPrefix } from "@/utils/redirect";

/**
* All SSE event types the server may emit.
*/
export const SSE_EVENT_TYPES = [
"notification_update",
"broadcast_update",
"notification_status",
"history_update",
"entry_point_update",
] as const;

export type SSEEventType = (typeof SSE_EVENT_TYPES)[number];

interface SSEDebugGlobals {
__galaxy_sse_connected?: boolean;
__galaxy_sse_last_event_ts?: number;
}

function sseGlobals(): SSEDebugGlobals {
return window as unknown as SSEDebugGlobals;
}

// ---------------------------------------------------------------------------
// Module-level shared EventSource.
//
// Every call to ``useSSE`` registers its handler against this one socket so
// the tab opens a single ``/api/events/stream`` connection no matter how many
// stores listen. HTTP/1.1 caps simultaneous connections per origin at six;
// before this consolidation we burned three slots on SSE alone (history,
// notifications, entry points), which is what starved the scratchbook iframe
// flow — see the fix in ``client/src/entry/analysis/App.vue``.
// ---------------------------------------------------------------------------

type Handler = (event: MessageEvent) => void;

let sharedSource: EventSource | null = null;
const sharedConnected = ref(false);
const subscribers: Map<SSEEventType, Set<Handler>> = new Map();
// Track the per-type dispatchers we registered so ``closeSource`` removes the
// exact same listeners (``addEventListener`` matches by reference).
const dispatchers: Map<SSEEventType, Handler> = new Map();

function openSourceIfNeeded() {
if (sharedSource) {
return;
}
sharedSource = new EventSource(withPrefix("/api/events/stream"));

for (const eventType of SSE_EVENT_TYPES) {
const dispatcher: Handler = (event) => {
// Selenium tests watch ``__galaxy_sse_last_event_ts`` to prove that
// an observable state change came from an SSE push and not the
// polling fallback (where the global would never advance).
sseGlobals().__galaxy_sse_last_event_ts = Date.now();
const subs = subscribers.get(eventType);
if (!subs) {
return;
}
for (const handler of subs) {
handler(event);
}
};
dispatchers.set(eventType, dispatcher);
sharedSource.addEventListener(eventType, dispatcher);
}

sharedSource.onopen = () => {
sharedConnected.value = true;
// Global readiness flag so Selenium tests can distinguish a working
// SSE pipeline from the polling fallback.
sseGlobals().__galaxy_sse_connected = true;
};

sharedSource.onerror = () => {
// EventSource auto-reconnects natively; SSE-vs-polling is a
// config-level decision (see historyStore / notificationsStore), so
// we must not give up on transient errors here — doing so would leave
// the client with no updates at all.
sharedConnected.value = false;
sseGlobals().__galaxy_sse_connected = false;
};

// Browser EventSource teardown during a full-page navigation
// (``window.location.href = …``) is not guaranteed to happen before the
// browser issues requests for the new page — we've seen Chrome keep the
// stream alive long enough that a login/register POST reload races the
// close, and the new page then loads with a stale auth view. Force a
// synchronous ``close()`` during ``pagehide`` (fires for both reloads and
// tab-close, unlike ``beforeunload``) to close that window.
if (typeof window !== "undefined") {
window.addEventListener("pagehide", closeSource);
}
}

function closeSource() {
if (!sharedSource) {
return;
}
for (const [eventType, dispatcher] of dispatchers) {
sharedSource.removeEventListener(eventType, dispatcher);
}
dispatchers.clear();
sharedSource.close();
sharedSource = null;
sharedConnected.value = false;
sseGlobals().__galaxy_sse_connected = false;
if (typeof window !== "undefined") {
window.removeEventListener("pagehide", closeSource);
}
}

function addSubscriber(onEvent: Handler, eventTypes: readonly SSEEventType[]) {
for (const eventType of eventTypes) {
let subs = subscribers.get(eventType);
if (!subs) {
subs = new Set();
subscribers.set(eventType, subs);
}
subs.add(onEvent);
}
}

function removeSubscriber(onEvent: Handler, eventTypes: readonly SSEEventType[]): boolean {
let anyRemaining = false;
for (const eventType of eventTypes) {
const subs = subscribers.get(eventType);
if (subs) {
subs.delete(onEvent);
if (subs.size === 0) {
subscribers.delete(eventType);
}
}
}
for (const subs of subscribers.values()) {
if (subs.size > 0) {
anyRemaining = true;
break;
}
}
return anyRemaining;
}

/**
* Composable for subscribing to events on the shared SSE stream.
*
* The browser's EventSource handles reconnection automatically and sends the
* ``Last-Event-ID`` header so the server can catch up on missed events. Only
* one EventSource is opened per tab regardless of how many callers invoke
* this composable; the composable multiplexes dispatch per event type.
*
* @param onEvent - callback invoked for every matching SSE event
* @param eventTypes - subset of event types to listen to (defaults to all)
*/
export function useSSE(onEvent: Handler, eventTypes: readonly SSEEventType[] = SSE_EVENT_TYPES) {
let connected_: boolean = false;

function connect() {
if (connected_) {
return;
}
connected_ = true;
addSubscriber(onEvent, eventTypes);
openSourceIfNeeded();
}

function disconnect() {
if (!connected_) {
return;
}
connected_ = false;
const anyRemaining = removeSubscriber(onEvent, eventTypes);
if (!anyRemaining) {
closeSource();
}
}

onScopeDispose(() => {
disconnect();
});

return { connect, disconnect, connected: sharedConnected };
}

/**
* @deprecated Use `useSSE` instead. This alias exists for backward compatibility.
*/
export const useNotificationSSE = useSSE;
26 changes: 23 additions & 3 deletions client/src/entry/analysis/App.vue
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
</template>
<script>
import { storeToRefs } from "pinia";
import { ref, watch } from "vue";
import { computed, ref, watch } from "vue";
import { useRoute } from "vue-router/composables";

import { getGalaxyInstance } from "@/app";
Expand Down Expand Up @@ -105,9 +105,29 @@ export default {
const uploadModal = ref(null);
setGlobalUploadModal(uploadModal);

const embedded = useRouteQueryBool("embed");
// Treat any iframe context as embedded: scratchbook pops dataset
// displays into ``WinBox`` iframes that hit the same routes without
// an ``embed`` query param, and each one would otherwise open its own
// SSE + polling traffic, quickly saturating the HTTP/1.1 per-origin
// connection pool (e.g. ``test_scratchbook_window_persistence`` hangs
// indefinitely after two windows are open).
const inIframe = (() => {
if (typeof window === "undefined") {
return false;
}
try {
return window.top !== window.self;
} catch {
// Cross-origin access throws — that's definitely an iframe.
return true;
}
})();
const embeddedQuery = useRouteQueryBool("embed");
const embedded = computed(() => embeddedQuery.value || inIframe);
const historyStore = useHistoryStore();
historyStore.startWatchingHistory();
if (!embedded.value) {
historyStore.startWatchingHistory();
}

watch(
() => embedded.value,
Expand Down
Loading
Loading