forked from reanahub/reana-workflow-controller
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathk8s.py
More file actions
402 lines (360 loc) · 15.3 KB
/
k8s.py
File metadata and controls
402 lines (360 loc) · 15.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
# This file is part of REANA.
# Copyright (C) 2019, 2020, 2021, 2022, 2024 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
"""REANA Workflow Controller Kubernetes utils."""
from kubernetes import client
from kubernetes.client.rest import ApiException
from reana_commons.config import (
REANA_WORKFLOW_UMASK,
REANA_RUNTIME_SESSIONS_KUBERNETES_NODE_LABEL,
)
from reana_commons.k8s.api_client import (
current_k8s_appsv1_api_client,
current_k8s_corev1_api_client,
current_k8s_networking_api_client,
)
from reana_commons.k8s.secrets import REANAUserSecretsStore
from reana_commons.k8s.volumes import (
get_k8s_cvmfs_volumes,
get_workspace_volume,
)
from reana_workflow_controller.config import ( # isort:skip
JUPYTER_INTERACTIVE_SESSION_DEFAULT_IMAGE,
JUPYTER_INTERACTIVE_SESSION_DEFAULT_PORT,
REANA_INGRESS_ANNOTATIONS,
REANA_INGRESS_CLASS_NAME,
REANA_INGRESS_HOST,
)
class InteractiveDeploymentK8sBuilder(object):
    """Build Kubernetes deployment objects for interactive sessions.

    On construction the builder creates an Ingress, a Service and a
    Deployment that all share the same name. The ``add_*`` methods mutate
    the session container and the pod spec referenced by the Deployment, so
    they can be called after construction to customise the session.
    """

    internal_service_port = 8081
    """Port exposed by the Service placed in front of the Deployment and
    referenced by the Ingress. This port has nothing to do with the
    port exposed by deployment itself, the one which can change
    from one interactive session application to other."""

    def __init__(
        self,
        deployment_name,
        workflow_id,
        owner_id,
        workspace,
        image,
        port,
        path,
        cvmfs_repos=None,
    ):
        """Initialise basic interactive deployment builder for Kubernetes.

        :param deployment_name: Name which identifies all deployment objects
            and maps to the workflow it belongs.
        :param workflow_id: UUID of the workflow to which the interactive
            session belongs to.
        :param owner_id: Owner of the interactive session.
        :param workspace: Path to the interactive session workspace, which
            matches with the workflow workspace the interactive session
            belongs to.
        :param image: Docker image which the deployment will use as base.
        :param port: Port exposed by the Docker image.
        :param path: Path where the interactive session will be accessible
            from outside the cluster.
        :param cvmfs_repos: List of CVMFS repositories, stored as
            ``self.cvmfs_repos``. Defaults to an empty list.
        """
        self.deployment_name = deployment_name
        self.workflow_id = workflow_id
        self.owner_id = owner_id
        self.workspace = workspace
        self.image = image
        self.port = port
        self.path = path
        self.cvmfs_repos = cvmfs_repos or []
        # The same metadata object is shared by the Service and the Deployment.
        metadata = client.V1ObjectMeta(
            name=deployment_name,
            labels={"reana_workflow_mode": "session"},
        )
        self._session_container = client.V1Container(
            name=self.deployment_name, image=self.image, env=[], volume_mounts=[]
        )
        self._pod_spec = client.V1PodSpec(
            containers=[self._session_container],
            volumes=[],
            node_selector=REANA_RUNTIME_SESSIONS_KUBERNETES_NODE_LABEL,
            # Disable service discovery with env variables, so that the environment is
            # not polluted with variables like `REANA_SERVER_SERVICE_HOST`
            enable_service_links=False,
            automount_service_account_token=False,
        )
        # Insertion order is significant: ``instantiate_chained_k8s_objects``
        # treats the first entry as the owner of the following ones.
        self.kubernetes_objects = {
            "ingress": self._build_ingress(),
            "service": self._build_service(metadata),
            "deployment": self._build_deployment(metadata),
        }

    def _build_ingress(self):
        """Build ingress Kubernetes object.

        The ingress routes ``self.path`` on ``REANA_INGRESS_HOST`` to the
        Service named after the deployment, on ``internal_service_port``.

        :returns: The ``V1Ingress`` object for the interactive session.
        """
        ingress_service_backend = client.V1IngressServiceBackend(
            name=self.deployment_name,
            port=client.V1ServiceBackendPort(
                number=InteractiveDeploymentK8sBuilder.internal_service_port
            ),
        )
        ingress_backend = client.V1IngressBackend(service=ingress_service_backend)
        ingress_rule_value = client.V1HTTPIngressRuleValue(
            [
                client.V1HTTPIngressPath(
                    path=self.path, backend=ingress_backend, path_type="Prefix"
                )
            ]
        )
        spec = client.V1IngressSpec(
            rules=[
                client.V1IngressRule(http=ingress_rule_value, host=REANA_INGRESS_HOST)
            ]
        )
        # Only set the ingress class when one is configured.
        if REANA_INGRESS_CLASS_NAME:
            spec.ingress_class_name = REANA_INGRESS_CLASS_NAME
        ingress = client.V1Ingress(
            api_version="networking.k8s.io/v1",
            kind="Ingress",
            spec=spec,
            metadata=client.V1ObjectMeta(
                name=self.deployment_name,
                annotations=REANA_INGRESS_ANNOTATIONS,
            ),
        )
        return ingress

    def _build_service(self, metadata):
        """Build service Kubernetes object.

        :param metadata: Common Kubernetes metadata for the interactive
            deployment.
        :returns: The ``V1Service`` object forwarding
            ``internal_service_port`` to the port exposed by the session.
        """
        spec = client.V1ServiceSpec(
            type="ClusterIP",
            ports=[
                client.V1ServicePort(
                    port=InteractiveDeploymentK8sBuilder.internal_service_port,
                    target_port=self.port,
                )
            ],
            selector={"app": self.deployment_name},
        )
        # ``V1Service`` is the core/v1 Service model; ``V1APIService`` belongs
        # to the API aggregation layer and is not what is being created here.
        service = client.V1Service(
            api_version="v1",
            kind="Service",
            spec=spec,
            metadata=metadata,
        )
        return service

    def _build_deployment(self, metadata):
        """Build deployment Kubernetes object.

        :param metadata: Common Kubernetes metadata for the interactive
            deployment.
        :returns: The ``V1Deployment`` object running a single replica of
            the session pod.
        """
        labels = {
            "app": self.deployment_name,
            "reana_workflow_mode": "session",
            "reana-run-session-workflow-uuid": str(self.workflow_id),
            "reana-run-session-owner-uuid": str(self.owner_id),
        }
        template = client.V1PodTemplateSpec(
            metadata=client.V1ObjectMeta(labels=labels),
            spec=self._pod_spec,
        )
        spec = client.V1DeploymentSpec(
            selector=client.V1LabelSelector(match_labels=labels),
            replicas=1,
            template=template,
        )
        deployment = client.V1Deployment(
            api_version="apps/v1",
            kind="Deployment",
            metadata=metadata,
            spec=spec,
        )
        return deployment

    def add_command(self, command):
        """Add a command to the deployment.

        :param command: Command to set on the session container.
        """
        self._session_container.command = command

    def add_command_arguments(self, args):
        """Add command line arguments in addition to the command.

        :param args: Arguments to set on the session container.
        """
        self._session_container.args = args

    def add_reana_shared_storage(self):
        """Add the REANA shared file system volume mount to the deployment."""
        volume_mount, volume = get_workspace_volume(self.workspace)
        self._session_container.volume_mounts.append(volume_mount)
        self._pod_spec.volumes.append(volume)

    def add_cvmfs_repo_mounts(self, cvmfs_repos):
        """Add mounts for the provided CVMFS repositories to the deployment.

        :param cvmfs_repos: List of CVMFS repos to make available.
        """
        cvmfs_volume_mounts, cvmfs_volumes = get_k8s_cvmfs_volumes(cvmfs_repos)
        self._pod_spec.volumes.extend(cvmfs_volumes)
        self._session_container.volume_mounts.extend(cvmfs_volume_mounts)

    def add_environment_variable(self, name, value):
        """Add an environment variable.

        :param name: Environment variable name.
        :param value: Environment variable value, converted to ``str``.
        """
        env_var = client.V1EnvVar(name, str(value))
        self._session_container.env.append(env_var)

    def add_run_with_root_permissions(self):
        """Run interactive session with root."""
        security_context = client.V1SecurityContext(run_as_user=0)
        self._session_container.security_context = security_context

    def add_user_secrets(self):
        """Mount the "file" secrets and set the "env" secrets in the container."""
        secrets_store = REANAUserSecretsStore(self.owner_id)
        # mount file secrets
        secrets_volume = secrets_store.get_file_secrets_volume_as_k8s_specs()
        secrets_volume_mount = secrets_store.get_secrets_volume_mount_as_k8s_spec()
        self._pod_spec.volumes.append(secrets_volume)
        self._session_container.volume_mounts.append(secrets_volume_mount)
        # set environment secrets
        self._session_container.env += secrets_store.get_env_secrets_as_k8s_spec()

    def get_deployment_objects(self):
        """Return the already built Kubernetes objects.

        :returns: Dictionary with the ``ingress``, ``service`` and
            ``deployment`` objects, in that order.
        """
        return self.kubernetes_objects
