Skip to content

Commit 15d18ce

Browse files
author
Wojciech Napierała
committed
Enhance Redis memory management with automatic reconnection and fallback handling
- Added resilient Redis reconnection logic and fallback to in-memory storage when Redis is unavailable.
- Implemented integration tests for Docker-backed Redis, validating TTL expiry and service reconnection.
- Updated CHANGELOG to document these enhancements and changes.
1 parent 8594f43 commit 15d18ce

4 files changed

Lines changed: 278 additions & 45 deletions

File tree

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,14 @@
22

33
All notable changes to this project will be documented in this file. The format follows [Keep a Changelog](https://keepachangelog.com/) and dates use ISO-8601 (`YYYY-MM-DD`).
44

5+
## [0.1.6] - 2025-11-08
6+
7+
### Added
8+
- Docker-backed Redis integration tests validating TTL expiry, service reconnection, and fallback behaviour.
9+
10+
### Changed
11+
- Hardened Redis working-memory store with automatic reconnection and graceful fallback to in-memory storage when the service is unavailable.
12+
513
## [0.1.5] - 2025-11-07
614

715
### Changed

core/memory_manager.py

Lines changed: 100 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
v0.4 - 2025-11-07 - Added semantic graph utilities and relation management APIs.
88
v0.5 - 2025-11-07 - Instrumented memory operations with telemetry spans and metrics.
99
v0.6 - 2025-11-07 - Added retrieval helpers, tamper-evident revision hashing, and replay utilities.
10+
v0.7 - 2025-11-08 - Added resilient Redis reconnection and fallback handling.
1011
"""
1112

1213
from __future__ import annotations
@@ -240,76 +241,85 @@ def __init__(self, config: AppConfig) -> None:
240241
self._logger.warning(
241242
"Redis python client unavailable; using in-memory fallback store."
242243
)
243-
else:
244-
try:
245-
self._client = redis_module.Redis(
246-
host=self._host,
247-
port=self._port,
248-
db=self._db,
249-
socket_timeout=2,
250-
)
251-
# probe the connection lazily
252-
self._client.ping()
253-
except Exception as exc: # pragma: no cover - runtime check
254-
self._logger.error(
255-
"Redis connection failed (%s); falling back to in-memory store.",
256-
exc,
257-
)
258-
self._client = None
244+
return
245+
246+
self._initialise_client()
259247

260248
def put(self, item: WorkingMemoryItem) -> None:
    """Store an item in working memory.

    The item is serialised to JSON and written to Redis with an expiry of
    ``item.ttl_seconds`` (or the store default when that value is falsy).
    If no Redis client can be obtained, or the write raises, the failure is
    logged, the client is discarded so a later call attempts to reconnect,
    and the item is kept in the in-memory fallback instead.

    Args:
        item: Working-memory entry to persist.
    """
    if self._ensure_client():
        client = cast(Any, self._client)
        try:
            client.setex(
                name=item.key,
                time=item.ttl_seconds or self._ttl_seconds,
                value=json.dumps(asdict(item), default=str),
            )
        except Exception as exc:  # pragma: no cover - client failure path
            self._logger.warning(
                "Redis store failed (%s); switching to in-memory fallback.", exc
            )
            # Drop the client so the next operation tries a fresh connection.
            self._client = None
        else:
            # Redis now holds the authoritative copy. Remove any value that
            # was parked in the fallback during an earlier outage, otherwise
            # it would resurface (stale and without expiry) the next time
            # Redis becomes unreachable.
            self._fallback.pop(item.key, None)
            return

    self._fallback[item.key] = item
273266

274267
def get(self, key: str) -> Optional[WorkingMemoryItem]:
    """Look up a working-memory item by key.

    When a Redis client is available the JSON payload stored under ``key``
    is decoded back into a ``WorkingMemoryItem``; a missing key yields
    ``None``. If no client can be obtained, or anything in the Redis path
    raises, the in-memory fallback is consulted instead.

    Args:
        key: Identifier of the item to fetch.

    Returns:
        The stored item, or ``None`` when nothing is found.
    """
    if not self._ensure_client():
        return self._fallback.get(key)

    redis_client = cast(Any, self._client)
    try:
        raw = redis_client.get(key)
        if raw is None:
            return None
        record = cast(Dict[str, object], json.loads(raw.decode("utf-8")))
        stored_ttl = record.get("ttl_seconds", self._ttl_seconds)
        if isinstance(stored_ttl, (int, float)):
            ttl = int(stored_ttl)
        else:
            # Non-numeric TTLs in the payload are replaced by the default.
            ttl = self._ttl_seconds
        return WorkingMemoryItem(
            key=str(record.get("key", "")),
            payload=cast(Dict[str, object], record.get("payload", {})),
            ttl_seconds=ttl,
            created_at=self._coerce_timestamp(record.get("created_at")),
        )
    except Exception as exc:  # pragma: no cover
        self._logger.warning(
            "Redis retrieval failed (%s); falling back to in-memory store.",
            exc,
        )
        # Discard the client so the next call attempts a reconnect.
        self._client = None
        return self._fallback.get(key)
295298

296299
def delete(self, key: str) -> None:
    """Remove an item from working memory if present.

    The key is always removed from the in-memory fallback and, when a Redis
    client is available, from Redis as well. A Redis failure is logged and
    the client is discarded so a later call attempts to reconnect; no
    exception is raised to the caller.

    Args:
        key: Identifier of the item to remove.
    """
    # Clear the fallback unconditionally: an entry written there during an
    # outage must not reappear after the caller has deleted the key.
    self._fallback.pop(key, None)

    if not self._ensure_client():
        return

    client = cast(Any, self._client)
    try:
        client.delete(key)
    except Exception as exc:  # pragma: no cover
        self._logger.warning(
            "Redis deletion failed (%s); removing item from fallback store.",
            exc,
        )
        self._client = None
305314

306315
def list_items(self, pattern: str = "*") -> List[WorkingMemoryItem]:
307316
"""Return working memory items matching the given pattern."""
308-
try:
309-
if self._client:
317+
if self._ensure_client():
318+
client = cast(Any, self._client)
319+
try:
310320
items: List[WorkingMemoryItem] = []
311-
for key in self._client.scan_iter(match=pattern):
312-
payload_bytes = self._client.get(key)
321+
for key in client.scan_iter(match=pattern):
322+
payload_bytes = client.get(key)
313323
if not payload_bytes:
314324
continue
315325
data = cast(Dict[str, object], json.loads(payload_bytes.decode("utf-8")))
@@ -318,7 +328,11 @@ def list_items(self, pattern: str = "*") -> List[WorkingMemoryItem]:
318328
)
319329
payload_value = cast(Dict[str, object], data.get("payload", {}))
320330
ttl_raw = data.get("ttl_seconds", self._ttl_seconds)
321-
ttl_value = int(ttl_raw) if isinstance(ttl_raw, (int, float)) else self._ttl_seconds
331+
ttl_value = (
332+
int(ttl_raw)
333+
if isinstance(ttl_raw, (int, float))
334+
else self._ttl_seconds
335+
)
322336
items.append(
323337
WorkingMemoryItem(
324338
key=key_value,
@@ -328,9 +342,14 @@ def list_items(self, pattern: str = "*") -> List[WorkingMemoryItem]:
328342
)
329343
)
330344
return items
331-
return list(self._fallback.values())
332-
except Exception as exc: # pragma: no cover
333-
raise MemoryError(f"Failed to enumerate working memory: {exc}") from exc
345+
except Exception as exc: # pragma: no cover
346+
self._logger.warning(
347+
"Redis enumeration failed (%s); returning fallback store state.",
348+
exc,
349+
)
350+
self._client = None
351+
352+
return list(self._fallback.values())
334353

335354
@staticmethod
336355
def _coerce_timestamp(value: Optional[object]) -> datetime:
@@ -345,6 +364,42 @@ def _coerce_timestamp(value: Optional[object]) -> datetime:
345364
return parsed if parsed.tzinfo else parsed.replace(tzinfo=timezone.utc)
346365
return datetime.now(timezone.utc)
347366

367+
def _initialise_client(self) -> None:
    """Make the first connection attempt, settling for the fallback on failure.

    ``self._client`` ends up holding whatever ``_attempt_connect`` returned,
    or ``None`` when that call raised (the error is logged, not propagated).
    """
    connection: Optional[Any] = None
    try:
        connection = self._attempt_connect()
    except Exception as exc:  # pragma: no cover - runtime environment dependent
        self._logger.error(
            "Redis connection failed during initialisation (%s); using in-memory fallback.",
            exc,
        )
    self._client = connection
377+
378+
def _attempt_connect(self) -> Optional[Any]:
    """Build a Redis client for the configured endpoint and ping it.

    Returns:
        ``None`` when ``redis_module`` is ``None``; otherwise a client whose
        ``ping()`` call completed without raising.

    Any exception raised while constructing or pinging the client
    propagates to the caller.
    """
    if redis_module is None:
        return None
    connection_kwargs: Dict[str, Any] = {
        "host": self._host,
        "port": self._port,
        "db": self._db,
        "socket_timeout": 2,
    }
    candidate = redis_module.Redis(**connection_kwargs)
    # Probe immediately so an unreachable server is detected here rather
    # than on the first real command.
    candidate.ping()
    return candidate
389+
390+
def _ensure_client(self) -> bool:
    """Report whether a Redis client is available, reconnecting if needed.

    An existing client is reused as-is (it is not re-pinged here). When no
    client is held, one connection attempt is made; a failure is logged at
    debug level and reported as ``False``.

    Returns:
        ``True`` when ``self._client`` is set after the call.
    """
    if redis_module is None:
        return False
    if self._client is not None:
        return True

    try:
        self._client = self._attempt_connect()
    except Exception as exc:  # pragma: no cover - runtime dependent
        self._logger.debug("Redis reconnect attempt failed: %s", exc)
        self._client = None
        return False
    return self._client is not None
402+
348403

349404
class ChromaMemoryStore:
350405
"""Adapter around ChromaDB for episodic, semantic, and review memory."""

pyproject.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,8 @@ module = [
3939
"openai.*",
4040
]
4141
ignore_missing_imports = true
42+
43+
[tool.pytest.ini_options]
44+
markers = [
45+
"integration: marks tests that exercise external services",
46+
]

tests/test_redis_integration.py

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
"""Integration-style tests exercising the Redis working memory store.
2+
3+
Updates:
4+
v0.1 - 2025-11-08 - Added dockerised Redis coverage for TTL expiry,
5+
reconnection, and fallback behaviour.
6+
"""
7+
8+
from __future__ import annotations
9+
10+
import shutil
11+
import subprocess
12+
import time
13+
from pathlib import Path
14+
from typing import Iterator
15+
16+
import pytest
17+
import redis
18+
19+
from config.settings import load_app_config
20+
from core.memory_manager import RedisMemoryStore
21+
from models.memory import WorkingMemoryItem
22+
23+
PROJECT_ROOT = Path(__file__).resolve().parents[1]
24+
COMPOSE_FILE = PROJECT_ROOT / "docker-compose.yml"
25+
REDIS_SERVICE = "redis"
26+
27+
28+
class _RedisServiceController:
    """Helper for managing the dockerised Redis service during tests."""

    def __init__(self, compose_file: Path) -> None:
        self._compose_file = compose_file

    def run(self, *args: str) -> None:
        """Run ``docker compose -f <compose file> <args>`` from the project root.

        Raises:
            subprocess.CalledProcessError: If the command exits non-zero.
        """
        command = [
            "docker",
            "compose",
            "-f",
            str(self._compose_file),
            *args,
        ]
        subprocess.run(command, check=True, cwd=str(PROJECT_ROOT))

    def up(self) -> None:
        """Create and start the Redis service in detached mode."""
        self.run("up", "-d", REDIS_SERVICE)

    def stop(self) -> None:
        """Stop the Redis service without removing its container."""
        self.run("stop", REDIS_SERVICE)

    def start(self) -> None:
        """Start the previously stopped Redis service."""
        self.run("start", REDIS_SERVICE)

    def down(self) -> None:
        """Stop the Redis service and force-remove its container."""
        self.run("rm", "-sf", REDIS_SERVICE)

    def ensure_ready(self, timeout: float = 15.0) -> None:
        """Poll Redis on ``localhost:6379`` until it answers a ping.

        Args:
            timeout: Maximum number of seconds to keep polling.

        Raises:
            RuntimeError: If no successful ping is seen before the deadline.
        """
        client = redis.Redis(host="localhost", port=6379, db=5, socket_timeout=1)
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                if client.ping():
                    return
            except redis.exceptions.RedisError:
                # Connection refused/reset *and* socket timeouts are all
                # expected while the container is still starting; keep
                # polling instead of aborting on the first timeout.
                pass
            # Sleep after every unsuccessful attempt (including a falsy
            # ping) so the loop never spins.
            time.sleep(0.25)
        raise RuntimeError("Redis service did not become ready in time")
66+
67+
68+
@pytest.fixture(scope="module")
def redis_service() -> Iterator[_RedisServiceController]:
    """Provide a running dockerised Redis for the module, removing it afterwards.

    The dependent tests are skipped when the ``docker`` executable is not on
    PATH, when the compose file is missing, or when Redis does not become
    ready in time.
    """
    docker_binary = shutil.which("docker")
    if docker_binary is None:
        pytest.skip("Docker is required for Redis integration tests.")
    compose_available = COMPOSE_FILE.exists()
    if not compose_available:
        pytest.skip("docker-compose.yml not available for Redis integration tests.")

    service = _RedisServiceController(COMPOSE_FILE)
    service.up()
    try:
        service.ensure_ready()
    except RuntimeError as not_ready:
        pytest.skip(str(not_ready))
    else:
        yield service
    finally:
        # Runs on success, on skip, and on failure so the container never
        # outlives the test module.
        service.down()
85+
86+
87+
def _build_store(monkeypatch: pytest.MonkeyPatch, tmp_path: Path, ttl: int = 3) -> RedisMemoryStore:
    """Create a ``RedisMemoryStore`` from the app config with a custom default TTL.

    ``DRM_MEMORY_LOG_PATH`` is pointed at a file under ``tmp_path`` before the
    configuration is loaded.
    """
    revision_log = tmp_path / "revisions.jsonl"
    monkeypatch.setenv("DRM_MEMORY_LOG_PATH", str(revision_log))

    app_config = load_app_config()
    app_config.memory.redis.ttl_seconds = ttl
    return RedisMemoryStore(app_config)
92+
93+
94+
@pytest.mark.integration
def test_redis_working_memory_ttl_expiry(
    redis_service: _RedisServiceController,
    monkeypatch: pytest.MonkeyPatch,
    tmp_path: Path,
) -> None:
    """An item stored with a one-second TTL is readable at once and gone two seconds later."""
    store = _build_store(monkeypatch, tmp_path, ttl=1)
    ephemeral = WorkingMemoryItem(
        key="ttl:test",
        payload={"value": "ephemeral"},
        ttl_seconds=1,
    )
    store.put(ephemeral)

    before_expiry = store.get(ephemeral.key)
    assert before_expiry is not None

    # Wait past the TTL so the key has expired by the next read.
    time.sleep(2)
    after_expiry = store.get(ephemeral.key)
    assert after_expiry is None
113+
114+
@pytest.mark.integration
def test_redis_store_recovers_after_restart(
    redis_service: _RedisServiceController,
    monkeypatch: pytest.MonkeyPatch,
    tmp_path: Path,
) -> None:
    """The store returns to Redis-backed operation after the service restarts.

    ``RedisMemoryStore`` silently degrades to an in-memory dict, so a plain
    put/get round trip succeeds whether or not Redis served it. The
    ``store._client`` assertions pin that Redis, not the fallback, handled
    the calls, and the operations performed during the outage force the
    store through its own reconnect path.
    """
    store = _build_store(monkeypatch, tmp_path)

    first_item = WorkingMemoryItem(
        key="reconnect:initial",
        payload={"attempt": 1},
        ttl_seconds=10,
    )
    store.put(first_item)
    assert store.get(first_item.key) is not None
    # The client is reset to None on any Redis failure; still holding one
    # means both calls above went through Redis.
    assert store._client is not None

    redis_service.stop()
    time.sleep(1.0)

    # Use the store while Redis is down so it observes the outage and drops
    # its client; the item must still be retrievable from the fallback.
    outage_item = WorkingMemoryItem(
        key="reconnect:outage",
        payload={"attempt": "outage"},
        ttl_seconds=10,
    )
    store.put(outage_item)
    assert store.get(outage_item.key) is not None
    assert store._client is None

    redis_service.start()
    redis_service.ensure_ready()

    second_item = WorkingMemoryItem(
        key="reconnect:subsequent",
        payload={"attempt": 2},
        ttl_seconds=10,
    )

    store.put(second_item)
    assert store.get(second_item.key) is not None
    # A live client here proves the store reconnected and served the
    # round trip from Redis rather than from the fallback.
    assert store._client is not None
143+
144+
145+
def test_redis_fallback_store_when_unavailable(
    monkeypatch: pytest.MonkeyPatch,
    tmp_path: Path,
) -> None:
    """A put/get round trip still works when the configured Redis port is unreachable.

    The store is pointed at port 6390, presumably chosen because nothing is
    expected to listen there.
    """
    fallback_log = tmp_path / "fallback.jsonl"
    monkeypatch.setenv("DRM_MEMORY_LOG_PATH", str(fallback_log))

    app_config = load_app_config()
    redis_settings = app_config.memory.redis
    redis_settings.port = 6390
    redis_settings.ttl_seconds = 3
    store = RedisMemoryStore(app_config)

    cached = WorkingMemoryItem(
        key="fallback:item",
        payload={"value": "cached"},
        ttl_seconds=3,
    )
    store.put(cached)

    retrieved = store.get(cached.key)
    assert retrieved is not None
    assert retrieved.payload == cached.payload

0 commit comments

Comments
 (0)