Skip to content

Commit ba4616d

Browse files
committed
[fix]: input payload too big to fit in initial execution state
1 parent 72b5336 commit ba4616d

4 files changed

Lines changed: 131 additions & 90 deletions

File tree

src/aws_durable_execution_sdk_python/execution.py

Lines changed: 15 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -69,32 +69,6 @@ def from_json_dict(input_dict: MutableMapping[str, Any]) -> InitialExecutionStat
6969
next_marker=input_dict.get("NextMarker", ""),
7070
)
7171

72-
def get_execution_operation(self) -> Operation | None:
73-
if not self.operations:
74-
# Due to payload size limitations we may have an empty operations list.
75-
# This will only happen when loading the initial page of results and is
76-
# expected behaviour. We don't fail, but instead return None
77-
# as the execution operation does not exist
78-
msg: str = "No durable operations found in initial execution state."
79-
logger.debug(msg)
80-
return None
81-
82-
candidate = self.operations[0]
83-
if candidate.operation_type is not OperationType.EXECUTION:
84-
msg = f"First operation in initial execution state is not an execution operation: {candidate.operation_type}"
85-
raise DurableExecutionsError(msg)
86-
87-
return candidate
88-
89-
def get_input_payload(self) -> str | None:
90-
# It is possible that backend will not provide an execution operation
91-
# for the initial page of results.
92-
if not (operations := self.get_execution_operation()):
93-
return None
94-
if not (execution_details := operations.execution_details):
95-
return None
96-
return execution_details.input_payload
97-
9872
def to_dict(self) -> MutableMapping[str, Any]:
9973
return {
10074
"Operations": [op.to_dict() for op in self.operations],
@@ -275,23 +249,6 @@ def wrapper(event: Any, context: LambdaContext) -> MutableMapping[str, Any]:
275249
else LambdaClient.initialize_client()
276250
)
277251

278-
raw_input_payload: str | None = (
279-
invocation_input.initial_execution_state.get_input_payload()
280-
)
281-
282-
# Python RIC LambdaMarshaller just uses standard json deserialization for event
283-
# https://github.com/aws/aws-lambda-python-runtime-interface-client/blob/main/awslambdaric/lambda_runtime_marshaller.py#L46
284-
input_event: MutableMapping[str, Any] = {}
285-
if raw_input_payload and raw_input_payload.strip():
286-
try:
287-
input_event = json.loads(raw_input_payload)
288-
except json.JSONDecodeError:
289-
logger.exception(
290-
"Failed to parse input payload as JSON: payload: %r",
291-
raw_input_payload,
292-
)
293-
raise
294-
295252
execution_state: ExecutionState = ExecutionState(
296253
durable_execution_arn=invocation_input.durable_execution_arn,
297254
initial_checkpoint_token=invocation_input.checkpoint_token,
@@ -309,6 +266,21 @@ def wrapper(event: Any, context: LambdaContext) -> MutableMapping[str, Any]:
309266
invocation_input.initial_execution_state.next_marker,
310267
)
311268

269+
raw_input_payload: str | None = execution_state.get_input_payload()
270+
271+
# Python RIC LambdaMarshaller just uses standard json deserialization for event
272+
# https://github.com/aws/aws-lambda-python-runtime-interface-client/blob/main/awslambdaric/lambda_runtime_marshaller.py#L46
273+
input_event: MutableMapping[str, Any] = {}
274+
if raw_input_payload and raw_input_payload.strip():
275+
try:
276+
input_event = json.loads(raw_input_payload)
277+
except json.JSONDecodeError:
278+
logger.exception(
279+
"Failed to parse input payload as JSON: payload: %r",
280+
raw_input_payload,
281+
)
282+
raise
283+
312284
durable_context: DurableContext = DurableContext.from_lambda_context(
313285
state=execution_state, lambda_context=context
314286
)

