Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .changeset/every-spoons-smash.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
"trackio": minor
---

feat: Add additional support for autonomous ML experiments

- `trackio.watch()` / `trackio.should_stop()`: register metric watchers (NaN/Inf, threshold, spike, stagnation, custom fn) that fire alerts automatically on every `trackio.log()` call
- `AlertReason` constants for programmatic alert filtering
- Run lifecycle status tracking (`running` → `finished` / `failed`) persisted in SQLite
- New CLI commands: `trackio best`, `trackio compare`, `trackio summary`
- `Run.status`, `Run.final_metrics`, `Run.metrics()`, `Run.history()` on the Python API
- `alerts.data` column (SQL migration) for structured alert metadata
223 changes: 223 additions & 0 deletions autonomous-experiments/test_harness/agent_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
"""
Agent test runner for autonomous ML experiments with Trackio.

Acts as an autonomous agent that:
1. Launches simulated training via subprocess
2. Polls alerts via trackio CLI
3. Decides next hyperparameters based on results
4. Iterates for N rounds
"""

import argparse
import json
import subprocess
import sys
import tempfile
import time
from pathlib import Path

SIMULATOR = str(Path(__file__).parent / "simulate_training.py")


def run_cli(args_list):
    """Invoke the trackio CLI with ``--json`` and return the parsed payload.

    Returns the decoded JSON object on success, or None when the command
    exits non-zero or prints output that is not valid JSON.
    """
    cmd = [sys.executable, "-m", "trackio.cli", *args_list, "--json"]
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode == 0:
        try:
            return json.loads(proc.stdout)
        except json.JSONDecodeError:
            pass
    return None


def run_training(project, run_name, **kwargs):
    """Run the training simulator to completion and return its exit code.

    Extra keyword arguments are forwarded to the simulator as kebab-case
    CLI flags (e.g. ``spike_at_step=300`` becomes ``--spike-at-step 300``).
    """
    cmd = [sys.executable, SIMULATOR, "--project", project, "--run-name", run_name]
    for key, value in kwargs.items():
        flag = "--" + key.replace("_", "-")
        cmd += [flag, str(value)]

    banner = "=" * 60
    print(f"\n{banner}")
    print(f"Launching training: {run_name}")
    print(f" Config: {kwargs}")
    print(banner)

    proc = subprocess.run(cmd, capture_output=True, text=True)
    print(f" stdout: {proc.stdout.strip()}")
    if proc.stderr:
        # Keep stderr noise bounded in the agent log.
        print(f" stderr: {proc.stderr.strip()[:200]}")
    return proc.returncode


def get_alerts(project, run_name=None):
    """Fetch alerts for a project (optionally scoped to one run) via the CLI.

    Returns a list of alert dicts; an empty list on any CLI failure.
    """
    cli_args = ["list", "alerts", "--project", project]
    if run_name:
        cli_args += ["--run", run_name]
    payload = run_cli(cli_args)
    if payload and "alerts" in payload:
        return payload["alerts"]
    return []


def experiment_failure_recovery(project):
    """Retry training with a 10x-smaller learning rate after each failure.

    A run counts as failed when the simulator exits non-zero or any
    error-level alert was recorded for it. Stops on the first success or
    after five attempts; returns the number of attempts made.
    """
    header = "=" * 60
    print("\n" + header)
    print("EXPERIMENT: Failure Recovery")
    print("Goal: Detect crashes and restart with adjusted parameters")
    print(header)

    history = []
    lr = 1.0

    for attempt in range(5):
        run_name = f"attempt-{attempt}-lr{lr}"
        exit_code = run_training(project, run_name, steps=500, lr=lr, seed=42)

        errors = [a for a in get_alerts(project, run_name) if a.get("level") == "error"]

        if exit_code == 0 and not errors:
            # Success: pull the last recorded validation loss for the run.
            metric = run_cli(
                [
                    "get",
                    "metric",
                    "--project",
                    project,
                    "--run",
                    run_name,
                    "--metric",
                    "val/loss",
                ]
            )
            if metric and metric.get("values"):
                val_loss = metric["values"][-1]["value"]
            else:
                val_loss = None
            print(f" [AGENT] Attempt {attempt} succeeded! val_loss={val_loss}")
            history.append({"run": run_name, "status": "success", "val_loss": val_loss})
            break

        # Failure: record it, shrink the LR, and try again.
        reason = errors[0]["title"] if errors else "non-zero exit code"
        print(f" [AGENT] Attempt {attempt} failed: {reason}")
        history.append({"run": run_name, "status": "failed", "lr": lr})
        lr *= 0.1
        print(f" [AGENT] Reducing LR to {lr}")

    print("\n[AGENT] Recovery history:")
    for entry in history:
        print(f" {entry}")
    return {"attempts": len(history)}


