Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ AI Efficiency Platform (`ai-efficiency`) is a standalone system for measuring an

- The backend is the central orchestration point for auth, repo management, analysis, attribution, deployment control, and webhook handling.
- The frontend is built separately and embedded into the backend binary for deployment.
- `ae-cli start` bootstraps a session with the backend, writes local workspace/runtime state, and can start a local session proxy for Codex and Claude.
- `ae-cli start` remains the primary interactive CLI workflow: it bootstraps a session with the backend, writes local workspace/runtime state, and can start a local session proxy for Codex and Claude.
- A newer sessionless local attribution path has landed partially for tool-local ingest and checkpoint binding, but it has not yet replaced the session/local-proxy workflow.
- Production deployment currently supports Docker Compose and Linux systemd.

## Repository Layout
Expand Down
27 changes: 4 additions & 23 deletions ae-cli/internal/attributionlocal/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,12 @@ import (
)

type ScanState struct {
CodexSQLite CodexSQLiteWatermark `json:"codex_sqlite"`
}

type Scanner struct {
codexSQLite *CodexSQLiteParser
}
type Scanner struct{}

func NewScanner() *Scanner {
return &Scanner{codexSQLite: NewCodexSQLiteParser()}
return &Scanner{}
}

func (s *Scanner) ScanWorkspace(workspaceRoot string, state ScanState) ([]LocalToolUsageEvent, ScanState, error) {
Expand All @@ -29,19 +26,6 @@ func (s *Scanner) ScanWorkspace(workspaceRoot string, state ScanState) ([]LocalT
}
homeDir, _ := os.UserHomeDir()

codexDB := filepath.Join(homeDir, ".codex", "logs_2.sqlite")
if _, err := os.Stat(codexDB); err == nil {
items, wm, err := s.codexSQLite.Parse(codexDB, state.CodexSQLite)
if err != nil {
return nil, state, err
}
for _, item := range items {
item.WorkspaceID = workspaceID
out = append(out, item)
}
state.CodexSQLite = wm
}

for _, path := range findCodexJSONLFiles(workspaceRoot, homeDir) {
items, err := ParseCodexJSONLFallback(path, workspaceRoot)
if err != nil {
Expand Down Expand Up @@ -104,11 +88,8 @@ func dedupeAndSort(items []LocalToolUsageEvent) []LocalToolUsageEvent {
}

func findCodexJSONLFiles(workspaceRoot, homeDir string) []string {
var out []string
workspaceCodex := filepath.Join(workspaceRoot, ".ae", "codex-home")
out = append(out, walkFiles(workspaceCodex, ".jsonl")...)
out = append(out, walkFiles(filepath.Join(homeDir, ".codex"), ".jsonl")...)
return out
_ = workspaceRoot
return walkFiles(filepath.Join(homeDir, ".codex"), ".jsonl")
}

func findClaudeJSONLFiles(homeDir string) []string {
Expand Down
56 changes: 47 additions & 9 deletions ae-cli/internal/attributionlocal/scanner_test.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,62 @@
package attributionlocal

import "testing"
import (
"os"
"path/filepath"
"testing"
)

func TestScanner_SkipsAlreadyWatermarkedCodexRows(t *testing.T) {
func TestScanner_ScanWorkspaceReadsMatchingCodexJSONL(t *testing.T) {
fixture := buildAttributionFixture(t)
scanner := NewScanner()

first, state, err := scanner.ScanWorkspace(fixture.WorkspaceRoot, ScanState{})
first, _, err := scanner.ScanWorkspace(fixture.WorkspaceRoot, ScanState{})
if err != nil {
t.Fatalf("first scan: %v", err)
}
if len(first) == 0 {
t.Fatal("expected first scan events")
if len(first) != 1 {
t.Fatalf("first scan events = %d, want 1", len(first))
}
if first[0].DedupeKey != "codex-jsonl:sess-1:resp-1" {
t.Fatalf("dedupe key = %q, want %q", first[0].DedupeKey, "codex-jsonl:sess-1:resp-1")
}
}

func TestScanner_IgnoresGlobalCodexSQLiteTransportLogs(t *testing.T) {
fixture := buildSQLiteOnlyAttributionFixture(t)
scanner := NewScanner()

second, _, err := scanner.ScanWorkspace(fixture.WorkspaceRoot, state)
first, _, err := scanner.ScanWorkspace(fixture.WorkspaceRoot, ScanState{})
if err != nil {
t.Fatalf("second scan: %v", err)
t.Fatalf("scan: %v", err)
}
if len(first) != 0 {
t.Fatal("expected first scan events")
}
}

func TestFindCodexJSONLFiles_IgnoresWorkspaceScopedCodexHome(t *testing.T) {
workspaceRoot := t.TempDir()
homeDir := t.TempDir()

workspaceCodex := filepath.Join(workspaceRoot, ".ae", "codex-home", "sessions", "workspace.jsonl")
if err := os.MkdirAll(filepath.Dir(workspaceCodex), 0o700); err != nil {
t.Fatalf("mkdir workspace codex dir: %v", err)
}
if err := os.WriteFile(workspaceCodex, []byte("{}\n"), 0o600); err != nil {
t.Fatalf("write workspace codex file: %v", err)
}
if len(second) != 0 {
t.Fatalf("second scan events = %d, want 0", len(second))

globalCodex := filepath.Join(homeDir, ".codex", "sessions", "global.jsonl")
if err := os.MkdirAll(filepath.Dir(globalCodex), 0o700); err != nil {
t.Fatalf("mkdir global codex dir: %v", err)
}
if err := os.WriteFile(globalCodex, []byte("{}\n"), 0o600); err != nil {
t.Fatalf("write global codex file: %v", err)
}

paths := findCodexJSONLFiles(workspaceRoot, homeDir)
if len(paths) != 1 || paths[0] != globalCodex {
t.Fatalf("paths = %v, want only %s", paths, globalCodex)
}
}
34 changes: 31 additions & 3 deletions ae-cli/internal/attributionlocal/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

type BackendClient interface {
SendToolUsageEvent(ctx context.Context, req client.ToolUsageEventRequest) error
BindToolUsageEvents(ctx context.Context, req client.BindToolUsageEventsRequest) error
}

type SyncEngine struct {
Expand All @@ -27,6 +26,9 @@ func NewSyncEngine(c BackendClient) *SyncEngine {
}

func (e *SyncEngine) Replay(ctx context.Context, workspaceRoot string) error {
if e == nil || e.Client == nil {
return nil
}
spooled, err := loadSpooledEvents(e.spoolPath)
if err != nil {
return err
Expand Down Expand Up @@ -69,10 +71,21 @@ func (e *SyncEngine) RunForWorkspace(ctx context.Context, workspaceRoot string)
if err != nil {
return err
}
for _, ev := range events {
if err := e.Client.SendToolUsageEvent(ctx, toClientUsageRequest(ev)); err != nil {

if e.Client == nil {
if err := appendSpooledEvents(spoolPath, events); err != nil {
return err
}
return SaveJSON(statePath, nextState)
}

for idx, ev := range events {
if err := e.Client.SendToolUsageEvent(ctx, toClientUsageRequest(ev)); err != nil {
if err := appendSpooledEvents(spoolPath, events[idx:]); err != nil {
return err
}
return SaveJSON(statePath, nextState)
}
}
return SaveJSON(statePath, nextState)
}
Expand Down Expand Up @@ -106,11 +119,26 @@ func loadSpooledEvents(path string) ([]LocalToolUsageEvent, error) {
}
var out []LocalToolUsageEvent
if err := LoadJSON(path, &out); err != nil {
if os.IsNotExist(err) {
return nil, nil
}
return nil, err
}
return out, nil
}

func appendSpooledEvents(path string, events []LocalToolUsageEvent) error {
if path == "" || len(events) == 0 {
return nil
}
existing, err := loadSpooledEvents(path)
if err != nil {
return err
}
merged := append(existing, events...)
return SaveJSON(path, dedupeAndSort(merged))
}

func clearSpooledEvents(path string) error {
if path == "" {
return nil
Expand Down
48 changes: 48 additions & 0 deletions ae-cli/internal/attributionlocal/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package attributionlocal

import (
"context"
"os"
"path/filepath"
"testing"
)

Expand Down Expand Up @@ -47,3 +49,49 @@ func TestSync_ReplayDropsAlreadyUploadedPrefixOnFailure(t *testing.T) {
t.Fatalf("remaining = %+v, want only second item", remaining)
}
}

func TestSync_RunForWorkspaceWithoutClientSpoolsNewEvents(t *testing.T) {
fixture := buildAttributionFixture(t)
engine := &SyncEngine{
Scanner: NewScanner(),
}

if err := engine.RunForWorkspace(context.Background(), fixture.WorkspaceRoot); err != nil {
t.Fatalf("RunForWorkspace: %v", err)
}

spooled, err := loadSpooledEvents(filepath.Join(AttributionRootDir(), "spool.json"))
if err != nil {
t.Fatalf("loadSpooledEvents: %v", err)
}
if len(spooled) == 0 {
t.Fatal("expected new events to be spooled when no backend client is configured")
}
}

func TestSync_RunForWorkspaceSpoolsNewEventsWhenUploadFails(t *testing.T) {
fixture := buildAttributionFixture(t)

engine := &SyncEngine{
Scanner: NewScanner(),
Client: &syncBackendClientStub{
failOn: "codex-jsonl:sess-1:resp-1",
},
}

if err := engine.RunForWorkspace(context.Background(), fixture.WorkspaceRoot); err != nil {
t.Fatalf("RunForWorkspace: %v", err)
}

spoolPath := filepath.Join(AttributionRootDir(), "spool.json")
spooled, err := loadSpooledEvents(spoolPath)
if err != nil {
t.Fatalf("loadSpooledEvents: %v", err)
}
if len(spooled) != 1 || spooled[0].DedupeKey != "codex-jsonl:sess-1:resp-1" {
t.Fatalf("spooled = %+v, want the failed scanned event", spooled)
}
if _, err := os.Stat(filepath.Join(AttributionRootDir(), "scan-state.json")); err != nil {
t.Fatalf("expected scan state to be persisted after spooling, stat err=%v", err)
}
}
27 changes: 23 additions & 4 deletions ae-cli/internal/attributionlocal/test_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,29 @@ type attributionFixture struct {
func buildAttributionFixture(t *testing.T) attributionFixture {
t.Helper()

root := fixtureRepoRoot(t)
home := t.TempDir()
t.Setenv("HOME", home)
codexSessions := filepath.Join(home, ".codex", "sessions", "2026", "05", "13")
if err := os.MkdirAll(codexSessions, 0o700); err != nil {
t.Fatalf("mkdir codex home: %v", err)
}
codexJSONL := filepath.Join(codexSessions, "sess-1.jsonl")
codexBody := `{"type":"session_meta","payload":{"id":"sess-1","cwd":"` + root + `"}}
{"type":"event_msg","payload":{"type":"token_count","response_id":"resp-1","info":{"last_token_usage":{"input_tokens":12,"cached_input_tokens":4,"output_tokens":5,"reasoning_output_tokens":2,"total_tokens":23}}}}`
if err := os.WriteFile(codexJSONL, []byte(codexBody), 0o600); err != nil {
t.Fatalf("write codex jsonl: %v", err)
}

return attributionFixture{
WorkspaceRoot: root,
HomeDir: home,
}
}

func buildSQLiteOnlyAttributionFixture(t *testing.T) attributionFixture {
t.Helper()

root := fixtureRepoRoot(t)
home := t.TempDir()
t.Setenv("HOME", home)
Expand Down Expand Up @@ -142,10 +165,6 @@ func (s *syncBackendClientStub) SendToolUsageEvent(_ context.Context, req client
return nil
}

func (s *syncBackendClientStub) BindToolUsageEvents(_ context.Context, _ client.BindToolUsageEventsRequest) error {
return nil
}

func (s *syncBackendClientStub) SawUpload(dedupeKey string) bool {
for _, item := range s.uploads {
if item == dedupeKey {
Expand Down
29 changes: 0 additions & 29 deletions ae-cli/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,6 @@ type ToolUsageEventRequest struct {
RawPayload map[string]any `json:"raw_payload,omitempty"`
}

type BindToolUsageEventsRequest struct {
WorkspaceID string `json:"workspace_id"`
CommitCheckpointID int `json:"commit_checkpoint_id"`
CommitCapturedAt time.Time `json:"commit_captured_at"`
PreviousCapturedAt time.Time `json:"previous_captured_at"`
}

func New(baseURL, token string) *Client {
return &Client{
baseURL: baseURL,
Expand Down Expand Up @@ -480,28 +473,6 @@ func (c *Client) SendToolUsageEvent(ctx context.Context, req ToolUsageEventReque
return nil
}

func (c *Client) BindToolUsageEvents(ctx context.Context, req BindToolUsageEventsRequest) error {
body, err := json.Marshal(req)
if err != nil {
return fmt.Errorf("marshal tool usage bind request: %w", err)
}
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/api/v1/tool-usage-events/bind", bytes.NewReader(body))
if err != nil {
return fmt.Errorf("create tool usage bind request: %w", err)
}
c.setHeaders(httpReq)
resp, err := c.httpClient.Do(httpReq)
if err != nil {
return fmt.Errorf("send tool usage bind request: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
respBody, _ := io.ReadAll(resp.Body)
return fmt.Errorf("unexpected tool usage bind status %d: %s", resp.StatusCode, string(respBody))
}
return nil
}

func (c *Client) BaseURL() string {
if c == nil {
return ""
Expand Down
24 changes: 0 additions & 24 deletions ae-cli/internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,30 +575,6 @@ func TestSendToolUsageEvent(t *testing.T) {
}
}

func TestBindToolUsageEvents(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
t.Errorf("method = %s, want POST", r.Method)
}
if r.URL.Path != "/api/v1/tool-usage-events/bind" {
t.Errorf("path = %s, want /api/v1/tool-usage-events/bind", r.URL.Path)
}
w.WriteHeader(http.StatusOK)
}))
defer srv.Close()

c := New(srv.URL, "tok")
err := c.BindToolUsageEvents(context.Background(), BindToolUsageEventsRequest{
WorkspaceID: "ws-1",
CommitCheckpointID: 101,
CommitCapturedAt: time.Now().UTC(),
PreviousCapturedAt: time.Now().UTC().Add(-1 * time.Minute),
})
if err != nil {
t.Fatalf("BindToolUsageEvents: %v", err)
}
}

func TestHeartbeatNotFound(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
Expand Down
Loading