Skip to content

Commit 0f835ef

Browse files
committed
WIP: workflow progress bar.
1 parent e1fff60 commit 0f835ef

5 files changed

Lines changed: 482 additions & 17 deletions

File tree

planemo/commands/cmd_workflow_test_on_invocation.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,7 @@
1515

1616
@click.command("workflow_test_on_invocation")
1717
@options.optional_tools_arg(multiple=False, allow_uris=False, metavar="TEST.YML")
18-
@options.required_invocation_id_arg()
19-
@options.galaxy_url_option(required=True)
20-
@options.galaxy_user_key_option(required=True)
18+
@options.invocation_target_options()
2119
@options.test_index_option()
2220
@options.test_output_options()
2321
@command_function
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
"""Module describing the planemo ``workflow_track`` command."""
2+
3+
import click
4+
5+
from planemo import options
6+
from planemo.cli import command_function
7+
from planemo.galaxy.workflow_progress import WorkflowProgress
8+
from planemo.engine.factory import engine_context
9+
10+
11+
@click.command("workflow_track")
12+
@options.invocation_target_options()
13+
@command_function
14+
def cli(ctx, invocation_id, **kwds):
15+
"""Run defined tests against existing workflow invocation."""
16+
with WorkflowProgress() as workflow_progress:
17+
workflow_progress.add_bars()
18+
import time
19+
20+
time.sleep(1)
21+
new_step = {"state": "new"}
22+
scheduled_step = {"state": "scheduled"}
23+
new_steps = [new_step, new_step, new_step]
24+
one_scheduled_steps = [scheduled_step, new_step, new_step]
25+
two_scheduled_steps = [scheduled_step, scheduled_step, new_step]
26+
all_scheduled_steps = [scheduled_step, scheduled_step, scheduled_step]
27+
state_pairs = [
28+
({"state": "new"}, {}),
29+
({"state": "ready", "steps": new_steps}, {}),
30+
({"state": "ready", "steps": one_scheduled_steps}, {"states": {"new": 1}}),
31+
({"state": "ready", "steps": two_scheduled_steps}, {"states": {"new": 2}}),
32+
({"state": "ready", "steps": two_scheduled_steps}, {"states": {"new": 1, "running": 1}}),
33+
({"state": "ready", "steps": two_scheduled_steps}, {"states": {"new": 1, "ok": 1}}),
34+
({"state": "ready", "steps": two_scheduled_steps}, {"states": {"ok": 2}}),
35+
({"state": "scheduled", "steps": all_scheduled_steps}, {"states": {"ok": 2, "new": 3}}),
36+
({"state": "scheduled", "steps": all_scheduled_steps}, {"states": {"ok": 2, "running": 1, "new": 2}}),
37+
({"state": "scheduled", "steps": all_scheduled_steps}, {"states": {"ok": 3, "running": 1, "new": 1}}),
38+
({"state": "scheduled", "steps": all_scheduled_steps}, {"states": {"ok": 4, "running": 1}}),
39+
({"state": "scheduled", "steps": all_scheduled_steps}, {"states": {"ok": 5}}),
40+
]
41+
for invocation, job_states_summary in state_pairs:
42+
workflow_progress.handle_invocation(invocation, job_states_summary)
43+
time.sleep(1)
44+
45+
with engine_context(ctx, engine="external_galaxy", **kwds) as engine, engine.ensure_runnables_served([]) as config:
46+
user_gi = config.user_gi
47+
invocation = user_gi.invocations.show_invocation(invocation_id)
48+
# https://stackoverflow.com/questions/23113494/double-progress-bar-in-python
49+
50+
ctx.exit(0)

planemo/galaxy/activity.py

Lines changed: 131 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@
5252
RunResponse,
5353
SuccessfulRunResponse,
5454
)
55+
from .workflow_progress import WorkflowProgressDisplay
56+
5557

