Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 137 additions & 15 deletions nexum_ai/optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
Semantic cache and query optimizer using local embedding models
"""
import logging
import math
import numpy as np
import time
from typing import Optional, List, Dict, Any
import json
import os
Expand Down Expand Up @@ -61,6 +63,7 @@ def __init__(self, similarity_threshold: float = 0.95, cache_file: str = "semant
self.cache: List[Dict] = []
self.similarity_threshold = similarity_threshold
self.model = None
self.max_age_seconds: Optional[float] = None # None = no TTL

# Support environment variable for cache file path
cache_file_env = os.environ.get('NEXUMDB_CACHE_FILE', cache_file)
Expand Down Expand Up @@ -115,25 +118,81 @@ def cosine_similarity(self, vec1: List[float], vec2: List[float]) -> float:

return float(dot_product / (norm1 * norm2))

def _is_entry_expired(self, entry: Dict, now: Optional[float] = None) -> bool:
"""Check if a cache entry has exceeded its TTL.

Args:
entry: Cache entry dict, expected to contain a 'timestamp' key.
now: Current time as a Unix timestamp. If *None*, ``time.time()``
is called. Callers iterating over many entries should snapshot
the current time once and pass it in to avoid redundant
syscalls and subtle inconsistencies.

Returns:
True if the entry is expired, False otherwise.
Entries without a timestamp are never considered expired.
"""
if self.max_age_seconds is None:
return False
timestamp = entry.get('timestamp')
if timestamp is None:
# Legacy entries without a timestamp are kept (not expired)
return False
if now is None:
now = time.time()
return (now - timestamp) > self.max_age_seconds

def _evict_expired(self) -> int:
"""Remove all expired cache entries.

Returns:
Number of entries removed.
"""
if self.max_age_seconds is None:
return 0
now = time.time()
before = len(self.cache)
self.cache = [e for e in self.cache if not self._is_entry_expired(e, now=now)]
removed = before - len(self.cache)
if removed > 0:
logger.info(f"Evicted {removed} expired cache entries")
return removed

def get(self, query: str) -> Optional[str]:
"""Retrieve cached result if similar query exists"""
"""Retrieve cached result if similar query exists.

Expired entries (based on TTL) are skipped during lookup.
When expired entries are detected they are opportunistically
evicted so future lookups don't degrade over time.
"""
query_vec = self.vectorize(query)

now = time.time()
found_expired = False

for entry in self.cache:
# Skip expired entries and flag for cleanup
if self._is_entry_expired(entry, now=now):
found_expired = True
continue
similarity = self.cosine_similarity(query_vec, entry['vector'])
if similarity >= self.similarity_threshold:
logger.info(f"Cache hit! Similarity: {similarity:.4f}")
if found_expired:
self._evict_expired()
return entry['result']


if found_expired:
self._evict_expired()
return None

def put(self, query: str, result: str) -> None:
"""Store query and result in cache"""
"""Store query and result in cache with a creation timestamp."""
query_vec = self.vectorize(query)
self.cache.append({
'query': query,
'vector': query_vec,
'result': result
'result': result,
'timestamp': time.time()
})
logger.info(f"Cached query: {query[:50]}...")

Expand Down Expand Up @@ -248,7 +307,8 @@ def save_cache_json(self, filepath: Optional[str] = None) -> None:
'cache': self.cache,
'similarity_threshold': self.similarity_threshold,
'cache_size': len(self.cache),
'format_version': '1.0'
'format_version': '1.1',
'max_age_seconds': self.max_age_seconds,
}

with open(filepath, 'w') as f:
Expand Down Expand Up @@ -278,8 +338,17 @@ def load_cache_json(self, filepath: Optional[str] = None) -> None:

self.cache = data.get('cache', [])
self.similarity_threshold = data.get('similarity_threshold', self.similarity_threshold)

# Always restore persisted TTL state so that loading
# a no-TTL cache into a TTL-enabled instance correctly
# clears the TTL rather than keeping the stale value.
saved_max_age = data.get('max_age_seconds')
self.max_age_seconds = float(saved_max_age) if saved_max_age is not None else None

logger.info(f"Semantic cache loaded from JSON: {filepath} ({len(self.cache)} entries)")

# Evict entries that became stale while the process was down
self._evict_expired()
Comment thread
coderabbitai[bot] marked this conversation as resolved.

except Exception:
logger.exception("Error loading cache from JSON")
Expand All @@ -288,14 +357,26 @@ def load_cache_json(self, filepath: Optional[str] = None) -> None:
logger.debug(f"No JSON cache file found at {filepath}")

def get_cache_stats(self) -> Dict[str, Any]:
"""Get cache statistics"""
return {
"""Get cache statistics including TTL information."""
try:
cache_size_bytes = self.cache_path.stat().st_size
except OSError:
cache_size_bytes = 0

stats: Dict[str, Any] = {
'total_entries': len(self.cache),
'similarity_threshold': self.similarity_threshold,
'cache_file': str(self.cache_path),
'cache_exists': self.cache_path.exists(),
'cache_size_bytes': self.cache_path.stat().st_size if self.cache_path.exists() else 0
'cache_size_bytes': cache_size_bytes,
}
if self.max_age_seconds is not None:
now = time.time()
stats['max_age_hours'] = self.max_age_seconds / 3600.0
# Count how many entries are currently expired
expired = sum(1 for e in self.cache if self._is_entry_expired(e, now=now))
stats['expired_entries'] = expired
return stats
Comment thread
coderabbitai[bot] marked this conversation as resolved.

def explain_query(self, query: str) -> Dict[str, Any]:
"""
Expand Down Expand Up @@ -345,8 +426,11 @@ def explain_query(self, query: str) -> Dict[str, Any]:
best_match = None
best_similarity = 0.0

