Skip to content

Commit f737cf8

Browse files
authored
Merge pull request #1510 from jmchilton/workflow_progress
Workflow progress bar.
2 parents 1796b5b + f6d0ad5 commit f737cf8

20 files changed

Lines changed: 1632 additions & 145 deletions

Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ lint: ## check style using tox and flake8 for Python 2 and Python 3
7373
test: ## run tests with the default Python (faster than tox)
7474
$(IN_VENV) pytest $(TESTS)
7575

76+
format: ## format Python code with black
77+
$(IN_VENV) black planemo tests
78+
7679
quick-test: ## run quickest tests with the default Python
7780
$(IN_VENV) PLANEMO_SKIP_SLOW_TESTS=1 PLANEMO_SKIP_GALAXY_TESTS=1 pytest $(TESTS)
7881

planemo/commands/cmd_run.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
@options.run_download_outputs_option()
3131
@options.engine_options()
3232
@options.test_options()
33-
@options.no_early_termination_option()
3433
@command_function
3534
def cli(ctx, runnable_identifier, job_path, **kwds):
3635
"""Planemo command for running tools and jobs.

planemo/commands/cmd_workflow_test_on_invocation.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,7 @@
1515

1616
@click.command("workflow_test_on_invocation")
1717
@options.optional_tools_arg(multiple=False, allow_uris=False, metavar="TEST.YML")
18-
@options.required_invocation_id_arg()
19-
@options.galaxy_url_option(required=True)
20-
@options.galaxy_user_key_option(required=True)
18+
@options.invocation_target_options()
2119
@options.test_index_option()
2220
@options.test_output_options()
2321
@command_function
@@ -34,7 +32,10 @@ def cli(ctx, path, invocation_id, test_index, **kwds):
3432
len(test_cases) >= test_index
3533
), f"Selected test case {test_index}, but only found {len(test_cases)} test case(s)."
3634
test_case = test_cases[test_index - 1]
37-
run_response = invocation_to_run_response(ctx, user_gi=config.user_gi, runnable=runnable, invocation=invocation)
35+
# Hardcode fail_fast, no need to expose the option to the user IMO.
36+
run_response = invocation_to_run_response(
37+
ctx, user_gi=config.user_gi, runnable=runnable, invocation=invocation, fail_fast=True
38+
)
3839
structured_data = test_case.structured_test_data(run_response)
3940
test_data = {
4041
"version": "0.1",
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
"""Module describing the planemo ``workflow_track`` command."""
2+
3+
import click
4+
5+
from planemo import options
6+
from planemo.cli import command_function
7+
from planemo.engine.factory import engine_context
8+
from planemo.galaxy.activity import wait_for_invocation_and_jobs
9+
10+
11+
@click.command("workflow_track")
12+
@options.invocation_target_options()
13+
@options.fail_fast_option()
14+
@command_function
15+
def cli(ctx, invocation_id, **kwds):
16+
"""Follow the progress of a workflow invocation."""
17+
with engine_context(ctx, engine="external_galaxy", **kwds) as engine, engine.ensure_runnables_served([]) as config:
18+
user_gi = config.user_gi
19+
wait_for_invocation_and_jobs(
20+
ctx,
21+
invocation_id,
22+
history_id=None,
23+
user_gi=user_gi,
24+
polling_backoff=5,
25+
fail_fast=kwds.get("fail_fast", False),
26+
)
27+
28+
ctx.exit(0)

planemo/galaxy/activity.py

Lines changed: 52 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import os
44
import sys
55
import tempfile
6-
import time
76
import traceback
87
from datetime import datetime
98
from typing import (
@@ -36,12 +35,16 @@
3635
unicodify,
3736
)
3837
from pathvalidate import sanitize_filename
39-
from requests.exceptions import (
40-
HTTPError,
41-
RequestException,
42-
)
38+
from requests.exceptions import HTTPError
4339

44-
from planemo.galaxy.api import summarize_history
40+
from planemo.galaxy.api import (
41+
retry_on_timeouts,
42+
summarize_history,
43+
)
44+
from planemo.galaxy.invocations.api import BioblendInvocationApi
45+
from planemo.galaxy.invocations.polling import PollingTrackerImpl
46+
from planemo.galaxy.invocations.polling import wait_for_invocation_and_jobs as polling_wait_for_invocation_and_jobs
47+
from planemo.galaxy.invocations.progress import WorkflowProgressDisplay
4548
from planemo.io import wait_on
4649
from planemo.runnable import (
4750
ErrorRunResponse,
@@ -66,12 +69,12 @@
6669

6770

6871
def execute(
69-
ctx: "PlanemoCliContext", config: "BaseGalaxyConfig", runnable: Runnable, job_path: str, **kwds
72+
ctx: "PlanemoCliContext", config: "BaseGalaxyConfig", runnable: Runnable, job_path: str, fail_fast=False, **kwds
7073
) -> RunResponse:
7174
"""Execute a Galaxy activity."""
7275
try:
7376
start_datetime = datetime.now()
74-
return _execute(ctx, config, runnable, job_path, **kwds)
77+
return _execute(ctx, config, runnable, job_path, fail_fast=fail_fast, **kwds)
7578
except Exception as e:
7679
end_datetime = datetime.now()
7780
ctx.log("Failed to execute Galaxy activity, throwing ErrorRunResponse")
@@ -148,7 +151,7 @@ def _log(self, message):
148151

149152

150153
def _execute( # noqa C901
151-
ctx: "PlanemoCliContext", config: "BaseGalaxyConfig", runnable: Runnable, job_path: str, **kwds
154+
ctx: "PlanemoCliContext", config: "BaseGalaxyConfig", runnable: Runnable, job_path: str, fail_fast=False, **kwds
152155
) -> "GalaxyBaseRunResponse":
153156
user_gi = config.user_gi
154157
admin_gi = config.gi
@@ -215,7 +218,7 @@ def _execute( # noqa C901
215218
no_wait=kwds.get("no_wait", False),
216219
start_datetime=start_datetime,
217220
log=log_contents_str(config),
218-
early_termination=not kwds.get("no_early_termination", False),
221+
fail_fast=fail_fast,
219222
)
220223

221224
else:
@@ -249,7 +252,7 @@ def invocation_to_run_response(
249252
no_wait=False,
250253
start_datetime=None,
251254
log=None,
252-
early_termination=True,
255+
fail_fast=False,
253256
):
254257
start_datetime = start_datetime or datetime.now()
255258
invocation_id = invocation["id"]
@@ -258,19 +261,23 @@ def invocation_to_run_response(
258261

259262
ctx.vlog("Waiting for invocation [%s]" % invocation_id)
260263

261-
final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs(
262-
ctx,
263-
invocation_id=invocation_id,
264-
history_id=history_id,
265-
user_gi=user_gi,
266-
no_wait=no_wait,
267-
polling_backoff=polling_backoff,
268-
early_termination=early_termination,
269-
)
270-
if final_invocation_state not in ("ok", "skipped", "scheduled"):
271-
msg = f"Failed to run workflow [{workflow_id}], at least one job is in [{final_invocation_state}] state."
272-
ctx.vlog(msg)
273-
summarize_history(ctx, user_gi, history_id)
264+
if not no_wait:
265+
final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs(
266+
ctx,
267+
invocation_id=invocation_id,
268+
history_id=history_id,
269+
user_gi=user_gi,
270+
polling_backoff=polling_backoff,
271+
fail_fast=fail_fast,
272+
)
273+
if final_invocation_state not in ("ok", "skipped", "scheduled"):
274+
msg = f"Failed to run workflow [{workflow_id}], at least one job is in [{final_invocation_state}] state."
275+
ctx.vlog(msg)
276+
summarize_history(ctx, user_gi, history_id)
277+
else:
278+
final_invocation_state = invocation["state"]
279+
job_state = None
280+
error_message = None
274281

275282
return GalaxyWorkflowRunResponse(
276283
ctx,
@@ -779,86 +786,28 @@ def _history_id(gi, **kwds) -> str:
779786
def wait_for_invocation_and_jobs(
780787
ctx,
781788
invocation_id: str,
782-
history_id: str,
789+
history_id: Optional[str],
783790
user_gi: GalaxyInstance,
784-
no_wait: bool,
785791
polling_backoff: int,
786-
early_termination: bool,
792+
fail_fast: bool = False,
787793
):
788-
ctx.vlog("Waiting for invocation [%s]" % invocation_id)
789-
final_invocation_state = "new"
790-
791-
# TODO: hook in invocation["messages"]
792-
error_message = ""
793-
job_state = "ok"
794-
try:
795-
final_invocation_state = _wait_for_invocation(ctx, user_gi, invocation_id, polling_backoff)
796-
assert final_invocation_state == "scheduled"
797-
except Exception as e:
798-
ctx.vlog(f"Problem waiting on invocation: {str(e)}")
799-
summarize_history(ctx, user_gi, history_id)
800-
error_message = f"Final state of invocation {invocation_id} is [{final_invocation_state}]"
801-
802-
ctx.vlog(f"Final state of invocation {invocation_id} is [{final_invocation_state}]")
803-
804-
if not no_wait:
805-
job_state = _wait_for_invocation_jobs(ctx, user_gi, invocation_id, polling_backoff, early_termination)
806-
if job_state not in ("ok", "skipped"):
807-
msg = f"Failed to run workflow, at least one job is in [{job_state}] state."
808-
error_message = msg if not error_message else f"{error_message}. {msg}"
809-
else:
810-
# wait for possible subworkflow invocations
811-
invocation = user_gi.invocations.show_invocation(invocation_id)
812-
for step in invocation["steps"]:
813-
if step.get("subworkflow_invocation_id") is not None:
814-
final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs(
815-
ctx,
816-
invocation_id=step["subworkflow_invocation_id"],
817-
history_id=history_id,
818-
user_gi=user_gi,
819-
no_wait=no_wait,
820-
polling_backoff=polling_backoff,
821-
early_termination=early_termination,
822-
)
823-
if final_invocation_state != "scheduled" or job_state not in ("ok", "skipped"):
824-
return final_invocation_state, job_state, error_message
825-
826-
ctx.vlog(f"The final state of all jobs and subworkflow invocations for invocation [{invocation_id}] is 'ok'")
827-
return final_invocation_state, job_state, error_message
828-
829-
830-
def _wait_for_invocation(ctx, gi, invocation_id, polling_backoff=0):
831-
def state_func():
832-
return _retry_on_timeouts(ctx, gi, lambda gi: gi.invocations.show_invocation(invocation_id))
833-
834-
return _wait_on_state(state_func, polling_backoff)
835-
836-
837-
def _retry_on_timeouts(ctx, gi, f):
838-
gi.timeout = 60
839-
try_count = 5
840-
try:
841-
for try_num in range(try_count):
842-
start_time = time.time()
843-
try:
844-
return f(gi)
845-
except RequestException:
846-
end_time = time.time()
847-
if end_time - start_time > 45 and (try_num + 1) < try_count:
848-
ctx.vlog("Galaxy seems to have timed out, retrying to fetch status.")
849-
continue
850-
else:
851-
raise
852-
finally:
853-
gi.timeout = None
854-
855-
856-
def has_jobs_in_states(ctx, gi, history_id, states):
857-
params = {"history_id": history_id}
858-
jobs_url = gi.url + "/jobs"
859-
jobs = gi.jobs._get(url=jobs_url, params=params)
860-
target_jobs = [j for j in jobs if j["state"] in states]
861-
return len(target_jobs) > 0
794+
polling_tracker = PollingTrackerImpl(polling_backoff)
795+
invocation_api = BioblendInvocationApi(ctx, user_gi)
796+
with WorkflowProgressDisplay(invocation_id, galaxy_url=user_gi.base_url) as workflow_progress_display:
797+
final_invocation_state, job_state, error_message = polling_wait_for_invocation_and_jobs(
798+
ctx,
799+
invocation_id,
800+
invocation_api,
801+
polling_tracker,
802+
workflow_progress_display,
803+
fail_fast=fail_fast,
804+
)
805+
if error_message:
806+
if not history_id:
807+
invocation = invocation_api.get_invocation(invocation_id)
808+
history_id = invocation["history_id"]
809+
summarize_history(ctx, user_gi, history_id)
810+
return final_invocation_state, job_state, error_message
862811

863812

864813
def _wait_for_history(ctx, gi, history_id, polling_backoff=0):
@@ -867,32 +816,19 @@ def _wait_for_history(ctx, gi, history_id, polling_backoff=0):
867816
# no need to wait for active jobs anymore I think.
868817

869818
def state_func():
870-
return _retry_on_timeouts(ctx, gi, lambda gi: gi.histories.show_history(history_id))
819+
return retry_on_timeouts(ctx, gi, lambda gi: gi.histories.show_history(history_id))
871820

872821
return _wait_on_state(state_func, polling_backoff)
873822

874823

875-
def _wait_for_invocation_jobs(ctx, gi, invocation_id, polling_backoff=0, early_termination=True):
876-
# Wait for invocation jobs to finish. Less brittle than waiting for a history to finish,
877-
# as you could have more than one invocation in a history, or an invocation without
878-
# steps that produce history items.
879-
880-
ctx.log(f"waiting for invocation {invocation_id}")
881-
882-
def state_func():
883-
return _retry_on_timeouts(ctx, gi, lambda gi: gi.jobs.get_jobs(invocation_id=invocation_id))
884-
885-
return _wait_on_state(state_func, polling_backoff, early_termination=early_termination)
886-
887-
888824
def _wait_for_job(gi, job_id, timeout=None):
889825
def state_func():
890826
return gi.jobs.show_job(job_id, full_details=True)
891827

892828
return _wait_on_state(state_func, timeout=timeout)
893829

894830

895-
def _wait_on_state(state_func, polling_backoff=0, timeout=None, early_termination=True):
831+
def _wait_on_state(state_func, polling_backoff=0, timeout=None):
896832
def get_state():
897833
response = state_func()
898834
if not isinstance(response, list):
@@ -914,8 +850,6 @@ def get_state():
914850
"cancelled",
915851
"failed",
916852
]
917-
if not early_termination and current_non_terminal_states:
918-
return None
919853
for terminal_state in hierarchical_fail_states:
920854
if terminal_state in current_states:
921855
# If we got here something has failed and we can return (early)

planemo/galaxy/api.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
"""A high-level interface to local Galaxy instances using bioblend."""
22

3+
import time
34
from io import StringIO
45
from typing import Optional
56

67
from bioblend.galaxy import GalaxyInstance
8+
from requests.exceptions import RequestException
79

810
DEFAULT_ADMIN_API_KEY = "test_key"
911

@@ -136,6 +138,25 @@ def _dataset_provenance(gi, history_id, id):
136138
return provenance
137139

138140

141+
def retry_on_timeouts(ctx, gi, f):
142+
gi.timeout = 60
143+
try_count = 5
144+
try:
145+
for try_num in range(try_count):
146+
start_time = time.time()
147+
try:
148+
return f(gi)
149+
except RequestException:
150+
end_time = time.time()
151+
if end_time - start_time > 45 and (try_num + 1) < try_count:
152+
ctx.vlog("Galaxy seems to have timed out, retrying to fetch status.")
153+
continue
154+
else:
155+
raise
156+
finally:
157+
gi.timeout = None
158+
159+
139160
__all__ = (
140161
"DEFAULT_ADMIN_API_KEY",
141162
"gi",

planemo/galaxy/invocations/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)