Skip to content

Commit 1ace006

Browse files
committed
feat(dask): return service logs and fix service status handling (reanahub#648)
- Mark the Dask service as deleted instead of removing it when the Dask cluster terminates. - Return the service logs in the response
1 parent 12a9742 commit 1ace006

3 files changed

Lines changed: 25 additions & 20 deletions

File tree

reana_workflow_controller/consumer.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
calculate_hash_of_dir,
3030
calculate_job_input_hash,
3131
build_unique_component_name,
32-
get_dask_component_name,
3332
)
3433
from reana_db.database import Session
3534
from reana_db.models import Job, JobCache, Workflow, RunStatus, Service

reana_workflow_controller/dask.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,9 @@
1212
from flask import current_app
1313

1414
from kubernetes import client
15-
from kubernetes.client.exceptions import ApiException
1615

1716
from reana_db.database import Session
18-
from reana_db.models import Service
17+
from reana_db.models import Service, ServiceStatus
1918
from reana_db.utils import _get_workflow_with_uuid_or_name
2019
from reana_commons.config import (
2120
K8S_CERN_EOS_AVAILABLE,
@@ -677,16 +676,15 @@ def delete_dask_cluster(workflow_id, user_id) -> None:
677676
)
678677
if dask_service:
679678
workflow = _get_workflow_with_uuid_or_name(str(workflow_id), user_id)
680-
workflow.services.remove(dask_service)
681-
Session.delete(dask_service)
679+
dask_service.status = ServiceStatus.deleted
682680
Session.object_session(workflow).commit()
683681
logging.info(
684-
f"Dask service model for workflow {workflow_id} deleted successfully from database."
682+
f"Dask service model for workflow {workflow_id} status updated to 'deleted' in database."
685683
)
686684

687685
except Exception as e:
688686
errors.append(
689-
f"Error deleting Dask Service model from database of the workflow: {workflow_id}: {e}"
687+
f"Error updating Dask Service model status in database for workflow {workflow_id}: {e}"
690688
)
691689

692690
# Raise collected errors if any

reana_workflow_controller/rest/workflows_status.py

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# -*- coding: utf-8 -*-
22
#
33
# This file is part of REANA.
4-
# Copyright (C) 2020, 2021, 2022, 2024 CERN.
4+
# Copyright (C) 2020, 2021, 2022, 2024, 2025 CERN.
55
#
66
# REANA is free software; you can redistribute it and/or modify it
77
# under the terms of the MIT License; see LICENSE file for more details.
@@ -27,6 +27,7 @@
2727
)
2828
from reana_workflow_controller.rest.utils import (
2929
build_workflow_logs,
30+
build_service_logs,
3031
delete_workflow,
3132
get_workflow_name,
3233
get_workflow_progress,
@@ -172,20 +173,27 @@ def get_workflow_logs(workflow_id_or_name, paginate=None, **kwargs): # noqa
172173
workflow_logs = {
173174
"workflow_logs": logs or workflow.logs,
174175
"job_logs": build_workflow_logs(workflow, paginate=paginate),
176+
"service_logs": {},
175177
"engine_specific": workflow.engine_specific,
176178
}
177-
return (
178-
jsonify(
179-
{
180-
"workflow_id": workflow.id_,
181-
"workflow_name": get_workflow_name(workflow),
182-
"logs": json.dumps(workflow_logs),
183-
"user": user_uuid,
184-
"live_logs_enabled": REANA_OPENSEARCH_ENABLED,
185-
}
186-
),
187-
200,
188-
)
179+
180+
# Get all services logs
181+
workflow_logs["service_logs"] = {
182+
s.name: [log.log for log in s.logs] for s in workflow.services
183+
}
184+
185+
return (
186+
jsonify(
187+
{
188+
"workflow_id": workflow.id_,
189+
"workflow_name": get_workflow_name(workflow),
190+
"logs": json.dumps(workflow_logs),
191+
"user": user_uuid,
192+
"live_logs_enabled": REANA_OPENSEARCH_ENABLED,
193+
}
194+
),
195+
200,
196+
)
189197

190198
except ValueError:
191199
return (

0 commit comments

Comments
 (0)