Skip to content

Commit 1fe7bc7

Browse files
committed
feat(engine): emit on_block_transition events for for_each blocks
for_each blocks were invisible in the run timeline because _execute_for_each_block() did not emit any observer events. Changes: - workflow_runner.py: Wrap _execute_for_each_block() with block_started/completed/failed events matching _execute_block() pattern - orchestrator.py: Add on_iteration_transition callback to execute_for_each() for per-iteration observer events - New test: test_for_each_observer.py with 6 tests covering parent events, iteration events, failed iterations, metadata, and ordering
1 parent 0082e3e commit 1fe7bc7

3 files changed

Lines changed: 433 additions & 62 deletions

File tree

src/workflows_mcp/engine/orchestrator.py

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import asyncio
1818
import traceback
19+
from collections.abc import Awaitable, Callable
1920
from datetime import UTC, datetime
2021
from typing import TYPE_CHECKING, Any, Literal
2122

@@ -723,6 +724,7 @@ async def execute_for_each(
723724
wave: int = 0,
724725
depth: int = 0,
725726
condition: str | None = None,
727+
on_iteration_transition: (Callable[[dict[str, Any]], Awaitable[None]] | None) = None,
726728
) -> tuple[dict[str, BlockExecution], Metadata]:
727729
"""
728730
Execute a for_each block with multiple iterations (ADR-009).
@@ -740,6 +742,9 @@ async def execute_for_each(
740742
depth: Nesting depth (0 for root blocks, 1+ for nested iterations)
741743
condition: Optional condition expression evaluated per-iteration
742744
with access to {{each.*}} variables
745+
on_iteration_transition: Optional callback for per-iteration
746+
observer events (block_started/completed/failed/skipped).
747+
Uses same event shape as WorkflowRunner.on_block_transition.
743748
744749
Returns:
745750
Tuple of (iteration_results_dict, parent_metadata)
@@ -806,11 +811,34 @@ async def execute_iteration(
806811
depth=depth + 1,
807812
message=f"Condition '{condition}' evaluated to False",
808813
)
809-
return iteration_key, BlockExecution(
814+
iter_result = BlockExecution(
810815
output=None,
811816
metadata=skipped_metadata,
812817
paused=False,
813818
)
819+
if on_iteration_transition:
820+
await on_iteration_transition(
821+
{
822+
"event": "block_skipped",
823+
"block_id": iteration_key,
824+
"block_type": executor.type_name,
825+
"depth": depth + 1,
826+
"reason": f"Condition '{condition}' evaluated to False",
827+
"metadata": skipped_metadata.model_dump(),
828+
}
829+
)
830+
return iteration_key, iter_result
831+
832+
# Notify observer: iteration starting
833+
if on_iteration_transition:
834+
await on_iteration_transition(
835+
{
836+
"event": "block_started",
837+
"block_id": iteration_key,
838+
"block_type": executor.type_name,
839+
"depth": depth + 1,
840+
}
841+
)
814842

815843
# Resolve iteration inputs (replace {{each.*}} variables)
816844
resolved_inputs = await resolver.resolve_async(inputs_template)
@@ -829,6 +857,30 @@ async def execute_iteration(
829857
depth=depth + 1,
830858
)
831859

860+
# Notify observer: iteration completed/failed
861+
if on_iteration_transition:
862+
if result.metadata.failed:
863+
await on_iteration_transition(
864+
{
865+
"event": "block_failed",
866+
"block_id": iteration_key,
867+
"block_type": executor.type_name,
868+
"depth": depth + 1,
869+
"error": result.metadata.message or "",
870+
"metadata": result.metadata.model_dump(),
871+
}
872+
)
873+
else:
874+
await on_iteration_transition(
875+
{
876+
"event": "block_completed",
877+
"block_id": iteration_key,
878+
"block_type": executor.type_name,
879+
"depth": depth + 1,
880+
"metadata": result.metadata.model_dump(),
881+
}
882+
)
883+
832884
return iteration_key, result
833885

834886
# Parallel mode: execute with concurrency control

src/workflows_mcp/engine/workflow_runner.py

Lines changed: 110 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -880,6 +880,10 @@ async def _execute_for_each_block(
880880
"""
881881
Execute a for_each block with multiple iterations (ADR-009).
882882
883+
Emits on_block_transition events for the parent block lifecycle
884+
(block_started/completed/failed) and threads the callback to the
885+
orchestrator for per-iteration visibility.
886+
883887
Args:
884888
block_id: Block ID
885889
block_def: Block definition with for_each field
@@ -891,76 +895,121 @@ async def _execute_for_each_block(
891895
ExecutionPaused: If any iteration pauses (not yet supported for for_each)
892896
Exception: On execution errors
893897
"""
894-
# 1. Resolve for_each expression to get iterations
895-
context_dict = self._execution_to_dict(exec_context)
898+
# Notify observer: for_each block starting
899+
if self.on_block_transition:
900+
event: dict[str, Any] = {
901+
"event": "block_started",
902+
"block_id": block_id,
903+
"block_type": block_def.type,
904+
"depth": exec_context.depth,
905+
}
906+
if block_def.description:
907+
event["description"] = block_def.description
908+
await self.on_block_transition(event)
896909

897-
# Get workflow name safely from internal metadata
898-
workflow_metadata = exec_context.workflow_metadata
899-
workflow_name = workflow_metadata.get("workflow_name", "")
910+
try:
911+
# 1. Resolve for_each expression to get iterations
912+
context_dict = self._execution_to_dict(exec_context)
900913

901-
resolver = UnifiedVariableResolver(
902-
context_dict,
903-
secret_provider=self.secret_provider,
904-
audit_log=self.secret_audit_log,
905-
workflow_name=workflow_name,
906-
block_id=block_id,
907-
)
908-
for_each_value = await resolver.resolve_async(block_def.for_each)
909-
910-
# 2. Convert to dict format (ADR-009: iterations are always dicts)
911-
if isinstance(for_each_value, list):
912-
# Convert list to dict with numeric string keys: ["a", "b"] → {"0": "a", "1": "b"}
913-
iterations = {str(i): value for i, value in enumerate(for_each_value)}
914-
elif isinstance(for_each_value, dict):
915-
# Already a dict, use as-is
916-
iterations = for_each_value
917-
else:
918-
raise ValueError(
919-
f"for_each expression must evaluate to dict or list, "
920-
f"got {type(for_each_value).__name__}: {block_def.for_each}"
921-
)
914+
# Get workflow name safely from internal metadata
915+
workflow_metadata = exec_context.workflow_metadata
916+
workflow_name = workflow_metadata.get("workflow_name", "")
922917

923-
# 3. Handle empty collection - mark block as skipped
924-
if not iterations:
925-
# Empty for_each is valid - mark block as skipped (like conditional execution)
926-
await self._mark_block_skipped(
918+
resolver = UnifiedVariableResolver(
919+
context_dict,
920+
secret_provider=self.secret_provider,
921+
audit_log=self.secret_audit_log,
922+
workflow_name=workflow_name,
927923
block_id=block_id,
928-
block_def=block_def,
929-
exec_context=exec_context,
930-
wave_idx=wave_idx,
931-
execution_order=0,
932-
reason=f"for_each expression resulted in empty collection: {block_def.for_each}",
933924
)
934-
return
925+
for_each_value = await resolver.resolve_async(block_def.for_each)
926+
927+
# 2. Convert to dict format (ADR-009: iterations are always dicts)
928+
if isinstance(for_each_value, list):
929+
# Convert list to dict with numeric string keys: ["a", "b"] → {"0": "a", "1": "b"}
930+
iterations = {str(i): value for i, value in enumerate(for_each_value)}
931+
elif isinstance(for_each_value, dict):
932+
# Already a dict, use as-is
933+
iterations = for_each_value
934+
else:
935+
raise ValueError(
936+
f"for_each expression must evaluate to dict or list, "
937+
f"got {type(for_each_value).__name__}: {block_def.for_each}"
938+
)
935939

936-
# 4. Execute via orchestrator.execute_for_each()
937-
# Cast mode to Literal type for type safety
938-
mode = cast(Literal["parallel", "sequential"], block_def.for_each_mode)
940+
# 3. Handle empty collection - mark block as skipped
941+
if not iterations:
942+
# Empty for_each is valid - mark block as skipped (like conditional execution)
943+
await self._mark_block_skipped(
944+
block_id=block_id,
945+
block_def=block_def,
946+
exec_context=exec_context,
947+
wave_idx=wave_idx,
948+
execution_order=0,
949+
reason=(
950+
f"for_each expression resulted in empty collection: {block_def.for_each}"
951+
),
952+
)
953+
return
939954

940-
iteration_results, parent_meta = await self.orchestrator.execute_for_each(
941-
id=block_id,
942-
executor=executor,
943-
inputs_template=block_def.inputs,
944-
iterations=iterations,
945-
context=exec_context,
946-
mode=mode,
947-
max_parallel=block_def.max_parallel,
948-
continue_on_error=block_def.continue_on_error,
949-
wave=wave_idx,
950-
depth=exec_context.depth,
951-
condition=block_def.condition, # Pass condition for per-iteration evaluation
952-
)
955+
# 4. Execute via orchestrator.execute_for_each()
956+
# Cast mode to Literal type for type safety
957+
mode = cast(Literal["parallel", "sequential"], block_def.for_each_mode)
953958

954-
# 5. Store results in execution context using fractal structure
955-
exec_context.set_for_each_result(
956-
block_id=block_id,
957-
parent_meta=parent_meta,
958-
iteration_results=iteration_results,
959-
)
959+
iteration_results, parent_meta = await self.orchestrator.execute_for_each(
960+
id=block_id,
961+
executor=executor,
962+
inputs_template=block_def.inputs,
963+
iterations=iterations,
964+
context=exec_context,
965+
mode=mode,
966+
max_parallel=block_def.max_parallel,
967+
continue_on_error=block_def.continue_on_error,
968+
wave=wave_idx,
969+
depth=exec_context.depth,
970+
condition=block_def.condition,
971+
on_iteration_transition=self.on_block_transition,
972+
)
973+
974+
# 5. Store results in execution context using fractal structure
975+
exec_context.set_for_each_result(
976+
block_id=block_id,
977+
parent_meta=parent_meta,
978+
iteration_results=iteration_results,
979+
)
980+
981+
# Notify observer: for_each block completed
982+
if self.on_block_transition:
983+
event = {
984+
"event": "block_completed",
985+
"block_id": block_id,
986+
"block_type": block_def.type,
987+
"depth": exec_context.depth,
988+
"metadata": parent_meta.model_dump(),
989+
"outputs": {},
990+
}
991+
if block_def.description:
992+
event["description"] = block_def.description
993+
await self.on_block_transition(event)
960994

961-
# Note: Pause handling for for_each blocks is not yet implemented.
962-
# If any iteration pauses, the entire for_each block would need to pause,
963-
# storing iteration state in checkpoint. This is Phase 2+ enhancement.
995+
except ExecutionPaused:
996+
# Pause bubbles up naturally — no block_failed event needed
997+
raise
998+
999+
except Exception as e:
1000+
# Notify observer: for_each block failed
1001+
if self.on_block_transition:
1002+
event = {
1003+
"event": "block_failed",
1004+
"block_id": block_id,
1005+
"block_type": block_def.type,
1006+
"depth": exec_context.depth,
1007+
"error": str(e),
1008+
}
1009+
if block_def.description:
1010+
event["description"] = block_def.description
1011+
await self.on_block_transition(event)
1012+
raise
9641013

9651014
def _should_skip_block(
9661015
self,

0 commit comments

Comments
 (0)