Skip to content

anjijava16/orchestrator_flows

Folders and files

NameName
Last commit message
Last commit date

Latest commit

ย 

History

3 Commits
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 

Repository files navigation

Orchestrator Flows: Multi-Agent Call Orchestration Framework

A sophisticated FastAPI-based orchestration engine that manages multi-step conversational flows through distributed Agent-to-Agent (A2A) microservices. This system coordinates complex customer service call flows by routing intents between specialized agents while maintaining session state and audit trails.


๐ŸŽฏ Project Overview

The Orchestrator Flows system is designed to handle 5-stage customer service call workflows where each stage is handled by a specialized agent. The orchestrator acts as a central coordinator that:

  • Manages session state across multiple turns of conversation
  • Routes intents to appropriate specialized agents
  • Tracks conversation history with audit trails
  • Maintains flow progression through defined stages
  • Provides resumable sessions with Redis-backed persistence

Use Case

When a customer calls a service center with a complex request (e.g., "I'd like to lodge a complaint and request a refund"), the orchestrator:

  1. Routes through greeting โ†’ authentication โ†’ intent capture โ†’ fulfillment โ†’ closing
  2. At each step, different specialized agents handle their domain
  3. Session state is preserved so clients can reconnect mid-call
  4. Full audit trail tracks every intent and response

๐Ÿ—๏ธ Architecture

System Design

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚   Client/User   โ”‚
โ”‚  (FastAPI Call) โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
         โ”‚ POST /orchestrate
         โ–ผ
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚   Orchestrator (main.py)             โ”‚
โ”‚  โ€ข Session mgmt                      โ”‚
โ”‚  โ€ข Intent validation                 โ”‚
โ”‚  โ€ข Agent routing                     โ”‚
โ”‚  โ€ข Response building                 โ”‚
โ””โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
   โ”‚              โ”‚           โ”‚
   โ–ผ              โ–ผ           โ–ผ
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ Redis    โ”‚  โ”‚ Router โ”‚  โ”‚ A2A      โ”‚
โ”‚ Store    โ”‚  โ”‚(Intent โ”‚  โ”‚ Client   โ”‚
โ”‚(Session) โ”‚  โ”‚ Maps)  โ”‚  โ”‚(HTTP)    โ”‚
โ””โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”˜
     โ”‚                         โ”‚
     โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
                โ”‚
                โ–ผ
        โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
        โ”‚ Specialized Agents   โ”‚
        โ”‚  (greeting, auth,    โ”‚
        โ”‚   billing, refund,   โ”‚
        โ”‚   booking, closing)  โ”‚
        โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

๐Ÿ“ File Structure

File Purpose
main.py Core orchestrator engine with FastAPI endpoints and flow control logic
models.py Pydantic data models (requests, responses, session state)
router.py Intent-to-agent mapping and flow validation rules
redis_store.py Redis persistence layer for session management
a2a_client.py HTTP client for communicating with downstream agents
pyproject.toml Project dependencies and metadata

๐Ÿ”„ Flow Architecture: 5-Stage Call Pipeline

Each call progresses through exactly 5 intent flow stages:

Flow ID 1: GREETING
โ”œโ”€ Intent Examples: greeting, welcome
โ”œโ”€ Agent: greeting-agent:8001
โ””โ”€ Purpose: Initial contact, call opening

Flow ID 2: AUTHENTICATION
โ”œโ”€ Intent Examples: verify_identity, authenticate, otp_verify
โ”œโ”€ Agent: auth-agent:8002
โ””โ”€ Purpose: Verify caller identity

Flow ID 3: INTENT_CAPTURE
โ”œโ”€ Intent Examples: refund_request, billing_inquiry, booking_request, etc.
โ”œโ”€ Agents: Specialized domain agents (refund, billing, booking, complaint, account, balance)
โ””โ”€ Purpose: Understand what customer wants

Flow ID 4: FULFILLMENT
โ”œโ”€ Intent Examples: process_refund, confirm_booking, update_account
โ”œโ”€ Agents: Same specialized agents
โ””โ”€ Purpose: Execute the action

Flow ID 5: CLOSING
โ”œโ”€ Intent Examples: closing, end_call, feedback
โ”œโ”€ Agent: closing-agent:8009
โ””โ”€ Purpose: Wrap up, confirmation, survey

Key Rule: Session automatically moves to the next flow ID after each step completes. Once Flow ID 5 completes, is_complete: true and no further steps are recorded.


