|
20 | 20 | try: |
21 | 21 | from pykube.config import KubeConfig |
22 | 22 | from pykube.http import HTTPClient |
23 | | - from pykube.exceptions import KubernetesError |
24 | 23 | from pykube.objects import ( |
25 | 24 | Job, |
26 | 25 | Pod |
@@ -60,7 +59,7 @@ def __init__(self, app, nworkers, **kwargs): |
60 | 59 | k8s_default_requests_memory=dict(map=str, default=None), |
61 | 60 | k8s_default_limits_cpu=dict(map=str, default=None), |
62 | 61 | k8s_default_limits_memory=dict(map=str, default=None), |
63 | | - k8s_cleanup_job=dict(map=str, default="always"), |
| 62 | + k8s_cleanup_job=dict(map=str, valid=lambda s: s in set(["onsuccess", "always", "never"]), default="always"), |
64 | 63 | k8s_pod_retries=dict(map=int, valid=lambda x: int >= 0, default=3), |
65 | 64 | k8s_pod_retrials=dict(map=int, valid=lambda x: int >= 0, default=3)) |
66 | 65 |
|
@@ -475,12 +474,17 @@ def __cleanup_k8s_job(self, job): |
475 | 474 | k8s_cleanup_job = self.runner_params['k8s_cleanup_job'] |
476 | 475 | job_failed = (job.obj['status']['failed'] > 0 |
477 | 476 | if 'failed' in job.obj['status'] else False) |
478 | | - if k8s_cleanup_job == "never": |
479 | | - job.scale(replicas=0) |
480 | | - elif k8s_cleanup_job == "onsuccess" and job_failed: |
481 | | - job.scale(replicas=0) |
482 | | - else: |
483 | | - job.delete() |
| 477 | + # Scale down the job just in case even if cleanup is never |
| 478 | + job.scale(replicas=0) |
| 479 | + if (k8s_cleanup_job == "always" or |
| 480 | + (k8s_cleanup_job == "onsuccess" and not job_failed)): |
| 481 | + delete_options = { |
| 482 | + "apiVersion": "v1", |
| 483 | + "kind": "DeleteOptions", |
| 484 | + "propagationPolicy": "Background" |
| 485 | + } |
| 486 | + r = job.api.delete(json=delete_options, **job.api_kwargs()) |
| 487 | + job.api.raise_for_status(r) |
484 | 488 |
|
485 | 489 | def __job_failed_due_to_low_memory(self, job_state): |
486 | 490 | """ |
@@ -546,7 +550,4 @@ def finish_job(self, job_state): |
546 | 550 | # If more than one job matches selector, leave all jobs intact as it's a configuration error |
547 | 551 | if len(jobs.response['items']) == 1: |
548 | 552 | job = Job(self._pykube_api, jobs.response['items'][0]) |
549 | | - try: |
550 | | - self.__cleanup_k8s_job(job) |
551 | | - except KubernetesError: |
552 | | - log.exception("Error while cleaning up k8s job") |
| 553 | + self.__cleanup_k8s_job(job) |
0 commit comments