Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions planemo/commands/cmd_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
@options.run_download_outputs_option()
@options.engine_options()
@options.test_options()
@options.no_early_termination_option()
@command_function
def cli(ctx, runnable_identifier, job_path, **kwds):
"""Planemo command for running tools and jobs.
Expand Down
24 changes: 15 additions & 9 deletions planemo/galaxy/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ def _execute( # noqa C901
no_wait=kwds.get("no_wait", False),
start_datetime=start_datetime,
log=log_contents_str(config),
early_termination=not kwds.get("no_early_termination", False),
)

else:
Expand All @@ -240,7 +241,7 @@ def _execute( # noqa C901


def invocation_to_run_response(
ctx, user_gi, runnable, invocation, polling_backoff=0, no_wait=False, start_datetime=None, log=None
ctx, user_gi, runnable, invocation, polling_backoff=0, no_wait=False, start_datetime=None, log=None, early_termination=True
):
start_datetime = start_datetime or datetime.now()
invocation_id = invocation["id"]
Expand All @@ -256,6 +257,7 @@ def invocation_to_run_response(
user_gi=user_gi,
no_wait=no_wait,
polling_backoff=polling_backoff,
early_termination=early_termination,
)
if final_invocation_state not in ("ok", "skipped", "scheduled"):
msg = f"Failed to run workflow [{workflow_id}], at least one job is in [{final_invocation_state}] state."
Expand Down Expand Up @@ -764,7 +766,7 @@ def _history_id(gi, **kwds) -> str:


def wait_for_invocation_and_jobs(
ctx, invocation_id: str, history_id: str, user_gi: GalaxyInstance, no_wait: bool, polling_backoff: int
ctx, invocation_id: str, history_id: str, user_gi: GalaxyInstance, no_wait: bool, polling_backoff: int, early_termination: bool
):
ctx.vlog("Waiting for invocation [%s]" % invocation_id)
final_invocation_state = "new"
Expand All @@ -783,7 +785,7 @@ def wait_for_invocation_and_jobs(
ctx.vlog(f"Final state of invocation {invocation_id} is [{final_invocation_state}]")

if not no_wait:
job_state = _wait_for_invocation_jobs(ctx, user_gi, invocation_id, polling_backoff)
job_state = _wait_for_invocation_jobs(ctx, user_gi, invocation_id, polling_backoff, early_termination)
if job_state not in ("ok", "skipped"):
msg = f"Failed to run workflow, at least one job is in [{job_state}] state."
error_message = msg if not error_message else f"{error_message}. {msg}"
Expand All @@ -799,6 +801,7 @@ def wait_for_invocation_and_jobs(
user_gi=user_gi,
no_wait=no_wait,
polling_backoff=polling_backoff,
early_termination=early_termination,
)
if final_invocation_state != "scheduled" or job_state not in ("ok", "skipped"):
return final_invocation_state, job_state, error_message
Expand All @@ -811,7 +814,7 @@ def _wait_for_invocation(ctx, gi, invocation_id, polling_backoff=0):
def state_func():
return _retry_on_timeouts(ctx, gi, lambda gi: gi.invocations.show_invocation(invocation_id))

return _wait_on_state(state_func, polling_backoff)
return _wait_on_state(ctx, state_func, polling_backoff)


def _retry_on_timeouts(ctx, gi, f):
Expand Down Expand Up @@ -849,10 +852,10 @@ def _wait_for_history(ctx, gi, history_id, polling_backoff=0):
def state_func():
return _retry_on_timeouts(ctx, gi, lambda gi: gi.histories.show_history(history_id))

return _wait_on_state(state_func, polling_backoff)
return _wait_on_state(ctx, state_func, polling_backoff)


def _wait_for_invocation_jobs(ctx, gi, invocation_id, polling_backoff=0):
def _wait_for_invocation_jobs(ctx, gi, invocation_id, polling_backoff=0, early_termination=True):
# Wait for invocation jobs to finish. Less brittle than waiting for a history to finish,
# as you could have more than one invocation in a history, or an invocation without
# steps that produce history items.
Expand All @@ -862,17 +865,17 @@ def _wait_for_invocation_jobs(ctx, gi, invocation_id, polling_backoff=0):
def state_func():
return _retry_on_timeouts(ctx, gi, lambda gi: gi.jobs.get_jobs(invocation_id=invocation_id))

return _wait_on_state(state_func, polling_backoff)
return _wait_on_state(ctx, state_func, polling_backoff, early_termination=early_termination)


def _wait_for_job(gi, job_id, timeout=None):
def state_func():
return gi.jobs.show_job(job_id, full_details=True)

return _wait_on_state(state_func, timeout=timeout)
return _wait_on_state(ctx, state_func, timeout=timeout)


def _wait_on_state(state_func, polling_backoff=0, timeout=None):
def _wait_on_state(ctx, state_func, polling_backoff=0, timeout=None, early_termination=True):
def get_state():
response = state_func()
if not isinstance(response, list):
Expand All @@ -894,9 +897,12 @@ def get_state():
"cancelled",
"failed",
]
if not early_termination and current_non_terminal_states:
return None
for terminal_state in hierarchical_fail_states:
if terminal_state in current_states:
# If we got here something has failed and we can return (early)
ctx.log(f"Early termination.")
return terminal_state
if current_non_terminal_states:
return None
Expand Down
13 changes: 13 additions & 0 deletions planemo/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -2067,6 +2067,19 @@ def tool_init_example_command_option(help=EXAMPLE_COMMAND_HELP):
)


def no_early_termination_option():
return planemo_option(
"--no_early_termination",
is_flag=True,
default=False,
prompt=False,
help=(
"Do not terminate when a job of a workflow fails, but other "
"jobs are still queued or running."
),
Comment thread
kostrykin marked this conversation as resolved.
Outdated
)


def mulled_conda_option():
return planemo_option(
"--mulled_conda_version",
Expand Down