Skip to content

Commit be0d4ca

Browse files
committed
feat(workflows_mcp): add three-level nested workflow handling with pause/resume support
Introduced a new feature allowing the orchestration of three-level nested workflows, including pause and resume functionality. This change addresses a bug where the parent workflow's remaining blocks would not execute after a child workflow resumes from a pause state. Additional changes include: - Updated orchestrator logic to handle nested workflows with proper checkpointing and iteration result serialization. - Enhanced YAML templates for workflow composition, allowing sub-workflows to be saved in hierarchical directories. - Added new test cases to ensure the correct execution of three-level nested workflows, particularly focusing on scenarios involving pauses and resumes.
1 parent e1d4e1a commit be0d4ca

9 files changed

Lines changed: 592 additions & 2 deletions

File tree

src/workflows_mcp/engine/orchestrator.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -501,6 +501,55 @@ async def resume_for_each(
501501
)
502502
iteration_results[current_key] = resumed_result
503503

504+
# Check if resumed iteration is STILL paused (nested workflow paused again)
505+
# This handles the case where a nested workflow (e.g., parent with for_each)
506+
# resumes one child but then pauses on the next child in its own for_each.
507+
if resumed_result.paused:
508+
# Validate pause_prompt is present
509+
if not resumed_result.pause_prompt:
510+
raise RuntimeError(f"Block {current_key} marked as paused but pause_prompt is None")
511+
512+
# Create checkpoint for the still-paused iteration
513+
# No iteration completed beyond what was already completed
514+
all_completed_keys = list(completed_keys) # Only previously completed
515+
516+
# Serialize completed iteration results for checkpoint (ADR-010)
517+
completed_iteration_results = {}
518+
for comp_key in all_completed_keys:
519+
if comp_key in iteration_results:
520+
exec_result = iteration_results[comp_key]
521+
completed_iteration_results[comp_key] = {
522+
"inputs": exec_result.inputs if hasattr(exec_result, "inputs") else {},
523+
"output": (exec_result.output.model_dump() if exec_result.output else None),
524+
"metadata": exec_result.metadata.model_dump(),
525+
"paused": exec_result.paused,
526+
}
527+
528+
for_each_checkpoint = {
529+
"type": "for_each_iteration",
530+
"for_each_block_id": block_id,
531+
"current_iteration_key": current_key,
532+
"current_iteration_index": current_idx,
533+
"completed_iterations": all_completed_keys,
534+
"completed_iteration_results": completed_iteration_results,
535+
"remaining_iteration_keys": remaining_keys, # Still need to process these
536+
"all_iterations": all_iterations,
537+
"executor_type": executor_type,
538+
"inputs_template": inputs_template,
539+
"mode": mode,
540+
"max_parallel": max_parallel,
541+
"continue_on_error": continue_on_error,
542+
"iteration_count": iteration_count,
543+
"wave": wave,
544+
"depth": depth,
545+
"paused_iteration_checkpoint": resumed_result.pause_checkpoint_data,
546+
}
547+
raise ExecutionPaused(
548+
prompt=resumed_result.pause_prompt,
549+
checkpoint_data=for_each_checkpoint,
550+
execution=context,
551+
)
552+
504553
# Execute remaining iterations (sequential execution)
505554
for idx_offset, key in enumerate(remaining_keys):
506555
idx = current_idx + 1 + idx_offset

src/workflows_mcp/templates/agents/workflow-creator/layer1/summary.yaml

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,20 @@ inputs:
2424
description: "(Optional) Number of validation attempts made"
2525
default: 1
2626

27+
output_dir:
28+
type: str
29+
description: "Output directory for workflow files"
30+
default: "~/.workflows"
31+
required: false
32+
2733
blocks:
2834
- id: save_final_workflow
2935
type: CreateFile
30-
description: "Save final workflow to desired location"
36+
description: "Save final workflow to hierarchical location"
3137
inputs:
3238
create_parents: true
3339
overwrite: true
34-
path: "~/.workflows/{{inputs.workflow_name}}.yaml"
40+
path: "{{inputs.output_dir}}/{{inputs.workflow_name}}/main.yaml"
3541
content: "{{ read_file(inputs.workflow_path) }}"
3642

3743
- id: generate_summary

src/workflows_mcp/templates/agents/workflow-creator/layer2/decompose.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,12 @@ inputs:
3939
description: "Current recursion depth"
4040
default: 0
4141

42+
output_dir:
43+
type: str
44+
description: "Output directory for sub-workflow files"
45+
default: "~/.workflows"
46+
required: false
47+
4248
blocks:
4349
- id: check_depth
4450
description: "Guard against excessive nesting"
@@ -121,6 +127,7 @@ blocks:
121127
recursion_depth: "{{inputs.current_depth + 1}}"
122128
max_recursion_depth: "{{inputs.max_depth}}"
123129
max_validation_attempts: 5
130+
output_dir: "{{inputs.output_dir}}"
124131

125132
- id: collect_results
126133
description: "Summarize generated sub-workflows"

src/workflows_mcp/templates/agents/workflow-creator/layer2/workflow.yaml

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,18 @@ inputs:
4343
default: 1
4444
required: false
4545

