Skip to content

Commit c175691

Browse files
fix: wrapper layer stale locks and dead imports (fixes #1971) (#1972)
* fix: wrapper layer stale locks and dead imports (fixes #1971) - Remove dead imports from agents_generator.py (load_dotenv, AutoGenerator, PraisonAIModel) - Replace BotSessionManager id()-based locks with WeakKeyDictionary to prevent corruption - Create LockMap utility with LRU eviction and TTL cleanup - Refactor duplicated lock patterns in _session, _debounce, and _run_control Co-authored-by: MervinPraison <MervinPraison@users.noreply.github.com> * fix: address reviewer feedback on LockMap implementation - Fix AttributeError: Change _session.py reset() to use .drop() instead of .pop() - Fix LRU eviction logic: Continue past locked entries instead of breaking - Update docstring: Clarify asyncio-safe vs thread-safe guarantees Co-authored-by: Mervin Praison <MervinPraison@users.noreply.github.com> --------- Co-authored-by: praisonai-triage-agent[bot] <272766704+praisonai-triage-agent[bot]@users.noreply.github.com> Co-authored-by: MervinPraison <MervinPraison@users.noreply.github.com>
1 parent 2ece9d5 commit c175691

5 files changed

Lines changed: 106 additions & 24 deletions

File tree

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
"""Per-key asyncio.Lock map with bounded LRU eviction and optional TTL.
2+
3+
Use this anywhere you need 'a lock per (user_id|chat_id|key)' instead of
4+
re-implementing the dict-of-Lock pattern.
5+
"""
6+
from __future__ import annotations
7+
import asyncio
8+
import time
9+
from collections import OrderedDict
10+
from typing import Hashable
11+
12+
13+
class LockMap:
14+
"""Asyncio-safe per-key asyncio.Lock map with automatic cleanup.
15+
16+
Provides a lock per key with LRU eviction and TTL-based cleanup to prevent
17+
unbounded memory growth in long-running applications.
18+
19+
Note: Safe for use within a single asyncio event loop (cooperative
20+
multitasking means no interleaving between non-awaited calls), but NOT
21+
safe for concurrent access from multiple OS threads.
22+
"""
23+
24+
def __init__(self, *, max_entries: int = 10_000, ttl_seconds: float = 3600.0):
25+
"""Initialize LockMap with bounds.
26+
27+
Args:
28+
max_entries: Maximum number of locks to cache (LRU eviction)
29+
ttl_seconds: Time-to-live for unused locks in seconds
30+
"""
31+
self._locks: "OrderedDict[Hashable, tuple[asyncio.Lock, float]]" = OrderedDict()
32+
self._max = max_entries
33+
self._ttl = ttl_seconds
34+
35+
def get(self, key: Hashable) -> asyncio.Lock:
36+
"""Get or create a lock for the given key.
37+
38+
Args:
39+
key: The key to get a lock for
40+
41+
Returns:
42+
asyncio.Lock for the key
43+
"""
44+
now = time.monotonic()
45+
entry = self._locks.get(key)
46+
if entry is not None:
47+
lock, _ = entry
48+
# Move to end (most recently used) and update timestamp
49+
self._locks.move_to_end(key)
50+
self._locks[key] = (lock, now)
51+
return lock
52+
53+
# Create new lock + insert + LRU/TTL evict
54+
lock = asyncio.Lock()
55+
self._locks[key] = (lock, now)
56+
self._evict_stale(now)
57+
return lock
58+
59+
def _evict_stale(self, now: float) -> None:
60+
"""Evict expired and excess entries."""
61+
# Expire by TTL (only if not currently locked)
62+
expired = [
63+
k for k, (lock, ts) in self._locks.items()
64+
if (now - ts) > self._ttl and not lock.locked()
65+
]
66+
for k in expired:
67+
self._locks.pop(k, None)
68+
69+
# Cap by LRU (don't evict locks currently held)
70+
while len(self._locks) > self._max:
71+
k, (lock, _) = next(iter(self._locks.items()))
72+
if lock.locked():
73+
# Don't evict locks currently held; bump them to the end
74+
self._locks.move_to_end(k)
75+
# Continue trying to evict other unlocked entries
76+
continue
77+
self._locks.popitem(last=False)
78+
break
79+
80+
def drop(self, key: Hashable) -> None:
81+
"""Manually remove a lock for the given key."""
82+
self._locks.pop(key, None)
83+
84+
def size(self) -> int:
85+
"""Return current number of cached locks."""
86+
return len(self._locks)

src/praisonai/praisonai/agents_generator.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,6 @@
55
from .version import __version__
66
import yaml, os
77
from rich import print
8-
from dotenv import load_dotenv
9-
from .auto import AutoGenerator
10-
from .inc import PraisonAIModel
118
import inspect
129
from pathlib import Path
1310
import importlib

src/praisonai/praisonai/bots/_debounce.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import asyncio
1818
import logging
1919
from typing import Dict, List
20+
from .._lockmap import LockMap
2021

2122
logger = logging.getLogger(__name__)
2223

@@ -43,12 +44,10 @@ def __init__(
4344
self._buffers: Dict[str, List[str]] = {}
4445
self._timers: Dict[str, asyncio.TimerHandle] = {}
4546
self._futures: Dict[str, List[asyncio.Future]] = {}
46-
self._locks: Dict[str, asyncio.Lock] = {}
47+
self._locks = LockMap()
4748

4849
def _get_lock(self, user_id: str) -> asyncio.Lock:
49-
if user_id not in self._locks:
50-
self._locks[user_id] = asyncio.Lock()
51-
return self._locks[user_id]
50+
return self._locks.get(user_id)
5251

5352
async def debounce(self, user_id: str, text: str) -> str:
5453
"""Buffer *text* for *user_id* and return coalesced result.

src/praisonai/praisonai/bots/_run_control.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import time
3939
from enum import Enum
4040
from typing import Dict, Optional, Any, TYPE_CHECKING
41+
from .._lockmap import LockMap
4142

4243
if TYPE_CHECKING:
4344
from praisonaiagents.agent.interrupt import InterruptController
@@ -99,13 +100,11 @@ def __init__(
99100

100101
self._busy_ack_template = busy_ack_template
101102
self._sessions: Dict[str, SessionRunState] = {}
102-
self._locks: Dict[str, asyncio.Lock] = {}
103+
self._locks = LockMap()
103104

104105
def _get_lock(self, user_id: str) -> asyncio.Lock:
105106
"""Get or create lock for user."""
106-
if user_id not in self._locks:
107-
self._locks[user_id] = asyncio.Lock()
108-
return self._locks[user_id]
107+
return self._locks.get(user_id)
109108

110109
def _get_session(self, user_id: str) -> SessionRunState:
111110
"""Get or create session state for user."""
@@ -338,7 +337,7 @@ def cleanup_stale_sessions(self, max_age_seconds: int = 3600) -> int:
338337

339338
for user_id in stale_users:
340339
del self._sessions[user_id]
341-
self._locks.pop(user_id, None)
340+
self._locks.drop(user_id)
342341

343342
if stale_users:
344343
logger.debug(f"SessionRunControl: cleaned up {len(stale_users)} stale sessions")

src/praisonai/praisonai/bots/_session.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
import asyncio
1919
import logging
2020
import time
21+
import weakref
2122
from typing import Any, Dict, List, Optional, TYPE_CHECKING
23+
from .._lockmap import LockMap
2224

2325
if TYPE_CHECKING:
2426
from praisonaiagents import Agent
@@ -74,8 +76,8 @@ def __init__(
7476
run_timeout: float = 300.0, # 5 minutes default timeout
7577
) -> None:
7678
self._histories: Dict[str, List[Dict[str, Any]]] = {}
77-
self._locks: Dict[str, asyncio.Lock] = {}
78-
self._agent_locks: Dict[int, asyncio.Lock] = {}
79+
self._locks = LockMap()
80+
self._agent_locks: "weakref.WeakKeyDictionary[Any, asyncio.Lock]" = weakref.WeakKeyDictionary()
7981
self._max_history = max_history
8082
self._store = store
8183
self._platform = platform
@@ -131,16 +133,15 @@ def _session_key(self, user_id: str) -> str:
131133
def _get_lock(self, user_id: str) -> asyncio.Lock:
132134
"""Get or create an asyncio.Lock for *user_id* (storage-keyed)."""
133135
key = self._storage_key(user_id)
134-
if key not in self._locks:
135-
self._locks[key] = asyncio.Lock()
136-
return self._locks[key]
136+
return self._locks.get(key)
137137

138138
def _get_agent_lock(self, agent: "Agent") -> asyncio.Lock:
139-
"""Get or create a lock for the *agent* instance (by id)."""
140-
agent_id = id(agent)
141-
if agent_id not in self._agent_locks:
142-
self._agent_locks[agent_id] = asyncio.Lock()
143-
return self._agent_locks[agent_id]
139+
"""Get or create a lock for the *agent* instance (using WeakKeyDictionary)."""
140+
lock = self._agent_locks.get(agent)
141+
if lock is None:
142+
lock = asyncio.Lock()
143+
self._agent_locks[agent] = lock
144+
return lock
144145

145146
def _load_history(self, user_id: str) -> List[Dict[str, Any]]:
146147
"""Load user history from store (if available) or in-memory cache."""
@@ -537,7 +538,7 @@ def reap_stale(self, max_age_seconds: int) -> int:
537538
for storage_key in stale:
538539
self._histories.pop(storage_key, None)
539540
self._last_active.pop(storage_key, None)
540-
self._locks.pop(storage_key, None)
541+
self._locks.drop(storage_key)
541542
if self._store is not None:
542543
key = self._persist_key(storage_key)
543544
try:
@@ -571,7 +572,7 @@ def reset(self, user_id: str) -> bool:
571572
existed = storage_key in self._histories
572573
self._histories.pop(storage_key, None)
573574
self._last_active.pop(storage_key, None)
574-
self._locks.pop(storage_key, None)
575+
self._locks.drop(storage_key)
575576

576577
if self._store is not None:
577578
persist_key = self._session_key(user_id)

0 commit comments

Comments
 (0)