Skip to content

Commit cdc973b

Browse files
committed
feat(workflows): improve nested workflow pause/resume handling
This update addresses a critical issue in the handling of nested workflows, particularly when using the `for_each` sequential mode. The changes ensure that child workflows correctly maintain their execution context, preventing misrouting of responses intended for child workflows back to the parent. This fix is crucial for workflows that include pause points, such as prompts for user approvals. Additionally, new tests have been introduced to verify the correct behavior of nested workflows in these scenarios.
1 parent 300b0be commit cdc973b

6 files changed

Lines changed: 433 additions & 22 deletions

File tree

src/workflows_mcp/engine/executors_workflow.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -326,24 +326,36 @@ async def resume(
326326
"ExecutionContext not found - workflow composition not supported in this context"
327327
)
328328

329-
# 4. Inject ExecutionContext into deserialized child state
329+
# 4. Get child workflow name (from checkpoint or inputs)
330+
workflow_name = pause_metadata.get("child_workflow") or inputs.workflow
331+
332+
# 5. Create child context (SAME AS execute() method)
333+
# This is critical for proper variable resolution and context isolation.
334+
# Without this, the child workflow would use parent's execution context,
335+
# causing variable resolution to fail or return incorrect values.
336+
child_context = exec_context.create_child_context(
337+
parent_execution=context,
338+
workflow_name=workflow_name,
339+
)
340+
341+
# 6. Inject child ExecutionContext into deserialized child state
330342
# Required because _execution_context is a PrivateAttr that isn't serialized.
331343
# After deserialization, child_execution_state.context._execution_context is None.
332344
# This must be set before resume_from_state() to ensure proper context propagation.
333-
child_execution_state.context.set_execution_context(exec_context)
345+
child_execution_state.context.set_execution_context(child_context)
334346

335-
# 5. Create WorkflowRunner and resume child workflow
347+
# 7. Create WorkflowRunner and resume child workflow
336348
from .workflow_runner import WorkflowRunner
337349

338350
# No checkpointing for nested workflows - parent handles all checkpointing
339351
runner = WorkflowRunner()
340352

341353
try:
342-
# Resume child workflow from ExecutionState
354+
# Resume child workflow from ExecutionState with CHILD context
343355
child_execution_result = await runner.resume_from_state(
344356
execution_state=child_execution_state,
345357
response=response,
346-
context=exec_context,
358+
context=child_context,
347359
)
348360
except Exception as e:
349361
raise ValueError(f"Failed to resume child workflow: {e}") from e

