Skip to content

Commit 039c5fb

Browse files
committed
Add SSE entry-point channel, dispatch observability, declare-queue cache
Three related additions to the SSE notification pipeline, bundled here because they share ``SSEEventDispatcher._send`` as their modification point:

1. Interactive-tool entry-point SSE channel
   - ``entry_point_update`` dispatcher method + queue-worker handler.
   - ``InteractiveToolManager.configure_entry_points`` dispatches a wake-up event after the DB commit; the client refetches ``/api/entry_points`` on receipt (no payload).
   - Frontend: new SSE event type, store subscription with XOR polling fallback via ``enable_sse_entry_point_updates`` config flag.
   - Integration + selenium tests.

2. Queue and SSE observability metrics
   - Counters, timers, and periodic gauges for SSE dispatch, control-queue task execution, control-queue depth (via kombu passive declare), active SSE connections, dropped events, and active WorkerProcess rows. These flow through the existing ``galaxy_statsd_client`` — no new infra. Gauges are scheduled via Celery beat at ``queue_metrics_interval`` seconds (default 15). All instrumentation no-ops when statsd isn't configured.
   - Sub-emitter failures log once at WARNING and bump a ``galaxy.queue_metrics.error`` counter tagged by emitter so broken emitters are visible in metrics without log spam.

3. Active-worker control-queue cache
   - 30 s TTL cache on ``all_control_queues_for_declare`` with RLock stampede protection. At 1000+ events/s this eliminates ~30 DB round-trips/s per webapp for data that only changes on 60 s heartbeat cadence. Empty results are not cached — caching them would otherwise silently drop every SSE event during the startup window.
1 parent 53b8f12 commit 039c5fb

35 files changed

Lines changed: 1290 additions & 44 deletions

client/src/composables/useNotificationSSE.ts

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -10,6 +10,7 @@ export const SSE_EVENT_TYPES = [
1010
"broadcast_update",
1111
"notification_status",
1212
"history_update",
13+
"entry_point_update",
1314
] as const;
1415

1516
export type SSEEventType = (typeof SSE_EVENT_TYPES)[number];

client/src/stores/_testing/sseStoreSupport.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,27 @@
1313
* files in this directory for the pattern.
1414
*/
1515

16+
import { ref, type Ref } from "vue";
1617
import { vi } from "vitest";
1718

1819
export interface SSEMockState {
1920
onEvent: ((event: MessageEvent) => void) | null;
2021
connect: ReturnType<typeof vi.fn>;
2122
disconnect: ReturnType<typeof vi.fn>;
23+
connected: Ref<boolean>;
2224
}
2325

2426
/** Build the factory used with ``vi.mock("@/composables/useNotificationSSE", ...)``. */
2527
export function sseMockFactory(state: SSEMockState) {
28+
// Lazily initialize ``connected`` so existing callers that don't pass it
29+
// still get a working ref.
30+
if (!state.connected) {
31+
state.connected = ref(false);
32+
}
2633
return {
2734
useSSE: vi.fn((onEvent: (event: MessageEvent) => void) => {
2835
state.onEvent = onEvent;
29-
return { connect: state.connect, disconnect: state.disconnect };
36+
return { connect: state.connect, disconnect: state.disconnect, connected: state.connected };
3037
}),
3138
};
3239
}

client/src/stores/entryPointStore.test.js

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,25 @@
11
import flushPromises from "flush-promises";
22
import { createPinia, setActivePinia } from "pinia";
3-
import { beforeEach, describe, expect, it } from "vitest";
3+
import { beforeEach, describe, expect, it, vi } from "vitest";
44

55
import { HttpResponse, useServerMock } from "@/api/client/__mocks__";
66

77
import testInteractiveToolsResponse from "../components/InteractiveTools/testData/testInteractiveToolsResponse";
8+
import { sseMockFactory } from "./_testing/sseStoreSupport";
89
import { useEntryPointStore } from "./entryPointStore";
910

11+
// ``vi.mock`` is hoisted above module-level declarations, so the capture-state
12+
// has to be built via ``vi.hoisted`` to be visible to the factory. Prevents
13+
// these tests from opening a real EventSource against ``/api/events/stream``
14+
// when ``useEntryPointStore()`` is invoked.
15+
const sseState = vi.hoisted(() => ({
16+
onEvent: null,
17+
connect: vi.fn(),
18+
disconnect: vi.fn(),
19+
connected: null,
20+
}));
21+
vi.mock("@/composables/useNotificationSSE", () => sseMockFactory(sseState));
22+
1023
const { server, http } = useServerMock();
1124

