Skip to content

Commit 2f316f4

Browse files
merge: resolve conflicts integrating streaming with durability
- Preserve streaming functionality with DraftStreamer integration - Maintain message journaling and run control from main - Combine best features from both branches: * Progressive streaming with proper error handling * Durable message processing with crash recovery * Media handling in streaming mode * Message hooks integration * Placeholder cleanup on errors and cancellation Co-authored-by: Mervin Praison <MervinPraison@users.noreply.github.com>
2 parents 736f192 + 2f9677a commit 2f316f4

50 files changed

Lines changed: 2542 additions & 284 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

AGENTS.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# Agent Instructions
2+
3+
You are working on the PraisonAI project.
4+
5+
## Project Guidelines
6+
- Follow the existing code style and conventions
7+
- Be concise and helpful in responses
8+
- Test implementation thoroughly
9+
- Ensure backward compatibility with existing APIs
10+
- Follow protocol-driven design: core protocols in praisonaiagents/, heavy implementations in praisonai/

docker/Dockerfile.chat

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ RUN mkdir -p /root/.praison
1616
# Install Python packages (using latest versions)
1717
RUN pip install --no-cache-dir \
1818
praisonai_tools \
19-
"praisonai>=4.6.56" \
19+
"praisonai>=4.6.57" \
2020
"praisonai[chat]" \
2121
"embedchain[github,youtube]"
2222

docker/Dockerfile.dev

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ RUN mkdir -p /root/.praison
2020
# Install Python packages (using latest versions)
2121
RUN pip install --no-cache-dir \
2222
praisonai_tools \
23-
"praisonai>=4.6.56" \
23+
"praisonai>=4.6.57" \
2424
"praisonai[ui]" \
2525
"praisonai[chat]" \
2626
"praisonai[realtime]" \

