
11 Scheduler

Nikolay Vyahhi edited this page Feb 19, 2026 · 2 revisions

Scheduler


The Scheduler is ZeroClaw's cron job execution engine that runs both shell commands and agent prompts on configurable schedules. It enforces security policies, handles retries with exponential backoff, and can deliver results to configured channels. For information about creating and managing cron jobs via tools, see Core Tools. For cron job configuration syntax, see Cron Job Configuration.

Sources: src/cron/scheduler.rs:1-748


Overview

The Scheduler runs as a continuous polling loop: on each tick it queries the database for due jobs, executes them concurrently (up to the max_concurrent limit), and persists the results. Before running a shell job, it consults the security subsystem to enforce autonomy levels, command allowlists, and path restrictions.

The scheduler runs as part of the daemon mode (zeroclaw daemon) and maintains health status that can be queried via the health check endpoints.

Sources: src/cron/scheduler.rs:21-45


Architecture

Scheduler Loop and Job Execution Flow

flowchart TD
    Start["scheduler::run()"] --> Poll["tokio::time::interval<br/>(scheduler_poll_secs)"]
    Poll --> Query["due_jobs(config, Utc::now())"]
    Query --> Check{Jobs<br/>found?}
    
    Check -->|No| Poll
    Check -->|Yes| Process["process_due_jobs()"]
    
    Process --> Stream["stream::buffer_unordered<br/>(max_concurrent)"]
    
    Stream --> JobExec["execute_and_persist_job()"]
    
    JobExec --> HealthCheck["warn_if_high_frequency_agent_job()"]
    HealthCheck --> Retry["execute_job_with_retry()"]
    
    Retry --> TypeCheck{job_type?}
    
    TypeCheck -->|Shell| ShellExec["run_job_command()"]
    TypeCheck -->|Agent| AgentExec["run_agent_job()"]
    
    ShellExec --> Persist["persist_job_result()"]
    AgentExec --> Persist
    
    Persist --> Deliver["deliver_if_configured()"]
    Deliver --> Record["record_run() + reschedule_after_run()"]
    
    Record --> Poll
    
    style Start fill:#e1f5ff
    style Process fill:#fff4e1
    style Retry fill:#ffe1f5
    style Persist fill:#e1ffe1

Sources: src/cron/scheduler.rs:21-117
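The stream::buffer_unordered(max_concurrent) stage keeps at most max_concurrent jobs in flight and starts the next due job as soon as any running one finishes. The sketch below reproduces that bound using only the standard library (a fixed pool of worker threads pulling from a shared queue) in place of Tokio streams. The name process_due_jobs_bounded and the run_job closure are hypothetical stand-ins for process_due_jobs() and execute_and_persist_job().

```rust
use std::collections::VecDeque;
use std::sync::Mutex;
use std::thread;

/// Runs every job with at most `max_concurrent` executing at once.
/// Results come back in completion order, as with `buffer_unordered`.
fn process_due_jobs_bounded<J, R, F>(jobs: Vec<J>, max_concurrent: usize, run_job: F) -> Vec<R>
where
    J: Send,
    R: Send,
    F: Fn(J) -> R + Sync,
{
    let queue = Mutex::new(VecDeque::from(jobs));
    let results = Mutex::new(Vec::new());
    let workers = max_concurrent.max(1); // treat 0 as 1 so the batch always drains

    thread::scope(|s| {
        for _ in 0..workers {
            s.spawn(|| loop {
                // Take the next due job; the queue lock is released before the job runs.
                let next = queue.lock().unwrap().pop_front();
                match next {
                    Some(job) => {
                        let r = run_job(job);
                        results.lock().unwrap().push(r);
                    }
                    None => break, // queue drained: this worker exits
                }
            });
        }
    }); // the scope joins every worker before returning

    results.into_inner().unwrap()
}
```

A fixed worker pool is used only because the standard library has no stream combinators. The in-flight bound is the same: a new job starts only when a worker becomes free.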


Job Types

The scheduler supports two distinct job types, each with different execution semantics:

| Job Type | Execution Method | Timeout | Session Context | Use Case |
|---|---|---|---|---|
| JobType::Shell | tokio::process::Command via sh -lc | 120 s (SHELL_JOB_TIMEOUT_SECS) | Process-isolated | System commands, scripts, file operations |
| JobType::Agent | crate::agent::run() with prompt | Provider-dependent | Main or Isolated session | LLM-driven tasks, summarization, monitoring |

Sources: src/cron/scheduler.rs:52-150

Shell Jobs

Shell jobs execute commands through sh -lc in the workspace directory with the following characteristics:

  • Security checks run before execution: can_act(), is_command_allowed(), is_path_allowed(), rate limiting
  • Path argument validation strips quotes and checks every path-like token against forbidden directories
  • Timeout enforcement via tokio::time::timeout() with kill-on-drop
  • Output capture: stdout and stderr are both captured, along with the exit status
sequenceDiagram
    participant Sched as Scheduler
    participant Sec as SecurityPolicy
    participant Proc as tokio::process::Command
    participant FS as Filesystem
    
    Sched->>Sec: can_act()?
    Sec-->>Sched: true/false
    
    Sched->>Sec: is_command_allowed(cmd)?
    Sec-->>Sched: true/false
    
    Sched->>Sec: forbidden_path_argument(cmd)?
    Sec-->>Sched: None/Some(path)
    
    Sched->>Sec: record_action()
    Sec-->>Sched: true/false (rate limit)
    
    Sched->>Proc: spawn("sh", "-lc", cmd)
    Proc->>FS: Execute in workspace_dir
    FS-->>Proc: stdout + stderr
    Proc-->>Sched: ExitStatus + output

Sources: src/cron/scheduler.rs:377-467
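The shell-job mechanics listed above (run through sh -lc in the workspace, stdin set to null, stdout and stderr captured, hard timeout) can be sketched with std::process alone. This is not the actual run_job_command(), which uses tokio::process::Command with kill-on-drop and tokio::time::timeout(). The blocking poll loop, the output layout, and the name run_shell_job are assumptions, and the security checks are omitted.

```rust
use std::io::Read;
use std::path::Path;
use std::process::{Command, Stdio};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

/// Reads a pipe to the end on a background thread so the child never blocks on a full pipe.
fn drain<R: Read + Send + 'static>(mut pipe: R) -> JoinHandle<String> {
    thread::spawn(move || {
        let mut buf = String::new();
        let _ = pipe.read_to_string(&mut buf);
        buf
    })
}

/// Runs `cmd` through `sh -lc` inside `workspace_dir` and returns (success, output).
fn run_shell_job(cmd: &str, workspace_dir: &Path, timeout: Duration) -> (bool, String) {
    let mut child = match Command::new("sh")
        .args(["-lc", cmd])
        .current_dir(workspace_dir)
        .stdin(Stdio::null())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
    {
        Ok(child) => child,
        Err(e) => return (false, format!("spawn failed: {e}")),
    };
    let stdout = drain(child.stdout.take().expect("stdout is piped"));
    let stderr = drain(child.stderr.take().expect("stderr is piped"));

    // Poll until the process exits or the deadline passes.
    let deadline = Instant::now() + timeout;
    let status = loop {
        match child.try_wait() {
            Ok(Some(status)) => break Some(status),
            Ok(None) if Instant::now() < deadline => thread::sleep(Duration::from_millis(10)),
            // Timed out (or waiting failed): kill `sh` and reap it. Grandchildren that
            // still hold the pipes open will delay the joins below until they exit.
            _ => {
                let _ = child.kill();
                let _ = child.wait();
                break None;
            }
        }
    };
    let out = stdout.join().unwrap_or_default();
    let err = stderr.join().unwrap_or_default();
    match status {
        Some(s) => (s.success(), format!("{s}\nstdout:\n{out}\nstderr:\n{err}")),
        None => (false, format!("job timed out after {} ms", timeout.as_millis())),
    }
}
```

Draining the pipes on separate threads matters: if the parent only polled try_wait() while a command wrote more than the pipe buffer holds, the command would stall until the timeout fired.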

Agent Jobs

Agent jobs execute prompts through the full agent turn cycle, inheriting all tool capabilities:

  • Prompt prefixing: [cron:{job_id} {job_name}] {prompt} for observability
  • Model override: Optional per-job model selection via job.model
  • Session targeting: SessionTarget::Main (shared history) or SessionTarget::Isolated (ephemeral)
  • Full tool access: Agent jobs can invoke any registered tool, subject to security policies

Sources: src/cron/scheduler.rs:119-150
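The prompt prefix, model override, and session target can be modeled as plain data. The AgentJobSpec struct and its helper methods are hypothetical; only the [cron:{job_id} {job_name}] {prompt} format, the optional per-job model, and the Main/Isolated targets come from the description above.

```rust
/// Which conversation history an agent job runs against.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SessionTarget {
    Main,     // shares history with the main session
    Isolated, // fresh, ephemeral history for this run only
}

/// Hypothetical view of the agent-specific fields of a cron job.
struct AgentJobSpec {
    id: String,
    name: String,
    prompt: String,
    model: Option<String>,
    session: SessionTarget,
}

impl AgentJobSpec {
    /// Prefixes the prompt so cron-originated turns are recognizable in logs and history.
    fn tagged_prompt(&self) -> String {
        format!("[cron:{} {}] {}", self.id, self.name, self.prompt)
    }

    /// Uses the per-job model if one is set, otherwise the configured default.
    fn effective_model<'a>(&'a self, default_model: &'a str) -> &'a str {
        self.model.as_deref().unwrap_or(default_model)
    }

    /// True when the run should append to the shared main-session history.
    fn uses_shared_history(&self) -> bool {
        self.session == SessionTarget::Main
    }
}
```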


Security Enforcement

The scheduler enforces the same security policies as interactive tool execution, with additional checks for cron-specific risks:

Pre-Execution Checks

flowchart LR
    Start["run_job_command()"] --> ReadOnly{autonomy ==<br/>ReadOnly?}
    ReadOnly -->|Yes| BlockRO["blocked: autonomy is read-only"]
    ReadOnly -->|No| RateLimit{is_rate_limited()?}
    
    RateLimit -->|Yes| BlockRL["blocked: rate limit exceeded"]
    RateLimit -->|No| CmdAllowed{is_command_allowed()?}
    
    CmdAllowed -->|No| BlockCmd["blocked: command not allowed"]
    CmdAllowed -->|Yes| PathCheck{forbidden_path_argument()?}
    
    PathCheck -->|Some| BlockPath["blocked: forbidden path argument"]
    PathCheck -->|None| RecordAction{record_action()?}
    
    RecordAction -->|false| BlockBudget["blocked: action budget exhausted"]
    RecordAction -->|true| Execute["spawn Command"]
    
    style BlockRO fill:#ffcccc
    style BlockRL fill:#ffcccc
    style BlockCmd fill:#ffcccc
    style BlockPath fill:#ffcccc
    style BlockBudget fill:#ffcccc
    style Execute fill:#ccffcc

Sources: src/cron/scheduler.rs:391-433
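The pre-execution gates run in a fixed order and stop at the first failure. The sketch below mirrors the flowchart with a self-contained Policy stand-in. Its field names, the first-word allowlist match, the naive prefix-based path check, the single counter behind both budget checks, and the exact rejection strings (beyond their blocked by security policy prefix) are assumptions, not SecurityPolicy's real API.

```rust
/// Hypothetical stand-in for the security policy consulted before a shell job runs.
struct Policy {
    read_only: bool,
    allowed_commands: Vec<String>,
    forbidden_prefixes: Vec<String>,
    max_actions_per_hour: u32,
    actions_this_hour: u32,
}

impl Policy {
    fn is_rate_limited(&self) -> bool {
        self.actions_this_hour >= self.max_actions_per_hour
    }

    /// Assumption: only the first word of the command is matched against the allowlist.
    fn is_command_allowed(&self, cmd: &str) -> bool {
        cmd.split_whitespace()
            .next()
            .map_or(false, |first| self.allowed_commands.iter().any(|c| c == first))
    }

    /// Naive path check: the first whitespace token under a forbidden prefix.
    fn forbidden_path_argument(&self, cmd: &str) -> Option<String> {
        cmd.split_whitespace()
            .find(|tok| self.forbidden_prefixes.iter().any(|p| tok.starts_with(p.as_str())))
            .map(str::to_string)
    }

    /// Consumes one unit of the action budget; returns false once it is spent.
    fn record_action(&mut self) -> bool {
        if self.actions_this_hour >= self.max_actions_per_hour {
            return false;
        }
        self.actions_this_hour += 1;
        true
    }
}

/// Applies the gates in flowchart order; Ok(()) means the command may be spawned.
fn check_shell_job(policy: &mut Policy, cmd: &str) -> Result<(), String> {
    let prefix = "blocked by security policy";
    if policy.read_only {
        return Err(format!("{prefix}: autonomy is read-only"));
    }
    if policy.is_rate_limited() {
        return Err(format!("{prefix}: rate limit exceeded"));
    }
    if !policy.is_command_allowed(cmd) {
        return Err(format!("{prefix}: command not allowed"));
    }
    if let Some(path) = policy.forbidden_path_argument(cmd) {
        return Err(format!("{prefix}: forbidden path argument: {path}"));
    }
    if !policy.record_action() {
        return Err(format!("{prefix}: action budget exhausted"));
    }
    Ok(())
}
```

Only a command that passes every gate consumes budget, so rejected commands do not count against max_actions_per_hour in this sketch.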

Path Validation Algorithm

The forbidden_path_argument() function extracts every path-like token from the command string and validates each one:

  1. Normalize separators: Replace &&, ||, ;, |, \n with null bytes for tokenization
  2. Skip env assignments: Detect KEY=value patterns via is_env_assignment()
  3. Strip quotes: Remove wrapping " and ' characters via strip_wrapping_quotes()
  4. Filter non-paths: Skip flags (-), URLs (://), empty tokens
  5. Path detection heuristics: Check for /, ./, ../, ~/ prefixes or embedded /
  6. Security validation: Call security.is_path_allowed(candidate) for each path-like token

This blocks commands whose arguments reference forbidden locations, such as cat /etc/passwd or rm -rf /sys, even when the command itself is on the allowlist.

Sources: src/cron/scheduler.rs:319-375
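The six steps can be sketched as a standalone function. The helper names is_env_assignment() and strip_wrapping_quotes() come from the description; their bodies are assumptions, as are the forbidden-directory list (an illustrative subset, not the actual 14 entries) and the prefix rule used in place of security.is_path_allowed().

```rust
/// Illustrative subset of forbidden roots (assumption; not the real list).
const FORBIDDEN_ROOTS: &[&str] = &["/etc", "/sys", "/proc", "/root", "/boot"];

/// True for `KEY=value` where KEY is a shell identifier.
fn is_env_assignment(token: &str) -> bool {
    match token.split_once('=') {
        Some((key, _)) => {
            let mut chars = key.chars();
            matches!(chars.next(), Some(c) if c.is_ascii_alphabetic() || c == '_')
                && chars.all(|c| c.is_ascii_alphanumeric() || c == '_')
        }
        None => false,
    }
}

/// Removes one pair of matching `"` or `'` around the token.
fn strip_wrapping_quotes(token: &str) -> &str {
    for q in ['"', '\''] {
        if token.len() >= 2 && token.starts_with(q) && token.ends_with(q) {
            return &token[1..token.len() - 1];
        }
    }
    token
}

/// Stand-in for `security.is_path_allowed()`: rejects a forbidden root and anything below it.
fn is_path_allowed(path: &str) -> bool {
    !FORBIDDEN_ROOTS
        .iter()
        .any(|root| path == *root || path.starts_with(&format!("{root}/")))
}

/// Returns the first path-like argument that the policy rejects, if any.
fn forbidden_path_argument(command: &str) -> Option<String> {
    // 1. Turn command separators into NUL so each segment is tokenized on its own.
    let mut normalized = command.to_string();
    for sep in ["&&", "||", ";", "|", "\n"] {
        normalized = normalized.replace(sep, "\0");
    }
    for segment in normalized.split('\0') {
        for raw in segment.split_whitespace() {
            // 2. Skip `KEY=value` environment assignments.
            if is_env_assignment(raw) {
                continue;
            }
            // 3. Strip wrapping quotes.
            let token = strip_wrapping_quotes(raw);
            // 4. Skip empty tokens, flags, and URLs.
            if token.is_empty() || token.starts_with('-') || token.contains("://") {
                continue;
            }
            // 5. Any `/` covers the `/`, `./`, `../`, `~/` prefixes and embedded separators.
            let looks_like_path = token.contains('/');
            // 6. Ask the policy about each path-like token.
            if looks_like_path && !is_path_allowed(token) {
                return Some(token.to_string());
            }
        }
    }
    None
}
```

Splitting on separators before tokenizing is what catches a forbidden path placed in a second command after && or ;.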


Retry Mechanism

The scheduler implements exponential backoff with jitter for transient failures:

flowchart TD
    Start["execute_job_with_retry()"] --> Loop{attempt ≤<br/>retries?}
    
    Loop -->|No| ReturnFail["return (false, last_output)"]
    Loop -->|Yes| Execute{job_type?}
    
    Execute -->|Shell| RunCmd["run_job_command()"]
    Execute -->|Agent| RunAgent["run_agent_job()"]
    
    RunCmd --> CheckSuccess{success?}
    RunAgent --> CheckSuccess
    
    CheckSuccess -->|Yes| ReturnOk["return (true, output)"]
    CheckSuccess -->|No| PolicyCheck{output starts with<br/>'blocked by security policy'?}
    
    PolicyCheck -->|Yes| ReturnFail2["return (false, output)<br/>(non-retryable)"]
    PolicyCheck -->|No| Backoff["sleep(backoff_ms + jitter_ms)"]
    
    Backoff --> IncrementBackoff["backoff_ms = min(backoff_ms * 2, 30000)"]
    IncrementBackoff --> Loop
    
    style ReturnOk fill:#ccffcc
    style ReturnFail fill:#ffcccc
    style ReturnFail2 fill:#ffcccc

Configuration:

  • Initial backoff: config.reliability.provider_backoff_ms (default 200ms)
  • Max backoff: 30 seconds
  • Jitter: 0-250ms derived from Utc::now().timestamp_subsec_millis()
  • Retry count: config.reliability.scheduler_retries (default 3)

Non-retryable errors:

  • Security policy violations (deterministic rejections)
  • Rate limit exhaustion

Sources: src/cron/scheduler.rs:52-85
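The retry loop can be sketched as a generic function over the job body, with sleeping injected as a closure so the backoff schedule can be observed. The modulo-250 jitter formula, reading scheduler_retries as retries after the first attempt, and skipping the sleep after the final attempt are assumptions of this sketch.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

const MAX_BACKOFF_MS: u64 = 30_000;
const POLICY_BLOCK_PREFIX: &str = "blocked by security policy";

/// Jitter in 0..250 ms taken from the sub-second part of the wall clock.
fn jitter_ms() -> u64 {
    let subsec = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.subsec_millis())
        .unwrap_or(0);
    u64::from(subsec % 250)
}

/// Runs `attempt_job` up to `retries + 1` times with capped exponential backoff plus jitter.
/// Security-policy rejections return immediately: retrying cannot change them.
fn execute_with_retry<F, S>(
    retries: u32,
    initial_backoff_ms: u64,
    mut attempt_job: F,
    mut sleep: S,
) -> (bool, String)
where
    F: FnMut() -> (bool, String),
    S: FnMut(Duration),
{
    let mut backoff_ms = initial_backoff_ms;
    let mut last_output = String::new();
    for attempt in 0..=retries {
        let (success, output) = attempt_job();
        if success {
            return (true, output);
        }
        if output.starts_with(POLICY_BLOCK_PREFIX) {
            return (false, output); // deterministic rejection: non-retryable
        }
        last_output = output;
        if attempt < retries {
            sleep(Duration::from_millis(backoff_ms + jitter_ms()));
            backoff_ms = backoff_ms.saturating_mul(2).min(MAX_BACKOFF_MS);
        }
    }
    (false, last_output)
}
```

Under these assumptions, with the defaults (200 ms initial backoff, 3 retries) a persistently failing job runs four times, waiting roughly 200, 400, and 800 ms plus jitter between attempts.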


Result Delivery

The scheduler can announce job results to configured channels via the DeliveryConfig mechanism:

Delivery Configuration

pub struct DeliveryConfig {
    mode: String,        // "announce" or ""
    channel: Option<String>,  // "telegram" | "discord" | "slack" | "mattermost"
    to: Option<String>,       // Channel/chat ID or recipient
    best_effort: bool,        // If true, delivery failures don't fail the job
}

Supported Channels

| Channel | Instantiation | Recipient Format |
|---|---|---|
| telegram | TelegramChannel::new() | Chat ID (e.g., "-1001234567890") |
| discord | DiscordChannel::new() | Channel ID (e.g., "987654321098765432") |
| slack | SlackChannel::new() | Channel ID (e.g., "C01234567") |
| mattermost | MattermostChannel::new() | Channel ID (e.g., "abc123xyz") |

Delivery Flow:

sequenceDiagram
    participant Scheduler
    participant DeliveryConfig
    participant Channel
    participant Platform
    
    Scheduler->>DeliveryConfig: Check mode == "announce"
    DeliveryConfig-->>Scheduler: true
    
    Scheduler->>DeliveryConfig: Extract channel + to
    
    Scheduler->>Channel: Instantiate (bot_token, allowed_users)
    Scheduler->>Channel: send(SendMessage::new(output, target))
    
    Channel->>Platform: HTTP POST (platform-specific)
    Platform-->>Channel: 200 OK / Error
    
    alt Delivery Failed
        Channel-->>Scheduler: Err(e)
        Scheduler->>Scheduler: Check best_effort
        alt best_effort == true
            Scheduler->>Scheduler: Log warning, continue
        else
            Scheduler->>Scheduler: Mark job as failed
        end
    end

Sources: src/cron/scheduler.rs:240-317
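The best_effort flag decides whether a delivery error changes the job's outcome. In this sketch, Notifier is a hypothetical trait standing in for the channel types and their send(SendMessage::new(...)) call, and channels are looked up by name in a map rather than instantiated from config. Only the mode, channel, to, and best_effort semantics come from the description above; treating a missing channel or to as a delivery error is an assumption.

```rust
use std::collections::HashMap;

/// Mirrors the documented DeliveryConfig fields.
struct DeliveryConfig {
    mode: String,
    channel: Option<String>,
    to: Option<String>,
    best_effort: bool,
}

/// Hypothetical stand-in for a chat channel (Telegram, Discord, Slack, Mattermost).
trait Notifier {
    fn send(&self, text: &str, recipient: &str) -> Result<(), String>;
}

/// Delivers `output` when `mode == "announce"`.
/// Returns Err only when delivery failed and `best_effort` is false.
fn deliver_if_configured(
    cfg: &DeliveryConfig,
    output: &str,
    channels: &HashMap<String, Box<dyn Notifier>>,
) -> Result<(), String> {
    if cfg.mode != "announce" {
        return Ok(()); // delivery disabled
    }
    let result = match (cfg.channel.as_deref(), cfg.to.as_deref()) {
        (Some(name), Some(to)) => match channels.get(name) {
            Some(channel) => channel.send(output, to),
            None => Err(format!("unsupported delivery channel: {name}")),
        },
        _ => Err("delivery.channel and delivery.to are required for announce".to_string()),
    };
    match result {
        // best_effort: log and keep the job's own result.
        Err(e) if cfg.best_effort => {
            eprintln!("warning: cron delivery failed (best effort): {e}");
            Ok(())
        }
        other => other,
    }
}
```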


Job Lifecycle and Persistence

One-Shot Jobs

Jobs with schedule = At { at: DateTime } and delete_after_run = true exhibit special lifecycle behavior:

  • On success: Job is deleted via remove_job(config, &job.id)
  • On failure: Job is disabled via update_job() with enabled = false, preserving failure history

Sources: src/cron/scheduler.rs:181-207

Recurring Jobs

Recurring jobs (Every { every_ms } or Cron { expr }) follow this persistence flow:

  1. Record execution: record_run(config, &job.id, started_at, finished_at, status, output, duration_ms)
  2. Update state: reschedule_after_run(config, job, success, output) calculates next run time
  3. Prune history: Old run records are kept according to retention policy (implementation-dependent)

Sources: src/cron/scheduler.rs:152-207
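The post-run branching for one-shot and recurring jobs reduces to a pure decision function. The Schedule variants follow the names used above, with plain integer timestamps instead of DateTime. PostRunAction is a hypothetical enum whose variants correspond to remove_job(), update_job() with enabled = false, and reschedule_after_run(). The text does not say what happens to an At job without delete_after_run, so routing it to rescheduling here is an assumption.

```rust
/// Schedule kinds named in the text; timestamps simplified to integers.
#[allow(dead_code)]
enum Schedule {
    At { at_ms: i64 },
    Every { every_ms: u64 },
    Cron { expr: String },
}

#[derive(Debug, PartialEq, Eq)]
enum PostRunAction {
    Delete,     // remove_job(config, &job.id)
    Disable,    // update_job(...) with enabled = false, keeping run history
    Reschedule, // reschedule_after_run(config, job, success, output)
}

fn post_run_action(schedule: &Schedule, delete_after_run: bool, success: bool) -> PostRunAction {
    match schedule {
        // One-shot job: delete on success, disable on failure.
        Schedule::At { .. } if delete_after_run => {
            if success { PostRunAction::Delete } else { PostRunAction::Disable }
        }
        // Recurring jobs (and, by assumption, At jobs without delete_after_run).
        _ => PostRunAction::Reschedule,
    }
}
```

Disabling rather than deleting a failed one-shot job is what keeps its failure history available for inspection.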

High-Frequency Agent Job Warning

The scheduler emits a warning for agent jobs scheduled more often than every 5 minutes, flagging schedules that could run up excessive LLM API costs (the job still runs):

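fn warn_if_high_frequency_agent_job(job: &CronJob) {
    if !matches!(job.job_type, JobType::Agent) {
        return;
    }

    let too_frequent = match &job.schedule {
        Schedule::Every { every_ms } => *every_ms < 5 * 60 * 1000,
        Schedule::Cron { .. } => {
            // Compare two consecutive fire times. The signature
            // next_run_for_schedule(&Schedule, DateTime<Utc>) -> Result<DateTime<Utc>>
            // is assumed here for illustration.
            let now = chrono::Utc::now();
            match next_run_for_schedule(&job.schedule, now) {
                Ok(first) => next_run_for_schedule(&job.schedule, first)
                    .map(|second| second - first < chrono::Duration::minutes(5))
                    .unwrap_or(false),
                Err(_) => false, // unparsable expressions are not flagged here
            }
        }
        _ => false, // one-shot At jobs run once, so frequency does not apply
    };

    if too_frequent {
        tracing::warn!("Cron agent job '{}' scheduled < 5min frequency", job.id);
    }
}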

Sources: src/cron/scheduler.rs:213-238


Configuration Reference

Scheduler Settings

[scheduler]
max_concurrent = 5  # Concurrent job execution limit

[reliability]
scheduler_poll_secs = 10     # Poll interval (minimum 5s enforced)
scheduler_retries = 3        # Retry attempts for failed jobs
provider_backoff_ms = 200    # Initial backoff for exponential retry

Constants

| Constant | Value | Purpose |
|---|---|---|
| MIN_POLL_SECONDS | 5 | Minimum safe polling interval (seconds) |
| SHELL_JOB_TIMEOUT_SECS | 120 | Shell command execution timeout (seconds) |

Sources: src/cron/scheduler.rs:18-22
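The poll-interval floor can be shown with a small settings struct. Flattening both TOML tables into one SchedulerSettings struct is an assumption for illustration, and no TOML parsing is shown; the 5-second minimum and the sample values used below come from the reference above.

```rust
use std::time::Duration;

const MIN_POLL_SECONDS: u64 = 5;

/// Hypothetical flattened view of the [scheduler] and [reliability] keys.
#[allow(dead_code)]
#[derive(Debug, Clone)]
struct SchedulerSettings {
    max_concurrent: usize,
    scheduler_poll_secs: u64,
    scheduler_retries: u32,
    provider_backoff_ms: u64,
}

impl SchedulerSettings {
    /// Poll interval actually used: configured values below MIN_POLL_SECONDS are raised to it.
    fn effective_poll_interval(&self) -> Duration {
        Duration::from_secs(self.scheduler_poll_secs.max(MIN_POLL_SECONDS))
    }
}
```

With the sample configuration (scheduler_poll_secs = 10) the interval stays at 10 s; a configured value of 1 would be raised to 5 s.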


Database Integration

The scheduler relies on functions from src/cron/mod.rs for persistence:

flowchart LR
    Scheduler["Scheduler"] --> DueJobs["due_jobs(config, now)"]
    Scheduler --> RecordRun["record_run(config, id, ...)"]
    Scheduler --> RecordLast["record_last_run(config, id, ...)"]
    Scheduler --> Reschedule["reschedule_after_run(config, job, ...)"]
    Scheduler --> RemoveJob["remove_job(config, id)"]
    Scheduler --> UpdateJob["update_job(config, id, patch)"]
    
    DueJobs --> DB[(cron_jobs table)]
    RecordRun --> DB2[(cron_runs table)]
    RecordLast --> DB
    Reschedule --> DB
    RemoveJob --> DB
    UpdateJob --> DB

Key queries:

  • due_jobs() returns Vec<CronJob> where next_run <= now AND enabled = true
  • record_run() inserts execution history into cron_runs table
  • reschedule_after_run() updates next_run timestamp based on schedule type

Sources: src/cron/scheduler.rs:5-8, src/cron/scheduler.rs:34-45
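The due_jobs() selection rule (next_run <= now AND enabled = true) can be mirrored in memory to make it concrete. The JobRow struct, the integer timestamps, and the earliest-first ordering are assumptions; the real function queries the cron_jobs table.

```rust
/// Hypothetical in-memory row of the cron_jobs table.
#[derive(Debug, Clone)]
struct JobRow {
    id: String,
    enabled: bool,
    next_run_ms: i64, // epoch milliseconds
}

/// Returns enabled jobs whose next run is at or before `now_ms`, earliest first.
fn due_jobs(rows: &[JobRow], now_ms: i64) -> Vec<JobRow> {
    let mut due: Vec<JobRow> = rows
        .iter()
        .filter(|r| r.enabled && r.next_run_ms <= now_ms)
        .cloned()
        .collect();
    due.sort_by_key(|r| r.next_run_ms);
    due
}
```

Because reschedule_after_run() moves next_run forward after each run, a job drops out of this set until it is due again, and a disabled job never re-enters it.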


Health Monitoring

The scheduler integrates with the health check subsystem:

// On successful initialization
crate::health::mark_component_ok("scheduler");

// On query failures
crate::health::mark_component_error("scheduler", error_message);

// On job execution failures
crate::health::mark_component_error("scheduler", format!("job {job_id} failed"));

Health status can be queried via the gateway's /health endpoint or through the health check system.

Sources: src/cron/scheduler.rs:29-108


Error Handling

The scheduler distinguishes between recoverable and non-recoverable errors:

Recoverable Errors (Retryable)

  • Network timeouts during agent jobs
  • Transient LLM provider failures
  • Temporary filesystem issues

Non-Recoverable Errors (Non-Retryable)

  • Security policy violations (output beginning with blocked by security policy)
  • Missing required fields in job configuration
  • Invalid schedule expressions (caught during add_job(), not during execution)

Error Propagation

  • Query failures log warnings and continue polling
  • Job execution failures are persisted but don't crash the scheduler loop
  • Delivery failures respect best_effort flag

Sources: src/cron/scheduler.rs:52-85, src/cron/scheduler.rs:162-169


Testing

The scheduler is covered by unit and integration tests:

Unit Tests

Integration Tests

The scheduler is tested end-to-end in daemon mode tests that verify:

  • Concurrent job execution respects max_concurrent
  • Jobs execute at scheduled times (within polling window)
  • Security policies are enforced across job types

Sources: src/cron/scheduler.rs:469-747


Security Considerations

Command Injection Prevention

The scheduler implements multiple layers of defense:

  1. Command allowlist: Only commands in autonomy.allowed_commands can execute
  2. Path validation: forbidden_path_argument() blocks access to 14 system directories
  3. Argument sanitization: Strips quotes and validates each token independently
  4. Shell isolation: Executes via sh -lc with stdin=null and kill_on_drop=true

Rate Limiting

The security policy's record_action() enforces max_actions_per_hour across:

  • Interactive tool executions
  • Scheduled shell jobs
  • Scheduled agent jobs (with tool usage)

This prevents runaway scheduled jobs from exhausting quotas.

Sources: src/cron/scheduler.rs:391-433
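A sliding-window counter is one way to implement the hourly budget behind record_action(). The window algorithm and the ActionTracker name are assumptions. The point illustrated is that interactive tools and scheduled jobs draw from one budget, here by sharing a single tracker behind Arc<Mutex<ActionTracker>>.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Hypothetical sliding-window budget: at most `max_actions` within any one-hour window.
struct ActionTracker {
    max_actions: usize,
    window: Duration,
    timestamps: VecDeque<Instant>,
}

impl ActionTracker {
    fn new(max_actions_per_hour: usize) -> Self {
        Self {
            max_actions: max_actions_per_hour,
            window: Duration::from_secs(3600),
            timestamps: VecDeque::new(),
        }
    }

    /// Records an action at `now`; returns false (recording nothing) if the budget is spent.
    fn record_action_at(&mut self, now: Instant) -> bool {
        // Forget actions that have aged out of the window.
        while let Some(&oldest) = self.timestamps.front() {
            if now.duration_since(oldest) >= self.window {
                self.timestamps.pop_front();
            } else {
                break;
            }
        }
        if self.timestamps.len() >= self.max_actions {
            return false;
        }
        self.timestamps.push_back(now);
        true
    }
}
```

Because the tracker is shared, a burst of interactive tool calls can leave a scheduled job blocked with action budget exhausted, and vice versa.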

Workspace Scoping

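All shell jobs execute with current_dir(&config.workspace_dir), so relative paths resolve inside the configured workspace. Setting the working directory alone does not stop a command from naming an absolute path or using ../ to leave the workspace; path validation covers that case, and the two together form a defense-in-depth boundary.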

Sources: src/cron/scheduler.rs:438