def build_interactive_jupyter_deployment_k8s_objects(
    deployment_name,
    workspace,
    access_path,
    access_token=None,
    cvmfs_repos=None,
    owner_id=None,
    workflow_id=None,
    image=None,
    expose_secrets=True,
):
    """Build the Kubernetes specification for a Jupyter NB interactive session.

    :param deployment_name: Name used to tag all Kubernetes objects spawned
        for the given interactive session.
    :param workspace: Path to the interactive session workspace, which
        matches with the workflow workspace the interactive session
        belongs to.
    :param access_path: URL path where the interactive session will be
        accessible. It is also passed to the notebook as its base URL, so
        that redirections issued by the application stay below this path
        (a redirect from ``/1234`` to ``/me`` instead of ``/1234/me`` would
        not be routed to the session and would most probably give a
        ``404``).
    :param access_token: If set, token passed to the notebook through
        ``--NotebookApp.token``.
    :param cvmfs_repos: List of CVMFS repos to make available.
    :param owner_id: Owner of the interactive session.
    :param workflow_id: UUID of the workflow to which the interactive
        session belongs to.
    :param image: Jupyter Notebook image to use, i.e.
        ``jupyter/tensorflow-notebook`` to enable ``tensorflow``. Falls back
        to ``JUPYTER_INTERACTIVE_SESSION_DEFAULT_IMAGE`` when not given.
    :param expose_secrets: If true, mount the "file" secrets and set the
        "env" secrets in jupyter's pod.
    :returns: Dictionary of Kubernetes objects as returned by
        ``InteractiveDeploymentK8sBuilder.get_deployment_objects``.
    """
    builder = InteractiveDeploymentK8sBuilder(
        deployment_name,
        workflow_id,
        owner_id,
        workspace,
        image or JUPYTER_INTERACTIVE_SESSION_DEFAULT_IMAGE,
        JUPYTER_INTERACTIVE_SESSION_DEFAULT_PORT,
        access_path,
    )

    notebook_args = [
        "start-notebook.sh",
        "--NotebookApp.base_url='{base_url}'".format(base_url=access_path),
        "--notebook-dir='{workflow_workspace}'".format(workflow_workspace=workspace),
        f'--NotebookApp.terminado_settings={{"shell_command": ["/usr/bin/bash", "-c", "cd \'{workspace}\' && bash"]}}',
    ]
    if access_token:
        notebook_args += [
            "--NotebookApp.token='{access_token}'".format(access_token=access_token)
        ]
    builder.add_command_arguments(notebook_args)

    # Volumes and secrets are added before the fixed environment variables,
    # so secret environment variables come first in the container spec.
    builder.add_reana_shared_storage()
    if cvmfs_repos:
        builder.add_cvmfs_repo_mounts(cvmfs_repos)
    if expose_secrets:
        builder.add_user_secrets()

    session_environment = (
        ("NB_GID", 0),
        # Changes umask so all files generated by the Jupyter Notebook can be
        # modified by the root group users.
        ("NB_UMASK", REANA_WORKFLOW_UMASK),
        ("REANA_WORKSPACE", workspace),
    )
    for variable_name, variable_value in session_environment:
        builder.add_environment_variable(variable_name, variable_value)

    builder.add_run_with_root_permissions()
    return builder.get_deployment_objects()
