From ce3085a0c35b38bb2b76a91fe4dedff78cc7ed7e Mon Sep 17 00:00:00 2001 From: etlstrauss Date: Tue, 3 Mar 2026 12:46:56 +0100 Subject: [PATCH 01/17] feat(k8s): create shared directory sidecar - main container (#673) --- Dockerfile | 2 +- reana_workflow_controller/k8s.py | 50 +++++++++++++++++++++++++++++--- 2 files changed, 47 insertions(+), 5 deletions(-) diff --git a/Dockerfile b/Dockerfile index c0e3b9d0..1c1c1852 100644 --- a/Dockerfile +++ b/Dockerfile @@ -32,7 +32,7 @@ RUN apt-get update -y && \ python3.12 \ python3.12-dev \ vim-tiny && \ - pip install --no-cache-dir --upgrade setuptools && \ + pip install --no-cache-dir --upgrade "setuptools<81.0.0" && \ pip install --no-cache-dir -r /code/requirements.txt && \ apt-get remove -y \ gcc \ diff --git a/reana_workflow_controller/k8s.py b/reana_workflow_controller/k8s.py index 1c755b85..ef184ae3 100644 --- a/reana_workflow_controller/k8s.py +++ b/reana_workflow_controller/k8s.py @@ -51,6 +51,7 @@ def __init__( port, path, cvmfs_repos=None, + image_pull_secret="rancher-gitlab", ): """Initialise basic interactive deployment builder for Kubernetes. @@ -74,7 +75,8 @@ def __init__( self.image = image self.port = port self.path = path - self.cvmfs_repos = cvmfs_repos or [] + self.cvmfs_repos = cvmfs_repos or [], + self.image_pull_secret = image_pull_secret metadata = client.V1ObjectMeta( name=deployment_name, labels={"reana_workflow_mode": "session"}, @@ -82,9 +84,15 @@ def __init__( self._session_container = client.V1Container( name=self.deployment_name, image=self.image, env=[], volume_mounts=[] ) + self._s3_container = client.V1Container( + name=(self.deployment_name + "s3-sidecar"), image=self.image, env=[], volume_mounts=[] + ) self._pod_spec = client.V1PodSpec( - containers=[self._session_container], - volumes=[], + containers=[self._session_container, self._s3_container], + volumes=[client.V1Volume( + name="s3-mounts", + empty_dir=client.V1EmptyDirVolumeSource() + )], node_selector=REANA_RUNTIME_SESSIONS_KUBERNETES_NODE_LABEL, # Disable service discovery with env variables, so that the environment is # not polluted with variables like `REANA_SERVER_SERVICE_HOST` @@ -161,6 +169,11 @@ def _build_service(self, metadata): return service def _build_deployment(self, metadata): + if self.image_pull_secret: + self._pod_spec.image_pull_secrets = [ + client.V1LocalObjectReference(name=self.image_pull_secret) + ] + """Build deployment Kubernetes object. 
:param metadata: Common Kubernetes metadata for the interactive
@@ -204,6 +217,34 @@ def add_reana_shared_storage(self):
         self._session_container.volume_mounts.append(volume_mount)
         self._pod_spec.volumes.append(volume)
 
+    def add_s3_storage(self):
+        """Add the volume for s3 mounts from sidecar"""
+        volume_mount, volume = get_workspace_volume(self.workspace)
+        self._session_container.volume_mounts.append(volume_mount)
+        self._pod_spec.volumes.append(volume)
+
+    def setup_s3_sidecar(self):
+        """Add the sidecar for s3 mounts"""
+        # Define the volume mount for /dev/fuse
+        fuse_volume_mount = client.V1VolumeMount(
+            name="fuse-device",
+            mount_path="/dev/fuse"
+        )
+
+        # Define the volume for /dev/fuse
+        fuse_volume = client.V1HostPathVolumeSource(
+            path="/dev/fuse"
+        )
+
+        # Append the volume mount and volume to the session container and pod spec
+        self._s3_container.volume_mounts.append(fuse_volume_mount)
+        self._pod_spec.volumes.append(client.V1Volume(name="fuse-device", host_path=fuse_volume))
+
+        security_context = client.V1SecurityContext(
+            run_as_user=0, allow_privilege_escalation=True, capabilities=client.V1Capabilities(add=["SYS_ADMIN"]), privileged=True
+        )
+        self._s3_container.security_context = security_context
+
     def add_cvmfs_repo_mounts(self, cvmfs_repos):
         """Add mounts for the provided CVMFS repositories to the deployment.
 
@@ -225,7 +266,7 @@ def add_environment_variable(self, name, value):
     def add_run_with_root_permissions(self):
         """Run interactive session with root."""
         security_context = client.V1SecurityContext(
-            run_as_user=0, allow_privilege_escalation=False
+            run_as_user=0, privileged=True
         )
         self._session_container.security_context = security_context
 
@@ -298,6 +339,7 @@ def build_interactive_jupyter_deployment_k8s_objects(
     )
     deployment_builder.add_command_arguments(command_args)
     deployment_builder.add_reana_shared_storage()
+    deployment_builder.setup_s3_sidecar()
     if cvmfs_repos:
         deployment_builder.add_cvmfs_repo_mounts(cvmfs_repos)
     if expose_secrets:

From ba212da6f29547f5769e6657d06b82dd6cbb7614 Mon Sep 17 00:00:00 2001
From: etlstrauss
Date: Wed, 18 Mar 2026 12:31:47 +0100
Subject: [PATCH 02/17] feat(k8s): prepare variables for customization of job-controller (#673)

---
 reana_workflow_controller/config.py | 20 ++++++++++++++++++++
 1 file changed, 20 insertions(+)

diff --git a/reana_workflow_controller/config.py b/reana_workflow_controller/config.py
index 7f8c4222..e8f73c79 100644
--- a/reana_workflow_controller/config.py
+++ b/reana_workflow_controller/config.py
@@ -321,6 +321,10 @@ def _parse_interactive_sessions_environments(env_var):
 )
 """Default image for REANA Job Controller sidecar."""
 
+REANA_JOB_CONTROLLER_SECRET = os.getenv(
+    "REANA_JOB_CONTROLLER_SECRET"
+)
+"""Optional secret for REANA Job Controller sidecar."""
 
 JOB_CONTROLLER_ENV_VARS = _env_vars_dict_to_k8s_list(
     json.loads(os.getenv("REANA_JOB_CONTROLLER_ENV_VARS", "{}"))
@@ -465,3 +469,19 @@ def _parse_interactive_sessions_environments(env_var):
 
 MAX_WORKFLOW_SHARING_MESSAGE_LENGTH = 5000
 """Maximum length of the user-provided message when sharing a workflow."""
+
+
+REANA_RUNTIME_JOBS_KUBERNETES_TOLERATIONS = os.getenv(
+    "REANA_RUNTIME_JOBS_KUBERNETES_TOLERATIONS"
+)
+"""Tolerations for jobs."""
+
+REANA_DATASTORE_IMAGE = os.getenv(
+    "REANA_DATASTORE_IMAGE"
+)
+"""Image for datastore (s3) sidecar for interactive sessions."""
+
+REANA_DATASTORE_SECRET = os.getenv(
+    "REANA_DATASTORE_SECRET"
+)
+"""Optional secret for datastore (s3) sidecar for interactive sessions."""
\ No newline at end of file

From 
095eb05b32b8cfdd0af1711f45dd9386f0c2834c Mon Sep 17 00:00:00 2001 From: etlstrauss Date: Wed, 18 Mar 2026 12:32:40 +0100 Subject: [PATCH 03/17] feat(k8s): configure datastore sidecar for interactive session (#673) --- reana_workflow_controller/k8s.py | 76 +++++++++++++++++++++++++------- 1 file changed, 61 insertions(+), 15 deletions(-) diff --git a/reana_workflow_controller/k8s.py b/reana_workflow_controller/k8s.py index ef184ae3..4a0f8300 100644 --- a/reana_workflow_controller/k8s.py +++ b/reana_workflow_controller/k8s.py @@ -6,6 +6,8 @@ """REANA Workflow Controller Kubernetes utils.""" +import json, os + from kubernetes import client from kubernetes.client.rest import ApiException from reana_commons.config import ( @@ -29,6 +31,8 @@ REANA_INGRESS_ANNOTATIONS, REANA_INGRESS_CLASS_NAME, REANA_INGRESS_HOST, + REANA_DATASTORE_SECRET, + REANA_DATASTORE_IMAGE, ) @@ -51,7 +55,6 @@ def __init__( port, path, cvmfs_repos=None, - image_pull_secret="rancher-gitlab", ): """Initialise basic interactive deployment builder for Kubernetes. @@ -76,23 +79,20 @@ def __init__( self.port = port self.path = path self.cvmfs_repos = cvmfs_repos or [], - self.image_pull_secret = image_pull_secret + self.image_pull_secrets = [] metadata = client.V1ObjectMeta( name=deployment_name, labels={"reana_workflow_mode": "session"}, ) self._session_container = client.V1Container( - name=self.deployment_name, image=self.image, env=[], volume_mounts=[] + name=self.deployment_name, image=self.image, env=[], volume_mounts=[], ports=[client.V1ContainerPort(container_port=self.port)] ) self._s3_container = client.V1Container( - name=(self.deployment_name + "s3-sidecar"), image=self.image, env=[], volume_mounts=[] + name="datastore", image=REANA_DATASTORE_IMAGE, env=[], volume_mounts=[], ports=[], image_pull_policy="Always" ) self._pod_spec = client.V1PodSpec( containers=[self._session_container, self._s3_container], - volumes=[client.V1Volume( - name="s3-mounts", - empty_dir=client.V1EmptyDirVolumeSource() - )], + volumes=[], node_selector=REANA_RUNTIME_SESSIONS_KUBERNETES_NODE_LABEL, # Disable service discovery with env variables, so that the environment is # not polluted with variables like `REANA_SERVER_SERVICE_HOST` @@ -169,9 +169,9 @@ def _build_service(self, metadata): return service def _build_deployment(self, metadata): - if self.image_pull_secret: + if self.image_pull_secrets: self._pod_spec.image_pull_secrets = [ - client.V1LocalObjectReference(name=self.image_pull_secret) + client.V1LocalObjectReference(name=self.image_pull_secrets) ] """Build deployment Kubernetes object. 
@@ -217,10 +217,37 @@ def add_reana_shared_storage(self): self._session_container.volume_mounts.append(volume_mount) self._pod_spec.volumes.append(volume) - def add_s3_storage(self): - """Add the volume for s3 mounts from sidecar""" - volume_mount, volume = get_workspace_volume(self.workspace) + def add_image_pull_secrets(self): + """Attach the configured image pull secrets to scheduler and worker containers.""" + for secret_name in REANA_DATASTORE_SECRET: + if secret_name: + self.image_pull_secrets.append({"name": secret_name}) + + def setup_s3_storage(self): + """Configure shared empty_dir volume for S3 sidecar and session container.""" + volume_name = "s3-mounts" + + volume = client.V1Volume( + name=volume_name, + empty_dir=client.V1EmptyDirVolumeSource() + ) + + volume_mount = client.V1VolumeMount( + name=volume_name, + mount_path="/data/s3/", + mount_propagation="HostToContainer" + ) + self._session_container.volume_mounts.append(volume_mount) + + volume_mount = client.V1VolumeMount( + name=volume_name, + mount_path="/s3-data", + mount_propagation="Bidirectional" + ) + + self._s3_container.volume_mounts.append(volume_mount) + self._pod_spec.volumes.append(volume) def setup_s3_sidecar(self): @@ -280,8 +307,25 @@ def add_user_secrets(self): self._pod_spec.volumes.append(secrets_volume) self._session_container.volume_mounts.append(secrets_volume_mount) - # set environment secrets - self._session_container.env += user_secrets.get_env_secrets_as_k8s_spec() + # build env arrays for different containers + # sorting s3 variables to only be mounted in sidecar + all_env = user_secrets.get_env_secrets_as_k8s_spec() + s3_env = [] + session_env = [] + + # Single for loop to split secrets + for secret in all_env: + secret_name = secret.get("name", "") + if secret_name.startswith("S3_TO_LOCAL_"): + s3_env.append(secret) + else: + session_env.append(secret) + + # set environment secrets without s3 secrets + self._session_container.env = session_env + + # mount s3 secretes + self._s3_container.env = s3_env def get_deployment_objects(self): """Return the alrady built Kubernetes objects.""" @@ -339,7 +383,9 @@ def build_interactive_jupyter_deployment_k8s_objects( ) deployment_builder.add_command_arguments(command_args) deployment_builder.add_reana_shared_storage() + deployment_builder.add_image_pull_secrets() deployment_builder.setup_s3_sidecar() + deployment_builder.setup_s3_storage() if cvmfs_repos: deployment_builder.add_cvmfs_repo_mounts(cvmfs_repos) if expose_secrets: From 6eefab542a4c329dbcea8895c60069e14b15f404 Mon Sep 17 00:00:00 2001 From: etlstrauss Date: Wed, 18 Mar 2026 12:34:39 +0100 Subject: [PATCH 04/17] feat(k8s): adding parsing of tolerations for taints (#673) --- .../workflow_run_manager.py | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/reana_workflow_controller/workflow_run_manager.py b/reana_workflow_controller/workflow_run_manager.py index c35b0814..a7069d2b 100644 --- a/reana_workflow_controller/workflow_run_manager.py +++ b/reana_workflow_controller/workflow_run_manager.py @@ -100,6 +100,9 @@ REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_THREADS, KUEUE_ENABLED, KUEUE_LOCAL_QUEUE_NAME, + REANA_RUNTIME_JOBS_KUBERNETES_TOLERATIONS, + IMAGE_PULL_SECRETS, + REANA_JOB_CONTROLLER_SECRET, ) @@ -826,6 +829,13 @@ def _create_job_spec( "value": os.getenv("REANA_RUNTIME_JOBS_KUBERNETES_NODE_LABEL"), }, ) + if REANA_RUNTIME_JOBS_KUBERNETES_TOLERATIONS: + job_controller_container.env.append( + { + "name": "REANA_RUNTIME_JOBS_KUBERNETES_TOLERATIONS", + "value": 
os.getenv("REANA_RUNTIME_JOBS_KUBERNETES_TOLERATIONS"), + }, + ) if requires_dask(self.workflow): job_controller_container.env.append( { @@ -850,6 +860,7 @@ def _create_job_spec( node_selector=REANA_RUNTIME_BATCH_KUBERNETES_NODE_LABEL, init_containers=[], termination_grace_period_seconds=REANA_RUNTIME_BATCH_TERMINATION_GRACE_PERIOD, + image_pull_secrets=[], ) spec.template.spec.service_account_name = ( REANA_RUNTIME_KUBERNETES_SERVICEACCOUNT_NAME @@ -863,6 +874,16 @@ def _create_job_spec( volumes += kerberos.volumes spec.template.spec.init_containers.append(kerberos.init_container) + image_pull_secrets = [] + for secret_name in REANA_JOB_CONTROLLER_SECRET: + if secret_name: + self.image_pull_secrets.append({"name": secret_name}) + + if image_pull_secrets: + spec.template.spec.image_pull_secrets = [ + client.V1LocalObjectReference(name=self.image_pull_secrets) + ] + # filter out volumes with the same name spec.template.spec.volumes = list({v["name"]: v for v in volumes}.values()) From 6c9784fd60328f5a3deaecdb3453a8904c8a5ad1 Mon Sep 17 00:00:00 2001 From: etlstrauss Date: Wed, 18 Mar 2026 12:36:15 +0100 Subject: [PATCH 05/17] feat(k8s): adding datastore sidecar for interactive session (#673) --- sidecars/datastore/Dockerfile | 20 +++++++ sidecars/datastore/app.py | 89 ++++++++++++++++++++++++++++++++ sidecars/datastore/entrypoint.sh | 8 +++ 3 files changed, 117 insertions(+) create mode 100644 sidecars/datastore/Dockerfile create mode 100644 sidecars/datastore/app.py create mode 100644 sidecars/datastore/entrypoint.sh diff --git a/sidecars/datastore/Dockerfile b/sidecars/datastore/Dockerfile new file mode 100644 index 00000000..a4935c29 --- /dev/null +++ b/sidecars/datastore/Dockerfile @@ -0,0 +1,20 @@ +FROM alpine:3.20 + +# Install dependencies +RUN apk update && apk add --no-cache python3 py3-pip s3fs-fuse + +# Create the base directory for S3 data +RUN mkdir -p /s3-data + +# Copy the scripts +COPY entrypoint.sh /entrypoint.sh +COPY app.py /app/app.py + +# Make entrypoint executable +RUN chmod +x /entrypoint.sh + +# Set working directory +WORKDIR /app + +# Set the entrypoint +ENTRYPOINT ["/entrypoint.sh"] diff --git a/sidecars/datastore/app.py b/sidecars/datastore/app.py new file mode 100644 index 00000000..331511dd --- /dev/null +++ b/sidecars/datastore/app.py @@ -0,0 +1,89 @@ +import os +import re +import sys + + +def createFolders(aliases, base_dir): + try: + for i in range(0, len(aliases)): + target_path = os.path.join(base_dir, aliases[i][0]) + target_path = os.path.join(target_path, aliases[i][1]) + if not os.path.exists(target_path): + os.makedirs(target_path) + print(f"Created folder: {target_path}") + else: + print(f"Folder exists: {target_path}") + return True + except Exception as e: + print(f"A error accrued during the creation of the s3-buckets: {e}") + return False + +def getCredentials(aliases): + aliases_credentials = [] + try: + for alias in aliases: + temp_list =[alias] + temp_list.append(os.getenv(f"S3_TO_LOCAL_{alias}_BUCKET")) + temp_list.append(os.getenv(f"S3_TO_LOCAL_{alias}_HOST")) + temp_list.append(os.getenv(f"S3_TO_LOCAL_{alias}_REGION")) + temp_list.append(os.getenv(f"S3_TO_LOCAL_{alias}_ACCESS_KEY")) + temp_list.append(os.getenv(f"S3_TO_LOCAL_{alias}_SECRET_KEY")) + aliases_credentials.append(temp_list) + return aliases_credentials + except Exception as e: + print(f"A error accrued during load of S3 credentials: {e}") + return False + +def createS3Mounts(aliases, base_dir): + for i in (0, len(aliases)-2): + try: + with open(".passwd-s3fs", mode="w", 
encoding="utf-8") as f: + f.write(f"{aliases[i][4]}:{aliases[i][5]}") + os.system("chmod 600 .passwd-s3fs") + target_path = os.path.join(base_dir, aliases[i][0]) + target_path = os.path.join(target_path, aliases[i][1]) + cmd=f"s3fs {aliases[i][1]} {target_path} -o passwd_file=.passwd-s3fs -o url={aliases[i][2]} -o endpoint={aliases[i][3]} -o use_path_request_style -o allow_other" + rc = os.system(cmd) + if rc != 0: + print( + f"s3fuse returned a non‑zero status ({rc >> 8}) for alias '{aliases[i][0]}'", + file=sys.stderr, + ) + else: + print(f"Successfully mounted '{aliases[i][0]}'") + os.system("rm .passwd-s3fs") + except Exception as e: + print(f"A error accrued during the mounting of alias {aliases[i][0]}: {e}") + return False + +def main(): + base_dir = "/s3-data" + + # Ensure the base directory exists + if not os.path.exists(base_dir): + os.makedirs(base_dir) + print(f"Base directory created: {base_dir}") + + # Regex pattern: S3_TO_LOCAL__ALIAS + # The (.*) captures the part + pattern = re.compile(r'^S3_TO_LOCAL_(.*)_ALIAS$') + + print("Scanning environment variables for S3 aliases...") + + aliases = [] + for key, value in os.environ.items(): + match = pattern.match(key) + if match: + aliases.append(value) + + aliases = getCredentials(aliases) + createFolders(aliases, base_dir) + createS3Mounts(aliases, base_dir) + + if len(aliases) == 0: + print("No environment variables matching 'S3_TO_LOCAL_*_ALIAS' found.") + else: + print(f"Processed {len(aliases)} S3 alias(es).") + +if __name__ == "__main__": + main() diff --git a/sidecars/datastore/entrypoint.sh b/sidecars/datastore/entrypoint.sh new file mode 100644 index 00000000..bc846cb0 --- /dev/null +++ b/sidecars/datastore/entrypoint.sh @@ -0,0 +1,8 @@ +#!/bin/sh +set -e + +echo "Starting Python S3 Data Manager..." +python3 /app/app.py +echo "Initialization complete." + +tail -f /dev/null From c1d652fd3bf713eaed37b9443d1318590354103d Mon Sep 17 00:00:00 2001 From: etlstrauss Date: Mon, 23 Mar 2026 11:47:08 +0100 Subject: [PATCH 06/17] feat(k8s): fix imagePullSecret definition for datastore container (#673) --- reana_workflow_controller/k8s.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/reana_workflow_controller/k8s.py b/reana_workflow_controller/k8s.py index 4a0f8300..986be4d3 100644 --- a/reana_workflow_controller/k8s.py +++ b/reana_workflow_controller/k8s.py @@ -169,11 +169,6 @@ def _build_service(self, metadata): return service def _build_deployment(self, metadata): - if self.image_pull_secrets: - self._pod_spec.image_pull_secrets = [ - client.V1LocalObjectReference(name=self.image_pull_secrets) - ] - """Build deployment Kubernetes object. 
:param metadata: Common Kubernetes metadata for the interactive
@@ -219,9 +214,10 @@ def add_reana_shared_storage(self):
 
     def add_image_pull_secrets(self):
-        """Attach the configured image pull secrets to scheduler and worker containers."""
-        for secret_name in REANA_DATASTORE_SECRET:
-            if secret_name:
-                self.image_pull_secrets.append({"name": secret_name})
+        """Attach the configured image pull secret to the session deployment."""
+        if REANA_DATASTORE_SECRET:
+            self._pod_spec.image_pull_secrets = [
+                client.V1LocalObjectReference(name=REANA_DATASTORE_SECRET)
+            ]
 
     def setup_s3_storage(self):
         """Configure shared empty_dir volume for S3 sidecar and session container."""

From 9617a02f2a2b7cdec0b14ae829bf5955d9ba0fd6 Mon Sep 17 00:00:00 2001
From: etlstrauss
Date: Mon, 23 Mar 2026 13:12:57 +0100
Subject: [PATCH 07/17] feat(k8s): adding lifecycle for datastore sidecar (#673)

---
 reana_workflow_controller/k8s.py | 13 ++++++++++++-
 sidecars/datastore/app.py        |  2 ++
 sidecars/datastore/entrypoint.sh |  2 +-
 3 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/reana_workflow_controller/k8s.py b/reana_workflow_controller/k8s.py
index 986be4d3..d639ea9f 100644
--- a/reana_workflow_controller/k8s.py
+++ b/reana_workflow_controller/k8s.py
@@ -88,7 +88,7 @@ def __init__(
             name=self.deployment_name, image=self.image, env=[], volume_mounts=[], ports=[client.V1ContainerPort(container_port=self.port)]
         )
         self._s3_container = client.V1Container(
-            name="datastore", image=REANA_DATASTORE_IMAGE, env=[], volume_mounts=[], ports=[], image_pull_policy="Always"
+            name="datastore", image=REANA_DATASTORE_IMAGE, env=[], volume_mounts=[], ports=[], image_pull_policy="Always", lifecycle=[]
         )
         self._pod_spec = client.V1PodSpec(
             containers=[self._session_container, self._s3_container],
             volumes=[],
@@ -268,6 +268,17 @@ def setup_s3_sidecar(self):
         )
         self._s3_container.security_context = security_context
 
+        # adding umount for correct termination
+        lifecycle_dict = {
+            "preStop": {
+                "exec": {
+                    "command": ["/bin/sh", "-c", "xargs umount -l < /etc/active_mounts.txt || true"]
+                }
+            }
+        }
+
+        self._s3_container.lifecycle = lifecycle_dict
+
     def add_cvmfs_repo_mounts(self, cvmfs_repos):
         """Add mounts for the provided CVMFS repositories to the deployment.
 
diff --git a/sidecars/datastore/app.py b/sidecars/datastore/app.py
index 331511dd..a2384671 100644
--- a/sidecars/datastore/app.py
+++ b/sidecars/datastore/app.py
@@ -51,6 +51,8 @@ def createS3Mounts(aliases, base_dir):
                 )
             else:
                 print(f"Successfully mounted '{aliases[i][0]}'")
+                with open("/etc/active_mounts.txt", "a") as f:
+                    f.write(f"{target_path}\n")
             os.system("rm .passwd-s3fs")
         except Exception as e:
             print(f"A error accrued during the mounting of alias {aliases[i][0]}: {e}")
diff --git a/sidecars/datastore/entrypoint.sh b/sidecars/datastore/entrypoint.sh
index bc846cb0..84289b6d 100644
--- a/sidecars/datastore/entrypoint.sh
+++ b/sidecars/datastore/entrypoint.sh
@@ -5,4 +5,4 @@
 echo "Starting Python S3 Data Manager..."
 python3 /app/app.py
 echo "Initialization complete."
-tail -f /dev/null
+exec tail -f /dev/null

From 805b3dabf7ba9d7ced1ad755ab3f8296e5470812 Mon Sep 17 00:00:00 2001
From: etlstrauss
Date: Mon, 23 Mar 2026 14:04:29 +0100
Subject: [PATCH 08/17] feat(k8s): enabling of datastore interactive session sidecar (#673)

---
 reana_workflow_controller/config.py | 28 ++++++++++++++++++----------
 reana_workflow_controller/k8s.py    | 23 +++++++++++++++--------
 2 files changed, 33 insertions(+), 18 deletions(-)

diff --git a/reana_workflow_controller/config.py b/reana_workflow_controller/config.py
index e8f73c79..7dccbf8a 100644
--- a/reana_workflow_controller/config.py
+++ b/reana_workflow_controller/config.py
@@ -475,13 +475,21 @@ def _parse_interactive_sessions_environments(env_var):
     "REANA_RUNTIME_JOBS_KUBERNETES_TOLERATIONS"
 )
 """Tolerations for jobs."""
-
-REANA_DATASTORE_IMAGE = os.getenv(
-    "REANA_DATASTORE_IMAGE"
-)
-"""Image for datastore (s3) sidecar for interactive sessions."""
-
-REANA_DATASTORE_SECRET = os.getenv(
-    "REANA_DATASTORE_SECRET"
-)
-"""Optional secret for datastore (s3) sidecar for interactive sessions."""
\ No newline at end of file
+REANA_DATASTORE_ENABLED = os.getenv(
+    "REANA_DATASTORE_ENABLED"
+) == "true"
+"""Set datastore (s3) sidecar for interactive sessions enabled or disabled."""
+
+if(REANA_DATASTORE_ENABLED):
+    REANA_DATASTORE_IMAGE = os.getenv(
+        "REANA_DATASTORE_IMAGE"
+    )
+    """Optional image for datastore (s3) sidecar for interactive sessions."""
+
+    REANA_DATASTORE_SECRET = os.getenv(
+        "REANA_DATASTORE_SECRET"
+    )
+    """Optional secret for datastore (s3) sidecar for interactive sessions."""
+else:
+    REANA_DATASTORE_IMAGE = ""
+    REANA_DATASTORE_SECRET = ""
\ No newline at end of file
diff --git a/reana_workflow_controller/k8s.py b/reana_workflow_controller/k8s.py
index d639ea9f..59a2fe5e 100644
--- a/reana_workflow_controller/k8s.py
+++ b/reana_workflow_controller/k8s.py
@@ -33,6 +33,7 @@
     REANA_INGRESS_HOST,
     REANA_DATASTORE_SECRET,
     REANA_DATASTORE_IMAGE,
+    REANA_DATASTORE_ENABLED,
 )
 
 
@@ -87,11 +88,15 @@ def __init__(
         self._session_container = client.V1Container(
             name=self.deployment_name, image=self.image, env=[], volume_mounts=[], ports=[client.V1ContainerPort(container_port=self.port)]
         )
-        self._s3_container = client.V1Container(
-            name="datastore", image=REANA_DATASTORE_IMAGE, env=[], volume_mounts=[], ports=[], image_pull_policy="Always", lifecycle=[]
-        )
+        containers = [self._session_container]
+        if(REANA_DATASTORE_ENABLED):
+            self._s3_container = client.V1Container(
+                name="datastore", image=REANA_DATASTORE_IMAGE, env=[], volume_mounts=[], ports=[], image_pull_policy="Always", lifecycle=[]
+            )
+            containers.append(self._s3_container)
+
         self._pod_spec = client.V1PodSpec(
-            containers=[self._session_container, self._s3_container],
+            containers=containers,
             volumes=[],
             node_selector=REANA_RUNTIME_SESSIONS_KUBERNETES_NODE_LABEL,
             # Disable service discovery with env variables, so that the environment is
             # not polluted with variables like `REANA_SERVER_SERVICE_HOST`
@@ -319,7 +324,7 @@ def add_user_secrets(self):
         all_env = user_secrets.get_env_secrets_as_k8s_spec()
         s3_env = []
         session_env = []
-        
+
         # Single for loop to split secrets
         for secret in all_env:
             secret_name = secret.get("name", "")
@@ -332,7 +337,8 @@ def add_user_secrets(self):
         self._session_container.env = session_env
 
         # mount s3 secretes
-        self._s3_container.env = s3_env
+        if(REANA_DATASTORE_ENABLED):
+            self._s3_container.env = s3_env
 
     def get_deployment_objects(self):
         """Return the already built Kubernetes objects."""
@@ -391,8 +397,9 @@ def build_interactive_jupyter_deployment_k8s_objects(
     deployment_builder.add_command_arguments(command_args)
     deployment_builder.add_reana_shared_storage()
     deployment_builder.add_image_pull_secrets()
-    deployment_builder.setup_s3_sidecar()
-    deployment_builder.setup_s3_storage()
+    if(REANA_DATASTORE_ENABLED):
+        deployment_builder.setup_s3_sidecar()
+        deployment_builder.setup_s3_storage()
     if cvmfs_repos:
         deployment_builder.add_cvmfs_repo_mounts(cvmfs_repos)
     if expose_secrets:

From 7664272eff2bb32b5103d4bfb96d0a1ea7757102 Mon Sep 17 00:00:00 2001
From: etlstrauss
Date: Mon, 23 Mar 2026 17:05:55 +0100
Subject: [PATCH 09/17] feat(k8s): change image pull secrets for job-controller (#673)

---
 reana_workflow_controller/workflow_run_manager.py | 9 ++-------
 1 file changed, 2 insertions(+), 7 deletions(-)

diff --git a/reana_workflow_controller/workflow_run_manager.py b/reana_workflow_controller/workflow_run_manager.py
index a7069d2b..d521cc18 100644
--- a/reana_workflow_controller/workflow_run_manager.py
+++ b/reana_workflow_controller/workflow_run_manager.py
@@ -874,14 +874,9 @@ def _create_job_spec(
             volumes += kerberos.volumes
             spec.template.spec.init_containers.append(kerberos.init_container)
 
-        image_pull_secrets = []
-        for secret_name in REANA_JOB_CONTROLLER_SECRET:
-            if secret_name:
-                self.image_pull_secrets.append({"name": secret_name})
-
-        if image_pull_secrets:
+        if REANA_JOB_CONTROLLER_SECRET:
             spec.template.spec.image_pull_secrets = [
-                client.V1LocalObjectReference(name=self.image_pull_secrets)
+                client.V1LocalObjectReference(name= REANA_JOB_CONTROLLER_SECRET)
             ]
 
         # filter out volumes with the same name

From e85e628c71c80f726a1ba9c97d8e92a2efe02f9e Mon Sep 17 00:00:00 2001
From: etlstrauss
Date: Tue, 24 Mar 2026 13:18:08 +0100
Subject: [PATCH 10/17] chore(master): using black to reformat python files (#673)

---
 reana_workflow_controller/config.py  | 20 ++-----
 reana_workflow_controller/dask.py    |  1 +
 reana_workflow_controller/k8s.py     | 59 +++++++++++--------
 .../rest/workflows_session.py        |  2 -
 .../workflow_run_manager.py          |  3 +-
 sidecars/datastore/app.py            | 18 +++---
 tests/test_utils.py                  |  2 +-
 7 files changed, 56 insertions(+), 49 deletions(-)

diff --git a/reana_workflow_controller/config.py b/reana_workflow_controller/config.py
index 7dccbf8a..b31eb50f 100644
--- a/reana_workflow_controller/config.py
+++ b/reana_workflow_controller/config.py
@@ -321,9 +321,7 @@ def _parse_interactive_sessions_environments(env_var):
 )
 """Default image for REANA Job Controller sidecar."""
 
-REANA_JOB_CONTROLLER_SECRET = os.getenv(
-    "REANA_JOB_CONTROLLER_SECRET"
-)
+REANA_JOB_CONTROLLER_SECRET = os.getenv("REANA_JOB_CONTROLLER_SECRET")
 """Optional secret for REANA Job Controller sidecar."""
 
 JOB_CONTROLLER_ENV_VARS = _env_vars_dict_to_k8s_list(
     json.loads(os.getenv("REANA_JOB_CONTROLLER_ENV_VARS", "{}"))
@@ -475,21 +473,15 @@ def _parse_interactive_sessions_environments(env_var):
     "REANA_RUNTIME_JOBS_KUBERNETES_TOLERATIONS"
 )
 """Tolerations for jobs."""
-REANA_DATASTORE_ENABLED = os.getenv(
-    "REANA_DATASTORE_ENABLED"
-) == "true"
+REANA_DATASTORE_ENABLED = os.getenv("REANA_DATASTORE_ENABLED") == "true"
 """Set datastore (s3) sidecar for interactive sessions enabled or disabled."""
 
-if(REANA_DATASTORE_ENABLED):
-    REANA_DATASTORE_IMAGE = os.getenv(
-        "REANA_DATASTORE_IMAGE"
-    )
+if REANA_DATASTORE_ENABLED:
+    REANA_DATASTORE_IMAGE = os.getenv("REANA_DATASTORE_IMAGE")
     """Optional image for datastore (s3) sidecar for interactive sessions."""
 
-    REANA_DATASTORE_SECRET = os.getenv(
-        "REANA_DATASTORE_SECRET"
-    )
+    REANA_DATASTORE_SECRET = os.getenv("REANA_DATASTORE_SECRET")
     """Optional secret for datastore (s3) sidecar for interactive 
sessions""" else: REANA_DATASTORE_IMAGE = "" - REANA_DATASTORE_SECRET = "" \ No newline at end of file + REANA_DATASTORE_SECRET = "" diff --git a/reana_workflow_controller/dask.py b/reana_workflow_controller/dask.py index d9abf27b..92d4c47d 100644 --- a/reana_workflow_controller/dask.py +++ b/reana_workflow_controller/dask.py @@ -5,6 +5,7 @@ # under the terms of the MIT License; see LICENSE file for more details. """Dask resource manager.""" + import logging import os import yaml diff --git a/reana_workflow_controller/k8s.py b/reana_workflow_controller/k8s.py index 59a2fe5e..b715dfcd 100644 --- a/reana_workflow_controller/k8s.py +++ b/reana_workflow_controller/k8s.py @@ -79,19 +79,29 @@ def __init__( self.image = image self.port = port self.path = path - self.cvmfs_repos = cvmfs_repos or [], + self.cvmfs_repos = (cvmfs_repos or [],) self.image_pull_secrets = [] metadata = client.V1ObjectMeta( name=deployment_name, labels={"reana_workflow_mode": "session"}, ) self._session_container = client.V1Container( - name=self.deployment_name, image=self.image, env=[], volume_mounts=[], ports=[client.V1ContainerPort(container_port=self.port)] + name=self.deployment_name, + image=self.image, + env=[], + volume_mounts=[], + ports=[client.V1ContainerPort(container_port=self.port)], ) containers = [self._session_container] - if(REANA_DATASTORE_ENABLED): + if REANA_DATASTORE_ENABLED: self._s3_container = client.V1Container( - name="datastore", image=REANA_DATASTORE_IMAGE, env=[], volume_mounts=[], ports=[], image_pull_policy="Always", lifecycle=[] + name="datastore", + image=REANA_DATASTORE_IMAGE, + env=[], + volume_mounts=[], + ports=[], + image_pull_policy="Always", + lifecycle=[], ) containers.append(self._s3_container) @@ -229,22 +239,19 @@ def setup_s3_storage(self): volume_name = "s3-mounts" volume = client.V1Volume( - name=volume_name, - empty_dir=client.V1EmptyDirVolumeSource() + name=volume_name, empty_dir=client.V1EmptyDirVolumeSource() ) volume_mount = client.V1VolumeMount( name=volume_name, mount_path="/data/s3/", - mount_propagation="HostToContainer" + mount_propagation="HostToContainer", ) self._session_container.volume_mounts.append(volume_mount) volume_mount = client.V1VolumeMount( - name=volume_name, - mount_path="/s3-data", - mount_propagation="Bidirectional" + name=volume_name, mount_path="/s3-data", mount_propagation="Bidirectional" ) self._s3_container.volume_mounts.append(volume_mount) @@ -252,24 +259,26 @@ def setup_s3_storage(self): self._pod_spec.volumes.append(volume) def setup_s3_sidecar(self): - """Add the sidecar for s3 mounts""" + """Add the sidecar for s3 mounts.""" # Define the volume mount for /dev/fuse fuse_volume_mount = client.V1VolumeMount( - name="fuse-device", - mount_path="/dev/fuse" + name="fuse-device", mount_path="/dev/fuse" ) # Define the volume for /dev/fuse - fuse_volume = client.V1HostPathVolumeSource( - path="/dev/fuse" - ) + fuse_volume = client.V1HostPathVolumeSource(path="/dev/fuse") # Append the volume mount and volume to the session container and pod spec self._s3_container.volume_mounts.append(fuse_volume_mount) - self._pod_spec.volumes.append(client.V1Volume(name="fuse-device", host_path=fuse_volume)) + self._pod_spec.volumes.append( + client.V1Volume(name="fuse-device", host_path=fuse_volume) + ) security_context = client.V1SecurityContext( - run_as_user=0, allow_privilege_escalation=True, capabilities=client.V1Capabilities(add=["SYS_ADMIN"]), privileged=True + run_as_user=0, + allow_privilege_escalation=True, + 
capabilities=client.V1Capabilities(add=["SYS_ADMIN"]),
+            privileged=True,
         )
         self._s3_container.security_context = security_context
 
@@ -277,7 +286,11 @@ def setup_s3_sidecar(self):
         lifecycle_dict = {
             "preStop": {
                 "exec": {
-                    "command": ["/bin/sh", "-c", "xargs umount -l < /etc/active_mounts.txt || true"]
+                    "command": [
+                        "/bin/sh",
+                        "-c",
+                        "xargs umount -l < /etc/active_mounts.txt || true",
+                    ]
                 }
             }
         }
 
@@ -304,9 +317,7 @@ def add_environment_variable(self, name, value):
 
     def add_run_with_root_permissions(self):
         """Run interactive session with root."""
-        security_context = client.V1SecurityContext(
-            run_as_user=0, privileged=True
-        )
+        security_context = client.V1SecurityContext(run_as_user=0, privileged=True)
         self._session_container.security_context = security_context
 
     def add_user_secrets(self):
@@ -337,7 +348,7 @@ def add_user_secrets(self):
         self._session_container.env = session_env
 
         # mount s3 secretes
-        if(REANA_DATASTORE_ENABLED):
+        if REANA_DATASTORE_ENABLED:
             self._s3_container.env = s3_env
 
     def get_deployment_objects(self):
         """Return the already built Kubernetes objects."""
@@ -397,7 +408,7 @@ def build_interactive_jupyter_deployment_k8s_objects(
     deployment_builder.add_command_arguments(command_args)
     deployment_builder.add_reana_shared_storage()
     deployment_builder.add_image_pull_secrets()
-    if(REANA_DATASTORE_ENABLED):
+    if REANA_DATASTORE_ENABLED:
         deployment_builder.setup_s3_sidecar()
         deployment_builder.setup_s3_storage()
     if cvmfs_repos:
         deployment_builder.add_cvmfs_repo_mounts(cvmfs_repos)
     if expose_secrets:
diff --git a/reana_workflow_controller/rest/workflows_session.py b/reana_workflow_controller/rest/workflows_session.py
index bf4ba8dc..9210933c 100644
--- a/reana_workflow_controller/rest/workflows_session.py
+++ b/reana_workflow_controller/rest/workflows_session.py
@@ -8,7 +8,6 @@
 
 """REANA Workflow Controller interactive sessions REST API."""
 
-
 from flask import Blueprint, jsonify, request
 from webargs import fields
 from webargs.flaskparser import use_kwargs
@@ -18,7 +17,6 @@
 
 from reana_workflow_controller.workflow_run_manager import KubernetesWorkflowRunManager
 
-
 blueprint = Blueprint("workflows_session", __name__)
 
diff --git a/reana_workflow_controller/workflow_run_manager.py b/reana_workflow_controller/workflow_run_manager.py
index d521cc18..f6bcf2dc 100644
--- a/reana_workflow_controller/workflow_run_manager.py
+++ b/reana_workflow_controller/workflow_run_manager.py
@@ -5,6 +5,7 @@
 # under the terms of the MIT License; see LICENSE file for more details.
"""Workflow run manager interface.""" + import base64 import copy import json @@ -876,7 +877,7 @@ def _create_job_spec( if REANA_JOB_CONTROLLER_SECRET: spec.template.spec.image_pull_secrets = [ - client.V1LocalObjectReference(name= REANA_JOB_CONTROLLER_SECRET) + client.V1LocalObjectReference(name=REANA_JOB_CONTROLLER_SECRET) ] # filter out volumes with the same name diff --git a/sidecars/datastore/app.py b/sidecars/datastore/app.py index a2384671..9ed722da 100644 --- a/sidecars/datastore/app.py +++ b/sidecars/datastore/app.py @@ -18,11 +18,12 @@ def createFolders(aliases, base_dir): print(f"A error accrued during the creation of the s3-buckets: {e}") return False + def getCredentials(aliases): aliases_credentials = [] try: for alias in aliases: - temp_list =[alias] + temp_list = [alias] temp_list.append(os.getenv(f"S3_TO_LOCAL_{alias}_BUCKET")) temp_list.append(os.getenv(f"S3_TO_LOCAL_{alias}_HOST")) temp_list.append(os.getenv(f"S3_TO_LOCAL_{alias}_REGION")) @@ -33,16 +34,17 @@ def getCredentials(aliases): except Exception as e: print(f"A error accrued during load of S3 credentials: {e}") return False - + + def createS3Mounts(aliases, base_dir): - for i in (0, len(aliases)-2): + for i in (0, len(aliases) - 2): try: with open(".passwd-s3fs", mode="w", encoding="utf-8") as f: f.write(f"{aliases[i][4]}:{aliases[i][5]}") os.system("chmod 600 .passwd-s3fs") target_path = os.path.join(base_dir, aliases[i][0]) target_path = os.path.join(target_path, aliases[i][1]) - cmd=f"s3fs {aliases[i][1]} {target_path} -o passwd_file=.passwd-s3fs -o url={aliases[i][2]} -o endpoint={aliases[i][3]} -o use_path_request_style -o allow_other" + cmd = f"s3fs {aliases[i][1]} {target_path} -o passwd_file=.passwd-s3fs -o url={aliases[i][2]} -o endpoint={aliases[i][3]} -o use_path_request_style -o allow_other" rc = os.system(cmd) if rc != 0: print( @@ -58,9 +60,10 @@ def createS3Mounts(aliases, base_dir): print(f"A error accrued during the mounting of alias {aliases[i][0]}: {e}") return False + def main(): base_dir = "/s3-data" - + # Ensure the base directory exists if not os.path.exists(base_dir): os.makedirs(base_dir) @@ -68,13 +71,13 @@ def main(): # Regex pattern: S3_TO_LOCAL__ALIAS # The (.*) captures the part - pattern = re.compile(r'^S3_TO_LOCAL_(.*)_ALIAS$') + pattern = re.compile(r"^S3_TO_LOCAL_(.*)_ALIAS$") print("Scanning environment variables for S3 aliases...") aliases = [] for key, value in os.environ.items(): - match = pattern.match(key) + match = pattern.match(key) if match: aliases.append(value) @@ -87,5 +90,6 @@ def main(): else: print(f"Processed {len(aliases)} S3 alias(es).") + if __name__ == "__main__": main() diff --git a/tests/test_utils.py b/tests/test_utils.py index c281c93a..ef7a98b1 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -119,7 +119,7 @@ def test_delete_workflow_with_interactive_session( "reana_workflow_controller.k8s", current_k8s_networking_api_client=mock.DEFAULT, ): - (response, http_response) = delete_workflow(workflow) + response, http_response = delete_workflow(workflow) data = json.loads(response.get_data()) assert "Workflow successfully deleted" in data["message"] assert http_response == 200 From e49f013e4eb6517ed39b1165156669bbe3de4053 Mon Sep 17 00:00:00 2001 From: etlstrauss Date: Tue, 24 Mar 2026 14:33:22 +0100 Subject: [PATCH 11/17] chore(k8s): adding configurations based on run-tests.sh (#673) --- MANIFEST.in | 4 ++++ reana_workflow_controller/k8s.py | 2 -- reana_workflow_controller/workflow_run_manager.py | 1 - 3 files changed, 4 insertions(+), 3 
deletions(-)

diff --git a/MANIFEST.in b/MANIFEST.in
index 783b49ee..53adb2f0 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -15,6 +15,7 @@ include Dockerfile
 include LICENSE
 include pytest.ini
 include docs/openapi.json
+include sidecars/datastore/Dockerfile
 exclude .editorconfig
 exclude .prettierrc
 exclude .prettierignore
@@ -33,3 +34,6 @@ recursive-include tests *.py
 recursive-include tests *.finished
 recursive-include tests *.running
 recursive-include tests *.waiting
+recursive-include sidecars *.py
+recursive-include sidecars *.sh
+
diff --git a/reana_workflow_controller/k8s.py b/reana_workflow_controller/k8s.py
index b715dfcd..a2288ad5 100644
--- a/reana_workflow_controller/k8s.py
+++ b/reana_workflow_controller/k8s.py
@@ -6,8 +6,6 @@
 
 """REANA Workflow Controller Kubernetes utils."""
 
-import json, os
-
 from kubernetes import client
 from kubernetes.client.rest import ApiException
 from reana_commons.config import (
diff --git a/reana_workflow_controller/workflow_run_manager.py b/reana_workflow_controller/workflow_run_manager.py
index f6bcf2dc..9197b1c9 100644
--- a/reana_workflow_controller/workflow_run_manager.py
+++ b/reana_workflow_controller/workflow_run_manager.py
@@ -102,7 +102,6 @@
     KUEUE_ENABLED,
     KUEUE_LOCAL_QUEUE_NAME,
     REANA_RUNTIME_JOBS_KUBERNETES_TOLERATIONS,
-    IMAGE_PULL_SECRETS,
     REANA_JOB_CONTROLLER_SECRET,
 )

From 5da53ad9ba36a6edeb897445b5a612961cc8e4f1 Mon Sep 17 00:00:00 2001
From: etlstrauss
Date: Wed, 1 Apr 2026 12:18:23 +0200
Subject: [PATCH 12/17] feat(k8s): create test for datastore sidecar (#673)

---
 tests/test_k8s.py | 42 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 42 insertions(+)

diff --git a/tests/test_k8s.py b/tests/test_k8s.py
index af94fd2f..298a2af6 100644
--- a/tests/test_k8s.py
+++ b/tests/test_k8s.py
@@ -43,3 +43,45 @@ def test_interactive_deployment_k8s_builder_user_secrets(monkeypatch):
     assert any(v["name"] == "k8s-secret" for v in pod.volumes)
     assert any(vm["name"] == "k8s-secret" for vm in pod.containers[0].volume_mounts)
     assert any(e["name"] == "third_env" for e in pod.containers[0].env)
+
+
+def test_s3_integration(monkeypatch):
+    """Check datastore sidecar creation and env variable allocation between containers."""
+    user_id = uuid4()
+    user_secrets = UserSecrets(
+        user_id=str(user_id),
+        k8s_secret_name="k8s-secret",
+        secrets=[
+            Secret(name="main_env", type_="env", value="3"),
+            Secret(name="S3_TO_LOCAL_TEST_ALIAS", type_="env", value="TEST"),
+            Secret(name="S3_TO_LOCAL_TEST_ACCESS_KEY", type_="env", value="-"),
+            Secret(name="S3_TO_LOCAL_TEST_BUCKET", type_="env", value="-"),
+            Secret(name="S3_TO_LOCAL_TEST_HOST", type_="env", value="-"),
+            Secret(name="S3_TO_LOCAL_TEST_SECRET_KEY", type_="env", value="-"),
+            Secret(name="S3_TO_LOCAL_TEST_REGION", type_="env", value="-"),
+        ],
+    )
+    monkeypatch.setattr(
+        UserSecretsStore,
+        "fetch",
+        lambda _: user_secrets,
+    )
+
+    monkeypatch.setattr("reana_workflow_controller.k8s.REANA_DATASTORE_ENABLED", True)
+
+    builder = InteractiveDeploymentK8sBuilder(
+        "name", "workflow_id", "owner_id", "workspace", "docker_image", "port", "path"
+    )
+
+    builder.add_user_secrets()
+    builder.add_run_with_root_permissions()
+    builder.setup_s3_sidecar()
+    builder.setup_s3_storage()
+    objs = builder.get_deployment_objects()
+
+    deployment = objs["deployment"]
+    pod = deployment.spec.template.spec
+    assert len(pod.containers) == 2
+    assert any(e["name"] == "main_env" for e in pod.containers[0].env)
+    assert any(e["name"] == "S3_TO_LOCAL_TEST_ALIAS" for e in pod.containers[1].env)
+    assert 
len(pod.containers[1].env) == 6

From 69901d57b3ae3a493c5b42526c4f7f1fe31c9c35 Mon Sep 17 00:00:00 2001
From: etlstrauss
Date: Mon, 13 Apr 2026 11:09:40 +0200
Subject: [PATCH 13/17] feat(k8s): only start datastore when needed (#673)

---
 reana_workflow_controller/k8s.py | 57 ++++++++++++++++++--------------
 1 file changed, 32 insertions(+), 25 deletions(-)

diff --git a/reana_workflow_controller/k8s.py b/reana_workflow_controller/k8s.py
index a2288ad5..bb578e2e 100644
--- a/reana_workflow_controller/k8s.py
+++ b/reana_workflow_controller/k8s.py
@@ -79,6 +79,7 @@ def __init__(
         self.path = path
         self.cvmfs_repos = cvmfs_repos or []
         self.image_pull_secrets = []
+        self.datastore_enabled = False
         metadata = client.V1ObjectMeta(
             name=deployment_name,
             labels={"reana_workflow_mode": "session"},
         )
@@ -92,16 +93,22 @@ def __init__(
         )
         containers = [self._session_container]
         if REANA_DATASTORE_ENABLED:
-            self._s3_container = client.V1Container(
-                name="datastore",
-                image=REANA_DATASTORE_IMAGE,
-                env=[],
-                volume_mounts=[],
-                ports=[],
-                image_pull_policy="Always",
-                lifecycle=[],
-            )
-            containers.append(self._s3_container)
+            user_secrets = UserSecretsStore.fetch(self.owner_id)
+            all_env = user_secrets.get_env_secrets_as_k8s_spec()
+            s3_env = [s for s in all_env if s.get("name", "").startswith("S3_TO_LOCAL_")]
+            if s3_env:
+                self.datastore_enabled = True
+        if self.datastore_enabled:
+            self._s3_container = client.V1Container(
+                name="datastore",
+                image=REANA_DATASTORE_IMAGE,
+                env=[],
+                volume_mounts=[],
+                ports=[],
+                image_pull_policy="Always",
+                lifecycle=[],
+            )
+            containers.append(self._s3_container)
 
         self._pod_spec = client.V1PodSpec(
             containers=containers,
@@ -334,20 +341,20 @@ def add_user_secrets(self):
         s3_env = []
         session_env = []
 
-        # Single for loop to split secrets
-        for secret in all_env:
-            secret_name = secret.get("name", "")
-            if secret_name.startswith("S3_TO_LOCAL_"):
-                s3_env.append(secret)
-            else:
-                session_env.append(secret)
-
-        # set environment secrets without s3 secrets
-        self._session_container.env = session_env
-
-        # mount s3 secretes
-        if REANA_DATASTORE_ENABLED:
-            self._s3_container.env = s3_env
+        if self.datastore_enabled:
+            for secret in all_env:
+                secret_name = secret.get("name", "")
+                if secret_name.startswith("S3_TO_LOCAL_"):
+                    s3_env.append(secret)
+                else:
+                    session_env.append(secret)
+            # set environment secrets without s3 secrets
+            self._session_container.env = session_env
+            # mount s3 secrets
+            if REANA_DATASTORE_ENABLED:
+                self._s3_container.env = s3_env
+        else:
+            session_env = all_env
 
     def get_deployment_objects(self):
         """Return the already built Kubernetes objects."""
@@ -406,7 +413,7 @@ def build_interactive_jupyter_deployment_k8s_objects(
     deployment_builder.add_command_arguments(command_args)
     deployment_builder.add_reana_shared_storage()
     deployment_builder.add_image_pull_secrets()
-    if REANA_DATASTORE_ENABLED:
+    if REANA_DATASTORE_ENABLED and deployment_builder.datastore_enabled:
         deployment_builder.setup_s3_sidecar()
         deployment_builder.setup_s3_storage()
     if cvmfs_repos:

From 9ac953ee2cb71ea21ea7c39c06be50483dfffcaa Mon Sep 17 00:00:00 2001
From: etlstrauss
Date: Mon, 13 Apr 2026 11:20:51 +0200
Subject: [PATCH 14/17] feat(k8s): small fixes + reformatting (#673)

---
 reana_workflow_controller/k8s.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/reana_workflow_controller/k8s.py b/reana_workflow_controller/k8s.py
index bb578e2e..410c29cd 100644
--- a/reana_workflow_controller/k8s.py
+++ b/reana_workflow_controller/k8s.py
@@ -95,7 +95,9 @@ def 
__init__(
         if REANA_DATASTORE_ENABLED:
             user_secrets = UserSecretsStore.fetch(self.owner_id)
             all_env = user_secrets.get_env_secrets_as_k8s_spec()
-            s3_env = [s for s in all_env if s.get("name", "").startswith("S3_TO_LOCAL_")]
+            s3_env = [
+                s for s in all_env if s.get("name", "").startswith("S3_TO_LOCAL_")
+            ]
             if s3_env:
                 self.datastore_enabled = True
         if self.datastore_enabled:
@@ -354,7 +356,7 @@ def add_user_secrets(self):
             if REANA_DATASTORE_ENABLED:
                 self._s3_container.env = s3_env
         else:
-            session_env = all_env
+            self._session_container.env = all_env
 
     def get_deployment_objects(self):
         """Return the already built Kubernetes objects."""

From e1a46b30634d63d81e35465080339b89a631d28a Mon Sep 17 00:00:00 2001
From: etlstrauss
Date: Mon, 13 Apr 2026 23:32:19 +0200
Subject: [PATCH 15/17] feat(controller): adding s3 config (#673)

---
 .../workflow_run_manager.py | 85 +++++++++++++++++--
 1 file changed, 80 insertions(+), 5 deletions(-)

diff --git a/reana_workflow_controller/workflow_run_manager.py b/reana_workflow_controller/workflow_run_manager.py
index 960656e4..d19912dd 100644
--- a/reana_workflow_controller/workflow_run_manager.py
+++ b/reana_workflow_controller/workflow_run_manager.py
@@ -103,6 +103,9 @@
     KUEUE_LOCAL_QUEUE_NAME,
     REANA_RUNTIME_JOBS_KUBERNETES_TOLERATIONS,
     REANA_JOB_CONTROLLER_SECRET,
+    REANA_DATASTORE_IMAGE,
+    REANA_DATASTORE_ENABLED,
+    REANA_DATASTORE_SECRET,
 )
 
 
@@ -712,6 +715,43 @@ def _create_job_spec(
             workflow_engine_container.volume_mounts += kerberos.volume_mounts
             workflow_engine_container.env += kerberos.env
 
+        datastore_container = []
+        if REANA_DATASTORE_ENABLED:
+            security_context = client.V1SecurityContext(
+                run_as_user=0,
+                allow_privilege_escalation=True,
+                capabilities=client.V1Capabilities(add=["SYS_ADMIN"]),
+                privileged=True,
+            )
+
+            user_secrets = UserSecretsStore.fetch(owner_id)
+            all_env = user_secrets.get_env_secrets_as_k8s_spec()
+            s3_env = [s for s in all_env if s.get("name", "").startswith("S3_TO_LOCAL_")]
+            if s3_env:
+                datastore_container = client.V1Container(
+                    name="datastore",
+                    image=REANA_DATASTORE_IMAGE,
+                    image_pull_policy="Always",
+                    security_context=security_context,
+                    volume_mounts=[],
+                )
+
+            s3_env = []
+            for secret in all_env:
+                secret_name = secret.get("name", "")
+                if secret_name.startswith("S3_TO_LOCAL_"):
+                    s3_env.append(secret)
+
+            print("test1 \n\n\n\n")
+            fuse_volume_mount = client.V1VolumeMount(
+                name="fuse-device", mount_path="/dev/fuse"
+            )
+            print("test2 \n\n\n\n")
+            # Append the volume mount and volume to the session container and pod spec
+            datastore_container.volume_mounts.append(fuse_volume_mount)
+            print("test3 \n\n\n\n")
+            datastore_container.env = s3_env
+
         job_controller_env_secrets = user_secrets.get_env_secrets_as_k8s_spec()
 
         user_secret = user_secrets.get_secret("CERN_USER")
@@ -816,6 +856,9 @@ def _create_job_spec(
                     "value": REANA_KUBERNETES_JOBS_MAX_USER_TIMEOUT_LIMIT,
                 },
                 {"name": "WORKSPACE_PATHS", "value": json.dumps(WORKSPACE_PATHS)},
+                {"name": "REANA_DATASTORE_IMAGE", "value": json.dumps(REANA_DATASTORE_IMAGE)},
+                {"name": "REANA_DATASTORE_ENABLED", "value": json.dumps(REANA_DATASTORE_ENABLED)},
+
             ]
         )
         # env vars coming from Helm values are added after the ones from r-w-controller
@@ -854,14 +897,34 @@ def _create_job_spec(
         job_controller_container.ports = [
             {"containerPort": current_app.config["JOB_CONTROLLER_CONTAINER_PORT"]}
         ]
-        containers = [workflow_engine_container, job_controller_container]
+
+        # create datastore container and configure other ones
+        if REANA_DATASTORE_ENABLED:
+            volume_name = "s3-mounts"
+
+            
volume_mount = client.V1VolumeMount(
+                name=volume_name,
+                mount_path="/data/s3/",
+                mount_propagation="HostToContainer",
+            )
+            workflow_engine_container.volume_mounts.append(volume_mount)
+
+            job_controller_container.volume_mounts.append(volume_mount)
+
+            volume_mount = client.V1VolumeMount(
+                name=volume_name, mount_path="/s3-data", mount_propagation="Bidirectional"
+            )
+            datastore_container.volume_mounts.append(volume_mount)
+
+        containers = [workflow_engine_container, job_controller_container, datastore_container]
         spec.template.spec = client.V1PodSpec(
             containers=containers,
             node_selector=REANA_RUNTIME_BATCH_KUBERNETES_NODE_LABEL,
             init_containers=[],
             termination_grace_period_seconds=REANA_RUNTIME_BATCH_TERMINATION_GRACE_PERIOD,
-            image_pull_secrets=[],
+            image_pull_secrets=[{"name": REANA_DATASTORE_SECRET}],
         )
+
         spec.template.spec.service_account_name = (
             REANA_RUNTIME_KUBERNETES_SERVICEACCOUNT_NAME
         )
@@ -874,11 +937,23 @@ def _create_job_spec(
             volumes += kerberos.volumes
             spec.template.spec.init_containers.append(kerberos.init_container)
 
+        if REANA_DATASTORE_ENABLED:
+            volumes.append({
+                "name": "s3-mounts",
+                "empty_dir": {}
+            })
+            print("test4 \n\n\n\n")
+            volumes.append({
+                "name": "fuse-device",
+                "hostpath": "/dev/fuse",
+            })
+            print("test5 \n\n\n\n")
+
         if REANA_JOB_CONTROLLER_SECRET:
-            spec.template.spec.image_pull_secrets = [
-                client.V1LocalObjectReference(name=REANA_JOB_CONTROLLER_SECRET)
-            ]
+            spec.template.spec.image_pull_secrets.append(client.V1LocalObjectReference(name=REANA_JOB_CONTROLLER_SECRET))
 
+        print(volumes)
+        print("test5 \n\n\n\n")
         # filter out volumes with the same name
         spec.template.spec.volumes = list({v["name"]: v for v in volumes}.values())

From c8b28c04d41a5361c76244a71f37ecd68cec0cfc Mon Sep 17 00:00:00 2001
From: etlstrauss
Date: Mon, 20 Apr 2026 14:28:59 +0200
Subject: [PATCH 16/17] feat(controller): reconfigure s3 configuration (#673)

Update the s3 configuration to ensure correct termination of pods, ensure
matching dependencies, and add a Flask server to the datastore for the
shutdown sequence.
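
The controller now asks the sidecar to clean up over HTTP instead of
relying on a preStop hook. A minimal sketch of the handshake (the service
name and namespace below are hypothetical; the port 5000 and the
/shutdown route match the code added in this patch):

    import requests

    def stop_datastore_sidecar(service_name, namespace):
        """Ask the datastore sidecar to unmount its s3fs mounts and exit."""
        try:
            # The sidecar's Flask app schedules SIGINT for itself when
            # /shutdown is called; its signal handler then unmounts
            # everything and exits.
            requests.post(
                f"http://{service_name}.{namespace}:5000/shutdown", timeout=5
            )
        except requests.RequestException as exc:
            # The sidecar may already be gone; deletion proceeds either way.
            print(f"Sidecar already down or unreachable: {exc}")

    stop_datastore_sidecar("session-abc", "reana")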
--- MANIFEST.in | 1 + .../workflow_run_manager.py | 102 ++++-------- requirements.txt | 2 +- sidecars/datastore/Dockerfile | 8 + sidecars/datastore/app.py | 145 +++++++----------- sidecars/datastore/entrypoint.sh | 3 - sidecars/datastore/mount.py | 109 +++++++++++++ sidecars/datastore/requirements.txt | 1 + 8 files changed, 204 insertions(+), 167 deletions(-) create mode 100644 sidecars/datastore/mount.py create mode 100644 sidecars/datastore/requirements.txt diff --git a/MANIFEST.in b/MANIFEST.in index 53adb2f0..426f8508 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -36,4 +36,5 @@ recursive-include tests *.running recursive-include tests *.waiting recursive-include sidecars *.py recursive-include sidecars *.sh +recursive-include sidecars *.txt diff --git a/reana_workflow_controller/workflow_run_manager.py b/reana_workflow_controller/workflow_run_manager.py index d19912dd..e3ee9065 100644 --- a/reana_workflow_controller/workflow_run_manager.py +++ b/reana_workflow_controller/workflow_run_manager.py @@ -12,6 +12,7 @@ import logging import os from typing import List, Optional +import requests from flask import current_app from kubernetes import client @@ -548,7 +549,16 @@ def stop_interactive_session(self, interactive_session_id): self.workflow.name ) ) + action_completed = True + try: + requests.post( + f"http://{int_session.name}.{REANA_RUNTIME_KUBERNETES_NAMESPACE}:5000/shutdown", + timeout=5, + ) + except Exception as e: + print(f"Sidecar already down or unreachable: {e}") + try: delete_k8s_ingress_object( ingress_name=int_session.name, @@ -715,43 +725,6 @@ def _create_job_spec( workflow_engine_container.volume_mounts += kerberos.volume_mounts workflow_engine_container.env += kerberos.env - datastore_container = [] - if REANA_DATASTORE_ENABLED: - security_context = client.V1SecurityContext( - run_as_user=0, - allow_privilege_escalation=True, - capabilities=client.V1Capabilities(add=["SYS_ADMIN"]), - privileged=True, - ) - - user_secrets = UserSecretsStore.fetch(owner_id) - all_env = user_secrets.get_env_secrets_as_k8s_spec() - s3_env = [s for s in all_env if s.get("name", "").startswith("S3_TO_LOCAL_")] - if s3_env: - datastore_container = client.V1Container( - name="datastore", - image=REANA_DATASTORE_IMAGE, - image_pull_policy="Always", - security_context=security_context, - volume_mounts=[], - ) - - s3_env = [] - for secret in all_env: - secret_name = secret.get("name", "") - if secret_name.startswith("S3_TO_LOCAL_"): - s3_env.append(secret) - - print("test1 \n\n\n\n") - fuse_volume_mount = client.V1VolumeMount( - name="fuse-device", mount_path="/dev/fuse" - ) - print("test2 \n\n\n\n") - # Append the volume mount and volume to the session container and pod spec - datastore_container.volume_mounts.append(fuse_volume_mount) - print("test3 \n\n\n\n") - datastore_container.env = s3_env - job_controller_env_secrets = user_secrets.get_env_secrets_as_k8s_spec() user_secret = user_secrets.get_secret("CERN_USER") @@ -856,9 +829,18 @@ def _create_job_spec( "value": REANA_KUBERNETES_JOBS_MAX_USER_TIMEOUT_LIMIT, }, {"name": "WORKSPACE_PATHS", "value": json.dumps(WORKSPACE_PATHS)}, - {"name": "REANA_DATASTORE_IMAGE", "value": json.dumps(REANA_DATASTORE_IMAGE)}, - {"name": "REANA_DATASTORE_ENABLED", "value": json.dumps(REANA_DATASTORE_ENABLED)}, - + { + "name": "REANA_DATASTORE_IMAGE", + "value": json.dumps(REANA_DATASTORE_IMAGE).replace('"', ""), + }, + { + "name": "REANA_DATASTORE_ENABLED", + "value": json.dumps(REANA_DATASTORE_ENABLED), + }, + { + "name": "REANA_DATASTORE_SECRET", + "value": 
json.dumps(REANA_DATASTORE_SECRET).replace('"', ""), + }, ] ) # env vars coming from Helm values are added after the ones from r-w-controller @@ -898,31 +880,13 @@ def _create_job_spec( {"containerPort": current_app.config["JOB_CONTROLLER_CONTAINER_PORT"]} ] - # create datastore container and configure other ones - if REANA_DATASTORE_ENABLED: - volume_name = "s3-mounts" - - volume_mount = client.V1VolumeMount( - name=volume_name, - mount_path="/data/s3/", - mount_propagation="HostToContainer", - ) - workflow_engine_container.volume_mounts.append(volume_mount) - - job_controller_container.volume_mounts.append(volume_mount) - - volume_mount = client.V1VolumeMount( - name=volume_name, mount_path="/s3-data", mount_propagation="Bidirectional" - ) - datastore_container.volume_mounts.append(volume_mount) - - containers = [workflow_engine_container, job_controller_container, datastore_container] + containers = [workflow_engine_container, job_controller_container] spec.template.spec = client.V1PodSpec( containers=containers, node_selector=REANA_RUNTIME_BATCH_KUBERNETES_NODE_LABEL, init_containers=[], termination_grace_period_seconds=REANA_RUNTIME_BATCH_TERMINATION_GRACE_PERIOD, - image_pull_secrets=[{"name": REANA_DATASTORE_SECRET}], + image_pull_secrets=[], ) spec.template.spec.service_account_name = ( @@ -937,23 +901,11 @@ def _create_job_spec( volumes += kerberos.volumes spec.template.spec.init_containers.append(kerberos.init_container) - if REANA_DATASTORE_ENABLED: - volumes.append({ - "name": "s3-mounts", - "empty_dir": {} - }) - print("test4 \n\n\n\n") - volumes.append({ - "name": "fuse-device", - "hostpath": "/dev/fuse", - }) - print("test5 \n\n\n\n") - if REANA_JOB_CONTROLLER_SECRET: - spec.template.spec.image_pull_secrets.append(client.V1LocalObjectReference(name=REANA_JOB_CONTROLLER_SECRET)) + spec.template.spec.image_pull_secrets.append( + client.V1LocalObjectReference(name=REANA_JOB_CONTROLLER_SECRET) + ) - print(volumes) - print("test5 \n\n\n\n") # filter out volumes with the same name spec.template.spec.volumes = list({v["name"]: v for v in volumes}.values()) diff --git a/requirements.txt b/requirements.txt index 4dd59021..b0448550 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ # -# This file is autogenerated by pip-compile with Python 3.12 +# This file is autogenerated by pip-compile with Python 3.11 # by the following command: # # pip-compile --annotation-style=line --output-file=requirements.txt setup.py diff --git a/sidecars/datastore/Dockerfile b/sidecars/datastore/Dockerfile index a4935c29..5e28c3ed 100644 --- a/sidecars/datastore/Dockerfile +++ b/sidecars/datastore/Dockerfile @@ -9,6 +9,14 @@ RUN mkdir -p /s3-data # Copy the scripts COPY entrypoint.sh /entrypoint.sh COPY app.py /app/app.py +COPY requirements.txt . 
+COPY mount.py /app/mount.py
+
+RUN python -m venv /opt/venv
+ENV PATH="/opt/venv/bin:$PATH"
+
+# Install dependencies
+RUN pip install -r requirements.txt
 
 # Make entrypoint executable
 RUN chmod +x /entrypoint.sh
diff --git a/sidecars/datastore/app.py b/sidecars/datastore/app.py
index 9ed722da..33d101d0 100644
--- a/sidecars/datastore/app.py
+++ b/sidecars/datastore/app.py
@@ -1,95 +1,64 @@
 import os
-import re
+import signal
 import sys
+from flask import Flask, jsonify, cli
+import mount
+import time
+import threading
+import logging
 
+app = Flask(__name__)
+MOUNTING_COMPLETE = False
 
-def createFolders(aliases, base_dir):
-    try:
-        for i in range(0, len(aliases)):
-            target_path = os.path.join(base_dir, aliases[i][0])
-            target_path = os.path.join(target_path, aliases[i][1])
-            if not os.path.exists(target_path):
-                os.makedirs(target_path)
-                print(f"Created folder: {target_path}")
-            else:
-                print(f"Folder exists: {target_path}")
-        return True
-    except Exception as e:
-        print(f"A error accrued during the creation of the s3-buckets: {e}")
-        return False
-
-
-def getCredentials(aliases):
-    aliases_credentials = []
-    try:
-        for alias in aliases:
-            temp_list = [alias]
-            temp_list.append(os.getenv(f"S3_TO_LOCAL_{alias}_BUCKET"))
-            temp_list.append(os.getenv(f"S3_TO_LOCAL_{alias}_HOST"))
-            temp_list.append(os.getenv(f"S3_TO_LOCAL_{alias}_REGION"))
-            temp_list.append(os.getenv(f"S3_TO_LOCAL_{alias}_ACCESS_KEY"))
-            temp_list.append(os.getenv(f"S3_TO_LOCAL_{alias}_SECRET_KEY"))
-            aliases_credentials.append(temp_list)
-        return aliases_credentials
-    except Exception as e:
-        print(f"A error accrued during load of S3 credentials: {e}")
-        return False
-
+
+@app.route("/health", methods=["GET"])
+def health():
+    """Returns 200 if all S3 buckets are ready, 503 otherwise."""
+    if MOUNTING_COMPLETE:
+        return jsonify({"status": "ready"}), 200
+    
+
+
+@app.route("/shutdown", methods=["POST"])
+def shutdown():
+    """Endpoint for the main container to signal that it is finished."""
+    print("Shutdown requested via API endpoint.")
+
+    def kill_delay():
+        # Give Flask a moment to deliver the HTTP response before the
+        # process signals itself to trigger cleanup_and_exit().
+        time.sleep(1)
+        os.kill(os.getpid(), signal.SIGINT)
+
+    threading.Thread(target=kill_delay).start()
+    return "Cleanup initiated", 200
+
+
+def cleanup_and_exit(signum, frame):
+    """Signal handler: unmount all s3fs mounts before the container dies."""
+    print(f"\nSignal {signum} received. Cleaning up mounts...")
+    mount.umount(aliases)
+    print("Exiting.")
+    sys.exit(0)
+
+
+def main_logic():
+    """Run the initial mounting sequence."""
+    global MOUNTING_COMPLETE, aliases
+    print("Initializing S3 mounts...")
+    aliases = mount.mount()
+    MOUNTING_COMPLETE = True
+    print("S3 system ready.")
 
 
 if __name__ == "__main__":
-    main()
+    # Disable Flask's startup banner and werkzeug request logging.
+    cli.show_server_banner = lambda *args: None
+    logging.getLogger("werkzeug").disabled = True
+
+    signal.signal(signal.SIGINT, cleanup_and_exit)
+    signal.signal(signal.SIGTERM, cleanup_and_exit)
+
+    # Mount synchronously before serving: while mounting is in progress the
+    # API is not reachable yet, which probes also read as "not ready".
+    main_logic()
+
+    print("Starting Datastore API on port 5000...")
+    app.run(host="0.0.0.0", port=5000)
diff --git a/sidecars/datastore/entrypoint.sh b/sidecars/datastore/entrypoint.sh
index 84289b6d..9ebca4e2 100644
--- a/sidecars/datastore/entrypoint.sh
+++ b/sidecars/datastore/entrypoint.sh
@@ -3,6 +3,3 @@ set -e
 
 echo "Starting Python S3 Data Manager..."
 python3 /app/app.py
-echo "Initialization complete."
-
-exec tail -f /dev/null
diff --git a/sidecars/datastore/mount.py b/sidecars/datastore/mount.py
new file mode 100644
index 00000000..dcfd3611
--- /dev/null
+++ b/sidecars/datastore/mount.py
@@ -0,0 +1,109 @@
+import os
+import re
+import sys
+
+
+def createFolders(aliases, base_dir):
+    try:
+        for i in range(0, len(aliases)):
+            target_path = os.path.join(base_dir, aliases[i][0])
+            target_path = os.path.join(target_path, aliases[i][1])
+            if not os.path.exists(target_path):
+                os.makedirs(target_path)
+                print(f"Created folder: {target_path}")
+            else:
+                print(f"Folder exists: {target_path}")
+        return True
+    except Exception as e:
+        print(f"An error occurred while creating the S3 mount folders: {e}")
+        return False
+
+
+def getCredentials(aliases):
+    aliases_credentials = []
+    try:
+        for alias in aliases:
+            temp_list = [alias]
+            temp_list.append(os.getenv(f"S3_TO_LOCAL_{alias}_BUCKET"))
+            temp_list.append(os.getenv(f"S3_TO_LOCAL_{alias}_HOST"))
+            temp_list.append(os.getenv(f"S3_TO_LOCAL_{alias}_REGION"))
+            temp_list.append(os.getenv(f"S3_TO_LOCAL_{alias}_ACCESS_KEY"))
+            temp_list.append(os.getenv(f"S3_TO_LOCAL_{alias}_SECRET_KEY"))
+            aliases_credentials.append(temp_list)
+        return aliases_credentials
+    except Exception as e:
+        print(f"An error occurred while loading the S3 credentials: {e}")
+        return False
+
+
+def createS3Mounts(aliases, base_dir):
+    for i in range(len(aliases)):
+        try:
+            with open(".passwd-s3fs", mode="w", encoding="utf-8") as f:
+                f.write(f"{aliases[i][4]}:{aliases[i][5]}")
+            os.system("chmod 600 .passwd-s3fs")
+            target_path = os.path.join(base_dir, aliases[i][0])
+            target_path = os.path.join(target_path, aliases[i][1])
+            cmd = f"s3fs {aliases[i][1]} {target_path} -o passwd_file=.passwd-s3fs -o url={aliases[i][2]} -o endpoint={aliases[i][3]} -o use_path_request_style -o allow_other"
+            rc = os.system(cmd)
+            if rc != 0:
+                print(
+                    f"s3fs returned a non-zero status ({rc >> 8}) for alias '{aliases[i][0]}'",
+                    file=sys.stderr,
+                )
+            else:
+                print(f"Successfully mounted '{aliases[i][0]}'")
+                with open("/etc/active_mounts.txt", "a") as f:
+                    f.write(f"{target_path}\n")
+            os.system("rm .passwd-s3fs")
+        except Exception as e:
+            print(f"An error occurred while mounting alias {aliases[i][0]}: {e}")
+            return False
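+
+
+# Expected environment for one alias (the values below are illustrative
+# assumptions, not defaults): mount() scans for S3_TO_LOCAL_<NAME>_ALIAS and
+# getCredentials() then reads the matching credential variables, e.g.
+#
+#   S3_TO_LOCAL_MYDATA_ALIAS=MYDATA
+#   S3_TO_LOCAL_MYDATA_BUCKET=my-bucket
+#   S3_TO_LOCAL_MYDATA_HOST=https://s3.example.org
+#   S3_TO_LOCAL_MYDATA_REGION=eu-central-1
+#   S3_TO_LOCAL_MYDATA_ACCESS_KEY=<access key>
+#   S3_TO_LOCAL_MYDATA_SECRET_KEY=<secret key>
+#
+# With these values, s3fs would mount the bucket at /s3-data/MYDATA/my-bucket.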
+
+
+def mount():
+    base_dir = "/s3-data"
+
+    if not os.path.exists(base_dir):
+        os.makedirs(base_dir)
+        print(f"Base directory created: {base_dir}")
+
+    pattern = re.compile(r"^S3_TO_LOCAL_(.*)_ALIAS$")
+
+    print("Scanning environment variables for S3 aliases...")
+
+    aliases = []
+    for key, value in os.environ.items():
+        match = pattern.match(key)
+        if match:
+            aliases.append(value)
+
+    aliases = getCredentials(aliases)
+    if not aliases:
+        # Guard against an empty scan or a failed credential load, so the
+        # helpers below never receive `False` instead of a list.
+        print("No environment variables matching 'S3_TO_LOCAL_*_ALIAS' found.")
+        return []
+
+    createFolders(aliases, base_dir)
+    createS3Mounts(aliases, base_dir)
+
+    print(f"Processed {len(aliases)} S3 alias(es).")
+
+    return aliases
+
+
+def umount(aliases):
+    """Unmount every bucket path that was mounted for the given aliases."""
+
+    print("Starting unmount sequence...")
+    try:
+        for i in range(len(aliases)):
+            target_path = os.path.join("/s3-data", aliases[i][0])
+            target_path = os.path.join(target_path, aliases[i][1])
+            print(f"Unmounting: {target_path}")
+            rc = os.system(f"fusermount -u {target_path}")
+            if rc != 0:
+                print(f"Warning: Failed to unmount {target_path}. It might be busy.")
+            else:
+                print(f"Successfully unmounted {target_path}")
+
+    except Exception as e:
+        print(f"An error occurred during unmounting: {e}")
diff --git a/sidecars/datastore/requirements.txt b/sidecars/datastore/requirements.txt
new file mode 100644
index 00000000..7e106024
--- /dev/null
+++ b/sidecars/datastore/requirements.txt
@@ -0,0 +1 @@
+flask

From a7f6a66ddb872173c6967d611b866251524c53de Mon Sep 17 00:00:00 2001
From: etlstrauss
Date: Mon, 20 Apr 2026 14:30:18 +0200
Subject: [PATCH 17/17] feat(k8s): configure k8s s3 (#673)

Reconfigure the interactive session deployment so that the datastore
sidecar terminates correctly.
---
 reana_workflow_controller/k8s.py | 24 +++++++-----------------
 1 file changed, 7 insertions(+), 17 deletions(-)

diff --git a/reana_workflow_controller/k8s.py b/reana_workflow_controller/k8s.py
index 410c29cd..18c902a1 100644
--- a/reana_workflow_controller/k8s.py
+++ b/reana_workflow_controller/k8s.py
@@ -108,7 +108,6 @@
             volume_mounts=[],
             ports=[],
             image_pull_policy="Always",
-            lifecycle=[],
         )
         containers.append(self._s3_container)
 
@@ -176,9 +175,15 @@
             type="ClusterIP",
             ports=[
                 client.V1ServicePort(
+                    name="interactive-session",
                    port=InteractiveDeploymentK8sBuilder.internal_service_port,
                     target_port=self.port,
-                )
+                ),
+                client.V1ServicePort(
+                    name="datastore",
+                    port=5000,
+                    target_port=5000,
+                ),
             ],
             selector={"app": self.deployment_name},
@@ -289,21 +294,6 @@
         )
         self._s3_container.security_context = security_context
 
-        # adding umount for correct termination
-        lifecycle_dict = {
-            "preStop": {
-                "exec": {
-                    "command": [
-                        "/bin/sh",
-                        "-c",
-                        "xargs umount -l < /etc/active_mounts.txt || true",
-                    ]
-                }
-            }
-        }
-
-        self._s3_container.lifecycle = lifecycle_dict
-
     def add_cvmfs_repo_mounts(self, cvmfs_repos):
         """Add mounts for the provided CVMFS repositories to the deployment.