@@ -215,6 +215,7 @@ def _execute( # noqa C901
215215 no_wait = kwds .get ("no_wait" , False ),
216216 start_datetime = start_datetime ,
217217 log = log_contents_str (config ),
218+ early_termination = not kwds .get ("no_early_termination" , False ),
218219 )
219220
220221 else :
@@ -240,7 +241,15 @@ def _execute( # noqa C901
240241
241242
242243def invocation_to_run_response (
243- ctx , user_gi , runnable , invocation , polling_backoff = 0 , no_wait = False , start_datetime = None , log = None
244+ ctx ,
245+ user_gi ,
246+ runnable ,
247+ invocation ,
248+ polling_backoff = 0 ,
249+ no_wait = False ,
250+ start_datetime = None ,
251+ log = None ,
252+ early_termination = True ,
244253):
245254 start_datetime = start_datetime or datetime .now ()
246255 invocation_id = invocation ["id" ]
@@ -256,6 +265,7 @@ def invocation_to_run_response(
256265 user_gi = user_gi ,
257266 no_wait = no_wait ,
258267 polling_backoff = polling_backoff ,
268+ early_termination = early_termination ,
259269 )
260270 if final_invocation_state not in ("ok" , "skipped" , "scheduled" ):
261271 msg = f"Failed to run workflow [{ workflow_id } ], at least one job is in [{ final_invocation_state } ] state."
@@ -764,7 +774,13 @@ def _history_id(gi, **kwds) -> str:
764774
765775
766776def wait_for_invocation_and_jobs (
767- ctx , invocation_id : str , history_id : str , user_gi : GalaxyInstance , no_wait : bool , polling_backoff : int
777+ ctx ,
778+ invocation_id : str ,
779+ history_id : str ,
780+ user_gi : GalaxyInstance ,
781+ no_wait : bool ,
782+ polling_backoff : int ,
783+ early_termination : bool ,
768784):
769785 ctx .vlog ("Waiting for invocation [%s]" % invocation_id )
770786 final_invocation_state = "new"
@@ -783,7 +799,7 @@ def wait_for_invocation_and_jobs(
783799 ctx .vlog (f"Final state of invocation { invocation_id } is [{ final_invocation_state } ]" )
784800
785801 if not no_wait :
786- job_state = _wait_for_invocation_jobs (ctx , user_gi , invocation_id , polling_backoff )
802+ job_state = _wait_for_invocation_jobs (ctx , user_gi , invocation_id , polling_backoff , early_termination )
787803 if job_state not in ("ok" , "skipped" ):
788804 msg = f"Failed to run workflow, at least one job is in [{ job_state } ] state."
789805 error_message = msg if not error_message else f"{ error_message } . { msg } "
@@ -799,6 +815,7 @@ def wait_for_invocation_and_jobs(
799815 user_gi = user_gi ,
800816 no_wait = no_wait ,
801817 polling_backoff = polling_backoff ,
818+ early_termination = early_termination ,
802819 )
803820 if final_invocation_state != "scheduled" or job_state not in ("ok" , "skipped" ):
804821 return final_invocation_state , job_state , error_message
@@ -852,7 +869,7 @@ def state_func():
852869 return _wait_on_state (state_func , polling_backoff )
853870
854871
855- def _wait_for_invocation_jobs (ctx , gi , invocation_id , polling_backoff = 0 ):
872+ def _wait_for_invocation_jobs (ctx , gi , invocation_id , polling_backoff = 0 , early_termination = True ):
856873 # Wait for invocation jobs to finish. Less brittle than waiting for a history to finish,
857874 # as you could have more than one invocation in a history, or an invocation without
858875 # steps that produce history items.
@@ -862,7 +879,7 @@ def _wait_for_invocation_jobs(ctx, gi, invocation_id, polling_backoff=0):
862879 def state_func ():
863880 return _retry_on_timeouts (ctx , gi , lambda gi : gi .jobs .get_jobs (invocation_id = invocation_id ))
864881
865- return _wait_on_state (state_func , polling_backoff )
882+ return _wait_on_state (state_func , polling_backoff , early_termination = early_termination )
866883
867884
868885def _wait_for_job (gi , job_id , timeout = None ):
@@ -872,7 +889,7 @@ def state_func():
872889 return _wait_on_state (state_func , timeout = timeout )
873890
874891
875- def _wait_on_state (state_func , polling_backoff = 0 , timeout = None ):
892+ def _wait_on_state (state_func , polling_backoff = 0 , timeout = None , early_termination = True ):
876893 def get_state ():
877894 response = state_func ()
878895 if not isinstance (response , list ):
@@ -894,6 +911,8 @@ def get_state():
894911 "cancelled" ,
895912 "failed" ,
896913 ]
914+ if not early_termination and current_non_terminal_states :
915+ return None
897916 for terminal_state in hierarchical_fail_states :
898917 if terminal_state in current_states :
899918 # If we got here something has failed and we can return (early)
0 commit comments