feat(agents): wire DetectAgent.process to FusionEngine via cross-service HTTP (#190)#198
Merged
Closes #190. Adds the missing synchronous entrypoint to the fusion pipeline so the four-agent facade (Detect/Investigate/Decide/Respond) actually delivers on Detect's promise.

Three additive pieces:

* fusion service exposes POST /process: validates a RawAlert, runs it through the live FusionEngine owned by FusionWorker, returns the FusedAlert envelope. Returns 503 if the worker is not yet bootstrapped (the engine attribute is None): fail loud rather than invent a verdict. Schema errors surface as the usual FastAPI 422.
* services/agents/app/tools/fusion.py: thin async httpx client that POSTs to {FUSION_SERVICE_URL}/process (default http://fusion:8003/process in the docker-compose network; fusion mounts its router at the root, not /api/fusion). Forwards optional bearer token. RAISES on non-2xx and on transport failure: per the module docstring, fusion is the primary detection path, so silent failure would lose alerts. Contrast with app.tools.graph, which intentionally degrades gracefully (graph data is best-effort context).
* DetectAgent.process(raw_alert, api_token=None) now delegates to the client with no transformation. Class docstring updated.

Same FusionEngine instance services both Kafka and HTTP paths so the fused-state machine (dedup window, correlator buffer, entity risk ledger) stays consistent regardless of how alerts arrive.

Tests (16 total, all passing offline via respx for client, ASGITransport + in-memory fakes for endpoint):

* services/fusion/tests/test_process_endpoint.py covers new-incident and duplicate paths, both 503 modes (no worker, no engine), 422 on malformed and on invalid severity, and pins that the endpoint reuses the worker's engine instance instead of constructing a fresh one per request.
* services/agents/tests/test_fusion_client.py pins the URL to /process (regression guard against the /api/fusion/process mismatch caught during wiring), asserts bearer-token pass-through and absence of a bare Bearer header when no token is provided, confirms HTTPStatusError propagates on 503/422 and HTTPError on transport failure, and locks DetectAgent.process as a faithful delegate (args pass through, errors propagate, no swallowed exceptions).

No feature flag, no env gate. Purely additive: no existing caller of either service changes shape, and the new endpoint/method only fire when something explicitly invokes them.

Co-authored-by: Cursor <cursoragent@cursor.com>
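The 503 guard and the shared-engine behaviour described above can be sketched in plain Python. This is a minimal illustration, not the code in router.py: `ServiceUnavailable`, `FakeEngine`, `FakeWorker`, and `handle_process` are hypothetical stand-ins, and the real endpoint is a FastAPI route whose engine API may differ.

```python
from dataclasses import dataclass
from typing import Optional


class ServiceUnavailable(Exception):
    """Stand-in for an HTTP 503 response (hypothetical name)."""

    status_code = 503


class FakeEngine:
    """Stand-in for FusionEngine: remembers alert ids to mimic a dedup window."""

    def __init__(self) -> None:
        self._seen = set()

    def process(self, raw_alert: dict) -> dict:
        duplicate = raw_alert["id"] in self._seen
        self._seen.add(raw_alert["id"])
        return {"alert_id": raw_alert["id"], "duplicate": duplicate}


@dataclass
class FakeWorker:
    """Stand-in for FusionWorker; engine stays None until bootstrap."""

    engine: Optional[FakeEngine] = None


def handle_process(worker: Optional[FakeWorker], raw_alert: dict) -> dict:
    # Both "no worker" and "worker without an engine" fail loudly with 503.
    if worker is None or worker.engine is None:
        raise ServiceUnavailable("fusion engine not bootstrapped")
    # Reuse the worker's engine so dedup state is shared with the Kafka path;
    # a fresh engine per request would never see earlier alerts.
    return worker.engine.process(raw_alert)
```

Because the handler borrows the worker's engine, a second submission of the same alert id is reported as a duplicate, which is the property the "reuses the worker's engine instance" test is described as pinning.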
# patched env.
import importlib

import app.tools.fusion as fusion_module

module attribute directly is intentional: it's the same mutation
the lifespan handler would do in reverse on shutdown.
"""
import app.api.router as router_mod
Squashes two over-eager line-breaks into the canonical one-line form ruff format wants. No behaviour change; this just unblocks the Python — Lint & Type-check CI gate. Co-authored-by: Cursor <cursoragent@cursor.com>
…ent into agents step (#190)

The Issue #190 PR added two new test files that the existing CI jobs weren't shaped to run:

* ``services/fusion/tests/test_process_endpoint.py`` drives the new ``POST /process`` endpoint with FastAPI's ``ASGITransport`` plus ``httpx.AsyncClient``. The ``python-services-test`` job runs the fusion suite but installs only ``pydantic / pydantic-settings / pytest / pytest-asyncio / structlog / redis / aioredis``. ``fastapi`` was not installed in that environment, so pytest blew up at collection time with ``ModuleNotFoundError: No module named 'fastapi'``. Fixed by adding ``fastapi`` and ``httpx`` to that job's minimal dep list and updating the comment to explicitly call out thin router-contract tests as in-scope (alongside the pure helpers it always covered). Postgres / Kafka / Redis stay out of scope: the new tests use ``set_worker(...)`` to inject a fake worker so no service boot is required.
* ``services/agents/tests/test_fusion_client.py`` exercises the thin httpx client that wires ``DetectAgent.process`` to the fusion service's ``POST /process`` endpoint, with respx mocking the network. The ``python-test`` job enumerates which ``services/agents/tests/*.py`` files run, and this new file was missing from the list, so it was silently un-gated. Added it, and added ``respx`` to the pip install for that step. ``httpx`` is already pulled in via ``API_DEPS``.

Both jobs stay fast (no service boot, no infra), and both new tests are now real CI signal instead of dead code.

Co-authored-by: Cursor <cursoragent@cursor.com>
Two CI jobs were failing on PR #198 with ModuleNotFoundError after the fastapi+httpx fix landed:

1. ``python-services-test``: ``test_process_endpoint.py`` imports ``app.api.router``, which type-references ``FusionWorker`` from ``app.workers.consumer``. That module imports ``aiokafka`` at module scope, so the router can't be imported without it. Added ``aiokafka`` to the minimal pip install for this job.
2. ``python-test`` (agents): ``test_fusion_client.py`` imports ``DetectAgent`` from ``app.agents``. The package's ``__init__.py`` eagerly loads sibling sub-agents (auto_triage, phishing, identity, insider_threat, cloud), all of which import ``langchain_core`` and ``langchain_openai`` at module scope. Added both packages to the agents-test pip install.

Both jobs keep their "minimal deps only" posture: no Postgres, Kafka, Redis, or full service boot. We're just adding the transitive import-time deps that the new Issue #190 tests pull in.

Verified locally:

services/fusion: 7 passed
services/agents: 9 passed (test_fusion_client.py)

Co-authored-by: Cursor <cursoragent@cursor.com>
…ue-190-detect-agent-fusion-wiring

Co-authored-by: Cursor <cursoragent@cursor.com>

# Conflicts:
# CHANGELOG.md
Summary
Closes #190.
DetectAgent already self-described as the public detection surface but had no synchronous entry point into the fusion pipeline. Anyone wanting to fuse an ad-hoc alert (e.g. mid-investigation) had to push to Kafka and wait for the consumer path to run dedup → correlation → ML scoring → confidence labelling → RBA. That's fine for streaming but unusable for interactive flows.

This PR wires the missing edge. Three additive pieces, no feature flag:

* services/fusion/app/api/router.py exposes POST /process. Validates a RawAlert, runs it through the live FusionEngine instance owned by FusionWorker, returns the FusedAlert. Returns 503 if the worker hasn't bootstrapped its engine yet: fail loud rather than invent a verdict. Schema errors surface as the usual FastAPI 422.
* services/agents/app/tools/fusion.py is a thin async httpx client posting to {FUSION_SERVICE_URL}/process (default http://fusion:8003/process in the docker-compose network; fusion mounts its router at root, not /api/fusion). Forwards optional bearer token. Raises on non-2xx and on transport failure: fusion is the primary detection path, so silent failure would lose alerts. The module docstring contrasts this with app.tools.graph, which intentionally degrades gracefully (graph data is best-effort context).
* DetectAgent.process(raw_alert, api_token=None) now delegates to the client with no transformation. Class docstring updated. Same FusionEngine instance services both Kafka and HTTP paths, so dedup window / correlator buffer / entity-risk ledger stay consistent regardless of how alerts arrive.

Why no /api/fusion/process?
The fusion service mounts its APIRouter at the root path. Initial wiring used /api/fusion/process, which 404'd. The client tests pin the URL to /process as a regression guard.

Why raise on failure?
DetectAgent.process is the detection plane. Returning {"error": ...} would let downstream consumers treat a synthetic envelope as a real verdict and quietly drop the alert. The graph tool intentionally degrades gracefully because partial graph context is still useful in an in-flight investigation; fusion does not have that property.

Test plan
All 16 new tests pass offline (no network, no docker):
Risk / blast radius
Purely additive:
No env gate / feature flag intentionally: the wiring closes a documented TODO and is required for the four-agent façade contract.
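
For reviewers who want the client contract from the Summary in one place (URL pinned to /process, Authorization header only when a token is supplied, failures propagated instead of converted into an error envelope), here is a rough stdlib-only sketch. It is not the code in services/agents/app/tools/fusion.py: the real client uses httpx, while the injected `post` callable, `process_url`, `auth_headers`, and `process_alert` below are hypothetical names used only for illustration.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Optional

DEFAULT_FUSION_URL = "http://fusion:8003"  # compose-network default named in the PR

# Hypothetical transport signature: (url, json_body, headers) -> response body.
PostFn = Callable[[str, Dict[str, Any], Dict[str, str]], Awaitable[Dict[str, Any]]]


def process_url(base_url: str = DEFAULT_FUSION_URL) -> str:
    # The router is mounted at the root, so the path is /process,
    # not /api/fusion/process.
    return base_url.rstrip("/") + "/process"


def auth_headers(api_token: Optional[str] = None) -> Dict[str, str]:
    # Omit the header entirely when there is no token, so a bare
    # "Bearer " value is never sent.
    return {"Authorization": f"Bearer {api_token}"} if api_token else {}


async def process_alert(
    raw_alert: Dict[str, Any],
    post: PostFn,
    base_url: str = DEFAULT_FUSION_URL,
    api_token: Optional[str] = None,
) -> Dict[str, Any]:
    # Deliberately no try/except: any transport or status error raised by
    # `post` reaches the caller rather than becoming {"error": ...}.
    return await post(process_url(base_url), raw_alert, auth_headers(api_token))
```

A caller that swaps in a failing `post` sees the original exception, which mirrors the "no swallowed exceptions" behaviour the delegate tests are described as locking in.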
Made with Cursor