diff --git a/Dockerfile b/Dockerfile index 00341501..17dbf4c1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -22,24 +22,24 @@ COPY requirements.txt /code/ # Install all system and Python dependencies in one go # hadolint ignore=DL3008,DL3013 RUN apt-get update -y && \ - apt-get install --no-install-recommends -y \ - gcc \ - git \ - libpcre3 \ - libpcre3-dev \ - libpython3.12 \ - python3-pip \ - python3.12 \ - python3.12-dev \ - vim-tiny && \ - pip install --no-cache-dir --upgrade 'setuptools<81' && \ - pip install --no-cache-dir -r /code/requirements.txt && \ - apt-get remove -y \ - gcc \ - python3.12-dev && \ - apt-get autoremove -y && \ - apt-get clean && \ - rm -rf /var/lib/apt/lists/* + apt-get install --no-install-recommends -y \ + gcc \ + git \ + libpcre3 \ + libpcre3-dev \ + libpython3.12 \ + python3-pip \ + python3.12 \ + python3.12-dev \ + vim-tiny && \ + pip install --no-cache-dir --upgrade 'setuptools<81' && \ + pip install --no-cache-dir -r /code/requirements.txt && \ + apt-get remove -y \ + gcc \ + python3.12-dev && \ + apt-get autoremove -y && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* # Copy cluster component source code WORKDIR /code @@ -52,19 +52,19 @@ RUN if [ "${DEBUG}" -gt 0 ]; then pip install --no-cache-dir -e ".[debug]"; else # Are we building with locally-checked-out shared modules? 
# hadolint ignore=DL3013 RUN if test -e modules/reana-commons; then \ - if [ "${DEBUG}" -gt 0 ]; then \ - pip install --no-cache-dir -e "modules/reana-commons[kubernetes]" --upgrade; \ - else \ - pip install --no-cache-dir "modules/reana-commons[kubernetes]" --upgrade; \ - fi \ - fi; \ - if test -e modules/reana-db; then \ - if [ "${DEBUG}" -gt 0 ]; then \ - pip install --no-cache-dir -e "modules/reana-db" --upgrade; \ - else \ - pip install --no-cache-dir "modules/reana-db" --upgrade; \ - fi \ - fi + if [ "${DEBUG}" -gt 0 ]; then \ + pip install --no-cache-dir -e "modules/reana-commons[kubernetes]" --upgrade; \ + else \ + pip install --no-cache-dir "modules/reana-commons[kubernetes]" --upgrade; \ + fi \ + fi; \ + if test -e modules/reana-db; then \ + if [ "${DEBUG}" -gt 0 ]; then \ + pip install --no-cache-dir -e "modules/reana-db" --upgrade; \ + else \ + pip install --no-cache-dir "modules/reana-db" --upgrade; \ + fi \ + fi # Check for any broken Python dependencies RUN pip check @@ -75,12 +75,12 @@ ARG UWSGI_MAX_FD=1048576 ARG UWSGI_PROCESSES=2 ARG UWSGI_THREADS=2 ENV FLASK_APP=reana_workflow_controller/app.py \ - PYTHONPATH=/workdir \ - TERM=xterm \ - UWSGI_BUFFER_SIZE=${UWSGI_BUFFER_SIZE:-8192} \ - UWSGI_MAX_FD=${UWSGI_MAX_FD:-1048576} \ - UWSGI_PROCESSES=${UWSGI_PROCESSES:-2} \ - UWSGI_THREADS=${UWSGI_THREADS:-2} + PYTHONPATH=/workdir \ + TERM=xterm \ + UWSGI_BUFFER_SIZE=${UWSGI_BUFFER_SIZE:-8192} \ + UWSGI_MAX_FD=${UWSGI_MAX_FD:-1048576} \ + UWSGI_PROCESSES=${UWSGI_PROCESSES:-2} \ + UWSGI_THREADS=${UWSGI_THREADS:-2} # Expose ports to clients EXPOSE 5000 @@ -90,22 +90,22 @@ EXPOSE 5000 # while also allowing shell expansion # hadolint ignore=DL3025 CMD exec uwsgi \ - --buffer-size ${UWSGI_BUFFER_SIZE} \ - --die-on-term \ - --hook-master-start "unix_signal:2 gracefully_kill_them_all" \ - --hook-master-start "unix_signal:15 gracefully_kill_them_all" \ - --enable-threads \ - --http-socket 0.0.0.0:5000 \ - --master \ - --max-fd ${UWSGI_MAX_FD} \ - --module 
reana_workflow_controller.app:app \ - --need-app \ - --processes ${UWSGI_PROCESSES} \ - --single-interpreter \ - --stats /tmp/stats.socket \ - --threads ${UWSGI_THREADS} \ - --vacuum \ - --wsgi-disable-file-wrapper + --buffer-size ${UWSGI_BUFFER_SIZE} \ + --die-on-term \ + --hook-master-start "unix_signal:2 gracefully_kill_them_all" \ + --hook-master-start "unix_signal:15 gracefully_kill_them_all" \ + --enable-threads \ + --http-socket 0.0.0.0:5000 \ + --master \ + --max-fd ${UWSGI_MAX_FD} \ + --module reana_workflow_controller.app:app \ + --need-app \ + --processes ${UWSGI_PROCESSES} \ + --single-interpreter \ + --stats /tmp/stats.socket \ + --threads ${UWSGI_THREADS} \ + --vacuum \ + --wsgi-disable-file-wrapper # Set image labels LABEL org.opencontainers.image.authors="team@reanahub.io" diff --git a/MANIFEST.in b/MANIFEST.in index 783b49ee..426f8508 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -15,6 +15,7 @@ include Dockerfile include LICENSE include pytest.ini include docs/openapi.json +include sidecars/datastore/Dockerfile exclude .editorconfig exclude .prettierrc exclude .prettierignore @@ -33,3 +34,7 @@ recursive-include tests *.py recursive-include tests *.finished recursive-include tests *.running recursive-include tests *.waiting +recursive-include sidecars *.py +recursive-include sidecars *.sh +recursive-include sidecars *.txt + diff --git a/reana_workflow_controller/config.py b/reana_workflow_controller/config.py index 7f8c4222..b31eb50f 100644 --- a/reana_workflow_controller/config.py +++ b/reana_workflow_controller/config.py @@ -321,6 +321,8 @@ def _parse_interactive_sessions_environments(env_var): ) """Default image for REANA Job Controller sidecar.""" +REANA_JOB_CONTROLLER_SECRET = os.getenv("REANA_JOB_CONTROLLER_SECRET") +"""Optional secret for REANA Job Controller sidecar.""" JOB_CONTROLLER_ENV_VARS = _env_vars_dict_to_k8s_list( json.loads(os.getenv("REANA_JOB_CONTROLLER_ENV_VARS", "{}")) @@ -465,3 +467,21 @@ def 
_parse_interactive_sessions_environments(env_var): MAX_WORKFLOW_SHARING_MESSAGE_LENGTH = 5000 """Maximum length of the user-provided message when sharing a workflow.""" + + +REANA_RUNTIME_JOBS_KUBERNETES_TOLERATIONS = os.getenv( + "REANA_RUNTIME_JOBS_KUBERNETES_TOLERATIONS" +) +"""Tolerations for jobs""" +REANA_DATASTORE_ENABLED = os.getenv("REANA_DATASTORE_ENABLED") == "true" +"""Set datastore (s3) sidecar for interactive sessions enabled or disabled""" + +if REANA_DATASTORE_ENABLED: + REANA_DATASTORE_IMAGE = os.getenv("REANA_DATASTORE_IMAGE") + """Optional Image for datastore (s3) sidecar for interactive sessions""" + + REANA_DATASTORE_SECRET = os.getenv("REANA_DATASTORE_SECRET") + """Optional secret for datastore (s3) sidecar for interactive sessions""" +else: + REANA_DATASTORE_IMAGE = "" + REANA_DATASTORE_SECRET = "" diff --git a/reana_workflow_controller/k8s.py b/reana_workflow_controller/k8s.py index 1c755b85..18c902a1 100644 --- a/reana_workflow_controller/k8s.py +++ b/reana_workflow_controller/k8s.py @@ -29,6 +29,9 @@ REANA_INGRESS_ANNOTATIONS, REANA_INGRESS_CLASS_NAME, REANA_INGRESS_HOST, + REANA_DATASTORE_SECRET, + REANA_DATASTORE_IMAGE, + REANA_DATASTORE_ENABLED, ) @@ -74,16 +77,42 @@ def __init__( self.image = image self.port = port self.path = path - self.cvmfs_repos = cvmfs_repos or [] + self.cvmfs_repos = cvmfs_repos or [] + self.image_pull_secrets = [] + self.datastore_enabled = False metadata = client.V1ObjectMeta( name=deployment_name, labels={"reana_workflow_mode": "session"}, ) self._session_container = client.V1Container( - name=self.deployment_name, image=self.image, env=[], volume_mounts=[] + name=self.deployment_name, + image=self.image, + env=[], + volume_mounts=[], + ports=[client.V1ContainerPort(container_port=self.port)], ) + containers = [self._session_container] + if REANA_DATASTORE_ENABLED: + user_secrets = UserSecretsStore.fetch(self.owner_id) + all_env = user_secrets.get_env_secrets_as_k8s_spec() + s3_env = [ + s for s in all_env 
if s.get("name", "").startswith("S3_TO_LOCAL_") + ] + if s3_env: + self.datastore_enabled = True + if self.datastore_enabled: + self._s3_container = client.V1Container( + name="datastore", + image=REANA_DATASTORE_IMAGE, + env=[], + volume_mounts=[], + ports=[], + image_pull_policy="Always", + ) + containers.append(self._s3_container) + self._pod_spec = client.V1PodSpec( - containers=[self._session_container], + containers=containers, volumes=[], node_selector=REANA_RUNTIME_SESSIONS_KUBERNETES_NODE_LABEL, # Disable service discovery with env variables, so that the environment is @@ -146,9 +175,15 @@ def _build_service(self, metadata): type="ClusterIP", ports=[ client.V1ServicePort( + name="interactive-session", port=InteractiveDeploymentK8sBuilder.internal_service_port, target_port=self.port, - ) + ), + client.V1ServicePort( + name="datastore", + port=5000, + target_port=5000, + ), ], selector={"app": self.deployment_name}, ) @@ -204,6 +239,61 @@ def add_reana_shared_storage(self): self._session_container.volume_mounts.append(volume_mount) self._pod_spec.volumes.append(volume) + def add_image_pull_secrets(self): + """Attach the configured image pull secrets to scheduler and worker containers.""" + if REANA_DATASTORE_SECRET: + self._pod_spec.image_pull_secrets = [ + client.V1LocalObjectReference(name=REANA_DATASTORE_SECRET) + ] + + def setup_s3_storage(self): + """Configure shared empty_dir volume for S3 sidecar and session container.""" + volume_name = "s3-mounts" + + volume = client.V1Volume( + name=volume_name, empty_dir=client.V1EmptyDirVolumeSource() + ) + + volume_mount = client.V1VolumeMount( + name=volume_name, + mount_path="/data/s3/", + mount_propagation="HostToContainer", + ) + + self._session_container.volume_mounts.append(volume_mount) + + volume_mount = client.V1VolumeMount( + name=volume_name, mount_path="/s3-data", mount_propagation="Bidirectional" + ) + + self._s3_container.volume_mounts.append(volume_mount) + + self._pod_spec.volumes.append(volume) 
+ + def setup_s3_sidecar(self): + """Add the sidecar for s3 mounts.""" + # Define the volume mount for /dev/fuse + fuse_volume_mount = client.V1VolumeMount( + name="fuse-device", mount_path="/dev/fuse" + ) + + # Define the volume for /dev/fuse + fuse_volume = client.V1HostPathVolumeSource(path="/dev/fuse") + + # Append the volume mount and volume to the session container and pod spec + self._s3_container.volume_mounts.append(fuse_volume_mount) + self._pod_spec.volumes.append( + client.V1Volume(name="fuse-device", host_path=fuse_volume) + ) + + security_context = client.V1SecurityContext( + run_as_user=0, + allow_privilege_escalation=True, + capabilities=client.V1Capabilities(add=["SYS_ADMIN"]), + privileged=True, + ) + self._s3_container.security_context = security_context + def add_cvmfs_repo_mounts(self, cvmfs_repos): """Add mounts for the provided CVMFS repositories to the deployment. @@ -224,9 +314,7 @@ def add_environment_variable(self, name, value): def add_run_with_root_permissions(self): """Run interactive session with root.""" - security_context = client.V1SecurityContext( - run_as_user=0, allow_privilege_escalation=False - ) + security_context = client.V1SecurityContext(run_as_user=0, privileged=True) self._session_container.security_context = security_context def add_user_secrets(self): @@ -239,8 +327,26 @@ def add_user_secrets(self): self._pod_spec.volumes.append(secrets_volume) self._session_container.volume_mounts.append(secrets_volume_mount) - # set environment secrets - self._session_container.env += user_secrets.get_env_secrets_as_k8s_spec() + # build env arrays for different containers + # sorting s3 variables to only be mounted in sidecar + all_env = user_secrets.get_env_secrets_as_k8s_spec() + s3_env = [] + session_env = [] + + if self.datastore_enabled: + for secret in all_env: + secret_name = secret.get("name", "") + if secret_name.startswith("S3_TO_LOCAL_"): + s3_env.append(secret) + else: + session_env.append(secret) + # set environment 
secrets without s3 secrets + self._session_container.env = session_env + # mount s3 secretes + if REANA_DATASTORE_ENABLED: + self._s3_container.env = s3_env + else: + self._session_container.env = all_env def get_deployment_objects(self): """Return the alrady built Kubernetes objects.""" @@ -298,6 +404,10 @@ def build_interactive_jupyter_deployment_k8s_objects( ) deployment_builder.add_command_arguments(command_args) deployment_builder.add_reana_shared_storage() + deployment_builder.add_image_pull_secrets() + if REANA_DATASTORE_ENABLED and deployment_builder.datastore_enabled: + deployment_builder.setup_s3_sidecar() + deployment_builder.setup_s3_storage() if cvmfs_repos: deployment_builder.add_cvmfs_repo_mounts(cvmfs_repos) if expose_secrets: diff --git a/reana_workflow_controller/workflow_run_manager.py b/reana_workflow_controller/workflow_run_manager.py index 8a0326e1..e3ee9065 100644 --- a/reana_workflow_controller/workflow_run_manager.py +++ b/reana_workflow_controller/workflow_run_manager.py @@ -12,6 +12,7 @@ import logging import os from typing import List, Optional +import requests from flask import current_app from kubernetes import client @@ -101,6 +102,11 @@ REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_THREADS, KUEUE_ENABLED, KUEUE_LOCAL_QUEUE_NAME, + REANA_RUNTIME_JOBS_KUBERNETES_TOLERATIONS, + REANA_JOB_CONTROLLER_SECRET, + REANA_DATASTORE_IMAGE, + REANA_DATASTORE_ENABLED, + REANA_DATASTORE_SECRET, ) @@ -543,7 +549,16 @@ def stop_interactive_session(self, interactive_session_id): self.workflow.name ) ) + action_completed = True + try: + requests.post( + f"http://{int_session.name}.{REANA_RUNTIME_KUBERNETES_NAMESPACE}:5000/shutdown", + timeout=5, + ) + except Exception as e: + print(f"Sidecar already down or unreachable: {e}") + try: delete_k8s_ingress_object( ingress_name=int_session.name, @@ -814,6 +829,18 @@ def _create_job_spec( "value": REANA_KUBERNETES_JOBS_MAX_USER_TIMEOUT_LIMIT, }, {"name": "WORKSPACE_PATHS", "value": json.dumps(WORKSPACE_PATHS)}, + 
{ + "name": "REANA_DATASTORE_IMAGE", + "value": json.dumps(REANA_DATASTORE_IMAGE).replace('"', ""), + }, + { + "name": "REANA_DATASTORE_ENABLED", + "value": json.dumps(REANA_DATASTORE_ENABLED), + }, + { + "name": "REANA_DATASTORE_SECRET", + "value": json.dumps(REANA_DATASTORE_SECRET).replace('"', ""), + }, ] ) # env vars coming from Helm values are added after the ones from r-w-controller @@ -827,6 +854,13 @@ def _create_job_spec( "value": os.getenv("REANA_RUNTIME_JOBS_KUBERNETES_NODE_LABEL"), }, ) + if REANA_RUNTIME_JOBS_KUBERNETES_TOLERATIONS: + job_controller_container.env.append( + { + "name": "REANA_RUNTIME_JOBS_KUBERNETES_TOLERATIONS", + "value": os.getenv("REANA_RUNTIME_JOBS_KUBERNETES_TOLERATIONS"), + }, + ) if requires_dask(self.workflow): job_controller_container.env.append( { @@ -845,13 +879,16 @@ def _create_job_spec( job_controller_container.ports = [ {"containerPort": current_app.config["JOB_CONTROLLER_CONTAINER_PORT"]} ] + containers = [workflow_engine_container, job_controller_container] spec.template.spec = client.V1PodSpec( containers=containers, node_selector=REANA_RUNTIME_BATCH_KUBERNETES_NODE_LABEL, init_containers=[], termination_grace_period_seconds=REANA_RUNTIME_BATCH_TERMINATION_GRACE_PERIOD, + image_pull_secrets=[], ) + spec.template.spec.service_account_name = ( REANA_RUNTIME_KUBERNETES_SERVICEACCOUNT_NAME ) @@ -864,6 +901,11 @@ def _create_job_spec( volumes += kerberos.volumes spec.template.spec.init_containers.append(kerberos.init_container) + if REANA_JOB_CONTROLLER_SECRET: + spec.template.spec.image_pull_secrets.append( + client.V1LocalObjectReference(name=REANA_JOB_CONTROLLER_SECRET) + ) + # filter out volumes with the same name spec.template.spec.volumes = list({v["name"]: v for v in volumes}.values()) diff --git a/requirements.txt b/requirements.txt index 4dd59021..b0448550 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ # -# This file is autogenerated by pip-compile with Python 3.12 +# This file is 
autogenerated by pip-compile with Python 3.11 # by the following command: # # pip-compile --annotation-style=line --output-file=requirements.txt setup.py diff --git a/sidecars/datastore/Dockerfile b/sidecars/datastore/Dockerfile new file mode 100644 index 00000000..5e28c3ed --- /dev/null +++ b/sidecars/datastore/Dockerfile @@ -0,0 +1,28 @@ +FROM alpine:3.20 + +# Install dependencies +RUN apk update && apk add --no-cache python3 py3-pip s3fs-fuse + +# Create the base directory for S3 data +RUN mkdir -p /s3-data + +# Copy the scripts +COPY entrypoint.sh /entrypoint.sh +COPY app.py /app/app.py +COPY requirements.txt . +COPY mount.py /app/mount.py + +RUN python -m venv /opt/venv +ENV PATH="/opt/venv/bin:$PATH" + +# Install dependencies +RUN pip install -r requirements.txt + +# Make entrypoint executable +RUN chmod +x /entrypoint.sh + +# Set working directory +WORKDIR /app + +# Set the entrypoint +ENTRYPOINT ["/entrypoint.sh"] diff --git a/sidecars/datastore/app.py b/sidecars/datastore/app.py new file mode 100644 index 00000000..33d101d0 --- /dev/null +++ b/sidecars/datastore/app.py @@ -0,0 +1,64 @@ +import os +import signal +import sys +from flask import Flask, jsonify, cli +import mount +import time +import threading +import logging + +app = Flask(__name__) +MOUNTING_COMPLETE = False + + +@app.route("/health", methods=["GET"]) +def health(): + """Returns 200 if all S3 buckets are ready, 503 otherwise.""" + if MOUNTING_COMPLETE: + return jsonify({"status": "ready"}), 200 + return jsonify({"status": "mounting"}), 503 + + +@app.route("/shutdown", methods=["POST"]) +def shutdown(): + """Endpoint for the main container to signal it is finished.""" + print("Shutdown requested via API endpoint.") + + def kill_delay(): + time.sleep(1) + os.kill(os.getpid(), signal.SIGINT) + + threading.Thread(target=kill_delay).start() + return "Cleanup initiated", 200 + + +def cleanup_and_exit(signum, frame): + """Signal handler to ensure S3fs is unmounted before the container dies.""" + 
print(f"\nSignal {signum} received. Cleaning up mounts...") + mount.umount(aliases) + print("Exiting.") + sys.exit(0) + + +def main_logic(): + """Runs the initial mounting sequence.""" + global MOUNTING_COMPLETE + print("Initializing S3 mounts...") + global aliases + aliases = mount.mount() + MOUNTING_COMPLETE = True + print("S3 system ready.") + + +if __name__ == "__main__": + # disable logging and banner view of flask itself + cli.show_server_banner = lambda *args: None + logging.getLogger("werkzeug").disabled = True + + signal.signal(signal.SIGINT, cleanup_and_exit) + signal.signal(signal.SIGTERM, cleanup_and_exit) + + main_logic() + + print("Starting Datastore API on port 5000...") + app.run(host="0.0.0.0", port=5000) diff --git a/sidecars/datastore/entrypoint.sh b/sidecars/datastore/entrypoint.sh new file mode 100644 index 00000000..9ebca4e2 --- /dev/null +++ b/sidecars/datastore/entrypoint.sh @@ -0,0 +1,5 @@ +#!/bin/sh +set -e + +echo "Starting Python S3 Data Manager..." +python3 /app/app.py diff --git a/sidecars/datastore/mount.py b/sidecars/datastore/mount.py new file mode 100644 index 00000000..dcfd3611 --- /dev/null +++ b/sidecars/datastore/mount.py @@ -0,0 +1,109 @@ +import os +import re +import sys + + +def createFolders(aliases, base_dir): + try: + for i in range(0, len(aliases)): + target_path = os.path.join(base_dir, aliases[i][0]) + target_path = os.path.join(target_path, aliases[i][1]) + if not os.path.exists(target_path): + os.makedirs(target_path) + print(f"Created folder: {target_path}") + else: + print(f"Folder exists: {target_path}") + return True + except Exception as e: + print(f"A error accrued during the creation of the s3-buckets: {e}") + return False + + +def getCredentials(aliases): + aliases_credentials = [] + try: + for alias in aliases: + temp_list = [alias] + temp_list.append(os.getenv(f"S3_TO_LOCAL_{alias}_BUCKET")) + temp_list.append(os.getenv(f"S3_TO_LOCAL_{alias}_HOST")) + 
temp_list.append(os.getenv(f"S3_TO_LOCAL_{alias}_REGION")) + temp_list.append(os.getenv(f"S3_TO_LOCAL_{alias}_ACCESS_KEY")) + temp_list.append(os.getenv(f"S3_TO_LOCAL_{alias}_SECRET_KEY")) + aliases_credentials.append(temp_list) + return aliases_credentials + except Exception as e: + print(f"A error accrued during load of S3 credentials: {e}") + return False + + +def createS3Mounts(aliases, base_dir): + for i in range(len(aliases)): + try: + with open(".passwd-s3fs", mode="w", encoding="utf-8") as f: + f.write(f"{aliases[i][4]}:{aliases[i][5]}") + os.system("chmod 600 .passwd-s3fs") + target_path = os.path.join(base_dir, aliases[i][0]) + target_path = os.path.join(target_path, aliases[i][1]) + cmd = f"s3fs {aliases[i][1]} {target_path} -o passwd_file=.passwd-s3fs -o url={aliases[i][2]} -o endpoint={aliases[i][3]} -o use_path_request_style -o allow_other" + rc = os.system(cmd) + if rc != 0: + print( + f"s3fuse returned a non‑zero status ({rc >> 8}) for alias '{aliases[i][0]}'", + file=sys.stderr, + ) + else: + print(f"Successfully mounted '{aliases[i][0]}'") + with open("/etc/active_mounts.txt", "a") as f: + f.write(f"{target_path}\n") + os.system("rm .passwd-s3fs") + except Exception as e: + print(f"A error accrued during the mounting of alias {aliases[i][0]}: {e}") + return False + + +def mount(): + base_dir = "/s3-data" + + if not os.path.exists(base_dir): + os.makedirs(base_dir) + print(f"Base directory created: {base_dir}") + + pattern = re.compile(r"^S3_TO_LOCAL_(.*)_ALIAS$") + + print("Scanning environment variables for S3 aliases...") + + aliases = [] + for key, value in os.environ.items(): + match = pattern.match(key) + if match: + aliases.append(value) + + aliases = getCredentials(aliases) + createFolders(aliases, base_dir) + createS3Mounts(aliases, base_dir) + + if len(aliases) == 0: + print("No environment variables matching 'S3_TO_LOCAL_*_ALIAS' found.") + else: + print(f"Processed {len(aliases)} S3 alias(es).") + + return aliases + + +def 
umount(aliases): + """Unmounts all paths recorded in /etc/active_mounts.txt.""" + + print("Starting unmount sequence...") + try: + for i in range(len(aliases)): + target_path = os.path.join("/s3-data", aliases[i][0]) + target_path = os.path.join(target_path, aliases[i][1]) + print(f"Unmounting: {target_path}") + rc = os.system(f"fusermount -u {target_path}") + if rc != 0: + print(f"Warning: Failed to unmount {target_path}. It might be busy.") + else: + print(f"Successfully unmounted {target_path}") + + except Exception as e: + print(f"An error occurred during unmounting: {e}") diff --git a/sidecars/datastore/requirements.txt b/sidecars/datastore/requirements.txt new file mode 100644 index 00000000..7e106024 --- /dev/null +++ b/sidecars/datastore/requirements.txt @@ -0,0 +1 @@ +flask diff --git a/tests/test_k8s.py b/tests/test_k8s.py index af94fd2f..298a2af6 100644 --- a/tests/test_k8s.py +++ b/tests/test_k8s.py @@ -43,3 +43,45 @@ def test_interactive_deployment_k8s_builder_user_secrets(monkeypatch): assert any(v["name"] == "k8s-secret" for v in pod.volumes) assert any(vm["name"] == "k8s-secret" for vm in pod.containers[0].volume_mounts) assert any(e["name"] == "third_env" for e in pod.containers[0].env) + + +def test_s3_integration(monkeypatch): + """Checks datastore sidecar creation and env variables allocation between pods.""" + user_id = uuid4() + user_secrets = UserSecrets( + user_id=str(user_id), + k8s_secret_name="k8s-secret", + secrets=[ + Secret(name="main_env", type_="env", value="3"), + Secret(name="S3_TO_LOCAL_TEST_ALIAS", type_="env", value="TEST"), + Secret(name="S3_TO_LOCAL_TEST_ACCESS_KEY", type_="env", value="-"), + Secret(name="S3_TO_LOCAL_TEST_BUCKET", type_="env", value="-"), + Secret(name="S3_TO_LOCAL_TEST_HOST", type_="env", value="-"), + Secret(name="S3_TO_LOCAL_TEST_SECRET_KEY", type_="env", value="-"), + Secret(name="S3_TO_LOCAL_TEST_REGION", type_="env", value="-"), + ], + ) + monkeypatch.setattr( + UserSecretsStore, + "fetch", + lambda _: 
user_secrets, + ) + + monkeypatch.setattr("reana_workflow_controller.k8s.REANA_DATASTORE_ENABLED", True) + + builder = InteractiveDeploymentK8sBuilder( + "name", "workflow_id", "owner_id", "workspace", "docker_image", "port", "path" + ) + + builder.add_user_secrets() + builder.add_run_with_root_permissions() + builder.setup_s3_sidecar() + builder.setup_s3_storage() + objs = builder.get_deployment_objects() + + deployment = objs["deployment"] + pod = deployment.spec.template.spec + assert len(pod.containers) == 2 + assert any(e["name"] == "main_env" for e in pod.containers[0].env) + assert any(e["name"] == "S3_TO_LOCAL_TEST_ALIAS" for e in pod.containers[1].env) + assert len(pod.containers[1].env) == 6