Skip to content

Commit 7cba5f1

Browse files
author
Wojciech Napierała
committed
Add SLO tracking and mitigation planning in SelfAdjustingController
- Introduced latency and quality SLO tracking in the SelfAdjustingController to monitor performance metrics.
- Implemented a mitigation plan that captures SLO breaches and recommends actions based on drift advisories.
- Enhanced LiveTaskLoop to utilize the controller's mitigation plans and persist relevant summaries.
- Updated memory manager with retrieval helpers and revision logging improvements for better state management.
- Added tests to verify SLO breach recording and controller plan functionality.
1 parent e2880cb commit 7cba5f1

5 files changed

Lines changed: 428 additions & 21 deletions

File tree

core/controller.py

Lines changed: 93 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@
77
v0.3 - 2025-11-06 - Added adaptive workflow bias adjustments derived from drift
88
feedback.
99
v0.4 - 2025-11-07 - Logged workflow bias snapshots when drift triggers.
10+
v0.5 - 2025-11-07 - Added SLO tracking and mitigation planning for drift responses.
1011
"""
1112

1213
from __future__ import annotations
@@ -15,7 +16,7 @@
1516
from collections import deque
1617
from dataclasses import dataclass
1718
from statistics import mean
18-
from typing import Deque, Dict, Optional
19+
from typing import Deque, Dict, List, Optional
1920

2021
from config.settings import AppConfig
2122
from models.memory import ReviewRecord
@@ -36,14 +37,23 @@ class PerformanceSnapshot:
3637
class SelfAdjustingController:
3738
"""Monitors execution metrics and recommends adjustments."""
3839

39-
def __init__(self, config: AppConfig, window_size: int = 10) -> None:
40+
def __init__(
41+
self,
42+
config: AppConfig,
43+
window_size: int = 10,
44+
latency_slo_seconds: float = 5.0,
45+
quality_slo: float = 0.85,
46+
) -> None:
4047
self._config = config
4148
self._logger = LOGGER
4249
self._window_size = window_size
4350
self._history: Deque[PerformanceSnapshot] = deque(maxlen=window_size)
4451
self._last_advisory: Optional[str] = None
4552
self._workflow_biases: Dict[str, float] = {}
4653
self._decay = 0.9
54+
self._latency_slo = max(0.1, latency_slo_seconds)
55+
self._quality_slo = quality_slo
56+
self._last_plan: Dict[str, object] = {}
4757

4858
def register_result(
4959
self,
@@ -65,8 +75,16 @@ def register_result(
6575
snapshot.latency,
6676
snapshot.verdict,
6777
)
78+
slo_breaches = self._evaluate_slos(result, review)
6879
advisory = self._assess_drift()
6980
self._update_biases(selection, result, review, advisory is not None)
81+
self._last_plan = self._build_mitigation_plan(
82+
selection,
83+
result,
84+
review,
85+
advisory,
86+
slo_breaches,
87+
)
7088
self._last_advisory = advisory
7189
return advisory
7290

@@ -91,6 +109,22 @@ def _assess_drift(self) -> Optional[str]:
91109
self._logger.warning(advisory)
92110
return advisory
93111
return None
112+
def _evaluate_slos(
113+
self,
114+
result: TaskResult,
115+
review: Optional[ReviewRecord],
116+
) -> List[str]:
117+
"""Return a list of breached service objectives for the current run."""
118+
breaches: List[str] = []
119+
if result.latency_seconds > self._latency_slo:
120+
breaches.append("latency")
121+
if (
122+
review
123+
and review.quality_score is not None
124+
and review.quality_score < self._quality_slo
125+
):
126+
breaches.append("quality")
127+
return breaches
94128

95129
def _update_biases(
96130
self,
@@ -142,6 +176,58 @@ def _update_biases(
142176
target = reasoning_workflow if workflow == fast_workflow else fast_workflow
143177
if target:
144178
self._bump_bias(target, 0.1)
179+
def _build_mitigation_plan(
180+
self,
181+
selection: WorkflowSelection,
182+
result: TaskResult,
183+
review: Optional[ReviewRecord],
184+
advisory: Optional[str],
185+
slo_breaches: List[str],
186+
) -> Dict[str, object]:
187+
plan: Dict[str, object] = {}
188+
if advisory:
189+
plan["drift_advisory"] = advisory
190+
if slo_breaches:
191+
plan["slo_breaches"] = slo_breaches
192+
193+
recommended: List[str] = []
194+
if "latency" in slo_breaches:
195+
recommended.extend(
196+
[
197+
"reroute_to_reasoning_workflow",
198+
"increase_llm_timeout",
199+
]
200+
)
201+
if "quality" in slo_breaches:
202+
recommended.append("expand_context_retrieval")
203+
if advisory:
204+
recommended.append("trigger_memory_mitigation")
205+
if result.latency_seconds > self._latency_slo * 1.5:
206+
recommended.append("reduce_fast_workflow_bias")
207+
if (
208+
review
209+
and review.verdict.lower().startswith(("fail", "reject"))
210+
and selection.workflow != self._find_workflow("reasoning")
211+
):
212+
recommended.append("escalate_to_reasoning")
213+
214+
if recommended:
215+
deduped = list(dict.fromkeys(recommended))
216+
plan["recommended_actions"] = deduped
217+
218+
if self._history and (plan or slo_breaches or advisory or recommended):
219+
latencies = [sample.latency for sample in self._history]
220+
negatives = sum(
221+
sample.verdict.lower().startswith(("fail", "reject"))
222+
for sample in self._history
223+
)
224+
plan["window_metrics"] = {
225+
"latency_avg": round(mean(latencies), 3),
226+
"window_size": len(self._history),
227+
"negative_reviews": negatives,
228+
}
229+
230+
return plan
145231

146232
def _bump_bias(self, workflow: str, delta: float) -> None:
147233
current = self._workflow_biases.get(workflow, 0.0)
@@ -167,3 +253,8 @@ def last_advisory(self) -> Optional[str]:
167253
def workflow_biases(self) -> Dict[str, float]:
168254
"""Current controller preference weights per workflow."""
169255
return dict(self._workflow_biases)
256+
257+
@property
258+
def last_plan(self) -> Dict[str, object]:
259+
"""Return the last mitigation plan issued by the controller."""
260+
return dict(self._last_plan)

core/live_loop.py

Lines changed: 37 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@
66
v0.3 - 2025-11-06 - Persisted semantic summaries and integrated controller-aware
77
task selection.
88
v0.4 - 2025-11-07 - Linked semantic concepts and surfaced relation context for prompts.
9+
v0.5 - 2025-11-07 - Added retrieval-focused context loading and controller mitigation plans.
910
"""
1011

