Skip to content

Commit 39f7df3

Browse files
Alex Wangyaythomas
authored andcommitted
refactor: use to/from_json_dict for operation serdes
- Remove customized json encoder - Use to_json_dict() and from_json_dict() for Operation and ExecutionInput serdes
1 parent 6d48cac commit 39f7df3

8 files changed

Lines changed: 42 additions & 73 deletions

File tree

src/aws_durable_execution_sdk_python_testing/execution.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
from __future__ import annotations
22

3-
import json
43
from dataclasses import replace
54
from datetime import UTC, datetime
65
from enum import Enum
@@ -33,7 +32,6 @@
3332
)
3433
from aws_durable_execution_sdk_python_testing.token import (
3534
CheckpointToken,
36-
CallbackToken,
3735
)
3836

3937

@@ -96,12 +94,12 @@ def new(input: StartDurableExecutionInput) -> Execution: # noqa: A002
9694
durable_execution_arn=str(uuid4()), start_input=input, operations=[]
9795
)
9896

99-
def to_dict(self) -> dict[str, Any]:
100-
"""Serialize execution to dictionary."""
97+
def to_json_dict(self) -> dict[str, Any]:
98+
"""Serialize execution to JSON-serializable dictionary"""
10199
return {
102100
"DurableExecutionArn": self.durable_execution_arn,
103101
"StartInput": self.start_input.to_dict(),
104-
"Operations": [op.to_dict() for op in self.operations],
102+
"Operations": [op.to_json_dict() for op in self.operations],
105103
"Updates": [update.to_dict() for update in self.updates],
106104
"InvocationCompletions": [
107105
completion.to_dict() for completion in self.invocation_completions
@@ -115,13 +113,15 @@ def to_dict(self) -> dict[str, Any]:
115113
}
116114

117115
@classmethod
118-
def from_dict(cls, data: dict[str, Any]) -> Execution:
116+
def from_json_dict(cls, data: dict[str, Any]) -> Execution:
119117
"""Deserialize execution from dictionary."""
120118
# Reconstruct start_input
121119
start_input = StartDurableExecutionInput.from_dict(data["StartInput"])
122120

123121
# Reconstruct operations
124-
operations = [Operation.from_dict(op_data) for op_data in data["Operations"]]
122+
operations = [
123+
Operation.from_json_dict(op_data) for op_data in data["Operations"]
124+
]
125125

126126
# Create execution
127127
execution = cls(

src/aws_durable_execution_sdk_python_testing/invoker.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,10 @@
1212
DurableExecutionInvocationInputWithClient,
1313
DurableExecutionInvocationOutput,
1414
InitialExecutionState,
15-
InvocationStatus,
1615
)
1716

1817
from aws_durable_execution_sdk_python_testing.exceptions import (
1918
DurableFunctionsTestError,
20-
ServiceException,
2119
)
2220
from aws_durable_execution_sdk_python_testing.model import LambdaContext
2321

@@ -239,7 +237,7 @@ def invoke(
239237
response = client.invoke(
240238
FunctionName=function_name,
241239
InvocationType="RequestResponse", # Synchronous invocation
242-
Payload=json.dumps(input.to_dict(), default=str),
240+
Payload=json.dumps(input.to_json_dict()),
243241
)
244242

245243
# Check HTTP status code

src/aws_durable_execution_sdk_python_testing/stores/filesystem.py

Lines changed: 6 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
import json
66
import logging
7-
from datetime import UTC, datetime
87
from pathlib import Path
98

109
from aws_durable_execution_sdk_python_testing.exceptions import (
@@ -16,30 +15,6 @@
1615
)
1716

1817

19-
class DateTimeEncoder(json.JSONEncoder):
20-
"""Custom JSON encoder that handles datetime objects."""
21-
22-
def default(self, obj):
23-
if isinstance(obj, datetime):
24-
return obj.timestamp()
25-
return super().default(obj)
26-
27-
28-
def datetime_object_hook(obj):
29-
"""JSON object hook to convert unix timestamps back to datetime objects."""
30-
if isinstance(obj, dict):
31-
for key, value in obj.items():
32-
if isinstance(value, int | float) and key.endswith(
33-
("_timestamp", "_time", "Timestamp", "Time")
34-
):
35-
try: # noqa: SIM105
36-
obj[key] = datetime.fromtimestamp(value, tz=UTC)
37-
except (ValueError, OSError):
38-
# Leave as number if not a valid timestamp
39-
pass
40-
return obj
41-
42-
4318
class FileSystemExecutionStore(BaseExecutionStore):
4419
"""File system-based execution store for persistence."""
4520

@@ -69,10 +44,10 @@ def _get_file_path(self, execution_arn: str) -> Path:
6944
def save(self, execution: Execution) -> None:
7045
"""Save execution to file system."""
7146
file_path = self._get_file_path(execution.durable_execution_arn)
72-
data = execution.to_dict()
47+
data = execution.to_json_dict()
7348

7449
with open(file_path, "w", encoding="utf-8") as f:
75-
json.dump(data, f, indent=2, cls=DateTimeEncoder)
50+
json.dump(data, f, indent=2)
7651

7752
def load(self, execution_arn: str) -> Execution:
7853
"""Load execution from file system."""
@@ -82,9 +57,9 @@ def load(self, execution_arn: str) -> Execution:
8257
raise ResourceNotFoundException(msg)
8358

8459
with open(file_path, encoding="utf-8") as f:
85-
data = json.load(f, object_hook=datetime_object_hook)
60+
data = json.load(f)
8661

87-
return Execution.from_dict(data)
62+
return Execution.from_json_dict(data)
8863

8964
def update(self, execution: Execution) -> None:
9065
"""Update execution in file system (same as save)."""
@@ -96,8 +71,8 @@ def list_all(self) -> list[Execution]:
9671
for file_path in self._storage_dir.glob("*.json"):
9772
try:
9873
with open(file_path, encoding="utf-8") as f:
99-
data = json.load(f, object_hook=datetime_object_hook)
100-
executions.append(Execution.from_dict(data))
74+
data = json.load(f)
75+
executions.append(Execution.from_json_dict(data))
10176
except (json.JSONDecodeError, KeyError, OSError) as e:
10277
logging.warning("Skipping corrupted file %s: %s", file_path, e)
10378
continue

src/aws_durable_execution_sdk_python_testing/stores/sqlite.py

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,6 @@
1717
from aws_durable_execution_sdk_python_testing.stores.base import (
1818
ExecutionStore,
1919
)
20-
from aws_durable_execution_sdk_python_testing.stores.filesystem import (
21-
DateTimeEncoder,
22-
datetime_object_hook,
23-
)
2420

2521

2622
class SQLiteExecutionStore(ExecutionStore):
@@ -102,7 +98,7 @@ def save(self, execution: Execution) -> None:
10298
execution_op.end_timestamp.timestamp()
10399
if execution_op.end_timestamp
104100
else None,
105-
json.dumps(execution.to_dict(), cls=DateTimeEncoder),
101+
json.dumps(execution.to_json_dict()),
106102
),
107103
)
108104
except sqlite3.Error as e:
@@ -125,9 +121,7 @@ def load(self, execution_arn: str) -> Execution:
125121
if not row:
126122
raise ResourceNotFoundException(f"Execution {execution_arn} not found")
127123