docker/Dockerfile.ui

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ RUN mkdir -p /root/.praison
1616
# Install Python packages (using latest versions)
1717
RUN pip install --no-cache-dir \
1818
praisonai_tools \
19-
"praisonai>=4.6.56" \
19+
"praisonai>=4.6.57" \
2020
"praisonai[ui]" \
2121
"praisonai[crewai]"
2222

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Example: Bot In-Flight Run Control
4+
5+
Demonstrates how to use SessionRunControl to provide better UX for bots
6+
during long-running agent operations. Shows busy feedback, pending message
7+
handling, and /stop command support.
8+
9+
This example shows the "before" and "after" behavior described in issue #1914.
10+
"""
11+
12+
import asyncio
13+
import time
14+
from typing import Optional
15+
16+
# Mock agent for demonstration
17+
class MockAgent:
18+
"""Simple mock agent that simulates long-running tasks."""
19+
20+
def __init__(self, name="assistant"):
21+
self.name = name
22+
self.chat_history = []
23+
self._interrupt_controller = None
24+
25+
def chat(self, prompt: str) -> str:
26+
"""Simulate a long-running chat response."""
27+
print(f"[Agent] Starting to process: '{prompt[:50]}...'")
28+
29+
# Simulate long processing with interrupt checking
30+
for i in range(30): # 3 second task (30 x 0.1s)
31+
time.sleep(0.1)
32+
33+
# Check for interruption
34+
if (self._interrupt_controller and
35+
hasattr(self._interrupt_controller, 'is_set') and
36+
self._interrupt_controller.is_set()):
37+
reason = getattr(self._interrupt_controller, 'reason', 'unknown')
38+
print(f"[Agent] ⚠️ Interrupted: {reason}")
39+
raise InterruptedError(f"Task cancelled: {reason}")
40+
41+
response = f"Completed analysis of: {prompt}"
42+
print(f"[Agent] ✅ Finished: {response}")
43+
return response
44+
45+
46+
async def demo_without_run_control():
47+
"""Demonstrate the old behavior (silent blocking)."""
48+
print("\n" + "="*60)
49+
print("🔴 BEFORE: Without Run Control (Silent Blocking)")
50+
print("="*60)
51+
52+
from praisonai.bots._session import BotSessionManager
53+
54+
# Standard session manager (no run control)
55+
session_mgr = BotSessionManager()
56+
agent = MockAgent("research-agent")
57+
user_id = "user123"
58+
59+
print("\n1. User sends long task:")
60+
print(" User: 'research quantum computing trends'")
61+
62+
# Start first task (this will run for 3 seconds)
63+
task1 = asyncio.create_task(
64+
session_mgr.chat(agent, user_id, "research quantum computing trends")
65+
)
66+
67+
# Simulate user sending follow-up messages during execution
68+
await asyncio.sleep(0.5)
69+
print("\n2. User sends follow-up (this will block silently):")
70+
print(" User: 'actually focus on business impact'")
71+
72+
task2 = asyncio.create_task(
73+
session_mgr.chat(agent, user_id, "actually focus on business impact")
74+
)
75+
76+
await asyncio.sleep(0.5)
77+
print("\n3. User tries to send /stop (this will also block):")
78+
print(" User: '/stop'")
79+
80+
task3 = asyncio.create_task(
81+
session_mgr.chat(agent, user_id, "/stop")
82+
)
83+
84+
print("\n⏳ Waiting for all messages to complete...")
85+
results = await asyncio.gather(task1, task2, task3)
86+
87+
print("\n📊 Results (all processed sequentially with no feedback):")
88+
for i, result in enumerate(results, 1):
89+
print(f" {i}. {result}")
90+
91+
print("\n❌ Problems with this approach:")
92+
print(" - No immediate feedback on follow-up messages")
93+
print(" - No way to cancel long-running tasks")
94+
print(" - User doesn't know if bot is working or broken")
95+
96+
97+
async def demo_with_run_control():
98+
"""Demonstrate the new behavior with run control."""
99+
print("\n" + "="*60)
100+
print("🟢 AFTER: With Run Control (Better UX)")
101+
print("="*60)
102+
103+
from praisonai.bots._run_control import SessionRunControl
104+
from praisonai.bots._session import BotSessionManager
105+
from praisonai.bots._commands import handle_stop_command
106+
107+
# Session manager with run control
108+
run_control = SessionRunControl(
109+
busy_mode="queue",
110+
busy_ack_template="⏳ {action} — will process after current task finishes"
111+
)
112+
session_mgr = BotSessionManager(run_control=run_control)
113+
agent = MockAgent("research-agent")
114+
user_id = "user123"
115+
116+
print("\n1. User sends long task:")
117+
print(" User: 'research quantum computing trends'")
118+
119+
# Start first task
120+
task1 = asyncio.create_task(
121+
session_mgr.chat_with_run_control(agent, user_id, "research quantum computing trends")
122+
)
123+
124+
# Immediate follow-up messages get acknowledgments
125+
await asyncio.sleep(0.5)
126+
print("\n2. User sends follow-up (gets immediate feedback):")
127+
print(" User: 'actually focus on business impact'")
128+
129+
result2 = await session_mgr.chat_with_run_control(
130+
agent, user_id, "actually focus on business impact"
131+
)
132+
print(f" Bot: {result2['response']}")
133+
print(f" Metadata: {result2['metadata']}")
134+
135+
await asyncio.sleep(0.5)
136+
print("\n3. User decides to stop current task:")
137+
print(" User: '/stop'")
138+
139+
stop_response = await handle_stop_command(user_id, run_control)
140+
print(f" Bot: {stop_response}")
141+
142+
# Wait for first task to complete (it should be cancelled)
143+
try:
144+
result1 = await task1
145+
print(f"\n📊 Task 1 result: {result1['response']}")
146+
print(f" Metadata: {result1['metadata']}")
147+
except asyncio.CancelledError:
148+
print("\n📊 Task 1 was cancelled as expected")
149+
150+
print("\n4. User sends fresh request after stopping:")
151+
print(" User: 'what's the weather like?'")
152+
153+
result4 = await session_mgr.chat_with_run_control(
154+
agent, user_id, "what's the weather like?"
155+
)
156+
print(f" Bot: {result4['response']}")
157+
158+
print("\n✅ Benefits of this approach:")
159+
print(" - Immediate feedback on all messages")
160+
print(" - Can cancel long-running tasks with /stop")
161+
print(" - Pending messages are queued and processed in order")
162+
print(" - User always knows what's happening")
163+
164+
165+
async def demo_interrupt_mode():
166+
"""Demonstrate interrupt mode."""
167+
print("\n" + "="*60)
168+
print("⚡ BONUS: Interrupt Mode (Cancel and Restart)")
169+
print("="*60)
170+
171+
from praisonai.bots._run_control import SessionRunControl
172+
from praisonai.bots._session import BotSessionManager
173+
174+
# Use interrupt mode instead of queue mode
175+
run_control = SessionRunControl(busy_mode="interrupt")
176+
session_mgr = BotSessionManager(run_control=run_control)
177+
agent = MockAgent("research-agent")
178+
user_id = "user123"
179+
180+
print("\n1. User starts long task:")
181+
print(" User: 'research renewable energy for 30 pages'")
182+
183+
task1 = asyncio.create_task(
184+
session_mgr.chat_with_run_control(agent, user_id, "research renewable energy for 30 pages")
185+
)
186+
187+
await asyncio.sleep(0.5)
188+
print("\n2. User changes mind (interrupts and restarts):")
189+
print(" User: 'actually just summarize solar panel efficiency'")
190+
191+
result2 = await session_mgr.chat_with_run_control(
192+
agent, user_id, "actually just summarize solar panel efficiency"
193+
)
194+
print(f" Bot: {result2['response']}")
195+
196+
# Wait for original task (should be interrupted)
197+
try:
198+
result1 = await task1
199+
print(f"\n📊 Original task result: {result1['response']}")
200+
if result1['metadata'].get('interrupted'):
201+
print(" ✅ Original task was properly interrupted")
202+
except:
203+
print("\n📊 Original task was cancelled")
204+
205+
print("\n✅ Interrupt mode is great for:")
206+
print(" - When users frequently change direction")
207+
print(" - Interactive sessions where latest intent matters most")
208+
print(" - Real-time collaboration scenarios")
209+
210+
211+
async def main():
212+
"""Run all demonstrations."""
213+
print("🚀 Bot In-Flight Run Control Demo")
214+
print("Solving the silent lock problem described in issue #1914")
215+
216+
# Show the problem
217+
await demo_without_run_control()
218+
219+
# Show the solution
220+
await demo_with_run_control()
221+
222+
# Show advanced feature
223+
await demo_interrupt_mode()
224+
225+
print("\n" + "="*60)
226+
print("🎯 Summary")
227+
print("="*60)
228+
print("SessionRunControl solves the silent blocking problem by providing:")
229+
print("1. Immediate feedback for mid-run messages")
230+
print("2. /stop command support via InterruptController")
231+
print("3. Configurable policies (queue/interrupt/steer)")
232+
print("4. Run generation tracking to prevent race conditions")
233+
print("5. Pending message merging and ordering")
234+
print("\nThis makes bot interactions feel responsive and predictable!")
235+
236+
237+
if __name__ == "__main__":
238+
asyncio.run(main())

src/praisonai-agents/praisonaiagents/bots/config.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,16 @@ class BotConfig:
9191
# When set, stale sessions older than this are auto-reaped.
9292
session_ttl: int = 0
9393

94+
# In-flight run control settings for mid-run message handling
95+
# busy_mode: Policy for handling messages during active runs
96+
# - "queue": Queue message for next turn (default)
97+
# - "interrupt": Cancel current run and start new one
98+
# - "steer": Inject message into current run via steering
99+
busy_mode: str = "queue"
100+
101+
# Template for busy acknowledgment messages (use {action} placeholder)
102+
busy_ack: str = "⏳ {action} — will be considered next"
103+
94104
# Workspace settings for file operation containment and security
95105
workspace_dir: Optional[str] = None # default: ~/.praisonai/workspaces/<scope>/<session_key>
96106
workspace_access: str = "rw" # "rw" (read-write) | "ro" (read-only) | "none" (copy-on-write sandbox)
@@ -142,6 +152,8 @@ def to_dict(self) -> Dict[str, Any]:
142152
"ack_emoji": self.ack_emoji,
143153
"done_emoji": self.done_emoji,
144154
"session_ttl": self.session_ttl,
155+
"busy_mode": self.busy_mode,
156+
"busy_ack": self.busy_ack,
145157
"workspace_dir": self.workspace_dir,
146158
"workspace_access": self.workspace_access,
147159
"workspace_scope": self.workspace_scope,

src/praisonai-agents/praisonaiagents/memory/file_memory.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,16 @@ class FileMemory:
115115
"importance_threshold": 0.7, # Min importance for long-term
116116
"auto_promote": True, # Auto-promote important short-term to long-term
117117
}
118+
119+
@staticmethod
120+
def _sanitise_user_id(user_id: str) -> str:
121+
"""Reject path traversal in user_id before using it as a directory name."""
122+
if not user_id or not isinstance(user_id, str):
123+
return "default"
124+
if ".." in user_id or "/" in user_id or "\\" in user_id:
125+
raise ValueError("user_id must not contain path separators or parent references")
126+
safe = user_id.strip()
127+
return safe or "default"
118128

119129
def __init__(
120130
self,
@@ -132,7 +142,7 @@ def __init__(
132142
config: Configuration overrides
133143
verbose: Verbosity level (0=quiet, 1=info, 2+=debug)
134144
"""
135-
self.user_id = user_id
145+
self.user_id = self._sanitise_user_id(user_id)
136146
self.verbose = verbose
137147

