@@ -1760,8 +1760,7 @@ def run_workflow_with_valid_hashes(
17601760 invocation_id = self .workflow_populator .invoke_workflow_and_wait (workflow_id , request = workflow_request ).json ()[
17611761 "id"
17621762 ]
1763- invocation = self ._invocation_details (workflow_id , invocation_id )
1764- assert invocation ["state" ] in ("scheduled" , "completed" ), invocation
1763+ self .workflow_populator .wait_for_invocation_and_completion (invocation_id )
17651764 invocation_jobs = self .workflow_populator .get_invocation_jobs (invocation_id )
17661765 for job in invocation_jobs :
17671766 assert job ["state" ] == "ok"
@@ -1844,7 +1843,7 @@ def test_run_workflow_with_invalid_url(self):
18441843 def test_run_workflow_with_url_collection (self ):
18451844 with self .dataset_populator .test_history () as history_id :
18461845 invocation = self ._run_multi_data_workflow (history_id )
1847- assert invocation ["state" ] in ( "scheduled" , " completed") , invocation
1846+ assert invocation ["state" ] == " completed" , invocation
18481847 invocation_jobs = self .workflow_populator .get_invocation_jobs (invocation ["id" ])
18491848 assert len (invocation_jobs ) == 1
18501849 job = invocation_jobs [0 ]
@@ -1911,7 +1910,9 @@ def _run_multi_data_workflow(self, history_id, invalid_hash=False):
19111910 invocation_id = self .workflow_populator .invoke_workflow_and_wait (
19121911 workflow_id , request = workflow_request , assert_ok = not invalid_hash
19131912 ).json ()["id" ]
1914- return self ._invocation_details (workflow_id , invocation_id )
1913+ if invalid_hash :
1914+ return self ._invocation_details (workflow_id , invocation_id )
1915+ return self .workflow_populator .wait_for_invocation_and_completion (invocation_id )
19151916
19161917 @skip_without_tool ("collection_paired_default" )
19171918 def test_run_workflow_with_url_paired_collection (self ):
@@ -1974,8 +1975,7 @@ def test_run_workflow_with_url_paired_collection(self):
19741975 invocation_id = self .workflow_populator .invoke_workflow_and_wait (
19751976 workflow_id , request = workflow_request
19761977 ).json ()["id" ]
1977- invocation = self ._invocation_details (workflow_id , invocation_id )
1978- assert invocation ["state" ] in ("scheduled" , "completed" ), invocation
1978+ invocation = self .workflow_populator .wait_for_invocation_and_completion (invocation_id )
19791979 assert invocation ["inputs" ]["0" ]["src" ] == "hdca"
19801980 input_hdca = self .dataset_populator .get_history_collection_details (
19811981 history_id = history_id , content_id = invocation ["inputs" ]["0" ]["id" ]
@@ -2131,8 +2131,7 @@ def __run_cat_workflow(self, inputs_by, history_id: Optional[str] = None):
21312131 invocation_id = self .workflow_populator .invoke_workflow_and_wait (workflow_id , request = workflow_request ).json ()[
21322132 "id"
21332133 ]
2134- invocation = self .workflow_populator .wait_for_invocation_and_completion (invocation_id )
2135- assert invocation ["state" ] == "completed" , invocation
2134+ self .workflow_populator .wait_for_invocation_and_completion (invocation_id )
21362135
21372136 @skip_without_tool ("collection_creates_pair" )
21382137 def test_workflow_run_output_collections (self ) -> None :
@@ -3466,12 +3465,9 @@ def test_run_runtime_parameters_after_pause(self):
34663465 # Review the paused steps to allow the workflow to continue.
34673466 self .__review_paused_steps (uploaded_workflow_id , invocation_id , order_index = 1 , action = True )
34683467
3469- # Wait for the workflow to finish scheduling and ensure both the invocation
3468+ # Wait for the workflow to finish and ensure both the invocation
34703469 # and the history are in valid states.
3471- invocation_scheduled = self ._wait_for_invocation_state (
3472- uploaded_workflow_id , invocation_id , ("scheduled" , "completed" )
3473- )
3474- assert invocation_scheduled , "Workflow state is not scheduled or completed..."
3470+ self .workflow_populator .wait_for_invocation_and_completion (invocation_id )
34753471 self .dataset_populator .wait_for_history (history_id , assert_ok = True )
34763472
34773473 content = self .dataset_populator .get_history_dataset_content (history_id )
@@ -5029,12 +5025,9 @@ def test_workflow_pause(self):
50295025 # Review the paused steps to allow the workflow to continue.
50305026 self .__review_paused_steps (uploaded_workflow_id , invocation_id , order_index = 2 , action = True )
50315027
5032- # Wait for the workflow to finish scheduling and ensure both the invocation
5028+ # Wait for the workflow to finish and ensure both the invocation
50335029 # and the history are in valid states.
5034- invocation_scheduled = self ._wait_for_invocation_state (
5035- uploaded_workflow_id , invocation_id , ("scheduled" , "completed" )
5036- )
5037- assert invocation_scheduled , "Workflow state is not scheduled or completed..."
5030+ self .workflow_populator .wait_for_invocation_and_completion (invocation_id )
50385031 self .dataset_populator .wait_for_history (history_id , assert_ok = True )
50395032
50405033 @skip_without_tool ("cat" )
@@ -5092,9 +5085,7 @@ def test_workflow_map_reduce_pause(self):
50925085 self ._assert_invocation_non_terminal (uploaded_workflow_id , invocation_id )
50935086
50945087 self .__review_paused_steps (uploaded_workflow_id , invocation_id , order_index = 4 , action = True )
5095- self .workflow_populator .wait_for_invocation_and_jobs (history_id , uploaded_workflow_id , invocation_id )
5096- invocation = self ._invocation_details (uploaded_workflow_id , invocation_id )
5097- assert invocation ["state" ] in ("scheduled" , "completed" )
5088+ self .workflow_populator .wait_for_invocation_and_completion (invocation_id )
50985089 assert "reviewed\n 1\n reviewed\n 4\n " == self .dataset_populator .get_history_dataset_content (history_id )
50995090
51005091 @skip_without_tool ("cat" )
@@ -5171,10 +5162,10 @@ def test_cancel_workflow_invocation_deletes_jobs(self):
51715162 history_id = history_id ,
51725163 wait = False ,
51735164 )
5174- # wait_for_invocation just waits until scheduling complete, not jobs or subworkflow invocations
5175- self .workflow_populator .wait_for_invocation ( "null" , summary . invocation_id , assert_ok = True )
5176- invocation_before_cancellation = self . workflow_populator . get_invocation ( summary .invocation_id )
5177- assert invocation_before_cancellation [ "state" ] in ( "scheduled" , "completed" )
5165+ # Wait for workflow to complete before cancellation
5166+ invocation_before_cancellation = self .workflow_populator .wait_for_invocation_and_completion (
5167+ summary .invocation_id
5168+ )
51785169 subworkflow_invocation_id = invocation_before_cancellation ["steps" ][2 ]["subworkflow_invocation_id" ]
51795170 self .workflow_populator .cancel_invocation (summary .invocation_id )
51805171 self .workflow_populator .wait_for_invocation_and_jobs (
@@ -5245,8 +5236,7 @@ def test_workflow_warning_workflow_output_not_found(self, history_id):
52455236 assert_ok = False ,
52465237 wait = True ,
52475238 )
5248- invocation = self .workflow_populator .get_invocation (summary .invocation_id )
5249- assert invocation ["state" ] in ("scheduled" , "completed" )
5239+ invocation = self .workflow_populator .wait_for_invocation_and_completion (summary .invocation_id )
52505240 assert len (invocation ["messages" ]) == 1
52515241 message = invocation ["messages" ][0 ]
52525242 assert message ["reason" ] == "workflow_output_not_found"
@@ -5915,8 +5905,8 @@ def test_run_subworkflow_with_boolean_parameter_in_when_condition(self):
59155905 summary = self ._run_workflow (workflow , history_id = history_id , wait = True , assert_ok = True )
59165906
59175907 # Verify parent workflow executed successfully
5908+ self .workflow_populator .wait_for_invocation_and_completion (summary .invocation_id )
59185909 parent_invocation = self .workflow_populator .get_invocation (summary .invocation_id , step_details = True )
5919- assert parent_invocation ["state" ] in ("scheduled" , "completed" )
59205910
59215911 # Find the subworkflow step and get its invocation
59225912 subworkflow_step = None
0 commit comments