def experiment_long_monitoring(project):
    """Monitor a long-running training subprocess, polling alerts live.

    Launches the simulator in the background and, while it runs, polls the
    alert list every 0.5s, printing each alert exactly once (deduplicated
    by ``alert_id``). Returns a dict with the total alert count.
    """
    print("\n" + "=" * 60)
    print("EXPERIMENT: Long-Running Monitoring")
    print("Goal: Test alert polling with alert_id dedup during active training")
    print("=" * 60)

    run_name = "long-run"

    cmd = [
        sys.executable,
        SIMULATOR,
        "--project",
        project,
        "--run-name",
        run_name,
        "--steps",
        "1000",
        "--lr",
        "0.05",
        "--spike-at-step",
        "500",
        "--sleep",
        "0.005",
        "--seed",
        "42",
    ]

    print(" [AGENT] Starting long training run in background...")
    # BUGFIX: stdout used to be subprocess.PIPE, drained only by
    # communicate() *after* the poll loop. If the child printed enough to
    # fill the OS pipe buffer it would block on write, poll() would never
    # turn non-None, and this loop would spin forever. Spooling stdout to
    # an unnamed temp file keeps the child unblocked no matter how much it
    # prints, while still letting us report its output afterwards.
    with tempfile.TemporaryFile(mode="w+") as spool:
        proc = subprocess.Popen(cmd, stdout=spool, stderr=subprocess.DEVNULL)

        seen_ids: set[str] = set()

        # Poll for new alerts until the training process exits.
        while proc.poll() is None:
            time.sleep(0.5)
            alerts = get_alerts(project, run_name)
            new_alerts = [a for a in alerts if a.get("alert_id") not in seen_ids]
            for alert in new_alerts:
                print(
                    f" [AGENT] New alert: [{alert.get('level', '?')}] {alert.get('title', '?')}"
                )
                if alert.get("alert_id") is not None:
                    seen_ids.add(alert["alert_id"])

        spool.seek(0)
        stdout = spool.read()

    print(f" [AGENT] Training finished. Exit code: {proc.returncode}")
    print(f" [AGENT] stdout: {stdout.strip()}")

    final_alerts = get_alerts(project, run_name)
    print(f"\n[AGENT] Total alerts captured: {len(final_alerts)}")
    return {"alerts": len(final_alerts)}


# Registry mapping experiment names (the --experiment CLI choices) to the
# function that runs each experiment.
EXPERIMENTS = {
    "failure_recovery": experiment_failure_recovery,
    "long_monitoring": experiment_long_monitoring,
}


def main():
    """CLI entry point: run one or all agent experiments and print a summary."""
    parser = argparse.ArgumentParser(description="Agent test runner for autonomous ML")
    parser.add_argument(
        "--experiment",
        choices=[*EXPERIMENTS.keys(), "all"],
        default="all",
        help="Which experiment to run",
    )
    parser.add_argument(
        "--project-prefix",
        default="agent-test",
        help="Prefix for project names",
    )
    args = parser.parse_args()

    if args.experiment == "all":
        selected = list(EXPERIMENTS)
    else:
        selected = [args.experiment]

    results = {}
    for exp_name in selected:
        # Each experiment gets its own project so runs don't interleave.
        project = f"{args.project_prefix}-{exp_name}"
        try:
            results[exp_name] = EXPERIMENTS[exp_name](project)
        except Exception as e:
            print(f"\n[ERROR] Experiment {exp_name} failed: {e}")
            results[exp_name] = {"error": str(e)}

    banner = "=" * 60
    print("\n" + banner)
    print("SUMMARY")
    print(banner)
    for name, outcome in results.items():
        print(f"\n{name}:")
        for key, value in outcome.items():
            print(f" {key}: {value}")


if __name__ == "__main__":
    main()
139 changes: 139 additions & 0 deletions autonomous-experiments/test_harness/simulate_training.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
"""
Synthetic training simulator for testing autonomous ML workflows with Trackio.

Usage:
python simulate_training.py --project my_exp --run-name lr-0.01 --steps 500 --lr 0.01
python simulate_training.py --project my_exp --run-name spike-test --steps 500 --lr 0.01 --spike-at-step 300
"""

import argparse
import math
import random
import time

import trackio
from trackio.alerts import AlertLevel

# Std-dev scale of the Gaussian noise added to the simulated loss curve.
NOISE_SCALE = 0.05


