|
1 | 1 | import inspect |
2 | | -import json |
3 | 2 | import os |
4 | 3 | import re |
5 | | -from collections import OrderedDict |
6 | 4 | from typing import ( |
7 | 5 | Any, |
8 | 6 | Dict, |
|
13 | 11 | Tuple, |
14 | 12 | TYPE_CHECKING, |
15 | 13 | ) |
16 | | -from urllib.parse import urlparse |
17 | 14 |
|
18 | 15 | import requests |
19 | 16 | import yaml |
|
24 | 21 | from galaxy.tool_util.parser.yaml import __to_test_assert_list |
25 | 22 | from galaxy.tool_util.verify import asserts |
26 | 23 | from gxformat2.lint import ( |
27 | | - lint_format2, |
28 | | - lint_ga, |
| 24 | + lint_best_practices_format2, |
| 25 | + lint_best_practices_ga, |
| 26 | + lint_format2_path, |
| 27 | + lint_ga_path, |
| 28 | + lint_pydantic_validation, |
29 | 29 | ) |
30 | 30 | from gxformat2.yaml import ordered_load |
31 | 31 |
|
@@ -59,6 +59,17 @@ class WorkflowLintContext(LintContext): |
59 | 59 | # from click arguments. |
60 | 60 | training_topic = None |
61 | 61 |
|
| 62 | + def warn(self, message, linter=None, *args, **kwargs): |
| 63 | + # gxformat2 lint rules pass Linter subclasses; galaxy LintMessage expects a name string. |
| 64 | + if isinstance(linter, type): |
| 65 | + linter = linter.__name__ |
| 66 | + super().warn(message, linter, *args, **kwargs) |
| 67 | + |
| 68 | + def error(self, message, linter=None, *args, **kwargs): |
| 69 | + if isinstance(linter, type): |
| 70 | + linter = linter.__name__ |
| 71 | + super().error(message, linter, *args, **kwargs) |
| 72 | + |
62 | 73 |
|
63 | 74 | def build_wf_lint_args(ctx: "PlanemoCliContext", **kwds) -> Dict[str, Any]: |
64 | 75 | lint_args = build_lint_args(ctx, **kwds) |
@@ -158,15 +169,8 @@ def _lint_workflow_artifacts_on_path(lint_context: WorkflowLintContext, path: st |
158 | 169 | ) |
159 | 170 |
|
160 | 171 | elif looks_like_a_workflow(potential_workflow_artifact_path): |
161 | | - |
162 | | - def structure(path, lint_context): |
163 | | - with open(path) as f: |
164 | | - workflow_dict = ordered_load(f) |
165 | | - workflow_class = workflow_dict.get("class") |
166 | | - lint_func = lint_format2 if workflow_class == "GalaxyWorkflow" else lint_ga |
167 | | - lint_func(lint_context, workflow_dict, path=path) |
168 | | - |
169 | | - lint_context.lint("lint_structure", structure, potential_workflow_artifact_path) |
| 172 | + lint_context.lint("lint_structure", _lint_structure, potential_workflow_artifact_path) |
| 173 | + lint_context.lint("lint_schema_validation", _lint_schema_validation, potential_workflow_artifact_path) |
170 | 174 | if lint_args["iwc_grade"]: |
171 | 175 | lint_context.lint("lint_release", _lint_release, potential_workflow_artifact_path) |
172 | 176 | lint_context.lint("lint_best_practices", _lint_best_practices, potential_workflow_artifact_path) |
@@ -200,91 +204,31 @@ def _lint_tsts(path: str, lint_context: WorkflowLintContext) -> None: |
200 | 204 | lint_context.valid(f"Tests appear structurally correct for {runnable.path}") |
201 | 205 |
|
202 | 206 |
|
203 | | -def _lint_best_practices(path: str, lint_context: WorkflowLintContext) -> None: # noqa: C901 |
204 | | - """ |
205 | | - This function duplicates the checks made by Galaxy's best practices panel: |
206 | | - https://github.com/galaxyproject/galaxy/blob/5396bb15fe8cfcf2e89d46c1d061c49b60e2f0b1/client/src/components/Workflow/Editor/Lint.vue |
207 | | - """ |
208 | | - |
209 | | - def check_json_for_untyped_params(j): |
210 | | - values = j.values() if isinstance(j, dict) else j |
211 | | - for value in values: |
212 | | - if type(value) in [list, dict, OrderedDict]: |
213 | | - if check_json_for_untyped_params(value): |
214 | | - return True |
215 | | - elif isinstance(value, str): |
216 | | - if re.match(r"\$\{.+?\}", value): |
217 | | - return True |
218 | | - return False |
| 207 | +def _lint_structure(path: str, lint_context: WorkflowLintContext) -> None: |
| 208 | + workflow_dict = _load_workflow_dict(path) |
| 209 | + if workflow_dict.get("class") == "GalaxyWorkflow": |
| 210 | + lint_format2_path(lint_context, path) |
| 211 | + else: |
| 212 | + lint_ga_path(lint_context, path) |
219 | 213 |
|
220 | | - with open(path) as f: |
221 | | - workflow_dict = ordered_load(f) |
222 | 214 |
|
223 | | - steps = workflow_dict.get("steps", {}) |
| 215 | +def _lint_schema_validation(path: str, lint_context: WorkflowLintContext) -> None: |
| 216 | + workflow_dict = _load_workflow_dict(path) |
| 217 | + is_format2 = workflow_dict.get("class") == "GalaxyWorkflow" |
| 218 | + lint_pydantic_validation(lint_context, workflow_dict, format2=is_format2) |
224 | 219 |
|
225 | | - # annotation |
226 | | - if not workflow_dict.get("annotation"): |
227 | | - lint_context.warn("Workflow is not annotated.") |
228 | 220 |
|
229 | | - # creator |
230 | | - creators = workflow_dict.get("creator", []) |
231 | | - if not len(creators) > 0: |
232 | | - lint_context.warn("Workflow does not specify a creator.") |
| 221 | +def _lint_best_practices(path: str, lint_context: WorkflowLintContext) -> None: |
| 222 | + workflow_dict = _load_workflow_dict(path) |
| 223 | + if workflow_dict.get("class") == "GalaxyWorkflow": |
| 224 | + lint_best_practices_format2(lint_context, workflow_dict) |
233 | 225 | else: |
234 | | - if not isinstance(creators, list): |
235 | | - # Don't know if this can happen, if we implement schema validation on the Galaxy side |
236 | | - # this won't be needed. |
237 | | - creators = [creators] |
238 | | - for creator in creators: |
239 | | - if creator.get("class", "").lower() == "person" and "identifier" in creator: |
240 | | - identifier = creator["identifier"] |
241 | | - parsed_url = urlparse(identifier) |
242 | | - if not parsed_url.scheme: |
243 | | - lint_context.warn( |
244 | | - f'Creator identifier "{identifier}" should be a fully qualified URI, for example "https://orcid.org/0000-0002-1825-0097".' |
245 | | - ) |
246 | | - |
247 | | - # license |
248 | | - if not workflow_dict.get("license"): |
249 | | - lint_context.warn("Workflow does not specify a license.") |
250 | | - |
251 | | - # checks on individual steps |
252 | | - for step in steps.values(): |
253 | | - # disconnected inputs |
254 | | - if step.get("type") not in ["data_collection_input", "parameter_input"]: |
255 | | - for input in step.get("inputs", []): |
256 | | - if input.get("name") not in step.get("input_connections"): # TODO: check optional |
257 | | - lint_context.warn( |
258 | | - f"Input {input.get('name')} of workflow step {step.get('annotation') or step.get('id')} is disconnected." |
259 | | - ) |
260 | | - |
261 | | - # missing metadata |
262 | | - if not step.get("annotation"): |
263 | | - lint_context.warn(f"Workflow step with ID {step.get('id')} has no annotation.") |
264 | | - if not step.get("label"): |
265 | | - lint_context.warn(f"Workflow step with ID {step.get('id')} has no label.") |
266 | | - |
267 | | - # untyped parameters |
268 | | - if workflow_dict.get("class") == "GalaxyWorkflow": |
269 | | - tool_state = step.get("tool_state", {}) |
270 | | - pjas = step.get("out", {}) |
271 | | - else: |
272 | | - raw_tool_state = step.get("tool_state", {}) |
273 | | - if isinstance(raw_tool_state, str): |
274 | | - tool_state = json.loads(raw_tool_state) |
275 | | - else: |
276 | | - tool_state = raw_tool_state |
277 | | - pjas = step.get("post_job_actions", {}) |
| 226 | + lint_best_practices_ga(lint_context, workflow_dict) |
278 | 227 |
|
279 | | - if check_json_for_untyped_params(tool_state): |
280 | | - lint_context.warn(f"Workflow step with ID {step.get('id')} specifies an untyped parameter as an input.") |
281 | 228 |
|
282 | | - if check_json_for_untyped_params(pjas): |
283 | | - lint_context.warn( |
284 | | - f"Workflow step with ID {step.get('id')} specifies an untyped parameter in the post-job actions." |
285 | | - ) |
286 | | - |
287 | | - # unlabeled outputs are checked by gxformat2, no need to check here |
| 229 | +def _load_workflow_dict(path: str) -> Dict[str, Any]: |
| 230 | + with open(path) as f: |
| 231 | + return ordered_load(f) |
288 | 232 |
|
289 | 233 |
|
290 | 234 | def _lint_case(path: str, test_case: TestCase, lint_context: WorkflowLintContext) -> bool: |
|
0 commit comments