Skip to content

Commit a6e4c26

Browse files
praisonai-triage-agent[bot]MervinPraisonclaude
authored
fix: add bot run cancellation and timeout support (#1919)
* fix: add bot run cancellation and timeout support (fixes #1917) - Add MESSAGE_ABORT event type to gateway protocols - Enhance BotSessionManager with InterruptController integration - Add configurable run_timeout parameter (default 300s) - Implement cancel_run() method for user-initiated cancellation - Add /stop command for cancelling active runs - Add BotRunTimeout exception for timeout handling - Maintain backward compatibility - Follow protocol-driven architecture from AGENTS.md Co-authored-by: MervinPraison <MervinPraison@users.noreply.github.com> * fix: resolve critical bugs in bot run cancellation and timeout support Fixes three critical issues identified by code review: 1. Fix parameter name mismatch: Agent.chat() expects 'cancel_token', not 'interrupt_controller' - Cooperative cancellation is now functional 2. Wire /stop command to all bot adapters (telegram, discord, slack, whatsapp) - Users can now actually cancel runs via /stop command 3. Prevent BotRunTimeout from being queued in DLQ to avoid infinite retry loops - Timeout exceptions are excluded from replay to prevent loops Co-authored-by: Mervin Praison <MervinPraison@users.noreply.github.com> --------- Co-authored-by: praisonai-triage-agent[bot] <272766704+praisonai-triage-agent[bot]@users.noreply.github.com> Co-authored-by: MervinPraison <MervinPraison@users.noreply.github.com> Co-authored-by: Claude <noreply@anthropic.com>
1 parent e56456a commit a6e4c26

7 files changed

Lines changed: 164 additions & 20 deletions

File tree

src/praisonai-agents/praisonaiagents/gateway/protocols.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ class EventType(str, Enum):
5353
# Message events
5454
MESSAGE = "message"
5555
MESSAGE_ACK = "message_ack"
56+
MESSAGE_ABORT = "message_abort"
5657
TYPING = "typing"
5758

5859
# Streaming events (relayed from agent's StreamEventEmitter)

src/praisonai/praisonai/bots/_commands.py

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,57 @@ def format_help(
7676
return "\n".join(lines)
7777

7878

79-
async def handle_stop_command(
# Strong references to fire-and-forget stop tasks. The event loop only keeps
# weak references to tasks, so an otherwise unreferenced task can be
# garbage-collected before it has finished running.
_PENDING_STOP_TASKS = set()


def _finish_stop_task(task) -> None:
    """Forget a finished background stop task and log the error it ended with, if any."""
    _PENDING_STOP_TASKS.discard(task)
    if task.cancelled():
        return
    # Retrieving the exception also prevents asyncio's
    # "Task exception was never retrieved" warning.
    error = task.exception()
    if error is not None:
        import logging

        logging.getLogger(__name__).warning("Background /stop failed: %s", error)


def handle_stop_command(session_manager, user_id: str) -> str:
    """Handle a /stop command to cancel an active run.

    Two kinds of ``session_manager`` are supported, checked in this order:

    * an object exposing an awaitable ``stop(user_id)`` (the
      SessionRunControl style), and
    * an object exposing a synchronous ``cancel_run(user_id, reason)``
      (the BotSessionManager style).

    Because this function is synchronous it cannot await ``stop()`` when it
    is called from inside a running event loop. In that case the stop
    coroutine is scheduled as a background task and a "requested" message is
    returned without knowing the outcome. When no loop is running in the
    current thread, ``stop()`` is run to completion and its result decides
    the message.

    Args:
        session_manager: Object providing ``stop`` or ``cancel_run``.
        user_id: User ID to cancel the run for.

    Returns:
        Response message indicating success or failure.
    """
    # SessionRunControl-style object: ``stop`` is a coroutine function.
    if hasattr(session_manager, "stop"):
        import asyncio

        try:
            # get_running_loop() is the supported way to ask "am I inside a
            # loop?". get_event_loop() is deprecated for this and raises in
            # threads that have no loop, which used to turn the synchronous
            # fallback into an error reply.
            try:
                asyncio.get_running_loop()
            except RuntimeError:
                inside_loop = False
            else:
                inside_loop = True

            if inside_loop:
                task = asyncio.create_task(session_manager.stop(user_id))
                # Keep the task alive until it completes, then clean up.
                _PENDING_STOP_TASKS.add(task)
                task.add_done_callback(_finish_stop_task)
                return "⏳ Cancellation requested..."

            # No running loop in this thread: drive the coroutine ourselves.
            stopped = asyncio.run(session_manager.stop(user_id))
        except Exception as e:
            return f"❌ Error stopping task: {e}"

        if stopped:
            return "✅ Current task cancelled. Send a new message to start fresh."
        return "ℹ️ No active task to cancel."

    # BotSessionManager-style object: synchronous cancel_run().
    if hasattr(session_manager, "cancel_run"):
        if session_manager.cancel_run(user_id, "user_stop_command"):
            return "✅ Current task cancelled. Send a new message to start fresh."
        return "ℹ️ No active task to cancel."

    return "❌ Stop command not available (run control not enabled)"
123+
124+
125+
async def handle_stop_command_async(
80126
user_id: str,
81127
run_control: Optional["SessionRunControl"] = None,
82128
) -> str:
83-
"""Handle /stop command to cancel current agent task.
129+
"""Async version of handle_stop_command for SessionRunControl.
84130
85131
Args:
86132
user_id: User identifier

src/praisonai/praisonai/bots/_session.py

Lines changed: 84 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@
2626
logger = logging.getLogger(__name__)
2727

2828

class BotRunTimeout(Exception):
    """Signal that an agent run did not finish within the configured run timeout."""
32+
33+
2934
class BotSessionManager:
3035
"""Lightweight per-user session store for bot agents.
3136
@@ -66,6 +71,7 @@ def __init__(
6671
identity_resolver: Optional[Any] = None,
6772
ingress_journal: Optional[Any] = None,
6873
run_control: Optional[Any] = None,
74+
run_timeout: float = 300.0, # 5 minutes default timeout
6975
) -> None:
7076
self._histories: Dict[str, List[Dict[str, Any]]] = {}
7177
self._locks: Dict[str, asyncio.Lock] = {}
@@ -89,6 +95,9 @@ def __init__(
8995
self._last_journal_key = None # Store key for delayed completion
9096
# Run control for in-flight message handling
9197
self._run_control = run_control
98+
# Run timeout and active run tracking for cancellation support
99+
self._run_timeout = run_timeout
100+
self._active_runs: Dict[str, Any] = {} # user_id -> InterruptController
92101

93102
def _storage_key(self, user_id: str) -> str:
94103
"""Resolve a raw platform user id to the in-memory/store key.
@@ -275,29 +284,67 @@ async def chat(
275284
async with agent_lock:
276285
saved_history = agent.chat_history
277286
agent.chat_history = user_history
287+
288+
# Create interrupt controller for this run and register it
289+
try:
290+
from praisonaiagents.agent.interrupt import InterruptController
291+
except ImportError:
292+
# Fallback if InterruptController is not available
293+
InterruptController = None
294+
295+
controller = InterruptController() if InterruptController else None
296+
storage_key = self._storage_key(user_id)
297+
if controller:
298+
self._active_runs[storage_key] = controller
299+
278300
try:
279301
# Choose streaming vs non-streaming path based on callback
280302
if stream_callback:
281-
# Streaming path: use agent.astart() with stream callback
282-
response = await agent.astart(prompt, stream_callback=stream_callback)
283-
# Handle AutonomyResult when autonomy is enabled in caller mode
284-
if hasattr(response, 'output'):
285-
response = response.output
303+
# Streaming path: use agent.astart() with stream callback and timeout
304+
try:
305+
# astart is async so we can use asyncio.wait_for directly
306+
response = await asyncio.wait_for(
307+
agent.astart(prompt, stream_callback=stream_callback),
308+
timeout=self._run_timeout if self._run_timeout > 0 else None,
309+
)
310+
# Handle AutonomyResult when autonomy is enabled in caller mode
311+
if hasattr(response, 'output'):
312+
response = response.output
313+
except asyncio.TimeoutError:
314+
if controller:
315+
controller.request("run timeout")
316+
raise BotRunTimeout(f"Agent run timed out after {self._run_timeout}s")
286317
else:
287-
# Legacy non-streaming path: use agent.chat() in executor
288-
# Copy current task's contextvars (incl. SessionContext)
289-
# into the worker thread so tools the agent invokes can
290-
# read platform/user metadata.
318+
# Legacy non-streaming path: use agent.chat() in executor with cancel_token and timeout
291319
import contextvars
320+
import inspect
321+
from functools import partial
292322
_ctx = contextvars.copy_context()
293-
response = await loop.run_in_executor(
294-
None, _ctx.run, agent.chat, prompt
295-
)
323+
324+
# Create agent.chat call with cancel_token if supported
325+
# Use inspect.signature for safer parameter checking
326+
_chat_params = inspect.signature(agent.chat).parameters if (controller and hasattr(agent, 'chat')) else {}
327+
if controller and 'cancel_token' in _chat_params:
328+
chat_call = partial(agent.chat, prompt, cancel_token=controller)
329+
else:
330+
chat_call = partial(agent.chat, prompt)
331+
332+
# Run with timeout and interruption support
333+
try:
334+
response = await asyncio.wait_for(
335+
loop.run_in_executor(None, _ctx.run, chat_call),
336+
timeout=self._run_timeout if self._run_timeout > 0 else None,
337+
)
338+
except asyncio.TimeoutError:
339+
if controller:
340+
controller.request("run timeout")
341+
raise BotRunTimeout(f"Agent run timed out after {self._run_timeout}s")
296342
# Capture updated history before restoring caller's.
297343
updated_history = agent.chat_history
298344
except Exception as exc:
299345
# N4: persist the failed inbound message before bubbling.
300-
if self._dlq is not None:
346+
# Skip DLQ for timeout exceptions to prevent infinite retry loops
347+
if self._dlq is not None and not isinstance(exc, BotRunTimeout):
301348
try:
302349
await loop.run_in_executor(
303350
None,
@@ -319,6 +366,9 @@ async def chat(
319366
raise
320367
finally:
321368
agent.chat_history = saved_history
369+
# Clean up active run tracking
370+
if controller:
371+
self._active_runs.pop(storage_key, None)
322372

323373
# Persist outside the agent_lock — it's per-user and the agent
324374
# is no longer touched.
@@ -532,6 +582,27 @@ def reset(self, user_id: str) -> bool:
532582

533583
return existed
534584

def cancel_run(self, user_id: str, reason: str = "user_cancel") -> bool:
    """Request cancellation of the run currently registered for a user.

    The raw id is first mapped through ``_storage_key`` because active runs
    are tracked under the resolved storage key, not the raw platform id.

    Args:
        user_id: Raw platform user id.
        reason: Text handed to the run's controller via ``request()``.

    Returns:
        bool: True when a controller was registered and has been asked to
        stop, False when nothing is registered for this user.
    """
    active = self._active_runs.get(self._storage_key(user_id))
    if not active:
        return False
    active.request(reason)
    return True
601+
def get_active_runs(self) -> List[str]:
    """Return a snapshot list of the keys that currently have an active run.

    The keys are the ones runs were registered under, i.e. the result of
    ``_storage_key(user_id)``, which is not necessarily the raw platform id.
    """
    return [*self._active_runs]
605+
535606
def _add_mirror_entry_sync(self, user_id: str, entry: dict) -> bool:
536607
"""Thread-safe method to add a mirror entry to user's history.
537608

src/praisonai/praisonai/bots/discord.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
MessageType,
2525
)
2626

27-
from ._commands import format_status, format_help
27+
from ._commands import format_status, format_help, handle_stop_command
2828
from ._session import BotSessionManager
2929
from ._debounce import InboundDebouncer
3030
from ._ack import AckReactor
@@ -200,6 +200,11 @@ async def on_message(message):
200200
elif command == "help":
201201
await message.reply(self._format_help())
202202
return
203+
elif command == "stop":
204+
user_id = str(message.author.id)
205+
response = handle_stop_command(self._session, user_id)
206+
await message.reply(response)
207+
return
203208
elif command and command in self._command_handlers:
204209
handler = self._command_handlers[command]
205210
try:

src/praisonai/praisonai/bots/slack.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
)
2727

2828
from .media import split_media_from_output, is_audio_file
29-
from ._commands import format_status, format_help
29+
from ._commands import format_status, format_help, handle_stop_command
3030
from ._session import BotSessionManager
3131
from ._debounce import InboundDebouncer
3232
from ._ack import AckReactor
@@ -213,6 +213,11 @@ async def handle_message(event, say):
213213
elif text == "/help":
214214
await say(text=self._format_help(), thread_ts=event.get("ts"))
215215
return
216+
elif text == "/stop":
217+
user_id = event.get("user", "unknown")
218+
response = handle_stop_command(self._session, user_id)
219+
await say(text=response, thread_ts=event.get("ts"))
220+
return
216221

217222
for handler in self._message_handlers:
218223
try:

src/praisonai/praisonai/bots/telegram.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
)
2828

2929
from .media import split_media_from_output, is_audio_file
30-
from ._commands import format_status, format_help
30+
from ._commands import format_status, format_help, handle_stop_command
3131
from ._session import BotSessionManager
3232
from ._debounce import InboundDebouncer
3333
from ._ack import AckReactor
@@ -398,9 +398,20 @@ async def handle_help(update: Update, context: ContextTypes.DEFAULT_TYPE):
398398
return
399399
await update.message.reply_text(self._format_help())
400400

401+
async def handle_stop(update: Update, context: ContextTypes.DEFAULT_TYPE):
402+
if not update.message:
403+
return
404+
message = await process_inbound_telegram_message(update, self)
405+
if not message:
406+
return
407+
user_id = message.sender.user_id if message.sender else "unknown"
408+
response = handle_stop_command(self._session, user_id)
409+
await update.message.reply_text(response)
410+
401411
self._application.add_handler(CommandHandler("status", handle_status))
402412
self._application.add_handler(CommandHandler("new", handle_new))
403413
self._application.add_handler(CommandHandler("help", handle_help))
414+
self._application.add_handler(CommandHandler("stop", handle_stop))
404415

405416
for command in self._command_handlers:
406417
self._application.add_handler(CommandHandler(command, handle_command))

src/praisonai/praisonai/bots/whatsapp.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
MessageType,
4444
)
4545

46-
from ._commands import format_status, format_help
46+
from ._commands import format_status, format_help, handle_stop_command
4747
from ._session import BotSessionManager
4848
from ._rate_limit import RateLimiter
4949
from ._ack import AckReactor
@@ -175,13 +175,18 @@ async def _help(msg):
175175
extra = {
176176
name: info.get("description", "")
177177
for name, info in self._command_info.items()
178-
if name not in ("status", "new", "help")
178+
if name not in ("status", "new", "help", "stop")
179179
}
180180
return format_help(self._agent, "whatsapp", extra or None)
181181

182+
async def _stop(msg):
183+
user_id = msg.sender.user_id if msg.sender else "unknown"
184+
return handle_stop_command(self._session_mgr, user_id)
185+
182186
self.register_command("status", _status, description="Show bot status and info")
183187
self.register_command("new", _new, description="Reset conversation session")
184188
self.register_command("help", _help, description="Show this help message")
189+
self.register_command("stop", _stop, description="Cancel the current agent run")
185190

186191
# ── Properties ──────────────────────────────────────────────────
187192

0 commit comments

Comments
 (0)
⚡