5658
if TYPE_CHECKING:
5759
from planemo.cli import PlanemoCliContext
@@ -788,25 +790,140 @@ def wait_for_invocation_and_jobs(
788790
msg = f"Failed to run workflow, at least one job is in [{job_state}] state."
789791
error_message = msg if not error_message else f"{error_message}. {msg}"
790792
else:
791-
# wait for possible subworkflow invocations
792-
invocation = user_gi.invocations.show_invocation(invocation_id)
793-
for step in invocation["steps"]:
794-
if step.get("subworkflow_invocation_id") is not None:
795-
final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs(
796-
ctx,
797-
invocation_id=step["subworkflow_invocation_id"],
798-
history_id=history_id,
799-
user_gi=user_gi,
800-
no_wait=no_wait,
801-
polling_backoff=polling_backoff,
802-
)
803-
if final_invocation_state != "scheduled" or job_state not in ("ok", "skipped"):
804-
return final_invocation_state, job_state, error_message
793+
for subworkflow_invocation_id in subworkflow_invocation_ids(user_gi, invocation_id):
794+
final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs(
795+
ctx,
796+
invocation_id=subworkflow_invocation_id,
797+
history_id=history_id,
798+
user_gi=user_gi,
799+
no_wait=no_wait,
800+
polling_backoff=polling_backoff,
801+
)
802+
if final_invocation_state != "scheduled" or job_state not in ("ok", "skipped"):
803+
return final_invocation_state, job_state, error_message
805804

806805
ctx.vlog(f"The final state of all jobs and subworkflow invocations for invocation [{invocation_id}] is 'ok'")
807806
return final_invocation_state, job_state, error_message
808807

809808

809+
def wait_for_invocation_and_jobs_tracked(
810+
ctx,
811+
invocation_id: str,
812+
history_id: str,
813+
user_gi: GalaxyInstance,
814+
no_wait: bool,
815+
polling_backoff: int,
816+
workflow_progress_display: WorkflowProgressDisplay,
817+
):
818+
819+
def summary_job_state(job_states_summary: Optional[Dict]):
820+
states = (job_states_summary or {}).get("states").copy()
821+
states.pop("ok").pop("skipped")
822+
if states:
823+
return next(states.keys())
824+
else:
825+
return "ok"
826+
827+
def summarize(invocation_id: str):
828+
invocation = _retry_on_timeouts(ctx, user_gi, lambda gi: gi.invocations.show_invocation(invocation_id))
829+
invocation_jobs = _retry_on_timeouts(
830+
ctx, user_gi, lambda gi: gi.invocations.get_invocation_summary(invocation_id)
831+
)
832+
return invocation, invocation_jobs
833+
834+
ctx.vlog("Waiting for invocation [%s]" % invocation_id)
835+
done_polling = False
836+
last_invocation = None
837+
last_invocation_jobs = None
838+
last_exception = None
839+
while not done_polling:
840+
try:
841+
last_invocation, last_invocation_jobs = summarize(invocation_id)
842+
workflow_progress_display.handle_invocation(last_invocation, last_invocation_jobs)
843+
final_invocation_state = workflow_progress_display.workflow_progress.invocation_state
844+
if workflow_progress_display.workflow_progress.invocation_scheduling_terminal and no_wait:
845+
done_polling = True
846+
break
847+
if not no_wait and workflow_progress_display.workflow_progress.jobs_terminal:
848+
done_polling = True
849+
break
850+
time.sleep(1)
851+
except Exception as e:
852+
last_exception = e
853+
done_polling = True
854+
855+
final_invocation_state = "new" if not last_invocation else last_invocation["state"]
856+
job_state = summary_job_state(last_invocation_jobs)
857+
ctx.vlog(f"Final state of invocation {invocation_id} is [{final_invocation_state}]")
858+
859+
def workflow_in_error() -> str:
860+
error_message = None
861+
if last_exception:
862+
ctx.vlog(f"Problem waiting on invocation: {str(last_exception)}")
863+
error_message = f"Final state of invocation {invocation_id} is [{final_invocation_state}]"
864+
865+
if final_invocation_state != "scheduled":
866+
msg = f"Failed to run workflow, invocation ended in [{final_invocation_state}] state."
867+
ctx.vlog(msg)
868+
error_message = msg if not error_message else f"{error_message}. {msg}"
869+
870+
if job_state != "ok":
871+
msg = f"Failed to run workflow, at least one job is in [{job_state}] state."
872+
ctx.vlog(msg)
873+
error_message = msg if not error_message else f"{error_message}. {msg}"
874+
875+
return error_message
876+
877+
error_message = workflow_in_error()
878+
if error_message:
879+
summarize_history(ctx, user_gi, history_id)
880+
return final_invocation_state, job_state, error_message
881+
882+
if not no_wait:
883+
subworkflow_ids = subworkflow_invocation_ids(user_gi, invocation_id)
884+
workflow_progress_display.register_subworkflow_invocation_ids(subworkflow_ids)
885+
done_polling = workflow_progress_display.all_subworkflows_complete()
886+
while not done_polling:
887+
try:
888+
a_subworkflow_invocation_id = workflow_progress_display.an_incomplete_subworkflow_id()
889+
subworkflow_subworkflow_invocation_ids = subworkflow_invocation_ids(
890+
user_gi, a_subworkflow_invocation_id
891+
)
892+
workflow_progress_display.register_subworkflow_invocation_ids(subworkflow_subworkflow_invocation_ids)
893+
894+
last_invocation, last_invocation_jobs = summarize(invocation_id)
895+
workflow_progress_display.handle_subworkflow_invocation(last_invocation, last_invocation_jobs)
896+
897+
scheduling_terminal = workflow_progress_display.subworkflow_progress.invocation_scheduling_terminal
898+
jobs_terminal = workflow_progress_display.subworkflow_progress.jobs_terminal
899+
if scheduling_terminal and jobs_terminal:
900+
workflow_progress_display.complete_subworkflow(a_subworkflow_invocation_id)
901+
done_polling = workflow_progress_display.all_subworkflows_complete()
902+
except Exception as e:
903+
last_exception = e
904+
done_polling = True
905+
906+
final_invocation_state = "new" if not last_invocation else last_invocation["state"]
907+
job_state = summary_job_state(last_invocation_jobs)
908+
ctx.vlog(f"Final state of invocation {invocation_id} is [{final_invocation_state}]")
909+
910+
error_message = workflow_in_error()
911+
if error_message:
912+
summarize_history(ctx, user_gi, history_id)
913+
return final_invocation_state, job_state, error_message
914+
915+
return final_invocation_state, job_state, error_message
916+
917+
918+
def subworkflow_invocation_ids(user_gi, invocation_id: str):
919+
invocation = user_gi.invocations.show_invocation(invocation_id)
920+
subworkflow_invocation_ids = []
921+
for step in invocation["steps"]:
922+
if step.get("subworkflow_invocation_id") is not None:
923+
subworkflow_invocation_ids.append(step["subworkflow_invocation_id"])
924+
return subworkflow_invocation_ids
925+
926+
810927
def _wait_for_invocation(ctx, gi, invocation_id, polling_backoff=0):
811928
def state_func():
812929
return _retry_on_timeouts(ctx, gi, lambda gi: gi.invocations.show_invocation(invocation_id))

0 commit comments

Comments
 (0)