128-
return Execution.from_dict(
129-
json.loads(row[0], object_hook=datetime_object_hook)
130-
)
124+
return Execution.from_json_dict(json.loads(row[0]))
131125
except sqlite3.Error as e:
132126
raise RuntimeError(f"Failed to load execution {execution_arn}: {e}") from e
133127
except json.JSONDecodeError as e:
@@ -222,11 +216,7 @@ def query(
222216
executions: list[Execution] = []
223217
for durable_execution_arn, data in rows:
224218
try:
225-
executions.append(
226-
Execution.from_dict(
227-
json.loads(data, object_hook=datetime_object_hook)
228-
)
229-
)
219+
executions.append(Execution.from_json_dict(json.loads(data)))
230220
except (json.JSONDecodeError, ValueError) as e:
231221
# Log corrupted data but continue with other records
232222
print(

tests/execution_test.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
from aws_durable_execution_sdk_python_testing.exceptions import (
2020
IllegalStateException,
21-
InvalidParameterValueException,
2221
)
2322
from aws_durable_execution_sdk_python_testing.execution import Execution
2423
from aws_durable_execution_sdk_python_testing.model import StartDurableExecutionInput
@@ -813,7 +812,7 @@ def test_from_dict_with_none_result():
813812
"aws_durable_execution_sdk_python_testing.model.StartDurableExecutionInput.from_dict"
814813
) as mock_from_dict:
815814
mock_from_dict.return_value = Mock()
816-
execution = Execution.from_dict(data)
815+
execution = Execution.from_json_dict(data)
817816
assert execution.result is None
818817

819818

tests/how-to-run-from-term.txt

Lines changed: 0 additions & 1 deletion
This file was deleted.

tests/invoker_test.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,15 @@
1212
InvocationStatus,
1313
)
1414