๐Ÿš€ Core Features

1. Session Management

  • Automatic session creation on first request with call_identify_id
  • Resumable sessions - clients can reconnect with same call_identify_id
  • TTL-based expiry - sessions auto-expire after 1 hour of inactivity
  • Complete state tracking - current flow, completed flows, all steps recorded

2. Intent Routing

  • Intent-to-URL mapping - routing rules defined in INTENT_AGENT_MAP
  • Flow-aware validation - certain intents only allowed at specific flow stages
  • Flexible fallback - unknown intent handled gracefully at any stage

3. Persistent Audit Trail

  • Every interaction step is recorded with:
    • Intent and input message
    • Agent URL called
    • Response status and data
    • Timestamp
    • Full session state snapshots

4. A2A Integration

  • JSON-RPC 2.0 protocol for agent communication
  • Context passing - agents receive account_id, eci, flow metadata
  • Error handling - timeouts, HTTP errors, and unknown exceptions captured gracefully
  • Response extraction - parses multi-part agent messages

๐Ÿ“Š Data Models

OrchestratorRequest

Input from client:

{
    "call_identify_id": "CALL-001",    # Unique session ID
    "account_id":       "ACC-999",     # Customer account
    "eci":              "ECI-7",       # E-commerce indicator
    "input_message":    "I need help", # User utterance
    "intent":           "greeting"     # Classified intent
}

OrchestratorResponse

Output to client:

{
    "call_identify_id": "CALL-001",
    "intent_flow_id":   1,              # Which flow stage just completed
    "intent":           "greeting",
    "status":           "success",      # "success" | "error"
    "response":         {...},          # Agent's response data
    "next_flow_id":     2,              # Where to send next request (null if complete)
    "is_complete":      false,
    "session_summary":  null            # Populated only when is_complete: true
}

CallSessionState (Redis)

Full session stored in Redis:

{
    "call_identify_id":  "CALL-001",
    "account_id":        "ACC-999",
    "eci":               "ECI-7",
    "current_flow_id":   3,             # Where we are now
    "completed_flows":   [1, 2],        # What's been done
    "is_complete":       false,
    "steps": [
        {
            "intent_flow_id":   1,
            "intent":           "greeting",
            "input_message":    "Hello",
            "a2a_agent_url":    "http://greeting-agent:8001",
            "response_status":  "success",
            "response_data":    {...},
            "timestamp":        "2026-02-28T10:00:00Z"
        },
        ...
    ],
    "created_at":  "2026-02-28T10:00:00Z",
    "updated_at":  "2026-02-28T10:01:30Z"
}

๐Ÿ”Œ API Endpoints

POST /orchestrate

Main entry point - Process a single interaction step

Request:

curl -X POST http://localhost:8000/orchestrate \
  -H "Content-Type: application/json" \
  -d '{
    "call_identify_id": "CALL-001",
    "account_id": "ACC-999",
    "eci": "ECI-7",
    "input_message": "I need a refund",
    "intent": "refund_request"
  }'

Response: Returns orchestrator response with next flow ID and agent response


GET /session/{call_identify_id}

Inspect current session state

curl http://localhost:8000/session/CALL-001

Returns CallSessionState with current flow ID, completed flows, and all steps


GET /session/{call_identify_id}/steps

Retrieve full audit trail

curl http://localhost:8000/session/CALL-001/steps

Returns list of all IntentStepRecord entries for the session


DELETE /session/{call_identify_id}

End call and clear cache

curl -X DELETE http://localhost:8000/session/CALL-001

Removes session from Redis and ends tracking


๐Ÿ”„ Request/Response Flow Example

Request 1: Initial Greeting

POST /orchestrate
{
  "call_identify_id": "CALL-001",
  "account_id": "ACC-999",
  "eci": "ECI-7",
  "input_message": "Hello, I need help",
  "intent": "greeting"
}

Orchestrator Logic:

  1. Check Redis: No existing session
  2. Create new session with current_flow_id = 1
  3. Validate: "greeting" is allowed at Flow 1 โœ“
  4. Route: greeting โ†’ http://greeting-agent:8001
  5. Call agent with context (account_id, eci, flow_label="GREETING")
  6. Record step in Redis
  7. Advance to Flow ID 2
  8. Return response with next_flow_id: 2

Response:

{
  "call_identify_id": "CALL-001",
  "intent_flow_id": 1,
  "intent": "greeting",
  "status": "success",
  "response": {"text": "Welcome to customer service..."},
  "next_flow_id": 2,
  "is_complete": false,
  "session_summary": null
}