1112
from __future__ import annotations
@@ -92,7 +93,7 @@ def run_task(
9293
payload=working_payload,
9394
)
9495

95-
layers = self._load_memory_snapshots()
96+
layers = self._load_memory_snapshots(task)
9697
prompt_context = PromptContext(
9798
task=task,
9899
workflow=selection.workflow,
@@ -143,13 +144,28 @@ def run_task(
143144
raise
144145
self._persist_review(review)
145146

146-
mitigation_summary: Optional[Dict[str, object]] = None
147147
drift_advisory = self._controller.register_result(selection, result, review)
148+
controller_plan = dict(self._controller.last_plan)
149+
150+
memory_actions: Optional[Dict[str, object]] = None
148151
if drift_advisory:
149-
mitigation_summary = self._memory_manager.apply_drift_mitigation(task_id=request.task_id)
150-
if mitigation_summary:
151-
self._persist_mitigation_summary(request.task_id, mitigation_summary)
152+
memory_actions = self._memory_manager.apply_drift_mitigation(task_id=request.task_id)
152153
self._persist_drift_advisory(request.task_id, selection.workflow, drift_advisory)
154+
elif controller_plan:
155+
self._logger.debug(
156+
"Controller produced plan without drift advisory: %s", controller_plan
157+
)
158+
159+
mitigation_summary: Optional[Dict[str, object]] = None
160+
if controller_plan:
161+
mitigation_summary = dict(controller_plan)
162+
if memory_actions:
163+
if mitigation_summary is None:
164+
mitigation_summary = {}
165+
mitigation_summary["memory_actions"] = memory_actions
166+
167+
if mitigation_summary:
168+
self._persist_mitigation_summary(request.task_id, mitigation_summary)
153169

154170
result_payload = cast(
155171
Dict[str, object],
@@ -322,9 +338,9 @@ def _store_working_item(self, key: str, payload: Dict[str, object]) -> None:
322338
except MemoryError as exc:
323339
self._logger.error("Failed to persist working memory item %s: %s", key, exc)
324340

325-
def _load_memory_snapshots(self, limit: int = 5) -> _LayerSnapshot:
326-
episodic = self._safe_layer_slice("episodic", limit)
327-
semantic = self._safe_layer_slice("semantic", limit)
341+
def _load_memory_snapshots(self, task_query: str, limit: int = 5) -> _LayerSnapshot:
342+
episodic = self._safe_layer_query("episodic", task_query, limit)
343+
semantic = self._safe_layer_query("semantic", task_query, limit)
328344
reviews = self._load_recent_reviews(limit)
329345
relations = self._build_semantic_relations(semantic, limit)
330346
return _LayerSnapshot(
@@ -361,6 +377,19 @@ def _build_semantic_relations(
361377
relations[node_id] = summary
362378
return relations
363379

380+
def _safe_layer_query(
381+
self, layer: str, query: str, limit: int
382+
) -> List[Dict[str, object]]:
383+
if query.strip():
384+
try:
385+
results = self._memory_manager.query_layer(layer, query, limit)
386+
except MemoryError as exc:
387+
self._logger.warning("Unable to query %s memory: %s", layer, exc)
388+
results = []
389+
if results:
390+
return results
391+
return self._safe_layer_slice(layer, limit)
392+
364393
def _safe_layer_slice(self, layer: str, limit: int) -> List[Dict[str, object]]:
365394
try:
366395
items = self._memory_manager.list_layer(layer)

0 commit comments

Comments (0)