Skip to content

Commit 60722e7

Browse files
authored
Merge pull request #1 from mapk-amazon/desirable-grasshopper
Change JobState Retry-Strategy
2 parents 85254bc + 5288774 commit 60722e7

3 files changed

Lines changed: 54 additions & 26 deletions

File tree

lib/galaxy/jobs/runners/kubernetes.py

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
produce_k8s_job_prefix,
4444
pull_policy,
4545
pykube_client_from_dict,
46+
reload_job,
4647
Service,
4748
service_object_dict,
4849
)
@@ -53,13 +54,16 @@
5354

5455
__all__ = ("KubernetesJobRunner",)
5556

56-
5757
@dataclass
58-
class RetryableDeleteJob:
59-
k8s_job: Job
60-
retries: int = 5 # Max number of retries
61-
attempts: int = 0 # Current number of attempts
62-
58+
class RetryableDeleteJobState(JobState):
59+
def __init__ (self, job_state, k8s_job, max_retries=5, attempts=0):
60+
self.__dict__ = job_state.__dict__.copy()
61+
self.init_retryable_job(max_retries, attempts)
62+
self.k8s_job = k8s_job
63+
64+
def init_retryable_job(self, max_retries, attempts):
65+
self.max_retries: int = max_retries
66+
self.attempts: int = attempts
6367

