feat(agents): wire DetectAgent.process to FusionEngine via cross-service HTTP client #190

@jay-cyble

Context

PR #125 landed the four-agent branded façade (T2.5) with TriageAgent, HuntAgent, RespondAgent, and DetectAgent. The first three have executable methods (auto_triage, engine, plan), but DetectAgent shipped as a branding-only stub with a TODO:

# services/agents/app/agents/__init__.py:118-121
TODO(v8.0/T2.5): wire ``DetectAgent.process`` to the
``services.fusion.app.services.fusion_engine.FusionEngine`` once a thin
cross-service client is added (tracked separately so this façade can
ship independently).

AISOC_V8_PROGRESS.md marks T2.5 as [x] done (branding consolidation) with a note that DetectAgent.process wiring is "in flight" as a follow-on.

Proposed Implementation

Following the cross-service pattern established by tools/graph.py and the T2.2 PR series (#139, #141, #142):

1. New fusion endpoint — POST /api/fusion/process

Add a synchronous HTTP entry point to the fusion service that accepts a RawAlert and runs the full FusionEngine.process() pipeline (dedup → correlate → ML score → confidence → RBA entity rollup → narrative), returning the resulting FusedAlert.

File: services/fusion/app/api/router.py

This complements the existing endpoints (POST /confidence/score, GET /entity-risk/*) which expose individual pipeline stages. The new endpoint runs the complete pipeline as a single request-response call.
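As a rough illustration of the single request-response shape, the handler body could be as thin as the sketch below. It uses only the standard library and a stub engine. `handle_process`, `StubFusionEngine`, the dict-based payloads, and the assumption that `FusionEngine.process()` is awaitable are all hypothetical; the real handler would live in services/fusion/app/api/router.py and take the service's own RawAlert and FusedAlert types.

```python
from __future__ import annotations

import asyncio
from typing import Any, Protocol


class Engine(Protocol):
    """Minimal interface assumed for FusionEngine (hypothetical)."""

    async def process(self, raw_alert: dict[str, Any]) -> dict[str, Any]: ...


class StubFusionEngine:
    """Stand-in for FusionEngine that only reports the stage order from the issue."""

    STAGES = (
        "dedup",
        "correlate",
        "ml_score",
        "confidence",
        "rba_entity_rollup",
        "narrative",
    )

    async def process(self, raw_alert: dict[str, Any]) -> dict[str, Any]:
        # A real engine would run each stage; the stub just echoes the order.
        return {"alert_id": raw_alert.get("id"), "stages": list(self.STAGES)}


async def handle_process(raw_alert: dict[str, Any], engine: Engine) -> dict[str, Any]:
    """Body of a hypothetical POST /api/fusion/process handler.

    One call in, one fused result out: the caller never touches
    individual pipeline stages.
    """
    return await engine.process(raw_alert)


if __name__ == "__main__":
    print(asyncio.run(handle_process({"id": "a-1"}, StubFusionEngine())))
```

Keeping the handler this thin means the endpoint adds no logic of its own, so the HTTP path and any in-process callers of the engine cannot drift apart.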

2. Thin HTTP client — services/agents/app/tools/fusion.py

A new httpx-based client following the tools/graph.py pattern:

  • Base URL via os.getenv("FUSION_SERVICE_URL", "http://fusion:8003")
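  • async def process_alert(alert: dict, *, api_token: str | None = None) -> dict — calls POST /api/fusion/process, forwarding the optional api_token that DetectAgent.process() passes in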
  • async def get_entity_risk(tenant_id, entity_type, entity_value) -> dict — calls existing GET /entity-risk/{type}/{value}
  • async def get_confidence(alert: dict) -> dict — calls existing POST /confidence/score
  • Graceful degradation (returns error dict, never raises)
  • structlog logging, configurable timeout
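A minimal sketch of how `process_alert` might look under the bullets above. Several details are assumptions rather than anything the issue specifies: the `FUSION_TIMEOUT_S` variable name, the Bearer-token header, the shape of the error dict, and the injectable `client` parameter (added here so the function can be exercised without a running fusion service). structlog logging is omitted for brevity.

```python
from __future__ import annotations

import asyncio
import os
from typing import Any

FUSION_SERVICE_URL = os.getenv("FUSION_SERVICE_URL", "http://fusion:8003")
# Hypothetical variable name; the issue only says the timeout is configurable.
FUSION_TIMEOUT_S = float(os.getenv("FUSION_TIMEOUT_S", "10"))


async def process_alert(
    alert: dict[str, Any],
    *,
    api_token: str | None = None,
    client: Any | None = None,  # assumed httpx.AsyncClient-like; injectable for tests
) -> dict[str, Any]:
    """POST the alert to /api/fusion/process; return an error dict on any failure."""
    # Assumption: the fusion service accepts a Bearer token.
    headers = {"Authorization": f"Bearer {api_token}"} if api_token else {}
    owns_client = client is None
    try:
        if owns_client:
            import httpx  # lazy import so the module loads without httpx installed

            client = httpx.AsyncClient(
                base_url=FUSION_SERVICE_URL, timeout=FUSION_TIMEOUT_S
            )
        resp = await client.post("/api/fusion/process", json=alert, headers=headers)
        resp.raise_for_status()
        return resp.json()
    except Exception as exc:  # graceful degradation: never raise to the caller
        return {"error": str(exc), "source": "fusion"}
    finally:
        # Only close clients this function created.
        if owns_client and client is not None:
            await client.aclose()


if __name__ == "__main__":
    class _FailingClient:
        """Simulates an unreachable fusion service."""

        async def post(self, *args: Any, **kwargs: Any) -> Any:
            raise RuntimeError("fusion unreachable")

    print(asyncio.run(process_alert({"id": "a-1"}, client=_FailingClient())))
```

Catching `Exception` broadly is deliberate here: it is the simplest way to honour "never raises", at the cost of hiding programming errors behind the same error dict. A real client might narrow this to `httpx.HTTPError` plus JSON decoding failures.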

3. Wire DetectAgent.process() — services/agents/app/agents/__init__.py

@staticmethod
async def process(alert: dict[str, Any], *, api_token: str | None = None) -> dict[str, Any]:
    """Run one alert through the fusion pipeline via the cross-service client."""
    from ..tools.fusion import process_alert
    return await process_alert(alert, api_token=api_token)

4. Feature flag

Gate via AISOC_DETECT_AGENT_ENABLED (default OFF), matching the T2.2 convention for new agent wiring.
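One way the gate could sit in front of the wiring, as a sketch only. The set of truthy values, the `detect_agent_enabled` helper, the shape of the "disabled" response, and the stubbed `_process_alert_stub` are assumptions; the T2.2 PRs may parse and report the flag differently.

```python
from __future__ import annotations

import asyncio
import os
from typing import Any

# Assumed truthy spellings; anything else (including unset) means OFF.
_TRUTHY = {"1", "true", "yes", "on"}


def detect_agent_enabled() -> bool:
    """Read AISOC_DETECT_AGENT_ENABLED at call time so it can be toggled per process."""
    return os.getenv("AISOC_DETECT_AGENT_ENABLED", "").strip().lower() in _TRUTHY


async def _process_alert_stub(
    alert: dict[str, Any], *, api_token: str | None = None
) -> dict[str, Any]:
    """Stand-in for tools.fusion.process_alert so the sketch runs on its own."""
    return {"alert": alert, "fused": True}


class DetectAgent:
    @staticmethod
    async def process(
        alert: dict[str, Any], *, api_token: str | None = None
    ) -> dict[str, Any]:
        if not detect_agent_enabled():
            # Mirrors the client's "error dict, never raises" contract.
            return {
                "error": "detect agent disabled",
                "flag": "AISOC_DETECT_AGENT_ENABLED",
            }
        return await _process_alert_stub(alert, api_token=api_token)


if __name__ == "__main__":
    print(asyncio.run(DetectAgent.process({"id": "a-1"})))
```

Returning an error dict when the flag is off keeps the return type identical in both states, so callers need only one code path for "no result".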

Alternatives Considered

Option A — Call existing endpoints only (no fusion service changes): DetectAgent.process() would call POST /confidence/score + GET /entity-risk/* directly. Simpler, but only exposes a subset of the pipeline (no dedup, correlation, ML scoring, or narrative generation). Doesn't match the TODO's intent of wiring to FusionEngine.

Scope

  • services/agents/app/agents/__init__.py — add DetectAgent.process()
  • services/agents/app/tools/fusion.py — new file (thin HTTP client)
  • services/fusion/app/api/router.py — add POST /api/fusion/process endpoint
  • Tests for the new endpoint and client