# Registry of builder functions, keyed by interactive session type.
build_interactive_k8s_objects = {
    "jupyter": build_interactive_jupyter_deployment_k8s_objects,
}
"""Build interactive k8s deployment objects."""
def instantiate_chained_k8s_objects(kubernetes_objects, namespace):
    """Instantiate chained Kubernetes objects.

    The first object of ``kubernetes_objects`` is created as is. Every
    following object gets an owner reference pointing to that first object
    before being created.

    :param kubernetes_objects: Dictionary composed by the object kind as
        key and the object itself as value. Iteration order matters: the
        first entry becomes the owner of all the others.
    :param namespace: Kubernetes namespace where the objects will be deployed.
    :raises Exception: If one of the kinds is not supported. This is checked
        before any object is created.
    :raises ApiException: If a Kubernetes API call fails. The status, body
        and headers of the original error are preserved.
    """
    instantiate_k8s_object = {
        "deployment": current_k8s_appsv1_api_client.create_namespaced_deployment,
        "service": current_k8s_corev1_api_client.create_namespaced_service,
        "ingress": current_k8s_networking_api_client.create_namespaced_ingress,
    }

    # Validate every kind up front so that an unsupported entry does not
    # leave previously created objects behind.
    for kind in kubernetes_objects:
        if kind not in instantiate_k8s_object:
            raise Exception("Unsupported Kubernetes object kind {}.".format(kind))

    parent_k8s_object_references = None
    for kind, k8s_object in kubernetes_objects.items():
        if parent_k8s_object_references is not None:
            k8s_object.metadata.owner_references = parent_k8s_object_references
        try:
            result = instantiate_k8s_object[kind](namespace, k8s_object)
        except ApiException as e:
            # Re-raise the same exception type callers already catch, with a
            # message naming the failing object and the original details.
            error = ApiException(
                status=e.status,
                reason="Exception when creating Kubernetes {} object: {}".format(
                    kind, e.reason
                ),
            )
            error.body = e.body
            error.headers = e.headers
            raise error from e
        if parent_k8s_object_references is None:
            # The first created object owns all the following ones.
            parent_k8s_object_references = [
                {
                    "uid": result.metadata.uid,
                    "kind": result.kind,
                    "name": result.metadata.name,
                    "apiVersion": result.api_version,
                }
            ]