def simulate_loss(step, total_steps, lr, depth, batch_size, base_loss=3.0):
    """Return a synthetic training loss for one step.

    The loss decays exponentially with training progress. Very large
    learning rates diverge (negative convergence rate), moderately large
    ones oscillate, and Gaussian noise shrinks as training progresses.
    """
    # A too-large LR flips the curve into divergence.
    if lr > 0.5:
        convergence_rate = -0.5
    else:
        convergence_rate = 2.0 / (1.0 + lr * 100)

    depth_factor = min(depth / 6.0, 2.0)
    batch_factor = 1.0 + 0.1 * math.log2(max(batch_size, 1) / 32)

    progress = step / max(total_steps, 1)
    base = base_loss * math.exp(
        -convergence_rate * depth_factor * batch_factor * progress * 5
    )
    noise = random.gauss(0, NOISE_SCALE * (1 - progress * 0.5))

    # Moderately high LR: visible oscillation rides on top of the decay.
    if lr > 0.1:
        base += abs(0.3 * lr * math.sin(step * lr * 0.5))

    # Runaway LR: loss grows linearly with step after a short warm-up.
    if lr > 1.0 and step > 50:
        base += lr * step * 0.001

    return max(0.01, base + noise)


def simulate_val_loss(train_loss, step, total_steps, depth, overfitting_threshold=0.6):
    """Return a synthetic validation loss derived from the training loss.

    Tracks the training loss with a small fixed gap; deep models (depth > 8)
    start to overfit once progress passes *overfitting_threshold*, widening
    the gap proportionally.
    """
    progress = step / max(total_steps, 1)
    gap = 0.05

    # Late-training overfitting for deep models widens the train/val gap.
    if progress > overfitting_threshold and depth > 8:
        gap += (progress - overfitting_threshold) * depth * 0.05

    return train_loss + gap + random.gauss(0, 0.02)


def simulate_accuracy(loss):
    """Map a loss value to a noisy accuracy, clamped to [0, 1]."""
    noisy = 1 - loss / 3.0 + random.gauss(0, 0.01)
    return max(0, min(1, noisy))


def main():
    """Parse CLI args, run the simulated training loop, and log to Trackio.

    Each step logs train/val loss, accuracy, and lr; raises an overfitting
    alert when the validation loss runs well above the training loss late
    in training, and stops early if a registered watcher requests it.
    """
    parser = argparse.ArgumentParser(description="Synthetic training simulator")
    parser.add_argument("--project", required=True, help="Trackio project name")
    parser.add_argument("--run-name", required=True, help="Run name")
    parser.add_argument("--steps", type=int, default=500, help="Total training steps")
    parser.add_argument("--lr", type=float, default=0.01, help="Learning rate")
    parser.add_argument("--depth", type=int, default=6, help="Model depth (layers)")
    parser.add_argument("--batch-size", type=int, default=32, help="Batch size")
    parser.add_argument(
        "--spike-at-step", type=int, default=None, help="Simulate loss spike at step N"
    )
    parser.add_argument("--seed", type=int, default=None, help="Random seed")
    parser.add_argument(
        "--sleep",
        type=float,
        default=0.0,
        help="Sleep between steps (simulate wall-clock time)",
    )
    args = parser.parse_args()

    if args.seed is not None:
        random.seed(args.seed)

    config = {
        "lr": args.lr,
        "depth": args.depth,
        "batch_size": args.batch_size,
        "steps": args.steps,
    }

    trackio.init(project=args.project, name=args.run_name, config=config)

    # Watch the training loss for NaN/Inf, runaway values, and spikes.
    trackio.watch("train/loss", nan=True, max_value=10.0, spike_factor=3.0, window=10)

    # BUGFIX: initialize `step` so the post-loop alert and print below do
    # not raise NameError when --steps is 0 and the loop body never runs.
    step = -1
    for step in range(args.steps):
        train_loss = simulate_loss(
            step, args.steps, args.lr, args.depth, args.batch_size
        )

        # Inject a one-step loss spike on request (exercises spike watchers).
        if args.spike_at_step is not None and step == args.spike_at_step:
            train_loss *= 10.0

        val_loss = simulate_val_loss(train_loss, step, args.steps, args.depth)
        accuracy = simulate_accuracy(val_loss)

        # Manual alert when val loss runs well above train loss late in training.
        if val_loss > train_loss * 1.5 and step > args.steps * 0.5:
            trackio.alert(
                "Overfitting detected",
                text=f"Val loss ({val_loss:.4f}) >> train loss ({train_loss:.4f}) at step {step}",
                level=AlertLevel.WARN,
            )

        trackio.log(
            {
                "train/loss": round(train_loss, 4),
                "val/loss": round(val_loss, 4),
                "accuracy": round(accuracy, 4),
                "lr": args.lr,
            },
            step=step,
        )

        # Honor watcher-triggered early termination requests.
        if trackio.should_stop():
            print(f"TERMINATED EARLY: watcher triggered stop at step {step}")
            break

        if args.sleep > 0:
            time.sleep(args.sleep)

    trackio.alert(
        "Training complete",
        text=f"Finished at step {step}.",
        level=AlertLevel.INFO,
    )
    trackio.finish()
    print(f"Training complete. Final step: {step}")


if __name__ == "__main__":
    main()
Loading
Loading