33import os
44import sys
55import tempfile
6- import time
76import traceback
87from datetime import datetime
98from typing import (
4241 retry_on_timeouts ,
4342 summarize_history ,
4443)
44+ from planemo .galaxy .invocations .api import BioblendInvocationApi
45+ from planemo .galaxy .invocations .polling import (
46+ PollingTrackerImpl ,
47+ )
48+ from planemo .galaxy .invocations .polling import wait_for_invocation_and_jobs as polling_wait_for_invocation_and_jobs
49+ from planemo .galaxy .invocations .progress import WorkflowProgressDisplay
4550from planemo .io import wait_on
4651from planemo .runnable import (
4752 ErrorRunResponse ,
@@ -770,58 +775,15 @@ def _history_id(gi, **kwds) -> str:
770775def wait_for_invocation_and_jobs (
771776 ctx , invocation_id : str , history_id : str , user_gi : GalaxyInstance , polling_backoff : int
772777):
773- ctx .vlog ("Waiting for invocation [%s]" % invocation_id )
774- final_invocation_state = "new"
775-
776- # TODO: hook in invocation["messages"]
777- error_message = ""
778- job_state = "ok"
779- try :
780- final_invocation_state = _wait_for_invocation (ctx , user_gi , invocation_id , polling_backoff )
781- assert final_invocation_state == "scheduled"
782- except Exception as e :
783- ctx .vlog (f"Problem waiting on invocation: { str (e )} " )
784- summarize_history (ctx , user_gi , history_id )
785- error_message = f"Final state of invocation { invocation_id } is [{ final_invocation_state } ]"
786-
787- ctx .vlog (f"Final state of invocation { invocation_id } is [{ final_invocation_state } ]" )
788-
789- job_state = _wait_for_invocation_jobs (ctx , user_gi , invocation_id , polling_backoff )
790- if job_state not in ("ok" , "skipped" ):
791- msg = f"Failed to run workflow, at least one job is in [{ job_state } ] state."
792- error_message = msg if not error_message else f"{ error_message } . { msg } "
793- else :
794- # wait for possible subworkflow invocations
795- invocation = user_gi .invocations .show_invocation (invocation_id )
796- for step in invocation ["steps" ]:
797- if step .get ("subworkflow_invocation_id" ) is not None :
798- final_invocation_state , job_state , error_message = wait_for_invocation_and_jobs (
799- ctx ,
800- invocation_id = step ["subworkflow_invocation_id" ],
801- history_id = history_id ,
802- user_gi = user_gi ,
803- polling_backoff = polling_backoff ,
804- )
805- if final_invocation_state != "scheduled" or job_state not in ("ok" , "skipped" ):
806- return final_invocation_state , job_state , error_message
807-
808- ctx .vlog (f"The final state of all jobs and subworkflow invocations for invocation [{ invocation_id } ] is 'ok'" )
809- return final_invocation_state , job_state , error_message
810-
811-
812- def _wait_for_invocation (ctx , gi , invocation_id , polling_backoff = 0 ):
813- def state_func ():
814- return retry_on_timeouts (ctx , gi , lambda gi : gi .invocations .show_invocation (invocation_id ))
815-
816- return _wait_on_state (state_func , polling_backoff )
817-
818-
819- def has_jobs_in_states (ctx , gi , history_id , states ):
820- params = {"history_id" : history_id }
821- jobs_url = gi .url + "/jobs"
822- jobs = gi .jobs ._get (url = jobs_url , params = params )
823- target_jobs = [j for j in jobs if j ["state" ] in states ]
824- return len (target_jobs ) > 0
778+ polling_tracker = PollingTrackerImpl (polling_backoff )
779+ invocation_api = BioblendInvocationApi (ctx , user_gi )
780+ with WorkflowProgressDisplay (invocation_id ) as workflow_progress_display :
781+ final_invocation_state , job_state , error_message = polling_wait_for_invocation_and_jobs (
782+ ctx , invocation_id , invocation_api , polling_tracker , workflow_progress_display
783+ )
784+ if error_message :
785+ summarize_history (ctx , user_gi , history_id )
786+ return final_invocation_state , job_state , error_message
825787
826788
827789def _wait_for_history (ctx , gi , history_id , polling_backoff = 0 ):
@@ -835,19 +797,6 @@ def state_func():
835797 return _wait_on_state (state_func , polling_backoff )
836798
837799
838- def _wait_for_invocation_jobs (ctx , gi , invocation_id , polling_backoff = 0 ):
839- # Wait for invocation jobs to finish. Less brittle than waiting for a history to finish,
840- # as you could have more than one invocation in a history, or an invocation without
841- # steps that produce history items.
842-
843- ctx .log (f"waiting for invocation { invocation_id } " )
844-
845- def state_func ():
846- return retry_on_timeouts (ctx , gi , lambda gi : gi .jobs .get_jobs (invocation_id = invocation_id ))
847-
848- return _wait_on_state (state_func , polling_backoff )
849-
850-
851800def _wait_for_job (gi , job_id , timeout = None ):
852801 def state_func ():
853802 return gi .jobs .show_job (job_id , full_details = True )
0 commit comments