46+
sub_workflows:
47+
type: list
48+
description: "(Optional) List of sub-workflows to compose (from analyze_complexity)"
49+
default: []
50+
required: false
51+
52+
has_sub_workflows:
53+
type: bool
54+
description: "(Optional) Whether sub-workflows were generated"
55+
default: false
56+
required: false
57+
4658
blocks:
4759
- id: system_prompt
4860
description: "Log the current iteration"
@@ -90,6 +102,23 @@ blocks:
90102
```
91103
Without profile or provider, the block WILL FAIL at runtime.
92104
105+
# Workflow Composition (using sub-workflows)
106+
When sub-workflows are available, you MUST compose them using Workflow blocks:
107+
{% raw %}
108+
```yaml
109+
- id: run_sub_workflow
110+
type: Workflow
111+
inputs:
112+
workflow: "sub-workflow-name" # Exact name from available sub-workflows
113+
inputs:
114+
param1: "{{inputs.some_param}}"
115+
param2: "{{blocks.previous_block.outputs.result}}"
116+
```
117+
{% endraw %}
118+
- Use Workflow blocks to call pre-generated sub-workflows
119+
- Pass appropriate inputs from parent workflow inputs or block outputs
120+
- Use sub-workflow outputs via {% raw %}{{blocks.run_sub_workflow.outputs.field}}{% endraw %}
121+
93122
# Output Format
94123
JSON: {workflow: {name, description, inputs: [...], blocks: [...], outputs: [...]}, explanation, clarifications}
95124
Note: In this representation, inputs, blocks, and outputs are arrays (lists), not objects.
@@ -117,6 +146,40 @@ blocks:
117146
{{read_file(inputs.user_requests) | fromjson | prettyjson}}
118147
```
119148
149+
{%- if inputs.has_sub_workflows and inputs.sub_workflows | length > 0 %}
150+
## IMPORTANT: Available Sub-Workflows for Composition
151+
The following sub-workflows have been generated. You MUST compose them using Workflow blocks:
152+
153+
{% for sw in inputs.sub_workflows %}
154+
### `{{ sw.name }}`
155+
**Purpose**: {{ sw.purpose }}
156+
{% if sw.inputs and sw.inputs | length > 0 %}
157+
**Inputs**:
158+
{% for inp in sw.inputs %}
159+
- `{{ inp.name }}` ({{ inp.type }}): {{ inp.description }}
160+
{% endfor %}
161+
{% endif %}
162+
{% if sw.outputs and sw.outputs | length > 0 %}
163+
**Outputs**:
164+
{% for out in sw.outputs %}
165+
- `{{ out.name }}` ({{ out.type }}): {{ out.description }}
166+
{% endfor %}
167+
{% endif %}
168+
169+
{% endfor %}
170+
**Usage Example**:
171+
{% raw %}
172+
```yaml
173+
- id: call_sub_workflow
174+
type: Workflow
175+
inputs:
176+
workflow: "sub-workflow-name"
177+
inputs:
178+
param: "{{inputs.some_param}}"
179+
```
180+
{% endraw %}
181+
{% endif %}
182+
120183
{%- if inputs.index == 1 %}
121184
# Design New Workflow
122185
Create a workflow based on the user's requirements:
@@ -126,6 +189,9 @@ blocks:
126189
1. **Inputs**: All workflow input parameters
127190
2. **Blocks**: Compute/Processing steps (following DAG rules - no cycles)
128191
3. **Outputs**: Workflow outputs
192+
{% if inputs.has_sub_workflows %}
193+
4. **CRITICAL**: Use Workflow blocks to compose the available sub-workflows listed above
194+
{% endif %}
129195
130196
Generate the initial workflow design now.
131197
{% else %}
@@ -652,6 +718,8 @@ blocks:
652718
feedback: "{{blocks.request_approval.outputs.response}}"
653719
workspace: "{{inputs.workspace}}"
654720
index: "{{inputs.index + 1}}"
721+
sub_workflows: "{{inputs.sub_workflows}}"
722+
has_sub_workflows: "{{inputs.has_sub_workflows}}"
655723

656724
outputs:
657725
diagram_path:

src/workflows_mcp/templates/agents/workflow-creator/workflow-creator.yaml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,12 @@ inputs:
4040
description: "Maximum depth for recursive sub-workflow generation"
4141
default: 2
4242

43+
output_dir:
44+
type: str
45+
description: "Output directory for workflow files. Workflows save to <output_dir>/<name>/main.yaml"
46+
default: "~/.workflows"
47+
required: false
48+
4349
blocks:
4450
- id: bootstrap_context
4551
type: Workflow
@@ -89,6 +95,7 @@ blocks:
8995
workspace: "{{inputs.workspace}}/sub-workflows"
9096
current_depth: "{{inputs.recursion_depth}}"
9197
max_depth: "{{inputs.max_recursion_depth}}"
98+
output_dir: "{{inputs.output_dir}}/{{blocks.generate_requirements.outputs.response.name}}"
9299

93100
- id: generate_workflow
94101
description: "Generate main workflow (after complexity analysis and optional sub-workflow generation)"
@@ -105,6 +112,9 @@ blocks:
105112
executor_docs_path: "{{blocks.bootstrap_context.outputs.executor_docs_path}}"
106113
examples: "{{blocks.bootstrap_context.outputs.examples}}"
107114
workspace: "{{inputs.workspace}}/generate-workflow"
115+
# Pass sub-workflow info for composition
116+
sub_workflows: "{{blocks.analyze_complexity.outputs.sub_workflows | default([])}}"
117+
has_sub_workflows: "{{blocks.generate_sub_workflows.succeeded | default(false)}}"
108118

109119
- id: transform_design
110120
description: "Transform design JSON to valid workflow YAML format"
@@ -142,6 +152,7 @@ blocks:
142152
workspace: "{{inputs.workspace}}"
143153
workflows_mcp_path: "{{blocks.bootstrap_context.outputs.workspace}}"
144154
iterations: "{{blocks.recursive_validation.outputs.iterations}}"
155+
output_dir: "{{inputs.output_dir}}"
145156

146157
outputs:
147158
workflow_name:

0 commit comments

Comments
 (0)