Skip to content

Commit 7a49dd2

Browse files
committed
feat(workflows-mcp): add validate_knowledge tool for USER_VALIDATED authority promotion
Adds a dedicated in-place authority update path so existing propositions can be promoted to USER_VALIDATED (archive-immune) without losing their original UUID, created_by, created_at, or category associations. - KnowledgeExecutor: add "validate" op, _op_validate method, validated_count output field - tools_knowledge.py: register validate_knowledge MCP tool - tests: TestValidateOperation + TestUserValidatedImmunity (immunity SQL regression guards)
1 parent 1395fd0 commit 7a49dd2

3 files changed

Lines changed: 333 additions & 1 deletion

File tree

src/workflows_mcp/engine/executors_knowledge.py

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ class KnowledgeInput(BlockInput):
117117
```
118118
"""
119119

120-
op: Literal["search", "store", "recall", "forget", "context"] = Field(
120+
op: Literal["search", "store", "recall", "forget", "context", "validate"] = Field(
121121
description="Operation to perform",
122122
)
123123

@@ -276,6 +276,8 @@ def validate_op_fields(self) -> KnowledgeInput:
276276
"'proposition_ids', 'where', 'source', 'created_before', or 'created_after' "
277277
"is required for op='forget'"
278278
)
279+
if self.op == "validate" and not self.proposition_ids:
280+
raise ValueError("'proposition_ids' is required for op='validate'")
279281
if self.path is not None and not self.source:
280282
raise ValueError("'source' is required when 'path' is provided")
281283
return self
@@ -288,6 +290,7 @@ class KnowledgeOutput(BlockOutput):
288290
- search/recall: rows, columns, row_count
289291
- store: proposition_ids, stored_count
290292
- forget: archived_count, skipped_count
293+
- validate: validated_count
291294
- context: context_text, proposition_count, tokens_used
292295
"""
293296

@@ -316,6 +319,9 @@ class KnowledgeOutput(BlockOutput):
316319
archived_count: int = Field(default=0, description="Number archived")
317320
skipped_count: int = Field(default=0, description="Number skipped (immune)")
318321

322+
# Validate output
323+
validated_count: int = Field(default=0, description="Number promoted to USER_VALIDATED")
324+
319325
# Context output
320326
context_text: str = Field(default="", description="Clean content assembled text")
321327
proposition_count: int = Field(default=0, description="Propositions included in context")
@@ -394,6 +400,7 @@ async def execute( # type: ignore[override]
394400
"store": self._op_store,
395401
"recall": self._op_recall,
396402
"forget": self._op_forget,
403+
"validate": self._op_validate,
397404
"context": self._op_context,
398405
}
399406
handler = op_handlers[inputs.op]
@@ -1077,6 +1084,59 @@ async def _op_forget(
10771084
skipped_count=skipped,
10781085
)
10791086

1087+
async def _op_validate(
1088+
self,
1089+
inputs: KnowledgeInput,
1090+
context: Execution,
1091+
backend: Any,
1092+
) -> KnowledgeOutput:
1093+
"""Promote propositions to USER_VALIDATED authority (in-place update).
1094+
1095+
Unlike forget+store, this preserves the original UUID, created_by,
1096+
created_at, and category associations — making it the correct path for
1097+
authority promotion (metadata change) rather than belief replacement.
1098+
1099+
SECURITY: Records validated_by and logs to audit table.
1100+
"""
1101+
ids: list[str] = []
1102+
if isinstance(inputs.proposition_ids, str):
1103+
ids = [s.strip() for s in inputs.proposition_ids.split(",") if s.strip()]
1104+
elif isinstance(inputs.proposition_ids, list):
1105+
ids = inputs.proposition_ids
1106+
1107+
if not ids:
1108+
return KnowledgeOutput(success=True, validated_count=0)
1109+
1110+
validated_by = _get_audit_user_id(context)
1111+
auth_method = _get_auth_method(context)
1112+
user_string = _get_user_string_id(context)
1113+
1114+
placeholders = ", ".join(f"${i + 1}::uuid" for i in range(len(ids)))
1115+
update_sql = f"""
1116+
UPDATE knowledge_propositions
1117+
SET authority = '{Authority.USER_VALIDATED}'
1118+
WHERE id IN ({placeholders})
1119+
AND lifecycle_state != '{LifecycleState.ARCHIVED}'
1120+
RETURNING id
1121+
"""
1122+
result = await backend.query(update_sql, tuple(ids))
1123+
validated = len(result.rows) if result and result.rows else 0
1124+
1125+
if validated > 0:
1126+
validated_ids = [row["id"] for row in result.rows]
1127+
for prop_id in validated_ids:
1128+
await self._log_audit_entry(
1129+
backend=backend,
1130+
proposition_id=prop_id,
1131+
action="VALIDATED",
1132+
performed_by=validated_by,
1133+
auth_method=auth_method,
1134+
user_string=user_string,
1135+
metadata={"previous_authority": "unknown", "method": "explicit_ids"},
1136+
)
1137+
1138+
return KnowledgeOutput(success=True, validated_count=validated)
1139+
10801140
async def _op_context(
10811141
self,
10821142
inputs: KnowledgeInput,

src/workflows_mcp/tools_knowledge.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -570,6 +570,57 @@ async def forget_knowledge(
570570
}
571571
)
572572

573+
@mcp_server.tool(
574+
annotations=ToolAnnotations(
575+
title="Validate Knowledge",
576+
readOnlyHint=False,
577+
destructiveHint=False,
578+
idempotentHint=True,
579+
openWorldHint=True,
580+
)
581+
)
582+
async def validate_knowledge(
583+
proposition_ids: Annotated[
584+
list[str],
585+
Field(description="UUIDs of propositions to promote to USER_VALIDATED authority"),
586+
],
587+
*,
588+
ctx: AppContextType,
589+
) -> CallToolResult:
590+
"""
591+
Promote propositions to USER_VALIDATED authority, granting archive immunity.
592+
593+
WHEN TO USE: After a human has reviewed and confirmed a fact as permanently
594+
trustworthy. USER_VALIDATED propositions are immune to forget_knowledge — they
595+
cannot be archived by automated cleanup or bulk operations.
596+
597+
This is an in-place authority update, NOT a forget+store cycle. The original
598+
proposition UUID, created_by, created_at, and category associations are all
599+
preserved. Only the authority field is changed.
600+
601+
PARAMETERS:
602+
- proposition_ids: List of proposition UUIDs to validate
603+
604+
RETURNS: {validated_count: N}
605+
606+
SEE ALSO: recall_knowledge (find propositions to validate), forget_knowledge (skips
607+
USER_VALIDATED propositions automatically)
608+
"""
609+
from .engine.executors_knowledge import KnowledgeExecutor, KnowledgeInput
610+
611+
execution = _create_knowledge_execution(ctx)
612+
executor = KnowledgeExecutor()
613+
inputs = KnowledgeInput(
614+
op="validate",
615+
proposition_ids=proposition_ids,
616+
)
617+
result = await executor.execute(inputs, context=execution)
618+
619+
if not result.success:
620+
return _json_response({"status": "failure", "error": result.error})
621+
622+
return _json_response({"validated_count": result.validated_count})
623+
573624
@mcp_server.tool(
574625
annotations=ToolAnnotations(
575626
title="Knowledge Context",

tests/test_knowledge_executor.py

Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -906,6 +906,23 @@ def test_forget_with_where_has_date_filters(self) -> None:
906906
assert inp.where is not None
907907
assert inp.created_before == "2025-01-01"
908908

909+
def test_forget_sql_contains_user_validated_immunity(self) -> None:
910+
"""Both forget UPDATE paths must exclude USER_VALIDATED propositions.
911+
912+
Regression guard: if the immunity clause is removed, archived_count and
913+
skipped_count would be wrong and human-validated facts could be wiped.
914+
The SQL uses f-string interpolation of Authority.USER_VALIDATED, so we
915+
check for the enum reference rather than the evaluated string value.
916+
"""
917+
import inspect
918+
919+
from workflows_mcp.engine.executors_knowledge import KnowledgeExecutor
920+
921+
source = inspect.getsource(KnowledgeExecutor._op_forget)
922+
assert source.count("Authority.USER_VALIDATED") == 2, (
923+
"_op_forget must exclude USER_VALIDATED in BOTH update paths (by-ID and by-filter)"
924+
)
925+
909926
def test_forget_update_sql_does_not_contain_updated_at(self) -> None:
910927
"""The forget UPDATE statements must NOT reference updated_at.
911928
@@ -1575,3 +1592,207 @@ def patch_embedding() -> Any:
15751592
"workflows_mcp.engine.executors_knowledge.compute_embedding",
15761593
new=AsyncMock(return_value=(fake_embedding, "text-embedding-3-small", 3, None)),
15771594
)
1595+
1596+
1597+
# ============================================================================
1598+
# Validate Operation Tests
1599+
# ============================================================================
1600+
1601+
1602+
class TestValidateOperation:
1603+
"""Tests for validate op validation and output model."""
1604+
1605+
def test_validate_requires_proposition_ids(self) -> None:
1606+
"""validate without proposition_ids should raise ValidationError."""
1607+
with pytest.raises(ValidationError, match="proposition_ids"):
1608+
KnowledgeInput(op="validate")
1609+
1610+
def test_validate_with_ids_passes(self) -> None:
1611+
"""validate with proposition_ids passes validation."""
1612+
inp = KnowledgeInput(op="validate", proposition_ids=["uuid-1", "uuid-2"])
1613+
assert inp.op == "validate"
1614+
assert inp.proposition_ids == ["uuid-1", "uuid-2"]
1615+
1616+
def test_validate_output_model(self) -> None:
1617+
"""Validate output populates validated_count."""
1618+
out = KnowledgeOutput(success=True, validated_count=2)
1619+
assert out.validated_count == 2
1620+
assert out.success is True
1621+
1622+
def test_validate_output_default_zero(self) -> None:
1623+
"""validated_count defaults to zero."""
1624+
out = KnowledgeOutput(success=True)
1625+
assert out.validated_count == 0
1626+
1627+
def test_validate_sql_targets_user_validated_authority(self) -> None:
1628+
"""_op_validate SQL must SET authority to USER_VALIDATED, not archive.
1629+
1630+
SQL uses f-string interpolation of Authority.USER_VALIDATED; check for
1631+
the enum reference rather than the evaluated string value.
1632+
"""
1633+
import inspect
1634+
1635+
from workflows_mcp.engine.executors_knowledge import KnowledgeExecutor
1636+
1637+
source = inspect.getsource(KnowledgeExecutor._op_validate)
1638+
assert "Authority.USER_VALIDATED" in source
1639+
assert "lifecycle_state = " not in source, "_op_validate must not modify lifecycle_state"
1640+
1641+
def test_validate_sql_skips_archived_propositions(self) -> None:
1642+
"""_op_validate must not promote ARCHIVED propositions."""
1643+
import inspect
1644+
1645+
from workflows_mcp.engine.executors_knowledge import KnowledgeExecutor
1646+
1647+
source = inspect.getsource(KnowledgeExecutor._op_validate)
1648+
assert "lifecycle_state != " in source or "lifecycle_state !=" in source
1649+
1650+
@pytest.mark.asyncio
1651+
async def test_validate_updates_authority_and_logs_audit(self) -> None:
1652+
"""_op_validate updates authority and logs a VALIDATED audit entry."""
1653+
executor = KnowledgeExecutor()
1654+
prop_id = "aaaaaaaa-1111-2222-3333-444444444444"
1655+
1656+
update_result = MagicMock()
1657+
update_result.rows = [{"id": prop_id}]
1658+
1659+
backend = MagicMock()
1660+
backend.query = AsyncMock(return_value=update_result)
1661+
backend.execute = AsyncMock() # audit INSERT
1662+
1663+
inputs = KnowledgeInput(op="validate", proposition_ids=[prop_id])
1664+
result = await executor._op_validate(inputs, _make_execution_context(), backend)
1665+
1666+
assert result.success is True
1667+
assert result.validated_count == 1
1668+
1669+
# Verify the UPDATE SQL targeted USER_VALIDATED
1670+
update_call = backend.query.call_args
1671+
update_sql = update_call[0][0]
1672+
assert "USER_VALIDATED" in update_sql
1673+
assert prop_id in update_call[0][1]
1674+
1675+
# Verify audit entry was written with action=VALIDATED
1676+
audit_call = backend.execute.call_args
1677+
audit_sql = audit_call[0][0]
1678+
assert "knowledge_proposition_audits" in audit_sql
1679+
audit_params = audit_call[0][1]
1680+
assert "VALIDATED" in audit_params
1681+
1682+
def test_validate_empty_list_rejected_by_validation(self) -> None:
1683+
"""Empty proposition_ids list is rejected at validation time (not silently ignored)."""
1684+
with pytest.raises(ValidationError, match="proposition_ids"):
1685+
KnowledgeInput(op="validate", proposition_ids=[])
1686+
1687+
1688+
# ============================================================================
1689+
# USER_VALIDATED Immunity Behavioural Tests
1690+
# ============================================================================
1691+
1692+
1693+
class TestUserValidatedImmunity:
1694+
"""Behavioural tests for USER_VALIDATED archive immunity in _op_forget.
1695+
1696+
These tests verify that the immunity SQL clause is correctly applied so that
1697+
skipped_count reflects propositions not archived due to USER_VALIDATED authority.
1698+
"""
1699+
1700+
@pytest.mark.asyncio
1701+
async def test_forget_by_id_skips_user_validated(self) -> None:
1702+
"""When 2 IDs are targeted but 1 is USER_VALIDATED, skipped_count=1."""
1703+
executor = KnowledgeExecutor()
1704+
prop_id_normal = "aaaaaaaa-1111-2222-3333-444444444444"
1705+
prop_id_immune = "bbbbbbbb-5555-6666-7777-888888888888"
1706+
1707+
# Simulate DB: only normal proposition is returned (immune one is skipped by SQL)
1708+
update_result = MagicMock()
1709+
update_result.rows = [{"id": prop_id_normal}]
1710+
1711+
backend = MagicMock()
1712+
backend.query = AsyncMock(return_value=update_result)
1713+
backend.execute = AsyncMock() # audit INSERT
1714+
1715+
inputs = KnowledgeInput(
1716+
op="forget",
1717+
proposition_ids=[prop_id_normal, prop_id_immune],
1718+
)
1719+
result = await executor._op_forget(inputs, _make_execution_context(), backend)
1720+
1721+
assert result.success is True
1722+
assert result.archived_count == 1
1723+
assert result.skipped_count == 1 # immune proposition not returned by UPDATE ... RETURNING
1724+
1725+
@pytest.mark.asyncio
1726+
async def test_forget_by_id_all_immune_gives_zero_archived(self) -> None:
1727+
"""When all targeted propositions are USER_VALIDATED, archived_count=0."""
1728+
executor = KnowledgeExecutor()
1729+
1730+
update_result = MagicMock()
1731+
update_result.rows = [] # DB skips all (all USER_VALIDATED)
1732+
1733+
backend = MagicMock()
1734+
backend.query = AsyncMock(return_value=update_result)
1735+
backend.execute = AsyncMock()
1736+
1737+
inputs = KnowledgeInput(
1738+
op="forget",
1739+
proposition_ids=["immune-1", "immune-2"],
1740+
)
1741+
result = await executor._op_forget(inputs, _make_execution_context(), backend)
1742+
1743+
assert result.success is True
1744+
assert result.archived_count == 0
1745+
assert result.skipped_count == 2
1746+
1747+
@pytest.mark.asyncio
1748+
async def test_forget_by_filter_skipped_count_reflects_immunity(self) -> None:
1749+
"""Filter path: total=3, archived=2 → skipped_count=1 (one USER_VALIDATED)."""
1750+
executor = KnowledgeExecutor()
1751+
1752+
# COUNT(*) query returns 3 total
1753+
count_result = MagicMock()
1754+
count_result.rows = [{"total": 3}]
1755+
1756+
# UPDATE ... RETURNING only gives back 2 (one immune)
1757+
update_result = MagicMock()
1758+
update_result.rows = [{"id": "id-1"}, {"id": "id-2"}]
1759+
1760+
backend = MagicMock()
1761+
# First query: _build_where_clause category resolution (if any)
1762+
# For source-only filter: first call is COUNT, second is UPDATE
1763+
backend.query = AsyncMock(side_effect=[count_result, update_result])
1764+
backend.execute = AsyncMock()
1765+
1766+
inputs = KnowledgeInput(op="forget", source="old-session")
1767+
result = await executor._op_forget(inputs, _make_execution_context(), backend)
1768+
1769+
assert result.success is True
1770+
assert result.archived_count == 2
1771+
assert result.skipped_count == 1
1772+
1773+
@pytest.mark.asyncio
1774+
async def test_forget_by_filter_audit_logged_for_each_archived(self) -> None:
1775+
"""Audit entries are written for each archived proposition, not for skipped ones."""
1776+
executor = KnowledgeExecutor()
1777+
1778+
count_result = MagicMock()
1779+
count_result.rows = [{"total": 2}]
1780+
1781+
archived_ids = ["id-arch-1", "id-arch-2"]
1782+
update_result = MagicMock()
1783+
update_result.rows = [{"id": i} for i in archived_ids]
1784+
1785+
backend = MagicMock()
1786+
backend.query = AsyncMock(side_effect=[count_result, update_result])
1787+
backend.execute = AsyncMock()
1788+
1789+
inputs = KnowledgeInput(op="forget", source="cleanup-session")
1790+
await executor._op_forget(inputs, _make_execution_context(), backend)
1791+
1792+
# One audit INSERT per archived proposition
1793+
audit_calls = [
1794+
c for c in backend.execute.call_args_list if "knowledge_proposition_audits" in c[0][0]
1795+
]
1796+
assert len(audit_calls) == 2
1797+
audit_actions = [c[0][1][1] for c in audit_calls] # action is second param
1798+
assert all(a == "ARCHIVED" for a in audit_actions)

0 commit comments

Comments
 (0)