# Analyze cache entries safely
# Analyze cache entries safely (skip expired)
now = time.time()
for i, entry in enumerate(self.cache):
if self._is_entry_expired(entry, now=now):
continue
try:
similarity = self.cosine_similarity(query_vec, entry.get('vector', []))
except Exception as e:
Expand Down Expand Up @@ -389,11 +473,49 @@ def explain_query(self, query: str) -> Dict[str, Any]:
'top_matches': cache_analysis[:5] # Top 5 similar cached queries
}

def set_cache_expiration(self, max_age_hours: int = 24) -> None:
"""Remove cache entries older than specified hours (future enhancement)"""
# This would require adding timestamps to cache entries
# For now, just a placeholder for TTL functionality
logger.info(f"Cache expiration set to {max_age_hours} hours (not yet implemented)")
def set_cache_expiration(self, max_age_hours: Optional[float] = 24) -> int:
"""Set or disable TTL and immediately evict stale cache entries.

After calling this method every subsequent :meth:`get` call will
transparently skip entries that have exceeded the TTL, and every
:meth:`save_cache` / :meth:`save_cache_json` call will persist the
TTL setting so it survives restarts.

Pass ``None`` to disable TTL entirely (all entries are kept
regardless of age).

Args:
max_age_hours: Maximum age of a cache entry in hours.
Must be a positive finite number, or ``None`` to disable TTL.
Booleans, strings, NaN, and infinite values are rejected.

Returns:
Number of expired entries that were evicted (always 0 when
disabling TTL).

Raises:
ValueError: If *max_age_hours* is not a positive finite number
(and is not ``None``), or is a non-numeric type.
"""
if max_age_hours is None:
self.max_age_seconds = None
logger.info("Cache expiration disabled")
return 0

if not isinstance(max_age_hours, (int, float)) or isinstance(max_age_hours, bool):
raise ValueError("max_age_hours must be a positive finite number or None")

if not math.isfinite(max_age_hours) or max_age_hours <= 0:
raise ValueError("max_age_hours must be a positive finite number")

self.max_age_seconds = max_age_hours * 3600.0
Comment thread
coderabbitai[bot] marked this conversation as resolved.
evicted = self._evict_expired()
logger.info(
"Cache expiration set to %.2f hours – evicted %d stale entries",
max_age_hours,
evicted,
)
return evicted
Comment thread
coderabbitai[bot] marked this conversation as resolved.

def optimize_cache(self, max_entries: int = 1000) -> None:
"""Remove oldest entries if cache exceeds max size"""
Expand Down
Loading
Loading