src/aws_durable_execution_sdk_python/state.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,33 @@ def fetch_paginated_operations(
290290
with self._operations_lock:
291291
self.operations.update({op.operation_id: op for op in all_operations})
292292

293+
def get_input_payload(self) -> str | None:
294+
# It is possible that backend will not provide an execution operation
295+
# for the initial page of results.
296+
if not (operations := self.get_execution_operation()):
297+
return None
298+
if not (execution_details := operations.execution_details):
299+
return None
300+
return execution_details.input_payload
301+
302+
def get_execution_operation(self) -> Operation | None:
303+
# invocation id is id of execution operation
304+
invocation_id = self.durable_execution_arn.split("/")[-1]
305+
candidate = self.operations.get(invocation_id)
306+
if not candidate:
307+
# Due to payload size limitations we may have an empty operations list.
308+
# This will only happen when loading the initial page of results and is
309+
# expected behaviour. We don't fail, but instead return None
310+
# as the execution operation does not exist
311+
msg: str = "No durable operations found in execution state."
312+
logger.debug(msg)
313+
return None
314+
if candidate.operation_type is not OperationType.EXECUTION:
315+
msg = f"The execution operation in execution state does not have EXECUTION type: {candidate.operation_type}"
316+
raise DurableExecutionsError(msg)
317+
318+
return candidate
319+
293320
def track_replay(self, operation_id: str) -> None:
294321
"""Check if operation exists with completed status; if not, transition to NEW status.
295322

tests/execution_test.py

Lines changed: 1 addition & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -774,51 +774,6 @@ def test_handler(event: Any, context: DurableContext) -> dict:
774774
mock_lambda_client.initialize_client.assert_called_once()
775775

776776

777-
def test_initial_execution_state_get_execution_operation_no_operations():
778-
"""Test get_execution_operation logs debug and returns None when no operations exist."""
779-
state = InitialExecutionState(operations=[], next_marker="")
780-
781-
with patch("aws_durable_execution_sdk_python.execution.logger") as mock_logger:
782-
result = state.get_execution_operation()
783-
784-
assert result is None
785-
mock_logger.debug.assert_called_once_with(
786-
"No durable operations found in initial execution state."
787-
)
788-
789-
790-
def test_initial_execution_state_get_execution_operation_wrong_type():
791-
"""Test get_execution_operation raises error when first operation is not EXECUTION."""
792-
operation = Operation(
793-
operation_id="step1",
794-
operation_type=OperationType.STEP,
795-
status=OperationStatus.STARTED,
796-
)
797-
798-
state = InitialExecutionState(operations=[operation], next_marker="")
799-
800-
with pytest.raises(
801-
Exception,
802-
match="First operation in initial execution state is not an execution operation",
803-
):
804-
state.get_execution_operation()
805-
806-
807-
def test_initial_execution_state_get_input_payload_none():
808-
"""Test get_input_payload returns None when execution_details is None."""
809-
operation = Operation(
810-
operation_id="exec1",
811-
operation_type=OperationType.EXECUTION,
812-
status=OperationStatus.STARTED,
813-
execution_details=None,
814-
)
815-
816-
state = InitialExecutionState(operations=[operation], next_marker="")
817-
818-
result = state.get_input_payload()
819-
assert result is None
820-
821-
822777
def test_durable_handler_empty_input_payload():
823778
"""Test durable_handler handles empty input payload correctly."""
824779
mock_client = Mock(spec=DurableServiceClient)
@@ -916,7 +871,7 @@ def test_handler(event: Any, context: DurableContext) -> dict:
916871
initial_state = InitialExecutionState(operations=[operation], next_marker="")
917872

918873
invocation_input = DurableExecutionInvocationInputWithClient(
919-
durable_execution_arn="arn:test:execution",
874+
durable_execution_arn="arn:test:execution/exec1",
920875
checkpoint_token="token123", # noqa: S106
921876
initial_execution_state=initial_state,
922877
service_client=mock_client,

tests/state_test.py

Lines changed: 88 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import time
1010
import unittest.mock
1111
from concurrent.futures import ThreadPoolExecutor
12-
from unittest.mock import Mock, call
12+
from unittest.mock import Mock, call, patch
1313

1414
import pytest
1515

@@ -3562,3 +3562,90 @@ def test_collect_checkpoint_batch_first_empty_counts_toward_limit():
35623562
) # Only the leading empty; trailing deferred to next batch
35633563
# op_2 and trailing empties remain in the queue
35643564
assert state._checkpoint_queue.qsize() == 51
3565+
3566+
3567+
def test_execution_state_get_execution_operation_no_operations():
3568+
"""Test get_execution_operation logs debug and returns None when no operations exist."""
3569+
mock_lambda_client = Mock(spec=LambdaClient)
3570+
config = CheckpointBatcherConfig(
3571+
max_batch_size_bytes=10 * 1024 * 1024,
3572+
max_batch_time_seconds=10.0,
3573+
max_batch_operations=2,
3574+
)
3575+
state = ExecutionState(
3576+
durable_execution_arn="test_arn",
3577+
initial_checkpoint_token="token123", # noqa: S106
3578+
operations={},
3579+
service_client=mock_lambda_client,
3580+
batcher_config=config,
3581+
)
3582+
3583+
with patch("aws_durable_execution_sdk_python.state.logger") as mock_logger:
3584+
result = state.get_execution_operation()
3585+
3586+
assert result is None
3587+
mock_logger.debug.assert_called_once_with(
3588+
"No durable operations found in execution state."
3589+
)
3590+
3591+
3592+
def test_initial_execution_state_get_execution_operation_wrong_type():
3593+
"""Test get_execution_operation raises error when first operation is not EXECUTION."""
3594+
operation = Operation(
3595+
operation_id="step1",
3596+
operation_type=OperationType.STEP,
3597+
status=OperationStatus.STARTED,
3598+
)
3599+
3600+
mock_lambda_client = Mock(spec=LambdaClient)
3601+
config = CheckpointBatcherConfig(
3602+
max_batch_size_bytes=10 * 1024 * 1024,
3603+
max_batch_time_seconds=10.0,
3604+
max_batch_operations=2,
3605+
)
3606+
state = ExecutionState(
3607+
durable_execution_arn="test_arn/step1",
3608+
initial_checkpoint_token="token123", # noqa: S106
3609+
operations={"step1": operation},
3610+
service_client=mock_lambda_client,
3611+
batcher_config=config,
3612+
)
3613+
3614+
with pytest.raises(
3615+
Exception,
3616+
match="The execution operation in execution state does not have EXECUTION type: OperationType.STEP",
3617+
):
3618+
state.get_execution_operation()
3619+
3620+
3621+
def test_initial_execution_state_get_input_payload_none():
3622+
"""Test get_input_payload returns None when execution_details is None."""
3623+
operation = Operation(
3624+
operation_id="exec1",
3625+
operation_type=OperationType.EXECUTION,
3626+
status=OperationStatus.STARTED,
3627+
execution_details=None,
3628+
)
3629+
3630+
operation = Operation(
3631+
operation_id="step1",
3632+
operation_type=OperationType.STEP,
3633+
status=OperationStatus.STARTED,
3634+
)
3635+
3636+
mock_lambda_client = Mock(spec=LambdaClient)
3637+
config = CheckpointBatcherConfig(
3638+
max_batch_size_bytes=10 * 1024 * 1024,
3639+
max_batch_time_seconds=10.0,
3640+
max_batch_operations=2,
3641+
)
3642+
state = ExecutionState(
3643+
durable_execution_arn="test_arn/exec1",
3644+
initial_checkpoint_token="token123", # noqa: S106
3645+
operations={"step1": operation},
3646+
service_client=mock_lambda_client,
3647+
batcher_config=config,
3648+
)
3649+
3650+
result = state.get_input_payload()
3651+
assert result is None

0 commit comments

Comments
 (0)