6468
class KubernetesJobRunner(AsynchronousJobRunner):
6569
"""
@@ -839,7 +843,7 @@ def _handle_unschedulable_job(self, k8s_job, job_state):
839843
if self.__has_guest_ports(job_state.job_wrapper):
840844
self.__cleanup_k8s_guest_ports(job_state.job_wrapper, k8s_job)
841845
# Wrap the k8s job before we put it in the work queue so it can be retried a few times
842-
self.work_queue.put((self.__cleanup_k8s_job, RetryableDeleteJob(k8s_job=k8s_job)))
846+
self.work_queue.put((self.__cleanup_k8s_job, RetryableDeleteJobState(job_state=job_state, k8s_job=k8s_job)))
843847
except Exception:
844848
log.exception("Could not clean up an unschedulable k8s batch job. Ignoring...")
845849
return None
@@ -879,33 +883,40 @@ def _handle_job_failure(self, k8s_job, job_state):
879883
if self.__has_guest_ports(job_state.job_wrapper):
880884
self.__cleanup_k8s_guest_ports(job_state.job_wrapper, k8s_job)
881885
# Wrap the k8s job before we put it in the work queue so it can be retried a few times
882-
self.work_queue.put((self.__cleanup_k8s_job, RetryableDeleteJob(k8s_job=k8s_job)))
886+
self.work_queue.put((self.__cleanup_k8s_job, RetryableDeleteJobState(job_state=job_state, k8s_job=k8s_job)))
883887
except Exception:
884888
log.exception("Could not clean up a failed k8s batch job. Ignoring...")
885889
return mark_failed
886890

887-
def __cleanup_k8s_job(self, retryable_delete_k8s_job: RetryableDeleteJob):
888-
k8s_job = retryable_delete_k8s_job.k8s_job
889-
log.debug(f"Cleaning up job with K8s id {k8s_job.name} (attempt {retryable_delete_k8s_job.attempts + 1}).")
891+
def __cleanup_k8s_job(self, retryable_delete_k8s_job_state: RetryableDeleteJobState):
892+
k8s_job = retryable_delete_k8s_job_state.k8s_job
893+
log.debug(f"Cleaning up job with K8s id {k8s_job.name} (attempt {retryable_delete_k8s_job_state.attempts + 1}).")
890894
k8s_cleanup_job = self.runner_params["k8s_cleanup_job"]
891895
try:
892896
delete_job(k8s_job, k8s_cleanup_job)
893897
except HTTPError as exc:
894-
if retryable_delete_k8s_job.retries < 1:
898+
# If job not found, then previous deletion was successful
899+
if exc.code == 404 and retryable_delete_k8s_job_state.attempts >= 1:
900+
log.warning(
901+
f"Cleanup job with K8s id {k8s_job.name} skipped as it is no longer available (404) and a previous deletion was triggered."
902+
)
903+
return
904+
if retryable_delete_k8s_job_state.max_retries <= retryable_delete_k8s_job_state.attempts:
895905
log.error(
896-
f"Failed to cleanup job with K8s id {k8s_job.name} after {retryable_delete_k8s_job.attempts} attempts; giving up."
906+
f"Failed to cleanup job with K8s id {k8s_job.name} after {retryable_delete_k8s_job_state.attempts} of {retryable_delete_k8s_job_state.max_retries} attempts; giving up."
897907
)
898908
raise exc
899909
else:
900910
# Refresh the job to resolve object & cluster conflicts
901-
k8s_job.reload()
911+
reload_job(k8s_job)
902912
# Try the cleanup again
903-
new_retryable_job = RetryableDeleteJob(
913+
new_retryable_job_state = RetryableDeleteJobState(
914+
job_state=retryable_delete_k8s_job_state,
904915
k8s_job=k8s_job,
905-
retries=retryable_delete_k8s_job.retries - 1,
906-
attempts=retryable_delete_k8s_job.attempts + 1,
916+
max_retries=retryable_delete_k8s_job_state.max_retries,
917+
attempts=retryable_delete_k8s_job_state.attempts + 1,
907918
)
908-
self.work_queue.put((self.__cleanup_k8s_job, new_retryable_job))
919+
self.work_queue.put((self.__cleanup_k8s_job, new_retryable_job_state))
909920

910921
def __cleanup_k8s_ingress(self, ingress, job_failed=False):
911922
k8s_cleanup_job = self.runner_params["k8s_cleanup_job"]
@@ -1023,7 +1034,7 @@ def stop_job(self, job_wrapper):
10231034
log.debug(f"Job {gxy_job.id} ({gxy_job.job_runner_external_id}) has guest ports, cleaning them up")
10241035
self.__cleanup_k8s_guest_ports(job_wrapper, k8s_job)
10251036
# Wrap the k8s job before we put it in the work queue so it can be retried a few times
1026-
self.work_queue.put((self.__cleanup_k8s_job, RetryableDeleteJob(k8s_job=k8s_job)))
1037+
self.work_queue.put((self.__cleanup_k8s_job, RetryableDeleteJobState(job_state=JobState(job_wrapper=job_wrapper,job_destination=job_wrapper.job_destination),k8s_job=k8s_job)))
10271038
else:
10281039
log.debug(f"Could not find job with id {gxy_job.get_job_runner_external_id()} to delete")
10291040
# TODO assert whether job parallelism == 0
@@ -1156,4 +1167,4 @@ def finish_job(self, job_state):
11561167
if self.__has_guest_ports(job_state.job_wrapper):
11571168
self.__cleanup_k8s_guest_ports(job_state.job_wrapper, k8s_job)
11581169
# Wrap the k8s job before we put it in the work queue so it can be retried a few times
1159-
self.work_queue.put((self.__cleanup_k8s_job, RetryableDeleteJob(k8s_job=k8s_job)))
1170+
self.work_queue.put((self.__cleanup_k8s_job, RetryableDeleteJobState(job_state=job_state, k8s_job=k8s_job)))

lib/galaxy/jobs/runners/util/pykube_util.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@
77

88
try:
99
from pykube.config import KubeConfig
10-
from pykube.exceptions import HTTPError
10+
from pykube.exceptions import (
11+
HTTPError,
12+
ObjectDoesNotExist,
13+
)
1114
from pykube.http import HTTPClient
1215
from pykube.objects import (
1316
Ingress,
@@ -105,7 +108,12 @@ def is_pod_unschedulable(pykube_api, pod, namespace=None):
105108
def delete_job(job, cleanup="always"):
106109
job_failed = job.obj["status"]["failed"] > 0 if "failed" in job.obj["status"] else False
107110
# Scale down the job just in case even if cleanup is never
108-
job.scale(replicas=0)
111+
try:
112+
job.scale(replicas=0)
113+
except ObjectDoesNotExist as e:
114+
# Okay, the job no longer exists
115+
log.info(e)
116+
109117
api_delete = cleanup == "always"
110118
if not api_delete and cleanup == "onsuccess" and not job_failed:
111119
api_delete = True
@@ -328,3 +336,12 @@ def galaxy_instance_id(params):
328336
"get_volume_mounts_for_job",
329337
"parse_pvc_param_line",
330338
)
339+
340+
def reload_job(job):
341+
try:
342+
job.reload()
343+
except HTTPError as e:
344+
if e.code == 404:
345+
pass
346+
else:
347+
raise e

lib/galaxy/objectstore/__init__.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1065,10 +1065,10 @@ def _size(self, obj, **kwargs) -> int:
10651065

10661066
def _delete(self, obj, entire_dir: bool = False, **kwargs) -> bool:
10671067
"""Override `ObjectStore`'s stub; delete the file or folder on disk."""
1068-
path = self._get_filename(obj, **kwargs)
1069-
extra_dir = kwargs.get("extra_dir", None)
1070-
obj_dir = kwargs.get("obj_dir", False)
10711068
try:
1069+
path = self._get_filename(obj, **kwargs)
1070+
extra_dir = kwargs.get("extra_dir", None)
1071+
obj_dir = kwargs.get("obj_dir", False)
10721072
if entire_dir and (extra_dir or obj_dir):
10731073
shutil.rmtree(path)
10741074
return True
@@ -1111,7 +1111,7 @@ def _get_filename(self, obj, sync_cache: bool = True, **kwargs) -> str:
11111111
return path
11121112
path = self._construct_path(obj, **kwargs)
11131113
if not os.path.exists(path):
1114-
raise ObjectNotFound
1114+
raise FileNotFoundError
11151115
return path
11161116

11171117
def _update_from_file(

0 commit comments

Comments
 (0)