Skip to content

Commit 7f462dc

Browse files
committed
Bugfixes and e2e tests
1 parent ffa3e28 commit 7f462dc

17 files changed

Lines changed: 1614 additions & 25 deletions

File tree

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# Chained invoke examples
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
"""Example demonstrating basic chained invoke."""
2+
3+
from typing import Any
4+
5+
from aws_durable_execution_sdk_python.context import DurableContext
6+
from aws_durable_execution_sdk_python.execution import durable_execution
7+
8+
9+
@durable_execution
10+
def handler(_event: Any, context: DurableContext) -> dict:
11+
"""Parent function that invokes a child function."""
12+
result = context.invoke(
13+
function_name="calculator",
14+
payload={"a": 10, "b": 5},
15+
name="invoke_calculator",
16+
)
17+
return {"calculation_result": result}
18+
19+
20+
def calculator_handler(event: dict, context: Any) -> dict:
21+
"""Child handler that performs calculation."""
22+
a = event.get("a", 0)
23+
b = event.get("b", 0)
24+
return {
25+
"sum": a + b,
26+
"product": a * b,
27+
"difference": a - b,
28+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
"""Example demonstrating map operations that invoke child functions."""
2+
3+
from typing import Any
4+
5+
from aws_durable_execution_sdk_python.config import MapConfig
6+
from aws_durable_execution_sdk_python.context import DurableContext
7+
from aws_durable_execution_sdk_python.execution import durable_execution
8+
9+
10+
@durable_execution
11+
def handler(_event: Any, context: DurableContext) -> list[int]:
12+
"""Process items using map where each item invokes a child function."""
13+
items = [1, 2, 3, 4, 5]
14+
15+
return context.map(
16+
inputs=items,
17+
func=lambda ctx, item, index, _: ctx.invoke(
18+
function_name="doubler",
19+
payload={"value": item},
20+
name=f"invoke_item_{index}",
21+
),
22+
name="map_with_invoke",
23+
config=MapConfig(max_concurrency=2),
24+
).get_results()
25+
26+
27+
def doubler_handler(event: dict, context: Any) -> dict:
28+
"""Child handler that doubles the input value."""
29+
value = event.get("value", 0)
30+
return {"result": value * 2}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
"""Example demonstrating nested chained invokes (invoke calling invoke)."""
2+
3+
from typing import Any
4+
5+
from aws_durable_execution_sdk_python.context import DurableContext
6+
from aws_durable_execution_sdk_python.execution import durable_execution
7+
8+
9+
@durable_execution
10+
def handler(_event: Any, context: DurableContext) -> dict:
11+
"""Parent function that invokes a child which invokes another child."""
12+
result = context.invoke(
13+
function_name="orchestrator",
14+
payload={"value": 5},
15+
name="invoke_orchestrator",
16+
)
17+
return {"final_result": result}
18+
19+
20+
@durable_execution
21+
def orchestrator_handler(event: dict, context: DurableContext) -> dict:
22+
"""Middle function that invokes the worker."""
23+
value = event.get("value", 0)
24+
25+
# First invoke: add 10
26+
added = context.invoke(
27+
function_name="adder",
28+
payload={"value": value, "add": 10},
29+
name="invoke_adder",
30+
)
31+
32+
# Second invoke: multiply by 2
33+
multiplied = context.invoke(
34+
function_name="multiplier",
35+
payload={"value": added["result"]},
36+
name="invoke_multiplier",
37+
)
38+
39+
return {"result": multiplied["result"], "steps": ["add_10", "multiply_2"]}
40+
41+
42+
def adder_handler(event: dict, context: Any) -> dict:
43+
"""Leaf handler that adds values."""
44+
value = event.get("value", 0)
45+
add = event.get("add", 0)
46+
return {"result": value + add}
47+
48+
49+
def multiplier_handler(event: dict, context: Any) -> dict:
50+
"""Leaf handler that multiplies by 2."""
51+
value = event.get("value", 0)
52+
return {"result": value * 2}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
"""Example demonstrating parallel operations that invoke child functions."""
2+
3+
from typing import Any
4+
5+
from aws_durable_execution_sdk_python.config import ParallelConfig
6+
from aws_durable_execution_sdk_python.context import DurableContext
7+
from aws_durable_execution_sdk_python.execution import durable_execution
8+
9+
10+
@durable_execution
11+
def handler(_event: Any, context: DurableContext) -> list[str]:
12+
"""Execute parallel branches where each invokes a different child function."""
13+
return context.parallel(
14+
functions=[
15+
lambda ctx: ctx.invoke(
16+
function_name="greeter",
17+
payload={"name": "Alice"},
18+
name="greet_alice",
19+
),
20+
lambda ctx: ctx.invoke(
21+
function_name="greeter",
22+
payload={"name": "Bob"},
23+
name="greet_bob",
24+
),
25+
lambda ctx: ctx.invoke(
26+
function_name="greeter",
27+
payload={"name": "Charlie"},
28+
name="greet_charlie",
29+
),
30+
],
31+
name="parallel_with_invoke",
32+
config=ParallelConfig(max_concurrency=3),
33+
).get_results()
34+
35+
36+
def greeter_handler(event: dict, context: Any) -> dict:
37+
"""Child handler that creates a greeting."""
38+
name = event.get("name", "World")
39+
return {"greeting": f"Hello, {name}!"}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# Chained invoke tests
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
"""Tests for basic chained invoke example."""
2+
3+
import pytest
4+
from aws_durable_execution_sdk_python.execution import InvocationStatus
5+
from aws_durable_execution_sdk_python.lambda_service import OperationStatus
6+
7+
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner
8+
from src.chained_invoke import invoke_basic
9+
from test.conftest import deserialize_operation_payload
10+
11+
12+
def test_invoke_basic():
13+
"""Test basic chained invoke example."""
14+
with DurableFunctionTestRunner(handler=invoke_basic.handler) as runner:
15+
runner.register_handler("calculator", invoke_basic.calculator_handler)
16+
result = runner.run(input="test", timeout=10)
17+
18+
assert result.status is InvocationStatus.SUCCEEDED
19+
parsed = deserialize_operation_payload(result.result)
20+
assert parsed["calculation_result"]["sum"] == 15
21+
assert parsed["calculation_result"]["product"] == 50
22+
assert parsed["calculation_result"]["difference"] == 5
23+
24+
# Verify the invoke operation
25+
invoke_op = result.get_invoke("invoke_calculator")
26+
assert invoke_op is not None
27+
assert invoke_op.status is OperationStatus.SUCCEEDED
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
"""Tests for map with chained invoke example."""
2+
3+
import pytest
4+
from aws_durable_execution_sdk_python.execution import InvocationStatus
5+
from aws_durable_execution_sdk_python.lambda_service import OperationStatus
6+
7+
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner
8+
from src.chained_invoke import map_with_invoke
9+
from test.conftest import deserialize_operation_payload
10+
11+
12+
def test_map_with_invoke():
13+
"""Test map operation where each item invokes a child function."""
14+
with DurableFunctionTestRunner(handler=map_with_invoke.handler) as runner:
15+
runner.register_handler("doubler", map_with_invoke.doubler_handler)
16+
result = runner.run(input="test", timeout=10)
17+
18+
assert result.status is InvocationStatus.SUCCEEDED
19+
20+
# Each item [1,2,3,4,5] is doubled, returning {"result": value*2}
21+
parsed = deserialize_operation_payload(result.result)
22+
expected = [{"result": 2}, {"result": 4}, {"result": 6}, {"result": 8}, {"result": 10}]
23+
assert parsed == expected
24+
25+
# Verify the map operation
26+
map_op = result.get_context("map_with_invoke")
27+
assert map_op is not None
28+
assert map_op.status is OperationStatus.SUCCEEDED
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
"""Tests for nested chained invoke example."""
2+
3+
import pytest
4+
from aws_durable_execution_sdk_python.execution import InvocationStatus
5+
from aws_durable_execution_sdk_python.lambda_service import OperationStatus
6+
7+
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner
8+
from src.chained_invoke import nested_invoke
9+
from test.conftest import deserialize_operation_payload
10+
11+
12+
def test_nested_invoke():
13+
"""Test nested chained invokes (invoke calling invoke).
14+
15+
Flow: handler -> orchestrator -> adder -> multiplier
16+
Value: 5 -> add 10 = 15 -> multiply 2 = 30
17+
"""
18+
with DurableFunctionTestRunner(handler=nested_invoke.handler) as runner:
19+
# Register the orchestrator (which is also a durable function)
20+
runner.register_handler("orchestrator", nested_invoke.orchestrator_handler)
21+
# Register the leaf handlers
22+
runner.register_handler("adder", nested_invoke.adder_handler)
23+
runner.register_handler("multiplier", nested_invoke.multiplier_handler)
24+
25+
result = runner.run(input="test", timeout=10)
26+
27+
assert result.status is InvocationStatus.SUCCEEDED
28+
29+
parsed = deserialize_operation_payload(result.result)
30+
# 5 + 10 = 15, 15 * 2 = 30
31+
assert parsed["final_result"]["result"] == 30
32+
assert parsed["final_result"]["steps"] == ["add_10", "multiply_2"]
33+
34+
# Verify the top-level invoke operation
35+
invoke_op = result.get_invoke("invoke_orchestrator")
36+
assert invoke_op is not None
37+
assert invoke_op.status is OperationStatus.SUCCEEDED
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
"""Tests for parallel with chained invoke example."""
2+
3+
import pytest
4+
from aws_durable_execution_sdk_python.execution import InvocationStatus
5+
from aws_durable_execution_sdk_python.lambda_service import OperationStatus
6+
7+
from aws_durable_execution_sdk_python_testing.runner import DurableFunctionTestRunner
8+
from src.chained_invoke import parallel_with_invoke
9+
from test.conftest import deserialize_operation_payload
10+
11+
12+
def test_parallel_with_invoke():
13+
"""Test parallel operation where each branch invokes a child function."""
14+
with DurableFunctionTestRunner(handler=parallel_with_invoke.handler) as runner:
15+
runner.register_handler("greeter", parallel_with_invoke.greeter_handler)
16+
result = runner.run(input="test", timeout=10)
17+
18+
assert result.status is InvocationStatus.SUCCEEDED
19+
20+
parsed = deserialize_operation_payload(result.result)
21+
expected = [
22+
{"greeting": "Hello, Alice!"},
23+
{"greeting": "Hello, Bob!"},
24+
{"greeting": "Hello, Charlie!"},
25+
]
26+
assert parsed == expected
27+
28+
# Verify the parallel operation
29+
parallel_op = result.get_context("parallel_with_invoke")
30+
assert parallel_op is not None
31+
assert parallel_op.status is OperationStatus.SUCCEEDED
32+
assert len(parallel_op.child_operations) == 3

0 commit comments

Comments
 (0)