forked from aws/aws-sam-cli
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdurable_context.py
More file actions
67 lines (54 loc) · 2.17 KB
/
durable_context.py
File metadata and controls
67 lines (54 loc) · 2.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
"""
Context manager for durable functions emulator operations.
"""
import logging
from typing import Optional
from samcli.lib.clients.lambda_client import DurableFunctionsClient
from samcli.local.docker.durable_functions_emulator_container import DurableFunctionsEmulatorContainer
# Module-level logger, named after this module so its records can be filtered by module path.
LOG = logging.getLogger(__name__)
class DurableContext:
    """
    Scoped access to the durable functions emulator for CLI commands.

    Entering the context starts an emulator container or attaches to one that
    is already running. Leaving the context stops the container only when it
    was not flagged as reused on entry.
    """

    def __init__(self, skip_pull_image=False):
        """
        Store configuration; no container object exists until the context is entered.

        Parameters
        ----------
        skip_pull_image : bool
            If True, skip pulling the emulator container image
        """
        self._skip_pull_image = skip_pull_image
        # Both fields below are filled in by __enter__.
        self._reused_container = False
        self._emulator: Optional[DurableFunctionsEmulatorContainer] = None

    def __enter__(self) -> "DurableContext":
        """
        Create the emulator container wrapper and start it, or attach to a running one.

        Returns
        -------
        DurableContext
            This instance, so it can be bound with ``with ... as ctx``
        """
        emulator = DurableFunctionsEmulatorContainer(skip_pull_image=self._skip_pull_image)
        # Publish the emulator on the instance before starting it, so the
        # attribute is already set even if start_or_attach() raises.
        self._emulator = emulator
        # The result is kept as the "reused" flag; presumably truthy when an
        # existing container was attached to - confirm against the container class.
        self._reused_container = emulator.start_or_attach()
        return self

    def __exit__(self, *args):
        """
        Stop the emulator container unless it was flagged as reused on entry.

        Exception details passed by the ``with`` statement are ignored, and
        nothing is returned, so exceptions from the body propagate unchanged.
        """
        if self._reused_container:
            # The container was already running before this context; leave it alone.
            LOG.debug("Leaving existing durable functions emulator container running")
            return

        if self._emulator:
            LOG.debug("Stopping durable functions emulator container")
            self._emulator.stop()

    @property
    def client(self) -> DurableFunctionsClient:
        """
        Client exposed by the emulator as ``lambda_client``.

        Returns:
            DurableFunctionsClient instance
        Raises:
            RuntimeError: If there is no emulator yet, or it has no client
        """
        emulator = self._emulator
        if emulator and emulator.lambda_client:
            # The annotated local pins the declared return type for type checkers.
            durable_client: DurableFunctionsClient = emulator.lambda_client
            return durable_client

        raise RuntimeError("DurableContext not initialized - use within 'with' statement")