diff --git a/Makefile b/Makefile index 019fb9c11..76abd5add 100644 --- a/Makefile +++ b/Makefile @@ -73,6 +73,9 @@ lint: ## check style using tox and flake8 for Python 2 and Python 3 test: ## run tests with the default Python (faster than tox) $(IN_VENV) pytest $(TESTS) +format: ## format Python code with black + $(IN_VENV) black planemo tests + quick-test: ## run quickest tests with the default Python $(IN_VENV) PLANEMO_SKIP_SLOW_TESTS=1 PLANEMO_SKIP_GALAXY_TESTS=1 pytest $(TESTS) diff --git a/planemo/commands/cmd_run.py b/planemo/commands/cmd_run.py index edd7c9ad2..2b3cf3630 100644 --- a/planemo/commands/cmd_run.py +++ b/planemo/commands/cmd_run.py @@ -30,7 +30,6 @@ @options.run_download_outputs_option() @options.engine_options() @options.test_options() -@options.no_early_termination_option() @command_function def cli(ctx, runnable_identifier, job_path, **kwds): """Planemo command for running tools and jobs. diff --git a/planemo/commands/cmd_workflow_test_on_invocation.py b/planemo/commands/cmd_workflow_test_on_invocation.py index 8083e2a6a..fea77cd6f 100644 --- a/planemo/commands/cmd_workflow_test_on_invocation.py +++ b/planemo/commands/cmd_workflow_test_on_invocation.py @@ -15,9 +15,7 @@ @click.command("workflow_test_on_invocation") @options.optional_tools_arg(multiple=False, allow_uris=False, metavar="TEST.YML") -@options.required_invocation_id_arg() -@options.galaxy_url_option(required=True) -@options.galaxy_user_key_option(required=True) +@options.invocation_target_options() @options.test_index_option() @options.test_output_options() @command_function @@ -34,7 +32,10 @@ def cli(ctx, path, invocation_id, test_index, **kwds): len(test_cases) >= test_index ), f"Selected test case {test_index}, but only found {len(test_cases)} test case(s)." test_case = test_cases[test_index - 1] - run_response = invocation_to_run_response(ctx, user_gi=config.user_gi, runnable=runnable, invocation=invocation) + # Hardcode fail_fast, no need to expose the option to the user IMO. 
+ run_response = invocation_to_run_response( + ctx, user_gi=config.user_gi, runnable=runnable, invocation=invocation, fail_fast=True + ) structured_data = test_case.structured_test_data(run_response) test_data = { "version": "0.1", diff --git a/planemo/commands/cmd_workflow_track.py b/planemo/commands/cmd_workflow_track.py new file mode 100644 index 000000000..20c2c09e9 --- /dev/null +++ b/planemo/commands/cmd_workflow_track.py @@ -0,0 +1,28 @@ +"""Module describing the planemo ``workflow_track`` command.""" + +import click + +from planemo import options +from planemo.cli import command_function +from planemo.engine.factory import engine_context +from planemo.galaxy.activity import wait_for_invocation_and_jobs + + +@click.command("workflow_track") +@options.invocation_target_options() +@options.fail_fast_option() +@command_function +def cli(ctx, invocation_id, **kwds): + """Follow the progress of a workflow invocation.""" + with engine_context(ctx, engine="external_galaxy", **kwds) as engine, engine.ensure_runnables_served([]) as config: + user_gi = config.user_gi + wait_for_invocation_and_jobs( + ctx, + invocation_id, + history_id=None, + user_gi=user_gi, + polling_backoff=5, + fail_fast=kwds.get("fail_fast", False), + ) + + ctx.exit(0) diff --git a/planemo/galaxy/activity.py b/planemo/galaxy/activity.py index c75ee107a..ac41b083b 100644 --- a/planemo/galaxy/activity.py +++ b/planemo/galaxy/activity.py @@ -3,7 +3,6 @@ import os import sys import tempfile -import time import traceback from datetime import datetime from typing import ( @@ -36,12 +35,16 @@ unicodify, ) from pathvalidate import sanitize_filename -from requests.exceptions import ( - HTTPError, - RequestException, -) +from requests.exceptions import HTTPError -from planemo.galaxy.api import summarize_history +from planemo.galaxy.api import ( + retry_on_timeouts, + summarize_history, +) +from planemo.galaxy.invocations.api import BioblendInvocationApi +from planemo.galaxy.invocations.polling import 
PollingTrackerImpl +from planemo.galaxy.invocations.polling import wait_for_invocation_and_jobs as polling_wait_for_invocation_and_jobs +from planemo.galaxy.invocations.progress import WorkflowProgressDisplay from planemo.io import wait_on from planemo.runnable import ( ErrorRunResponse, @@ -66,12 +69,12 @@ def execute( - ctx: "PlanemoCliContext", config: "BaseGalaxyConfig", runnable: Runnable, job_path: str, **kwds + ctx: "PlanemoCliContext", config: "BaseGalaxyConfig", runnable: Runnable, job_path: str, fail_fast=False, **kwds ) -> RunResponse: """Execute a Galaxy activity.""" try: start_datetime = datetime.now() - return _execute(ctx, config, runnable, job_path, **kwds) + return _execute(ctx, config, runnable, job_path, fail_fast=fail_fast, **kwds) except Exception as e: end_datetime = datetime.now() ctx.log("Failed to execute Galaxy activity, throwing ErrorRunResponse") @@ -148,7 +151,7 @@ def _log(self, message): def _execute( # noqa C901 - ctx: "PlanemoCliContext", config: "BaseGalaxyConfig", runnable: Runnable, job_path: str, **kwds + ctx: "PlanemoCliContext", config: "BaseGalaxyConfig", runnable: Runnable, job_path: str, fail_fast=False, **kwds ) -> "GalaxyBaseRunResponse": user_gi = config.user_gi admin_gi = config.gi @@ -215,7 +218,7 @@ def _execute( # noqa C901 no_wait=kwds.get("no_wait", False), start_datetime=start_datetime, log=log_contents_str(config), - early_termination=not kwds.get("no_early_termination", False), + fail_fast=fail_fast, ) else: @@ -249,7 +252,7 @@ def invocation_to_run_response( no_wait=False, start_datetime=None, log=None, - early_termination=True, + fail_fast=False, ): start_datetime = start_datetime or datetime.now() invocation_id = invocation["id"] @@ -258,19 +261,23 @@ def invocation_to_run_response( ctx.vlog("Waiting for invocation [%s]" % invocation_id) - final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs( - ctx, - invocation_id=invocation_id, - history_id=history_id, - user_gi=user_gi, - 
no_wait=no_wait, - polling_backoff=polling_backoff, - early_termination=early_termination, - ) - if final_invocation_state not in ("ok", "skipped", "scheduled"): - msg = f"Failed to run workflow [{workflow_id}], at least one job is in [{final_invocation_state}] state." - ctx.vlog(msg) - summarize_history(ctx, user_gi, history_id) + if not no_wait: + final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs( + ctx, + invocation_id=invocation_id, + history_id=history_id, + user_gi=user_gi, + polling_backoff=polling_backoff, + fail_fast=fail_fast, + ) + if final_invocation_state not in ("ok", "skipped", "scheduled"): + msg = f"Failed to run workflow [{workflow_id}], at least one job is in [{final_invocation_state}] state." + ctx.vlog(msg) + summarize_history(ctx, user_gi, history_id) + else: + final_invocation_state = invocation["state"] + job_state = None + error_message = None return GalaxyWorkflowRunResponse( ctx, @@ -779,86 +786,28 @@ def _history_id(gi, **kwds) -> str: def wait_for_invocation_and_jobs( ctx, invocation_id: str, - history_id: str, + history_id: Optional[str], user_gi: GalaxyInstance, - no_wait: bool, polling_backoff: int, - early_termination: bool, + fail_fast: bool = False, ): - ctx.vlog("Waiting for invocation [%s]" % invocation_id) - final_invocation_state = "new" - - # TODO: hook in invocation["messages"] - error_message = "" - job_state = "ok" - try: - final_invocation_state = _wait_for_invocation(ctx, user_gi, invocation_id, polling_backoff) - assert final_invocation_state == "scheduled" - except Exception as e: - ctx.vlog(f"Problem waiting on invocation: {str(e)}") - summarize_history(ctx, user_gi, history_id) - error_message = f"Final state of invocation {invocation_id} is [{final_invocation_state}]" - - ctx.vlog(f"Final state of invocation {invocation_id} is [{final_invocation_state}]") - - if not no_wait: - job_state = _wait_for_invocation_jobs(ctx, user_gi, invocation_id, polling_backoff, early_termination) - if 
job_state not in ("ok", "skipped"): - msg = f"Failed to run workflow, at least one job is in [{job_state}] state." - error_message = msg if not error_message else f"{error_message}. {msg}" - else: - # wait for possible subworkflow invocations - invocation = user_gi.invocations.show_invocation(invocation_id) - for step in invocation["steps"]: - if step.get("subworkflow_invocation_id") is not None: - final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs( - ctx, - invocation_id=step["subworkflow_invocation_id"], - history_id=history_id, - user_gi=user_gi, - no_wait=no_wait, - polling_backoff=polling_backoff, - early_termination=early_termination, - ) - if final_invocation_state != "scheduled" or job_state not in ("ok", "skipped"): - return final_invocation_state, job_state, error_message - - ctx.vlog(f"The final state of all jobs and subworkflow invocations for invocation [{invocation_id}] is 'ok'") - return final_invocation_state, job_state, error_message - - -def _wait_for_invocation(ctx, gi, invocation_id, polling_backoff=0): - def state_func(): - return _retry_on_timeouts(ctx, gi, lambda gi: gi.invocations.show_invocation(invocation_id)) - - return _wait_on_state(state_func, polling_backoff) - - -def _retry_on_timeouts(ctx, gi, f): - gi.timeout = 60 - try_count = 5 - try: - for try_num in range(try_count): - start_time = time.time() - try: - return f(gi) - except RequestException: - end_time = time.time() - if end_time - start_time > 45 and (try_num + 1) < try_count: - ctx.vlog("Galaxy seems to have timed out, retrying to fetch status.") - continue - else: - raise - finally: - gi.timeout = None - - -def has_jobs_in_states(ctx, gi, history_id, states): - params = {"history_id": history_id} - jobs_url = gi.url + "/jobs" - jobs = gi.jobs._get(url=jobs_url, params=params) - target_jobs = [j for j in jobs if j["state"] in states] - return len(target_jobs) > 0 + polling_tracker = PollingTrackerImpl(polling_backoff) + invocation_api = 
BioblendInvocationApi(ctx, user_gi) + with WorkflowProgressDisplay(invocation_id, galaxy_url=user_gi.base_url) as workflow_progress_display: + final_invocation_state, job_state, error_message = polling_wait_for_invocation_and_jobs( + ctx, + invocation_id, + invocation_api, + polling_tracker, + workflow_progress_display, + fail_fast=fail_fast, + ) + if error_message: + if not history_id: + invocation = invocation_api.get_invocation(invocation_id) + history_id = invocation["history_id"] + summarize_history(ctx, user_gi, history_id) + return final_invocation_state, job_state, error_message def _wait_for_history(ctx, gi, history_id, polling_backoff=0): @@ -867,24 +816,11 @@ def _wait_for_history(ctx, gi, history_id, polling_backoff=0): # no need to wait for active jobs anymore I think. def state_func(): - return _retry_on_timeouts(ctx, gi, lambda gi: gi.histories.show_history(history_id)) + return retry_on_timeouts(ctx, gi, lambda gi: gi.histories.show_history(history_id)) return _wait_on_state(state_func, polling_backoff) -def _wait_for_invocation_jobs(ctx, gi, invocation_id, polling_backoff=0, early_termination=True): - # Wait for invocation jobs to finish. Less brittle than waiting for a history to finish, - # as you could have more than one invocation in a history, or an invocation without - # steps that produce history items. 
- - ctx.log(f"waiting for invocation {invocation_id}") - - def state_func(): - return _retry_on_timeouts(ctx, gi, lambda gi: gi.jobs.get_jobs(invocation_id=invocation_id)) - - return _wait_on_state(state_func, polling_backoff, early_termination=early_termination) - - def _wait_for_job(gi, job_id, timeout=None): def state_func(): return gi.jobs.show_job(job_id, full_details=True) @@ -892,7 +828,7 @@ def state_func(): return _wait_on_state(state_func, timeout=timeout) -def _wait_on_state(state_func, polling_backoff=0, timeout=None, early_termination=True): +def _wait_on_state(state_func, polling_backoff=0, timeout=None): def get_state(): response = state_func() if not isinstance(response, list): @@ -914,8 +850,6 @@ def get_state(): "cancelled", "failed", ] - if not early_termination and current_non_terminal_states: - return None for terminal_state in hierarchical_fail_states: if terminal_state in current_states: # If we got here something has failed and we can return (early) diff --git a/planemo/galaxy/api.py b/planemo/galaxy/api.py index 6f4906f33..cbcea69a8 100644 --- a/planemo/galaxy/api.py +++ b/planemo/galaxy/api.py @@ -1,9 +1,11 @@ """A high-level interface to local Galaxy instances using bioblend.""" +import time from io import StringIO from typing import Optional from bioblend.galaxy import GalaxyInstance +from requests.exceptions import RequestException DEFAULT_ADMIN_API_KEY = "test_key" @@ -136,6 +138,25 @@ def _dataset_provenance(gi, history_id, id): return provenance +def retry_on_timeouts(ctx, gi, f): + gi.timeout = 60 + try_count = 5 + try: + for try_num in range(try_count): + start_time = time.time() + try: + return f(gi) + except RequestException: + end_time = time.time() + if end_time - start_time > 45 and (try_num + 1) < try_count: + ctx.vlog("Galaxy seems to have timed out, retrying to fetch status.") + continue + else: + raise + finally: + gi.timeout = None + + __all__ = ( "DEFAULT_ADMIN_API_KEY", "gi", diff --git 
a/planemo/galaxy/invocations/__init__.py b/planemo/galaxy/invocations/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/planemo/galaxy/invocations/api.py b/planemo/galaxy/invocations/api.py new file mode 100644 index 000000000..1518c125e --- /dev/null +++ b/planemo/galaxy/invocations/api.py @@ -0,0 +1,62 @@ +"""API interaction for Galaxy's workflow invocation API. + +Gives a mockable surface for testing, type contract consumed by Planemo, +and builtin utilities around bioblend for working around transient request +issues that have been observed in practice. +""" + +from typing import ( + Dict, + List, + Optional, + Protocol, +) + +from typing_extensions import TypedDict + +from planemo.galaxy.api import retry_on_timeouts + + +class InvocationStep(TypedDict, total=False): + state: Optional[str] + subworkflow_invocation_id: Optional[str] + + +class Invocation(TypedDict, total=False): + id: str + state: str + steps: List[InvocationStep] + history_id: Optional[str] + + +class InvocationJobsSummary(TypedDict, total=False): + states: Dict[str, int] + + +class InvocationApi(Protocol): + + def get_invocation(self, invocation_id: str) -> Invocation: ... + + def get_invocation_summary(self, invocation_id: str) -> InvocationJobsSummary: ... 
+ + +class BioblendInvocationApi(InvocationApi): + + def __init__(self, ctx, user_gi): + self._ctx = ctx + self._user_gi = user_gi + + def get_invocation(self, invocation_id: str) -> Invocation: + return retry_on_timeouts(self._ctx, self._user_gi, lambda gi: gi.invocations.show_invocation(invocation_id)) + + def get_invocation_summary(self, invocation_id: str) -> InvocationJobsSummary: + return retry_on_timeouts( + self._ctx, self._user_gi, lambda gi: gi.invocations.get_invocation_summary(invocation_id) + ) + + +def invocation_state_terminal(state: str): + return state in ["scheduled", "cancelled", "failed"] + + +JOB_ERROR_STATES = ["error", "deleted", "failed", "stopped", "stop", "deleting"] diff --git a/planemo/galaxy/invocations/polling.py b/planemo/galaxy/invocations/polling.py new file mode 100644 index 000000000..bc6e1669d --- /dev/null +++ b/planemo/galaxy/invocations/polling.py @@ -0,0 +1,204 @@ +import time +from typing import ( + List, + Optional, + Protocol, +) + +from .api import ( + invocation_state_terminal, + InvocationApi, + InvocationJobsSummary, + JOB_ERROR_STATES, +) +from .progress import WorkflowProgressDisplay + + +class PollingTracker(Protocol): + + def sleep(self) -> None: ... + + +class PollingTrackerImpl(PollingTracker): + + def __init__(self, polling_backoff: int, timeout=None): + self.polling_backoff = polling_backoff + self.timeout = timeout + self.delta = 0.25 + self.total_wait_time = 0 + + def sleep(self): + if self.timeout is not None and self.total_wait_time > self.timeout: + message = "Timed out while polling Galaxy." 
+            raise Exception(message)
+        self.total_wait_time += self.delta
+        time.sleep(self.delta)
+        self.delta += self.polling_backoff
+
+
+def _summarize_invocation(invocation_api: InvocationApi, invocation_id: str):
+    invocation = invocation_api.get_invocation(invocation_id)
+    assert invocation
+    invocation_jobs = invocation_api.get_invocation_summary(invocation_id)
+    return invocation, invocation_jobs
+
+
+def _poll_main_workflow(
+    ctx,
+    invocation_id: str,
+    invocation_api: InvocationApi,
+    workflow_progress_display: WorkflowProgressDisplay,
+    fail_fast: bool,
+):
+    if workflow_progress_display.workflow_progress.terminal:
+        return None, None, None
+
+    try:
+        invocation, invocation_jobs = _summarize_invocation(invocation_api, invocation_id)
+        workflow_progress_display.handle_invocation(invocation, invocation_jobs)
+        return invocation, invocation_jobs, None
+    except Exception as e:
+        ctx.vlog(f"Problem polling invocation: {e}")
+        return None, None, e
+
+
+def _poll_subworkflow(
+    ctx,
+    invocation_id: str,
+    invocation_api: InvocationApi,
+    workflow_progress_display: WorkflowProgressDisplay,
+    fail_fast: bool,
+):
+    if workflow_progress_display.all_subworkflows_complete():
+        return None, None, None
+
+    try:
+        subworkflow_id = workflow_progress_display.an_incomplete_subworkflow_id()
+        invocation, invocation_jobs = _summarize_invocation(invocation_api, subworkflow_id)
+        workflow_progress_display.handle_subworkflow_invocation(invocation, invocation_jobs)
+        return invocation, invocation_jobs, None
+    except Exception as e:
+        return None, None, e
+
+
+def _check_for_errors(
+    ctx,
+    invocation_id: str,
+    exception: Optional[Exception],
+    invocation,
+    invocation_jobs,
+    fail_fast: bool,
+):
+    error_message = workflow_in_error_message(
+        ctx, invocation_id, exception, invocation, invocation_jobs, fail_fast=fail_fast
+    )
+    if error_message:
+        final_state = "new" if not invocation else invocation["state"]
+        job_state = summary_job_state(invocation_jobs)
+        return final_state, job_state, error_message
+    return None
+ + +def _is_polling_complete(workflow_progress_display: WorkflowProgressDisplay) -> bool: + return ( + workflow_progress_display.workflow_progress.terminal and workflow_progress_display.all_subworkflows_complete() + ) + + +def wait_for_invocation_and_jobs( + ctx, + invocation_id: str, + invocation_api: InvocationApi, + polling_tracker: PollingTracker, + workflow_progress_display: WorkflowProgressDisplay, + fail_fast: bool = False, +): + ctx.vlog("Waiting for invocation [%s]" % invocation_id) + + last_invocation = None + last_invocation_jobs = None + error_message: Optional[str] = None + + while not _is_polling_complete(workflow_progress_display): + # Poll main workflow + main_invocation, main_jobs, main_exception = _poll_main_workflow( + ctx, invocation_id, invocation_api, workflow_progress_display, fail_fast + ) + + if main_invocation: + last_invocation = main_invocation + last_invocation_jobs = main_jobs + + error_result = _check_for_errors(ctx, invocation_id, main_exception, main_invocation, main_jobs, fail_fast) + if error_result: + return error_result + + # Poll subworkflow + sub_invocation, sub_jobs, sub_exception = _poll_subworkflow( + ctx, invocation_id, invocation_api, workflow_progress_display, fail_fast + ) + + error_result = _check_for_errors(ctx, invocation_id, sub_exception, sub_invocation, sub_jobs, fail_fast) + if error_result: + return error_result + + if not _is_polling_complete(workflow_progress_display): + polling_tracker.sleep() + + ctx.vlog(f"The final state of all jobs and subworkflow invocations for invocation [{invocation_id}] is 'ok'") + job_state = summary_job_state(last_invocation_jobs) + assert last_invocation + return last_invocation["state"], job_state, error_message + + +def workflow_in_error_message( + ctx, + invocation_id, + last_exception, + last_invocation, + last_invocation_jobs, + fail_fast=False, +) -> Optional[str]: + """Return an error message if workflow is in an error state.""" + + invocation_state = "new" if not 
last_invocation else last_invocation["state"] + job_state = summary_job_state(last_invocation_jobs) + + error_message = None + if last_exception: + ctx.vlog(f"Problem waiting on invocation: {str(last_exception)}") + error_message = f"Final state of invocation {invocation_id} is [{invocation_state}]" + + if invocation_state_terminal(invocation_state) and invocation_state != "scheduled": + msg = f"Failed to run workflow, invocation ended in [{invocation_state}] state." + ctx.vlog(msg) + error_message = msg if not error_message else f"{error_message}. {msg}" + + if fail_fast and job_state in JOB_ERROR_STATES: + msg = f"Failed to run workflow, at least one job is in [{job_state}] state." + ctx.vlog(msg) + error_message = msg if not error_message else f"{error_message}. {msg}" + + return error_message + + +# we're still mocking out the old history state by just picking out a random +# job state of interest. Seems like we should drop this. +def summary_job_state(job_states_summary: Optional[InvocationJobsSummary]): + states = (job_states_summary or {"states": {}}).get("states", {}).copy() + states.pop("ok", None) + states.pop("skipped", None) + if states: + return next(iter(states.keys())) + else: + return "ok" + + +def subworkflow_invocation_ids(invocation_api: InvocationApi, invocation_id: str) -> List[str]: + invocation = invocation_api.get_invocation(invocation_id) + subworkflow_invocation_ids = [] + for step in invocation["steps"]: + subworkflow_invocation_id = step.get("subworkflow_invocation_id") + if subworkflow_invocation_id: + subworkflow_invocation_ids.append(subworkflow_invocation_id) + return subworkflow_invocation_ids diff --git a/planemo/galaxy/invocations/progress.py b/planemo/galaxy/invocations/progress.py new file mode 100644 index 000000000..0769db4a4 --- /dev/null +++ b/planemo/galaxy/invocations/progress.py @@ -0,0 +1,356 @@ +import random +from io import StringIO +from typing import ( + Dict, + List, + Optional, + Set, +) + +from rich.console import 
Group +from rich.live import Live +from rich.panel import Panel +from rich.progress import ( + BarColumn, + Progress, + TaskID, + TaskProgressColumn, + TextColumn, +) +from typing_extensions import TypedDict + +from .api import ( + invocation_state_terminal, + JOB_ERROR_STATES, +) +from .progress_display import DisplayConfiguration + + +# Types for various invocation responses +class InvocationStep(TypedDict, total=False): + state: Optional[str] + subworkflow_invocation_id: Optional[str] + + +class Invocation(TypedDict, total=False): + id: str + state: str + steps: List[InvocationStep] + + +class InvocationJobsSummary(TypedDict, total=False): + states: Dict[str, int] + + +class WorkflowProgress(Progress): + + _jobs_task: TaskID + _steps_task: TaskID + _subworkflows_task: Optional[TaskID] = None + + def __init__(self, display: DisplayConfiguration): + self.invocation_state: str = "new" + self.step_count: Optional[int] = None + self.job_count: Optional[int] = 0 + self.jobs_completed: Optional[int] = None + self.step_states: Dict[str, int] = {} + self.num_ok: int = 0 + self.num_new: int = 0 + self.num_queued: int = 0 + self.num_running: int = 0 + self.num_errors: int = 0 + self.num_paused: int = 0 + + self.num_subworkflows: int = 0 + self.num_subworkflows_complete: int = 0 + self.display = display + bar_column = BarColumn( + style=self.display.style_bar_back, + finished_style=self.display.style_bar_finished, + complete_style=self.display.style_bar_complete, + ) + self.jobs_color: str = self.display.style_initializing + self.steps_color: str = self.display.style_initializing + self.subworkflows_color: str = self.display.style_initializing + super().__init__( + TextColumn("[progress.description]{task.description}"), + TextColumn(display.divider), + bar_column, + TextColumn(display.divider), + TaskProgressColumn(f"[{self.display.style_percent}]{{task.percentage:>3.0f}}%"), + TextColumn(display.divider), + TextColumn(text_format="{task.fields[status]}"), + ) + 
self.add_bars() + + @property + def invocation_scheduling_terminal(self): + return invocation_state_terminal(self.invocation_state) + + @property + def jobs_terminal(self): + return self.job_count is not None and self.job_count == self.jobs_terminal_count + + @property + def terminal(self): + return self.invocation_scheduling_terminal and self.jobs_terminal + + def handle_subworkflow_counts(self, num: int, num_complete: int): + previous_count = self.num_subworkflows + self.num_subworkflows = num + self.num_subworkflows_complete = num_complete + if previous_count < 2 and num >= 2: + self._subworkflows_task = self.add_task( + f"[{self.subworkflows_color}]{self.display.label_progress_subworkflows}", status="" + ) + + if num >= 2: + self.subworkflows_color = self.display.style_ok + subworkflows_status = f"{self.num_subworkflows_complete}/{self.num_subworkflows} terminal" + self.update( + self._subworkflows_task, + total=self.num_subworkflows, + completed=self.num_subworkflows_complete, + description=f"[{self.subworkflows_color}]{self.display.label_progress_subworkflows}", + status=subworkflows_status, + ) + + def handle_invocation(self, invocation: Invocation, job_state_summary: InvocationJobsSummary): + self.invocation_state = invocation.get("state") or "new" + self.step_count = len(invocation.get("steps") or []) or None + self.step_states = step_states(invocation) + + steps_completed = None + + steps_status = "" + if self.step_count is None: + steps_status = "Loading steps." 
+            self.steps_color = self.display.style_initializing
+        elif self.invocation_state == "cancelled":
+            steps_status = "Invocation cancelled"
+            self.steps_color = self.display.style_error
+        elif self.invocation_state == "failed":
+            steps_status = "Invocation failed"
+            self.steps_color = self.display.style_error
+        else:
+            num_scheduled = self.step_states.get("scheduled") or 0
+            if num_scheduled > 0:
+                self.steps_color = self.display.style_ok
+            else:
+                self.steps_color = self.display.style_initializing
+            steps_completed = num_scheduled
+            steps_status = f"{num_scheduled}/{self.step_count} scheduled"
+
+        jobs_status = ""
+        self.job_count = job_count(job_state_summary)
+        self.num_new = count_states(job_state_summary, ["new"])
+        self.num_queued = count_states(job_state_summary, ["queued", "waiting"])
+        self.num_running = count_states(job_state_summary, ["running"])
+        self.num_errors = error_count(job_state_summary)
+        self.num_ok = ok_count(job_state_summary)
+        self.jobs_completed = self.num_ok + self.num_errors
+        self.num_paused = count_states(job_state_summary, ["paused"])
+        self.jobs_terminal_count = self.jobs_completed + self.num_paused
+        jobs_total: Optional[int] = self.job_count
+        if self.num_errors > 0:
+            self.jobs_color = self.display.style_error
+        elif self.job_count > 0:
+            self.jobs_color = self.display.style_ok
+        else:
+            self.jobs_color = self.display.style_initializing
+            self.jobs_completed = None
+            jobs_total = None
+        if self.job_count > 0:
+            jobs_status = f"{self.jobs_completed}/{self.job_count} terminal"
+        self.update(
+            self._steps_task,
+            total=self.step_count,
+            completed=steps_completed,
+            description=f"[{self.steps_color}]{self.display.label_progress_steps}",
+            status=steps_status,
+        )
+        self.update(
+            self._jobs_task,
+            total=jobs_total,
+            completed=self.jobs_completed,
+            description=f"[{self.jobs_color}]{self.display.label_progress_jobs}",
+            status=jobs_status,
+        )
+
+    def _job_states_console_line(self):
+        output = StringIO()
+        if self.num_ok > 0:
+            output.write(f"{self.display.icon_state_ok} {self.num_ok} {self.display.divider} ")
+        if self.num_errors > 0:
+            output.write(f"{self.display.icon_state_errors} {self.num_errors} {self.display.divider} ")
+        if self.num_new > 0:
+            output.write(f"{self.display.icon_state_new} {self.num_new} {self.display.divider} ")
+        if self.num_queued > 0:
+            output.write(f"{self.display.icon_state_queued} {self.num_queued} {self.display.divider} ")
+        if self.num_running > 0:
+            output.write(f"{self.display.icon_state_running} {self.num_running} {self.display.divider} ")
+        if self.num_paused > 0:
+            output.write(f"{self.display.icon_state_paused} {self.num_paused} {self.display.divider} ")
+
+        result = output.getvalue().rstrip(f" {self.display.divider} ")
+        output.close()
+        # Is there an actual way to reset this? The undefined style seems to work but is a hack.
+        return f"[{self.jobs_color}]{self.display.label_job_states_prefix} [reset]{self.display.divider} {result}"
+
+    def add_bars(self):
+        self._steps_task = self.add_task(f"[{self.steps_color}]{self.display.label_progress_steps}", status="")
+        self._jobs_task = self.add_task(f"[{self.jobs_color}]{self.display.label_progress_jobs}", status="")
+
+
+# converted from Galaxy TypeScript (see util.ts next to WorkflowInvocationState.vue)
+def count_states(job_summary: Optional[InvocationJobsSummary], query_states: List[str]) -> int:
+    count = 0
+    states = job_summary.get("states") if job_summary else None
+    if states:
+        for state in query_states:
+            count += states.get(state, 0)
+    return count
+
+
+def job_count(job_summary: Optional[InvocationJobsSummary]) -> int:
+    states = job_summary.get("states") if job_summary else None
+    count = 0
+    if states:
+        for state_count in states.values():
+            if state_count:
+                count += state_count
+    return count
+
+
+def step_states(invocation: Invocation):
+    step_states = {}
+    steps = invocation.get("steps") or []
+    for step in steps:
+        if not step:
+            continue
+        step_state = step.get("state") or "unknown"
+        
if step_state not in step_states: + step_states[step_state] = 0 + step_states[step_state] += 1 + + return step_states + + +def ok_count(job_summary: InvocationJobsSummary) -> int: + return count_states(job_summary, ["ok", "skipped"]) + + +def error_count(job_summary: InvocationJobsSummary) -> int: + return count_states(job_summary, JOB_ERROR_STATES) + + +def running_count(job_summary: InvocationJobsSummary) -> int: + return count_states(job_summary, ["running"]) + + +class WorkflowProgressDisplay(Live): + + def __init__( + self, + invocation_id: str, + display_configuration: Optional[DisplayConfiguration] = None, + galaxy_url: Optional[str] = None, + ): + self.subworkflow_invocation_ids_seen: Set[str] = set() + self.subworkflow_invocation_ids_completed: Set[str] = set() + self.subworkflow_invocation_id: Optional[str] = None + self.invocation_id = invocation_id + display = display_configuration or DisplayConfiguration() + self.galaxy_url = galaxy_url + self.display = display + self.workflow_progress = WorkflowProgress(display) + self.subworkflow_progress = WorkflowProgress(display) + super().__init__(self._panel()) + + def _register_subworkflow_invocation_ids_from(self, invocation: Invocation): + subworkflow_invocation_ids: List[str] = [] + steps = invocation.get("steps") or [] + for step in steps: + subworkflow_invocation_id = step.get("subworkflow_invocation_id") + if subworkflow_invocation_id: + subworkflow_invocation_ids.append(subworkflow_invocation_id) + self._register_subworkflow_invocation_ids(subworkflow_invocation_ids) + + def _register_subworkflow_invocation_ids(self, ids: List[str]): + for invocation_id in ids: + self.subworkflow_invocation_ids_seen.add(invocation_id) + + def _complete_subworkflow(self, id: str): + self.subworkflow_invocation_ids_completed.add(id) + + def an_incomplete_subworkflow_id(self): + return random.choice(tuple(self.subworkflow_invocation_ids_seen - self.subworkflow_invocation_ids_completed)) + + def 
all_subworkflows_complete(self): + return len(self.subworkflow_invocation_ids_seen) == len(self.subworkflow_invocation_ids_completed) + + def get_invocation_ui_link(self): + if self.galaxy_url: + return f"{self.galaxy_url}/workflows/invocations/{self.invocation_id}" + else: + return None + + def _panel(self): + def job_states(workflow_progress): + if self.display.include_job_state_breakdown: + return workflow_progress._job_states_console_line() + else: + return None + + title = f"[{self.display.style_header}]{self.display.label_header_prefix}<[link={self.get_invocation_ui_link()}]{self.invocation_id}[/link]>" + subworkflow_title = None + if self.subworkflow_invocation_id: + subworkflow_title = f"[{self.display.style_subworkflow_header}]{self.display.label_subworkflow_header_prefix}<{self.subworkflow_invocation_id}>" + + if not self.subworkflow_invocation_id or not self.display.include_nested_subworkflows: + renderable = as_group( + self.workflow_progress, + job_states(self.workflow_progress), + ) + elif not self.display.subworkflows_as_panel: + renderable = as_group( + self.workflow_progress, + job_states(self.workflow_progress), + subworkflow_title, + self.subworkflow_progress, + job_states(self.subworkflow_progress), + ) + else: + renderable = as_group( + self.workflow_progress, + job_states(self.workflow_progress), + Panel( + as_group(self.subworkflow_progress, job_states(self.subworkflow_progress)), + title=subworkflow_title, + ), + ) + return Panel(renderable, title=title, expand=True) + + def _update_panel(self): + self.update(self._panel()) + + def handle_invocation(self, invocation: Invocation, job_state_summary: InvocationJobsSummary): + self.workflow_progress.handle_invocation(invocation, job_state_summary) + self._register_subworkflow_invocation_ids_from(invocation) + self._update_panel() + + def handle_subworkflow_invocation(self, invocation: Invocation, job_state_summary: InvocationJobsSummary): + self.subworkflow_invocation_id = invocation["id"] + 
self.subworkflow_progress.handle_invocation(invocation, job_state_summary) + self._register_subworkflow_invocation_ids_from(invocation) + if self.subworkflow_progress.terminal: + self._complete_subworkflow(invocation["id"]) + self.workflow_progress.handle_subworkflow_counts( + len(self.subworkflow_invocation_ids_seen), + len(self.subworkflow_invocation_ids_completed), + ) + self._update_panel() + + +def as_group(*renderables): + return Group(*(r for r in renderables if r)) diff --git a/planemo/galaxy/invocations/progress_display.py b/planemo/galaxy/invocations/progress_display.py new file mode 100644 index 000000000..8eefffbe6 --- /dev/null +++ b/planemo/galaxy/invocations/progress_display.py @@ -0,0 +1,73 @@ +from pydantic import BaseModel + +# from rich.style import StyleType - doesn't work with Pydantic, keeping to string styles for now +StyleType = str + +# uses a bit more space but has better visual separation between subworkflows and workflows +DISPLAY_INCLUDE_NESTED_SUBWORKFLOWS = True +DISPLAY_SUBWORKFLOWS_AS_PANEL = True +DISPLAY_INCLUDE_JOB_STATE_BREAKDOWN = True +DISPLAY_DIVIDER = "β—†" +# bar.* are Rich defaults for these values. 
+DISPLAY_STYLE_BAR_BACK: StyleType = "bar.back" +DISPLAY_STYLE_BAR_FINISHED: StyleType = "bar.finished" +DISPLAY_STYLE_BAR_COMPLETE: StyleType = "bar.complete" + +DISPLAY_STYLE_INITIALIZING: StyleType = "cyan" +DISPLAY_STYLE_OK: StyleType = "green" +DISPLAY_STYLE_RUNNING: StyleType = "green" +DISPLAY_STYLE_ERROR: StyleType = "red" + +# Rich default style - a magenta +DISPLAY_STYLE_PERCENT: StyleType = "progress.percentage" + +DISPLAY_STYLE_HEADER: StyleType = "bold" +DISPLAY_STYLE_SUBWORKFLOW_HEADER: StyleType = "bold" + +DISPLAY_LABEL_HEADER_PREFIX = "Invocation " +DISPLAY_LABEL_SUBWORKFLOW_HEADER_PREFIX = "Subworkflow Invocation " +DISPLAY_LABEL_PROGRESS_STEPS = "Steps" +DISPLAY_LABEL_PROGRESS_JOBS = "Jobs" +DISPLAY_LABEL_PROGRESS_SUBWORKFLOWS = "SubWFs" +DISPLAY_LABEL_JOB_STATES_PREFIX = "Job States" + +DISPLAY_ICON_STATE_OK = "🟒" +DISPLAY_ICON_STATE_ERRORS = "πŸ”΄" +DISPLAY_ICON_STATE_NEW = "πŸ†•" +DISPLAY_ICON_STATE_QUEUED = "⏳" +DISPLAY_ICON_STATE_RUNNING = "πŸ‘Ÿ" +DISPLAY_ICON_STATE_PAUSED = "⏸️" + + +class DisplayConfiguration(BaseModel): + include_nested_subworkflows: bool = DISPLAY_INCLUDE_NESTED_SUBWORKFLOWS + include_job_state_breakdown: bool = DISPLAY_INCLUDE_JOB_STATE_BREAKDOWN + subworkflows_as_panel: bool = DISPLAY_SUBWORKFLOWS_AS_PANEL + divider: str = DISPLAY_DIVIDER + style_bar_back: StyleType = DISPLAY_STYLE_BAR_BACK + style_bar_complete: StyleType = DISPLAY_STYLE_BAR_COMPLETE + style_bar_finished: StyleType = DISPLAY_STYLE_BAR_FINISHED + + style_percent: StyleType = DISPLAY_STYLE_PERCENT + + style_initializing: StyleType = DISPLAY_STYLE_INITIALIZING + style_ok: StyleType = DISPLAY_STYLE_OK + style_running: StyleType = DISPLAY_STYLE_RUNNING + style_error: StyleType = DISPLAY_STYLE_ERROR + + style_header: StyleType = DISPLAY_STYLE_HEADER + style_subworkflow_header: StyleType = DISPLAY_STYLE_SUBWORKFLOW_HEADER + + label_header_prefix: str = DISPLAY_LABEL_HEADER_PREFIX + label_subworkflow_header_prefix: str = DISPLAY_LABEL_SUBWORKFLOW_HEADER_PREFIX 
+ label_progress_steps: str = DISPLAY_LABEL_PROGRESS_STEPS + label_progress_jobs: str = DISPLAY_LABEL_PROGRESS_JOBS + label_progress_subworkflows: str = DISPLAY_LABEL_PROGRESS_SUBWORKFLOWS + label_job_states_prefix: str = DISPLAY_LABEL_JOB_STATES_PREFIX + + icon_state_ok: str = DISPLAY_ICON_STATE_OK + icon_state_errors: str = DISPLAY_ICON_STATE_ERRORS + icon_state_new: str = DISPLAY_ICON_STATE_NEW + icon_state_queued: str = DISPLAY_ICON_STATE_QUEUED + icon_state_running: str = DISPLAY_ICON_STATE_RUNNING + icon_state_paused: str = DISPLAY_ICON_STATE_PAUSED diff --git a/planemo/galaxy/invocations/simulations.py b/planemo/galaxy/invocations/simulations.py new file mode 100644 index 000000000..f220b49e5 --- /dev/null +++ b/planemo/galaxy/invocations/simulations.py @@ -0,0 +1,227 @@ +"""Simulate Galaxy workflows running on a server for testing purposes.""" + +from collections import deque +from typing import ( + Dict, + List, + Optional, +) +from uuid import uuid4 + +import yaml + +from .api import Invocation as InvocationResponse +from .api import InvocationJobsSummary +from .api import InvocationStep as InvocationStepResponse + + +class Ticks: + after: int + + @property + def active(self): + return self.after <= 0 + + def tick(self) -> None: + if self.active: + self.tick_when_active() + else: + self.after -= 1 + + def tick_when_active(self) -> None: ... 
+ + +class StateWithDuration(Ticks): + + def __init__(self, state: str, duration: int): + self.after = 0 + self.state = state + self.duration = duration + + def tick_when_active(self) -> None: + self.duration -= 1 + + +class HasState(Ticks): + final: Optional[str] + + def __init__(self, after: int, states: List[StateWithDuration]): + self.after = after or 0 + self.states = deque(states) + self.final_state: Optional[str] = None + + def tick_when_active(self) -> None: + if self.final_state is not None: + return + + next_state = self.states.popleft() + next_state.tick() + if next_state.duration == 0 and not self.states: + self.final_state = next_state.state + elif next_state.duration != 0: + self.states.appendleft(next_state) + # else: next state will be state during next tick + + @property + def state(self): + if self.final_state is not None: + return self.final_state + else: + return self.states[0].state + + +Job = HasState + + +class InvocationStep(HasState): + invocation: Optional["Invocation"] + jobs: Optional[List[Job]] + + def __init__( + self, jobs: List[Job], invocation: Optional["Invocation"], after: int, states: List[StateWithDuration] + ): + super().__init__(after, states) + self.jobs = jobs + self.invocation = invocation + + def tick_when_active(self) -> None: + super().tick_when_active() + if self.jobs: + for job in self.jobs: + job.tick() + if self.invocation: + self.invocation.tick() + + @property + def active_jobs(self) -> List[Job]: + return [j for j in (self.jobs or []) if j.active] + + +class Invocation(HasState): + + def __init__(self, steps: List[InvocationStep], after: int, states: List[StateWithDuration]): + self.id = str(uuid4())[:8] + self.steps = steps + super().__init__(after, states) + + def tick_when_active(self) -> None: + super().tick_when_active() + for step in self.steps: + step.tick() + + @property + def active_steps(self) -> List[InvocationStep]: + return [s for s in self.steps if s.active] + + def get_invocation_by_id(self, 
invocation_id: str) -> Optional["Invocation"]: + if self.id == invocation_id: + return self + for step in self.steps: + step_invocation = step.invocation + if step_invocation: + step_subworkflow_invocation_with_id = step_invocation.get_invocation_by_id(invocation_id) + if step_subworkflow_invocation_with_id is not None: + return step_subworkflow_invocation_with_id + return None + + def get_subworkflow_invocation(self, subworkflow_invocation_id: str) -> "Invocation": + for step in self.steps: + if step.invocation and step.invocation.id == subworkflow_invocation_id: + return step.invocation + raise Exception(f"Unknown subworkflow invocation id ({subworkflow_invocation_id})") + + def get_subworkflow_invocation_by_step_index(self, index: int) -> Optional["Invocation"]: + return self.steps[index].invocation + + def get_api_invocation(self) -> InvocationResponse: + steps: List[InvocationStepResponse] = [] + for step in self.active_steps: + api_step: InvocationStepResponse = { + "state": step.state, + } + if step.invocation: + api_step["subworkflow_invocation_id"] = step.invocation.id + + steps.append(api_step) + return { + "id": self.id, + "state": self.state, + "steps": steps, + } + + def get_api_jobs_summary(self) -> InvocationJobsSummary: + job_states = [] + for step in self.active_steps: + for job in step.active_jobs: + api_job = { + "state": job.state, + } + job_states.append(api_job) + by_state: Dict[str, int] = {} + for job_state in job_states: + state = job_state["state"] + if state not in by_state: + by_state[state] = 0 + by_state[state] += 1 + return {"states": by_state} + + +def parse_workflow_simulation_from_string(workflow_simulation: str) -> Invocation: + return parse_workflow_simulation(yaml.safe_load(workflow_simulation)) + + +def parse_workflow_simulation(workflow_simulation: dict) -> Invocation: + return parse_workflow_simulation_invocation(workflow_simulation) + + +def parse_workflow_simulation_job(workflow_simulation_job: dict) -> Job: + states = 
parse_states_from(workflow_simulation_job) + after = parse_after_from(workflow_simulation_job) + return Job(after, states) + + +def parse_workflow_simulation_invocation_step(workflow_simulation_invocation_step: dict) -> InvocationStep: + states = parse_states_from(workflow_simulation_invocation_step) + after = parse_after_from(workflow_simulation_invocation_step) + if "invocation" in workflow_simulation_invocation_step: + invocation = parse_workflow_simulation_invocation(workflow_simulation_invocation_step["invocation"]) + else: + invocation = None + jobs = None + if "jobs" in workflow_simulation_invocation_step: + jobs = [] + for job in workflow_simulation_invocation_step.get("jobs") or []: + jobs.append(parse_workflow_simulation_job(job)) + return InvocationStep(jobs, invocation, after, states) + + +def parse_workflow_simulation_invocation(workflow_simulation_invocation: dict) -> Invocation: + states = parse_states_from(workflow_simulation_invocation) + after = parse_after_from(workflow_simulation_invocation) + steps = [] + for step in workflow_simulation_invocation.get("steps") or []: + steps.append(parse_workflow_simulation_invocation_step(step)) + + return Invocation(steps, after, states) + + +def parse_after_from(simulation_object: dict) -> int: + return simulation_object.get("after", 0) + + +def parse_states_from(simulation_object: dict) -> List[StateWithDuration]: + if "states" in simulation_object: + states = simulation_object["states"] + states_with_duration = [] + for state in states: + if ":" in state: + state, duration_str = state.split(":", 1) + duration = int(duration_str) + state_with_duration = StateWithDuration(state, duration) + else: + state_with_duration = StateWithDuration(state, 1) + states_with_duration.append(state_with_duration) + return states_with_duration + else: + state = simulation_object["state"] + return [StateWithDuration(state, 1)] diff --git a/planemo/options.py b/planemo/options.py index b61d1760e..251a905f2 100644 --- 
a/planemo/options.py +++ b/planemo/options.py @@ -1784,6 +1784,10 @@ def test_index_option(): return planemo_option("--test_index", default=1, type=int, help="Select which test to check. Counting starts at 1") +def fail_fast_option(): + return planemo_option("--fail_fast", is_flag=True, help="Stop on first job failure.") + + def test_output_options(): return _compose( planemo_option( @@ -1826,10 +1830,7 @@ def test_output_options(): def test_options(): - return _compose( - paste_test_data_paths_option(), - test_output_options(), - ) + return _compose(paste_test_data_paths_option(), test_output_options(), fail_fast_option()) def _compose(*functions): @@ -2174,16 +2175,6 @@ def tool_init_example_command_option(help=EXAMPLE_COMMAND_HELP): ) -def no_early_termination_option(): - return planemo_option( - "--no_early_termination", - is_flag=True, - default=False, - prompt=False, - help="Wait until all jobs terminate, even if some jobs have failed", - ) - - def mulled_conda_option(): return planemo_option( "--mulled_conda_version", @@ -2220,6 +2211,14 @@ def mulled_action_option(): ) +def invocation_target_options(): + return _compose( + required_invocation_id_arg(), + galaxy_url_option(required=True), + galaxy_user_key_option(required=True), + ) + + def mulled_options(): return _compose( mulled_conda_option(), diff --git a/planemo/reports/build_report.py b/planemo/reports/build_report.py index 8aaf0b77d..308dc9957 100644 --- a/planemo/reports/build_report.py +++ b/planemo/reports/build_report.py @@ -1,11 +1,11 @@ import base64 from galaxy.util import strip_control_characters +from galaxy.util.resources import resource_string from jinja2 import ( Environment, PackageLoader, ) -from pkg_resources import resource_string TITLE = "Results (powered by Planemo)" @@ -130,14 +130,10 @@ def __inject_summary(environment): def __style(filename): - resource = __load_resource(filename) + resource = resource_string("planemo.reports", filename) return "" % resource def 
__script(short_name): - resource = __load_resource("%s.js" % short_name) + resource = resource_string("planemo.reports", "%s.js" % short_name) return "" % resource - - -def __load_resource(name): - return resource_string(__name__, name).decode("UTF-8") diff --git a/setup.cfg b/setup.cfg index 8ec77f3d4..e6c8fb2af 100644 --- a/setup.cfg +++ b/setup.cfg @@ -2,6 +2,7 @@ # E203 is whitespace before ':'; we follow black's formatting here. See https://black.readthedocs.io/en/stable/faq.html#why-are-flake8-s-e203-and-w503-violated # E501 is line length, managed by black # W503 is line breaks before binary operators, which has been reversed in PEP 8. -ignore = E203,E501,W503 +# E701,E704 are multiple statements on one line; we follow black's formatting here. See https://black.readthedocs.io/en/stable/guides/using_black_with_other_tools.html#configuration +ignore = E203,E501,W503,E701,E704 max-complexity = 14 exclude=.eggs,.git,.tox,.venv,build,docs/conf.py,docs/standards,project_templates/cwl_draft3_spec/ diff --git a/tests/test_external_galaxy_commands.py b/tests/test_external_galaxy_commands.py index ff940f4eb..1f9130aa5 100644 --- a/tests/test_external_galaxy_commands.py +++ b/tests/test_external_galaxy_commands.py @@ -1,6 +1,7 @@ """Tests for planemo commands relating to external Galaxy instances""" import os +from unittest import skip import yaml @@ -15,6 +16,7 @@ ) +@skip("Configuring quay.io/bgruening/galaxy:latest is currently broken") class ExternalGalaxyCommandsTestCase(CliTestCase): @skip_if_environ("PLANEMO_SKIP_GALAXY_TESTS") diff --git a/tests/test_invocation_polling.py b/tests/test_invocation_polling.py new file mode 100644 index 000000000..4327bfac9 --- /dev/null +++ b/tests/test_invocation_polling.py @@ -0,0 +1,178 @@ +from time import sleep +from typing import Optional + +from planemo.galaxy.invocations.api import Invocation as InvocationResponse +from planemo.galaxy.invocations.api import ( + InvocationApi, + InvocationJobsSummary, +) +from 
planemo.galaxy.invocations.polling import ( + PollingTracker, + wait_for_invocation_and_jobs, +) +from planemo.galaxy.invocations.progress import WorkflowProgressDisplay +from planemo.galaxy.invocations.progress_display import DisplayConfiguration +from planemo.galaxy.invocations.simulations import ( + Invocation, + parse_workflow_simulation_from_string, +) +from .test_utils import create_test_context +from .test_workflow_simulation import ( + SCENARIO_1, + SCENARIO_MULTIPLE_OK_SUBWORKFLOWS, + SCENARIO_NESTED_SUBWORKFLOWS, +) + +SLEEP = 0 + + +class MockPollingTracker(PollingTracker): + + def __init__(self, simulation: Invocation): + self._simulation = simulation + + def sleep(self) -> None: + self._simulation.tick() + if SLEEP > 0: + sleep(SLEEP) + + +def test_polling_scenario_1(): + final_invocation_state, job_state, error_message = run_workflow_simulation(SCENARIO_1, fail_fast=True) + assert final_invocation_state == "scheduled" + assert job_state == "failed" + assert error_message + assert "failed" in error_message + + +def test_polling_scenario_three_ok_subworkflows(): + final_invocation_state, job_state, error_message = run_workflow_simulation(SCENARIO_MULTIPLE_OK_SUBWORKFLOWS) + assert final_invocation_state == "scheduled" + assert job_state == "ok" + assert not error_message + + +def test_polling_scenario_nested_subworkflows(): + final_invocation_state, job_state, error_message = run_workflow_simulation(SCENARIO_NESTED_SUBWORKFLOWS) + assert final_invocation_state == "scheduled" + assert job_state == "ok" + assert not error_message + + +def test_polling_without_display(): + simulation = parse_workflow_simulation_from_string(SCENARIO_1) + invocation_id = simulation.id + invocation_api = SimulatedApi(simulation) + polling_tracker = MockPollingTracker(simulation) + ctx = create_test_context() + # using this without setting up the display context seems to use all the tracking + # without printing to the console. 
Hides a lot of bad design: presentation and + # tracking logic are too mixed together. + display = WorkflowProgressDisplay(invocation_id) + final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs( + ctx, + invocation_id, + invocation_api, + polling_tracker, + display, + fail_fast=True, + ) + assert final_invocation_state == "scheduled" + assert job_state == "failed" + assert error_message + assert "failed" in error_message + + +def test_polling_with_compact_display(): + display_configuration = DisplayConfiguration( + include_nested_subworkflows=False, + include_job_state_breakdown=False, + ) + final_invocation_state, job_state, error_message = run_workflow_simulation( + SCENARIO_NESTED_SUBWORKFLOWS, display_configuration + ) + assert final_invocation_state == "scheduled" + assert job_state == "ok" + assert not error_message + + +def test_polling_without_invocation_as_full_subpanel(): + display_configuration = DisplayConfiguration( + include_nested_subworkflows=True, + include_job_state_breakdown=True, + subworkflows_as_panel=False, + ) + final_invocation_state, job_state, error_message = run_workflow_simulation( + SCENARIO_NESTED_SUBWORKFLOWS, display_configuration + ) + assert final_invocation_state == "scheduled" + assert job_state == "ok" + assert not error_message + + +def test_fail_fast_enabled_with_job_failure(): + """Test that fail_fast=True returns error when a job fails.""" + final_invocation_state, job_state, error_message = run_workflow_simulation(SCENARIO_1, fail_fast=True) + # Invocation should still be scheduled (workflow scheduling succeeded) + assert final_invocation_state == "scheduled" + assert job_state == "failed" + # fail_fast should detect the failed job and return error message + assert error_message + assert "Failed to run workflow, at least one job is in [failed] state." 
in error_message + + +def test_fail_fast_disabled_with_job_failure(): + """Test that fail_fast=False does not report job failures as errors.""" + final_invocation_state, job_state, error_message = run_workflow_simulation(SCENARIO_1, fail_fast=False) + # Invocation should be scheduled (workflow scheduling succeeded) + assert final_invocation_state == "scheduled" + assert job_state == "failed" + # Without fail_fast, job failures shouldn't cause error messages + # (unless invocation itself fails, which it doesn't in this case) + assert error_message is None + + +def test_fail_fast_enabled_with_successful_workflow(): + """Test that fail_fast=True works normally when no jobs fail.""" + final_invocation_state, job_state, error_message = run_workflow_simulation( + SCENARIO_MULTIPLE_OK_SUBWORKFLOWS, fail_fast=True + ) + assert final_invocation_state == "scheduled" + assert job_state == "ok" + assert not error_message + + +def run_workflow_simulation( + yaml_str: str, display_configuration: Optional[DisplayConfiguration] = None, fail_fast: bool = False +): + simulation = parse_workflow_simulation_from_string(yaml_str) + invocation_id = simulation.id + invocation_api = SimulatedApi(simulation) + polling_tracker = MockPollingTracker(simulation) + ctx = create_test_context() + with WorkflowProgressDisplay(invocation_id, display_configuration=display_configuration) as display: + return wait_for_invocation_and_jobs( + ctx, + invocation_id, + invocation_api, + polling_tracker, + display, + fail_fast=fail_fast, + ) + + +class SimulatedApi(InvocationApi): + _simulation: Invocation + + def __init__(self, invocation: Invocation): + self._simulation = invocation + + def get_invocation(self, invocation_id: str) -> InvocationResponse: + invocation = self._simulation.get_invocation_by_id(invocation_id) + assert invocation, f"Simulation has no invocation_id {invocation_id}" + return invocation.get_api_invocation() + + def get_invocation_summary(self, invocation_id: str) -> 
InvocationJobsSummary: + invocation = self._simulation.get_invocation_by_id(invocation_id) + assert invocation, f"Simulation has no invocation_id {invocation_id}" + return invocation.get_api_jobs_summary() diff --git a/tests/test_test_engines.py b/tests/test_test_engines.py index 59d11fa94..d83555e5b 100644 --- a/tests/test_test_engines.py +++ b/tests/test_test_engines.py @@ -158,6 +158,7 @@ def test_galaxy_workflow_step_failed(): "extra_tools": ["$GALAXY_FUNCTIONAL_TEST_TOOLS"], "test_output_json": json_out.name, "galaxy_branch": target_galaxy_branch(), + "fail_fast": True, } exit_code = t_runnables(ctx, runnables, **kwds) assert exit_code == 1 diff --git a/tests/test_workflow_progress.py b/tests/test_workflow_progress.py new file mode 100644 index 000000000..1eabd0105 --- /dev/null +++ b/tests/test_workflow_progress.py @@ -0,0 +1,142 @@ +import time + +from planemo.galaxy.invocations.progress import ( + WorkflowProgress, + WorkflowProgressDisplay, +) +from planemo.galaxy.invocations.progress_display import DisplayConfiguration + +STEP_NEW = {"state": "new"} +STEP_SCHEDULED = {"state": "scheduled"} +SLEEP = 0.8 + + +def test_workflow_progress_typical(): + with WorkflowProgressDisplay("myid12345abcde") as live: + new_steps = [STEP_NEW, STEP_NEW, STEP_NEW, STEP_NEW] + one_scheduled_steps = [STEP_SCHEDULED, STEP_NEW, STEP_NEW, STEP_NEW] + two_scheduled_steps = [STEP_SCHEDULED, STEP_SCHEDULED, STEP_NEW, STEP_NEW] + three_scheduled_steps = [STEP_SCHEDULED, STEP_SCHEDULED, STEP_SCHEDULED, STEP_NEW] + all_scheduled_steps = [STEP_SCHEDULED, STEP_SCHEDULED, STEP_SCHEDULED, STEP_SCHEDULED] + + state_pairs = [ + ({"state": "new"}, {}, None), + ({"state": "ready", "steps": new_steps}, {}, None), + ({"state": "ready", "steps": one_scheduled_steps}, {"states": {"new": 1}}, None), + ({"state": "ready", "steps": two_scheduled_steps}, {"states": {"new": 2}}, None), + ({"state": "ready", "steps": two_scheduled_steps}, {"states": {"new": 1, "running": 1}}, None), + ({"state": 
"ready", "steps": two_scheduled_steps}, {"states": {"new": 1, "ok": 1}}, None), + ({"state": "ready", "steps": two_scheduled_steps}, {"states": {"ok": 2}}, None), + ({"state": "scheduled", "steps": three_scheduled_steps}, {"states": {"ok": 2, "new": 3}}, None), + ( + {"state": "scheduled", "steps": three_scheduled_steps}, + {"states": {"ok": 2, "running": 1, "new": 2}}, + None, + ), + ( + {"state": "scheduled", "steps": three_scheduled_steps}, + {"states": {"ok": 3, "running": 1, "new": 1}}, + None, + ), + ({"state": "scheduled", "steps": three_scheduled_steps}, {"states": {"ok": 4, "running": 1}}, None), + ({"state": "scheduled", "steps": three_scheduled_steps}, {"states": {"ok": 5}}, None), + ({"state": "scheduled", "steps": all_scheduled_steps}, {"states": {"ok": 5}}, None), + ({"state": "scheduled", "steps": all_scheduled_steps}, {"states": {"ok": 5}}, None), + ({"state": "ready", "steps": new_steps}, {}, "abcde123456"), + ({"state": "ready", "steps": one_scheduled_steps}, {"states": {"new": 1}}, "abcde123456"), + ({"state": "ready", "steps": two_scheduled_steps}, {"states": {"new": 2}}, "abcde123456"), + ({"state": "ready", "steps": two_scheduled_steps}, {"states": {"new": 1, "running": 1}}, "abcde123456"), + ({"state": "ready", "steps": two_scheduled_steps}, {"states": {"new": 1, "ok": 1}}, "abcde123456"), + ({"state": "ready", "steps": two_scheduled_steps}, {"states": {"ok": 2}}, "abcde123456"), + ({"state": "scheduled", "steps": three_scheduled_steps}, {"states": {"ok": 2, "new": 3}}, "abcde123456"), + ( + {"state": "scheduled", "steps": three_scheduled_steps}, + {"states": {"ok": 2, "running": 1, "new": 2}}, + "abcde123456", + ), + ( + {"state": "scheduled", "steps": three_scheduled_steps}, + {"states": {"ok": 3, "running": 1, "new": 1}}, + "abcde123456", + ), + ( + {"state": "scheduled", "steps": three_scheduled_steps}, + {"states": {"ok": 4, "running": 1}}, + "abcde123456", + ), + ({"state": "scheduled", "steps": three_scheduled_steps}, {"states": {"ok": 
5}}, "abcde123456"), + ({"state": "scheduled", "steps": all_scheduled_steps}, {"states": {"ok": 5}}, "abcde123456"), + ({"state": "scheduled", "steps": all_scheduled_steps}, {"states": {"ok": 5}}, "abcde123456"), + ] + i = 0 + for invocation, job_states_summary, subworkflow_invocation_id in state_pairs: + i = i + 1 + if subworkflow_invocation_id is None: + live.handle_invocation(invocation, job_states_summary) + else: + invocation = invocation.copy() + invocation["id"] = subworkflow_invocation_id + live.handle_subworkflow_invocation(invocation, job_states_summary) + time.sleep(SLEEP) + + +def test_workflow_progress_scheduling_state_handling(): + with WorkflowProgress(DisplayConfiguration()) as workflow_progress: + workflow_progress.handle_invocation({"state": "new", "steps": [STEP_NEW]}, {"states": {"new": 1}}) + assert not workflow_progress.invocation_scheduling_terminal + + workflow_progress.handle_invocation({"state": "ready", "steps": [STEP_NEW]}, {"states": {"new": 1}}) + assert not workflow_progress.invocation_scheduling_terminal + + workflow_progress.handle_invocation({"state": "cancelling", "steps": [STEP_NEW]}, {"states": {"new": 1}}) + assert not workflow_progress.invocation_scheduling_terminal + + workflow_progress.handle_invocation( + {"state": "requires_materialization", "steps": [STEP_NEW]}, {"states": {"new": 1}} + ) + assert not workflow_progress.invocation_scheduling_terminal + + workflow_progress.handle_invocation({"state": "scheduled", "steps": [STEP_NEW]}, {"states": {"new": 1}}) + assert workflow_progress.invocation_scheduling_terminal + + workflow_progress.handle_invocation({"state": "cancelled", "steps": [STEP_NEW]}, {"states": {"new": 1}}) + assert workflow_progress.invocation_scheduling_terminal + + workflow_progress.handle_invocation({"state": "failed", "steps": [STEP_NEW]}, {"states": {"new": 1}}) + assert workflow_progress.invocation_scheduling_terminal + + +# From Galaxy: +# NEW = "new" +# RESUBMITTED = "resubmitted" +# UPLOAD = 
"upload" +# WAITING = "waiting" +# QUEUED = "queued" +# RUNNING = "running" +# OK = "ok" +# ERROR = "error" +# FAILED = "failed" +# PAUSED = "paused" +# DELETING = "deleting" +# DELETED = "deleted" +# STOPPING = "stop" +# STOPPED = "stopped" +# SKIPPED = "skipped" +def test_workflow_progress_job_state_handling(): + scheduled_invocation = {"state": "scheduled", "steps": [STEP_SCHEDULED]} + + with WorkflowProgress(DisplayConfiguration()) as workflow_progress: + workflow_progress.handle_invocation(scheduled_invocation, {"states": {"new": 1}}) + assert not workflow_progress.jobs_terminal + + workflow_progress.handle_invocation(scheduled_invocation, {"states": {"new": 1, "ok": 2}}) + assert not workflow_progress.jobs_terminal + + workflow_progress.handle_invocation(scheduled_invocation, {"states": {"ok": 2}}) + assert workflow_progress.jobs_terminal + + workflow_progress.handle_invocation(scheduled_invocation, {"states": {"ok": 2, "paused": 1}}) + assert workflow_progress.jobs_terminal + + workflow_progress.handle_invocation(scheduled_invocation, {"states": {"ok": 2, "paused": 1, "new": 1}}) + assert not workflow_progress.jobs_terminal diff --git a/tests/test_workflow_simulation.py b/tests/test_workflow_simulation.py new file mode 100644 index 000000000..d31dacb2d --- /dev/null +++ b/tests/test_workflow_simulation.py @@ -0,0 +1,260 @@ +from planemo.galaxy.invocations.simulations import parse_workflow_simulation_from_string + +SCENARIO_1 = """ +states: [new, ready:4, scheduled] +steps: +- state: scheduled + jobs: + - states: [new, queued:2, running:2, ok] +- after: 2 + state: scheduled + jobs: + - states: [new, queued, failed] + - states: [new, queued, ok] +- after: 3 + state: scheduled + invocation: + states: [new, ready, scheduled] + steps: + - state: scheduled + jobs: + - states: [new, queued, ok] + - states: [new, queued:2, ok] +""" + +SCENARIO_MULTIPLE_OK_SUBWORKFLOWS = """ +states: [new, ready:4, scheduled] +steps: +- state: scheduled + jobs: + - states: [new, 
queued:2, running:2, ok] +- after: 2 + state: scheduled + jobs: + - states: [new, queued, running:2, ok] + - states: [new, queued, running:4, ok] +- after: 3 + state: scheduled + invocation: + states: [new, ready, scheduled] + steps: + - state: scheduled + jobs: + - states: [new, queued, ok] + - states: [new, queued:2, running, ok] +- after: 4 + state: scheduled + invocation: + states: [new, ready, scheduled] + steps: + - state: scheduled + jobs: + - states: [new, queued, ok] + - states: [new, queued:2, ok] + - after: 3 + state: scheduled + jobs: + - states: [new, queued, running, ok] + - states: [new, queued:3, running:2, ok] + - after: 4 + state: scheduled + jobs: + - states: [new, queued, ok] + - states: [new, queued:2, ok] +- after: 5 + state: scheduled + invocation: + states: [new, ready:2, scheduled] + steps: + - state: scheduled + jobs: + - states: [new, queued:3, ok] + - states: [new, queued:1, ok] + - after: 3 + state: scheduled + jobs: + - states: [new, queued:2, running:4, ok] + - states: [new, queued:3, running:2, ok] + - after: 4 + state: scheduled + jobs: + - states: [new, queued, ok] + - states: [new, running:5, ok] +""" + +SCENARIO_NESTED_SUBWORKFLOWS = """ +states: [new, ready:4, scheduled] +steps: +- state: scheduled + jobs: + - states: [new, queued:2, running:2, ok] +- after: 2 + state: scheduled + jobs: + - states: [new, queued, running:2, ok] + - states: [new, queued, running:4, ok] +- after: 3 + state: scheduled + invocation: + states: [new, ready, scheduled] + steps: + - state: scheduled + jobs: + - states: [new, queued, ok] + - states: [new, queued:2, running, ok] + - after: 3 + states: [new, ready, scheduled] + invocation: + states: [new, ready:4, scheduled] + steps: + - state: scheduled + jobs: + - states: [new, queued, ok] + - states: [new, queued:2, running, ok] + - state: scheduled + jobs: + - states: [new, queued, ok] + - states: [new, queued:2, running, ok] + - states: [new, queued:3, running, ok] +- after: 4 + state: scheduled + 
invocation: + states: [new, ready, scheduled] + steps: + - state: scheduled + jobs: + - states: [new, queued, ok] + - states: [new, queued:2, ok] + - after: 3 + state: scheduled + jobs: + - states: [new, queued, running, ok] + - states: [new, queued:3, running:2, ok] + - after: 4 + state: scheduled + jobs: + - states: [new, queued, ok] + - states: [new, queued:2, ok] +- after: 5 + state: scheduled + invocation: + states: [new, ready:2, scheduled] + steps: + - state: scheduled + jobs: + - states: [new, queued:3, ok] + - states: [new, queued:1, ok] + - after: 3 + state: scheduled + jobs: + - states: [new, queued:2, running:4, ok] + - states: [new, queued:3, running:2, ok] + - after: 4 + state: scheduled + jobs: + - states: [new, queued, ok] + - states: [new, running:5, ok] +""" + + +def test_parse_scenario_1_invocation_state_evolution(): + invocation = parse_workflow_simulation_from_string(SCENARIO_1) + invocation_dict = invocation.get_api_invocation() + assert invocation_dict["state"] == "new" + invocation.tick() + invocation_dict = invocation.get_api_invocation() + assert invocation_dict["state"] == "ready" + invocation.tick() + invocation.tick() + invocation.tick() + invocation.tick() + invocation_dict = invocation.get_api_invocation() + assert invocation_dict["state"] == "scheduled" + + +def test_parse_scenario_1_invocation_step_states(): + invocation = parse_workflow_simulation_from_string(SCENARIO_1) + invocation_dict = invocation.get_api_invocation() + steps = invocation_dict["steps"] + assert len(steps) == 1 + + invocation.tick() + invocation.tick() + + invocation_dict = invocation.get_api_invocation() + steps = invocation_dict["steps"] + assert len(steps) == 2 + assert steps[0]["state"] == "scheduled" + assert steps[1]["state"] == "scheduled" + + invocation.tick() + + invocation_dict = invocation.get_api_invocation() + steps = invocation_dict["steps"] + assert len(steps) == 3 + assert steps[2]["state"] == "scheduled" + + +def 
test_parse_scenario_1_invocation_job_states(): + invocation = parse_workflow_simulation_from_string(SCENARIO_1) + states = invocation.get_api_jobs_summary()["states"] + assert len(states) == 1 + assert states["new"] == 1 + + invocation.tick() + + states = invocation.get_api_jobs_summary()["states"] + assert len(states) == 1 + assert states["queued"] == 1 + + invocation.tick() + + states = invocation.get_api_jobs_summary()["states"] + assert len(states) == 2 + assert states["queued"] == 1 + assert states["new"] == 2 + + invocation.tick() + + states = invocation.get_api_jobs_summary()["states"] + assert len(states) == 2 + assert states["queued"] == 2 + assert states["running"] == 1 + + invocation.tick() + + states = invocation.get_api_jobs_summary()["states"] + assert len(states) == 3 + assert states["ok"] == 1 + assert states["running"] == 1 + assert states["failed"] == 1 + + +def test_parse_scenario_1_subworkflow_invocation_state(): + invocation = parse_workflow_simulation_from_string(SCENARIO_1) + + invocation.tick() + invocation.tick() + invocation.tick() + + subworkflow_invocation = invocation.get_subworkflow_invocation_by_step_index(2) + assert subworkflow_invocation.get_api_invocation()["state"] == "new" + + invocation.tick() + + assert subworkflow_invocation.get_api_invocation()["state"] == "ready" + + invocation.tick() + + assert subworkflow_invocation.get_api_invocation()["state"] == "scheduled" + + states = subworkflow_invocation.get_api_jobs_summary()["states"] + assert len(states) == 2 + assert states["ok"] == 1 + assert states["queued"] == 1 + + invocation.tick() + + states = subworkflow_invocation.get_api_jobs_summary()["states"] + assert len(states) == 1 + assert states["ok"] == 2