Skip to content

Commit 41105af

Browse files
committed
feat: add interactive callback CLI
1 parent 327a4a3 commit 41105af

4 files changed

Lines changed: 468 additions & 3 deletions

File tree

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
"""
2+
Interactive callback handler for durable function executions.
3+
"""
4+
5+
import logging
6+
from typing import Optional
7+
8+
import click
9+
10+
from samcli.lib.clients.lambda_client import DurableFunctionsClient
11+
12+
LOG = logging.getLogger(__name__)
13+
14+
# Menu choice constants
15+
CHOICE_SUCCESS = 1
16+
CHOICE_FAILURE = 2
17+
CHOICE_HEARTBEAT = 3
18+
CHOICE_STOP = 4
19+
20+
21+
class DurableCallbackHandler:
22+
"""
23+
Handles interactive callback detection and response for durable executions.
24+
"""
25+
26+
def __init__(self, client: DurableFunctionsClient):
27+
self.client = client
28+
self._prompted_callbacks: set[str] = set() # Track which callbacks we've already prompted for
29+
30+
def check_for_pending_callbacks(self, execution_arn: str) -> Optional[str]:
31+
"""
32+
Check execution history for pending callbacks.
33+
34+
Returns:
35+
callback_id if found, None otherwise
36+
"""
37+
try:
38+
LOG.debug("Checking for pending callbacks in execution: %s", execution_arn)
39+
history = self.client.get_durable_execution_history(execution_arn)
40+
events = history.get("Events", [])
41+
42+
if events:
43+
callback_states = {}
44+
45+
for event in events:
46+
event_type = event.get("EventType")
47+
event_id = event.get("Id")
48+
49+
if event_type == "CallbackStarted":
50+
callback_id = event.get("CallbackStartedDetails", {}).get("CallbackId")
51+
callback_states[event_id] = {"callback_id": callback_id, "status": "STARTED", "event": event}
52+
elif event_type in ["CallbackCompleted", "CallbackFailed", "CallbackSucceeded"]:
53+
if event_id in callback_states:
54+
callback_states[event_id]["status"] = "COMPLETED"
55+
56+
# Find callbacks that are started but not completed
57+
for callback_id, state in callback_states.items():
58+
if state["status"] == "STARTED" and state["callback_id"]:
59+
return str(state["callback_id"])
60+
61+
except Exception as e:
62+
LOG.error("Failed to check callback history: %s", e)
63+
64+
return None
65+
66+
def prompt_callback_response(self, execution_arn: str, callback_id: str, execution_complete=None) -> bool:
67+
"""
68+
Prompt user for callback response and send it.
69+
70+
Args:
71+
execution_arn: The execution ARN for stop execution operation
72+
callback_id: The callback ID to respond to
73+
execution_complete: Optional threading.Event to check if execution finished
74+
75+
Returns:
76+
True if callback was sent, False if user chose to continue waiting
77+
"""
78+
# Only prompt once per callback ID to avoid blocking on timed-out callbacks
79+
if callback_id in self._prompted_callbacks:
80+
return False
81+
82+
self._prompted_callbacks.add(callback_id)
83+
84+
# Check if execution already completed before prompting
85+
if execution_complete and execution_complete.is_set():
86+
return False
87+
88+
click.echo(f"\n🔄 Execution is waiting for callback: {callback_id}")
89+
click.echo("Choose an action:")
90+
click.echo(" 1. Send callback success")
91+
click.echo(" 2. Send callback failure")
92+
click.echo(" 3. Send callback heartbeat")
93+
click.echo(" 4. Stop execution")
94+
95+
choice = click.prompt("Enter choice", type=click.IntRange(1, 4), default=CHOICE_SUCCESS)
96+
97+
# Check again after user makes selection in case execution completed
98+
if execution_complete and execution_complete.is_set():
99+
click.echo("⚠️ Execution already completed, callback no longer needed")
100+
return False
101+
102+
try:
103+
if choice == CHOICE_SUCCESS:
104+
result = click.prompt("Enter success result (optional)", default="", show_default=False)
105+
self.client.send_callback_success(callback_id=callback_id, result=result)
106+
click.echo("✅ Callback success sent")
107+
return True
108+
109+
elif choice == CHOICE_FAILURE:
110+
error_message = click.prompt("Enter error message", default="User cancelled")
111+
error_type = click.prompt("Enter error type (optional)", default="", show_default=False) or None
112+
113+
self.client.send_callback_failure(
114+
callback_id=callback_id, error_message=error_message, error_type=error_type
115+
)
116+
click.echo("❌ Callback failure sent")
117+
return True
118+
119+
elif choice == CHOICE_HEARTBEAT:
120+
self.client.send_callback_heartbeat(callback_id=callback_id)
121+
click.echo("💓 Callback heartbeat sent")
122+
return False # Continue waiting after heartbeat
123+
124+
else: # CHOICE_STOP
125+
error_message = click.prompt("Enter error message", default="Execution stopped by user")
126+
error_type = click.prompt("Enter error type (optional)", default="", show_default=False) or None
127+
128+
self.client.stop_durable_execution(
129+
durable_execution_arn=execution_arn, error_message=error_message, error_type=error_type
130+
)
131+
click.echo("🛑 Execution stopped")
132+
return True
133+
134+
except Exception as e:
135+
LOG.error("Failed to send callback: %s", e)
136+
click.echo(f"❌ Failed to send callback: {e}")
137+
return False