138148
# Set up paths

src/praisonai-agents/praisonaiagents/server/server.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,16 @@ def __init__(
117117
self._server_thread: Optional[threading.Thread] = None
118118
self._app = None
119119
self._server = None
120+
121+
def _authorise_request(self, request) -> bool:
122+
"""Verify bearer token when auth_token is configured."""
123+
token = self.config.auth_token
124+
if not token:
125+
return True
126+
auth = request.headers.get("Authorization", "")
127+
if auth.startswith("Bearer ") and auth[7:] == token:
128+
return True
129+
return request.headers.get("X-Auth-Token") == token
120130

121131
def _create_app(self):
122132
"""Create the ASGI application."""
@@ -137,6 +147,8 @@ async def health(request):
137147
})
138148

139149
async def events(request):
150+
if not self._authorise_request(request):
151+
return JSONResponse({"error": "Unauthorized"}, status_code=401)
140152
client_id = str(uuid.uuid4())
141153
client = SSEClient(client_id)
142154
self._clients[client_id] = client
@@ -162,6 +174,8 @@ async def event_generator():
162174
)
163175

164176
async def publish(request):
177+
if not self._authorise_request(request):
178+
return JSONResponse({"error": "Unauthorized"}, status_code=401)
165179
try:
166180
data = await request.json()
167181
event_type = data.get("type", "message")

src/praisonai-agents/praisonaiagents/tools/ast_grep_tool.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
from praisonaiagents._logging import get_logger
3232
from typing import Optional, List
3333

34+
from ..approval import require_approval
35+
3436
logger = get_logger(__name__)
3537

3638
# Availability cache for performance (checked once per process)
@@ -146,6 +148,7 @@ def ast_grep_search(
146148
logger.exception("Unexpected error in ast_grep_search")
147149
return f"Error: {e}"
148150

151+
@require_approval(risk_level="high")
149152
def ast_grep_rewrite(
150153
pattern: str,
151154
replacement: str,

src/praisonai-agents/praisonaiagents/tools/spider_tools.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,11 @@ def _ip_blocked(ip: ipaddress._BaseAddress) -> bool:
5858
pass
5959

6060
try:
61-
return _ip_blocked(ipaddress.ip_address(socket.inet_aton(host)))
62-
except OSError:
63-
pass
61+
for info in socket.getaddrinfo(host, None):
62+
if _ip_blocked(ipaddress.ip_address(info[4][0])):
63+
return True
64+
except socket.gaierror:
65+
return True
6466

6567
return False
6668

0 commit comments

Comments
 (0)