This document describes the CugaSupervisor feature implementation - a subgraph system for orchestrating multiple CugaAgent instances. The supervisor pattern allows coordinating specialized agents, each with their own tools, apps, and configurations, similar to how cuga_lite_graph.py implements a subgraph.
- **CugaSupervisor Subgraph** (`cuga_supervisor_graph.py`)
  - Similar structure to `cuga_lite_graph.py`
  - Orchestrates multiple CugaAgent instances
  - Manages task delegation and result aggregation
  - Supports sequential, parallel, and adaptive execution strategies
  - Supports two modes: delegation and response
- **Supervisor State** (`cuga_supervisor_state.py`)
  - Extends `AgentState` to maintain compatibility
  - `supervisor_chat_messages`: Main chat messages for the supervisor (separate from sub-agents)
  - `agent_chat_messages`: Tracks individual sub-agent conversations
  - `supervisor_variables`: Aggregated variables collected from sub-agents
  - `supervisor_mode`: "delegation" or "response"
  - Agent registry, results tracking, and delegation metadata
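The state schema above can be sketched as a `TypedDict`. This is a minimal sketch: the field names come from this document, while the base-class shape and the value types are assumptions.

```python
from typing import Any, Dict, List, Literal, TypedDict

class AgentState(TypedDict, total=False):
    # Stand-in for the existing base state; the real class lives in cuga_graph.
    chat_messages: List[Dict[str, str]]
    variables: Dict[str, Any]

class CugaSupervisorState(AgentState, total=False):
    # Supervisor's own conversation, kept separate from sub-agent messages.
    supervisor_chat_messages: List[Dict[str, str]]
    # Per-agent conversation histories, keyed by agent name.
    agent_chat_messages: Dict[str, List[Dict[str, str]]]
    # Variables aggregated from sub-agents, prefixed by agent name.
    supervisor_variables: Dict[str, Any]
    supervisor_mode: Literal["delegation", "response"]
    # Registry of available agents and per-agent results.
    agent_registry: Dict[str, Any]
    agent_results: Dict[str, Any]

state: CugaSupervisorState = {
    "supervisor_mode": "delegation",
    "supervisor_chat_messages": [{"role": "user", "content": "Get top accounts"}],
    "supervisor_variables": {},
}
```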
- **SDK Support** (`sdk.py`)
  - `CugaSupervisor` class for programmatic creation
  - `from_yaml()` method for YAML configuration loading
  - `invoke()` method for task execution
  - `variables_manager` property for accessing collected variables
  - `add_agent()` and `remove_agent()` for dynamic agent management
- **YAML Configuration Loader** (`sdk/supervisor_config.py`)
  - Parses YAML configuration files
  - Creates internal CugaAgent instances from config
  - Configures external A2A agents
  - Supports tools, apps, MCP servers, and the A2A protocol
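Once the YAML is parsed into a dict, the loader has to tell internal agents apart from external A2A agents. A minimal sketch of that decision, keyed on the presence of an enabled `a2a_protocol` section (`classify_agents` is a hypothetical helper; the real loader also constructs CugaAgent instances and A2A clients):

```python
from typing import Any, Dict, List, Tuple

def classify_agents(
    config: Dict[str, Any],
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Split agent entries into (internal, external) lists."""
    internal: List[Dict[str, Any]] = []
    external: List[Dict[str, Any]] = []
    for entry in config.get("agents", []):
        # An agent with an enabled a2a_protocol section is external;
        # everything else becomes an in-process CugaAgent.
        if entry.get("a2a_protocol", {}).get("enabled"):
            external.append(entry)
        else:
            internal.append(entry)
    return internal, external

config = {
    "agents": [
        {"name": "sales_agent", "type": "internal"},
        {
            "name": "remote_agent",
            "type": "external",
            "a2a_protocol": {"enabled": True, "endpoint": "http://localhost:8000/a2a"},
        },
    ]
}
internal, external = classify_agents(config)
```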
- **A2A Protocol** (`a2a_protocol.py`)
  - Agent-to-Agent communication protocol
  - Supports HTTP, SSE, WebSocket, and STDIO transports
  - Task delegation, result sharing, and capability discovery
  - Status checking
- **Supervisor Node** (`cuga_supervisor_node.py`)
  - Node wrapper for integration with the main graph
  - Handles state conversion between AgentState and CugaSupervisorState
  - Callback node for processing results
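The state conversion the node wrapper performs can be sketched as a pair of hypothetical helpers, assuming both states are plain dicts (the real implementation works on the typed state classes):

```python
from typing import Any, Dict

def to_supervisor_state(agent_state: Dict[str, Any]) -> Dict[str, Any]:
    """Seed a CugaSupervisorState-shaped dict from the surrounding AgentState."""
    return {
        **agent_state,
        # The supervisor keeps its own conversation, seeded from the main chat.
        "supervisor_chat_messages": list(agent_state.get("chat_messages", [])),
        "supervisor_variables": {},
        "agent_results": {},
    }

def from_supervisor_state(
    sup_state: Dict[str, Any], agent_state: Dict[str, Any]
) -> Dict[str, Any]:
    """Fold supervisor results back into the main AgentState."""
    merged = dict(agent_state)
    merged["chat_messages"] = sup_state["supervisor_chat_messages"]
    merged.setdefault("variables", {}).update(sup_state.get("supervisor_variables", {}))
    return merged

sup = to_supervisor_state({"chat_messages": [{"role": "user", "content": "hi"}]})
back = from_supervisor_state(sup, {"chat_messages": []})
```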
**`plan_upfront` mode**:
- Supervisor decides which agents to use upfront using an LLM
- Executes agents with a sequential, parallel, or adaptive strategy
- Controls execution flow with LLM decisions between agents (for sequential/adaptive)
- Collects variables from sub-agents
- Synthesizes natural-language responses from agent results
- Uses `supervisor_chat_messages` for conversation context
- Best for complex multi-agent coordination tasks

**`conversational` mode**:
- Supervisor acts as a single agent with delegation tools
- Conversational approach similar to cuga_lite
- Can call agents via Python code dynamically
- More flexible, agent-driven execution
- Best for simpler tasks or when dynamic agent selection is needed
`supervisor_chat_messages`: The supervisor's own conversation history with the user
- Separate from sub-agents' messages
- Used in the plan_upfront strategy for context, task delegation, and response synthesis
- Used in the conversational strategy for conversational context
- Maintains continuity across multiple agent invocations

`supervisor_variables`: Aggregated variables collected from sub-agents
- Each sub-agent maintains its own variables (stored in `agent_variables`)
- Supervisor aggregates variables with agent-name prefixes to avoid conflicts
- Accessible via the `supervisor_variables_manager` property
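The prefix-based aggregation can be sketched as follows. The exact prefix format (`"<agent>_<name>"`) is an assumption; the point is that two agents defining the same variable name never collide.

```python
from typing import Any, Dict

def aggregate_variables(agent_variables: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Merge per-agent variables into one dict, prefixing keys with the agent name."""
    supervisor_variables: Dict[str, Any] = {}
    for agent_name, variables in agent_variables.items():
        for key, value in variables.items():
            supervisor_variables[f"{agent_name}_{key}"] = value
    return supervisor_variables

# Both agents define "accounts", but the prefixes keep them distinct.
merged = aggregate_variables({
    "sales_agent": {"accounts": ["Acme"]},
    "data_agent": {"accounts": ["Acme (scored)"], "summary": "1 account"},
})
```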
`plan_upfront` flow:

```
START -> prepare_agents -> delegate_task -> execute_agents ->
collect_variables -> aggregate_results -> synthesize_response -> finalize -> END
```

`conversational` flow:

```
START -> prepare_agents_and_prompt -> call_model ->
[if code: execute_agent_tool -> call_model] -> END
```
`plan_upfront` nodes:
- `prepare_agents`: Initialize and register available agents
- `delegate_task`: LLM decides which agent(s) to use based on `supervisor_chat_messages`
- `execute_agents`: Execute selected agents (sequential/parallel/adaptive) with LLM-controlled flow
- `collect_variables`: Collect variables from sub-agents into `supervisor_variables`
- `aggregate_results`: Combine results from multiple agents
- `synthesize_response`: Generate the final response from sub-agent results
- `finalize`: Prepare the final answer and update `supervisor_chat_messages`

`conversational` nodes:
- `prepare_agents_and_prompt`: Prepare agents, create delegation tools, and generate the prompt
- `call_model`: Call the LLM to generate a code or text response
- `execute_agent_tool`: Execute code with agent delegation tools available
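The `plan_upfront` flow can be simulated with placeholder nodes to make the execution order concrete. This is only a sketch: the real graph is built with LangGraph, and the real nodes call LLMs and sub-agents rather than appending to a trace.

```python
from typing import Any, Callable, Dict, List

State = Dict[str, Any]

def make_node(name: str) -> Callable[[State], State]:
    # Placeholder node body; records its own name so the order is observable.
    def node(state: State) -> State:
        state.setdefault("trace", []).append(name)
        return state
    return node

PLAN_UPFRONT_NODES = [
    "prepare_agents", "delegate_task", "execute_agents",
    "collect_variables", "aggregate_results", "synthesize_response", "finalize",
]

def run_flow(node_names: List[str]) -> State:
    # Runs the nodes sequentially, threading the state through each one.
    state: State = {}
    for node in (make_node(n) for n in node_names):
        state = node(state)
    return state

final = run_flow(PLAN_UPFRONT_NODES)
```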
**Internal agents**:
- Created directly as `CugaAgent` instances
- No network overhead
- Direct function calls
- Shared memory space
- Defined in YAML without an `a2a_protocol` section

**External agents**:
- Connected via HTTP/SSE/WebSocket/STDIO
- Network communication required
- Defined in YAML with an `a2a_protocol` section
- Support authentication and retry policies
- **HTTP Transport** (recommended for production)
  - RESTful API endpoints
  - JSON message format
  - Example: `http://localhost:8000/a2a`
- **SSE Transport** (Server-Sent Events)
  - Real-time event streaming
  - Similar to the MCP SSE transport
  - Example: `http://localhost:8000/a2a/sse`
- **WebSocket Transport** (bidirectional)
  - Full-duplex communication
  - Real-time bidirectional messaging
  - Example: `ws://localhost:8000/a2a/ws`
- **STDIO Transport** (local agents)
  - For agents running in the same process
  - Direct function calls
  - Example: `local_agent_id`
Connecting to an external agent over the A2A protocol:

```python
from cuga.backend.cuga_graph.nodes.cuga_supervisor.a2a_protocol import A2AProtocol

# HTTP transport
a2a_agent = A2AProtocol(
    endpoint="http://localhost:8000/a2a",
    transport="http",
    auth={"type": "bearer", "token": "secret-token"},
    timeout=30,
)
await a2a_agent.connect()

# Delegate a task
result = await a2a_agent.delegate_task(
    target_agent="sales_agent",
    task="Get top accounts",
    context={},
)
await a2a_agent.disconnect()
```

Example YAML configuration:

```yaml
supervisor:
  strategy: adaptive   # sequential, parallel, adaptive
  mode: plan_upfront   # plan_upfront or conversational
  model:
    provider: openai
    model_name: gpt-4o
  description: "Supervisor for coordinating specialized agents"

agents:
  # Internal CugaAgent (no a2a_protocol section)
  - name: sales_agent
    type: internal
    description: "Handles sales and CRM operations"
    tools:
      - name: get_accounts
        type: langchain
    apps:
      - name: digital_sales
        type: api
        url: https://digitalsales.example.com/openapi.json
    mcp_servers:
      - name: filesystem
        command: npx
        args: ["-y", "@modelcontextprotocol/server-filesystem", "./workspace"]
        transport: stdio
    special_instructions: "Focus on sales operations"

  # External agent via the A2A protocol
  - name: remote_agent
    type: external
    description: "Remote agent via A2A protocol"
    a2a_protocol:
      enabled: true
      endpoint: http://localhost:8000/a2a
      transport: http
      auth:
        type: bearer
        token: ${A2A_TOKEN}
      capabilities: ["task_delegation", "result_sharing"]

# A2A protocol global configuration
a2a:
  protocol_version: "1.0"
  communication:
    type: http
    timeout: 30
  retry_policy:
    max_retries: 3
    backoff: exponential
```

Programmatic creation via the SDK:

```python
from cuga import CugaAgent, CugaSupervisor
from langchain_core.tools import tool

@tool
def get_accounts() -> str:
    """Get sales accounts"""
    return "Account data"

@tool
def analyze_data(data: str) -> str:
    """Analyze sales data"""
    return "Analysis results"

sales_agent = CugaAgent(tools=[get_accounts])
data_agent = CugaAgent(tools=[analyze_data])

supervisor = CugaSupervisor(
    agents={"sales_agent": sales_agent, "data_agent": data_agent},
    strategy="adaptive",
    mode="plan_upfront",
)

result = await supervisor.invoke("Get sales data and analyze it")
print(result.answer)
```

Creation from a YAML configuration:

```python
from cuga import CugaSupervisor

supervisor = await CugaSupervisor.from_yaml("supervisor_config.yaml")
result = await supervisor.invoke("Complex task requiring multiple agents")
print(result.answer)

# Access collected variables
vars_manager = supervisor.variables_manager
variables = vars_manager.get_variable_names()
```

**Sequential**:
- Execute agents one after another
- Each agent receives results from previous agents
- Best for dependent tasks

**Parallel**:
- Execute all agents simultaneously
- Faster for independent tasks
- Results aggregated after all complete

**Adaptive**:
- Currently defaults to sequential
- Future: dynamic strategy selection based on task complexity and agent capabilities
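The sequential/parallel contrast can be sketched with plain `asyncio`. The agent callables here are hypothetical stand-ins; the real supervisor adds LLM-controlled decisions between agents in the sequential and adaptive cases.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

Agent = Callable[[str, Dict[str, Any]], Awaitable[str]]

async def run_sequential(agents: Dict[str, Agent], task: str) -> Dict[str, str]:
    # Each agent receives the results produced by the agents before it.
    results: Dict[str, str] = {}
    for name, agent in agents.items():
        results[name] = await agent(task, dict(results))
    return results

async def run_parallel(agents: Dict[str, Agent], task: str) -> Dict[str, str]:
    # Independent agents run concurrently; results are aggregated at the end.
    names = list(agents)
    outputs = await asyncio.gather(*(agents[name](task, {}) for name in names))
    return dict(zip(names, outputs))

async def sales_agent(task: str, prior: Dict[str, Any]) -> str:
    return "3 top accounts"

async def data_agent(task: str, prior: Dict[str, Any]) -> str:
    return f"analysis (saw {len(prior)} prior result(s))"

agents: Dict[str, Agent] = {"sales_agent": sales_agent, "data_agent": data_agent}
seq = asyncio.run(run_sequential(agents, "Get sales data and analyze it"))
par = asyncio.run(run_parallel(agents, "Get sales data and analyze it"))
```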
```
src/cuga/
├── backend/cuga_graph/nodes/cuga_supervisor/
│   ├── __init__.py
│   ├── cuga_supervisor_graph.py      # Main supervisor subgraph
│   ├── cuga_supervisor_state.py      # Supervisor state schema
│   ├── cuga_supervisor_node.py       # Supervisor node wrapper
│   ├── a2a_protocol.py               # A2A protocol implementation
│   └── prompts/
│       ├── supervisor_system.jinja2  # (To be created)
│       └── supervisor_user.jinja2    # (To be created)
├── sdk/
│   └── supervisor_config.py          # YAML loader
└── sdk_core/tests/
    ├── test_supervisor_sdk.py
    ├── test_supervisor_multi_agent.py
    ├── test_supervisor_yaml_config.py
    └── fixtures/
        └── supervisor_config.yaml
```
- **test_supervisor_sdk.py** - SDK integration tests
  - Supervisor creation from YAML
  - Agent registration
  - Task delegation
  - Result aggregation
  - Variable collection
- **test_supervisor_multi_agent.py** - Multi-agent coordination
  - Sequential execution
  - Parallel execution
  - Adaptive strategy
  - Agent failure handling
  - Delegation vs response modes
- **test_supervisor_yaml_config.py** - YAML configuration tests
  - YAML parsing
  - Agent configuration loading
  - MCP server integration
  - A2A protocol setup and connection
✅ Completed:
- CugaSupervisorState with supervisor_chat_messages and variable management
- CugaSupervisor subgraph with all nodes
- A2A protocol implementation (HTTP, SSE, WebSocket, STDIO)
- CugaSupervisorNode wrapper
- CugaSupervisor SDK class with from_yaml() and invoke()
- YAML configuration loader
- E2E tests for SDK, multi-agent coordination, and YAML config
Planned:
- Dynamic agent discovery and registration
- Agent capability learning and optimization
- Cross-agent variable sharing with permissions
- Supervisor learning from past delegations
- Support for nested supervisors (supervisor of supervisors)
- A2A protocol extensions for streaming responses
- Agent health monitoring and auto-recovery
- Prompt templates for supervisor nodes
- Full MCP server integration in YAML loader
- Tool loading from YAML definitions
- Main Graph (`graph.py`): Supervisor subgraph can be integrated similarly to CugaLiteSubgraph (optional)
- SDK (`sdk.py`): CugaSupervisor class available alongside CugaAgent
- Tool Provider: Reuses existing tool provider interfaces
- State Management: Compatible with AgentState for seamless integration
- Variables Manager: Reuses StateVariablesManager for supervisor variable management
- State Compatibility: Supervisor state extends AgentState for seamless integration
- Supervisor Chat Messages: Critical for maintaining conversation context and enabling response mode
- Error Handling: Robust error handling for agent failures and timeouts
- Resource Management: Efficient resource usage when running multiple agents
- Security: Proper isolation between agents and secure A2A communication
- Observability: Logging and monitoring for supervisor operations
- Variable Isolation: Each agent's variables are isolated, with supervisor aggregating as needed
- Message Isolation: Sub-agents maintain their own chat_messages, while supervisor_chat_messages tracks the high-level conversation