samcli/local/docker/durable_lambda_container.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import click
1010
from flask import has_request_context
1111

12+
from samcli.lib.utils.durable_callback_handler import DurableCallbackHandler
1213
from samcli.lib.utils.durable_formatters import format_execution_details, format_next_commands_after_invoke
1314
from samcli.local.docker.lambda_container import LambdaContainer
1415

@@ -58,6 +59,9 @@ def _update_lambda_environment_with_emulator_endpoint(self, kwargs):
5859
extra_hosts["host.docker.internal"] = "host-gateway"
5960
kwargs["extra_hosts"] = extra_hosts
6061

62+
# Bind to 0.0.0.0 so emulator can reach Lambda via host.docker.internal
63+
kwargs["container_host_interface"] = "0.0.0.0"
64+
6165
def _get_lambda_container_endpoint(self):
6266
"""
6367
Get the Lambda container endpoint URL for the emulator to invoke.
@@ -146,8 +150,12 @@ def _write_execution_result_to_stdout(self, execution_details: dict, stdout):
146150
def _wait_for_execution(self, execution_arn):
147151
"""Poll the execution status until completion and return the final result."""
148152

149-
# TODO - poll until the execution timeout is hit
153+
LOG.debug("Starting to poll execution status for: %s", execution_arn)
154+
callback_handler = DurableCallbackHandler(self.emulator_container.lambda_client)
150155
execution_details = None
156+
callback_thread = None
157+
stop_callback_prompts = threading.Event()
158+
151159
try:
152160
while True:
153161
try:
@@ -156,13 +164,36 @@ def _wait_for_execution(self, execution_arn):
156164
status = execution_details.get("Status")
157165

158166
if status != "RUNNING":
167+
stop_callback_prompts.set() # Signal callback thread to stop
168+
if callback_thread and callback_thread.is_alive():
169+
callback_thread.join(timeout=0.5) # Brief wait for thread cleanup
159170
return execution_details
160171

172+
# Check for pending callbacks (only in CLI context)
173+
if self._is_cli_context():
174+
callback_id = callback_handler.check_for_pending_callbacks(execution_arn)
175+
if callback_id:
176+
177+
def _prompt_in_thread():
178+
if not stop_callback_prompts.is_set():
179+
# give the function logs time to settle after the invocation is suspended
180+
time.sleep(0.5)
181+
callback_sent = callback_handler.prompt_callback_response(
182+
execution_arn, callback_id, stop_callback_prompts
183+
)
184+
if callback_sent:
185+
click.echo("\n" + "─" * 80)
186+
187+
# Start callback prompt in separate thread so it doesn't block polling
188+
callback_thread = threading.Thread(target=_prompt_in_thread, daemon=True)
189+
callback_thread.start()
190+
161191
time.sleep(1) # Poll every second
162192
except Exception as e:
163193
LOG.error("Error polling execution status: %s", e)
164194
break
165195
finally:
196+
stop_callback_prompts.set() # Ensure callback thread knows to stop
166197
self._cleanup_if_needed()
167198

168199
return execution_details

0 commit comments

Comments
 (0)