@@ -36,6 +36,75 @@ def sleep(self):
3636 self .delta += self .polling_backoff
3737
3838
39+ def _summarize_invocation (invocation_api : InvocationApi , invocation_id : str ):
40+ invocation = invocation_api .get_invocation (invocation_id )
41+ assert invocation
42+ invocation_jobs = invocation_api .get_invocation_summary (invocation_id )
43+ return invocation , invocation_jobs
44+
45+
46+ def _poll_main_workflow (
47+ ctx ,
48+ invocation_id : str ,
49+ invocation_api : InvocationApi ,
50+ workflow_progress_display : WorkflowProgressDisplay ,
51+ fail_fast : bool ,
52+ ):
53+ if workflow_progress_display .workflow_progress .terminal :
54+ return None , None , None
55+
56+ try :
57+ invocation , invocation_jobs = _summarize_invocation (invocation_api , invocation_id )
58+ workflow_progress_display .handle_invocation (invocation , invocation_jobs )
59+ return invocation , invocation_jobs , None
60+ except Exception as e :
61+ print (e )
62+ return None , None , e
63+
64+
65+ def _poll_subworkflow (
66+ ctx ,
67+ invocation_id : str ,
68+ invocation_api : InvocationApi ,
69+ workflow_progress_display : WorkflowProgressDisplay ,
70+ fail_fast : bool ,
71+ ):
72+ if workflow_progress_display .all_subworkflows_complete ():
73+ return None , None , None
74+
75+ try :
76+ subworkflow_id = workflow_progress_display .an_incomplete_subworkflow_id ()
77+ invocation , invocation_jobs = _summarize_invocation (invocation_api , subworkflow_id )
78+ workflow_progress_display .handle_subworkflow_invocation (invocation , invocation_jobs )
79+ return invocation , invocation_jobs , None
80+ except Exception as e :
81+ return None , None , e
82+
83+
84+ def _check_for_errors (
85+ ctx ,
86+ invocation_id : str ,
87+ exception : Optional [Exception ],
88+ invocation ,
89+ invocation_jobs ,
90+ fail_fast : bool ,
91+ ):
92+ error_message = workflow_in_error_message (
93+ ctx , invocation_id , exception , invocation , invocation_jobs , fail_fast = fail_fast
94+ )
95+ if error_message :
96+ final_state = "new" if not invocation else invocation ["state" ]
97+ job_state = summary_job_state (invocation_jobs )
98+ return final_state , job_state , error_message
99+ return None
100+
101+
102+ def _is_polling_complete (workflow_progress_display : WorkflowProgressDisplay ) -> bool :
103+ return (
104+ workflow_progress_display .workflow_progress .terminal and workflow_progress_display .all_subworkflows_complete ()
105+ )
106+
107+
39108def wait_for_invocation_and_jobs (
40109 ctx ,
41110 invocation_id : str ,
@@ -46,80 +115,34 @@ def wait_for_invocation_and_jobs(
46115):
47116 ctx .vlog ("Waiting for invocation [%s]" % invocation_id )
48117
49- def summarize (invocation_id : str ):
50- invocation = invocation_api .get_invocation (invocation_id )
51- assert invocation
52- invocation_jobs = invocation_api .get_invocation_summary (invocation_id )
53- return invocation , invocation_jobs
54-
55118 last_invocation = None
56119 last_invocation_jobs = None
57- last_subworkflow_invocation = None
58- last_subworkflow_invocation_jobs = None
59- last_exception = None
60120 error_message : Optional [str ] = None
61121
62- done_polling = False
63- while not done_polling :
64- # loop over the main workflow and one subworkflow each iteration for display,
65-
66- # skip the main workflow if it is already tracked as complete - if all steps have been
67- # scheduled there are no new subworkflow invocations to track.
68- if not workflow_progress_display .workflow_progress .terminal :
69- try :
70- last_invocation , last_invocation_jobs = summarize (invocation_id )
71- workflow_progress_display .handle_invocation (last_invocation , last_invocation_jobs )
72- except Exception as e :
73- print (e )
74- last_exception = e
75-
76- error_message = workflow_in_error_message (
77- ctx ,
78- invocation_id ,
79- last_exception ,
80- last_invocation ,
81- last_invocation_jobs ,
82- fail_fast = fail_fast ,
83- )
84- if error_message :
85- final_invocation_state = "new" if not last_invocation else last_invocation ["state" ]
86- job_state = summary_job_state (last_invocation_jobs )
87- return final_invocation_state , job_state , error_message
88-
89- assert last_invocation # if we got here... the first check has passed and we have an invocation
90-
91- # grab a subworkflow that isn't complete and check it, also register its subworkflow
92- # invocations so we catch all the children and children of children...
93- if not workflow_progress_display .all_subworkflows_complete ():
94- try :
95- a_subworkflow_invocation_id = workflow_progress_display .an_incomplete_subworkflow_id ()
96- last_subworkflow_invocation , last_subworkflow_invocation_jobs = summarize (a_subworkflow_invocation_id )
97- workflow_progress_display .handle_subworkflow_invocation (
98- last_subworkflow_invocation , last_subworkflow_invocation_jobs
99- )
100- except Exception as e :
101- last_exception = e
102-
103- error_message = workflow_in_error_message (
104- ctx ,
105- invocation_id ,
106- last_exception ,
107- last_subworkflow_invocation ,
108- last_subworkflow_invocation_jobs ,
109- fail_fast = fail_fast ,
110- )
111- if error_message :
112- final_invocation_state = (
113- "new" if not last_subworkflow_invocation else last_subworkflow_invocation ["state" ]
114- )
115- job_state = summary_job_state (last_subworkflow_invocation_jobs )
116- return final_invocation_state , job_state , error_message
117-
118- done_polling = (
119- workflow_progress_display .workflow_progress .terminal
120- and workflow_progress_display .all_subworkflows_complete ()
122+ while not _is_polling_complete (workflow_progress_display ):
123+ # Poll main workflow
124+ main_invocation , main_jobs , main_exception = _poll_main_workflow (
125+ ctx , invocation_id , invocation_api , workflow_progress_display , fail_fast
126+ )
127+
128+ if main_invocation :
129+ last_invocation = main_invocation
130+ last_invocation_jobs = main_jobs
131+
132+ error_result = _check_for_errors (ctx , invocation_id , main_exception , main_invocation , main_jobs , fail_fast )
133+ if error_result :
134+ return error_result
135+
136+ # Poll subworkflow
137+ sub_invocation , sub_jobs , sub_exception = _poll_subworkflow (
138+ ctx , invocation_id , invocation_api , workflow_progress_display , fail_fast
121139 )
122- if not done_polling :
140+
141+ error_result = _check_for_errors (ctx , invocation_id , sub_exception , sub_invocation , sub_jobs , fail_fast )
142+ if error_result :
143+ return error_result
144+
145+ if not _is_polling_complete (workflow_progress_display ):
123146 polling_tracker .sleep ()
124147
125148 ctx .vlog (f"The final state of all jobs and subworkflow invocations for invocation [{ invocation_id } ] is 'ok'" )
@@ -129,7 +152,12 @@ def summarize(invocation_id: str):
129152
130153
131154def workflow_in_error_message (
132- ctx , invocation_id , last_exception , last_invocation , last_invocation_jobs , fail_fast = False ,
155+ ctx ,
156+ invocation_id ,
157+ last_exception ,
158+ last_invocation ,
159+ last_invocation_jobs ,
160+ fail_fast = False ,
133161) -> Optional [str ]:
134162 """Return an error message if workflow is in an error state."""
135163
0 commit comments