Request 2: Authentication

POST /orchestrate
{
  "call_identify_id": "CALL-001",
  "account_id": "ACC-999",
  "eci": "ECI-7",
  "input_message": "My SSN is 123-45-6789",
  "intent": "verify_identity"
}

Orchestrator Logic:

  1. Check Redis: Session exists with current_flow_id = 2
  2. Validate: "verify_identity" is allowed at Flow 2 โœ“
  3. Route: verify_identity โ†’ http://auth-agent:8002
  4. Call agent (session knows this is flow 2)
  5. Record step
  6. Advance to Flow ID 3
  7. Return response with next_flow_id: 3

Requests 3-5: Continue Pattern

Flow 3 โ†’ INTENT_CAPTURE (refund_request โ†’ refund-agent:8003)
Flow 4 โ†’ FULFILLMENT (process_refund)
Flow 5 โ†’ CLOSING (closing โ†’ closing-agent:8009)
        โ””โ”€ is_complete: true
           session_summary included with full audit trail

๐Ÿ› ๏ธ Installation & Setup

Prerequisites

  • Python 3.13+
  • Redis server running on localhost:6379
  • FastAPI, Uvicorn, httpx, Pydantic, redis

Steps

  1. Clone and navigate to project:
cd orchestrator_flows
source .venv/bin/activate
  1. Install dependencies:
pip install -e .
  1. Ensure Redis is running:
redis-server
# or with Homebrew on macOS
brew services start redis
  1. Run orchestrator:
uvicorn main:app --reload --port 8000

Orchestrator will be available at http://localhost:8000


๐Ÿ“ Configuration

Redis Connection

Edit redis_store.py:

REDIS_URL       = "redis://localhost:6379"   # Connection string
SESSION_TTL_SEC = 3600   # 1 hour session timeout

Agent Endpoints

Edit router.py INTENT_AGENT_MAP to update agent URLs:

INTENT_AGENT_MAP: dict[str, str] = {
    "greeting":       "http://your-greeting-agent:8001",
    "verify_identity": "http://your-auth-agent:8002",
    # ... etc
}

A2A Client Timeout

Edit a2a_client.py:

self.timeout = 15  # seconds for agent requests

๐Ÿงช Testing the System

Test with curl

# Start a new call flow
curl -X POST http://localhost:8000/orchestrate \
  -H "Content-Type: application/json" \
  -d '{
    "call_identify_id": "TEST-001",
    "account_id": "ACC-TEST",
    "eci": "ECI-TEST",
    "input_message": "Hello",
    "intent": "greeting"
  }'

# Check session state
curl http://localhost:8000/session/TEST-001

# Get steps for audit trail
curl http://localhost:8000/session/TEST-001/steps

# End the call
curl -X DELETE http://localhost:8000/session/TEST-001

๐Ÿ” Key Design Decisions

Decision Rationale
Redis for sessions Fast, expires automatically, perfect for short-lived call sessions
JSON-RPC 2.0 for A2A Standard protocol for agent-to-agent communication
5 fixed flow stages Ensures structured, consistent call flow across all customers
TTL expiry (1 hour) Balances memory efficiency with reasonable call duration
Flexible intent validation Allows agents to handle unexpected intents gracefully
Full audit trail Enables debugging, compliance, and quality assurance

๐Ÿšจ Error Handling

The system handles errors gracefully:

  • Missing agent โ†’ unknown intent routes to fallback agent
  • Agent timeout (15s) โ†’ Returns error response, session continues
  • HTTP errors โ†’ Captured and returned to client
  • Unknown exception โ†’ Generic error response, session unaffected
  • Session not found โ†’ Returns 404 HTTP Exception

๐Ÿ“ˆ Future Enhancements

Potential improvements:

  • Implement agent retry logic with exponential backoff
  • Add WebSocket support for real-time streaming responses
  • Support for branching flows (conditional next_flow_id based on response)
  • Multi-language support with intent classification
  • Metrics/logging dashboard (call duration, agent response times, error rates)
  • Support for parallel agent calls in certain flow stages
  • Agent versioning and A/B testing capabilities

๐Ÿ“ž Support & Documentation

For detailed code walkthroughs:

  • See main.py for full orchestration logic
  • See router.py for intent routing configuration
  • See models.py for all data structures
  • See redis_store.py for persistence implementation
  • See a2a_client.py for agent communication

About

orchestrator

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages

โšก