Skip to content

Commit 850065f

Browse files
models: replace workflow.run_number with major and minor run numbers
Change the workflow table to split the run_number into two integers: one is the major run number (the number before the dot), and the other one is the minor run number (after the dot), which increases when restarting a workflow, thus removing the limit of 9 restarts. Closes #186.
1 parent 6271ecd commit 850065f

8 files changed

Lines changed: 221 additions & 53 deletions

File tree

AUTHORS.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ The list of contributors in alphabetical order:
77
- `Camila Diaz <https://orcid.org/0000-0001-5543-797X>`_
88
- `Diego Rodriguez <https://orcid.org/0000-0003-0649-2002>`_
99
- `Dinos Kousidis <https://orcid.org/0000-0002-4914-4289>`_
10+
- `Giuseppe Steduto <https://orcid.org/0009-0002-1258-8553>`_
1011
- `Jan Okraska <https://orcid.org/0000-0002-1416-3244>`_
1112
- `Leticia Wanderley <https://orcid.org/0000-0003-4649-6630>`_
1213
- `Marco Donadoni <https://orcid.org/0000-0003-2922-5505>`_

CHANGES.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
Changes
22
=======
33

4+
Version 0.9.3 (UNRELEASED)
5+
--------------------------
6+
- Changes the ``Workflow`` table to replace the ``run_number`` column with two new columns ``run_number_major`` and ``run_number_minor``, in order to allow for more than 9 restarts.
7+
48
Version 0.9.2 (2023-09-26)
59
--------------------------
610

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
"""Separate run number into major and minor run numbers.
2+
3+
Revision ID: b85c3e601de4
4+
Revises: 377cfbfccf75
5+
Create Date: 2023-10-02 12:08:18.292490
6+
7+
"""
8+
from alembic import op
9+
import sqlalchemy as sa
10+
11+
12+
# revision identifiers, used by Alembic.
13+
revision = "b85c3e601de4"
14+
down_revision = "377cfbfccf75"
15+
branch_labels = None
16+
depends_on = None
17+
18+
19+
def upgrade():
    """Upgrade to b85c3e601de4 revision.

    Replace the float ``run_number`` column of ``__reana.workflow`` with the
    two integer columns ``run_number_major`` and ``run_number_minor``, migrate
    the existing values, and recreate the ``_user_workflow_run_uc`` unique
    constraint on the new columns.
    """
    connection = op.get_bind()

    # Add new columns (run_number_major, run_number_minor).
    # NOTE: `default=0` is a client-side (Python) default, so it is not part of
    # the generated DDL; existing rows are populated by the data migration below.
    op.add_column(
        "workflow", sa.Column("run_number_major", sa.Integer()), schema="__reana"
    )
    op.add_column(
        "workflow",
        sa.Column("run_number_minor", sa.Integer(), default=0),
        schema="__reana",
    )

    # Data migration (split run_number into run_number_major and run_number_minor).
    # run_number is a float, so the fractional part times ten is not exact
    # (e.g. 1.7 gives 6.999...): round explicitly instead of relying on the
    # implicit float-to-integer cast done on assignment.
    connection.execute(
        sa.text(
            "UPDATE __reana.workflow"
            " SET run_number_major = CAST(FLOOR(run_number) AS INTEGER), "
            "     run_number_minor ="
            " CAST(ROUND((run_number - FLOOR(run_number)) * 10) AS INTEGER)"
        ),
    )

    # Delete old constraint
    op.drop_constraint("_user_workflow_run_uc", "workflow", schema="__reana")

    # Drop old run_number column
    op.drop_column("workflow", "run_number", schema="__reana")

    # Add new constraint (uniqueness is not enforced on run_number anymore,
    # but on the major and minor run numbers)
    op.create_unique_constraint(
        "_user_workflow_run_uc",
        "workflow",
        ["name", "owner_id", "run_number_major", "run_number_minor"],
        schema="__reana",
    )

    # Update the run numbers of workflows that have been restarted 10 or more
    # times (thus erroneously having the following run_number_major), in case
    # some of them were created before the limit on 9 restarts was introduced.
    # Workflows are grouped by workspace_path: for each workspace containing a
    # restarted workflow with minor run number 0, every workflow in it is moved
    # back to the major run number preceding the lowest such restart, and each
    # major number it is moved back by adds 10 to its minor run number.
    connection.execute(
        sa.text(
            """
            UPDATE __reana.workflow AS w
            SET
                run_number_major = to_be_updated.new_major_run_number,
                run_number_minor = (w.run_number_minor + (w.run_number_major - to_be_updated.new_major_run_number) * 10)
            FROM (
                SELECT MIN(w1.run_number_major) - 1 AS new_major_run_number, w1.workspace_path
                FROM __reana.workflow w1
                WHERE w1.restart AND w1.run_number_minor = 0
                GROUP BY w1.workspace_path
            ) AS to_be_updated
            WHERE w.workspace_path = to_be_updated.workspace_path
            """
        ),
    )
