Skip to content

Commit 5e3a417

Browse files
committed
Improve type annotation of workflow management methods
1 parent cfdb2ee commit 5e3a417

7 files changed

Lines changed: 115 additions & 87 deletions

File tree

lib/galaxy/managers/workflows.py

Lines changed: 95 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@
113113
from galaxy.workflow.modules import (
114114
module_factory,
115115
PickValueModule,
116+
SubWorkflowModule,
116117
ToolModule,
117118
WorkflowModule,
118119
WorkflowModuleInjector,
@@ -615,6 +616,30 @@ def _step_pja_dict(step):
615616
return pja_dict
616617

617618

619+
class WorkflowStateResolutionOptions(BaseModel):
620+
# fill in default tool state when updating, may change tool_state
621+
fill_defaults: bool = False
622+
# If True, assume all tool state coming from generated form instead of potentially simpler json stored in DB/exported
623+
from_tool_form: bool = False
624+
# If False, allow running with less exact tool versions
625+
exact_tools: bool = True
626+
627+
628+
class WorkflowUpdateOptions(WorkflowStateResolutionOptions):
629+
# Only used internally, don't set. If using the API assume updating the workflows
630+
# representation with name or annotation for instance, updates the corresponding
631+
# stored workflow
632+
update_stored_workflow_attributes: bool = True
633+
allow_missing_tools: bool = False
634+
dry_run: bool = False
635+
636+
637+
class RawWorkflowDescription:
638+
def __init__(self, as_dict, workflow_path: str | None = None):
639+
self.as_dict = as_dict
640+
self.workflow_path = workflow_path
641+
642+
618643
class WorkflowContentsManager(UsesAnnotations):
619644

620645
def __init__(self, app: MinimalManagerApp, trs_proxy: TrsProxy):
@@ -740,8 +765,12 @@ def build_workflow_from_raw_description(
740765
return CreatedWorkflow(stored_workflow=stored, workflow=workflow, missing_tools=missing_tool_tups)
741766

742767
def update_workflow_from_raw_description(
743-
self, trans, stored_workflow, raw_workflow_description, workflow_update_options
744-
):
768+
self,
769+
trans,
770+
stored_workflow: StoredWorkflow,
771+
raw_workflow_description: RawWorkflowDescription,
772+
workflow_update_options: WorkflowUpdateOptions,
773+
) -> tuple[Workflow, list[str]]:
745774
raw_workflow_description = self.ensure_raw_description(raw_workflow_description)
746775

747776
# Put parameters in workflow mode
@@ -951,13 +980,13 @@ def _workflow_from_raw_description(
951980
def workflow_to_dict(
952981
self,
953982
trans,
954-
stored,
955-
style="export",
956-
version=None,
957-
history=None,
958-
instance_id=None,
959-
preserve_external_subworkflow_links=False,
960-
):
983+
stored: StoredWorkflow,
984+
style: str = "export",
985+
version: int | None = None,
986+
history: History | None = None,
987+
instance_id: int | None = None,
988+
preserve_external_subworkflow_links: bool = False,
989+
) -> dict[str, Any]:
961990
"""Export the workflow contents to a dictionary ready for JSON-ification and to be
962991
sent out via API for instance. There are three styles of export allowed 'export', 'instance', and
963992
'editor'. The Galaxy team will do its best to preserve the backward compatibility of the
@@ -974,11 +1003,7 @@ def workflow_to_dict(
9741003
def to_format_2(wf_dict, **kwds):
9751004
return from_galaxy_native(wf_dict, None, **kwds)
9761005

977-
if version == "":
978-
version = None
979-
if version is not None:
980-
version = int(version)
981-
elif instance_id:
1006+
if version is None and instance_id:
9821007
# If the instance_id is provided, we need to extract the workflow instance via the version.
9831008
for i, workflow in enumerate(reversed(stored.workflows)):
9841009
if workflow.id == instance_id:
@@ -1000,24 +1025,24 @@ def to_format_2(wf_dict, **kwds):
10001025
elif style == "format2":
10011026
wf_dict = self._workflow_to_dict_export(
10021027
trans,
1003-
stored,
10041028
workflow=workflow,
1029+
stored=stored,
10051030
preserve_external_subworkflow_links=preserve_external_subworkflow_links,
10061031
)
10071032
wf_dict = to_format_2(wf_dict)
10081033
elif style == "format2_wrapped_yaml":
10091034
wf_dict = self._workflow_to_dict_export(
10101035
trans,
1011-
stored,
10121036
workflow=workflow,
1037+
stored=stored,
10131038
preserve_external_subworkflow_links=preserve_external_subworkflow_links,
10141039
)
10151040
wf_dict = to_format_2(wf_dict, json_wrapper=True)
10161041
elif style == "ga":
10171042
wf_dict = self._workflow_to_dict_export(
10181043
trans,
1019-
stored,
10201044
workflow=workflow,
1045+
stored=stored,
10211046
preserve_external_subworkflow_links=preserve_external_subworkflow_links,
10221047
)
10231048
else:
@@ -1031,44 +1056,57 @@ def to_format_2(wf_dict, **kwds):
10311056
wf_dict["version"] = len(stored.workflows) - 1
10321057
return wf_dict
10331058

1034-
def _sync_stored_workflow(self, trans, stored_workflow):
1059+
def _sync_stored_workflow(self, trans, stored_workflow: StoredWorkflow) -> None:
10351060
if trans.user_is_admin:
10361061
workflow_path = stored_workflow.from_path
1037-
self.store_workflow_to_path(workflow_path, stored_workflow, stored_workflow.latest_workflow, trans=trans)
1062+
assert workflow_path is not None
1063+
self.store_workflow_to_path(
1064+
workflow_path, stored_workflow=stored_workflow, workflow=stored_workflow.latest_workflow, trans=trans
1065+
)
10381066

1039-
def store_workflow_artifacts(self, directory, filename_base, workflow, **kwd):
1067+
def store_workflow_artifacts(
1068+
self, directory: str, filename_base: str, workflow: Workflow, history: History, user: User | None
1069+
) -> None:
10401070
modern_workflow_path = os.path.join(directory, f"{filename_base}.gxwf.yml")
10411071
legacy_workflow_path = os.path.join(directory, f"{filename_base}.ga")
10421072
abstract_cwl_workflow_path = os.path.join(directory, f"{filename_base}.abstract.cwl")
10431073
for path in [legacy_workflow_path, modern_workflow_path, abstract_cwl_workflow_path]:
1044-
self.app.workflow_contents_manager.store_workflow_to_path(path, workflow.stored_workflow, workflow, **kwd)
1074+
self.store_workflow_to_path(
1075+
path, stored_workflow=workflow.stored_workflow, workflow=workflow, history=history, user=user
1076+
)
10451077
try:
10461078
cytoscape_path = os.path.join(directory, f"{filename_base}.html")
10471079
to_cytoscape(modern_workflow_path, cytoscape_path)
10481080
except Exception:
10491081
# completely optional and currently broken so ignore...
10501082
pass
10511083

1052-
def store_workflow_to_path(self, workflow_path, stored_workflow, workflow, **kwd):
1053-
trans = kwd.get("trans")
1084+
def store_workflow_to_path(
1085+
self,
1086+
workflow_path: str,
1087+
stored_workflow: StoredWorkflow,
1088+
workflow: Workflow,
1089+
trans=None,
1090+
history: History | None = None,
1091+
user: User | None = None,
1092+
) -> None:
10541093
if trans is None:
1055-
trans = WorkRequestContext(app=self.app, user=kwd.get("user"), history=kwd.get("history"))
1094+
trans = WorkRequestContext(app=self.app, user=user, history=history)
10561095

1057-
workflow = stored_workflow.latest_workflow
10581096
with open(workflow_path, "w") as f:
1097+
wf_dict = self._workflow_to_dict_export(trans, workflow=workflow, stored=stored_workflow)
10591098
if workflow_path.endswith(".ga"):
1060-
wf_dict = self._workflow_to_dict_export(trans, stored_workflow, workflow=workflow)
10611099
json.dump(wf_dict, f, indent=4)
10621100
elif workflow_path.endswith(".abstract.cwl"):
1063-
wf_dict = self._workflow_to_dict_export(trans, stored_workflow, workflow=workflow)
10641101
abstract_dict = from_dict(wf_dict)
10651102
ordered_dump(abstract_dict, f)
10661103
else:
1067-
wf_dict = self._workflow_to_dict_export(trans, stored_workflow, workflow=workflow)
10681104
wf_dict = from_galaxy_native(wf_dict, None, json_wrapper=True)
10691105
f.write(wf_dict["yaml_content"])
10701106

1071-
def _workflow_to_dict_run(self, trans: ProvidesUserContext, stored, workflow, history=None):
1107+
def _workflow_to_dict_run(
1108+
self, trans: ProvidesUserContext, stored: StoredWorkflow, workflow: Workflow, history: History | None = None
1109+
) -> dict[str, Any]:
10721110
"""
10731111
Builds workflow dictionary used by run workflow form
10741112
"""
@@ -1079,7 +1117,7 @@ def _workflow_to_dict_run(self, trans: ProvidesUserContext, stored, workflow, hi
10791117
trans.workflow_building_mode = workflow_building_modes.USE_HISTORY
10801118
module_injector = WorkflowModuleInjector(trans)
10811119
has_upgrade_messages = False
1082-
step_version_changes = []
1120+
step_version_changes: list[str] = []
10831121
missing_tools = []
10841122
errors = {}
10851123
module_injector.inject_all(workflow, exact_tools=False, ignore_tool_missing_exception=True)
@@ -1094,20 +1132,20 @@ def _workflow_to_dict_run(self, trans: ProvidesUserContext, stored, workflow, hi
10941132
if step.upgrade_messages:
10951133
has_upgrade_messages = True
10961134
if step.type in ("tool", "subworkflow", None):
1135+
assert isinstance(step.module, (ToolModule, SubWorkflowModule))
10971136
if step.module.version_changes:
10981137
step_version_changes.extend(step.module.version_changes)
10991138
step_errors = step.module.get_errors()
11001139
if step_errors:
11011140
errors[step.id] = step_errors
11021141
if missing_tools:
1103-
workflow.annotation = self.get_item_annotation_str(trans.sa_session, trans.user, workflow)
11041142
raise exceptions.MessageException(f"Following tools missing: {', '.join(missing_tools)}")
1105-
workflow.annotation = self.get_item_annotation_str(trans.sa_session, trans.user, workflow)
11061143
step_order_indices = {}
11071144
for step in workflow.steps:
11081145
step_order_indices[step.id] = step.order_index
11091146
step_models = []
11101147
for step in workflow.steps:
1148+
assert step.module is not None
11111149
step_model = None
11121150
if step.type == "tool":
11131151
incoming: dict[str, Any] = {}
@@ -1118,6 +1156,7 @@ def _workflow_to_dict_run(self, trans: ProvidesUserContext, stored, workflow, hi
11181156
raise exceptions.MessageException(
11191157
f"Following tool missing or inaccessible: '{step.tool_id}/{step.tool_uuid}'"
11201158
)
1159+
assert step.state is not None
11211160
params_to_incoming(incoming, tool.inputs, step.state.inputs, trans.app)
11221161
step_model = tool.to_json(
11231162
trans, incoming, workflow_building_mode=workflow_building_modes.USE_HISTORY, history=history
@@ -1141,15 +1180,17 @@ def _workflow_to_dict_run(self, trans: ProvidesUserContext, stored, workflow, hi
11411180
step_model["step_name"] = step.module.get_name()
11421181
step_model["step_version"] = step.module.get_version()
11431182
step_model["step_index"] = step.order_index
1144-
step_model["output_connections"] = [
1145-
{
1146-
"input_step_index": step_order_indices.get(oc.input_step_id),
1147-
"output_step_index": step_order_indices.get(oc.output_step_id),
1148-
"input_name": oc.input_name,
1149-
"output_name": oc.output_name,
1150-
}
1151-
for oc in step.output_connections
1152-
]
1183+
step_model["output_connections"] = []
1184+
for oc in step.output_connections:
1185+
assert oc.output_step_id is not None
1186+
step_model["output_connections"].append(
1187+
{
1188+
"input_step_index": step_order_indices.get(oc.input_step_id),
1189+
"output_step_index": step_order_indices.get(oc.output_step_id),
1190+
"input_name": oc.input_name,
1191+
"output_name": oc.output_name,
1192+
}
1193+
)
11531194
if step.annotations:
11541195
step_model["annotation"] = step.annotations[0].annotation
11551196
if step.upgrade_messages:
@@ -1537,12 +1578,12 @@ def _resolve_collection_type(self, steps):
15371578
def _workflow_to_dict_export(
15381579
self,
15391580
trans,
1540-
stored=None,
1541-
workflow=None,
1542-
internal=False,
1543-
allow_upgrade=False,
1544-
preserve_external_subworkflow_links=False,
1545-
):
1581+
workflow: Workflow,
1582+
stored: StoredWorkflow | None = None,
1583+
internal: bool = False,
1584+
allow_upgrade: bool = False,
1585+
preserve_external_subworkflow_links: bool = False,
1586+
) -> dict[str, Any]:
15461587
"""Export the workflow contents to a dictionary ready for JSON-ification and export.
15471588
15481589
If internal, use content_ids instead subworkflow definitions.
@@ -1664,6 +1705,7 @@ def _workflow_to_dict_export(
16641705
del step_dict["tool_version"]
16651706
del step_dict["tool_state"]
16661707
subworkflow = step.subworkflow
1708+
assert subworkflow is not None
16671709
if preserve_external_subworkflow_links and subworkflow.source_metadata:
16681710
del step_dict["content_id"]
16691711
source_metadata = subworkflow.source_metadata
@@ -1677,8 +1719,8 @@ def _workflow_to_dict_export(
16771719
del step_dict["content_id"]
16781720
subworkflow_as_dict = self._workflow_to_dict_export(
16791721
trans,
1680-
stored=None,
16811722
workflow=subworkflow,
1723+
stored=None,
16821724
preserve_external_subworkflow_links=preserve_external_subworkflow_links,
16831725
)
16841726
step_dict["subworkflow"] = subworkflow_as_dict
@@ -1782,7 +1824,9 @@ def callback(input, prefixed_name, **kwargs):
17821824
steps[step.order_index] = step_dict
17831825
return data
17841826

1785-
def _workflow_to_dict_instance(self, trans, stored, workflow, legacy=True):
1827+
def _workflow_to_dict_instance(
1828+
self, trans, stored: StoredWorkflow, workflow: Workflow, legacy: bool = True
1829+
) -> dict[str, Any]:
17861830
encode = self.app.security.encode_id
17871831
sa_session = self.app.model.context
17881832
item = stored.to_dict(view="element")
@@ -1844,6 +1888,7 @@ def _workflow_to_dict_instance(self, trans, stored, workflow, legacy=True):
18441888
del step_dict["tool_id"]
18451889
del step_dict["tool_version"]
18461890
del step_dict["tool_inputs"]
1891+
assert step.subworkflow is not None
18471892
step_dict["workflow_id"] = step.subworkflow.id
18481893

18491894
for conn in step.input_connections:
@@ -2244,7 +2289,7 @@ def do_refactor(
22442289
workflow = stored_workflow.get_internal_version(refactor_request.version)
22452290

22462291
as_dict = self._workflow_to_dict_export(
2247-
trans, stored_workflow, workflow=workflow, internal=True, allow_upgrade=True
2292+
trans, workflow=workflow, stored=stored_workflow, internal=True, allow_upgrade=True
22482293
)
22492294
raw_workflow_description = self.normalize_workflow_format(trans, as_dict)
22502295
workflow_update_options = WorkflowUpdateOptions(
@@ -2406,24 +2451,6 @@ class RefactorResponse(BaseModel):
24062451
dry_run: bool
24072452

24082453

2409-
class WorkflowStateResolutionOptions(BaseModel):
2410-
# fill in default tool state when updating, may change tool_state
2411-
fill_defaults: bool = False
2412-
# If True, assume all tool state coming from generated form instead of potentially simpler json stored in DB/exported
2413-
from_tool_form: bool = False
2414-
# If False, allow running with less exact tool versions
2415-
exact_tools: bool = True
2416-
2417-
2418-
class WorkflowUpdateOptions(WorkflowStateResolutionOptions):
2419-
# Only used internally, don't set. If using the API assume updating the workflows
2420-
# representation with name or annotation for instance, updates the corresponding
2421-
# stored workflow
2422-
update_stored_workflow_attributes: bool = True
2423-
allow_missing_tools: bool = False
2424-
dry_run: bool = False
2425-
2426-
24272454
# Workflow update options but with some different defaults - we allow creating
24282455
# workflows with missing tools by default but not updating.
24292456
class WorkflowCreateOptions(WorkflowStateResolutionOptions):
@@ -2476,12 +2503,6 @@ def __init__(self, workflow, errors):
24762503
self.errors = errors
24772504

24782505

2479-
class RawWorkflowDescription:
2480-
def __init__(self, as_dict, workflow_path=None):
2481-
self.as_dict = as_dict
2482-
self.workflow_path = workflow_path
2483-
2484-
24852506
def _get_stored_workflow(session, workflow_uuid, workflow_id, by_stored_id):
24862507
stmt = select(StoredWorkflow)
24872508
if workflow_uuid is not None:

0 commit comments

Comments
 (0)