Commit ecc4b47

Merge pull request galaxyproject#19824 from mvdbeek/fix_limit_bypass
[24.2] Fix various job concurrency limit issues
2 parents b8c902a + c088f9c

5 files changed

Lines changed: 295 additions & 30 deletions

lib/galaxy/jobs/__init__.py

Lines changed: 156 additions & 14 deletions
@@ -14,6 +14,10 @@
 import sys
 import time
 import traceback
+from dataclasses import (
+    dataclass,
+    field,
+)
 from json import loads
 from typing import (
     Any,
@@ -27,7 +31,12 @@
 import yaml
 from packaging.version import Version
 from pulsar.client.staging import COMMAND_VERSION_FILENAME
-from sqlalchemy import select
+from sqlalchemy import (
+    and_,
+    func,
+    select,
+    update,
+)
 
 from galaxy import (
     model,
@@ -296,6 +305,18 @@ def job_config_xml_to_dict(config, root):
     return config_dict
 
 
+@dataclass
+class JobConfigurationLimits:
+    registered_user_concurrent_jobs: Optional[int] = None
+    anonymous_user_concurrent_jobs: Optional[int] = None
+    walltime: Optional[str] = None
+    walltime_delta: Optional[datetime.timedelta] = None
+    total_walltime: Dict[str, Any] = field(default_factory=dict)
+    output_size: Optional[int] = None
+    destination_user_concurrent_jobs: Dict[str, int] = field(default_factory=dict)
+    destination_total_concurrent_jobs: Dict[str, int] = field(default_factory=dict)
+
+
 class JobConfiguration(ConfiguresHandlers):
     """A parser and interface to advanced job management features.
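A quick standalone illustration (not Galaxy code; LimitsDemo mirrors three of the fields above) of what the dataclass buys over the ad-hoc Bunch it replaces: the limit fields and their defaults are declared and type-annotated in one place, and the per-destination dicts are created per instance via field(default_factory=dict) rather than shared:

import datetime
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class LimitsDemo:  # trimmed stand-in for JobConfigurationLimits
    registered_user_concurrent_jobs: Optional[int] = None
    walltime_delta: Optional[datetime.timedelta] = None
    destination_user_concurrent_jobs: Dict[str, int] = field(default_factory=dict)


a, b = LimitsDemo(), LimitsDemo()
a.destination_user_concurrent_jobs["slurm_cluster"] = 5
assert b.destination_user_concurrent_jobs == {}  # each instance gets its own dict
assert b.registered_user_concurrent_jobs is None  # scalar limits default to "no limit"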
@@ -344,16 +365,7 @@ def __init__(self, app: MinimalManagerApp):
         self.resource_groups = {}
         self.default_resource_group = None
         self.resource_parameters = {}
-        self.limits = Bunch(
-            registered_user_concurrent_jobs=None,
-            anonymous_user_concurrent_jobs=None,
-            walltime=None,
-            walltime_delta=None,
-            total_walltime={},
-            output_size=None,
-            destination_user_concurrent_jobs={},
-            destination_total_concurrent_jobs={},
-        )
+        self.limits = JobConfigurationLimits()
 
         default_resubmits = []
         default_resubmit_condition = self.app.config.default_job_resubmission_condition
@@ -1610,12 +1622,142 @@ def get_destination_configuration(self, key, default=None):
         dest_params = self.job_destination.params
         return self.get_job().get_destination_configuration(dest_params, self.app.config, key, default)
 
+    def queue_with_limit(self, job: Job, job_destination: JobDestination):
+        anonymous_user_concurrent_jobs = self.app.job_config.limits.anonymous_user_concurrent_jobs
+        registered_user_concurrent_jobs = self.app.job_config.limits.registered_user_concurrent_jobs
+        destination_total_concurrent_jobs = self.app.job_config.limits.destination_total_concurrent_jobs
+        destination_total_limit = self.app.job_config.limits.destination_total_concurrent_jobs.get(job_destination.id)
+        destination_user_limit = self.app.job_config.limits.destination_user_concurrent_jobs.get(job_destination.id)
+        destination_tag_limits = {}
+        if job_destination.tags:
+            for tag in job_destination.tags:
+                if tag_limit := destination_total_concurrent_jobs.get(tag):
+                    destination_tag_limits[tag] = tag_limit
+
+        conditions = [model.Job.table.c.id == job.id]
+
+        if job.user_id:
+            user_job_count = (
+                select(func.count(model.Job.table.c.id))
+                .where(
+                    and_(
+                        model.Job.table.c.state.in_(
+                            [
+                                model.Job.states.QUEUED,
+                                model.Job.states.RUNNING,
+                                model.Job.states.RESUBMITTED,
+                            ]
+                        ),
+                        model.Job.table.c.user_id == job.user_id,
+                    )
+                )
+                .scalar_subquery()
+            )
+
+            if registered_user_concurrent_jobs is not None:
+                conditions.append(user_job_count < registered_user_concurrent_jobs)
+            if destination_user_limit is not None:
+                destination_job_count = (
+                    select(func.count(model.Job.table.c.id))
+                    .where(
+                        and_(
+                            model.Job.table.c.state.in_(
+                                [
+                                    model.Job.states.QUEUED,
+                                    model.Job.states.RUNNING,
+                                    model.Job.states.RESUBMITTED,
+                                ]
+                            ),
+                            model.Job.table.c.destination_id == job_destination.id,
+                            model.Job.table.c.user_id == job.user_id,
+                        )
+                    )
+                    .scalar_subquery()
+                )
+                conditions.append(destination_job_count < destination_user_limit)
+
+        elif anonymous_user_concurrent_jobs and job.galaxy_session and job.galaxy_session.id:
+            anon_job_count = (
+                select(func.count(model.Job.table.c.id))
+                .where(
+                    and_(
+                        model.Job.table.c.state.in_(
+                            [
+                                model.Job.states.QUEUED,
+                                model.Job.states.RUNNING,
+                                model.Job.states.RESUBMITTED,
+                            ]
+                        ),
+                        model.Job.table.c.session_id == job.galaxy_session.id,
+                    )
+                )
+                .scalar_subquery()
+            )
+            conditions.append(anon_job_count < anonymous_user_concurrent_jobs)
+
+        if destination_total_limit is not None:
+            destination_total_count = (
+                select(func.count(model.Job.table.c.id))
+                .where(
+                    and_(
+                        model.Job.table.c.state.in_(
+                            [
+                                model.Job.states.QUEUED,
+                                model.Job.states.RUNNING,
+                                model.Job.states.RESUBMITTED,
+                            ]
+                        ),
+                        model.Job.table.c.destination_id == job_destination.id,
+                    )
+                )
+                .scalar_subquery()
+            )
+            conditions.append(destination_total_count < destination_total_limit)
+
+        if destination_tag_limits:
+            for tag, limit in destination_tag_limits.items():
+                destination_ids = {destination.id for destination in self.app.job_config.get_destinations(tag)}
+                tag_count = (
+                    select(func.count(model.Job.table.c.id))
+                    .where(
+                        and_(
+                            model.Job.table.c.state.in_(
+                                [
+                                    model.Job.states.QUEUED,
+                                    model.Job.states.RUNNING,
+                                    model.Job.states.RESUBMITTED,
+                                ]
+                            ),
+                            model.Job.table.c.destination_id.in_(destination_ids),
+                        )
+                    )
+                    .scalar_subquery()
+                )
+                conditions.append(tag_count < limit)
+
+        update_stmt = (
+            update(model.Job)
+            .where(*conditions)
+            .values(
+                state=model.Job.states.QUEUED,
+                destination_id=job_destination.id,
+                destination_params=job_destination.params,
+                job_runner_name=job_destination.runner,
+            )
+        )
+
+        result = self.sa_session.execute(update_stmt)
+        self.sa_session.commit()
+
+        return result.rowcount > 0
+
     def enqueue(self):
         job = self.get_job()
         # Change to queued state before handing to worker thread so the runner won't pick it up again
-        self.change_state(model.Job.states.QUEUED, flush=False, job=job)
-        # Persist the destination so that the job will be included in counts if using concurrency limits
-        self.set_job_destination(self.job_destination, None, flush=False, job=job)
+        if self.is_task:
+            self.change_state(model.Job.states.QUEUED, flush=False, job=job)
+        elif not self.queue_with_limit(job, self.job_destination):
+            return False
         # Set object store after job destination so can leverage parameters...
         self._set_object_store_ids(job)
         # Now that we have the object store id, check if we are over the limit
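The core of the fix is queue_with_limit() above: instead of the old check-then-set sequence (count jobs, then mark QUEUED and persist the destination in separate steps, which two handler processes could interleave), every applicable limit becomes a condition on a single UPDATE, so the database flips the job to QUEUED only if the counts still leave room, and enqueue() returns False when rowcount shows the job lost the race. A minimal self-contained sketch of the same conditional-UPDATE pattern, assuming SQLAlchemy 2.0 and an illustrative one-table schema (Job, try_queue, and USER_LIMIT are stand-ins, not Galaxy's model):

from sqlalchemy import Column, Integer, String, create_engine, func, select, update
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Job(Base):
    __tablename__ = "job"
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer)
    state = Column(String, default="new")


USER_LIMIT = 2  # stand-in for registered_user_concurrent_jobs


def try_queue(session, job_id, user_id):
    # Count of the user's active jobs, evaluated by the database at UPDATE time.
    active = (
        select(func.count(Job.id))
        .where(Job.state.in_(["queued", "running"]), Job.user_id == user_id)
        .scalar_subquery()
    )
    # The row flips to "queued" only if the limit still holds when the
    # statement runs; a losing racer's UPDATE simply matches zero rows.
    result = session.execute(
        update(Job).where(Job.id == job_id, active < USER_LIMIT).values(state="queued")
    )
    session.commit()
    return result.rowcount > 0


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
with Session(engine) as session:
    session.add_all([Job(id=1, user_id=7, state="queued"), Job(id=2, user_id=7), Job(id=3, user_id=7)])
    session.commit()
    print(try_queue(session, 2, 7))  # True: one active job, the limit is two
    print(try_queue(session, 3, 7))  # False: both slots are now taken

Because the count subquery is evaluated inside the UPDATE itself, two handlers racing for the last slot cannot both succeed: whichever statement executes second already sees the first job in QUEUED state and matches no rows.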

lib/galaxy/jobs/handler.py

Lines changed: 3 additions & 3 deletions
@@ -518,6 +518,9 @@ def __handle_waiting_jobs(self):
                 pass
         # Ensure that we get new job counts on each iteration
         self.__clear_job_count()
+        self.__cache_total_job_count_per_destination()
+        self.__cache_user_job_count_per_destination()
+        self.__cache_user_job_count()
         # Check resubmit jobs first so that limits of new jobs will still be enforced
         for job in resubmit_jobs:
             log.debug("(%s) Job was resubmitted and is being dispatched immediately", job.id)
@@ -824,7 +827,6 @@ def __clear_job_count(self):
         self.total_job_count_per_destination = None
 
     def get_user_job_count(self, user_id):
-        self.__cache_user_job_count()
         # This could have been incremented by a previous job dispatched on this iteration, even if we're not caching
         rval = self.user_job_count.get(user_id, 0)
         if not self.app.config.cache_user_job_count:
@@ -865,7 +867,6 @@ def __cache_user_job_count(self):
         self.user_job_count = {}
 
     def get_user_job_count_per_destination(self, user_id):
-        self.__cache_user_job_count_per_destination()
         cached = self.user_job_count_per_destination.get(user_id, {})
         if self.app.config.cache_user_job_count:
             rval = cached
@@ -1006,7 +1007,6 @@ def __cache_total_job_count_per_destination(self):
             self.total_job_count_per_destination[row["destination_id"]] = row["job_count"]
 
     def get_total_job_count_per_destination(self):
-        self.__cache_total_job_count_per_destination()
         # Always use caching (at worst a job will have to wait one iteration,
         # and this would be more fair anyway as it ensures FIFO scheduling,
         # insofar as FIFO would be fair...)
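Taken together, the hunks above move the cache refreshes out of the getters and run them exactly once at the top of each scheduling iteration, so every job considered in a pass shares one snapshot and increments recorded for already-dispatched jobs are not wiped out by a mid-pass re-fetch. A toy sketch of that prime-once-per-iteration pattern (PerIterationCounts and fetch_counts are illustrative names, not Galaxy's API):

class PerIterationCounts:
    """Toy version of the handler's per-iteration job-count cache."""

    def __init__(self, fetch_counts):
        self._fetch_counts = fetch_counts  # callable that queries the database
        self.user_job_count = None

    def clear(self):
        self.user_job_count = None

    def prime(self):
        # Now done once at the top of the scheduling loop.
        self.user_job_count = dict(self._fetch_counts())

    def get(self, user_id):
        # No lazy re-fetch here: a mid-pass refresh would discard increments
        # recorded for jobs already dispatched in this iteration.
        return self.user_job_count.get(user_id, 0)

    def increment(self, user_id):
        self.user_job_count[user_id] = self.get(user_id) + 1


counts = PerIterationCounts(lambda: {42: 1})
counts.clear()              # start of iteration
counts.prime()              # snapshot the database once
counts.increment(42)        # a job for user 42 is dispatched this pass
assert counts.get(42) == 2  # later jobs in the same pass see it

The real handler keeps three such caches (per user, per user per destination, and per destination), which is why three __cache_* calls were added to __handle_waiting_jobs above.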

lib/galaxy/jobs/runners/pulsar.py

Lines changed: 1 addition & 3 deletions
@@ -451,9 +451,7 @@ def queue_job(self, job_wrapper):
             job = job_wrapper.get_job()
             # Set the job destination here (unlike other runners) because there are likely additional job destination
             # params from the Pulsar client.
-            # Flush with change_state.
-            job_wrapper.set_job_destination(job_destination, external_id=external_job_id, flush=False, job=job)
-            job_wrapper.change_state(model.Job.states.QUEUED, job=job)
+            job_wrapper.set_job_destination(job_destination, external_id=external_job_id, flush=True, job=job)
         except Exception:
             job_wrapper.fail("failure running job", exception=True)
             log.exception("failure running job %d", job_wrapper.job_id)

lib/galaxy/jobs/runners/slurm.py

Lines changed: 2 additions & 10 deletions
@@ -138,16 +138,8 @@ def _get_slurm_state():
                         ajs.job_wrapper.get_id_tag(),
                         ajs.job_id,
                     )
-                    ajs.job_wrapper.change_state(
-                        model.Job.states.QUEUED, info="Job was resubmitted due to node failure"
-                    )
-                    try:
-                        self.queue_job(ajs.job_wrapper)
-                        return
-                    except Exception:
-                        ajs.fail_message = (
-                            "This job failed due to a cluster node failure, and an attempt to resubmit the job failed."
-                        )
+                    self.mark_as_resubmitted(ajs, info="Job was resubmitted due to node failure")
+                    return
                 elif slurm_state == "OUT_OF_MEMORY":
                     log.info(
                         "(%s/%s) Job hit memory limit (SLURM state: OUT_OF_MEMORY)",
