Commit 5652a0b

authored
Merge pull request galaxyproject#22189 from mvdbeek/improve-celery-rate-limit-and-concurrnecy-handling
Improve celery rate limit and concurrency handling
2 parents cacf25f + a4f7129 commit 5652a0b

11 files changed

Lines changed: 811 additions & 19 deletions

File tree

doc/source/admin/galaxy_options.rst

Lines changed: 15 additions & 0 deletions
@@ -5451,6 +5451,21 @@
:Type: float


~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
``celery_user_concurrency_limit``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

:Description:
    Maximum number of Celery tasks that can execute concurrently for a
    single user. If set to 0 (default), no concurrency limit is
    enforced. When a user exceeds this limit, new tasks are deferred
    and retried until a slot becomes available. A periodic cleanup
    task reclaims slots from crashed workers by inspecting active
    tasks on all workers.
:Default: ``0``
:Type: int


~~~~~~~~~~~~~~
``use_pbkdf2``
~~~~~~~~~~~~~~

doc/source/admin/production.md

Lines changed: 163 additions & 0 deletions
@@ -262,3 +262,166 @@ This configuration ensures that:
2. Exported files remain available for 7 days after generation

You can monitor Celery task status using [Flower](https://flower.readthedocs.io/en/latest/), a real-time web-based monitoring tool for Celery.

#### Per-user task rate limiting

Galaxy supports limiting the rate at which Celery tasks are executed per user. This prevents a single user from monopolizing worker capacity and ensures fair scheduling across all users.

##### Configuration

Set `celery_user_rate_limit` in the Galaxy configuration to a non-zero float representing the maximum number of tasks per user per second. For example:

```yaml
celery_user_rate_limit: 0.1
```

This allows each user at most one task execution every 10 seconds. The default value of `0.0` disables rate limiting entirely.
##### How it works

Rate limiting is implemented in Celery's `before_start` hook via the `GalaxyTaskBeforeStart` class hierarchy. The mechanism works in two phases:

1. **Slot reservation (first attempt):** When a task is about to execute for the first time, Galaxy atomically reserves the next available timeslot for that user in the `celery_user_rate_limit` database table. The reserved time is calculated as `max(last_scheduled_time + interval, now)`. If the reserved timeslot is in the future, the task is deferred using `task.retry(countdown=...)` and the reserved time is stored in a Celery message header.

2. **Execution gating (retries):** On each subsequent retry, the task reads its reserved timeslot from the message header and checks whether the current time has reached it. If not, it retries again with the remaining countdown. Once the timeslot is reached, the task proceeds to execute.

This two-phase design ensures that:

- Each task reserves a slot in the DB exactly once, avoiding cascading delays from re-reservation.
- Tasks from different users are scheduled independently — user A's tasks do not delay user B's tasks.
- Rate-limit retries use `max_retries=None`, so tasks are never lost to Celery's default retry limit.

The flow for a single task looks like this:

```
Task arrives (retries=0, no header)
  → Atomically reserve timeslot in DB
  → Timeslot is now? Execute immediately.
  → Timeslot is in the future?
      → Store timeslot in message header
      → task.retry(countdown=timeslot - now)

Task wakes up (retry, header present)
  → Read reserved timeslot from header
  → now >= timeslot? Execute.
  → now < timeslot? retry(countdown=remaining)
```
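The two-phase scheme can be sketched in Python. This is a minimal in-memory model for illustration, not Galaxy's implementation: the dictionary stands in for the `celery_user_rate_limit` table, and the atomicity of the real single-statement reservation is ignored.

```python
import datetime


class RateLimitSketch:
    """In-memory model of per-user timeslot reservation (illustrative only;
    Galaxy persists the last scheduled time in the celery_user_rate_limit table)."""

    def __init__(self, tasks_per_second: float):
        self.interval = datetime.timedelta(seconds=1.0 / tasks_per_second)
        self.last_slot: dict[str, datetime.datetime] = {}

    def reserve(self, user_id: str, now: datetime.datetime) -> datetime.datetime:
        # Phase 1 (first attempt): reserve max(last_scheduled_time + interval, now).
        prev = self.last_slot.get(user_id)
        slot = max(prev + self.interval, now) if prev is not None else now
        self.last_slot[user_id] = slot
        return slot

    @staticmethod
    def countdown(slot: datetime.datetime, now: datetime.datetime) -> float:
        # Phase 2 (retries): 0.0 means "execute now"; otherwise retry(countdown=...).
        return max((slot - now).total_seconds(), 0.0)
```

With `celery_user_rate_limit: 0.1`, three tasks submitted by one user at the same instant reserve slots at t, t+10s, and t+20s, while another user's first task still runs immediately.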
##### Database backends

Galaxy provides two implementations of the timeslot reservation logic:

- **PostgreSQL** (`GalaxyTaskBeforeStartUserRateLimitPostgres`): Uses `UPDATE ... RETURNING` with `greatest()` for an atomic, single-statement slot reservation. Falls back to `INSERT ... ON CONFLICT DO UPDATE` (upsert) for the first task by a given user. This is the most efficient implementation.

- **Standard SQL** (`GalaxyTaskBeforeStartUserRateLimitStandard`): Uses `SELECT ... FOR UPDATE` followed by a separate `UPDATE` (or `INSERT` for new users). This works with SQLite and other databases but requires two statements and explicit locking.

The correct implementation is selected automatically based on the configured `database_connection`.
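The PostgreSQL path described above might look roughly like the following. This is a hedged sketch, not the actual Galaxy queries: the column names and the use of bind parameters are assumptions, and the real code falls back to the upsert only when the `UPDATE` matches no row.

```sql
-- Hypothetical shape of the atomic reservation (column names assumed;
-- :interval is the per-user spacing in seconds):
UPDATE celery_user_rate_limit
SET last_scheduled_time = greatest(
        last_scheduled_time + make_interval(secs => :interval),
        now()
    )
WHERE user_id = :user_id
RETURNING last_scheduled_time;

-- If no row was updated (first task for this user), upsert one:
INSERT INTO celery_user_rate_limit (user_id, last_scheduled_time)
VALUES (:user_id, now())
ON CONFLICT (user_id) DO UPDATE
    SET last_scheduled_time = greatest(
        celery_user_rate_limit.last_scheduled_time
            + make_interval(secs => :interval),
        now()
    )
RETURNING last_scheduled_time;
```

Because `greatest(... + interval, now())` is computed inside a single statement, two concurrent workers cannot reserve the same slot; the database serializes the row update.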
##### Limitations

- **Rate limiting, not concurrency limiting.** The mechanism controls the *rate* at which tasks are scheduled (tasks per second per user), not how many tasks run concurrently. If a user submits 100 tasks, they will all eventually execute — just spaced apart by the configured interval.
- **Tasks without `task_user_id` are not rate-limited.** Only tasks that receive a `task_user_id` keyword argument participate in rate limiting. System tasks and tasks without a user context bypass the check entirely.
- **Timeslots are not released on failure.** If a task fails after its timeslot was reserved, that slot is consumed. The next task for the same user will be scheduled after it. This means task failures still "use up" rate-limit capacity.
- **Clock precision.** The mechanism relies on `datetime.datetime.now()` on the worker. Clock skew between workers could cause minor scheduling inaccuracies, though this is unlikely to matter in practice.
- **No priority or reordering.** Tasks are scheduled in the order they reserve slots (first-come, first-served within a user). There is no mechanism to prioritize certain task types over others for the same user.
- **Worker restarts.** If a worker is terminated while holding deferred tasks, Celery's broker (e.g., Redis, RabbitMQ) will redeliver them. The tasks will re-enter the `before_start` hook, read their reserved timeslot from the message header, and continue waiting or execute as appropriate — no slots are lost or duplicated.
- **Database overhead.** Each task execution requires one or two queries to the `celery_user_rate_limit` table to reserve a timeslot (one for PostgreSQL's atomic upsert, two for the standard `SELECT FOR UPDATE` + `UPDATE` path). On PostgreSQL at 100 tasks/second this adds ~100 small writes/second; the standard backend doubles that. For most Galaxy deployments (typically fewer than 10 tasks/second) this is negligible. Additionally, tasks that are deferred via `task.retry` re-enter the broker and are redelivered to a worker, adding a small amount of broker traffic proportional to the deferral rate.
#### Per-user task concurrency limiting

In addition to rate limiting, Galaxy supports limiting the number of tasks that can execute **concurrently** for a single user. This prevents one user from consuming all available worker capacity.

##### Configuration

Set `celery_user_concurrency_limit` in the Galaxy configuration to the maximum number of tasks that can run simultaneously per user:

```yaml
celery_user_concurrency_limit: 5
```

The default value of `0` disables concurrency limiting. This setting can be used independently of or in combination with `celery_user_rate_limit`.
##### How it works

Concurrency limiting uses a tracking table (`celery_user_active_task`) that records which tasks are currently executing for each user. The mechanism has three components:

1. **Admission control (`before_start`):** Before a task executes, the system counts the user's currently active tasks in the tracking table. If the count is at or above the limit, the task is deferred via `task.retry(countdown=5)` with unlimited retries. Otherwise, a tracking row is inserted for this task.

2. **Cleanup on completion (`after_return`):** When a task finishes (success or failure), its tracking row is deleted from the table. This runs via Celery's `after_return` hook, which fires regardless of whether the task succeeded or failed. Retries do not trigger cleanup — only final completion does.

3. **Stale row recovery (periodic beat task):** A periodic task (`cleanup_stale_concurrency_slots`) runs every 5 minutes to handle the case where a worker crashes without calling `after_return`. It queries all workers via `celery_app.control.inspect().active()` to get the set of actually-running task IDs, then removes any tracking rows older than 30 minutes whose task ID is not found on any worker.

The flow for a single task:

```
Task arrives
  → Count active tasks for this user in DB
  → Count >= limit? → task.retry(countdown=5)
  → Count < limit?
      → INSERT tracking row (task_id, user_id, started_at)
      → Execute task
      → Task finishes (success or failure)
          → DELETE tracking row

Periodic cleanup (every 5 min)
  → SELECT stale rows (started_at older than 30 min)
  → inspect().active() → get all running task IDs from workers
  → DELETE rows where task_id NOT in active set
```
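The admission-control and cleanup steps can be sketched as a small in-memory model. This is illustrative only: the dictionary stands in for the `celery_user_active_task` table, and the real counts and inserts are database queries.

```python
class ConcurrencySketch:
    """In-memory model of per-user concurrency admission (illustrative only;
    Galaxy tracks active tasks in the celery_user_active_task table)."""

    def __init__(self, limit: int):
        self.limit = limit
        self.active: dict[str, int] = {}  # task_id -> user_id

    def try_admit(self, task_id: str, user_id: int) -> bool:
        # before_start: count this user's active tasks; defer if at the limit
        # (the real hook would call task.retry(countdown=5) instead).
        running = sum(1 for uid in self.active.values() if uid == user_id)
        if running >= self.limit:
            return False
        self.active[task_id] = user_id  # INSERT tracking row
        return True

    def release(self, task_id: str) -> None:
        # after_return: DELETE the tracking row on final completion.
        self.active.pop(task_id, None)
```

With a limit of 2, a user's third simultaneous task is deferred until one of the first two releases its slot, while other users admit tasks independently.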
##### Combining rate limiting and concurrency limiting

When both `celery_user_rate_limit` and `celery_user_concurrency_limit` are set, rate limiting runs first (to schedule the timeslot) and concurrency limiting runs second (to gate execution based on active task count). This means a task must both reach its scheduled timeslot *and* have a concurrency slot available before it can execute.
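The ordering can be pictured as a combined hook that invokes each check in sequence, which is presumably what `GalaxyTaskBeforeStartCombined` does; the call signature here is an assumption for illustration.

```python
class CombinedBeforeStartSketch:
    """Chains before_start hooks in order: rate limiting first, then
    concurrency limiting. A sketch of GalaxyTaskBeforeStartCombined's
    presumed behavior, with an assumed hook signature."""

    def __init__(self, *hooks):
        self.hooks = hooks

    def __call__(self, task, task_id, args, kwargs):
        for hook in self.hooks:
            # Any hook may defer the task (via task.retry, which raises),
            # which stops the chain before later hooks run.
            hook(task, task_id, args, kwargs)
```

Because `task.retry` raises, a task deferred by the rate limiter never reaches the concurrency check on that attempt.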
##### Limitations

- **Tasks without `task_user_id` are not limited.** Only tasks that receive a `task_user_id` keyword argument participate in concurrency limiting.
- **Worker crash recovery is not instant.** If a worker is killed (SIGKILL, OOM), its tracking rows remain until the periodic cleanup task runs (every 5 minutes by default). During this window, those slots are "leaked" and reduce the user's effective concurrency limit.
- **Retry polling interval is fixed.** Deferred tasks retry every 5 seconds. Under heavy load with many deferred tasks, this creates periodic bursts of retry attempts.
- **No queue ordering guarantees.** When multiple tasks are waiting for a concurrency slot, the order in which they acquire slots depends on Celery's delivery order and retry timing — not submission order.
- **Database overhead.** Each task execution requires an INSERT (on start) and DELETE (on completion) in the tracking table, plus a COUNT query for admission. At 100 tasks/second this adds ~300 small queries/second to the database. For most Galaxy deployments (which typically sustain fewer than 10 tasks/second) this is negligible. Deployments processing hundreds of tasks per second should monitor database connection pool utilization and query latency on the `celery_user_active_task` table.
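The stale-slot reclamation that bounds the crash-recovery window can be sketched as a pure function. This is an illustrative model: `tracked` stands in for the `celery_user_active_task` table and `active_task_ids` for the set returned by `inspect().active()`.

```python
import datetime


def sweep_stale_slots(tracked, active_task_ids, now,
                      max_age=datetime.timedelta(minutes=30)):
    """Return the task IDs whose tracking rows should be deleted: rows older
    than max_age whose task no worker reports as running. Illustrative sketch
    of the cleanup_stale_concurrency_slots logic, not Galaxy's code."""
    return [
        task_id
        for task_id, started_at in tracked.items()
        if now - started_at > max_age and task_id not in active_task_ids
    ]
```

A long-running task older than 30 minutes is kept as long as some worker still reports it active; only rows with no corresponding running task are reclaimed.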
##### Administrative operations

Admins can directly manage the concurrency tracking table and the Celery queue to recover from stuck states or clear backlogs.

**Clearing leaked concurrency slots manually:**

If a worker crashes and the periodic cleanup hasn't run yet (or Celery beat is not running), admins can free slots directly:

```sql
-- View all currently tracked active tasks
SELECT * FROM celery_user_active_task ORDER BY started_at;

-- Remove all slots for a specific user (e.g., user_id 42)
DELETE FROM celery_user_active_task WHERE user_id = 42;

-- Remove all stale slots older than 1 hour
DELETE FROM celery_user_active_task
WHERE started_at < NOW() - INTERVAL '1 hour';

-- Nuclear option: clear ALL tracking rows (resets all concurrency counters)
DELETE FROM celery_user_active_task;
```

After clearing rows, deferred tasks waiting for slots will acquire them on their next retry (within 5 seconds).
**Purging tasks from the Celery queue:**

To remove pending (not yet started) tasks from the broker queue:

```bash
# Purge all pending tasks from the default Galaxy queue
celery -A galaxy.celery purge -Q galaxy.internal

# Purge all pending tasks from all queues
celery -A galaxy.celery purge

# List tasks prefetched by workers but not yet started (useful before revoking)
celery -A galaxy.celery inspect reserved

# Revoke a specific task by ID (prevents it from executing even if already delivered)
celery -A galaxy.celery control revoke <task-id>
```

Note: `purge` only removes tasks that have not yet been delivered to a worker. Tasks already being executed or waiting in a worker's prefetch buffer require `revoke`. Revoking a task that is mid-execution requires the `--terminate` flag, which sends SIGTERM to the worker process — use with caution.

lib/galaxy/app.py

Lines changed: 44 additions & 8 deletions
```diff
@@ -29,7 +29,12 @@
 from galaxy.agents.registry import AgentRegistry
 from galaxy.carbon_emissions import get_carbon_intensity_entry
 from galaxy.celery.base_task import (
+    GalaxyTaskAfterReturn,
+    GalaxyTaskAfterReturnConcurrencyLimit,
     GalaxyTaskBeforeStart,
+    GalaxyTaskBeforeStartCombined,
+    GalaxyTaskBeforeStartConcurrencyLimitPostgres,
+    GalaxyTaskBeforeStartConcurrencyLimitStandard,
     GalaxyTaskBeforeStartUserRateLimitPostgres,
     GalaxyTaskBeforeStartUserRateLimitStandard,
 )
@@ -709,24 +714,55 @@ def __init__(self, configure_logging=True, use_converters=True, use_display_appl
 
     def _register_celery_galaxy_task_components(self):
         """
-        Register subtype class instance to support implementation of a user rate limit for execution of celery tasks.
-        The default supertype class does not enforce a user rate limit. This is the case if the celery_user_rate_limit
-        config param is the default value.
+        Register subtype class instances for user rate limiting and concurrency
+        limiting of celery task executions. Both can be enabled independently
+        or together (combined via GalaxyTaskBeforeStartCombined).
         """
-        task_before_start: GalaxyTaskBeforeStart
+        hooks: list[GalaxyTaskBeforeStart] = []
+
+        # Rate limiting hook
         if self.config.celery_user_rate_limit:
             if is_postgres(self.config.database_connection):  # type: ignore[arg-type]
-                task_before_start = GalaxyTaskBeforeStartUserRateLimitPostgres(
-                    self.config.celery_user_rate_limit, self.model.session
+                hooks.append(
+                    GalaxyTaskBeforeStartUserRateLimitPostgres(self.config.celery_user_rate_limit, self.model.session)
                 )
             else:
-                task_before_start = GalaxyTaskBeforeStartUserRateLimitStandard(
-                    self.config.celery_user_rate_limit, self.model.session
+                hooks.append(
+                    GalaxyTaskBeforeStartUserRateLimitStandard(self.config.celery_user_rate_limit, self.model.session)
                 )
+
+        # Concurrency limiting hook
+        if self.config.celery_user_concurrency_limit:
+            if is_postgres(self.config.database_connection):  # type: ignore[arg-type]
+                hooks.append(
+                    GalaxyTaskBeforeStartConcurrencyLimitPostgres(
+                        self.config.celery_user_concurrency_limit, self.model.session
+                    )
+                )
+            else:
+                hooks.append(
+                    GalaxyTaskBeforeStartConcurrencyLimitStandard(
+                        self.config.celery_user_concurrency_limit, self.model.session
+                    )
+                )
+
+        # Register the appropriate before_start hook
+        if len(hooks) > 1:
+            task_before_start: GalaxyTaskBeforeStart = GalaxyTaskBeforeStartCombined(*hooks)
+        elif len(hooks) == 1:
+            task_before_start = hooks[0]
         else:
             task_before_start = GalaxyTaskBeforeStart()
         self._register_singleton(GalaxyTaskBeforeStart, task_before_start)
 
+        # Register after_return hook for concurrency tracking cleanup
+        task_after_return: GalaxyTaskAfterReturn
+        if self.config.celery_user_concurrency_limit:
+            task_after_return = GalaxyTaskAfterReturnConcurrencyLimit(self.model.session)
+        else:
+            task_after_return = GalaxyTaskAfterReturn()
+        self._register_singleton(GalaxyTaskAfterReturn, task_after_return)
+
     def _configure_tool_shed_registry(self) -> None:
         # Set up the tool sheds registry
         if os.path.isfile(self.config.tool_sheds_config_file):
```
lib/galaxy/celery/__init__.py

Lines changed: 21 additions & 3 deletions
```diff
@@ -23,7 +23,10 @@
 )
 from kombu import serialization
 
-from galaxy.celery.base_task import GalaxyTaskBeforeStart
+from galaxy.celery.base_task import (
+    GalaxyTaskAfterReturn,
+    GalaxyTaskBeforeStart,
+)
 from galaxy.config import Configuration
 from galaxy.main_config import find_config
 from galaxy.util import ExecutionTimer
@@ -71,8 +74,8 @@ def trim_module_name(self, module):
 
 class GalaxyTask(Task):
     """
-    Custom celery task used to limit number of tasks executions per user
-    per second.
+    Custom celery task used to enforce per-user rate limits and
+    concurrency limits on task executions.
     """
 
     def before_start(self, task_id, args, kwargs):
@@ -83,6 +86,17 @@ def before_start(self, task_id, args, kwargs):
         assert app
         app[GalaxyTaskBeforeStart](self, task_id, args, kwargs)
 
+    def after_return(self, status, retval, task_id, args, kwargs, einfo):
+        """
+        Called after task returns (success, failure, revoked, or retry).
+        Used to clean up concurrency tracking rows.
+        """
+        if status == "RETRY":
+            return  # Don't clean up on retry — the task will run again
+        app = get_galaxy_app()
+        if app:
+            app[GalaxyTaskAfterReturn](self, task_id, args, kwargs)
+
 
 def set_thread_app(app):
     APP_LOCAL.app = app
@@ -251,6 +265,10 @@ def schedule_task(task, interval):
     if config.vault_token_renewal_interval:
         schedule_task("renew_vault_token", config.vault_token_renewal_interval)
 
+    if config.celery_user_concurrency_limit:
+        # Run cleanup every 5 minutes (300 seconds)
+        schedule_task("cleanup_stale_concurrency_slots", 300)
+
     if beat_schedule:
         celery_app.conf.beat_schedule = beat_schedule
 
```