15+
from aws_durable_execution_sdk_python.lambda_service import (
16+
ExecutionDetails,
17+
Operation,
18+
OperationStatus,
19+
OperationType,
20+
)
21+
22+
from datetime import datetime, UTC
23+
1524
from aws_durable_execution_sdk_python_testing.execution import Execution
1625
from aws_durable_execution_sdk_python_testing.invoker import (
1726
InProcessInvoker,
@@ -168,10 +177,23 @@ def test_lambda_invoker_invoke_success():
168177

169178
invoker = LambdaInvoker(lambda_client)
170179

180+
mock_operation = Operation(
181+
operation_id="op-1",
182+
parent_id=None,
183+
name="test-execution",
184+
start_timestamp=datetime.now(UTC),
185+
end_timestamp=datetime.now(UTC),
186+
operation_type=OperationType.EXECUTION,
187+
status=OperationStatus.SUCCEEDED,
188+
execution_details=ExecutionDetails(input_payload='{"test": "data"}'),
189+
)
190+
171191
input_data = DurableExecutionInvocationInput(
172192
durable_execution_arn="test-arn",
173193
checkpoint_token="test-token", # noqa: S106
174-
initial_execution_state=InitialExecutionState(operations=[], next_marker=""),
194+
initial_execution_state=InitialExecutionState(
195+
operations=[mock_operation], next_marker=""
196+
),
175197
)
176198

177199
response = invoker.invoke("test-function", input_data)
@@ -185,7 +207,7 @@ def test_lambda_invoker_invoke_success():
185207
lambda_client.invoke.assert_called_once_with(
186208
FunctionName="test-function",
187209
InvocationType="RequestResponse",
188-
Payload=json.dumps(input_data.to_dict(), default=str),
210+
Payload=json.dumps(input_data.to_json_dict()),
189211
)
190212

191213

tests/stores/filesystem_store_test.py

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
from aws_durable_execution_sdk_python_testing.model import StartDurableExecutionInput
1313
from aws_durable_execution_sdk_python_testing.stores.filesystem import (
1414
FileSystemExecutionStore,
15-
datetime_object_hook,
1615
)
1716

1817
from datetime import datetime, timezone
@@ -269,19 +268,6 @@ def test_filesystem_execution_store_thread_safety_basic(store, sample_execution)
269268
assert loaded.durable_execution_arn == sample_execution.durable_execution_arn
270269

271270

272-
def test_datetime_object_hook_converts_timestamp_fields():
273-
"""Test conversion of timestamp fields to datetime objects."""
274-
timestamp = 1672531200.0 # 2023-01-01 00:00:00 UTC
275-
obj = {
276-
"start_timestamp": timestamp,
277-
}
278-
279-
result = datetime_object_hook(obj)
280-
281-
expected_datetime = datetime.fromtimestamp(timestamp, tz=timezone.utc)
282-
assert result["start_timestamp"] == expected_datetime
283-
284-
285271
def test_filesystem_execution_store_query_empty(store):
286272
"""Test query method with empty store."""
287273
executions, next_marker = store.query()

0 commit comments

Comments
 (0)