def delete_k8s_objects_if_exist(kubernetes_objects, namespace):
    """Delete Kubernetes objects, skipping the ones that do not exist.

    :param kubernetes_objects: Dictionary composed by the object kind as
        key and the object itself as value.
    :param namespace: Kubernetes namespace where the objects will be deleted
        from.
    :raises Exception: If a ``KeyError`` is raised while deleting, which is
        reported as an unsupported object kind.
    :raises ApiException: If a deletion fails with a reason other than
        ``Not Found``.
    """
    deleters_by_kind = {
        "deployment": current_k8s_appsv1_api_client.delete_namespaced_deployment,
        "service": current_k8s_corev1_api_client.delete_namespaced_service,
        "ingress": current_k8s_networking_api_client.delete_namespaced_ingress,
    }
    try:
        for kind, k8s_object in kubernetes_objects.items():
            try:
                deleters_by_kind[kind](k8s_object.metadata.name, namespace)
            except ApiException as api_error:
                # A missing object is fine; anything else is propagated.
                if api_error.reason != "Not Found":
                    raise
    except KeyError:
        raise Exception("Unsupported Kubernetes object kind {}.".format(kind))
def delete_k8s_ingress_object(ingress_name, namespace):
    """Delete Kubernetes ingress object.

    :param ingress_name: name of ingress object to delete.
    :param namespace: k8s namespace of ingress object.
    :raises Exception: If the ingress does not exist or if the Kubernetes
        API call fails for any other reason. The original ``ApiException``
        is attached as the cause.
    """
    try:
        current_k8s_networking_api_client.delete_namespaced_ingress(
            name=ingress_name, namespace=namespace, body=client.V1DeleteOptions()
        )
    except ApiException as k8s_api_exception:
        if k8s_api_exception.reason == "Not Found":
            raise Exception(
                "K8s object was not found {}.".format(ingress_name)
            ) from k8s_api_exception
        # The ingress is handled through the networking API client, so the
        # message names that API rather than the old extensions one.
        raise Exception(
            "Exception when calling NetworkingV1Api->"
            "delete_namespaced_ingress: {}\n".format(k8s_api_exception)
        ) from k8s_api_exception