Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions planemo/commands/cmd_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
@options.run_download_outputs_option()
@options.engine_options()
@options.test_options()
@options.no_early_termination_option()
@command_function
def cli(ctx, runnable_identifier, job_path, **kwds):
"""Planemo command for running tools and jobs.
Expand Down
31 changes: 25 additions & 6 deletions planemo/galaxy/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ def _execute( # noqa C901
no_wait=kwds.get("no_wait", False),
start_datetime=start_datetime,
log=log_contents_str(config),
early_termination=not kwds.get("no_early_termination", False),
)

else:
Expand All @@ -240,7 +241,15 @@ def _execute( # noqa C901


def invocation_to_run_response(
ctx, user_gi, runnable, invocation, polling_backoff=0, no_wait=False, start_datetime=None, log=None
ctx,
user_gi,
runnable,
invocation,
polling_backoff=0,
no_wait=False,
start_datetime=None,
log=None,
early_termination=True,
):
start_datetime = start_datetime or datetime.now()
invocation_id = invocation["id"]
Expand All @@ -256,6 +265,7 @@ def invocation_to_run_response(
user_gi=user_gi,
no_wait=no_wait,
polling_backoff=polling_backoff,
early_termination=early_termination,
)
if final_invocation_state not in ("ok", "skipped", "scheduled"):
msg = f"Failed to run workflow [{workflow_id}], at least one job is in [{final_invocation_state}] state."
Expand Down Expand Up @@ -764,7 +774,13 @@ def _history_id(gi, **kwds) -> str:


def wait_for_invocation_and_jobs(
ctx, invocation_id: str, history_id: str, user_gi: GalaxyInstance, no_wait: bool, polling_backoff: int
ctx,
invocation_id: str,
history_id: str,
user_gi: GalaxyInstance,
no_wait: bool,
polling_backoff: int,
early_termination: bool,
):
ctx.vlog("Waiting for invocation [%s]" % invocation_id)
final_invocation_state = "new"
Expand All @@ -783,7 +799,7 @@ def wait_for_invocation_and_jobs(
ctx.vlog(f"Final state of invocation {invocation_id} is [{final_invocation_state}]")

if not no_wait:
job_state = _wait_for_invocation_jobs(ctx, user_gi, invocation_id, polling_backoff)
job_state = _wait_for_invocation_jobs(ctx, user_gi, invocation_id, polling_backoff, early_termination)
if job_state not in ("ok", "skipped"):
msg = f"Failed to run workflow, at least one job is in [{job_state}] state."
error_message = msg if not error_message else f"{error_message}. {msg}"
Expand All @@ -799,6 +815,7 @@ def wait_for_invocation_and_jobs(
user_gi=user_gi,
no_wait=no_wait,
polling_backoff=polling_backoff,
early_termination=early_termination,
)
if final_invocation_state != "scheduled" or job_state not in ("ok", "skipped"):
return final_invocation_state, job_state, error_message
Expand Down Expand Up @@ -852,7 +869,7 @@ def state_func():
return _wait_on_state(state_func, polling_backoff)


def _wait_for_invocation_jobs(ctx, gi, invocation_id, polling_backoff=0):
def _wait_for_invocation_jobs(ctx, gi, invocation_id, polling_backoff=0, early_termination=True):
# Wait for invocation jobs to finish. Less brittle than waiting for a history to finish,
# as you could have more than one invocation in a history, or an invocation without
# steps that produce history items.
Expand All @@ -862,7 +879,7 @@ def _wait_for_invocation_jobs(ctx, gi, invocation_id, polling_backoff=0):
def state_func():
return _retry_on_timeouts(ctx, gi, lambda gi: gi.jobs.get_jobs(invocation_id=invocation_id))

return _wait_on_state(state_func, polling_backoff)
return _wait_on_state(state_func, polling_backoff, early_termination=early_termination)


def _wait_for_job(gi, job_id, timeout=None):
Expand All @@ -872,7 +889,7 @@ def state_func():
return _wait_on_state(state_func, timeout=timeout)


def _wait_on_state(state_func, polling_backoff=0, timeout=None):
def _wait_on_state(state_func, polling_backoff=0, timeout=None, early_termination=True):
def get_state():
response = state_func()
if not isinstance(response, list):
Expand All @@ -894,6 +911,8 @@ def get_state():
"cancelled",
"failed",
]
if not early_termination and current_non_terminal_states:
return None
for terminal_state in hierarchical_fail_states:
if terminal_state in current_states:
# If we got here something has failed and we can return (early)
Expand Down
10 changes: 10 additions & 0 deletions planemo/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -2067,6 +2067,16 @@ def tool_init_example_command_option(help=EXAMPLE_COMMAND_HELP):
)


def no_early_termination_option():
return planemo_option(
"--no_early_termination",
is_flag=True,
default=False,
prompt=False,
help="Wait until all jobs terminate, even if some jobs have failed",
)


def mulled_conda_option():
return planemo_option(
"--mulled_conda_version",
Expand Down
Loading