diff --git a/docs/discord-setup.md b/docs/discord-setup.md new file mode 100644 index 0000000..c4a287c --- /dev/null +++ b/docs/discord-setup.md @@ -0,0 +1,261 @@ +# SPECTRE Agent System — Complete Setup Guide + +This guide covers setting up the autonomous Python agent system with Discord bot integration on the Spectre (Franklin) cluster. + +## Prerequisites + +- Python 3.11+ on the cluster login/utility node +- `uv` package manager installed +- Access to SLURM commands (`sbatch`, `sacct`, `squeue`) +- BeeGFS mounted at `/mnt/beegfs/` +- An Anthropic API key with access to Claude Opus 4.6, Sonnet 4.6, and Haiku 4.5 +- A Discord account with permission to create bots + +--- + +## Step 1: Create a Discord Bot + +1. Go to https://discord.com/developers/applications +2. Click **New Application** and name it `SPECTRE Bot` +3. Go to the **Bot** tab: + - Click **Add Bot** (if not already created) + - Copy the **Token** — save it securely, you'll need it later + - Enable **Message Content Intent** (required for reading messages) + - Enable **Server Members Intent** +4. Go to the **OAuth2 → URL Generator** tab: + - **Scopes**: select `bot` and `applications.commands` + - **Bot Permissions**: select: + - Send Messages + - Embed Links + - Attach Files + - Use Slash Commands + - Read Message History + - Create Public Threads + - Copy the generated URL +5. 
Open the URL in your browser and add the bot to your Discord server + +## Step 2: Set Up Discord Server Channels + +Create these channels in your Discord server: + +| Channel | Purpose | +|---------|---------| +| `#simulation-status` | Automated status updates, milestones | +| `#decisions` | Interactive decision requests with buttons | +| `#alerts` | Failure alerts and critical warnings | +| `#plots` | Surface field PNGs, convergence plots | +| `#logs` | Verbose agent activity (optional) | +| `#ask-mitgcm` | Knowledge Q&A — ask about MITgcm, ERA5, oceanography, or the codebase | + +**Get your Guild (Server) ID:** +- Enable Developer Mode in Discord (Settings → Advanced → Developer Mode) +- Right-click your server name → Copy Server ID + +## Step 3: Configure Secrets + +Create the secrets file on the cluster: + +```bash +sudo mkdir -p /etc/spectre-agents +sudo tee /etc/spectre-agents/env << 'EOF' +ANTHROPIC_API_KEY=sk-ant-your-key-here +DISCORD_BOT_TOKEN=your-bot-token-here +DISCORD_GUILD_ID=your-guild-id-here +EOF +sudo chmod 600 /etc/spectre-agents/env +sudo chown joe:joe /etc/spectre-agents/env +``` + +## Step 4: Install the Agent System + +```bash +cd /mnt/beegfs/spectre-150-ensembles + +# Create virtual environment +uv venv .venv + +# Install dependencies (includes spectre_agents package) +uv sync + +# Verify the package loads +.venv/bin/python -c "from spectre_agents.config import load_config; print('OK')" +``` + +## Step 5: Test the Bot Locally + +Before installing as a service, test interactively: + +```bash +cd /mnt/beegfs/spectre-150-ensembles + +# Source the secrets +source /etc/spectre-agents/env +export ANTHROPIC_API_KEY DISCORD_BOT_TOKEN DISCORD_GUILD_ID + +# Run the agent system +.venv/bin/python -m spectre_agents --config spectre_agents_config.yaml +``` + +You should see: +``` +SPECTRE Agent System starting... +Bot connected as SPECTRE Bot#1234 (ID: ...) +Synced commands to guild ... 
+``` + +In Discord, the bot should post "SPECTRE Agent System online" in `#simulation-status`. + +Test slash commands: +- `/run status` — should show current (idle) status +- `/validate` — should run namelist validation +- `/dashboard status` — should check dashboard health + +Press `Ctrl+C` to stop. + +## Step 6: Install as a Systemd Service + +```bash +# Copy the service file +sudo cp systemd/spectre-agents.service /etc/systemd/system/ + +# Reload systemd +sudo systemctl daemon-reload + +# Enable and start the service +sudo systemctl enable spectre-agents +sudo systemctl start spectre-agents + +# Check status +sudo systemctl status spectre-agents + +# View logs +journalctl -u spectre-agents -f +``` + +## Step 7: Verify Everything Works + +1. In Discord, run `/run status` — bot should respond with a status embed +2. Run `/validate` — should trigger namelist validation and return results +3. Run `/dashboard status` — should report dashboard component health +4. Run `/run start` — should validate, submit a SLURM job, and start monitoring + +--- + +## Architecture Overview + +``` +┌─────────────────────────────────────────────┐ +│ Spectre Cluster Node │ +│ │ +│ systemd: spectre-agents.service │ +│ ┌───────────────────────────────────────┐ │ +│ │ python -m spectre_agents │ │ +│ │ │ │ +│ │ Discord Bot (asyncio event loop) │ │ +│ │ ├── Slash commands → Agent runner │ │ +│ │ ├── Decision queue ← Orchestrator │ │ +│ │ └── Status embeds → Discord │ │ +│ │ │ │ +│ │ Agent Runner (ThreadPoolExecutor) │ │ +│ │ ├── Orchestrator (Opus) │ │ +│ │ │ delegates to: │ │ +│ │ ├── WorkflowRunner (Haiku) │ │ +│ │ ├── StdoutDiagnostics (Sonnet) │ │ +│ │ ├── ModelOutputReview (Sonnet) │ │ +│ │ ├── NamelistValidator (Sonnet) │ │ +│ │ ├── ForcingDataQC (Sonnet) │ │ +│ │ ├── DashboardManager (Haiku) │ │ +│ │ ├── DiscordNotifier (Haiku) │ │ +│ │ └── WebResearch (Sonnet) │ │ +│ └───────────────────────────────────────┘ │ +│ │ +│ SLURM ←→ sbatch/sacct/squeue │ +│ BeeGFS ←→ /mnt/beegfs/spectre-* 
│ +│ Tailscale ←→ Dashboard proxy │ +└─────────────────────────────────────────────┘ +``` + +## Discord Commands Reference + +### Slash commands (simulation ops) + +| Command | Description | +|---------|-------------| +| `/run start` | Validate config, submit simulation, start monitoring | +| `/run status` | Show job state, model days, CFL, throughput | +| `/run stop` | Cancel SLURM job, stop monitoring | +| `/run resubmit` | Clear run dir, resubmit from pickup | +| `/diagnose [job_id]` | Run STDOUT failure diagnostics | +| `/review` | Model output physical plausibility check | +| `/validate` | Pre-flight namelist validation | +| `/qc forcing` | EXF forcing data QC | +| `/qc obc` | OBC boundary data QC | +| `/dashboard start` | Start monitoring stack | +| `/dashboard status` | Health-check all components | +| `/dashboard restart [component]` | Restart dashboard/converter/plotter | +| `/ensemble start` | Begin bred vector generation | +| `/ensemble status` | Show ensemble convergence | +| `/config [param]` | Show simulation configuration | + +### Knowledge Q&A (`#ask-mitgcm`) + +Just type a question in the `#ask-mitgcm` channel — no slash command needed. +The bot answers using Claude with full context about: + +- **MITgcm**: parameters, packages, Fortran source, debugging +- **ERA5 / GLORYS**: variable definitions, units, accumulation conventions +- **This simulation**: grid, forcing, namelists, workflows, known gotchas +- **Oceanography**: North Atlantic circulation, air-sea fluxes, ensemble methods +- **HPC / SLURM**: job scheduling, containers, parallel I/O + +Long answers automatically create a thread to keep the channel clean. +The bot can also search the web and read files in the repo for up-to-date answers. 
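Discord caps a single message at 2,000 characters, which is why long answers in `#ask-mitgcm` are moved into a thread and split across messages. A minimal sketch of the chunking logic such a handler needs; the function names here are illustrative assumptions, not the actual `spectre_agents` implementation:

```python
# Illustrative sketch only: these helpers are assumptions about how the
# bot might split long answers, not the actual spectre_agents code.

DISCORD_MSG_LIMIT = 2000  # Discord's hard per-message character limit


def needs_thread(answer: str, limit: int = DISCORD_MSG_LIMIT) -> bool:
    """An answer that spans more than one message gets its own thread."""
    return len(answer) > limit


def chunk_answer(answer: str, limit: int = DISCORD_MSG_LIMIT) -> list[str]:
    """Split an answer on line boundaries into limit-sized messages.

    Assumes no single line exceeds the limit; a real implementation
    would also avoid splitting inside code fences.
    """
    chunks: list[str] = []
    current = ""
    for line in answer.splitlines(keepends=True):
        if current and len(current) + len(line) > limit:
            chunks.append(current)
            current = ""
        current += line
    if current:
        chunks.append(current)
    return chunks
```

In the real bot, something like `needs_thread` would gate a `message.create_thread(...)` call (discord.py 2.x API), with each chunk sent as a separate message inside the thread.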
+
+## Agent Autonomy Levels
+
+The system operates with **high autonomy**:
+
+**Autonomous actions (no Discord approval needed):**
+- Resubmit after SLURM walltime exceeded
+- Restart dead dashboard/plotter/converter processes
+- Clear run directory before resubmit
+- Rebuild container image if not found
+
+**Requires Discord approval (posts interactive buttons):**
+- Timestep changes (CFL approaching 0.45)
+- Ambiguous failure with multiple fix options
+- Physics parameter changes (viscosity, diffusion)
+- First-time configuration submission
+- Bred vector cycle completion review
+
+## Troubleshooting
+
+### Bot doesn't respond to commands
+- Check `journalctl -u spectre-agents -f` for errors
+- Verify `DISCORD_BOT_TOKEN` and `DISCORD_GUILD_ID` are correct
+- Ensure the bot has the required permissions in your server
+- Commands may take up to 1 hour to sync globally; guild sync is instant
+
+### "Claude Agent SDK not found" error
+- Ensure `claude-agent-sdk` is installed: `uv pip list | grep claude` (environments created by `uv venv` do not bundle `pip`, so use `uv pip` rather than `.venv/bin/pip`)
+- The Claude Code CLI must be installed on the system: `which claude`
+
+### Agent times out
+- Check `ANTHROPIC_API_KEY` is valid and has quota
+- Increase `max_turns` in `spectre_agents_config.yaml` if agents need more steps
+- Check network connectivity from the cluster node
+
+### SLURM commands fail
+- Verify the service runs as the correct user (joe)
+- Check that SLURM is accessible from the node running the service
+- Ensure the working directory exists: `/mnt/beegfs/spectre-150-ensembles`
+
+## Cost Estimates
+
+| Agent | Model | Approx. cost per invocation |
+|-------|-------|---------------------------|
+| Orchestrator | Opus 4.6 | $0.10 – $0.50 |
+| Diagnostics/Review/Validator/QC | Sonnet 4.6 | $0.02 – $0.10 |
+| WorkflowRunner/Dashboard/Notify | Haiku 4.5 | $0.005 – $0.02 |
+
+A typical run-diagnose-fix-restart cycle costs approximately $0.50 – $1.00.
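As a sanity check on that cycle figure, the midpoints of the per-invocation ranges above can be combined with rough invocation counts. The counts below are illustrative assumptions, not measured values:

```python
# Midpoints of the per-invocation cost ranges from the table above.
OPUS_MID = 0.30      # Orchestrator: $0.10 – $0.50
SONNET_MID = 0.06    # Diagnostics/Review/Validator/QC: $0.02 – $0.10
HAIKU_MID = 0.0125   # WorkflowRunner/Dashboard/Notify: $0.005 – $0.02

# Hypothetical run-diagnose-fix-restart cycle (invocation counts are guesses):
#   2x Orchestrator (initial dispatch + post-fix resubmit),
#   3x Sonnet specialists (diagnostics, validator, output review),
#   4x Haiku helpers (workflow runner, dashboard, two notifications).
total = 2 * OPUS_MID + 3 * SONNET_MID + 4 * HAIKU_MID
print(f"~${total:.2f} per cycle")  # ~$0.83, inside the quoted $0.50 – $1.00
```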
diff --git a/pyproject.toml b/pyproject.toml index a47d8bf..8ad0774 100755 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,11 +33,17 @@ dependencies = [ "xarray==2025.6.1", "xgcm==0.8.1", "zarr==3.0.8", - "MetPy==1.7.1" + "MetPy==1.7.1", + "claude-agent-sdk", + "discord.py>=2.3.0", + "anyio>=4.0.0", ] [project.urls] Homepage = "https://github.com/ocean-spectre/spectra-150-ensembles" Issues = "https://github.com/fluidnumerics/spectre_utils/issues" +[project.scripts] +spectre-agents = "spectre_agents.__main__:cli" + [tool.setuptools] -packages = ["spectre_utils"] +packages = ["spectre_utils", "spectre_agents", "spectre_agents.tools", "spectre_agents.agents", "spectre_agents.discord_bot"] diff --git a/spectre_agents/__init__.py b/spectre_agents/__init__.py new file mode 100644 index 0000000..311a52e --- /dev/null +++ b/spectre_agents/__init__.py @@ -0,0 +1,7 @@ +"""SPECTRE Simulation Agent System. + +Autonomous Python agents for MITgcm ocean simulation orchestration, +with Discord bot integration for bidirectional communication. +""" + +__version__ = "0.1.0" diff --git a/spectre_agents/__main__.py b/spectre_agents/__main__.py new file mode 100644 index 0000000..718df20 --- /dev/null +++ b/spectre_agents/__main__.py @@ -0,0 +1,105 @@ +"""Entry point for the SPECTRE agent system. + +Usage: + python -m spectre_agents [--config PATH] + +Starts the Discord bot and agent runner as concurrent asyncio tasks. 
+""" + +from __future__ import annotations + +import argparse +import asyncio +import logging +import signal +import sys +from pathlib import Path + +from spectre_agents.config import load_config +from spectre_agents.context import AgentContext +from spectre_agents.discord_bot.bot import run_bot +from spectre_agents.tools.discord_notify import set_agent_context + +logger = logging.getLogger("spectre_agents") + + +def setup_logging() -> None: + """Configure structured logging to stderr and optional file.""" + fmt = logging.Formatter( + "%(asctime)s [%(levelname)s] %(name)s: %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) + handler = logging.StreamHandler(sys.stderr) + handler.setFormatter(fmt) + + root = logging.getLogger() + root.setLevel(logging.INFO) + root.addHandler(handler) + + # Suppress noisy discord.py debug logs + logging.getLogger("discord").setLevel(logging.WARNING) + logging.getLogger("discord.http").setLevel(logging.WARNING) + + +async def main(config_path: str | None = None) -> None: + """Main async entry point.""" + setup_logging() + + config = load_config(config_path) + logger.info("Loaded config: base_dir=%s, sim_dir=%s", config.base_dir, config.sim_dir) + + # Validate required secrets + if not config.anthropic_api_key: + logger.error("ANTHROPIC_API_KEY not set. Set it in /etc/spectre-agents/env or environment.") + sys.exit(1) + if not config.discord_bot_token: + logger.error("DISCORD_BOT_TOKEN not set. 
Set it in /etc/spectre-agents/env or environment.")
+        sys.exit(1)
+
+    # Initialize shared context
+    ctx = AgentContext(base_dir=config.base_dir)
+    ctx.load_state()
+    logger.info("Loaded state: status=%s, job=%s", ctx.simulation.status, ctx.simulation.active_job_id)
+
+    # Wire up Discord tools with the context
+    set_agent_context(ctx)
+
+    # Handle shutdown signals (get_running_loop: we are inside a coroutine,
+    # and asyncio.get_event_loop() is deprecated in this context)
+    loop = asyncio.get_running_loop()
+    stop_event = asyncio.Event()
+
+    def signal_handler(sig):
+        logger.info("Received signal %s, shutting down...", sig)
+        stop_event.set()
+
+    for sig in (signal.SIGINT, signal.SIGTERM):
+        loop.add_signal_handler(sig, signal_handler, sig)
+
+    # Run the Discord bot — it manages the event loop
+    logger.info("Starting SPECTRE Agent System...")
+    try:
+        await run_bot(config, ctx)
+    except asyncio.CancelledError:
+        pass
+    finally:
+        ctx.save_state()
+        logger.info("SPECTRE Agent System stopped.")
+
+
+def cli() -> None:
+    """CLI entry point."""
+    parser = argparse.ArgumentParser(
+        description="SPECTRE Simulation Agent System with Discord bot"
+    )
+    parser.add_argument(
+        "--config",
+        type=str,
+        default=None,
+        help="Path to spectre_agents_config.yaml",
+    )
+    args = parser.parse_args()
+    asyncio.run(main(args.config))
+
+
+if __name__ == "__main__":
+    cli()
diff --git a/spectre_agents/agents/__init__.py b/spectre_agents/agents/__init__.py
new file mode 100644
index 0000000..634ac46
--- /dev/null
+++ b/spectre_agents/agents/__init__.py
@@ -0,0 +1,37 @@
+"""SPECTRE agent definitions.
+
+Each agent class wraps a Claude Agent SDK session with a specialized system prompt
+and tool set, mirroring the .claude/agents/*.md definitions.
+""" + +from spectre_agents.agents.workflow_runner import WorkflowRunner +from spectre_agents.agents.stdout_diagnostics import StdoutDiagnostics +from spectre_agents.agents.model_output_review import ModelOutputReview +from spectre_agents.agents.namelist_validator import NamelistValidator +from spectre_agents.agents.forcing_data_qc import ForcingDataQC +from spectre_agents.agents.dashboard_manager import DashboardManager +from spectre_agents.agents.notify import DiscordNotifier +from spectre_agents.agents.web_research import WebResearch + +AGENT_REGISTRY: dict[str, type] = { + "workflow-runner": WorkflowRunner, + "mitgcm-stdout-diagnostics": StdoutDiagnostics, + "model-output-review": ModelOutputReview, + "namelist-validator": NamelistValidator, + "forcing-data-qc": ForcingDataQC, + "dashboard-manager": DashboardManager, + "notify": DiscordNotifier, + "web-research": WebResearch, +} + +__all__ = [ + "AGENT_REGISTRY", + "WorkflowRunner", + "StdoutDiagnostics", + "ModelOutputReview", + "NamelistValidator", + "ForcingDataQC", + "DashboardManager", + "DiscordNotifier", + "WebResearch", +] diff --git a/spectre_agents/agents/base.py b/spectre_agents/agents/base.py new file mode 100644 index 0000000..36b0fa8 --- /dev/null +++ b/spectre_agents/agents/base.py @@ -0,0 +1,86 @@ +"""Base agent class for all SPECTRE agents. + +Provides common configuration, tool registration, and the run() method +that invokes a Claude Agent SDK session. 
+""" + +from __future__ import annotations + +import logging +from pathlib import Path +from typing import Any + +from claude_agent_sdk import ( + ClaudeSDKClient, + ClaudeAgentOptions, + AssistantMessage, + ResultMessage, + TextBlock, + create_sdk_mcp_server, +) + +from spectre_agents.config import Config + +logger = logging.getLogger(__name__) + + +class BaseSpectreAgent: + """Base class for all SPECTRE simulation agents.""" + + name: str = "base" + description: str = "" + model: str = "claude-sonnet-4-6" + max_tokens: int = 8192 + system_prompt: str = "" + + # Subclasses override to list their tool functions + tool_functions: list = [] + + def __init__(self, config: Config): + self.config = config + self.sim_dir = config.simulation_dir + self.base_dir = config.base_dir + + # Apply model config from YAML if available + agent_cfg = getattr(config.agents, self.name.replace("-", "_"), None) + if agent_cfg: + self.model = agent_cfg.model + self.max_tokens = agent_cfg.max_tokens + + def _build_options(self) -> tuple[Any, ClaudeAgentOptions]: + """Build the MCP server and ClaudeAgentOptions for this agent.""" + server = create_sdk_mcp_server( + f"spectre-{self.name}-tools", + tools=self.tool_functions, + ) + options = ClaudeAgentOptions( + cwd=str(self.sim_dir), + mcp_servers={f"{self.name}-tools": server}, + system_prompt=self.system_prompt, + model=self.model, + permission_mode="bypassPermissions", + max_turns=30, + ) + return server, options + + async def run(self, task: str) -> str: + """Run the agent with a task prompt and return the final text response.""" + _, options = self._build_options() + + result_text = "" + try: + async with ClaudeSDKClient(options=options) as client: + await client.query(task) + async for message in client.receive_response(): + if isinstance(message, ResultMessage): + result_text = message.result or "" + elif isinstance(message, AssistantMessage): + for block in message.content: + if isinstance(block, TextBlock): + # Capture last text 
output + result_text = block.text + except Exception as e: + logger.exception("Agent %s failed: %s", self.name, e) + result_text = f"Agent {self.name} error: {e}" + + return result_text diff --git a/spectre_agents/agents/dashboard_manager.py b/spectre_agents/agents/dashboard_manager.py new file mode 100644 index 0000000..2358201 --- /dev/null +++ b/spectre_agents/agents/dashboard_manager.py @@ -0,0 +1,70 @@ +"""DashboardManager agent — monitoring infrastructure lifecycle. + +Ported from .claude/agents/dashboard-manager.md +""" + +from __future__ import annotations + +from spectre_agents.agents.base import BaseSpectreAgent +from spectre_agents.tools.bash import run_command +from spectre_agents.tools.file_io import read_file +from spectre_agents.tools.dashboard import ( + dashboard_health_check, + start_dashboard, + start_converter, + start_plotter, + stop_process, +) + +SYSTEM_PROMPT = """\ +You are the dashboard infrastructure manager. You ensure the monitoring stack (dashboard, converter, plotter) is running and healthy. + +## The three processes + +| Process | Port | Log | Purpose | +|---------|------|-----|---------| +| Dashboard | 8050 | /tmp/dashboard.log | Serves monitoring web UI | +| Converter | — | /tmp/converter.log | Binary diagnostics to per-tile NetCDF | +| Plotter | — | /tmp/plotter.log | NetCDF to surface field PNGs | + +## Health check + +Run this sequence to verify everything is working: +1. Dashboard process alive? (check port 8050) +2. Dashboard serving data? (curl http://127.0.0.1:8050/data) +3. Tailscale proxy active? +4. Converter running? (check process) +5. Plotter running? (check process) +6. Plots being generated? (check /plots endpoint) + +## Starting the full stack + +All commands must run from /mnt/beegfs/spectre-150-ensembles as the working directory. +Startup order: dashboard -> converter -> plotter -> verify. 
+ +## Restarting a single process + +If only one process died, restart just that one — don't restart the others (they hold incremental state). Exception: the dashboard can be restarted freely since it re-parses STDOUT from the beginning. + +## Common issues + +- Port 8050 in use: check for stale dashboard process or tailscale proxy +- Plotter "No MNC directories": the simulation hasn't created output yet +- Converter finds no .data files: diag_mnc=.FALSE. must be set +- Dashboard shows 0 panels: STDOUT has no monitor blocks yet +""" + + +class DashboardManager(BaseSpectreAgent): + name = "dashboard_manager" + description = ( + "Ensures the simulation monitoring dashboard, converter, and plotter are running. " + "Manages startup, restart, and health checks." + ) + model = "claude-haiku-4-5" + max_tokens = 4096 + system_prompt = SYSTEM_PROMPT + tool_functions = [ + run_command, read_file, + dashboard_health_check, start_dashboard, start_converter, start_plotter, stop_process, + ] diff --git a/spectre_agents/agents/forcing_data_qc.py b/spectre_agents/agents/forcing_data_qc.py new file mode 100644 index 0000000..aef2422 --- /dev/null +++ b/spectre_agents/agents/forcing_data_qc.py @@ -0,0 +1,82 @@ +"""ForcingDataQC agent — validate EXF and OBC binary forcing files. + +Ported from .claude/agents/forcing-data-qc.md +""" + +from __future__ import annotations + +from spectre_agents.agents.base import BaseSpectreAgent +from spectre_agents.tools.bash import run_command +from spectre_agents.tools.file_io import read_file, glob_files, grep_files +from spectre_agents.tools.forcing import validate_exf_binary, validate_obc_binary + +SYSTEM_PROMPT = """\ +You are a forcing data quality-control specialist. You validate atmospheric (EXF) and ocean boundary (OBC) binary files by cross-checking them against expected physical ranges and the MITgcm namelist metadata. 
+ +## EXF binary files + +All EXF files are pre-interpolated to the model grid (768x424) with latitude flipped to south-to-north. Wind components (uwind, vwind) are pre-rotated to model-grid directions. + +### Physical range checks (record 0 + sampled records) +```python +arr = np.fromfile(path, dtype='>f4', count=424*768).reshape(424, 768) +``` + +| Variable | Unit | Expected range | +|----------|------|---------------| +| atemp | K | 240-320 | +| aqh | kg/kg | 0-0.025 | +| uwind | m/s | -50 to +50 | +| vwind | m/s | -50 to +50 | +| swdown | W/m2 | 0-1200 | +| lwdown | W/m2 | 100-500 | +| precip | m/s | 0 to 1e-3 | +| evap | m/s | -1e-3 to 1e-4 | + +### Grid orientation check +- j=0 should be south (20N) — warm tropical values +- j=423 should be north (54N) — cooler values +- Verify by comparing atemp at j=0 vs j=423 + +### Wind rotation check +- Wind speed magnitude should be preserved: sqrt(u^2 + v^2) should match ERA5 input +- Max wind speed should be < 50 m/s (if > 100, rotation is wrong) + +## OBC binary files + +### Record count +Expected: 5479 daily records (2002-07-01 to 2017-06-30) +```python +size = os.path.getsize(path) +n_recs = size / (Nr * Nx_or_Ny * 4) # float32 +``` + +### Expected sizes +| Boundary | 3D shape | 2D shape | +|----------|----------|----------| +| North/South | (5479, 50, 768) | (5479, 768) | +| East/West | (5479, 50, 424) | — | + +## NaN/Inf/fill value check +- np.isnan(arr).any() and np.isinf(arr).any() +- ERA5 fill value: ~9.97e+36; check for values > 1e6 in non-radiation fields + +## Output format +Per file: PASS/FAIL with min, max, mean, NaN count, and any anomalies. +Summary: total files checked, PASS count, FAIL count. +""" + + +class ForcingDataQC(BaseSpectreAgent): + name = "forcing_data_qc" + description = ( + "Validates EXF and OBC binary forcing files for physical plausibility, " + "correct orientation, and NaN/Inf values." 
+ ) + model = "claude-sonnet-4-6" + max_tokens = 8192 + system_prompt = SYSTEM_PROMPT + tool_functions = [ + run_command, read_file, glob_files, grep_files, + validate_exf_binary, validate_obc_binary, + ] diff --git a/spectre_agents/agents/model_output_review.py b/spectre_agents/agents/model_output_review.py new file mode 100644 index 0000000..2a12f0c --- /dev/null +++ b/spectre_agents/agents/model_output_review.py @@ -0,0 +1,81 @@ +"""ModelOutputReview agent — assess physical plausibility of simulation output. + +Ported from .claude/agents/model-output-review.md +""" + +from __future__ import annotations + +from spectre_agents.agents.base import BaseSpectreAgent +from spectre_agents.tools.bash import run_command +from spectre_agents.tools.file_io import read_file, glob_files +from spectre_agents.tools.mitgcm import parse_monitor_stats, get_cfl_values, get_model_days, get_stdout_tail + +SYSTEM_PROMPT = """\ +You are a MITgcm model output reviewer. You assess whether a simulation is producing physically realistic results by checking monitor statistics and diagnostics output. + +## What to check + +### Monitor statistics (from STDOUT.0000) +Extract the latest monitor block and compare against expected ranges: + +| Field | Healthy range (North Atlantic) | +|-------|-------------------------------| +| dynstat_theta (SST) | 2-30C; mean ~15C | +| dynstat_salt | 33-37 PSU | +| dynstat_uvel/vvel | max < 2 m/s (Gulf Stream peaks ~1.5) | +| dynstat_wvel | max < 0.1 m/s | +| dynstat_eta | +/-1.5 m | +| advcfl_W_hf_max | < 0.5 (if approaching 0.5, flag for timestep reduction) | +| ke_max | not growing exponentially | + +### Diagnostics output (surface fields) +If surface field PNGs exist in /plots/: +- SST should show the Gulf Stream as a warm tongue separating from Cape Hatteras +- SSH should show ~1 m gradient across the Gulf Stream +- KE should peak in the Gulf Stream region + +### Trend analysis +Compare the first and last monitor blocks: +- Is temperature drifting? 
(steady drift > 1C/year suggests forcing imbalance) +- Is salinity drifting? (fresh bias suggests precipitation/evaporation error) +- Is KE growing or decaying? (should stabilize after spinup) + +## Reading monitor data +```bash +# Latest monitor block +grep '%MON dynstat_theta_max\\|%MON dynstat_theta_min\\|%MON dynstat_theta_mean' STDOUT.0000 | tail -3 + +# CFL trend +grep '%MON advcfl_W_hf_max' STDOUT.0000 | tail -10 +``` + +## Output format +Return a health assessment: +``` +STATUS: HEALTHY / WARNING / CRITICAL +MODEL DAYS: +SUMMARY: +FIELDS: + SST: + Salinity: + Velocity: + CFL: +TRENDS: +RECOMMENDATION: +``` +""" + + +class ModelOutputReview(BaseSpectreAgent): + name = "model_output_review" + description = ( + "Reviews MITgcm model output to assess physical plausibility. " + "Returns HEALTHY/WARNING/CRITICAL health assessment." + ) + model = "claude-sonnet-4-6" + max_tokens = 8192 + system_prompt = SYSTEM_PROMPT + tool_functions = [ + run_command, read_file, glob_files, + parse_monitor_stats, get_cfl_values, get_model_days, get_stdout_tail, + ] diff --git a/spectre_agents/agents/namelist_validator.py b/spectre_agents/agents/namelist_validator.py new file mode 100644 index 0000000..0044ab3 --- /dev/null +++ b/spectre_agents/agents/namelist_validator.py @@ -0,0 +1,76 @@ +"""NamelistValidator agent — pre-run MITgcm namelist validation. + +Ported from .claude/agents/namelist-validator.md +""" + +from __future__ import annotations + +from spectre_agents.agents.base import BaseSpectreAgent +from spectre_agents.tools.bash import run_command +from spectre_agents.tools.file_io import read_file, glob_files, grep_files +from spectre_agents.tools.namelist import parse_namelist_tool, validate_namelists + +SYSTEM_PROMPT = """\ +You are a MITgcm namelist validator. Your job is to cross-check all input namelists against the actual forcing files and model grid to catch configuration errors BEFORE a run is submitted. 
+ +## Files to check +All in simulations/glorysv12-curvilinear/input/: +- data — core model parameters +- data.exf — EXF forcing +- data.obcs — open boundary conditions +- data.pkg — package enable/disable +- data.diagnostics — diagnostics output +- data.mnc — MNC output config +- etc/config.yaml — high-level simulation configuration + +## Validation checks + +### 1. Package consistency +- If useDIAGNOSTICS=.TRUE. in data.pkg, data.diagnostics must exist and have valid field names +- If diag_mnc=.TRUE. in data.diagnostics, useMNC=.TRUE. must be set in data.pkg +- If useEXF=.TRUE., all referenced forcing files must exist in input/ + +### 2. Grid dimensions +- sNx x nPx = Nx (96 x 8 = 768) and sNy x nPy = Ny (53 x 8 = 424) +- Forcing files on model grid: size should be nt x Ny x Nx x 4 bytes + +### 3. EXF configuration +- Since EXF interpolation is disabled (USE_EXF_INTERPOLATION undefined), EXF_NML_04 should have NO interpolation metadata (no *_nlon, *_nlat) +- rotateStressOnAgrid = .FALSE. (winds are pre-rotated) +- useExfCheckRange = .FALSE. (range check disabled, windstressmax clamps) +- All referenced binary files must exist and be the correct size + +### 4. OBC file sizes +- North/South: (ntime, Nr, Nx) for 3D vars, (ntime, Nx) for Eta +- East/West: (ntime, Nr, Ny) for 3D vars +- Expected records: 5479 (daily, 2002-07-01 to 2017-06-30) +- OBC period = 86400.0 (daily) + +### 5. Time configuration +- startDate_1 in data.cal matches EXF startdates +- deltaT x nTimeSteps = endTime +- pChkptFreq and chkptFreq are multiples of deltaT x monitorFreq steps + +### 6. Memory safety +- diag_mnc = .FALSE. recommended (MNC leaks memory on long runs) +- dumpFreq = 0 (use diagnostics package, not direct state dumps) +- pickup_write_mnc = .FALSE. and pickup_read_mnc = .FALSE. + +## Output format +Report each check as PASS/FAIL with the specific parameter, current value, and expected value. Summarize at the end with total PASS/FAIL count. 
+""" + + +class NamelistValidator(BaseSpectreAgent): + name = "namelist_validator" + description = ( + "Validates MITgcm namelist files for consistency with the model grid, " + "forcing files, and simulation configuration." + ) + model = "claude-sonnet-4-6" + max_tokens = 8192 + system_prompt = SYSTEM_PROMPT + tool_functions = [ + run_command, read_file, glob_files, grep_files, + parse_namelist_tool, validate_namelists, + ] diff --git a/spectre_agents/agents/notify.py b/spectre_agents/agents/notify.py new file mode 100644 index 0000000..f3d03da --- /dev/null +++ b/spectre_agents/agents/notify.py @@ -0,0 +1,85 @@ +"""DiscordNotifier agent — sends notifications to the user via Discord. + +Ported from .claude/agents/notify.md — replaces Slack with Discord. +""" + +from __future__ import annotations + +from spectre_agents.agents.base import BaseSpectreAgent +from spectre_agents.tools.discord_notify import ( + send_discord_message, + send_discord_image, + request_user_decision, +) + +SYSTEM_PROMPT = """\ +You are the notification agent for the SPECTRE simulation system. Your ONLY job is to deliver messages to the user via Discord and report whether delivery succeeded. 
+ +## Delivery + +Post messages to the appropriate Discord channel: +- **#simulation-status** — routine status updates, milestones +- **#alerts** — failure alerts, critical warnings +- **#decisions** — decision requests requiring user input +- **#plots** — surface field images, convergence plots +- **#logs** — verbose agent activity (optional) + +## Discord formatting rules + +Use Discord-compatible markdown: +- **bold** for emphasis +- *italic* for secondary info +- Single backticks for inline code +- Triple backticks for code blocks +- Use `>` for blockquotes + +## Message types + +### Status update +``` +**Simulation update** — `glorysv12-curvilinear` + +Job 1303 on noether: 164 model days, 17.1 sim days/wall hr +Status: RUNNING +CFL: 0.24 (headroom OK) +``` + +### Failure alert +``` +**Simulation failed** — `glorysv12-curvilinear` + +Job 1302 failed after 7.6 hours: `OUT_OF_MEMORY` +Reached 164 model days. MNC diagnostics memory leak suspected. +Awaiting your input before resubmitting. +``` + +### Decision request (blocks work) +For decisions, use the request_user_decision tool which posts interactive buttons. + +### Milestone +``` +**Milestone reached** — `glorysv12-curvilinear` + +1-year spinup complete. 365 model days in 21.5 wall hours. +Pickup file written at iteration 87600. +Ready to begin bred vector ensemble generation. +``` + +## Rules + +- NEVER post to channels other than the five listed above +- NEVER fabricate information — only relay what you are given +- Report back whether delivery succeeded or failed +""" + + +class DiscordNotifier(BaseSpectreAgent): + name = "notify" + description = ( + "Sends notifications to the user via Discord channels. " + "Handles status updates, failure alerts, decision requests, and milestones." 
+ ) + model = "claude-haiku-4-5" + max_tokens = 4096 + system_prompt = SYSTEM_PROMPT + tool_functions = [send_discord_message, send_discord_image, request_user_decision] diff --git a/spectre_agents/agents/orchestrator.py b/spectre_agents/agents/orchestrator.py new file mode 100644 index 0000000..222b207 --- /dev/null +++ b/spectre_agents/agents/orchestrator.py @@ -0,0 +1,205 @@ +"""SimulationOrchestrator agent — top-level lifecycle manager. + +Ported from .claude/agents/simulation-orchestrator.md. +The orchestrator delegates to specialist agents via a run_sub_agent tool +and coordinates the full simulation lifecycle. +""" + +from __future__ import annotations + +import asyncio +import logging +from typing import TYPE_CHECKING + +from claude_agent_sdk import ( + ClaudeSDKClient, + ClaudeAgentOptions, + AgentDefinition, + ResultMessage, + AssistantMessage, + TextBlock, + create_sdk_mcp_server, + tool, +) + +from spectre_agents.agents.base import BaseSpectreAgent +from spectre_agents.tools.bash import run_command +from spectre_agents.tools.file_io import read_file, write_file, edit_file, glob_files, grep_files +from spectre_agents.tools.slurm import submit_job, job_status, queue_status, cancel_job +from spectre_agents.tools.mitgcm import parse_monitor_stats, get_cfl_values, get_model_days, get_stdout_tail +from spectre_agents.tools.discord_notify import send_discord_message, send_discord_image, request_user_decision + +if TYPE_CHECKING: + from spectre_agents.config import Config + +logger = logging.getLogger(__name__) + +SYSTEM_PROMPT = """\ +You are the simulation orchestrator for the SPECTRE MITgcm North Atlantic ensemble system. You manage the full simulation lifecycle: configuring, running, diagnosing, fixing, and restarting simulations. You coordinate specialized sub-agents and make configuration decisions. + +## Your responsibilities + +1. **Simulation lifecycle management**: submit runs, monitor progress, diagnose failures, apply fixes, restart +2. 
**Configuration decisions**: timestep selection (based on CFL), output frequency, memory management, node selection
3. **Coordination**: delegate to specialized agents and synthesize their findings
4. **Infrastructure management**: dashboard, plotter, converter, tailscale services
5. **Communication**: post updates to Discord at key milestones

## Decision framework

When a simulation fails, follow this sequence:
1. Check job exit status (sacct -j <jobid> --format=JobID,State,ExitCode,Elapsed)
2. Read the last 30 lines of STDOUT.0000 for the failure context
3. Classify the failure:
   - **OUT_OF_MEMORY**: reduce output frequency, disable MNC, switch to binary diagnostics
   - **Numerical blow-up** (NaN/Inf in monitor stats): reduce deltaT, check forcing data, check CFL
   - **File I/O error** (MDS_READ past EOF): check OBC/forcing file record counts vs simulation length
   - **Container/SLURM error** (spank plugin, pyxis): check env.sh paths, container image availability
   - **Silent crash** (healthy values then sudden stop): check SLURM walltime limit
4. Apply the fix to the relevant input file
5. Clear the run directory and resubmit

## CFL-based timestep selection

After a stable run segment, check CFL values:
```
grep '%MON advcfl' STDOUT.0000 | tail -7
```
The binding constraint is advcfl_W_hf_max (vertical). Target CFL < 0.5 for safety.
- Max safe deltaT ~ current_deltaT x 0.5 / max_CFL
- Always test a new timestep for at least 5 model days before committing

## Process management

The simulation system has four concurrent processes:
1. **MITgcm** (SLURM job) — the simulation itself
2. **Converter** (convert_diagnostics_to_netcdf.py) — binary to NetCDF post-processing
3. **Plotter** (plot_surface_fields.py) — generates surface field PNGs
4. **Dashboard** (monitor_dashboard.py) — serves live monitoring web UI

Start them in order: simulation first, then converter (after STDOUT appears), then plotter, then dashboard.
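
## Worked example: timestep from CFL

Applying "Max safe deltaT ~ current_deltaT x 0.5 / max_CFL" with illustrative numbers (deltaT = 150 s, advcfl_W_hf_max = 0.24; not taken from a real run):

```shell
# hypothetical values; substitute the current deltaT and the latest advcfl_W_hf_max
python3 -c "print(150 * 0.5 / 0.24)"   # upper bound of roughly 312 s; pick a round value below it
```

Then run the mandatory 5-model-day stability test before committing the new deltaT.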
+ +## Delegation to sub-agents + +Use the run_sub_agent tool to delegate tasks to specialists: +- **namelist-validator**: before submitting a run, validate namelists +- **forcing-data-qc**: when forcing-related errors appear +- **model-output-review**: after a successful run segment, assess physical plausibility +- **workflow-runner**: to submit jobs, start/stop processes (execution only) +- **dashboard-manager**: to manage the dashboard/converter/plotter stack +- **notify**: to send Discord messages to the user +- **mitgcm-stdout-diagnostics**: to parse STDOUT and diagnose failures + +## Halting for user feedback + +When a decision requires user input, use the request_user_decision tool. +**Stop all active work** until the user responds. + +Situations that REQUIRE halting: +- Simulation blow-up with ambiguous root cause +- CFL approaching stability limit (> 0.45) — user must approve timestep change +- OOM with no clear fix remaining +- Any change to the model physics (viscosity, diffusion, advection scheme) +- First-time submission of a new configuration +- Bred vector cycle completion — user reviews convergence before next cycle + +Situations that do NOT require halting (fix and resubmit autonomously): +- SLURM walltime exceeded (just resubmit from pickup) +- Container image not found (rebuild and retry) +- Run directory needs clearing before resubmit +- Dashboard/plotter process died (restart it) + +## Key files + +| File | Purpose | +|------|---------| +| input/data | Core params: deltaT, endTime, dumpFreq, pChkptFreq, monitorFreq | +| input/data.exf | EXF forcing config | +| input/data.obcs | Open boundary conditions | +| input/data.pkg | Package enable/disable | +| input/data.diagnostics | Diagnostics output streams | +| input/data.mnc | MNC NetCDF output config | +| workflows/env.sh | Container images, input dir paths | +| workflows/run.sh | SLURM job script | + +## Discord updates + +Post to the appropriate channel at these milestones: +- Simulation started 
(job ID, node, config summary) -> #simulation-status +- Failure diagnosed and fix applied -> #alerts +- Successful completion of a run segment -> #simulation-status +- Decision needed -> #decisions (use request_user_decision) +- Surface field plots -> #plots + +## Memory management + +MNC NetCDF output leaks memory over long runs. The current workaround: +- diag_mnc = .FALSE. in data.diagnostics (binary output) +- convert_diagnostics_to_netcdf.py runs as a post-processor +- pickup_write_mnc = .FALSE. and pickup_read_mnc = .FALSE. +- State dumps disabled (dumpFreq = 0); all output via diagnostics package +""" + + +class SimulationOrchestrator(BaseSpectreAgent): + name = "orchestrator" + description = ( + "Top-level orchestrator for the MITgcm simulation lifecycle. " + "Manages the full run-diagnose-fix-rerun loop and coordinates sub-agents." + ) + model = "claude-opus-4-6" + max_tokens = 16384 + system_prompt = SYSTEM_PROMPT + tool_functions = [ + # Core tools + run_command, read_file, write_file, edit_file, glob_files, grep_files, + # SLURM + submit_job, job_status, queue_status, cancel_job, + # MITgcm + parse_monitor_stats, get_cfl_values, get_model_days, get_stdout_tail, + # Discord + send_discord_message, send_discord_image, request_user_decision, + ] + + def _build_options(self): + """Override to add sub-agent definitions for delegation.""" + server, options = super()._build_options() + + # Define sub-agents that the orchestrator can spawn + options.agents = { + "workflow-runner": AgentDefinition( + description="SLURM job execution and process management. Use for submitting jobs, checking status, starting/stopping processes.", + prompt="You are the workflow runner. Execute the requested SLURM or process management task.", + tools=["Bash", "Read", "Glob"], + ), + "mitgcm-stdout-diagnostics": AgentDefinition( + description="Parse MITgcm STDOUT to diagnose failures. Returns structured diagnosis.", + prompt="You are a MITgcm diagnostics specialist. 
Analyze the STDOUT and classify the failure.", + tools=["Read", "Grep", "Glob", "Bash"], + ), + "model-output-review": AgentDefinition( + description="Assess physical plausibility of simulation output.", + prompt="Review the model output and assess whether it's physically realistic.", + tools=["Read", "Glob", "Bash"], + ), + "namelist-validator": AgentDefinition( + description="Validate MITgcm namelists before submission.", + prompt="Cross-check all namelists against forcing files and grid configuration.", + tools=["Read", "Grep", "Glob", "Bash"], + ), + "forcing-data-qc": AgentDefinition( + description="Validate EXF and OBC binary forcing files.", + prompt="Check forcing files for correct ranges, orientation, and NaN/Inf values.", + tools=["Read", "Grep", "Glob", "Bash"], + ), + "dashboard-manager": AgentDefinition( + description="Manage the monitoring dashboard, converter, and plotter.", + prompt="Ensure the dashboard stack is running and healthy.", + tools=["Bash", "Read"], + ), + } + + # Allow the Agent built-in tool for sub-agent spawning + options.allowed_tools = ["Agent"] + + return server, options diff --git a/spectre_agents/agents/stdout_diagnostics.py b/spectre_agents/agents/stdout_diagnostics.py new file mode 100644 index 0000000..a728ff9 --- /dev/null +++ b/spectre_agents/agents/stdout_diagnostics.py @@ -0,0 +1,83 @@ +"""StdoutDiagnostics agent — MITgcm STDOUT failure diagnosis. + +Ported from .claude/agents/mitgcm-stdout-diagnostics.md +""" + +from __future__ import annotations + +from spectre_agents.agents.base import BaseSpectreAgent +from spectre_agents.tools.bash import run_command +from spectre_agents.tools.file_io import read_file, glob_files, grep_files +from spectre_agents.tools.mitgcm import parse_monitor_stats, get_cfl_values, get_model_days, get_stdout_tail +from spectre_agents.tools.slurm import job_status + +SYSTEM_PROMPT = """\ +You are a MITgcm run diagnostics specialist. 
You read STDOUT output files, classify failures, and provide actionable diagnoses. You do NOT fix problems or resubmit jobs — you report findings to the orchestrator.

## Failure classification

### 1. OUT_OF_MEMORY
**Signature**: SLURM exit OUT_OF_ME+, model values healthy at time of crash
**Diagnosis**: report model days reached, memory usage (sacct --format=MaxRSS), and which output mechanism was active (MNC diagnostics, dumpFreq, etc.)
**Common causes**: MNC NetCDF library memory leak, too-frequent output

### 2. Numerical blow-up
**Signature**: monitor stats show NaN, Inf, or exponentially growing values (T > 100C, CFL > 1e6)
**Diagnosis**: identify when values first diverged, which field blew up first, and the CFL at that point
**Common causes**: deltaT too large, forcing data error, OBC mismatch

### 3. EXF range-check failure
**Signature**: EXF WARNING messages in STDOUT
**Diagnosis**: count warnings across all ranks, identify affected fields (hflux/ustress/vstress), map to tile coordinates
**Note**: with useExfCheckRange=.FALSE., these are suppressed. windstressmax=2.0 still clamps stress.

### 4. File I/O crash
**Signature**: crash at MDS_READ_SEC_XZ: opening global file: <file>.bin
**Diagnosis**: check the file's record count vs what the model needs at the current timestep

### 5. Initialization failure
**Signature**: STDOUT shows only the eedata example, then PROGRAM MAIN: ends with fatal Error
**Diagnosis**: input files not found — check symlinks, container mounts, SIMULATION_INPUT_DIR in env.sh

## Diagnostic procedure

1. sacct -j <jobid> --format=JobID,State,ExitCode,Elapsed,MaxRSS
2. tail -30 <run_dir>/STDOUT.0000 — immediate crash context
3. grep '%MON time_secondsf' STDOUT.0000 | tail -2 — how far did it get?
4. Classify the failure using the signatures above
5. If EXF-related: grep -c 'EXF WARNING' STDOUT.* across all ranks
6.
If numerical: find the first monitor block where values diverged

## EXF monitor sanity ranges
- exf_wspeed_max < 50 m/s (if > 200, EXF_INTERP_UV is amplifying)
- exf_hflux within -500 to +1600 W/m2
- exf_ustress/vstress within +/-2.0 N/m2 (clamped by windstressmax)
- exf_atemp within 240-320 K

## Output format
Return a structured report:
```
FAILURE TYPE: <one of the five classes above>
MODEL DAYS REACHED: <days>
WALL TIME: <hh:mm:ss>
ROOT CAUSE: <one sentence>
EVIDENCE: <key log lines / monitor values>
SUGGESTED FIX: <action for the orchestrator>
```
"""


class StdoutDiagnostics(BaseSpectreAgent):
    name = "stdout_diagnostics"
    description = (
        "Parses MITgcm STDOUT files to diagnose run failures. "
        "Returns structured diagnosis with failure type and suggested fix."
    )
    model = "claude-sonnet-4-6"
    max_tokens = 8192
    system_prompt = SYSTEM_PROMPT
    tool_functions = [
        run_command, read_file, glob_files, grep_files,
        parse_monitor_stats, get_cfl_values, get_model_days, get_stdout_tail,
        job_status,
    ]
diff --git a/spectre_agents/agents/web_research.py b/spectre_agents/agents/web_research.py
new file mode 100644
index 0000000..edd3eb8
--- /dev/null
+++ b/spectre_agents/agents/web_research.py
@@ -0,0 +1,55 @@
+"""WebResearch agent — technical research for MITgcm, ERA5, SLURM docs.

Ported from .claude/agents/web-research.md

Note: The Agent SDK's WebSearch and WebFetch built-in tools are used here
instead of custom MCP tools, since they're available as built-in capabilities.
"""

from __future__ import annotations

from spectre_agents.agents.base import BaseSpectreAgent
from spectre_agents.tools.bash import run_command
from spectre_agents.tools.file_io import read_file, grep_files

SYSTEM_PROMPT = """\
You are a technical research specialist. Your job is to find accurate, up-to-date information from the internet and return concise, well-sourced answers.

## Approach
1. Use web search to find relevant pages, documentation, or source code.
2. Fetch the specific page or file content.
3.
Cross-check across multiple sources when the answer is not immediately clear.
4. Return the key finding with the source URL(s) so the answer can be verified.

## Common research tasks
- **MITgcm source code**: search github.com/MITgcm/MITgcm for specific Fortran files (e.g., exf_interp.F, exf_check_range.F). Use GitHub search or fetch raw file URLs directly.
- **MITgcm documentation**: mitgcm.readthedocs.io for parameter descriptions and package documentation.
- **ERA5 / Copernicus**: confluence.ecmwf.int for variable definitions, units, and accumulation conventions.
- **SLURM / HPC**: slurm.schedmd.com/documentation.html for sbatch flags and scheduler behaviour.

## Output format
- Lead with the direct answer to the question.
- Include the source URL.
- Quote the relevant code or text excerpt if applicable.
- Flag any uncertainty or version-dependence.
"""


class WebResearch(BaseSpectreAgent):
    name = "web_research"
    description = (
        "Researches technical questions on the internet — MITgcm docs, "
        "ERA5/Copernicus metadata, SLURM/HPC tooling."
    )
    model = "claude-sonnet-4-6"
    max_tokens = 8192
    system_prompt = SYSTEM_PROMPT
    # Uses built-in WebSearch/WebFetch + bash for curl fallback
    tool_functions = [run_command, read_file, grep_files]

    def _build_options(self):
        """Override to add built-in web tools."""
        server, options = super()._build_options()
        # Extend rather than overwrite, so the MCP tools above stay allowed
        # alongside the built-in web tools
        options.allowed_tools = (options.allowed_tools or []) + ["WebSearch", "WebFetch"]
        return server, options
diff --git a/spectre_agents/agents/workflow_runner.py b/spectre_agents/agents/workflow_runner.py
new file mode 100644
index 0000000..3107c70
--- /dev/null
+++ b/spectre_agents/agents/workflow_runner.py
@@ -0,0 +1,90 @@
+"""WorkflowRunner agent — SLURM job execution and process lifecycle management.

Ported from .claude/agents/workflow-runner.md
"""

from __future__ import annotations

from spectre_agents.agents.base import BaseSpectreAgent
from spectre_agents.tools.bash import run_command
from spectre_agents.tools.file_io import read_file, glob_files
from spectre_agents.tools.slurm import submit_job, job_status, queue_status, cancel_job

SYSTEM_PROMPT = """\
You are the SLURM and process manager for the SPECTRE simulation system. You handle job submission, monitoring, and the lifecycle of background processes (dashboard, plotter, converter).

## SLURM jobs

### Submitting simulation runs
```bash
cd /mnt/beegfs/spectre-150-ensembles/simulations/glorysv12-curvilinear
sbatch --chdir=$(pwd) workflows/run.sh
```
**Always** submit from the simulation directory — the env.sh path resolution requires this.

### Submitting other workflows
```bash
sbatch --chdir=$(pwd) workflows/make_exf_conditions.sh
sbatch --chdir=$(pwd) workflows/make_ocean_boundary_conditions.sh
sbatch --chdir=$(pwd) workflows/build.sh
sbatch --chdir=$(pwd) workflows/plot_surface_fields.sh
```

### Before resubmitting a run
Always clear the run directory first so symlinks are recreated correctly.

### Monitoring
- `sacct -j <jobid> --format=JobID,State,ExitCode,Elapsed` — job status
- `squeue -u $USER` — running/pending jobs
- `tail -20 <run_dir>/STDOUT.0000` — latest model output
- `tail -10 spectre_glorysv12_run-<jobid>.out` — SLURM job log

### When a job fails
Report: job ID, state, exit code, elapsed time, and the last 20 lines of both the SLURM output and STDOUT.0000. Do NOT attempt to diagnose — report findings only.

## Background processes

### Dashboard
```bash
cd /mnt/beegfs/spectre-150-ensembles
sudo tailscale serve --http=8050 off 2>/dev/null; kill $(lsof -ti :8050) 2>/dev/null; sleep 1
nohup uv run python spectre_utils/monitor_dashboard.py --port 8050 --poll 30 > /tmp/dashboard.log 2>&1 &
sudo tailscale serve --bg --http=8050 127.0.0.1:8050
```

### Converter (binary diagnostics to NetCDF)
```bash
nohup uv run python spectre_utils/convert_diagnostics_to_netcdf.py --poll 60 > /tmp/converter.log 2>&1 &
```

### Plotter (surface field PNGs)
```bash
nohup uv run python spectre_utils/plot_surface_fields.py --poll 120 > /tmp/plotter.log 2>&1 &
```

### Startup order
1. Wait for STDOUT.0000 to exist
2. Start dashboard
3. Start converter
4. Start plotter
5. Verify dashboard responds: `curl -s http://127.0.0.1:8050/data | head -c 100`

## Container images
Defined in workflows/env.sh:
- SPECTRE_UTILS_IMG — Python preprocessing
- MITGCM_BASE_IMG — MITgcm runtime

If Python code in spectre_utils/ changed, the Docker image must be rebuilt via GitHub Actions (commit + push) before the SLURM job will pick up the changes.
"""


class WorkflowRunner(BaseSpectreAgent):
    name = "workflow_runner"
    description = (
        "Submits, monitors, and manages SLURM jobs and background processes. "
        "Handles execution but not diagnosis."
    )
    model = "claude-haiku-4-5"
    max_tokens = 4096
    system_prompt = SYSTEM_PROMPT
    tool_functions = [run_command, read_file, glob_files, submit_job, job_status, queue_status, cancel_job]
diff --git a/spectre_agents/config.py b/spectre_agents/config.py
new file mode 100644
index 0000000..240ea8e
--- /dev/null
+++ b/spectre_agents/config.py
@@ -0,0 +1,178 @@
+"""Configuration loader for the SPECTRE agent system.

Priority: environment variables > spectre_agents_config.yaml > defaults.
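
An example `spectre_agents_config.yaml` (all keys optional; key names match what
`load_config` below parses, values here are illustrative defaults):

```yaml
simulation:
  base_dir: /mnt/beegfs/spectre-150-ensembles
  sim_dir: simulations/glorysv12-curvilinear
grid:
  Nx: 768
  Ny: 424
discord:
  channels:
    status: simulation-status
    knowledge: ask-mitgcm
agents:
  orchestrator:
    model: claude-opus-4-6
    max_tokens: 16384
monitoring:
  poll_interval_seconds: 60
  dashboard_port: 8050
```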
+""" + +from __future__ import annotations + +import os +from dataclasses import dataclass, field +from pathlib import Path + +import yaml + + +@dataclass +class GridConfig: + Nx: int = 768 + Ny: int = 424 + Nr: int = 50 + nPx: int = 8 + nPy: int = 8 + + +@dataclass +class DiscordChannels: + status: str = "simulation-status" + decisions: str = "decisions" + alerts: str = "alerts" + plots: str = "plots" + logs: str = "logs" + knowledge: str = "ask-mitgcm" + + +@dataclass +class AgentModelConfig: + model: str = "" + max_tokens: int = 8192 + + +@dataclass +class AgentsConfig: + orchestrator: AgentModelConfig = field( + default_factory=lambda: AgentModelConfig("claude-opus-4-6", 16384) + ) + workflow_runner: AgentModelConfig = field( + default_factory=lambda: AgentModelConfig("claude-haiku-4-5", 4096) + ) + stdout_diagnostics: AgentModelConfig = field( + default_factory=lambda: AgentModelConfig("claude-sonnet-4-6", 8192) + ) + model_output_review: AgentModelConfig = field( + default_factory=lambda: AgentModelConfig("claude-sonnet-4-6", 8192) + ) + namelist_validator: AgentModelConfig = field( + default_factory=lambda: AgentModelConfig("claude-sonnet-4-6", 8192) + ) + forcing_data_qc: AgentModelConfig = field( + default_factory=lambda: AgentModelConfig("claude-sonnet-4-6", 8192) + ) + dashboard_manager: AgentModelConfig = field( + default_factory=lambda: AgentModelConfig("claude-haiku-4-5", 4096) + ) + notify: AgentModelConfig = field( + default_factory=lambda: AgentModelConfig("claude-haiku-4-5", 4096) + ) + web_research: AgentModelConfig = field( + default_factory=lambda: AgentModelConfig("claude-sonnet-4-6", 8192) + ) + + +@dataclass +class Config: + # Paths + base_dir: Path = Path("/mnt/beegfs/spectre-150-ensembles") + sim_dir: str = "simulations/glorysv12-curvilinear" + run_dir_prefix: str = "test-run" + + # Grid + grid: GridConfig = field(default_factory=GridConfig) + + # Discord + discord_bot_token: str = "" + discord_guild_id: int = 0 + discord_channels: 
DiscordChannels = field(default_factory=DiscordChannels) + + # Anthropic + anthropic_api_key: str = "" + + # Agents + agents: AgentsConfig = field(default_factory=AgentsConfig) + + # Monitoring + poll_interval_seconds: int = 60 + dashboard_port: int = 8050 + + @property + def simulation_dir(self) -> Path: + return self.base_dir / self.sim_dir + + @property + def input_dir(self) -> Path: + return self.simulation_dir / "input" + + @property + def workflows_dir(self) -> Path: + return self.simulation_dir / "workflows" + + +def _apply_agent_config(agents_cfg: AgentsConfig, raw: dict) -> None: + """Apply raw YAML agent config to the AgentsConfig dataclass.""" + for name, values in raw.items(): + if hasattr(agents_cfg, name) and isinstance(values, dict): + agent = getattr(agents_cfg, name) + if "model" in values: + agent.model = values["model"] + if "max_tokens" in values: + agent.max_tokens = values["max_tokens"] + + +def load_config(config_path: str | Path | None = None) -> Config: + """Load configuration from YAML file and environment variables.""" + cfg = Config() + + # Load YAML config if it exists + if config_path is None: + # Look in the repo root + candidates = [ + Path("spectre_agents_config.yaml"), + Path(__file__).parent.parent / "spectre_agents_config.yaml", + ] + for candidate in candidates: + if candidate.exists(): + config_path = candidate + break + + if config_path and Path(config_path).exists(): + with open(config_path) as f: + raw = yaml.safe_load(f) or {} + + sim = raw.get("simulation", {}) + if "base_dir" in sim: + cfg.base_dir = Path(sim["base_dir"]) + if "sim_dir" in sim: + cfg.sim_dir = sim["sim_dir"] + if "run_dir_prefix" in sim: + cfg.run_dir_prefix = sim["run_dir_prefix"] + + grid = raw.get("grid", {}) + for attr in ("Nx", "Ny", "Nr", "nPx", "nPy"): + if attr in grid: + setattr(cfg.grid, attr, grid[attr]) + + discord = raw.get("discord", {}) + channels = discord.get("channels", {}) + for attr in ("status", "decisions", "alerts", "plots", "logs", 
"knowledge"): + if attr in channels: + setattr(cfg.discord_channels, attr, channels[attr]) + + if "agents" in raw: + _apply_agent_config(cfg.agents, raw["agents"]) + + monitoring = raw.get("monitoring", {}) + if "poll_interval_seconds" in monitoring: + cfg.poll_interval_seconds = monitoring["poll_interval_seconds"] + if "dashboard_port" in monitoring: + cfg.dashboard_port = monitoring["dashboard_port"] + + # Environment variables override YAML + cfg.anthropic_api_key = os.environ.get("ANTHROPIC_API_KEY", cfg.anthropic_api_key) + cfg.discord_bot_token = os.environ.get("DISCORD_BOT_TOKEN", cfg.discord_bot_token) + guild_id = os.environ.get("DISCORD_GUILD_ID", "") + if guild_id: + cfg.discord_guild_id = int(guild_id) + + if os.environ.get("SPECTRE_BASE_DIR"): + cfg.base_dir = Path(os.environ["SPECTRE_BASE_DIR"]) + + return cfg diff --git a/spectre_agents/context.py b/spectre_agents/context.py new file mode 100644 index 0000000..7075ef7 --- /dev/null +++ b/spectre_agents/context.py @@ -0,0 +1,105 @@ +"""Shared agent context: state, decision queue, and Discord bot reference. + +AgentContext is a singleton shared between the Discord bot and the agent runner. +It holds the current simulation state and a decision queue for interactive +approval flows. 
+""" + +from __future__ import annotations + +import asyncio +import json +import logging +from dataclasses import dataclass, field +from pathlib import Path +from typing import TYPE_CHECKING, Any, Optional + +from spectre_agents.types import SimulationState + +if TYPE_CHECKING: + import discord + +logger = logging.getLogger(__name__) + +STATE_FILE = ".spectre-agents-state.json" + + +@dataclass +class PendingDecision: + """A decision awaiting user input via Discord.""" + question: str + options: list[str] + future: asyncio.Future + channel_name: str = "decisions" + + +@dataclass +class AgentContext: + """Shared state between the Discord bot and agent runner.""" + + simulation: SimulationState = field(default_factory=SimulationState) + base_dir: Path = Path(".") + bot: Optional[Any] = None # discord.Client — typed as Any to avoid import at module level + _decision_queue: asyncio.Queue = field(default_factory=asyncio.Queue) + _channel_cache: dict[str, Any] = field(default_factory=dict) + + def save_state(self) -> None: + """Persist simulation state to disk for daemon restart resilience.""" + state_path = self.base_dir / STATE_FILE + data = { + "active_job_id": self.simulation.active_job_id, + "run_dir": self.simulation.run_dir, + "model_days": self.simulation.model_days, + "cfl_max": self.simulation.cfl_max, + "status": self.simulation.status, + } + try: + state_path.write_text(json.dumps(data, indent=2)) + except OSError as e: + logger.warning("Failed to save state: %s", e) + + def load_state(self) -> None: + """Restore simulation state from disk.""" + state_path = self.base_dir / STATE_FILE + if not state_path.exists(): + return + try: + data = json.loads(state_path.read_text()) + self.simulation.active_job_id = data.get("active_job_id") + self.simulation.run_dir = data.get("run_dir", "") + self.simulation.model_days = data.get("model_days", 0.0) + self.simulation.cfl_max = data.get("cfl_max", 0.0) + self.simulation.status = data.get("status", "idle") + except 
(json.JSONDecodeError, OSError) as e:
            logger.warning("Failed to load state: %s", e)

    async def get_channel(self, channel_name: str) -> Optional[Any]:
        """Look up a Discord channel by name in the configured guild."""
        if self.bot is None:
            return None
        if channel_name in self._channel_cache:
            return self._channel_cache[channel_name]
        for guild in self.bot.guilds:
            for channel in guild.text_channels:
                if channel.name == channel_name:
                    self._channel_cache[channel_name] = channel
                    return channel
        return None

    async def request_decision(
        self, question: str, options: list[str], channel_name: str = "decisions"
    ) -> str:
        """Post a decision request to Discord and block until user responds.

        Returns the selected option text.
        """
        # Inside a coroutine, get_running_loop() is the idiomatic accessor;
        # asyncio.get_event_loop() is deprecated in this context.
        loop = asyncio.get_running_loop()
        future = loop.create_future()
        decision = PendingDecision(
            question=question,
            options=options,
            future=future,
            channel_name=channel_name,
        )
        await self._decision_queue.put(decision)
        return await future
diff --git a/spectre_agents/discord_bot/__init__.py b/spectre_agents/discord_bot/__init__.py
new file mode 100644
index 0000000..31442bc
--- /dev/null
+++ b/spectre_agents/discord_bot/__init__.py
@@ -0,0 +1 @@
+"""SPECTRE Discord bot for simulation operations."""
diff --git a/spectre_agents/discord_bot/bot.py b/spectre_agents/discord_bot/bot.py
new file mode 100644
index 0000000..a8a3e64
--- /dev/null
+++ b/spectre_agents/discord_bot/bot.py
@@ -0,0 +1,135 @@
+"""Main Discord bot class and event loop integration.

The bot connects to Discord, registers slash commands, and processes
decision queue items from the agent system. It runs on the asyncio
event loop alongside the agent runner.
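
The decision hand-off between agent and bot is a plain asyncio future: the agent
side creates and awaits it, and the Discord button callback resolves it. A
self-contained sketch of that pattern (names illustrative, not the real classes):

```python
import asyncio

async def main() -> str:
    loop = asyncio.get_running_loop()
    fut = loop.create_future()                       # agent side: create the future
    loop.call_soon(fut.set_result, "Reduce deltaT")  # bot side: a button click resolves it
    return await fut                                 # agent blocks here until resolved

print(asyncio.run(main()))  # prints the selected option
```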
+""" + +from __future__ import annotations + +import asyncio +import logging +from typing import TYPE_CHECKING + +import discord +from discord import app_commands + +from spectre_agents.discord_bot.commands import setup_commands +from spectre_agents.discord_bot.embeds import decision_embed +from spectre_agents.discord_bot.knowledge import setup_knowledge_handler +from spectre_agents.discord_bot.views import DecisionView + +if TYPE_CHECKING: + from spectre_agents.config import Config + from spectre_agents.context import AgentContext + +logger = logging.getLogger(__name__) + + +class SpectreBot(discord.Client): + """Discord bot for SPECTRE simulation operations.""" + + def __init__(self, config: Config, ctx: AgentContext): + intents = discord.Intents.default() + intents.message_content = True + intents.guilds = True + + super().__init__(intents=intents) + self.config = config + self.ctx = ctx + self.tree = app_commands.CommandTree(self) + self._decision_task: asyncio.Task | None = None + + async def setup_hook(self) -> None: + """Called after login, before the bot is fully connected.""" + setup_commands(self.tree, self.ctx, self.config) + + # Sync commands to the guild + if self.config.discord_guild_id: + guild = discord.Object(id=self.config.discord_guild_id) + self.tree.copy_global_to(guild=guild) + await self.tree.sync(guild=guild) + logger.info("Synced commands to guild %s", self.config.discord_guild_id) + else: + await self.tree.sync() + logger.info("Synced commands globally") + + async def on_ready(self) -> None: + logger.info("Bot connected as %s (ID: %s)", self.user, self.user.id) + logger.info("Guilds: %s", [g.name for g in self.guilds]) + + # Store bot reference in context for tools to use + self.ctx.bot = self + + # Start the decision queue processor + self._decision_task = asyncio.create_task(self._process_decision_queue()) + + # Register the knowledge Q&A handler for #ask-mitgcm + setup_knowledge_handler(self, self.config, self.ctx) + 
logger.info("Knowledge bot listening in #%s", self.config.discord_channels.knowledge) + + # Post startup message + channel = await self.ctx.get_channel(self.config.discord_channels.status) + if channel: + await channel.send( + "**SPECTRE Agent System** online.\n" + "Use `/run start` to begin a simulation, `/run status` to check progress.\n" + f"Ask questions in #**{self.config.discord_channels.knowledge}**." + ) + + async def on_error(self, event_method: str, *args, **kwargs) -> None: + logger.exception("Discord error in %s", event_method) + + async def _process_decision_queue(self) -> None: + """Continuously process pending decisions from agents. + + When the orchestrator posts a decision to the queue, this task + picks it up, posts an interactive embed to Discord, and the + DecisionView callback resolves the future. + """ + logger.info("Decision queue processor started") + while True: + try: + decision = await self.ctx._decision_queue.get() + logger.info("Processing decision: %s", decision.question) + + channel = await self.ctx.get_channel(decision.channel_name) + if channel is None: + logger.warning("Channel %s not found for decision", decision.channel_name) + if not decision.future.done(): + decision.future.set_exception( + RuntimeError(f"Channel #{decision.channel_name} not found") + ) + continue + + embed = decision_embed(decision.question, decision.options) + view = DecisionView(decision.options, decision.future) + await channel.send(embed=embed, view=view) + + except asyncio.CancelledError: + break + except Exception: + logger.exception("Error processing decision") + + async def close(self) -> None: + if self._decision_task: + self._decision_task.cancel() + await super().close() + + +async def run_bot(config: Config, ctx: AgentContext) -> None: + """Start the Discord bot. 
This coroutine runs until the bot disconnects.""" + if not config.discord_bot_token: + logger.error("DISCORD_BOT_TOKEN not set — bot will not start") + return + + bot = SpectreBot(config, ctx) + try: + await bot.start(config.discord_bot_token) + except discord.LoginFailure: + logger.error("Invalid Discord bot token") + except Exception: + logger.exception("Bot crashed") + finally: + if not bot.is_closed(): + await bot.close() diff --git a/spectre_agents/discord_bot/commands.py b/spectre_agents/discord_bot/commands.py new file mode 100644 index 0000000..c2076e0 --- /dev/null +++ b/spectre_agents/discord_bot/commands.py @@ -0,0 +1,251 @@ +"""Discord slash command definitions for simulation operations. + +All commands are registered under a command tree and synced to the guild. +Agent invocations run in a thread pool to keep the bot responsive. +""" + +from __future__ import annotations + +import asyncio +import functools +import logging +from concurrent.futures import ThreadPoolExecutor +from typing import TYPE_CHECKING + +import discord +from discord import app_commands + +from spectre_agents.discord_bot.embeds import status_embed, validation_embed + +if TYPE_CHECKING: + from spectre_agents.context import AgentContext + from spectre_agents.config import Config + +logger = logging.getLogger(__name__) + +# Thread pool for running synchronous agent code +_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="agent") + + +def setup_commands(tree: app_commands.CommandTree, ctx: "AgentContext", config: "Config") -> None: + """Register all slash commands on the command tree.""" + + # --- /run group --- + run_group = app_commands.Group(name="run", description="Simulation run management") + + @run_group.command(name="start", description="Validate config and submit a new simulation run") + async def run_start(interaction: discord.Interaction): + await interaction.response.defer(thinking=True) + try: + result = await _run_agent(ctx, config, "orchestrator", ( + 
"Validate the namelists, then submit a new simulation run. " + "Post status updates to Discord as you go." + )) + await interaction.followup.send(f"**Run started**\n{result[:1900]}") + except Exception as e: + await interaction.followup.send(f"Failed to start run: {e}") + + @run_group.command(name="status", description="Show current simulation status") + async def run_status(interaction: discord.Interaction): + await interaction.response.defer(thinking=True) + sim = ctx.simulation + embed = status_embed( + job_id=sim.active_job_id, + state=sim.status.upper(), + model_days=sim.model_days, + cfl=sim.cfl_max, + ) + await interaction.followup.send(embed=embed) + + @run_group.command(name="stop", description="Cancel the active SLURM job") + async def run_stop(interaction: discord.Interaction): + await interaction.response.defer(thinking=True) + try: + result = await _run_agent(ctx, config, "workflow_runner", ( + f"Cancel SLURM job {ctx.simulation.active_job_id} and stop all monitoring processes." + )) + ctx.simulation.status = "stopped" + ctx.save_state() + await interaction.followup.send(f"**Run stopped**\n{result[:1900]}") + except Exception as e: + await interaction.followup.send(f"Failed to stop run: {e}") + + @run_group.command(name="resubmit", description="Clear run directory and resubmit from pickup") + async def run_resubmit(interaction: discord.Interaction): + await interaction.response.defer(thinking=True) + try: + result = await _run_agent(ctx, config, "orchestrator", ( + "Clear the run directory and resubmit the simulation from the latest pickup file. " + "Post status to Discord." 
+ )) + await interaction.followup.send(f"**Resubmitted**\n{result[:1900]}") + except Exception as e: + await interaction.followup.send(f"Failed to resubmit: {e}") + + tree.add_command(run_group) + + # --- /diagnose --- + @tree.command(name="diagnose", description="Run STDOUT diagnostics on a job") + @app_commands.describe(job_id="SLURM job ID (optional, defaults to active job)") + async def diagnose(interaction: discord.Interaction, job_id: int | None = None): + await interaction.response.defer(thinking=True) + jid = job_id or ctx.simulation.active_job_id + try: + result = await _run_agent(ctx, config, "stdout_diagnostics", ( + f"Diagnose the failure for SLURM job {jid}. " + f"The run directory is {ctx.simulation.run_dir}." + )) + await interaction.followup.send(f"**Diagnosis**\n```\n{result[:1900]}\n```") + except Exception as e: + await interaction.followup.send(f"Diagnosis failed: {e}") + + # --- /review --- + @tree.command(name="review", description="Assess physical plausibility of model output") + async def review(interaction: discord.Interaction): + await interaction.response.defer(thinking=True) + try: + result = await _run_agent(ctx, config, "model_output_review", ( + f"Review the model output in {ctx.simulation.run_dir}. " + "Assess SST, salinity, velocity, CFL, and trends." + )) + await interaction.followup.send(f"**Model Review**\n```\n{result[:1900]}\n```") + except Exception as e: + await interaction.followup.send(f"Review failed: {e}") + + # --- /validate --- + @tree.command(name="validate", description="Run pre-flight namelist validation") + async def validate(interaction: discord.Interaction): + await interaction.response.defer(thinking=True) + try: + result = await _run_agent(ctx, config, "namelist_validator", ( + f"Validate all namelists in {config.input_dir}." 
+ )) + await interaction.followup.send(f"**Validation**\n```\n{result[:1900]}\n```") + except Exception as e: + await interaction.followup.send(f"Validation failed: {e}") + + # --- /qc group --- + qc_group = app_commands.Group(name="qc", description="Forcing data quality control") + + @qc_group.command(name="forcing", description="Validate EXF forcing files") + async def qc_forcing(interaction: discord.Interaction): + await interaction.response.defer(thinking=True) + try: + result = await _run_agent(ctx, config, "forcing_data_qc", ( + f"Run QC on all EXF binary files in {config.input_dir}." + )) + await interaction.followup.send(f"**Forcing QC**\n```\n{result[:1900]}\n```") + except Exception as e: + await interaction.followup.send(f"QC failed: {e}") + + @qc_group.command(name="obc", description="Validate OBC boundary files") + async def qc_obc(interaction: discord.Interaction): + await interaction.response.defer(thinking=True) + try: + result = await _run_agent(ctx, config, "forcing_data_qc", ( + f"Run QC on all OBC binary files in {config.input_dir}." + )) + await interaction.followup.send(f"**OBC QC**\n```\n{result[:1900]}\n```") + except Exception as e: + await interaction.followup.send(f"QC failed: {e}") + + tree.add_command(qc_group) + + # --- /dashboard group --- + dash_group = app_commands.Group(name="dashboard", description="Monitoring dashboard management") + + @dash_group.command(name="start", description="Start the monitoring dashboard stack") + async def dash_start(interaction: discord.Interaction): + await interaction.response.defer(thinking=True) + try: + result = await _run_agent(ctx, config, "dashboard_manager", ( + f"Start the full dashboard stack for run directory {ctx.simulation.run_dir}." 
+ )) + await interaction.followup.send(f"**Dashboard**\n{result[:1900]}") + except Exception as e: + await interaction.followup.send(f"Dashboard start failed: {e}") + + @dash_group.command(name="status", description="Health-check the dashboard stack") + async def dash_status(interaction: discord.Interaction): + await interaction.response.defer(thinking=True) + try: + result = await _run_agent(ctx, config, "dashboard_manager", ( + "Run a health check on all dashboard components." + )) + await interaction.followup.send(f"**Dashboard Health**\n```\n{result[:1900]}\n```") + except Exception as e: + await interaction.followup.send(f"Health check failed: {e}") + + @dash_group.command(name="restart", description="Restart dashboard components") + @app_commands.describe(component="Component to restart: dashboard, converter, plotter, or all") + async def dash_restart(interaction: discord.Interaction, component: str = "all"): + await interaction.response.defer(thinking=True) + try: + result = await _run_agent(ctx, config, "dashboard_manager", ( + f"Restart the {component} component(s) for run directory {ctx.simulation.run_dir}." + )) + await interaction.followup.send(f"**Restart**\n{result[:1900]}") + except Exception as e: + await interaction.followup.send(f"Restart failed: {e}") + + tree.add_command(dash_group) + + # --- /ensemble group --- + ensemble_group = app_commands.Group(name="ensemble", description="Bred vector ensemble operations") + + @ensemble_group.command(name="start", description="Begin bred vector ensemble generation") + async def ensemble_start(interaction: discord.Interaction): + await interaction.response.defer(thinking=True) + try: + result = await _run_agent(ctx, config, "orchestrator", ( + "Begin the bred vector ensemble generation process. " + "Submit the breed_vectors.sh workflow and monitor progress." 
+            ))
+            await interaction.followup.send(f"**Ensemble**\n{result[:1900]}")
+        except Exception as e:
+            await interaction.followup.send(f"Ensemble start failed: {e}")
+
+    @ensemble_group.command(name="status", description="Show ensemble member progress")
+    async def ensemble_status(interaction: discord.Interaction):
+        await interaction.response.defer(thinking=True)
+        try:
+            result = await _run_agent(ctx, config, "orchestrator", (
+                "Check the status of all ensemble members and report convergence metrics."
+            ))
+            await interaction.followup.send(f"**Ensemble Status**\n{result[:1900]}")
+        except Exception as e:
+            await interaction.followup.send(f"Status check failed: {e}")
+
+    tree.add_command(ensemble_group)
+
+    # --- /config ---
+    @tree.command(name="config", description="Show simulation configuration")
+    @app_commands.describe(param="Specific parameter to show (optional)")
+    async def show_config(interaction: discord.Interaction, param: str | None = None):
+        await interaction.response.defer(thinking=True)
+        try:
+            result = await _run_agent(ctx, config, "workflow_runner", (
+                "Read and display the simulation configuration from etc/config.yaml"
+                + (f", focusing on the '{param}' parameter." if param else ".")
+            ))
+            await interaction.followup.send(f"**Config**\n```yaml\n{result[:1900]}\n```")
+        except Exception as e:
+            await interaction.followup.send(f"Config read failed: {e}")
+
+
+async def _run_agent(ctx: "AgentContext", config: "Config", agent_name: str, task: str) -> str:
+    """Run a specialist agent and return its text result."""
+    from spectre_agents.agents import AGENT_REGISTRY
+
+    agent_cls = AGENT_REGISTRY.get(agent_name)
+    if agent_cls is None:
+        # Fall back to the orchestrator for unknown agent names
+        from spectre_agents.agents.orchestrator import SimulationOrchestrator
+        agent = SimulationOrchestrator(config)
+    else:
+        agent = agent_cls(config)
+
+    # Agent.run() is a coroutine, so await it directly; any blocking SDK calls
+    # inside it are expected to dispatch to _executor themselves.
+    result = await agent.run(task)
+    return result
diff --git a/spectre_agents/discord_bot/embeds.py b/spectre_agents/discord_bot/embeds.py
new file mode 100644
index 0000000..7c140e9
--- /dev/null
+++ b/spectre_agents/discord_bot/embeds.py
@@ -0,0 +1,155 @@
+"""Rich embed formatters for Discord messages.
+
+Each function returns a discord.Embed with appropriate colors, fields,
+and formatting for different notification types.
+""" + +from __future__ import annotations + +from datetime import datetime, timezone + +import discord + + +# Color palette +COLOR_GREEN = 0x2ECC71 # Healthy / success +COLOR_YELLOW = 0xF1C40F # Warning +COLOR_RED = 0xE74C3C # Critical / failure +COLOR_BLUE = 0x3498DB # Info / status +COLOR_PURPLE = 0x9B59B6 # Decision request + + +def status_embed( + job_id: int | None, + state: str, + model_days: float, + cfl: float, + throughput: str = "", +) -> discord.Embed: + """Create a simulation status embed with color-coded health.""" + if state == "RUNNING": + color = COLOR_GREEN if cfl < 0.4 else COLOR_YELLOW + elif state in ("COMPLETED", "DONE"): + color = COLOR_GREEN + else: + color = COLOR_RED + + embed = discord.Embed( + title="Simulation Status", + description=f"`glorysv12-curvilinear`", + color=color, + timestamp=datetime.now(timezone.utc), + ) + if job_id: + embed.add_field(name="Job ID", value=str(job_id), inline=True) + embed.add_field(name="State", value=state, inline=True) + embed.add_field(name="Model Days", value=f"{model_days:.1f}", inline=True) + embed.add_field(name="CFL", value=f"{cfl:.3f}", inline=True) + if throughput: + embed.add_field(name="Throughput", value=throughput, inline=True) + return embed + + +def failure_embed( + failure_type: str, + root_cause: str, + evidence: str, + suggested_fix: str, + job_id: int | None = None, + model_days: float = 0, +) -> discord.Embed: + """Create a failure diagnosis embed (red sidebar).""" + embed = discord.Embed( + title=f"Simulation Failed — {failure_type}", + description=f"`glorysv12-curvilinear`", + color=COLOR_RED, + timestamp=datetime.now(timezone.utc), + ) + if job_id: + embed.add_field(name="Job ID", value=str(job_id), inline=True) + embed.add_field(name="Model Days", value=f"{model_days:.1f}", inline=True) + embed.add_field(name="Root Cause", value=root_cause[:1024], inline=False) + if evidence: + embed.add_field(name="Evidence", value=f"```\n{evidence[:1000]}\n```", inline=False) + 
embed.add_field(name="Suggested Fix", value=suggested_fix[:1024], inline=False) + return embed + + +def health_embed( + status: str, + model_days: float, + summary: str, + fields: dict[str, str], + recommendation: str = "", +) -> discord.Embed: + """Create a health assessment embed with per-field breakdown.""" + color_map = {"HEALTHY": COLOR_GREEN, "WARNING": COLOR_YELLOW, "CRITICAL": COLOR_RED} + color = color_map.get(status, COLOR_BLUE) + + embed = discord.Embed( + title=f"Model Health: {status}", + description=summary, + color=color, + timestamp=datetime.now(timezone.utc), + ) + embed.add_field(name="Model Days", value=f"{model_days:.1f}", inline=True) + + for name, assessment in fields.items(): + embed.add_field(name=name, value=assessment[:1024], inline=False) + + if recommendation: + embed.add_field(name="Recommendation", value=recommendation[:1024], inline=False) + + return embed + + +def validation_embed(checks: list[dict[str, str]]) -> discord.Embed: + """Create a namelist validation embed with PASS/FAIL per check.""" + pass_count = sum(1 for c in checks if c.get("result") == "PASS") + fail_count = sum(1 for c in checks if c.get("result") == "FAIL") + total = len(checks) + + color = COLOR_GREEN if fail_count == 0 else COLOR_RED + + embed = discord.Embed( + title=f"Namelist Validation: {pass_count}/{total} passed", + color=color, + timestamp=datetime.now(timezone.utc), + ) + + # Group checks into a compact format + lines = [] + for check in checks[:25]: # Discord embed field limit + icon = "\u2705" if check.get("result") == "PASS" else "\u274c" + lines.append(f"{icon} {check.get('name', 'check')}: {check.get('detail', '')}") + + embed.description = "\n".join(lines) if lines else "No checks performed" + + if fail_count > 0: + embed.set_footer(text=f"{fail_count} check(s) failed — review before submitting") + + return embed + + +def decision_embed(question: str, options: list[str]) -> discord.Embed: + """Create a decision request embed (purple sidebar).""" + 
embed = discord.Embed( + title="Decision Needed", + description=question, + color=COLOR_PURPLE, + timestamp=datetime.now(timezone.utc), + ) + option_text = "\n".join(f"**{i + 1}.** {opt}" for i, opt in enumerate(options)) + embed.add_field(name="Options", value=option_text, inline=False) + embed.set_footer(text="Select an option below. The orchestrator is waiting for your response.") + return embed + + +def milestone_embed(title: str, details: str) -> discord.Embed: + """Create a milestone achievement embed (green sidebar).""" + return discord.Embed( + title=f"Milestone: {title}", + description=details, + color=COLOR_GREEN, + timestamp=datetime.now(timezone.utc), + ) diff --git a/spectre_agents/discord_bot/knowledge.py b/spectre_agents/discord_bot/knowledge.py new file mode 100644 index 0000000..459fbf5 --- /dev/null +++ b/spectre_agents/discord_bot/knowledge.py @@ -0,0 +1,215 @@ +"""Knowledge bot: answers MITgcm, ERA5, oceanography, and codebase questions. + +Listens in #ask-mitgcm for messages, runs a Claude agent with the full +CLAUDE.md context + WebSearch/WebFetch, and replies in-channel or in a thread. +""" + +from __future__ import annotations + +import asyncio +import logging +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path +from typing import TYPE_CHECKING + +import discord + +if TYPE_CHECKING: + from spectre_agents.config import Config + from spectre_agents.context import AgentContext + +logger = logging.getLogger(__name__) + +_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="knowledge") + +# The knowledge agent's system prompt, combining CLAUDE.md domain knowledge +# with instructions for being a helpful Q&A assistant. +KNOWLEDGE_SYSTEM_PROMPT = """\ +You are the SPECTRE knowledge assistant — an expert on MITgcm ocean modeling, \ +ERA5/GLORYS reanalysis data, and the SPECTRE simulation system. You answer \ +questions from researchers and engineers working on North Atlantic ocean \ +simulations. 
+ +## Your expertise + +- **MITgcm**: namelist parameters, packages (EXF, OBCS, DIAGNOSTICS, KPP, MNC), \ + Fortran source code, numerical methods, grid configuration, debugging +- **ERA5 / Copernicus**: variable definitions, accumulation conventions, units, \ + CDS API, temporal/spatial resolution +- **GLORYS v12**: ocean reanalysis fields, CMEMS access, variable naming +- **Oceanography**: North Atlantic circulation, Gulf Stream dynamics, \ + air-sea fluxes, boundary conditions, ensemble methods +- **HPC / SLURM**: job scheduling, container workflows (enroot/pyxis), \ + parallel I/O, memory management +- **This codebase**: spectre_utils Python package, workflow scripts, \ + configuration files, bred vector ensembles + +## SPECTRE simulation context + +This project runs a realistic MITgcm simulation of the North Atlantic (26-54N): +- Grid: Native NEMO curvilinear, 768 x 424 x 50 levels, MPI 8x8 = 64 ranks +- Ocean data: GLORYS v12 daily fields (T, S, U, V, SSH) for IC and OBC +- Atmospheric forcing: ERA5 3-hourly single-level fields via EXF package +- Simulation period: 2002-07-01 to 2017-06-30 +- Key directory: simulations/glorysv12-curvilinear/ + +### Critical technical details + +- **EXF latitude orientation**: ERA5 stores latitude north-to-south. MITgcm EXF \ + expects south-to-north (lat0=20.0, lat_inc=+0.25). The mk_exf_conditions.py \ + script flips the axis. Getting this wrong causes ~20C air-sea temperature error. + +- **EXF range thresholds** (hardcoded in exf_check_range.F): \ + hflux: [-500, +1600] W/m2; ustress/vstress: +/-2.0 N/m2 + +- **Bulk formula**: ALLOW_BULK_LARGEYEAGER04 — Large & Yeager (2009) \ + stability-corrected with wind-speed-dependent drag coefficients. + +- **MNC tile numbering**: mnc_*_0001/ contains PID 0, which writes tile t004. + +- **ERA5 scale factors**: 3-hourly accumulations to W/m2 or m/s use \ + 1/10800 = 9.2593e-5 (not 1/3600). 
+
+- **EXF does not support negative lat_inc** — exf_interp.F assumes \
+  monotonically increasing latitude.
+
+- **OBC period = 86400.0s (daily), EXF period = 10800.0s (3-hourly)**
+
+- **MNC memory leak**: diag_mnc=.FALSE. with a post-processor converter \
+  is the workaround for long runs.
+
+## How to respond
+
+- Be direct and technical. Lead with the answer, then explain.
+- Include MITgcm parameter names, file paths, and Fortran source references.
+- When uncertain, say so and suggest where to look (readthedocs, source code).
+- For questions about this specific simulation, reference the config and namelists.
+- Use code blocks for parameter examples, file snippets, and commands.
+- If a question requires web lookup (latest docs, specific source code), \
+  use WebSearch/WebFetch to find the answer.
+"""
+
+
+async def _run_knowledge_query(config: Config, question: str, context_hint: str = "") -> str:
+    """Run the knowledge agent and return its text response."""
+    from claude_agent_sdk import query, ClaudeAgentOptions, ResultMessage, AssistantMessage, TextBlock
+
+    prompt = question
+    if context_hint:
+        prompt = f"{context_hint}\n\nQuestion: {question}"
+
+    result_text = ""
+    try:
+        async for message in query(
+            prompt=prompt,
+            options=ClaudeAgentOptions(
+                cwd=str(config.simulation_dir),
+                allowed_tools=["Read", "Glob", "Grep", "WebSearch", "WebFetch"],
+                system_prompt=KNOWLEDGE_SYSTEM_PROMPT,
+                model=config.agents.web_research.model,  # Sonnet for Q&A
+                permission_mode="default",
+                max_turns=10,
+            ),
+        ):
+            if isinstance(message, ResultMessage):
+                result_text = message.result or ""
+            elif isinstance(message, AssistantMessage):
+                for block in message.content:
+                    if isinstance(block, TextBlock):
+                        result_text = block.text
+    except Exception as e:
+        logger.exception("Knowledge agent failed")
+        result_text = f"Sorry, I encountered an error: {e}"
+
+    return result_text
+
+
+def setup_knowledge_handler(bot: discord.Client, config: Config, ctx: AgentContext) -> None:
+    """Register the on_message handler for #ask-mitgcm Q&A."""
+
+    channel_name = config.discord_channels.knowledge
+
+    @bot.event
+    async def on_message(message: discord.Message) -> None:
+        # Ignore own messages
+        if message.author == bot.user:
+            return
+
+        # Ignore DMs
+        if not message.guild:
+            return
+
+        # Only respond in the knowledge channel, or in threads under it
+        # (a thread's .name is its title, so compare the parent channel's name)
+        channel = message.channel
+        if isinstance(channel, discord.Thread):
+            if channel.parent is None or channel.parent.name != channel_name:
+                return
+        elif channel.name != channel_name:
+            return
+
+        # Ignore messages that are just bot mentions with no content
+        content = message.content.strip()
+        if not content:
+            return
+
+        # Strip bot mention if present
+        if bot.user and bot.user.mentioned_in(message):
+            content = content.replace(f"<@{bot.user.id}>", "").replace(f"<@!{bot.user.id}>", "").strip()
+
+        if not content:
+            return
+
+        logger.info("Knowledge query from %s: %s", message.author, content[:100])
+
+        # Show typing indicator while processing
+        async with message.channel.typing():
+            # Build context from recent thread/conversation
+            context_hint = ""
+            if isinstance(message.channel, discord.Thread):
+                context_hint = f"(This question is in a thread titled: {message.channel.name})"
+
+            result = await _run_knowledge_query(config, content, context_hint)
+
+            if not result:
+                result = "I wasn't able to find an answer. Could you rephrase or provide more context?"
+
+            # Discord's 2000-char limit: reply in place for short answers,
+            # otherwise send in chunks (creating a thread if not already in one)
+            if len(result) <= 2000:
+                await message.reply(result, mention_author=False)
+            else:
+                if not isinstance(message.channel, discord.Thread):
+                    thread = await message.create_thread(
+                        name=(content[:90] + "...") if len(content) > 90 else content,
+                        auto_archive_duration=60,
+                    )
+                    target = thread
+                else:
+                    target = message.channel
+
+                # Send in chunks
+                chunks = _split_message(result)
+                for chunk in chunks:
+                    await target.send(chunk)
+
+
+def _split_message(text: str, limit: int = 1900) -> list[str]:
+    """Split a long message into chunks, preferring line boundaries."""
+    if len(text) <= limit:
+        return [text]
+
+    chunks = []
+    while text:
+        if len(text) <= limit:
+            chunks.append(text)
+            break
+
+        # Try to split at a newline
+        split_at = text.rfind("\n", 0, limit)
+        if split_at == -1 or split_at < limit // 2:
+            # Try space
+            split_at = text.rfind(" ", 0, limit)
+            if split_at == -1:
+                split_at = limit
+
+        chunks.append(text[:split_at])
+        text = text[split_at:].lstrip("\n")
+
+    return chunks
diff --git a/spectre_agents/discord_bot/views.py b/spectre_agents/discord_bot/views.py
new file mode 100644
index 0000000..517d080
--- /dev/null
+++ b/spectre_agents/discord_bot/views.py
@@ -0,0 +1,85 @@
+"""Interactive Discord views (buttons) for decision approval flows.
+
+DecisionView presents numbered option buttons. When the user clicks one,
+the corresponding asyncio.Future is resolved, unblocking the orchestrator.
+""" + +from __future__ import annotations + +import asyncio +from typing import Optional + +import discord + + +class DecisionButton(discord.ui.Button): + """A button representing one decision option.""" + + def __init__(self, label: str, index: int, future: asyncio.Future): + style = discord.ButtonStyle.primary if index == 0 else discord.ButtonStyle.secondary + super().__init__(label=label, style=style, custom_id=f"decision_{index}") + self.option_label = label + self.future = future + + async def callback(self, interaction: discord.Interaction) -> None: + if self.future.done(): + await interaction.response.send_message( + "This decision has already been made.", ephemeral=True + ) + return + + self.future.set_result(self.option_label) + await interaction.response.send_message( + f"Selected: **{self.option_label}**\nResuming orchestrator...", + ) + # Disable all buttons in the view + if self.view: + for item in self.view.children: + item.disabled = True + await interaction.message.edit(view=self.view) + + +class DecisionView(discord.ui.View): + """Interactive view with numbered option buttons for agent decisions.""" + + def __init__(self, options: list[str], future: asyncio.Future, timeout: float = 3600): + super().__init__(timeout=timeout) + self.future = future + for i, option in enumerate(options[:5]): # Discord max 5 buttons per row + self.add_item(DecisionButton(option, i, future)) + + async def on_timeout(self) -> None: + if not self.future.done(): + self.future.set_exception(asyncio.TimeoutError("Decision timed out (1 hour)")) + + +class ConfirmView(discord.ui.View): + """Simple Yes/No confirmation view for destructive actions.""" + + def __init__(self, future: asyncio.Future, timeout: float = 300): + super().__init__(timeout=timeout) + self.future = future + + @discord.ui.button(label="Yes, proceed", style=discord.ButtonStyle.danger) + async def confirm(self, interaction: discord.Interaction, button: discord.ui.Button) -> None: + if not self.future.done(): 
+ self.future.set_result(True) + await interaction.response.send_message("Confirmed. Proceeding...") + self._disable_all() + await interaction.message.edit(view=self) + + @discord.ui.button(label="Cancel", style=discord.ButtonStyle.secondary) + async def cancel(self, interaction: discord.Interaction, button: discord.ui.Button) -> None: + if not self.future.done(): + self.future.set_result(False) + await interaction.response.send_message("Cancelled.") + self._disable_all() + await interaction.message.edit(view=self) + + def _disable_all(self) -> None: + for item in self.children: + item.disabled = True + + async def on_timeout(self) -> None: + if not self.future.done(): + self.future.set_result(False) diff --git a/spectre_agents/tools/__init__.py b/spectre_agents/tools/__init__.py new file mode 100644 index 0000000..1e1d46c --- /dev/null +++ b/spectre_agents/tools/__init__.py @@ -0,0 +1,5 @@ +"""Tool definitions for the SPECTRE agent system. + +Each module defines tools as functions compatible with the Claude Agent SDK's +MCP server interface via @tool decorator from claude_agent_sdk. 
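+
+A minimal sketch of the pattern (hypothetical `echo` tool; the
+`@tool(name, description, schema)` decorator signature is assumed from the
+modules in this package):
+
+    @tool("echo", "Echo the input back.", {"text": str})
+    async def echo(args: dict) -> dict:
+        # Tools receive a plain args dict and return MCP-style content blocks
+        return {"content": [{"type": "text", "text": args["text"]}]}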
+""" diff --git a/spectre_agents/tools/bash.py b/spectre_agents/tools/bash.py new file mode 100644 index 0000000..b838b1e --- /dev/null +++ b/spectre_agents/tools/bash.py @@ -0,0 +1,67 @@ +"""Safe subprocess execution tool with denylist and timeout.""" + +from __future__ import annotations + +import re +import subprocess + +from claude_agent_sdk import tool + +# Patterns that are never allowed regardless of context +DENY_PATTERNS: list[re.Pattern] = [ + re.compile(r"\brm\s+-rf\s+/\s*$"), # rm -rf / + re.compile(r"\bmkfs\b"), + re.compile(r"\bdd\s+.*of=/dev/"), + re.compile(r"\b:(){ :\|:& };:"), # fork bomb + re.compile(r"\bshutdown\b"), + re.compile(r"\breboot\b"), + re.compile(r"\binit\s+0\b"), +] + + +def _check_denylist(command: str) -> str | None: + """Return an error message if the command matches a denied pattern.""" + for pattern in DENY_PATTERNS: + if pattern.search(command): + return f"Command denied: matches safety pattern {pattern.pattern!r}" + return None + + +@tool( + "run_command", + "Execute a shell command with safety checks and timeout. 
" + "Returns stdout, stderr, and return code.", + { + "command": str, + "cwd": str, + "timeout": int, + }, +) +async def run_command(args: dict) -> dict: + command: str = args["command"] + cwd: str = args.get("cwd", ".") + timeout: int = args.get("timeout", 120) + + denial = _check_denylist(command) + if denial: + return {"content": [{"type": "text", "text": denial}]} + + try: + result = subprocess.run( + command, + shell=True, + cwd=cwd, + capture_output=True, + text=True, + timeout=timeout, + ) + output = { + "stdout": result.stdout[-10000:] if len(result.stdout) > 10000 else result.stdout, + "stderr": result.stderr[-5000:] if len(result.stderr) > 5000 else result.stderr, + "returncode": result.returncode, + } + return {"content": [{"type": "text", "text": str(output)}]} + except subprocess.TimeoutExpired: + return {"content": [{"type": "text", "text": f"Command timed out after {timeout}s"}]} + except OSError as e: + return {"content": [{"type": "text", "text": f"OS error: {e}"}]} diff --git a/spectre_agents/tools/dashboard.py b/spectre_agents/tools/dashboard.py new file mode 100644 index 0000000..6986aa0 --- /dev/null +++ b/spectre_agents/tools/dashboard.py @@ -0,0 +1,131 @@ +"""Dashboard management tools: start/stop/health-check the monitoring stack.""" + +from __future__ import annotations + +import subprocess + +from claude_agent_sdk import tool + + +def _run(cmd: str, timeout: int = 10) -> str: + """Run a shell command and return stdout.""" + try: + result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=timeout) + return result.stdout.strip() + except subprocess.TimeoutExpired: + return "(timed out)" + except OSError as e: + return f"(error: {e})" + + +@tool( + "dashboard_health_check", + "Check the health of all three monitoring processes (dashboard, converter, plotter) " + "and the Tailscale proxy.", + {}, +) +async def dashboard_health_check(args: dict) -> dict: + checks = [] + + # Dashboard process + port_check = _run("ss -tlnp 
| grep :8050") + checks.append(f"Dashboard port 8050: {'LISTENING' if ':8050' in port_check else 'NOT LISTENING'}") + + # Dashboard serving data + data_check = _run("curl -s --max-time 5 http://127.0.0.1:8050/data | head -c 100") + checks.append(f"Dashboard data: {data_check[:80] if data_check else 'NO RESPONSE'}") + + # Tailscale + ts_check = _run("sudo tailscale serve status 2>&1") + checks.append(f"Tailscale: {ts_check[:80] if ts_check else 'NOT CONFIGURED'}") + + # Converter + conv_check = _run("ps aux | grep convert_diagnostics | grep -v grep") + checks.append(f"Converter: {'RUNNING' if conv_check else 'NOT RUNNING'}") + + # Plotter + plot_check = _run("ps aux | grep plot_surface_fields | grep -v grep") + checks.append(f"Plotter: {'RUNNING' if plot_check else 'NOT RUNNING'}") + + return {"content": [{"type": "text", "text": "\n".join(checks)}]} + + +@tool( + "start_dashboard", + "Start the monitoring dashboard with Tailscale proxy.", + {"stdout_path": str, "base_dir": str, "port": int}, +) +async def start_dashboard(args: dict) -> dict: + stdout_path: str = args["stdout_path"] + base_dir: str = args.get("base_dir", "/mnt/beegfs/spectre-150-ensembles") + port: int = args.get("port", 8050) + + commands = [ + f"sudo tailscale serve --http={port} off 2>/dev/null", + f"kill $(lsof -ti :{port}) 2>/dev/null", + "sleep 1", + f"cd {base_dir} && nohup uv run python spectre_utils/monitor_dashboard.py " + f"{stdout_path} --port {port} --poll 30 /tmp/dashboard.log 2>&1 &", + "sleep 3", + f"sudo tailscale serve --bg --http={port} 127.0.0.1:{port}", + ] + cmd = " && ".join(commands) + output = _run(cmd, timeout=30) + return {"content": [{"type": "text", "text": f"Dashboard start sequence completed.\n{output}"}]} + + +@tool( + "start_converter", + "Start the binary-to-NetCDF converter process.", + {"run_dir": str, "base_dir": str}, +) +async def start_converter(args: dict) -> dict: + run_dir: str = args["run_dir"] + base_dir: str = args.get("base_dir", 
"/mnt/beegfs/spectre-150-ensembles") + cmd = ( + f"cd {base_dir} && nohup uv run python spectre_utils/convert_diagnostics_to_netcdf.py " + f"{run_dir} --poll 60 /tmp/converter.log 2>&1 &" + ) + _run(cmd, timeout=10) + return {"content": [{"type": "text", "text": "Converter started."}]} + + +@tool( + "start_plotter", + "Start the surface field plotter process.", + {"run_dir": str, "base_dir": str}, +) +async def start_plotter(args: dict) -> dict: + run_dir: str = args["run_dir"] + base_dir: str = args.get("base_dir", "/mnt/beegfs/spectre-150-ensembles") + cmd = ( + f"cd {base_dir} && nohup uv run python spectre_utils/plot_surface_fields.py " + f"{run_dir} --poll 120 /tmp/plotter.log 2>&1 &" + ) + _run(cmd, timeout=10) + return {"content": [{"type": "text", "text": "Plotter started."}]} + + +@tool( + "stop_process", + "Stop a monitoring process by name (dashboard, converter, or plotter).", + {"process_name": str}, +) +async def stop_process(args: dict) -> dict: + name: str = args["process_name"] + grep_patterns = { + "dashboard": "monitor_dashboard", + "converter": "convert_diagnostics", + "plotter": "plot_surface_fields", + } + pattern = grep_patterns.get(name) + if not pattern: + return {"content": [{"type": "text", "text": f"Unknown process: {name}. Use: dashboard, converter, plotter"}]} + + cmd = f"pkill -f '{pattern}' 2>/dev/null" + _run(cmd, timeout=5) + + if name == "dashboard": + _run("sudo tailscale serve --http=8050 off 2>/dev/null", timeout=5) + + return {"content": [{"type": "text", "text": f"Stopped {name}."}]} diff --git a/spectre_agents/tools/discord_notify.py b/spectre_agents/tools/discord_notify.py new file mode 100644 index 0000000..15d4405 --- /dev/null +++ b/spectre_agents/tools/discord_notify.py @@ -0,0 +1,104 @@ +"""Discord notification tools for agent-to-user communication. + +These tools reference the shared AgentContext to post messages, images, +and interactive decision requests to Discord channels. 
+""" + +from __future__ import annotations + +import asyncio +import logging +from typing import TYPE_CHECKING + +from claude_agent_sdk import tool + +if TYPE_CHECKING: + pass + +logger = logging.getLogger(__name__) + +# Module-level reference set by the bot at startup +_context = None + + +def set_agent_context(ctx) -> None: + """Set the shared AgentContext for Discord tools to use.""" + global _context + _context = ctx + + +@tool( + "send_discord_message", + "Send a message to a Discord channel. Use channel names: " + "simulation-status, decisions, alerts, plots, logs.", + {"channel_name": str, "content": str}, +) +async def send_discord_message(args: dict) -> dict: + channel_name: str = args["channel_name"] + content: str = args["content"] + + if _context is None or _context.bot is None: + return {"content": [{"type": "text", "text": "Discord bot not connected"}]} + + try: + channel = await _context.get_channel(channel_name) + if channel is None: + return {"content": [{"type": "text", "text": f"Channel #{channel_name} not found"}]} + + # Discord message limit is 2000 chars + if len(content) > 1900: + content = content[:1900] + "\n... 
(truncated)" + + await channel.send(content) + return {"content": [{"type": "text", "text": f"Message sent to #{channel_name}"}]} + except Exception as e: + return {"content": [{"type": "text", "text": f"Failed to send message: {e}"}]} + + +@tool( + "send_discord_image", + "Upload an image file to a Discord channel with an optional caption.", + {"channel_name": str, "image_path": str, "caption": str}, +) +async def send_discord_image(args: dict) -> dict: + import discord + + channel_name: str = args["channel_name"] + image_path: str = args["image_path"] + caption: str = args.get("caption", "") + + if _context is None or _context.bot is None: + return {"content": [{"type": "text", "text": "Discord bot not connected"}]} + + try: + channel = await _context.get_channel(channel_name) + if channel is None: + return {"content": [{"type": "text", "text": f"Channel #{channel_name} not found"}]} + + file = discord.File(image_path) + await channel.send(content=caption or None, file=file) + return {"content": [{"type": "text", "text": f"Image sent to #{channel_name}"}]} + except Exception as e: + return {"content": [{"type": "text", "text": f"Failed to send image: {e}"}]} + + +@tool( + "request_user_decision", + "Post an interactive decision request to Discord with numbered options. " + "Blocks until the user selects an option. 
Returns the selected option text.",
+    {"question": str, "options": list},
+)
+async def request_user_decision(args: dict) -> dict:
+    question: str = args["question"]
+    options: list = args["options"]
+
+    if _context is None or _context.bot is None:
+        return {"content": [{"type": "text", "text": "Discord bot not connected — cannot request decision"}]}
+
+    try:
+        selected = await _context.request_decision(question, options, "decisions")
+        return {"content": [{"type": "text", "text": f"User selected: {selected}"}]}
+    except asyncio.TimeoutError:
+        return {"content": [{"type": "text", "text": "Decision request timed out — no user response"}]}
+    except Exception as e:
+        return {"content": [{"type": "text", "text": f"Decision request failed: {e}"}]}
diff --git a/spectre_agents/tools/file_io.py b/spectre_agents/tools/file_io.py
new file mode 100644
index 0000000..8818082
--- /dev/null
+++ b/spectre_agents/tools/file_io.py
@@ -0,0 +1,123 @@
+"""File I/O tools: read, write, edit, glob, grep."""
+
+from __future__ import annotations
+
+import re
+from pathlib import Path
+
+from claude_agent_sdk import tool
+
+
+@tool(
+    "read_file",
+    "Read a file from disk. 
Returns numbered lines.", + {"path": str, "offset": int, "limit": int}, +) +async def read_file(args: dict) -> dict: + path: str = args["path"] + offset: int = args.get("offset", 0) + limit: int = args.get("limit", 2000) + try: + with open(path) as f: + lines = f.readlines() + selected = lines[offset : offset + limit] + numbered = "".join(f"{i + offset + 1}\t{line}" for i, line in enumerate(selected)) + return {"content": [{"type": "text", "text": numbered or "(empty file)"}]} + except OSError as e: + return {"content": [{"type": "text", "text": f"Error reading {path}: {e}"}]} + + +@tool( + "write_file", + "Write content to a file, creating it if it doesn't exist.", + {"path": str, "content": str}, +) +async def write_file(args: dict) -> dict: + path: str = args["path"] + content: str = args["content"] + try: + Path(path).parent.mkdir(parents=True, exist_ok=True) + with open(path, "w") as f: + f.write(content) + return {"content": [{"type": "text", "text": f"Wrote {len(content)} bytes to {path}"}]} + except OSError as e: + return {"content": [{"type": "text", "text": f"Error writing {path}: {e}"}]} + + +@tool( + "edit_file", + "Replace an exact string in a file. 
old_string must appear exactly once.", + {"path": str, "old_string": str, "new_string": str}, +) +async def edit_file(args: dict) -> dict: + path: str = args["path"] + old_string: str = args["old_string"] + new_string: str = args["new_string"] + try: + text = Path(path).read_text() + count = text.count(old_string) + if count == 0: + return {"content": [{"type": "text", "text": f"old_string not found in {path}"}]} + if count > 1: + return {"content": [{"type": "text", "text": f"old_string found {count} times — must be unique"}]} + new_text = text.replace(old_string, new_string, 1) + Path(path).write_text(new_text) + return {"content": [{"type": "text", "text": f"Edited {path} successfully"}]} + except OSError as e: + return {"content": [{"type": "text", "text": f"Error editing {path}: {e}"}]} + + +@tool( + "glob_files", + "Find files matching a glob pattern. Returns sorted list of paths.", + {"pattern": str, "path": str}, +) +async def glob_files(args: dict) -> dict: + pattern: str = args["pattern"] + path: str = args.get("path", ".") + try: + base = Path(path) + matches = sorted(str(p) for p in base.glob(pattern)) + result = "\n".join(matches[:500]) if matches else "(no matches)" + return {"content": [{"type": "text", "text": result}]} + except OSError as e: + return {"content": [{"type": "text", "text": f"Error: {e}"}]} + + +@tool( + "grep_files", + "Search file contents for a regex pattern. 
Returns matching lines with file and line number.",
+    {"pattern": str, "path": str, "glob_filter": str},
+)
+async def grep_files(args: dict) -> dict:
+    pattern_str: str = args["pattern"]
+    path: str = args.get("path", ".")
+    glob_filter: str = args.get("glob_filter", "*")
+    try:
+        regex = re.compile(pattern_str)
+    except re.error as e:
+        return {"content": [{"type": "text", "text": f"Invalid regex: {e}"}]}
+
+    results = []
+    base = Path(path)
+    # Recurse by default; patterns that already contain "**" or a path
+    # separator are passed to glob() unchanged.
+    if "**" in glob_filter or "/" in glob_filter:
+        files = base.glob(glob_filter)
+    else:
+        files = base.rglob(glob_filter)
+
+    for filepath in files:
+        if not filepath.is_file():
+            continue
+        try:
+            with open(filepath) as f:
+                for i, line in enumerate(f, 1):
+                    if regex.search(line):
+                        results.append(f"{filepath}:{i}: {line.rstrip()}")
+                    if len(results) >= 200:
+                        break
+        except (OSError, UnicodeDecodeError):
+            continue
+        if len(results) >= 200:
+            break
+
+    text = "\n".join(results) if results else "(no matches)"
+    return {"content": [{"type": "text", "text": text}]}
diff --git a/spectre_agents/tools/forcing.py b/spectre_agents/tools/forcing.py
new file mode 100644
index 0000000..8e901a6
--- /dev/null
+++ b/spectre_agents/tools/forcing.py
@@ -0,0 +1,129 @@
+"""Forcing data validation tools for EXF and OBC binary files."""
+
+from __future__ import annotations
+
+import os
+
+import numpy as np
+from claude_agent_sdk import tool
+
+# Physical range expectations for EXF variables
+EXF_RANGES: dict[str, tuple[float, float]] = {
+    "atemp": (240.0, 320.0),
+    "aqh": (0.0, 0.025),
+    "uwind": (-50.0, 50.0),
+    "vwind": (-50.0, 50.0),
+    "swdown": (0.0, 1200.0),
+    "lwdown": (100.0, 500.0),
+    "precip": (0.0, 1e-3),
+    "evap": (-1e-3, 1e-4),
+}
+
+
+@tool(
+    "validate_exf_binary",
+    "Validate an EXF binary forcing file. 
Checks physical ranges, NaN/Inf, and grid orientation.", + {"path": str, "var_name": str, "nx": int, "ny": int}, +) +async def validate_exf_binary(args: dict) -> dict: + path: str = args["path"] + var_name: str = args["var_name"] + nx: int = args.get("nx", 768) + ny: int = args.get("ny", 424) + + if not os.path.exists(path): + return {"content": [{"type": "text", "text": f"File not found: {path}"}]} + + try: + record_size = ny * nx + arr = np.fromfile(path, dtype=">f4", count=record_size).reshape(ny, nx) + + result_lines = [f"File: {path}", f"Variable: {var_name}", f"Shape: ({ny}, {nx})"] + + # Basic stats + has_nan = bool(np.isnan(arr).any()) + has_inf = bool(np.isinf(arr).any()) + result_lines.append(f"Min: {arr.min():.6g}") + result_lines.append(f"Max: {arr.max():.6g}") + result_lines.append(f"Mean: {arr.mean():.6g}") + result_lines.append(f"NaN: {has_nan}") + result_lines.append(f"Inf: {has_inf}") + + # Range check + passed = True + if var_name in EXF_RANGES: + lo, hi = EXF_RANGES[var_name] + valid = np.isfinite(arr) + if valid.any(): + out_of_range = (arr[valid] < lo) | (arr[valid] > hi) + pct = 100.0 * out_of_range.sum() / valid.sum() + result_lines.append(f"Expected range: [{lo}, {hi}]") + result_lines.append(f"Out of range: {pct:.2f}%") + if pct > 5 or has_nan or has_inf: + passed = False + + # Grid orientation check (j=0 should be south/warm for atemp) + if var_name == "atemp" and not has_nan: + south_mean = float(arr[0, :].mean()) + north_mean = float(arr[-1, :].mean()) + result_lines.append(f"j=0 (south) mean: {south_mean:.1f} K") + result_lines.append(f"j={ny - 1} (north) mean: {north_mean:.1f} K") + if south_mean < north_mean: + result_lines.append("WARNING: South is cooler than north — possible orientation error") + passed = False + + result_lines.append(f"Result: {'PASS' if passed else 'FAIL'}") + return {"content": [{"type": "text", "text": "\n".join(result_lines)}]} + except Exception as e: + return {"content": [{"type": "text", "text": f"Error 
validating {path}: {e}"}]} + + +@tool( + "validate_obc_binary", + "Validate an OBC binary file. Checks record count and dimensions.", + {"path": str, "boundary": str, "var": str, "nr": int, "n_boundary": int, "expected_records": int}, +) +async def validate_obc_binary(args: dict) -> dict: + path: str = args["path"] + boundary: str = args["boundary"] + var: str = args["var"] + nr: int = args.get("nr", 50) + n_boundary: int = args["n_boundary"] + expected_records: int = args.get("expected_records", 5479) + + if not os.path.exists(path): + return {"content": [{"type": "text", "text": f"File not found: {path}"}]} + + try: + file_size = os.path.getsize(path) + is_2d = var.lower() in ("eta", "ssh", "etan") + if is_2d: + record_bytes = n_boundary * 4 + else: + record_bytes = nr * n_boundary * 4 + + actual_records = file_size / record_bytes if record_bytes > 0 else 0 + + result_lines = [ + f"File: {path}", + f"Boundary: {boundary}, Variable: {var}", + f"File size: {file_size} bytes", + f"Record size: {record_bytes} bytes", + f"Actual records: {actual_records:.1f}", + f"Expected records: {expected_records}", + ] + + passed = abs(actual_records - expected_records) < 1 + if not passed: + result_lines.append(f"MISMATCH: expected {expected_records}, got {actual_records:.1f}") + + # Sample first record for range check + arr = np.fromfile(path, dtype=">f4", count=record_bytes // 4) + result_lines.append(f"First record min: {arr.min():.6g}") + result_lines.append(f"First record max: {arr.max():.6g}") + result_lines.append(f"NaN: {bool(np.isnan(arr).any())}") + + result_lines.append(f"Result: {'PASS' if passed else 'FAIL'}") + return {"content": [{"type": "text", "text": "\n".join(result_lines)}]} + except Exception as e: + return {"content": [{"type": "text", "text": f"Error validating {path}: {e}"}]} diff --git a/spectre_agents/tools/mitgcm.py b/spectre_agents/tools/mitgcm.py new file mode 100644 index 0000000..3a7dd76 --- /dev/null +++ b/spectre_agents/tools/mitgcm.py @@ -0,0 
+1,115 @@
+"""MITgcm-specific tools: STDOUT parsing, monitor stats, CFL extraction."""
+
+from __future__ import annotations
+
+import re
+import shlex
+import subprocess
+
+from claude_agent_sdk import tool
+
+_TIME_RE = re.compile(r"%MON\s+time_secondsf\s+=\s+([\d.E+\-]+)")
+
+
+@tool(
+    "parse_monitor_stats",
+    "Extract the last N monitor blocks from MITgcm STDOUT. "
+    "Returns parsed %MON fields with timestep and values.",
+    {"stdout_path": str, "last_n": int},
+)
+async def parse_monitor_stats(args: dict) -> dict:
+    stdout_path: str = args["stdout_path"]
+    last_n: int = args.get("last_n", 10)
+
+    try:
+        with open(stdout_path) as f:
+            lines = f.readlines()
+    except OSError as e:
+        return {"content": [{"type": "text", "text": f"Error reading {stdout_path}: {e}"}]}
+
+    mon_lines = [line for line in lines if "%MON" in line]
+    if not mon_lines:
+        return {"content": [{"type": "text", "text": "No %MON lines found in STDOUT"}]}
+
+    # Each monitor block opens with a time_secondsf line; collect the times
+    timesteps = []
+    for line in mon_lines:
+        m = _TIME_RE.search(line)
+        if m:
+            timesteps.append(float(m.group(1)))
+
+    unique_ts = sorted(set(timesteps))[-last_n:]
+    if not unique_ts:
+        # No parseable timesteps: just return the last 50 %MON lines
+        result = "".join(mon_lines[-50:])
+        return {"content": [{"type": "text", "text": result}]}
+
+    # Keep lines belonging to the last N blocks by tracking the current
+    # block's time while scanning (most %MON lines carry no time themselves)
+    min_ts = unique_ts[0]
+    current_ts = float("-inf")
+    relevant = []
+    for line in mon_lines:
+        m = _TIME_RE.search(line)
+        if m:
+            current_ts = float(m.group(1))
+        if current_ts >= min_ts:
+            relevant.append(line)
+    result = "".join(relevant[-200:])  # Cap output
+    return {"content": [{"type": "text", "text": result}]}
+
+
+@tool(
+    "get_cfl_values",
+    "Extract the latest CFL values from MITgcm STDOUT.",
+    {"stdout_path": str},
+)
+async def get_cfl_values(args: dict) -> dict:
+    stdout_path: str = args["stdout_path"]
+    cmd = f"grep '%MON advcfl' {shlex.quote(stdout_path)} | tail -7"
+    try:
+        result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=10)
+        return {"content": [{"type": "text", "text": result.stdout.strip() or "No CFL data found"}]}
+    except (subprocess.TimeoutExpired, OSError) as e:
+        return {"content": [{"type": "text", "text": f"Error: {e}"}]}
+
+
+@tool(
+    "get_model_days",
+    "Calculate model days reached from MITgcm STDOUT.",
+    {"stdout_path": str},
+)
+async def get_model_days(args: dict) -> dict:
+    stdout_path: str = args["stdout_path"]
+    cmd = f"grep '%MON time_secondsf' {shlex.quote(stdout_path)} | tail -1"
+    try:
+        result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=10)
+        line = result.stdout.strip()
+        if not line:
+            return {"content": [{"type": "text", "text": "No time data found"}]}
+
+        m = re.search(r"=\s+([\d.E+\-]+)", line)
+        if m:
+            seconds = float(m.group(1))
+            days = seconds / 86400.0
+            return {"content": [{"type": "text", "text": f'{{"seconds": {seconds}, "days": {days:.2f}}}'}]}
+        return {"content": [{"type": "text", "text": f"Could not parse: {line}"}]}
+    except (subprocess.TimeoutExpired, OSError) as e:
+        return {"content": [{"type": "text", "text": f"Error: {e}"}]}
+
+
+@tool(
+    "get_stdout_tail",
+    "Read the last N lines of an MITgcm STDOUT file.",
+    {"stdout_path": str, "n_lines": int},
+)
+async def get_stdout_tail(args: dict) -> dict:
+    stdout_path: str = args["stdout_path"]
+    n_lines: int = args.get("n_lines", 30)
+    try:
+        with open(stdout_path) as f:
+            lines = f.readlines()
+        tail = lines[-n_lines:]
+        return {"content": [{"type": "text", "text": "".join(tail)}]}
+    except OSError as e:
+        return {"content": [{"type": "text", "text": f"Error: {e}"}]}
diff --git a/spectre_agents/tools/namelist.py b/spectre_agents/tools/namelist.py
new file mode 100644
index 0000000..479768a
--- /dev/null
+++ b/spectre_agents/tools/namelist.py
@@ -0,0 +1,138 @@
+"""Fortran namelist parsing and validation tools."""
+
+from __future__ import annotations
+
+import os
+import re
+from pathlib import Path
+
+from claude_agent_sdk import tool
+
+
+def parse_fortran_namelist(text: str) -> dict[str, dict[str, str]]:
+    """Parse a Fortran namelist file into a dict of {group: {key: value}}."""
+    result: dict[str, dict[str, str]] = {}
+    current_group = None
+
+    for line in text.splitlines():
+        stripped = line.strip()
+        # Skip comments and empty lines
+        if not stripped or stripped.startswith("#") or stripped.startswith("!"):
+            continue
+        # Group end: "/", "&", or "&end" (checked before group start, so
+        # "&end" is not mistaken for a group named "end")
+        if stripped == "/" or stripped.lower() in ("&", "&end"):
+            current_group = None
+            continue
+        # Group start: &GROUPNAME
+        m = re.match(r"&(\w+)", stripped)
+        if m:
+            current_group = m.group(1)
+            result[current_group] = {}
+            continue
+        # Key = value
+        if current_group and "=" in stripped:
+            key, _, val = stripped.partition("=")
+            key = key.strip()
+            val = val.strip().rstrip(",")
+            result[current_group][key] = val
+
+    return result
+
+
+@tool(
+    "parse_namelist",
+    "Parse a Fortran namelist file and return its contents as structured data.",
+    {"path": str},
+)
+async def parse_namelist_tool(args: dict) -> dict:
+    path: str = args["path"]
+    try:
+        text = Path(path).read_text()
+        parsed = parse_fortran_namelist(text)
+        lines = []
+        for group, params in parsed.items():
+            lines.append(f"&{group}")
+            for k, v in params.items():
+                lines.append(f" {k} = {v}")
+            lines.append("")
+        return {"content": [{"type": "text", "text": "\n".join(lines)}]}
+    except OSError as e:
+        return {"content": [{"type": "text", "text": f"Error reading {path}: {e}"}]}
+
+
+@tool(
+    "validate_namelists",
+    "Run cross-validation checks on all MITgcm namelist files in an input directory. "
+    "Checks package consistency, grid dimensions, EXF config, OBC sizes, time config, "
+    "and memory safety.",
+    {"input_dir": str},
+)
+async def validate_namelists(args: dict) -> dict:
+    input_dir: str = args["input_dir"]
+    checks: list[str] = []
+    pass_count = 0
+    fail_count = 0
+
+    def check(name: str, passed: bool, detail: str) -> None:
+        nonlocal pass_count, fail_count
+        status = "PASS" if passed else "FAIL"
+        if passed:
+            pass_count += 1
+        else:
+            fail_count += 1
+        checks.append(f"[{status}] {name}: {detail}")
+
+    base = Path(input_dir)
+
+    # Parse namelists
+    namelists = {}
+    for name in ("data", "data.exf", "data.obcs", "data.pkg", "data.diagnostics", "data.mnc"):
+        path = base / name
+        if path.exists():
+            namelists[name] = parse_fortran_namelist(path.read_text())
+        else:
+            checks.append(f"[WARN] {name}: file not found")
+
+    # 1. Package consistency: each enabled package needs its namelist file
+    pkg = namelists.get("data.pkg", {}).get("PACKAGES", {})
+    use_diag = pkg.get("useDIAGNOSTICS", "").upper()
+    use_mnc = pkg.get("useMNC", "").upper()
+    use_exf = pkg.get("useEXF", "").upper()
+
+    if use_diag == ".TRUE.":
+        check("DIAGNOSTICS package", "data.diagnostics" in namelists,
+              "data.diagnostics exists" if "data.diagnostics" in namelists else "data.diagnostics MISSING")
+    if use_mnc == ".TRUE.":
+        check("MNC package", "data.mnc" in namelists,
+              "data.mnc exists" if "data.mnc" in namelists else "data.mnc MISSING")
+    if use_exf == ".TRUE.":
+        check("EXF package", "data.exf" in namelists,
+              "data.exf exists" if "data.exf" in namelists else "data.exf MISSING")
+
+    # 2. Memory safety
+    diag = namelists.get("data.diagnostics", {}).get("DIAGNOSTICS_LIST", {})
+    diag_mnc = diag.get("diag_mnc", "")
+    check("diag_mnc disabled", ".FALSE." in diag_mnc.upper() if diag_mnc else True,
+          f"diag_mnc={diag_mnc}" if diag_mnc else "not set (OK)")
+
+    parms = namelists.get("data", {}).get("PARM03", {})
+    dump_freq = parms.get("dumpFreq", "0")
+    try:
+        # Compare numerically so "0", "0.", and "0.0" all pass;
+        # map Fortran "D" exponents to "E" for float()
+        dump_freq_val = float(dump_freq.upper().replace("D", "E"))
+    except ValueError:
+        dump_freq_val = -1.0
+    check("dumpFreq=0", dump_freq_val == 0.0, f"dumpFreq={dump_freq}")
+
+    mnc_nml = namelists.get("data.mnc", {}).get("MNC_01", {})
+    pickup_write = mnc_nml.get("pickup_write_mnc", "")
+    check("pickup_write_mnc disabled",
+          ".FALSE." 
in pickup_write.upper() if pickup_write else True,
+          f"pickup_write_mnc={pickup_write}" if pickup_write else "not set (OK)")
+
+    # 3. EXF check
+    exf_nml = namelists.get("data.exf", {})
+    exf04 = exf_nml.get("EXF_NML_04", {})
+    interp_keys = [k for k in exf04 if "_nlon" in k or "_nlat" in k]
+    check("No EXF interpolation metadata", not interp_keys,
+          "no *_nlon/*_nlat keys in EXF_NML_04" if not interp_keys
+          else f"found interpolation keys: {', '.join(interp_keys)}")
+
+    # 4. OBC boundary definitions
+    obcs = namelists.get("data.obcs", {}).get("OBCS_PARM01", {})
+    for boundary in ("OB_Jnorth", "OB_Jsouth", "OB_Ieast", "OB_Iwest"):
+        if boundary in obcs:
+            check(f"{boundary} defined", True, f"{boundary} present")
+
+    # Summary
+    checks.append(f"\nSummary: {pass_count} PASS, {fail_count} FAIL")
+    return {"content": [{"type": "text", "text": "\n".join(checks)}]}
diff --git a/spectre_agents/tools/slurm.py b/spectre_agents/tools/slurm.py
new file mode 100644
index 0000000..8c51e6f
--- /dev/null
+++ b/spectre_agents/tools/slurm.py
@@ -0,0 +1,79 @@
+"""SLURM job management tools: submit, status, queue, cancel."""
+
+from __future__ import annotations
+
+import getpass
+import re
+import subprocess
+
+from claude_agent_sdk import tool
+
+
+@tool(
+    "submit_job",
+    "Submit a SLURM job via sbatch. Returns job ID.",
+    {"script": str, "chdir": str},
+)
+async def submit_job(args: dict) -> dict:
+    script: str = args["script"]
+    chdir: str = args.get("chdir", ".")
+
+    # Argument list (no shell=True) avoids quoting problems in paths
+    cmd = ["sbatch", f"--chdir={chdir}", script]
+    try:
+        result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
+        if result.returncode != 0:
+            return {"content": [{"type": "text", "text": f"sbatch failed: {result.stderr}"}]}
+
+        # Parse "Submitted batch job 12345"
+        match = re.search(r"Submitted batch job (\d+)", result.stdout)
+        if match:
+            job_id = int(match.group(1))
+            return {"content": [{"type": "text", "text": f'{{"job_id": {job_id}, "submitted": true}}'}]}
+        return {"content": [{"type": "text", "text": f"Unexpected sbatch output: {result.stdout}"}]}
+    except (subprocess.TimeoutExpired, OSError) as e:
+        return {"content": [{"type": "text", "text": f"sbatch failed: {e}"}]}
+
+
+@tool(
+    "job_status",
+    "Get SLURM job status via sacct. Returns job state, exit code, elapsed time, and memory.",
+    {"job_id": int},
+)
+async def job_status(args: dict) -> dict:
+    job_id: int = args["job_id"]
+    cmd = ["sacct", "-j", str(job_id),
+           "--format=JobID,State,ExitCode,Elapsed,MaxRSS", "--parsable2", "--noheader"]
+    try:
+        result = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
+        return {"content": [{"type": "text", "text": result.stdout.strip() or "(no output)"}]}
+    except (subprocess.TimeoutExpired, OSError) as e:
+        return {"content": [{"type": "text", "text": f"Error: {e}"}]}
+
+
+@tool(
+    "queue_status",
+    "Show current SLURM queue for this user.",
+    {},
+)
+async def queue_status(args: dict) -> dict:
+    cmd = ["squeue", "-u", getpass.getuser(),
+           "--format=%.18i %.9P %.50j %.8u %.8T %.10M %.9l %.6D %R", "--noheader"]
+    try:
+        result = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
+        return {"content": [{"type": "text", "text": result.stdout.strip() or "(queue empty)"}]}
+    except (subprocess.TimeoutExpired, OSError) as e:
+        return {"content": [{"type": "text", "text": f"Error: {e}"}]}
+
+
+@tool(
+    
"cancel_job", + "Cancel a SLURM job.", + {"job_id": int}, +) +async def cancel_job(args: dict) -> dict: + job_id: int = args["job_id"] + cmd = f"scancel {job_id}" + try: + result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=15) + if result.returncode == 0: + return {"content": [{"type": "text", "text": f"Job {job_id} cancelled"}]} + return {"content": [{"type": "text", "text": f"scancel failed: {result.stderr}"}]} + except subprocess.TimeoutExpired: + return {"content": [{"type": "text", "text": "scancel timed out"}]} diff --git a/spectre_agents/types.py b/spectre_agents/types.py new file mode 100644 index 0000000..e41e951 --- /dev/null +++ b/spectre_agents/types.py @@ -0,0 +1,119 @@ +"""Shared data types for the SPECTRE agent system.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from enum import Enum +from typing import Optional + + +class FailureType(Enum): + OUT_OF_MEMORY = "OUT_OF_MEMORY" + NUMERICAL_BLOWUP = "NUMERICAL_BLOWUP" + EXF_RANGE_CHECK = "EXF_RANGE_CHECK" + FILE_IO_CRASH = "FILE_IO_CRASH" + INITIALIZATION_FAILURE = "INITIALIZATION_FAILURE" + UNKNOWN = "UNKNOWN" + + +class HealthStatus(Enum): + HEALTHY = "HEALTHY" + WARNING = "WARNING" + CRITICAL = "CRITICAL" + + +class CheckResult(Enum): + PASS = "PASS" + FAIL = "FAIL" + + +@dataclass +class DiagnosisReport: + failure_type: FailureType + model_days_reached: float + wall_time: str + root_cause: str + evidence: list[str] = field(default_factory=list) + suggested_fix: str = "" + + +@dataclass +class HealthAssessment: + status: HealthStatus + model_days: float + summary: str + fields: dict[str, str] = field(default_factory=dict) + trends: str = "" + recommendation: str = "" + + +@dataclass +class ValidationCheck: + name: str + result: CheckResult + parameter: str = "" + current_value: str = "" + expected_value: str = "" + detail: str = "" + + +@dataclass +class ValidationReport: + checks: list[ValidationCheck] = 
field(default_factory=list) + + @property + def pass_count(self) -> int: + return sum(1 for c in self.checks if c.result == CheckResult.PASS) + + @property + def fail_count(self) -> int: + return sum(1 for c in self.checks if c.result == CheckResult.FAIL) + + @property + def passed(self) -> bool: + return self.fail_count == 0 + + +@dataclass +class QCFileResult: + path: str + result: CheckResult + min_val: Optional[float] = None + max_val: Optional[float] = None + mean_val: Optional[float] = None + nan_count: int = 0 + anomalies: list[str] = field(default_factory=list) + + +@dataclass +class QCReport: + files: list[QCFileResult] = field(default_factory=list) + + @property + def pass_count(self) -> int: + return sum(1 for f in self.files if f.result == CheckResult.PASS) + + @property + def fail_count(self) -> int: + return sum(1 for f in self.files if f.result == CheckResult.FAIL) + + +@dataclass +class JobInfo: + job_id: int + state: str = "" + exit_code: str = "" + elapsed: str = "" + max_rss: str = "" + + +@dataclass +class SimulationState: + """Tracks the current state of the simulation system.""" + active_job_id: Optional[int] = None + run_dir: str = "" + last_diagnosis: Optional[DiagnosisReport] = None + last_health: Optional[HealthAssessment] = None + model_days: float = 0.0 + cfl_max: float = 0.0 + status: str = "idle" # idle, running, failed, stopped diff --git a/spectre_agents_config.yaml b/spectre_agents_config.yaml new file mode 100644 index 0000000..c59fe58 --- /dev/null +++ b/spectre_agents_config.yaml @@ -0,0 +1,56 @@ +# SPECTRE Agent System Configuration +# Secrets (API keys, tokens) go in /etc/spectre-agents/env, NOT here. 
+ +simulation: + base_dir: /mnt/beegfs/spectre-150-ensembles + sim_dir: simulations/glorysv12-curvilinear + run_dir_prefix: test-run + +grid: + Nx: 768 + Ny: 424 + Nr: 50 + nPx: 8 + nPy: 8 + +discord: + channels: + status: simulation-status + decisions: decisions + alerts: alerts + plots: plots + logs: logs + knowledge: ask-mitgcm + +agents: + orchestrator: + model: claude-opus-4-6 + max_tokens: 16384 + workflow_runner: + model: claude-haiku-4-5 + max_tokens: 4096 + stdout_diagnostics: + model: claude-sonnet-4-6 + max_tokens: 8192 + model_output_review: + model: claude-sonnet-4-6 + max_tokens: 8192 + namelist_validator: + model: claude-sonnet-4-6 + max_tokens: 8192 + forcing_data_qc: + model: claude-sonnet-4-6 + max_tokens: 8192 + dashboard_manager: + model: claude-haiku-4-5 + max_tokens: 4096 + notify: + model: claude-haiku-4-5 + max_tokens: 4096 + web_research: + model: claude-sonnet-4-6 + max_tokens: 8192 + +monitoring: + poll_interval_seconds: 60 + dashboard_port: 8050 diff --git a/systemd/spectre-agents.service b/systemd/spectre-agents.service new file mode 100644 index 0000000..a719780 --- /dev/null +++ b/systemd/spectre-agents.service @@ -0,0 +1,30 @@ +[Unit] +Description=SPECTRE Simulation Agent System (Discord bot + Claude agents) +After=network-online.target +Wants=network-online.target + +[Service] +Type=simple +User=joe +Group=joe +WorkingDirectory=/mnt/beegfs/spectre-150-ensembles +ExecStart=/mnt/beegfs/spectre-150-ensembles/.venv/bin/python -m spectre_agents --config /mnt/beegfs/spectre-150-ensembles/spectre_agents_config.yaml +Restart=on-failure +RestartSec=30 + +# Secrets +EnvironmentFile=/etc/spectre-agents/env + +# Logging +StandardOutput=journal +StandardError=journal +SyslogIdentifier=spectre-agents + +# Security hardening +NoNewPrivileges=yes +ProtectSystem=strict +ReadWritePaths=/mnt/beegfs/spectre-150-ensembles /tmp /var/log +PrivateTmp=no + +[Install] +WantedBy=multi-user.target