1225
describe("stores/EntryPointStore", () => {

client/src/stores/entryPointStore.ts

Lines changed: 70 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
import axios from "axios";
22
import isEqual from "lodash.isequal";
33
import { defineStore } from "pinia";
4-
import { computed, ref } from "vue";
4+
import { computed, ref, watch } from "vue";
55

66
import { useResourceWatcher } from "@/composables/resourceWatcher";
7+
import { useSSE } from "@/composables/useNotificationSSE";
78
import { getAppRoot } from "@/onload/loadConfig";
9+
import { useConfigStore } from "@/stores/configurationStore";
810
import { rethrowSimple } from "@/utils/simple-error";
911

1012
const ACTIVE_POLLING_INTERVAL = 10000;
@@ -23,22 +25,8 @@ interface EntryPoint {
2325
}
2426

2527
export const useEntryPointStore = defineStore("entryPointStore", () => {
26-
const { startWatchingResource: startWatchingEntryPoints } = useResourceWatcher(fetchEntryPoints, {
27-
shortPollingInterval: ACTIVE_POLLING_INTERVAL,
28-
enableBackgroundPolling: false, // No need to poll in the background
29-
});
30-
3128
const entryPoints = ref<EntryPoint[]>([]);
3229

33-
const entryPointsForJob = computed(() => {
34-
return (jobId: string) => entryPoints.value.filter((entryPoint) => entryPoint["job_id"] === jobId);
35-
});
36-
37-
const entryPointsForHda = computed(() => {
38-
return (hdaId: string) =>
39-
entryPoints.value.filter((entryPoint) => entryPoint["output_datasets_ids"].includes(hdaId));
40-
});
41-
4230
async function fetchEntryPoints() {
4331
const url = `${getAppRoot()}api/entry_points`;
4432
const params = { running: true };
@@ -50,6 +38,73 @@ export const useEntryPointStore = defineStore("entryPointStore", () => {
5038
}
5139
}
5240

41+
// SSE-driven path: on each entry_point_update signal, refetch the canonical
42+
// list from REST. The event carries no data — it's a pure wake-up.
43+
function handleEntryPointSSEEvent(_event: MessageEvent) {
44+
fetchEntryPoints().catch((err) => console.error("Error refreshing entry points from SSE push:", err));
45+
}
46+
const { connect: sseConnect, connected: sseConnected } = useSSE(handleEntryPointSSEEvent, ["entry_point_update"]);
47+
48+
let watchingInitialized = false;
49+
50+
// Callers opt in via ``startWatchingEntryPoints()`` (App.vue gates this on
51+
// ``interactivetools_enable``). We then pick SSE or polling based on the
52+
// server flag — mutually exclusive, mirroring historyStore / notificationsStore.
53+
// ``useConfigStore`` is resolved lazily here so tests that only exercise
54+
// the data methods don't need a ``/api/configuration`` handler registered.
55+
function startWatchingEntryPoints() {
56+
if (watchingInitialized) {
57+
return;
58+
}
59+
watchingInitialized = true;
60+
const configStore = useConfigStore();
61+
62+
const decide = () => {
63+
if (configStore.config?.enable_sse_entry_point_updates) {
64+
// Baseline fetch + SSE. Reconnect-refetch closes the "user
65+
// navigated away and missed events" window.
66+
fetchEntryPoints().catch((err) => console.warn("Initial entry-point load failed", err));
67+
sseConnect();
68+
watch(sseConnected, (isConnected, wasConnected) => {
69+
if (isConnected && !wasConnected) {
70+
fetchEntryPoints().catch((err) =>
71+
console.error("Error refreshing entry points on SSE reconnect:", err),
72+
);
73+
}
74+
});
75+
} else {
76+
const { startWatchingResource } = useResourceWatcher(fetchEntryPoints, {
77+
shortPollingInterval: ACTIVE_POLLING_INTERVAL,
78+
enableBackgroundPolling: false,
79+
});
80+
startWatchingResource();
81+
}
82+
};
83+
84+
if (configStore.isLoaded) {
85+
decide();
86+
} else {
87+
const stop = watch(
88+
() => configStore.isLoaded,
89+
(loaded) => {
90+
if (loaded) {
91+
stop();
92+
decide();
93+
}
94+
},
95+
);
96+
}
97+
}
98+
99+
const entryPointsForJob = computed(() => {
100+
return (jobId: string) => entryPoints.value.filter((entryPoint) => entryPoint["job_id"] === jobId);
101+
});
102+
103+
const entryPointsForHda = computed(() => {
104+
return (hdaId: string) =>
105+
entryPoints.value.filter((entryPoint) => entryPoint["output_datasets_ids"].includes(hdaId));
106+
});
107+
53108
function updateEntryPoints(data: EntryPoint[]) {
54109
let hasChanged = entryPoints.value.length !== data.length ? true : false;
55110
if (entryPoints.value.length === 0) {

doc/source/admin/galaxy_options.rst

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3394,6 +3394,18 @@
33943394
:Type: bool
33953395

33963396

3397+
~~~~~~~~~~~~~~~~~~~~~~~~~~
3398+
``queue_metrics_interval``
3399+
~~~~~~~~~~~~~~~~~~~~~~~~~~
3400+
3401+
:Description:
3402+
How often (in seconds) the Celery beat task emits queue-depth,
3403+
SSE-connection, and WorkerProcess gauges. Only active when
3404+
statsd_host is set. Set to 0 to disable.
3405+
:Default: ``15``
3406+
:Type: int
3407+
3408+
33973409
~~~~~~~~~~~~~~~~~~~~~~
33983410
``library_import_dir``
33993411
~~~~~~~~~~~~~~~~~~~~~~
@@ -5804,6 +5816,20 @@
58045816
:Type: bool
58055817

58065818

5819+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5820+
``enable_sse_entry_point_updates``
5821+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5822+
5823+
:Description:
5824+
Enables real-time interactive-tool entry-point update
5825+
notifications via Server-Sent Events. When enabled, the client
5826+
subscribes to entry_point_update SSE events and refetches the
5827+
entry-point list on each event, replacing the 10-second polling
5828+
loop. When disabled, polling remains the source of updates.
5829+
:Default: ``false``
5830+
:Type: bool
5831+
5832+
58075833
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
58085834
``history_audit_monitor_poll_interval``
58095835
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

lib/galaxy/app/__init__.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -693,6 +693,7 @@ def __init__(
693693
SSEEventDispatcher(
694694
queue_worker=getattr(self, "queue_worker", None),
695695
application_stack=self.application_stack,
696+
statsd_client=self.execution_timer_factory.galaxy_statsd_client,
696697
),
697698
)
698699
self.notification_manager = self._register_singleton(NotificationManager)
@@ -856,7 +857,12 @@ def __init__(self, **kwargs) -> None:
856857
# SSE connection manager for real-time notification push.
857858
# Consumed via ``depends(SSEConnectionManager)`` / ``app[SSEConnectionManager]``,
858859
# so no module-level attribute is needed — keep the container wiring only.
859-
self._register_singleton(SSEConnectionManager)
860+
self._register_singleton(
861+
SSEConnectionManager,
862+
SSEConnectionManager(
863+
statsd_client=self.execution_timer_factory.galaxy_statsd_client,
864+
),
865+
)
860866

861867
# AI agent registry and service
862868
agent_registry = build_agent_registry(self.config)

lib/galaxy/app_unittest_utils/galaxy_mock.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ def __init__(self, config=None, **kwargs) -> None:
159159
self.application_stack = ApplicationStack()
160160
self.auth_manager = AuthManager(self.config)
161161
self.user_manager = UserManager(cast(BasicSharedApp, self))
162-
self.execution_timer_factory = Bunch(get_timer=StructuredExecutionTimer)
162+
self.execution_timer_factory = Bunch(get_timer=StructuredExecutionTimer, galaxy_statsd_client=None)
163163
self.interactivetool_manager = Bunch(create_interactivetool=lambda *args, **kwargs: None)
164164
self.is_job_handler = False
165165
self.biotools_metadata_source = None

lib/galaxy/authnz/psa_authnz.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -798,6 +798,7 @@ def _send_oidc_profile_update_notification(trans, user, updates: list[str]) -> N
798798
NotificationVariant,
799799
PersonalNotificationCategory,
800800
)
801+
801802
labels: dict[str, str] = {
802803
"email": "email address",
803804
"username": "public name",

lib/galaxy/celery/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,9 @@ def schedule_task(task, interval):
252252
schedule_task("prune_history_audit_table", config.history_audit_table_prune_interval)
253253
schedule_task("cleanup_short_term_storage", config.short_term_storage_cleanup_interval)
254254

255+
if config.statsd_host:
256+
schedule_task("emit_queue_metrics_task", config.queue_metrics_interval)
257+
255258
if config.enable_notification_system:
256259
schedule_task("cleanup_expired_notifications", config.expired_notifications_cleanup_interval)
257260
schedule_task("dispatch_pending_notifications", config.dispatch_notifications_interval)

lib/galaxy/celery/tasks.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -622,6 +622,14 @@ def dispatch_pending_notifications(notification_manager: NotificationManager):
622622
log.info(f"Successfully dispatched {count} notifications.")
623623

624624

625+
@galaxy_task(action="emit queue and SSE observability metrics")
626+
def emit_queue_metrics_task(app: MinimalManagerApp):
627+
"""Sample control-queue depth, SSE connection count, and worker rows → statsd."""
628+
from galaxy.webapps.galaxy.metrics.queue_metrics import emit_queue_metrics
629+
630+
emit_queue_metrics(app)
631+
632+
625633
@galaxy_task(action="clean up job working directories")
626634
def cleanup_jwds(sa_session: galaxy_scoped_session, object_store: BaseObjectStore, config: GalaxyAppConfiguration):
627635
"""Cleanup job working directories for failed jobs that are older than X days"""

0 commit comments

Comments
 (0)