From eef50dc0b061e889e15c28f0f9e38fb9e6f9d25b Mon Sep 17 00:00:00 2001 From: Bibhu Pradhan Date: Tue, 10 Feb 2026 17:34:19 +0530 Subject: [PATCH 1/3] feat: Configurable AI Embedding Model --- nexum_ai/optimizer.py | 67 ++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 63 insertions(+), 4 deletions(-) diff --git a/nexum_ai/optimizer.py b/nexum_ai/optimizer.py index 3f119b3..5cfcf21 100644 --- a/nexum_ai/optimizer.py +++ b/nexum_ai/optimizer.py @@ -19,6 +19,8 @@ def __init__(self, similarity_threshold: float = 0.95, cache_file: str = "semant self.cache: List[Dict] = [] self.similarity_threshold = similarity_threshold self.model = None + self.hf_model = None + self.hf_tokenizer = None # Support environment variable for cache file path cache_file_env = os.environ.get('NEXUMDB_CACHE_FILE', cache_file) @@ -33,22 +35,79 @@ def __init__(self, similarity_threshold: float = 0.95, cache_file: str = "semant def initialize_model(self) -> None: """Initialize local embedding model - deferred to avoid import errors""" + model_name = os.environ.get('NEXUM_EMBEDDING_MODEL', 'all-MiniLM-L6-v2') + + # Try SentenceTransformer first (preferred for embedding models) try: from sentence_transformers import SentenceTransformer - self.model = SentenceTransformer('all-MiniLM-L6-v2') - print("Semantic cache initialized with all-MiniLM-L6-v2") + self.model = SentenceTransformer(model_name) + print(f"Semantic cache initialized with SentenceTransformer: {model_name}") + self.hf_model = None + self.hf_tokenizer = None + return except ImportError: - print("Warning: sentence-transformers not installed, using fallback") + print("Warning: sentence-transformers not installed, trying transformers fallback") + except Exception as e: + print(f"Warning: Failed to load with SentenceTransformer ({e}), trying transformers fallback") + + # Fallback to generic HuggingFace transformers + try: + from transformers import AutoTokenizer, AutoModel + import torch + + # If default model was used but ST failed, we might want a different default for raw transformers + # but usually the same model name works for both if it's on HF Hub. + # However, 'all-MiniLM-L6-v2' is a sentence-transformers specific alias often mapped to + # 'sentence-transformers/all-MiniLM-L6-v2' on HF Hub. + if model_name == 'all-MiniLM-L6-v2': + model_name = 'sentence-transformers/all-MiniLM-L6-v2' + + self.hf_tokenizer = AutoTokenizer.from_pretrained(model_name) + self.hf_model = AutoModel.from_pretrained(model_name) self.model = None + print(f"Semantic cache initialized with HuggingFace transformers: {model_name}") + except ImportError: + print("Warning: transformers not installed, using simple fallback") + self.model = None + self.hf_model = None + self.hf_tokenizer = None + except Exception as e: + print(f"Warning: Failed to load with transformers ({e}), using simple fallback") + self.model = None + self.hf_model = None + self.hf_tokenizer = None def vectorize(self, text: str) -> List[float]: """Convert text to embedding vector""" - if self.model is None: + if self.model is None and self.hf_model is None: self.initialize_model() if self.model is not None: embedding = self.model.encode(text) return embedding.tolist() + elif self.hf_model is not None and self.hf_tokenizer is not None: + try: + import torch + # Tokenize and compute embedding + inputs = self.hf_tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512) + with torch.no_grad(): + outputs = self.hf_model(**inputs) + + # Mean pooling + # attention_mask shape: (batch, seq_len) + # last_hidden_state shape: (batch, seq_len, hidden_dim) + attention_mask = inputs['attention_mask'] + token_embeddings = outputs.last_hidden_state + + input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float() + sum_embeddings = torch.sum(token_embeddings * input_mask_expanded, 1) + sum_mask = torch.clamp(input_mask_expanded.sum(1), min=1e-9) + + embedding = sum_embeddings / sum_mask + return embedding[0].tolist() + except Exception as e: + print(f"Error during HF vectorization: {e}, using fallback") + return self._fallback_vectorize(text) else: return self._fallback_vectorize(text) From 432c19ca224ddd0741ecaea658b5e84256cd6689 Mon Sep 17 00:00:00 2001 From: Bibhu Pradhan Date: Wed, 11 Feb 2026 00:02:59 +0530 Subject: [PATCH 2/3] chore: resolve merge conflicts with main --- reproduce_issue.py | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 reproduce_issue.py diff --git a/reproduce_issue.py b/reproduce_issue.py new file mode 100644 index 0000000..5cf105d --- /dev/null +++ b/reproduce_issue.py @@ -0,0 +1,7 @@ +from nexum_ai.optimizer import SemanticCache +import os + +print("--- Default Behavior ---") +cache = SemanticCache() +# Force initialization +cache.initialize_model() From 0a7479e33eac9ed8086d124ec8e83d3e71383dcb Mon Sep 17 00:00:00 2001 From: Bibhu Pradhan Date: Wed, 18 Feb 2026 23:49:05 +0530 Subject: [PATCH 3/3] persisted Semantic Cache to SQLite --- examples/cache_persistence_demo.py | 8 +- nexum_ai/optimizer.py | 505 +++++++++++++++++------ nexum_ai/tests/test_cache_integration.py | 8 +- nexum_core/src/bridge/mod.rs | 2 +- 4 files changed, 399 insertions(+), 124 deletions(-) diff --git a/examples/cache_persistence_demo.py b/examples/cache_persistence_demo.py index 923fd1b..618fb7b 100644 --- a/examples/cache_persistence_demo.py +++ b/examples/cache_persistence_demo.py @@ -22,7 +22,7 @@ def demo_cache_persistence(): # Session 1: Populate cache print("\nSession 1: Populating semantic cache...") - cache1 = SemanticCache(cache_file="demo_cache.pkl") + cache1 = SemanticCache(cache_file="demo_cache.sqlite") # Simulate typical database queries demo_queries = [ @@ -52,7 +52,7 @@ def demo_cache_persistence(): # Session 2: Load from disk print("\nSession 2: Loading cache from disk...") - cache2 = SemanticCache(cache_file="demo_cache.pkl") + cache2 = SemanticCache(cache_file="demo_cache.sqlite") stats2 = cache2.get_cache_stats() print(f" Cache loaded: {stats2['total_entries']} entries") @@ -105,7 +105,7 @@ def demo_cache_persistence(): # Environment variable configuration print("\nEnvironment variable configuration:") print(" Set NEXUMDB_CACHE_FILE to customize cache location") - print(" Example: export NEXUMDB_CACHE_FILE=my_custom_cache.pkl") + print(" Example: export NEXUMDB_CACHE_FILE=my_custom_cache.sqlite") # Cleanup print("\nCleaning up demo files...") @@ -133,4 +133,4 @@ def demo_cache_persistence(): print("\n\nWarning: Demo interrupted by user") except Exception as e: print(f"\nError: Demo failed: {e}") - sys.exit(1) \ No newline at end of file + sys.exit(1) diff --git a/nexum_ai/optimizer.py b/nexum_ai/optimizer.py index 962ffc1..cd0128e 100644 --- a/nexum_ai/optimizer.py +++ b/nexum_ai/optimizer.py @@ -6,6 +6,9 @@ from typing import Optional, List, Dict, Any import json import os +import sqlite3 +import time +from contextlib import contextmanager from pathlib import Path logger = logging.getLogger(__name__) @@ -33,7 +36,7 @@ def _get_default_cache() -> 'SemanticCache': SemanticCache: Module-level default cache instance """ global _default_cache, _default_cache_file - current_cache_file = os.environ.get('NEXUMDB_CACHE_FILE', "semantic_cache.pkl") + current_cache_file = os.environ.get('NEXUMDB_CACHE_FILE', "semantic_cache.sqlite") if _default_cache is None or _default_cache_file != current_cache_file: _default_cache = SemanticCache(cache_file=current_cache_file) _default_cache_file = current_cache_file @@ -54,15 +57,18 @@ class SemanticCache: """ Caches query results using semantic similarity Uses local embedding models only - Supports persistence to disk via JSON or pickle files + Supports persistence to disk via SQLite (with optional legacy JSON/pickle import) """ - def __init__(self, similarity_threshold: float = 0.95, cache_file: str = "semantic_cache.pkl") -> None: + def __init__(self, similarity_threshold: float = 0.95, cache_file: str = "semantic_cache.sqlite") -> None: self.cache: List[Dict] = [] self.similarity_threshold = similarity_threshold self.model = None self.hf_model = None self.hf_tokenizer = None + self._memory_entries_limit = int(os.environ.get('NEXUMDB_CACHE_MEMORY_ENTRIES', '1000')) + self._db_ok = False + self._legacy_candidates: List[Path] = [] # Support environment variable for cache file path cache_file_env = os.environ.get('NEXUMDB_CACHE_FILE', cache_file) @@ -70,10 +76,223 @@ def __init__(self, similarity_threshold: float = 0.95, cache_file: str = "semant self.cache_dir = Path("cache") self.cache_dir.mkdir(exist_ok=True) - self.cache_path = self.cache_dir / self.cache_file + requested_path = Path(self.cache_file) + if requested_path.is_absolute(): + requested_full_path = requested_path + else: + requested_full_path = self.cache_dir / requested_path + + suffix = requested_full_path.suffix.lower() + if suffix in {'.sqlite', '.sqlite3', '.db'}: + self.cache_path = requested_full_path + else: + self.cache_path = requested_full_path.with_suffix('.sqlite') + self._legacy_candidates = [requested_full_path] + if suffix == '.pkl': + self._legacy_candidates.append(requested_full_path.with_suffix('.json')) + elif suffix == '.json': + self._legacy_candidates.append(requested_full_path.with_suffix('.pkl')) + else: + self._legacy_candidates.append(requested_full_path.with_suffix('.json')) + self._legacy_candidates.append(requested_full_path.with_suffix('.pkl')) # Load existing cache on initialization self.load_cache() + + def _connect(self) -> sqlite3.Connection: + return sqlite3.connect(str(self.cache_path), timeout=30) + + @contextmanager + def _db(self) -> Any: + conn = self._connect() + try: + yield conn + conn.commit() + finally: + conn.close() + + def _init_db(self) -> None: + try: + with self._db() as conn: + conn.execute("PRAGMA journal_mode=WAL;") + conn.execute("PRAGMA synchronous=NORMAL;") + conn.execute( + """ + CREATE TABLE IF NOT EXISTS cache_entries ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + query TEXT NOT NULL, + vector BLOB NOT NULL, + result TEXT NOT NULL, + created_at INTEGER NOT NULL + ) + """ + ) + conn.execute( + """ + CREATE TABLE IF NOT EXISTS meta ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL + ) + """ + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_cache_entries_created_at ON cache_entries(created_at)" + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_cache_entries_id ON cache_entries(id)" + ) + self._db_ok = True + except Exception: + logger.exception("Failed to initialize SQLite cache backend; using in-memory cache only") + self._db_ok = False + + def _vector_to_blob(self, vec: List[float]) -> bytes: + return np.asarray(vec, dtype=np.float32).tobytes() + + def _blob_to_vector(self, blob: bytes) -> List[float]: + return np.frombuffer(blob, dtype=np.float32).astype(float).tolist() + + def _db_entry_count(self) -> int: + if not self._db_ok: + return len(self.cache) + try: + with self._db() as conn: + row = conn.execute("SELECT COUNT(*) FROM cache_entries").fetchone() + return int(row[0]) if row else 0 + except Exception: + logger.exception("Failed to count SQLite cache entries") + return len(self.cache) + + def _load_recent_entries_into_memory(self) -> None: + if not self._db_ok or self._memory_entries_limit <= 0: + self.cache = [] + return + try: + with self._db() as conn: + rows = conn.execute( + "SELECT id, query, vector, result FROM cache_entries ORDER BY id DESC LIMIT ?", + (self._memory_entries_limit,), + ).fetchall() + entries = [] + for row in reversed(rows): + entry_id, query, vector_blob, result = row + entries.append( + { + 'id': int(entry_id), + 'query': query, + 'vector': self._blob_to_vector(vector_blob), + 'result': result, + } + ) + self.cache = entries + except Exception: + logger.exception("Failed to load recent cache entries into memory") + self.cache = [] + + def _read_meta(self, key: str) -> Optional[str]: + if not self._db_ok: + return None + try: + with self._db() as conn: + row = conn.execute("SELECT value FROM meta WHERE key = ?", (key,)).fetchone() + return str(row[0]) if row else None + except Exception: + logger.exception("Failed to read SQLite cache meta") + return None + + def _write_meta(self, key: str, value: str) -> None: + if not self._db_ok: + return + try: + with self._db() as conn: + conn.execute( + "INSERT INTO meta(key, value) VALUES(?, ?) ON CONFLICT(key) DO UPDATE SET value=excluded.value", + (key, value), + ) + except Exception: + logger.exception("Failed to write SQLite cache meta") + + def _migrate_legacy_cache_if_needed(self) -> None: + if not self._db_ok: + return + if self._db_entry_count() > 0: + return + legacy_paths = [p for p in self._legacy_candidates if p.exists()] + if not legacy_paths: + return + for legacy_path in legacy_paths: + try: + entries: List[Dict[str, Any]] = [] + threshold: Optional[float] = None + if legacy_path.suffix.lower() == '.json': + with open(legacy_path, 'r') as f: + data = json.load(f) + entries = data.get('cache', []) + threshold_val = data.get('similarity_threshold') + threshold = float(threshold_val) if threshold_val is not None else None + elif legacy_path.suffix.lower() == '.pkl': + import pickle + + class RestrictedUnpickler(pickle.Unpickler): + ALLOWED_CLASSES = { + ('builtins', 'dict'), + ('builtins', 'list'), + ('builtins', 'str'), + ('builtins', 'int'), + ('builtins', 'float'), + ('builtins', 'bool'), + ('builtins', 'tuple'), + ('builtins', 'set'), + ('builtins', 'frozenset'), + } + + def find_class(self, module: str, name: str) -> type: + if (module, name) not in self.ALLOWED_CLASSES: + raise pickle.UnpicklingError(f"Forbidden class: {module}.{name}") + return super().find_class(module, name) + + with open(legacy_path, 'rb') as f: + data = RestrictedUnpickler(f).load() + entries = data.get('cache', []) + threshold_val = data.get('similarity_threshold') + threshold = float(threshold_val) if threshold_val is not None else None + else: + continue + + valid_entries = [] + for entry in entries: + if not isinstance(entry, dict): + continue + if all(k in entry for k in ['query', 'vector', 'result']): + valid_entries.append(entry) + + if threshold is not None: + self.similarity_threshold = threshold + + if valid_entries: + now = int(time.time()) + with self._db() as conn: + conn.executemany( + "INSERT INTO cache_entries(query, vector, result, created_at) VALUES(?, ?, ?, ?)", + [ + ( + str(e['query']), + self._vector_to_blob(list(e['vector'])), + str(e['result']), + now, + ) + for e in valid_entries + ], + ) + self._write_meta('similarity_threshold', str(self.similarity_threshold)) + logger.info( + "Migrated legacy cache %s into SQLite backend (%d entries)", + str(legacy_path), + len(valid_entries), + ) + return + except Exception: + logger.exception("Failed to migrate legacy cache file %s", str(legacy_path)) def initialize_model(self) -> None: """Initialize local embedding model - deferred to avoid import errors""" @@ -183,17 +402,69 @@ def get(self, query: str) -> Optional[str]: if similarity >= self.similarity_threshold: logger.info(f"Cache hit! Similarity: {similarity:.4f}") return entry['result'] - + if not self._db_ok: + return None + + min_loaded_id: Optional[int] = None + if self.cache: + try: + min_loaded_id = min(int(e.get('id', 0)) for e in self.cache if e.get('id') is not None) + except Exception: + min_loaded_id = None + + try: + with self._db() as conn: + if min_loaded_id is not None and min_loaded_id > 0: + cursor = conn.execute( + "SELECT id, vector, result FROM cache_entries WHERE id < ? ORDER BY id DESC", + (min_loaded_id,), + ) + else: + cursor = conn.execute( + "SELECT id, vector, result FROM cache_entries ORDER BY id DESC" + ) + for _, vector_blob, result in cursor: + vec2 = np.frombuffer(vector_blob, dtype=np.float32) + vec1 = np.asarray(query_vec, dtype=np.float32) + dot_product = float(np.dot(vec1, vec2)) + norm1 = float(np.linalg.norm(vec1)) + norm2 = float(np.linalg.norm(vec2)) + if norm1 == 0.0 or norm2 == 0.0: + continue + similarity = dot_product / (norm1 * norm2) + if similarity >= self.similarity_threshold: + logger.info(f"Cache hit! Similarity: {similarity:.4f}") + return str(result) + except Exception: + logger.exception("SQLite cache lookup failed; falling back to in-memory only") + return None def put(self, query: str, result: str) -> None: """Store query and result in cache""" query_vec = self.vectorize(query) + entry_id: Optional[int] = None + if self._db_ok: + try: + with self._db() as conn: + cur = conn.execute( + "INSERT INTO cache_entries(query, vector, result, created_at) VALUES(?, ?, ?, ?)", + (query, self._vector_to_blob(query_vec), result, int(time.time())), + ) + entry_id = int(cur.lastrowid) if cur and cur.lastrowid is not None else None + self._write_meta('similarity_threshold', str(self.similarity_threshold)) + except Exception: + logger.exception("Failed to write to SQLite cache backend; storing in memory only") + entry_id = None + self.cache.append({ + 'id': entry_id, 'query': query, 'vector': query_vec, 'result': result }) + if self._memory_entries_limit > 0 and len(self.cache) > self._memory_entries_limit: + self.cache = self.cache[-self._memory_entries_limit:] logger.info(f"Cached query: {query[:50]}...") def clear(self) -> None: @@ -201,134 +472,103 @@ def clear(self) -> None: self.cache.clear() # Remove cache file when clearing if self.cache_path.exists(): - self.cache_path.unlink() - logger.info("Cache file deleted") + try: + self.cache_path.unlink() + logger.info("Cache file deleted") + except Exception: + logger.exception("Failed to delete cache file; attempting to clear tables") + if self._db_ok: + try: + with self._db() as conn: + conn.execute("DELETE FROM cache_entries") + conn.execute("DELETE FROM meta") + except Exception: + logger.exception("Failed to clear SQLite cache tables") def save_cache(self, filepath: Optional[str] = None) -> None: - """Save cache to disk using JSON format (secure default)""" - if filepath is None: - filepath = str(self.cache_path) - - # Use JSON format by default for security - json_filepath = filepath.replace('.pkl', '.json') if filepath.endswith('.pkl') else filepath - self.save_cache_json(json_filepath) + """Persist cache to SQLite backend (no-op for in-memory only mode)""" + if not self._db_ok: + return + self._write_meta('similarity_threshold', str(self.similarity_threshold)) + try: + with self._db() as conn: + conn.execute("PRAGMA wal_checkpoint(TRUNCATE);") + except Exception: + logger.exception("SQLite cache checkpoint failed") def load_cache(self, filepath: Optional[str] = None) -> None: - """Load cache from disk using JSON (safe) or pickle (legacy)""" - if filepath is None: - filepath = str(self.cache_path) - - # Try JSON first (safer format) - json_filepath = filepath.replace('.pkl', '.json') if filepath.endswith('.pkl') else f"{filepath}.json" - if os.path.exists(json_filepath): - self.load_cache_json(json_filepath) + """Load cache from SQLite backend (optionally migrating legacy JSON/pickle once)""" + self._init_db() + if not self._db_ok: + self.cache = [] return - - # Fall back to pickle for legacy files (with restricted unpickler for safety) - if os.path.exists(filepath) and filepath.endswith('.pkl'): - try: - import pickle - - # Use RestrictedUnpickler to limit allowed classes - class RestrictedUnpickler(pickle.Unpickler): - """Restricted unpickler that only allows safe types""" - ALLOWED_CLASSES = { - ('builtins', 'dict'), - ('builtins', 'list'), - ('builtins', 'str'), - ('builtins', 'int'), - ('builtins', 'float'), - ('builtins', 'bool'), - ('builtins', 'tuple'), - ('builtins', 'set'), - ('builtins', 'frozenset'), - } - - def find_class(self, module: str, name: str) -> type: - if (module, name) not in self.ALLOWED_CLASSES: - raise pickle.UnpicklingError( - f"Forbidden class: {module}.{name}" - ) - return super().find_class(module, name) - - with open(filepath, 'rb') as f: - data = RestrictedUnpickler(f).load() - - self.cache = data.get('cache', []) - self.similarity_threshold = data.get('similarity_threshold', self.similarity_threshold) - - logger.info( - "Semantic cache loaded from %s (%d entries)", - filepath, - len(self.cache), - ) - logger.info( - "Converting legacy pickle cache to JSON format for security" - ) - - # Validate cache entries - valid_entries = [] - for entry in self.cache: - if all(key in entry for key in ['query', 'vector', 'result']): - valid_entries.append(entry) - else: - logger.info("Warning: Invalid cache entry found and removed") - - self.cache = valid_entries - - # Auto-convert to JSON format for future use - self.save_cache_json(json_filepath) - + self._migrate_legacy_cache_if_needed() + threshold_str = self._read_meta('similarity_threshold') + if threshold_str is not None: + try: + self.similarity_threshold = float(threshold_str) except Exception: - logger.exception( - "Error loading semantic cache" - ) - - logger.debug( - "Starting with empty cache" - ) - self.cache = [] - else: - logger.debug(f"No cache file found at {filepath}, starting with empty cache") + pass + self._load_recent_entries_into_memory() def save_cache_json(self, filepath: Optional[str] = None) -> None: - """Save cache to JSON format (secure and portable)""" + """Export cache to JSON format (portable)""" if filepath is None: - filepath = str(self.cache_path).replace('.pkl', '.json') - + filepath = str(self.cache_path.with_suffix('.json')) + try: - # Create backup of existing cache backup_path = f"{filepath}.backup" if os.path.exists(filepath): os.rename(filepath, backup_path) - + + entries: List[Dict[str, Any]] = [] + if self._db_ok: + with self._db() as conn: + cursor = conn.execute( + "SELECT query, vector, result FROM cache_entries ORDER BY id ASC" + ) + for query, vector_blob, result in cursor: + entries.append( + { + 'query': str(query), + 'vector': self._blob_to_vector(vector_blob), + 'result': str(result), + } + ) + else: + entries = [ + {'query': e.get('query'), 'vector': e.get('vector'), 'result': e.get('result')} + for e in self.cache + ] + cache_data = { - 'cache': self.cache, + 'cache': entries, 'similarity_threshold': self.similarity_threshold, - 'cache_size': len(self.cache), + 'cache_size': len(entries), 'format_version': '1.0' } - + with open(filepath, 'w') as f: json.dump(cache_data, f, indent=2) - - logger.info(f"Semantic cache saved to {filepath} ({len(self.cache)} entries)") - - # Remove backup if save was successful + + logger.info(f"Semantic cache exported to JSON: {filepath} ({len(entries)} entries)") + if os.path.exists(backup_path): os.remove(backup_path) - except Exception: - logger.exception("Error saving cache to JSON") - # Restore backup if save failed - if os.path.exists(backup_path): - os.rename(backup_path, filepath) + logger.exception("Error exporting cache to JSON") + try: + backup_path = f"{filepath}.backup" + if os.path.exists(backup_path): + os.rename(backup_path, filepath) + except Exception: + logger.exception("Failed to restore JSON export backup") def load_cache_json(self, filepath: Optional[str] = None) -> None: """Load cache from JSON format""" if filepath is None: - filepath = str(self.cache_path).replace('.pkl', '.json') + filepath = str(self.cache_path.with_suffix('.json')) if os.path.exists(filepath): try: @@ -348,8 +588,9 @@ def load_cache_json(self, filepath: Optional[str] = None) -> None: def get_cache_stats(self) -> Dict[str, Any]: """Get cache statistics""" + total_entries = self._db_entry_count() return { - 'total_entries': len(self.cache), + 'total_entries': total_entries, 'similarity_threshold': self.similarity_threshold, 'cache_file': str(self.cache_path), 'cache_exists': self.cache_path.exists(), @@ -449,18 +690,52 @@ def explain_query(self, query: str) -> Dict[str, Any]: } def set_cache_expiration(self, max_age_hours: int = 24) -> None: - """Remove cache entries older than specified hours (future enhancement)""" - # This would require adding timestamps to cache entries - # For now, just a placeholder for TTL functionality - logger.info(f"Cache expiration set to {max_age_hours} hours (not yet implemented)") + """Remove cache entries older than specified hours""" + if not self._db_ok: + logger.info("Cache expiration skipped (SQLite backend unavailable)") + return + cutoff = int(time.time()) - int(max_age_hours * 3600) + try: + with self._db() as conn: + cur = conn.execute("DELETE FROM cache_entries WHERE created_at < ?", (cutoff,)) + removed = int(cur.rowcount) if cur and cur.rowcount is not None else 0 + logger.info(f"Cache expiration removed {removed} entries older than {max_age_hours} hours") + self._load_recent_entries_into_memory() + except Exception: + logger.exception("Cache expiration failed") def optimize_cache(self, max_entries: int = 1000) -> None: """Remove oldest entries if cache exceeds max size""" - if len(self.cache) > max_entries: - removed_count = len(self.cache) - max_entries - self.cache = self.cache[-max_entries:] # Keep most recent entries - logger.info(f"Cache optimized: removed {removed_count} oldest entries") + if max_entries <= 0: + self.clear() + return + if not self._db_ok: + if len(self.cache) > max_entries: + removed_count = len(self.cache) - max_entries + self.cache = self.cache[-max_entries:] + logger.info(f"Cache optimized: removed {removed_count} oldest entries") + return + + try: + total_entries = self._db_entry_count() + if total_entries <= max_entries: + return + remove_count = total_entries - max_entries + with self._db() as conn: + conn.execute( + """ + DELETE FROM cache_entries + WHERE id IN ( + SELECT id FROM cache_entries ORDER BY id ASC LIMIT ? + ) + """, + (remove_count,), + ) + logger.info(f"Cache optimized: removed {remove_count} oldest entries") self.save_cache() + self._load_recent_entries_into_memory() + except Exception: + logger.exception("Cache optimization failed") class QueryOptimizer: diff --git a/nexum_ai/tests/test_cache_integration.py b/nexum_ai/tests/test_cache_integration.py index 77603a2..1131e7e 100644 --- a/nexum_ai/tests/test_cache_integration.py +++ b/nexum_ai/tests/test_cache_integration.py @@ -47,8 +47,8 @@ def test_cache_persistence_lifecycle(): # Test 2: Verify file exists print("\n2. Verifying cache file creation...") - # SemanticCache saves as .json by default even if .pkl is provided - actual_cache_file = cache_file.replace('.pkl', '.json') + # SemanticCache persists to SQLite; legacy .pkl/.json paths are migrated on first run + actual_cache_file = cache_file.replace('.pkl', '.sqlite') assert os.path.exists(actual_cache_file), f"Cache file {actual_cache_file} should exist" file_size = os.path.getsize(actual_cache_file) print(f" Cache file exists ({file_size} bytes)") @@ -95,7 +95,7 @@ def test_cache_persistence_lifecycle(): print("\n7. Testing cache clearing...") cache2.clear() - assert not os.path.exists(cache_file), "Cache file should be deleted" + assert not os.path.exists(actual_cache_file), "Cache file should be deleted" print(" Cache cleared successfully") print("\nAll tests passed!") @@ -120,7 +120,7 @@ def test_environment_variable_config(): cache.save_cache() # Verify it used the custom path - actual_custom_cache = custom_cache.replace('.pkl', '.json') + actual_custom_cache = custom_cache.replace('.pkl', '.sqlite') assert os.path.exists(actual_custom_cache), f"Should use environment variable path: {actual_custom_cache}" print(" Environment variable configuration works") diff --git a/nexum_core/src/bridge/mod.rs b/nexum_core/src/bridge/mod.rs index 8aab073..be81cb4 100644 --- a/nexum_core/src/bridge/mod.rs +++ b/nexum_core/src/bridge/mod.rs @@ -68,7 +68,7 @@ pub struct SemanticCache { impl SemanticCache { pub fn new() -> Result { - Self::with_cache_file("semantic_cache.pkl") + Self::with_cache_file("semantic_cache.sqlite") } pub fn with_cache_file(cache_file: &str) -> Result {