74+
75+
76+
def downgrade():
    """Downgrade to 377cfbfccf75 revision.

    Merge ``run_number_major`` and ``run_number_minor`` back into the single
    float ``run_number`` column and restore the old ``_user_workflow_run_uc``
    unique constraint.

    :raises ValueError: if some workflow has a minor run number of 10 or more,
        which cannot be represented in the old ``run_number`` column.
    """
    connection = op.get_bind()

    # Check that there are no workflows restarted 10 or more times.
    # This is because of the way the info about restarts is stored in
    # the run_number column (see https://github.com/reanahub/reana-db/issues/186).
    # The check is done before any schema change, so that a failed downgrade
    # leaves the database untouched.
    restarted_ten_times = connection.execute(
        sa.text(
            "SELECT COUNT(*) FROM __reana.workflow WHERE run_number_minor >= 10"
        )
    ).scalar()
    if restarted_ten_times != 0:
        raise ValueError(
            "Cannot migrate database because some workflows have been restarted 10 or more times,"
            " and the previous database revision only supports up to 9 restarts."
            " If you want to downgrade, you should manually delete them."
        )

    # Revert constraint
    op.drop_constraint("_user_workflow_run_uc", "workflow", schema="__reana")

    # Add old run_number column back
    op.add_column("workflow", sa.Column("run_number", sa.Float()), schema="__reana")

    # Data migration (combine run_number_major and run_number_minor back to run_number)
    connection.execute(
        sa.text(
            "UPDATE __reana.workflow SET run_number=run_number_major+(run_number_minor * 1.0 /10)"
        )
    )

    # Drop new columns
    op.drop_column("workflow", "run_number_major", schema="__reana")
    op.drop_column("workflow", "run_number_minor", schema="__reana")

    # Restore old constraint
    op.create_unique_constraint(
        "_user_workflow_run_uc",
        "workflow",
        ["name", "owner_id", "run_number"],
        schema="__reana",
    )

reana_db/config.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,3 @@
8585
os.getenv("REANA_PERIODIC_RESOURCE_QUOTA_UPDATE_POLICY", "false")
8686
)
8787
"""Whether to run the periodic (cronjob) resource quota updater."""
88-
89-
LIMIT_RESTARTS = 9
90-
"""Maximum number of times a workflow can be restarted."""

reana_db/models.py

Lines changed: 54 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import uuid
1717
from datetime import datetime
1818
from functools import reduce
19-
from typing import Dict, List
19+
from typing import Dict, List, Tuple
2020

