Skip to content

Commit 2014ebf

Browse files
committed
fix(engine): preserve for_each pause metadata and resume iterations
Ensure for_each pause metadata is serialized/deserialized and implement resume behavior for paused for_each iterations. Key changes: - Persist pause metadata: include ExecutionState.pause_metadata in execution serialization so for_each checkpoints survive save/load. - Resume flow: add BlockOrchestrator.resume_for_each(...) to resume a paused iteration, evaluate the user's response, run the appropriate executor for the resumed iteration, and stitch results back into the parent iteration state. WorkflowRunner now detects for_each_iteration checkpoints and delegates to the orchestrator. - Jinja/runtime helpers: add BlockProxy.__len__ to support len() and Jinja length filters against block/for_each data. - LLM schema handling: add LLMCallExecutor._prepare_schema_for_openai to prune/strictify JSON schemas for OpenAI response_format compatibility. - Tests and fixtures: add unit tests and integration tests (including YAML workflows and snapshots) covering pause/resume, serialization, and Jinja syntax cases. This resolves a bug where pause metadata could be lost across execution state serialization and enables correct resume semantics for sequential for_each pauses.
1 parent 1996cfc commit 2014ebf

11 files changed

Lines changed: 1264 additions & 93 deletions

src/workflows_mcp/engine/execution_result.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ def _execution_state_to_dict(self) -> dict[str, Any] | None:
282282
"paused_block_id": self.execution_state.paused_block_id,
283283
"workflow_name": self.execution_state.workflow_name,
284284
"runtime_inputs": self.execution_state.runtime_inputs,
285+
"pause_metadata": self.execution_state.pause_metadata,
285286
}
286287

287288
def _build_debug_data(self) -> dict[str, Any]:

src/workflows_mcp/engine/executors_llm.py

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -681,6 +681,59 @@ async def _call_provider(
681681
# This case should be unreachable due to the Enum validation
682682
raise ValueError(f"Unsupported provider: {provider}")
683683

684+
# Whitelist of supported JSON schema keywords for OpenAI's format
685+
SUPPORTED_KEYWORDS: ClassVar[set[str]] = {
686+
"type",
687+
"properties",
688+
"items",
689+
"required",
690+
"enum",
691+
"description",
692+
"additionalProperties",
693+
}
694+
695+
def _prepare_schema_for_openai(self, schema: Any) -> Any:
696+
"""Recursively simplifies and strictifies a JSON schema for OpenAI.
697+
698+
This function processes a JSON schema to make it compliant with
699+
OpenAI's strict requirements for 'response_format'. It does this by:
700+
1. Stripping unsupported keywords to reduce complexity.
701+
2. Enforcing 'additionalProperties: false' on objects that are not
702+
explicitly defined as maps (i.e., don't have a schema in
703+
'additionalProperties' already).
704+
"""
705+
if not isinstance(schema, dict):
706+
return schema
707+
708+
# Prune unsupported keywords
709+
simplified_schema = {
710+
key: value for key, value in schema.items() if key in self.SUPPORTED_KEYWORDS
711+
}
712+
713+
# Enforce strictness for objects
714+
if simplified_schema.get("type") == "object":
715+
# If 'properties' are defined, recurse into them
716+
if "properties" in simplified_schema:
717+
simplified_schema["properties"] = {
718+
k: self._prepare_schema_for_openai(v)
719+
for k, v in simplified_schema["properties"].items()
720+
}
721+
722+
# If 'additionalProperties' is a schema, recurse into it
723+
if isinstance(simplified_schema.get("additionalProperties"), dict):
724+
simplified_schema["additionalProperties"] = self._prepare_schema_for_openai(
725+
simplified_schema["additionalProperties"]
726+
)
727+
# If it's an object that is not explicitly a map, forbid extra properties
728+
elif "additionalProperties" not in simplified_schema:
729+
simplified_schema["additionalProperties"] = False
730+
731+
# Recurse into array items
732+
elif simplified_schema.get("type") == "array" and "items" in simplified_schema:
733+
simplified_schema["items"] = self._prepare_schema_for_openai(simplified_schema["items"])
734+
735+
return simplified_schema
736+
684737
async def _call_openai(
685738
self,
686739
inputs: LLMCallInput,
@@ -750,16 +803,15 @@ async def _call_openai(
750803

751804
# Native schema validation (OpenAI Structured Outputs)
752805
if inputs.response_schema:
753-
schema = cast(dict[str, Any], inputs.response_schema).copy()
754-
if "additionalProperties" not in schema:
755-
schema["additionalProperties"] = False
806+
# Prepare the schema to be OpenAI-compliant
807+
prepared_schema = self._prepare_schema_for_openai(inputs.response_schema)
756808

757809
completion_kwargs["response_format"] = {
758810
"type": "json_schema",
759811
"json_schema": {
760812
"name": "response_schema",
761813
"strict": True,
762-
"schema": schema,
814+
"schema": prepared_schema,
763815
},
764816
}
765817

0 commit comments

Comments
 (0)