03.3 Message Processing Flow
This document traces the complete lifecycle of a message in ZeroClaw, from initial ingestion through a channel to final response delivery. It covers message routing, context enrichment, agent processing, tool execution, and response streaming.
For configuration of individual channels, see Channel Implementations. For details on the agent's tool execution loop, see Agent Turn Cycle. For memory-based context retrieval, see Hybrid Search.
When a user sends a message through any channel (Telegram, Discord, CLI, HTTP webhook), ZeroClaw follows a consistent processing pipeline:
- Channel Listener receives the raw message
- Message Dispatcher queues the message for processing
- Context Enrichment recalls relevant memories
- Agent Turn invokes the LLM with tools
- Tool Execution runs any requested tools
- Response Delivery sends the final response back through the channel
Each stage involves specific components and data structures documented below.
All channels convert their platform-specific message format into a unified ChannelMessage struct:
```rust
pub struct ChannelMessage {
    pub channel: String,      // Channel name (e.g. "telegram", "discord")
    pub sender: String,       // Platform-specific sender ID
    pub id: String,           // Unique message ID
    pub content: String,      // Message text content
    pub reply_target: String, // Target for sending replies
}
```

Sources: src/channels/traits.rs:1-100
Each channel runs in a supervised loop with automatic restart on failure. The spawn_supervised_listener() function wraps channel listeners with exponential backoff:
Initial backoff: 2 seconds
Max backoff: 60 seconds
On clean exit: Reset backoff to initial
On error: Double backoff up to max
When a listener exits or errors, the supervisor:
- Marks the component as unhealthy
- Waits for backoff duration
- Restarts the listener
- Increments restart counter
Sources: src/channels/mod.rs:471-509
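The backoff schedule above can be sketched as a pure function. This is a minimal illustration of the stated constants (2s initial, 60s cap, doubling on error, reset on clean exit); `next_backoff` is an illustrative name, not the actual symbol in src/channels/mod.rs:

```rust
const INITIAL_BACKOFF_SECS: u64 = 2;
const MAX_BACKOFF_SECS: u64 = 60;

/// Compute the next supervisor backoff. A clean exit resets to the
/// initial delay; an error doubles the delay up to the maximum.
fn next_backoff(current_secs: u64, clean_exit: bool) -> u64 {
    if clean_exit {
        INITIAL_BACKOFF_SECS
    } else {
        (current_secs * 2).min(MAX_BACKOFF_SECS)
    }
}
```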
```mermaid
graph TB
    User["User"]
    TG["Telegram Bot API"]
    DC["Discord Gateway"]
    SL["Slack HTTP Poll"]
    subgraph "Channel Implementations"
        TelegramChannel["TelegramChannel::listen()"]
        DiscordChannel["DiscordChannel::listen()"]
        SlackChannel["SlackChannel::listen()"]
    end
    subgraph "Supervision Layer"
        Supervisor["spawn_supervised_listener()"]
        HealthCheck["health::mark_component_ok()"]
        Backoff["Exponential Backoff"]
    end
    Dispatcher["mpsc::Sender<ChannelMessage>"]
    User -->|message| TG
    User -->|message| DC
    User -->|message| SL
    TG -->|webhook/poll| TelegramChannel
    DC -->|websocket| DiscordChannel
    SL -->|HTTP GET| SlackChannel
    TelegramChannel --> Supervisor
    DiscordChannel --> Supervisor
    SlackChannel --> Supervisor
    Supervisor -->|restart on failure| Backoff
    Supervisor -->|health status| HealthCheck
    Supervisor -->|ChannelMessage| Dispatcher
```
Sources: src/channels/mod.rs:471-509, src/channels/telegram.rs:1-500, src/channels/discord.rs:1-500, src/channels/slack.rs:1-500
The run_message_dispatch_loop() function consumes messages from a shared channel and spawns worker tasks with semaphore-based concurrency control:
| Parameter | Default | Description |
|---|---|---|
| `max_in_flight_messages` | `channel_count * 4` | Semaphore permits |
| Min permits | 8 | Floor for small deployments |
| Max permits | 64 | Ceiling for large deployments |
The dispatcher uses a JoinSet to track worker tasks and reaps completed workers opportunistically to prevent unbounded memory growth.
Sources: src/channels/mod.rs:816-844, src/channels/mod.rs:511-518
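The permit sizing above can be expressed as a one-line clamp. A minimal sketch assuming the stated floor and ceiling; the function name is illustrative:

```rust
/// Semaphore permits for the dispatch loop: four per channel,
/// clamped to the [8, 64] range stated in the table.
fn max_in_flight_messages(channel_count: usize) -> usize {
    (channel_count * 4).clamp(8, 64)
}
```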
ZeroClaw maintains separate conversation histories for each sender within a channel:
History Key Format: "{channel}_{sender}"
Example: "telegram_123456789"
Per-key state:
- Conversation history (Vec<ChatMessage>)
- Provider/model selection (ChannelRouteSelection)
- Max history messages: 50
This ensures that multiple users can interact with the agent simultaneously without context bleeding.
Sources: src/channels/mod.rs:53-56, src/channels/mod.rs:125-131
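The key derivation is straightforward; a sketch with an illustrative function name:

```rust
/// Per-sender conversation key in the "{channel}_{sender}" format,
/// so each user on each channel gets an isolated history.
fn history_key(channel: &str, sender: &str) -> String {
    format!("{channel}_{sender}")
}
```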
```mermaid
sequenceDiagram
    participant CL as "Channel Listener"
    participant TX as "mpsc::Sender"
    participant DL as "run_message_dispatch_loop()"
    participant SM as "Semaphore (64)"
    participant WK as "Worker Task"
    participant PCM as "process_channel_message()"
    CL->>TX: send(ChannelMessage)
    TX->>DL: rx.recv().await
    DL->>SM: acquire_owned().await
    SM-->>DL: Permit
    DL->>WK: spawn worker
    WK->>PCM: process_channel_message(ctx, msg)
    Note over PCM: Message processing<br/>(see next section)
    PCM-->>WK: Result<()>
    WK->>SM: drop Permit
    WK-->>DL: task complete
```
Sources: src/channels/mod.rs:816-844
Before sending a message to the LLM, ZeroClaw enriches it with relevant context from memory.
The build_memory_context() function performs a hybrid search for relevant memories:
Query: User message content
Results: Top 5 entries
Filtering: score >= min_relevance_score (default 0.4)
Output format:
[Memory context]
- key1: content1
- key2: content2
Sources: src/channels/mod.rs:443-469
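The filtering and formatting above can be sketched as follows. This is a simplified, synchronous stand-in: the real `build_memory_context()` is async and queries the memory backend, whereas here recalled entries are passed in as `(key, content, score)` tuples:

```rust
/// Assemble the "[Memory context]" block from recalled entries,
/// keeping only those at or above the relevance threshold.
fn build_memory_context(entries: &[(String, String, f32)], min_score: f32) -> String {
    let lines: Vec<String> = entries
        .iter()
        .filter(|(_, _, score)| *score >= min_score)
        .map(|(key, content, _)| format!("- {key}: {content}"))
        .collect();
    if lines.is_empty() {
        String::new()
    } else {
        format!("[Memory context]\n{}\n\n", lines.join("\n"))
    }
}
```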
```rust
// Original user message
let user_msg = "How do I deploy this?".to_string();

// Retrieve context
let memory_context = build_memory_context(mem, &user_msg, 0.4).await;

// Enriched message
let enriched = if memory_context.is_empty() {
    user_msg.clone()
} else {
    format!("{memory_context}{user_msg}")
};
```

The enriched message is what actually gets sent to the LLM, allowing it to reference previous conversations and stored facts.
Sources: src/channels/mod.rs:588-608
When auto_save_memory is enabled, incoming messages are automatically stored:
Key format: "{channel}_{sender}_{message_id}"
Category: MemoryCategory::Conversation
This builds a persistent record of all interactions for future context retrieval.
Sources: src/channels/mod.rs:591-602
The run_tool_call_loop() function implements the core agent reasoning cycle:
Max iterations: 10 (configurable via agent.max_tool_iterations)
Timeout: 300 seconds (CHANNEL_MESSAGE_TIMEOUT_SECS)
Loop:
1. Send messages to LLM (history + tools)
2. Parse response (text + tool_calls)
3. If tool_calls empty: Return text response (done)
4. For each tool_call:
a. Security check (SecurityPolicy)
b. Execute tool
c. Append result to history
5. Repeat from step 1
Sources: src/agent/loop_.rs:851-1044, src/channels/mod.rs:698-715
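The loop above can be reduced to a small control-flow skeleton. This is a simplified, synchronous sketch: the real `run_tool_call_loop()` is async and takes the provider, tool registry, and security policy; here the model and tool execution are injected as closures, and tool calls are just names:

```rust
/// One model response: final text plus zero or more requested tools.
struct LlmResponse {
    text: String,
    tool_calls: Vec<String>,
}

/// Keep calling the model and executing requested tools until the
/// model returns no tool calls, or the iteration cap is exceeded.
fn run_tool_call_loop(
    mut call_llm: impl FnMut(&[String]) -> LlmResponse,
    mut execute_tool: impl FnMut(&str) -> String,
    max_iterations: usize,
) -> Result<String, String> {
    let mut history: Vec<String> = Vec::new();
    for _ in 0..max_iterations {
        let resp = call_llm(&history);
        if resp.tool_calls.is_empty() {
            return Ok(resp.text); // text-only response ends the turn
        }
        for tool in &resp.tool_calls {
            let result = execute_tool(tool);
            history.push(format!("tool:{tool}:{result}")); // append result
        }
    }
    Err("max tool iterations exceeded".to_string())
}
```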
Conversation history is built from three sources:
- System Prompt (position 0): Tool descriptions, safety rules, workspace context
- Prior Turns: Retrieved from per-sender conversation cache
- Current Message: User's enriched message
History compaction triggers when non-system messages exceed max_history_messages (default 50), using LLM-based summarization to condense old turns.
Sources: src/channels/mod.rs:614-628, src/agent/loop_.rs:158-205
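The compaction trigger can be sketched as below. In ZeroClaw the condensed turns come from an LLM summarization call; this sketch substitutes a placeholder string, keeps the system prompt, and retains the most recent half of the cap:

```rust
/// Compact history once non-system messages exceed the cap:
/// preserve history[0] (system prompt), replace old turns with a
/// summary placeholder, and keep the most recent messages.
fn compact_history(history: Vec<String>, max_messages: usize) -> Vec<String> {
    let non_system = history.len().saturating_sub(1);
    if non_system <= max_messages {
        return history; // under the cap, nothing to do
    }
    let keep_from = history.len() - max_messages / 2;
    let mut compacted = vec![
        history[0].clone(),
        "[summary of earlier turns]".to_string(),
    ];
    compacted.extend_from_slice(&history[keep_from..]);
    compacted
}
```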
```mermaid
stateDiagram-v2
    [*] --> BuildHistory: process_channel_message()
    BuildHistory --> EnrichContext: build_memory_context()
    EnrichContext --> SendLLM: run_tool_call_loop()
    SendLLM --> ParseResponse: provider.chat()
    ParseResponse --> CheckToolCalls: parse_tool_calls()
    CheckToolCalls --> ExecuteTools: tool_calls not empty
    CheckToolCalls --> ReturnText: tool_calls empty
    ExecuteTools --> SecurityCheck: for each tool
    SecurityCheck --> RunTool: approved
    SecurityCheck --> DenialMessage: denied
    RunTool --> AppendResult: ToolResult
    DenialMessage --> AppendResult
    AppendResult --> CheckIterations
    CheckIterations --> SendLLM: iteration < max_iterations
    CheckIterations --> TimeoutError: iteration >= max_iterations
    ReturnText --> SaveHistory: ChatMessage::assistant()
    SaveHistory --> SendReply: channel.send()
    SendReply --> [*]
    TimeoutError --> [*]
```
Sources: src/agent/loop_.rs:851-1044, src/channels/mod.rs:556-814
ZeroClaw supports multiple tool call formats:
- Native API: structured `tool_calls` array from providers like OpenAI/Anthropic
- XML Tags: `<tool_call>{json}</tool_call>` for prompt-guided models
- Markdown Blocks: fenced ```tool_call blocks containing JSON, for models using code blocks
- GLM Style: `tool_name/param>value` proprietary format
The parse_tool_calls() function tries each format in sequence, returning the first successful parse.
Sources: src/agent/loop_.rs:595-748
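One of the fallback parsers can be sketched as follows. A minimal illustration of the XML-tag format; `parse_xml_tool_call` is a hypothetical helper name, and the real parser validates the extracted JSON rather than returning the raw slice:

```rust
/// Extract the payload between <tool_call> and </tool_call>.
/// Returns None when the tags are absent, so the caller can fall
/// through to the next format in the sequence.
fn parse_xml_tool_call(text: &str) -> Option<&str> {
    let start = text.find("<tool_call>")? + "<tool_call>".len();
    let end = text[start..].find("</tool_call>")? + start;
    Some(text[start..end].trim())
}
```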
1. Lookup tool by name in tools_registry
2. Security check: SecurityPolicy::can_act()
3. Execute: tool.execute(arguments)
4. Credential scrubbing: scrub_credentials(output)
5. Build result: ChatMessage with role="tool"
Security Note: All tool outputs pass through scrub_credentials() which redacts API keys, tokens, and passwords to prevent accidental exfiltration to the LLM.
Sources: src/agent/loop_.rs:975-1041, src/agent/loop_.rs:42-77
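The scrubbing pass can be illustrated with a deliberately naive version. The real `scrub_credentials()` covers many more patterns (API keys, tokens, passwords); this sketch only redacts words with a couple of well-known prefixes to show the shape of the transformation:

```rust
/// Naive credential scrubbing: replace tokens with well-known
/// secret prefixes before the output is fed back to the LLM.
fn scrub_credentials(output: &str) -> String {
    output
        .split_whitespace()
        .map(|word| {
            if word.starts_with("sk-") || word.starts_with("ghp_") {
                "[REDACTED]"
            } else {
                word
            }
        })
        .collect::<Vec<_>>()
        .join(" ")
}
```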
Tool results are appended to conversation history in a structured format:
For native tool calling:

```json
{
  "role": "tool",
  "content": "<result>",
  "tool_call_id": "<id>"
}
```

For prompt-guided:

```xml
<tool_result>
<tool_name>shell</tool_name>
<result>output here</result>
</tool_result>
```
Sources: src/agent/loop_.rs:995-1041
Channels that implement supports_draft_updates() (e.g., Telegram) receive progressive response streaming:
1. Send initial draft: channel.send_draft("...")
2. Stream deltas via mpsc channel
3. Update draft periodically: channel.update_draft()
4. Finalize: channel.finalize_draft()
Sources: src/channels/mod.rs:630-686
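The accumulate-and-flush behavior of step 3 can be sketched as a small state holder. Illustrative types and names; the real streaming task also handles timing and the final flush:

```rust
/// Accumulate streamed deltas and signal when a draft update is due:
/// whenever at least 80 new characters have arrived since last flush.
struct DraftAccumulator {
    accumulated: String,
    last_flushed_len: usize,
}

impl DraftAccumulator {
    fn new() -> Self {
        Self { accumulated: String::new(), last_flushed_len: 0 }
    }

    /// Returns Some(full_text) when channel.update_draft() should run.
    fn push(&mut self, delta: &str) -> Option<&str> {
        self.accumulated.push_str(delta);
        if self.accumulated.len() - self.last_flushed_len >= 80 {
            self.last_flushed_len = self.accumulated.len();
            Some(&self.accumulated)
        } else {
            None
        }
    }
}
```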
```mermaid
graph LR
    subgraph "Agent Turn"
        LLM["provider.chat()"]
        DeltaTx["delta_tx: mpsc::Sender"]
    end
    subgraph "Streaming Task"
        DeltaRx["delta_rx: mpsc::Receiver"]
        Accumulator["accumulated: String"]
        UpdateDraft["channel.update_draft()"]
    end
    subgraph "Channel API"
        DraftMsg["Draft Message ID"]
        TelegramAPI["Telegram editMessageText"]
    end
    LLM -->|text chunk| DeltaTx
    DeltaTx -->|send| DeltaRx
    DeltaRx -->|accumulate| Accumulator
    Accumulator -->|every 80 chars| UpdateDraft
    UpdateDraft -->|API call| TelegramAPI
    TelegramAPI -->|update| DraftMsg
```
Sources: src/channels/mod.rs:663-686, src/agent/loop_.rs:19-20
While processing, a scoped typing task refreshes the typing indicator every 4 seconds:
spawn_scoped_typing_task():
- Interval: 4 seconds
- API: channel.start_typing(&recipient)
- Cancellation: CancellationToken
- Cleanup: channel.stop_typing() on drop
Sources: src/channels/mod.rs:526-554, src/channels/mod.rs:688-727
The response delivery path depends on whether streaming was used:
```rust
if let Some(draft_id) = draft_message_id {
    // Streaming was used - finalize the draft
    channel.finalize_draft(&reply_target, draft_id, &response).await?
} else {
    // Non-streaming - send as new message
    channel.send(&SendMessage::new(response, &reply_target)).await?
}
```

Sources: src/channels/mod.rs:750-767
Some channels receive additional system prompts for platform-specific formatting:
| Channel | Instructions |
|---|---|
| telegram | Media marker syntax: [IMAGE:<url>], [DOCUMENT:<path>], etc. |
| other | None (standard text only) |
The channel_delivery_instructions() function injects these prompts before the LLM turn.
Sources: src/channels/mod.rs:133-140, src/channels/mod.rs:626-628
Users can switch providers and models mid-conversation using slash commands:
/models → List available providers
/models <provider> → Switch to provider
/model → Show current model
/model <model-id> → Switch to model
Route selections are stored per-sender and cleared when switching providers:
Key: "{channel}_{sender}"
Value: ChannelRouteSelection { provider, model }
Switching clears conversation history to avoid cross-model confusion.
Sources: src/channels/mod.rs:146-184, src/channels/mod.rs:365-441
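The command grammar above can be sketched as a small parser. A simplified stand-in for `parse_runtime_command()`, covering only the provider/model commands listed here:

```rust
/// Runtime commands for provider/model switching.
#[derive(Debug, PartialEq)]
enum RuntimeCommand {
    ListProviders,
    SwitchProvider(String),
    ShowModel,
    SwitchModel(String),
}

/// Map a slash command to a RuntimeCommand; None means the text is a
/// regular message and goes through normal processing.
fn parse_runtime_command(text: &str) -> Option<RuntimeCommand> {
    let mut parts = text.trim().split_whitespace();
    match (parts.next()?, parts.next()) {
        ("/models", None) => Some(RuntimeCommand::ListProviders),
        ("/models", Some(p)) => Some(RuntimeCommand::SwitchProvider(p.to_string())),
        ("/model", None) => Some(RuntimeCommand::ShowModel),
        ("/model", Some(m)) => Some(RuntimeCommand::SwitchModel(m.to_string())),
        _ => None,
    }
}
```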
To avoid re-initializing providers on every message, the system maintains a provider cache:
```rust
type ProviderCacheMap = Arc<Mutex<HashMap<String, Arc<dyn Provider>>>>;
```

The get_or_create_provider() function returns cached providers when available, only initializing new ones on demand.
Sources: src/channels/mod.rs:73-74, src/channels/mod.rs:266-308
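The cache lookup can be sketched with the map's entry API. A simplified version: the real map stores `Arc<dyn Provider>` and initialization is fallible; a `String` stands in for the provider here:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Simplified provider cache keyed by provider name.
type ProviderCacheMap = Arc<Mutex<HashMap<String, Arc<String>>>>;

/// Return the cached provider if present, otherwise initialize and
/// cache a new one, so repeated messages reuse the same instance.
fn get_or_create_provider(cache: &ProviderCacheMap, name: &str) -> Arc<String> {
    let mut map = cache.lock().unwrap();
    map.entry(name.to_string())
        .or_insert_with(|| Arc::new(format!("provider:{name}")))
        .clone()
}
```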
Each message has a hard timeout of 300 seconds (5 minutes):
```rust
const CHANNEL_MESSAGE_TIMEOUT_SECS: u64 = 300;

let llm_result = tokio::time::timeout(
    Duration::from_secs(CHANNEL_MESSAGE_TIMEOUT_SECS),
    run_tool_call_loop(...)
).await;
```

On timeout, the user receives a timeout error message instead of a response.
Sources: src/channels/mod.rs:64-65, src/channels/mod.rs:698-716, src/channels/mod.rs:789-812
LLM errors are sanitized to prevent credential leakage:
```rust
match llm_result {
    Ok(Ok(response)) => { /* normal flow */ }
    Ok(Err(e)) => {
        let safe_err = providers::sanitize_api_error(&e.to_string());
        channel.send(&SendMessage::new(
            format!("⚠️ Error: {safe_err}"),
            &reply_target
        )).await?;
    }
    Err(_timeout) => { /* timeout handling */ }
}
```

Sources: src/channels/mod.rs:769-813
If a channel listener crashes, the supervisor automatically restarts it with exponential backoff:
Failure → Mark unhealthy → Wait backoff → Restart listener → Bump restart counter
This ensures channel availability even when individual listeners encounter transient errors.
Sources: src/channels/mod.rs:471-509
```mermaid
sequenceDiagram
    participant U as "User"
    participant CH as "Channel::listen()"
    participant DL as "Dispatch Loop"
    participant PCM as "process_channel_message()"
    participant MEM as "Memory::recall()"
    participant AG as "run_tool_call_loop()"
    participant PRV as "Provider::chat()"
    participant TL as "Tool::execute()"
    participant SEC as "SecurityPolicy"
    U->>CH: Send message
    CH->>CH: Validate allowlist
    CH->>DL: ChannelMessage
    DL->>PCM: Spawn worker task
    PCM->>PCM: parse_runtime_command()?
    alt Runtime command
        PCM->>CH: Command response
        PCM-->>DL: Done
    else Regular message
        PCM->>MEM: recall(content, 5)
        MEM-->>PCM: Relevant entries
        PCM->>PCM: build_memory_context()
        PCM->>PCM: auto_save to memory
        PCM->>AG: run_tool_call_loop()
        loop Tool Call Loop
            AG->>PRV: chat(history, tools)
            PRV-->>AG: Response + tool_calls
            alt Has tool_calls
                loop For each tool
                    AG->>SEC: can_act()?
                    SEC-->>AG: Approved/Denied
                    alt Approved
                        AG->>TL: execute(args)
                        TL-->>AG: Result
                        AG->>AG: scrub_credentials()
                        AG->>AG: Append to history
                    else Denied
                        AG->>AG: Append denial
                    end
                end
            else Text only
                AG->>AG: Break loop
            end
        end
        AG-->>PCM: Final response
        PCM->>PCM: Save to history cache
        PCM->>CH: send() or finalize_draft()
        CH->>U: Deliver response
    end
```
Sources: src/channels/mod.rs:556-814, src/agent/loop_.rs:851-1044
| Structure | Location | Purpose |
|---|---|---|
| `ChannelMessage` | src/channels/traits.rs:1-50 | Unified message format across all channels |
| `ChatMessage` | src/providers/traits.rs:1-100 | LLM message (system/user/assistant/tool) |
| `ChannelRuntimeContext` | src/channels/mod.rs:101-123 | Shared state for message processing |
| `ConversationHistoryMap` | src/channels/mod.rs:54 | Per-sender conversation cache |
| `ChannelRouteSelection` | src/channels/mod.rs:76-80 | Per-sender provider/model override |
| `ParsedToolCall` | src/agent/loop_.rs:810-814 | Extracted tool call from LLM response |
Sources: src/channels/mod.rs:53-123, src/agent/loop_.rs:810-814
| Metric | Value | Notes |
|---|---|---|
| Message timeout | 300s | Hard limit for LLM + tools |
| Max tool iterations | 10 | Prevents runaway loops |
| History compaction | 50 msgs | Triggers LLM-based summarization |
| Concurrent messages | 8-64 | Based on channel count |
| Draft update threshold | 80 chars | Minimum chunk size for streaming |
| Typing refresh interval | 4s | Keeps indicator active |
Sources: src/channels/mod.rs:56-69, src/agent/loop_.rs:22-24, src/agent/loop_.rs:82