2121
from reana_commons.config import (
2222
MQ_MAX_PRIORITY,
@@ -54,12 +54,12 @@
5454
DB_SECRET_KEY,
5555
DEFAULT_QUOTA_LIMITS,
5656
DEFAULT_QUOTA_RESOURCES,
57-
LIMIT_RESTARTS,
5857
WORKFLOW_TERMINATION_QUOTA_UPDATE_POLICY,
5958
)
6059
from reana_db.utils import (
6160
build_workspace_path,
6261
store_workflow_disk_quota,
62+
split_run_number,
6363
update_users_cpu_quota,
6464
update_users_disk_quota,
6565
update_workflow_cpu_quota,
@@ -459,7 +459,8 @@ class Workflow(Base, Timestamp, QuotaBase):
459459
run_started_at = Column(DateTime)
460460
run_finished_at = Column(DateTime)
461461
run_stopped_at = Column(DateTime)
462-
_run_number = Column("run_number", Float)
462+
run_number_major = Column(Integer)
463+
run_number_minor = Column(Integer, default=0)
463464
job_progress = Column(JSONType, default=dict)
464465
workspace_path = Column(String)
465466
restart = Column(Boolean, default=False)
@@ -487,7 +488,11 @@ class Workflow(Base, Timestamp, QuotaBase):
487488

488489
__table_args__ = (
489490
UniqueConstraint(
490-
"name", "owner_id", "run_number", name="_user_workflow_run_uc"
491+
"name",
492+
"owner_id",
493+
"run_number_major",
494+
"run_number_minor",
495+
name="_user_workflow_run_uc",
491496
),
492497
{"schema": "__reana"},
493498
)
@@ -527,7 +532,9 @@ def __init__(
527532
self.git_repo = git_repo
528533
self.git_provider = git_provider
529534
self.restart = restart
530-
self._run_number = self.assign_run_number(run_number)
535+
self.run_number_major, self.run_number_minor = self.get_new_run_number(
536+
run_number
537+
)
531538
self.workspace_path = workspace_path or build_workspace_path(
532539
self.owner_id, self.id_
533540
)
@@ -537,54 +544,66 @@ def __repr__(self):
537544
"""Workflow string representation."""
538545
return "<Workflow %r>" % self.id_
539546

540-
@hybrid_property
541-
def run_number(self):
542-
"""Property of run_number."""
543-
if self._run_number.is_integer():
544-
return int(self._run_number)
545-
return self._run_number
546-
547-
@run_number.expression
548-
def run_number(cls):
549-
return func.abs(cls._run_number)
550-
551-
def assign_run_number(self, run_number):
552-
"""Assing run number."""
547+
@property
def run_number(self) -> str:
    """Return the run number as a string.

    The result is ``"<major>"`` when the minor run number is zero, and
    ``"<major>.<minor>"`` otherwise.
    """
    major = str(self.run_number_major)
    if self.run_number_minor == 0:
        return major
    return f"{major}.{self.run_number_minor}"
553+
554+
def _get_last_workflow(self, run_number):
555+
"""Fetch the last workflow restart given a certain run number."""
553556
from .database import Session
554557

555558
if run_number:
559+
run_number_major, run_number_minor = split_run_number(run_number)
556560
last_workflow = (
557561
Session.query(Workflow)
558562
.filter(
559563
Workflow.name == self.name,
560-
Workflow.run_number >= int(run_number),
561-
Workflow.run_number < int(run_number) + 1,
564+
Workflow.run_number_major == run_number_major,
562565
Workflow.owner_id == self.owner_id,
563566
)
564-
.order_by(Workflow.run_number.desc())
567+
.order_by(
568+
Workflow.run_number_major.desc(), Workflow.run_number_minor.desc()
569+
)
565570
.first()
566571
)
567572
else:
568573
last_workflow = (
569574
Session.query(Workflow)
570575
.filter_by(name=self.name, restart=False, owner_id=self.owner_id)
571-
.order_by(Workflow.run_number.desc())
576+
.order_by(
577+
Workflow.run_number_major.desc(), Workflow.run_number_minor.desc()
578+
)
572579
.first()
573580
)
574-
if last_workflow and self.restart:
575-
# FIXME: remove the limit of nine restarts when we fix the way in which
576-
# we save `run_number` in the DB
577-
num_restarts = round(last_workflow.run_number * 10) % 10
578-
if num_restarts == LIMIT_RESTARTS:
581+
return last_workflow
582+
583+
def get_new_run_number(self, run_number) -> Tuple[int, int]:
584+
"""Return the major and minor run numbers for a new workflow.
585+
586+
Return a tuple where the first element is the major run number and the
587+
second element is the minor run number.
588+
"""
589+
last_workflow = self._get_last_workflow(run_number)
590+
591+
if not last_workflow:
592+
if self.restart:
579593
raise REANAValidationError(
580-
f"Cannot restart a workflow more than {LIMIT_RESTARTS} times"
594+
"Cannot restart a workflow that has not been run before."
581595
)
582-
return round(last_workflow.run_number + 0.1, 1)
596+
return 1, 0 # First workflow run
597+
583598
else:
584-
if not last_workflow:
585-
return 1
599+
if not self.restart:
600+
run_number_major = last_workflow.run_number_major + 1
601+
run_number_minor = 0
586602
else:
587-
return last_workflow.run_number + 1
603+
run_number_major = last_workflow.run_number_major
604+
run_number_minor = last_workflow.run_number_minor + 1
605+
606+
return run_number_major, run_number_minor
588607

589608
def get_input_parameters(self):
590609
"""Return workflow parameters."""
@@ -604,7 +623,7 @@ def get_owner_access_token(self):
604623

605624
def get_full_workflow_name(self):
606625
"""Return full workflow name including run number."""
607-
return "{}.{}".format(self.name, str(self.run_number))
626+
return "{}.{}".format(self.name, self.run_number)
608627

609628
def get_workspace_disk_usage(self, summarize=False, search=None):
610629
"""Retrieve disk usage information of a workspace."""
@@ -643,15 +662,13 @@ def get_all_restarts(self):
643662
"""Get all the restarts of this workflow, including the original workflow.
644663
645664
Returns all the restarts of this workflow, that is all the workflows that have
646-
the same name and the same run number (up to the dot). This includes the
665+
the same name and the same major run number. This includes the
647666
original workflow, as well as all the following restarts.
648667
"""
649-
run_number = int(self.run_number)
650668
restarts = Workflow.query.filter(
651669
Workflow.name == self.name,
652670
Workflow.owner_id == self.owner_id,
653-
Workflow.run_number >= run_number,
654-
Workflow.run_number < run_number + 1,
671+
Workflow.run_number_major == self.run_number_major,
655672
)
656673
return restarts
657674

reana_db/utils.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,14 @@ def build_workspace_path(user_id, workflow_id=None, workspace_root_path=None):
5252
return workspace_path
5353

5454

55+
def split_run_number(run_number):
    """Split a run number into its major and minor parts.

    The run number is converted to a string and split at the first dot; both
    parts are parsed as integers. When there is no dot, the minor part is 0.

    :param run_number: run number, e.g. ``"3"``, ``"3.12"`` or ``3``.
    :returns: tuple ``(major, minor)`` of integers.
    :raises ValueError: if one of the parts is not a valid integer.
    """
    major, dot, minor = str(run_number).partition(".")
    if not dot:
        return int(major), 0
    return int(major), int(minor)
61+
62+
5563
def _get_workflow_with_uuid_or_name(uuid_or_name, user_uuid):
5664
"""Get Workflow from database with uuid or name.
5765
@@ -128,21 +136,20 @@ def _get_workflow_with_uuid_or_name(uuid_or_name, user_uuid):
128136
return _get_workflow_by_name(workflow_name, user_uuid)
129137

130138
# `run_number` was specified.
131-
# Check `run_number` is valid.
132139
try:
133-
run_number = float(run_number)
140+
run_number_major, run_number_minor = split_run_number(run_number)
134141
except ValueError:
135-
# `uuid_or_name` was split, so it is a dot-separated string
136-
# but it didn't contain a valid `run_number`.
137-
# Assume that this dot-separated string is the name of
142+
# The specified `run_number` is not valid.
143+
# Assume that this string is the name of
138144
# the workflow and search with it.
139145
return _get_workflow_by_name(uuid_or_name, user_uuid)
140146

141147
# `run_number` is valid.
142-
# Search by `run_number` since it is a primary key.
148+
# Search by `run_number_major` and `run_number_minor`, since together with the name and owner they uniquely identify a workflow.
143149
workflow = Workflow.query.filter(
144150
Workflow.name == workflow_name,
145-
Workflow.run_number == run_number,
151+
Workflow.run_number_major == run_number_major,
152+
Workflow.run_number_minor == run_number_minor,
146153
Workflow.owner_id == user_uuid,
147154
).one_or_none()
148155
if not workflow:
@@ -169,7 +176,7 @@ def _get_workflow_by_name(workflow_name, user_uuid):
169176
Workflow.query.filter(
170177
Workflow.name == workflow_name, Workflow.owner_id == user_uuid
171178
)
172-
.order_by(Workflow.run_number.desc())
179+
.order_by(Workflow.run_number_major.desc(), Workflow.run_number_minor.desc())
173180
.first()
174181
)
175182
if not workflow:

0 commit comments

Comments
 (0)