src/workflows_mcp/engine/orchestrator.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -424,9 +424,16 @@ async def resume_for_each(
424424
metadata = Metadata(**serialized["metadata"])
425425

426426
# Reconstruct output from serialized data
427-
output = None
427+
# Use Any type because output can be Execution (Workflow) or BlockOutput
428+
output: Any = None
428429
if serialized["output"] is not None:
429-
output = executor.output_type(**serialized["output"])
430+
# Special case: WorkflowExecutor returns Execution, not BlockOutput
431+
# Check by executor type name (Workflow blocks have output_type=type(None))
432+
if executor_type == "Workflow":
433+
# Workflow block - output is serialized Execution object
434+
output = Execution.model_validate(serialized["output"])
435+
else:
436+
output = executor.output_type(**serialized["output"])
430437

431438
iteration_results[key] = BlockExecution(
432439
inputs=serialized["inputs"],

src/workflows_mcp/templates/agents/workflow-creator/layer2/decompose.yaml

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -52,26 +52,13 @@ blocks:
5252
fi
5353
echo "Depth {{inputs.current_depth}}/{{inputs.max_depth}} - OK to proceed"
5454
55-
- id: generate_sub_workflows
56-
description: "Generate each sub-workflow by calling workflow-creator-generate-workflow"
57-
type: Workflow
58-
for_each: "{{inputs.sub_workflows}}"
59-
for_each_mode: sequential
60-
depends_on: [check_depth]
61-
condition: "{{inputs.current_depth < inputs.max_depth}}"
62-
inputs:
63-
workflow: workflow-creator-generate-workflow
64-
inputs:
65-
user_requests: "{{inputs.workspace}}/{{each.value.name}}-requirements.json"
66-
executor_docs_path: "{{inputs.executor_docs_path}}"
67-
examples: "{{inputs.examples}}"
68-
workspace: "{{inputs.workspace}}/{{each.value.name}}"
69-
7055
- id: prepare_sub_requirements
7156
description: "Create requirements files for each sub-workflow"
7257
type: CreateFile
7358
for_each: "{{inputs.sub_workflows}}"
7459
for_each_mode: parallel
60+
depends_on: [check_depth]
61+
condition: "{{inputs.current_depth < inputs.max_depth}}"
7562
inputs:
7663
create_parents: true
7764
overwrite: true
@@ -84,9 +71,33 @@ blocks:
8471
"outputs": {{each.value.outputs | default([]) | tojson}},
8572
"steps": ["Generated as sub-workflow of parent requirements"],
8673
"is_sub_workflow": true,
74+
"is_recursive": {{each.value.is_recursive | default(false) | tojson}},
8775
"parent_context": "{{inputs.parent_requirements.name | default('parent')}}"
8876
}
8977
78+
- id: generate_sub_workflows
79+
description: "Recursively call workflow-creator for each sub-workflow (includes design, validation, and save)"
80+
type: Workflow
81+
for_each: "{{inputs.sub_workflows}}"
82+
for_each_mode: sequential
83+
depends_on: [prepare_sub_requirements]
84+
condition: "{{inputs.current_depth < inputs.max_depth}}"
85+
inputs:
86+
workflow: workflow-creator
87+
inputs:
88+
# Pass the sub-workflow requirements as prefilled (skips interactive prompts)
89+
prefilled_requirements:
90+
name: "{{each.value.name}}"
91+
description: "{{each.value.purpose}}"
92+
inputs: "{{each.value.inputs | default([])}}"
93+
outputs: "{{each.value.outputs | default([])}}"
94+
steps: ["Generated as sub-workflow"]
95+
tags: ["sub-workflow", "{{inputs.parent_requirements.name | default('parent')}}"]
96+
workspace: "{{inputs.workspace}}/{{each.value.name}}"
97+
recursion_depth: "{{inputs.current_depth + 1}}"
98+
max_recursion_depth: "{{inputs.max_depth}}"
99+
max_validation_attempts: 5
100+
90101
- id: collect_results
91102
description: "Summarize generated sub-workflows"
92103
type: Shell
@@ -101,6 +112,8 @@ blocks:
101112
echo " Depth: {{inputs.current_depth}}/{{inputs.max_depth}}"
102113
echo " Sub-workflows requested: {{inputs.sub_workflows | length}}"
103114
echo " Workspace: {{inputs.workspace}}"
115+
echo ""
116+
echo "Generated sub-workflows saved to ~/.workflows/"
104117
105118
outputs:
106119
sub_workflow_count:
Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
#!/usr/bin/env python3
2+
"""Test nested Workflow blocks in for_each with pause/resume.
3+
4+
This test reproduces a bug where:
5+
1. Parent workflow has for_each: sequential with Workflow blocks
6+
2. Child workflow pauses (Prompt block)
7+
3. User provides feedback/response
8+
4. Response is incorrectly routed to parent instead of child
9+
10+
The bug causes:
11+
- Child workflow never completes its approval flow
12+
- Remaining for_each iterations are skipped
13+
- Sub-workflows are never saved
14+
15+
Test Scenario:
16+
1. Parent workflow with for_each: sequential calling child workflows
17+
2. Child workflows have Prompt blocks that pause for approval
18+
3. Resume first child with "yes"
19+
4. Verify first child completes with approval
20+
5. Second child should pause for its approval
21+
6. Resume second child with "yes"
22+
7. Verify both children completed successfully
23+
"""
24+
25+
import json
26+
from typing import Any
27+
28+
import pytest
29+
from mcp.types import TextContent
30+
from test_mcp_client import get_mcp_client
31+
32+
33+
class TestNestedWorkflowPauseResume:
34+
"""Test pause/resume for nested Workflow blocks in for_each loops."""
35+
36+
@pytest.mark.asyncio
37+
async def test_nested_workflow_in_foreach_sequential_pause_resume(self) -> None:
38+
"""Test that nested workflow pause/resume works correctly in for_each sequential.
39+
40+
This is the core bug reproduction test:
41+
1. Start parent workflow with 2 items
42+
2. First child pauses for approval
43+
3. Resume with "yes" -> first child should complete
44+
4. Second child should pause for approval (not parent!)
45+
5. Resume with "yes" -> second child should complete
46+
6. Parent workflow should complete with all items processed
47+
"""
48+
async with get_mcp_client() as client:
49+
# Step 1: Start parent workflow
50+
exec_result = await client.call_tool(
51+
"execute_workflow",
52+
arguments={
53+
"workflow": "nested-workflow-in-foreach-parent",
54+
"inputs": {
55+
"work_items": [
56+
{"name": "item1", "value": "value1"},
57+
{"name": "item2", "value": "value2"},
58+
]
59+
},
60+
"debug": True,
61+
},
62+
)
63+
64+
exec_content = exec_result.content[0]
65+
assert isinstance(exec_content, TextContent)
66+
exec_response: dict[str, Any] = json.loads(exec_content.text)
67+
68+
# Verify workflow paused at first child
69+
assert exec_response["status"] == "paused", (
70+
f"Expected workflow to pause, got status: {exec_response.get('status')}"
71+
)
72+
assert "job_id" in exec_response, "Expected job_id in paused response"
73+
job_id = exec_response["job_id"]
74+
75+
# The prompt should be from the CHILD workflow (item1)
76+
prompt = exec_response.get("prompt", "")
77+
assert "item1" in prompt, (
78+
f"Expected prompt to mention 'item1' (first child), got: {prompt}"
79+
)
80+
81+
# Step 2: Resume first child with "yes"
82+
resume1_result = await client.call_tool(
83+
"resume_workflow",
84+
arguments={
85+
"job_id": job_id,
86+
"response": "yes",
87+
"debug": True,
88+
},
89+
)
90+
91+
resume1_content = resume1_result.content[0]
92+
assert isinstance(resume1_content, TextContent)
93+
resume1_response: dict[str, Any] = json.loads(resume1_content.text)
94+
95+
# Step 3: Should pause again for SECOND child (item2)
96+
# THIS IS WHERE THE BUG MANIFESTS:
97+
# - Bug behavior: status == "success" (parent completes, skipping item2)
98+
# - Correct behavior: status == "paused" with prompt for item2
99+
assert resume1_response["status"] == "paused", (
100+
f"Expected workflow to pause for item2, got: {resume1_response.get('status')}. "
101+
f"Resume response was incorrectly routed to parent workflow. "
102+
f"Full response: {resume1_response}"
103+
)
104+
105+
# Verify prompt is for second child (item2)
106+
prompt2 = resume1_response.get("prompt", "")
107+
assert "item2" in prompt2, (
108+
f"Expected prompt to mention 'item2' (second child), got: {prompt2}. "
109+
f"This indicates for_each iteration was not correctly resumed."
110+
)
111+
112+
job_id_2 = resume1_response.get("job_id", job_id)
113+
114+
# Step 4: Resume second child with "yes"
115+
resume2_result = await client.call_tool(
116+
"resume_workflow",
117+
arguments={
118+
"job_id": job_id_2,
119+
"response": "yes",
120+
"debug": True,
121+
},
122+
)
123+
124+
resume2_content = resume2_result.content[0]
125+
assert isinstance(resume2_content, TextContent)
126+
resume2_response: dict[str, Any] = json.loads(resume2_content.text)
127+
128+
# Step 5: Now workflow should complete successfully
129+
assert resume2_response["status"] == "success", (
130+
f"Expected workflow to complete after both children approved, "
131+
f"got status: {resume2_response.get('status')}. "
132+
f"Error: {resume2_response.get('error')}"
133+
)
134+
135+
# Verify outputs
136+
outputs = resume2_response.get("outputs", {})
137+
assert outputs.get("setup_completed") is True
138+
assert outputs.get("all_items_processed") is True
139+
assert outputs.get("finalize_completed") is True
140+
141+
@pytest.mark.asyncio
142+
async def test_nested_workflow_single_item_pause_resume(self) -> None:
143+
"""Test simpler case: single item for_each with nested workflow pause.
144+
145+
This isolates the core pause/resume logic without multiple iterations.
146+
"""
147+
async with get_mcp_client() as client:
148+
# Start with single item
149+
exec_result = await client.call_tool(
150+
"execute_workflow",
151+
arguments={
152+
"workflow": "nested-workflow-in-foreach-parent",
153+
"inputs": {
154+
"work_items": [
155+
{"name": "single_item", "value": "single_value"},
156+
]
157+
},
158+
"debug": True,
159+
},
160+
)
161+
162+
exec_content = exec_result.content[0]
163+
assert isinstance(exec_content, TextContent)
164+
exec_response: dict[str, Any] = json.loads(exec_content.text)
165+
166+
# Should pause for the single child
167+
assert exec_response["status"] == "paused"
168+
job_id = exec_response["job_id"]
169+
170+
prompt = exec_response.get("prompt", "")
171+
assert "single_item" in prompt
172+
173+
# Resume with approval
174+
resume_result = await client.call_tool(
175+
"resume_workflow",
176+
arguments={
177+
"job_id": job_id,
178+
"response": "yes",
179+
"debug": True,
180+
},
181+
)
182+
183+
resume_content = resume_result.content[0]
184+
assert isinstance(resume_content, TextContent)
185+
resume_response: dict[str, Any] = json.loads(resume_content.text)
186+
187+
# Should complete (only one item)
188+
assert resume_response["status"] == "success", (
189+
f"Expected success after single item approval, got: {resume_response}"
190+
)
191+
192+
outputs = resume_response.get("outputs", {})
193+
assert outputs.get("all_items_processed") is True
194+
195+
@pytest.mark.asyncio
196+
async def test_nested_workflow_feedback_iteration(self) -> None:
197+
"""Test that feedback (non-approval response) triggers re-design iteration.
198+
199+
This simulates the workflow-creator flow where:
200+
1. First response is feedback ("add more details")
201+
2. Second response is approval ("approve")
202+
"""
203+
async with get_mcp_client() as client:
204+
# Start workflow
205+
exec_result = await client.call_tool(
206+
"execute_workflow",
207+
arguments={
208+
"workflow": "nested-workflow-in-foreach-parent",
209+
"inputs": {
210+
"work_items": [
211+
{"name": "test_item", "value": "test_value"},
212+
]
213+
},
214+
"debug": True,
215+
},
216+
)
217+
218+
exec_content = exec_result.content[0]
219+
assert isinstance(exec_content, TextContent)
220+
exec_response: dict[str, Any] = json.loads(exec_content.text)
221+
222+
assert exec_response["status"] == "paused"
223+
job_id = exec_response["job_id"]
224+
225+
# Send denial (should complete with denied branch)
226+
resume_result = await client.call_tool(
227+
"resume_workflow",
228+
arguments={
229+
"job_id": job_id,
230+
"response": "no",
231+
"debug": True,
232+
},
233+
)
234+
235+
resume_content = resume_result.content[0]
236+
assert isinstance(resume_content, TextContent)
237+
resume_response: dict[str, Any] = json.loads(resume_content.text)
238+
239+
# Should complete (denial is a valid response that completes the workflow)
240+
assert resume_response["status"] == "success", (
241+
f"Expected success after denial, got: {resume_response}"
242+
)
243+
244+
245+
if __name__ == "__main__":
246+
pytest.main([__file__, "-v"])

0 commit comments

Comments
 (0)