Skip to content

Commit 196ea28

Browse files
committed
fix(session): keep read-surface cutover scoped
1 parent 359bba3 commit 196ea28

9 files changed

Lines changed: 556 additions & 83 deletions

File tree

ae-cli/cmd/hook.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ package cmd
33
import (
44
"context"
55
"os"
6+
"time"
67

78
"github.com/ai-efficiency/ae-cli/internal/attributionlocal"
89
"github.com/ai-efficiency/ae-cli/internal/hooks"
10+
"github.com/ai-efficiency/ae-cli/internal/proxy"
911
"github.com/spf13/cobra"
1012
)
1113

@@ -47,6 +49,30 @@ var hookPostRewriteCmd = &cobra.Command{
4749
},
4850
}
4951

52+
var hookSessionEventCmd = &cobra.Command{
53+
Use: "session-event",
54+
Short: "Forward tool hook events to the local proxy (hidden)",
55+
Hidden: true,
56+
RunE: func(cmd *cobra.Command, args []string) error {
57+
tool, err := cmd.Flags().GetString("tool")
58+
if err != nil {
59+
return err
60+
}
61+
err = proxy.ForwardHookEvent(context.Background(), os.Stdin, proxy.HookForwardRequest{
62+
Tool: tool,
63+
LocalProxyURL: os.Getenv("AE_LOCAL_PROXY_URL"),
64+
LocalProxyToken: os.Getenv("AE_LOCAL_PROXY_TOKEN"),
65+
SessionID: os.Getenv("AE_SESSION_ID"),
66+
WorkspaceID: os.Getenv("AE_WORKSPACE_ID"),
67+
CapturedAt: time.Now().UTC(),
68+
})
69+
if err != nil {
70+
return nil
71+
}
72+
return nil
73+
},
74+
}
75+
5076
var hookAttributionSyncCmd = &cobra.Command{
5177
Use: "attribution-sync",
5278
Short: "Run local attribution sync (hidden)",
@@ -61,6 +87,8 @@ var hookAttributionSyncCmd = &cobra.Command{
6187
func init() {
6288
hookCmd.AddCommand(hookPostCommitCmd)
6389
hookCmd.AddCommand(hookPostRewriteCmd)
90+
hookSessionEventCmd.Flags().String("tool", "", "originating tool name")
91+
hookCmd.AddCommand(hookSessionEventCmd)
6492
hookCmd.AddCommand(hookAttributionSyncCmd)
6593
rootCmd.AddCommand(hookCmd)
6694
}

ae-cli/cmd/hook_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,21 @@ func TestHookCommandHasPostRewriteSubcommand(t *testing.T) {
2323
}
2424
}
2525

26+
func TestHookCommandHasSessionEventSubcommand(t *testing.T) {
27+
var found bool
28+
for _, c := range hookCmd.Commands() {
29+
if c.Name() == "session-event" {
30+
found = true
31+
if !c.Hidden {
32+
t.Fatalf("expected hook session-event to be hidden")
33+
}
34+
}
35+
}
36+
if !found {
37+
t.Fatalf("expected hidden subcommand 'ae-cli hook session-event' to exist")
38+
}
39+
}
40+
2641
func TestHookPostCommitCommandQueuesWhenUploaderUnsupported(t *testing.T) {
2742
repo := initRepoWithCommitForCmdTests(t)
2843

ae-cli/internal/hooks/handler.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616

1717
"github.com/ai-efficiency/ae-cli/internal/attributionlocal"
1818
"github.com/ai-efficiency/ae-cli/internal/collector"
19+
"github.com/ai-efficiency/ae-cli/internal/proxy"
1920
"github.com/ai-efficiency/ae-cli/internal/session"
2021
)
2122

@@ -235,6 +236,16 @@ func (h *Handler) PostCommit(ctx context.Context, cwd string) error {
235236
CapturedAt: time.Now().UTC().Format(time.RFC3339),
236237
}
237238

239+
if rt, err := session.ReadRuntimeBundle(sessionID); err == nil && rt != nil && rt.Proxy != nil {
240+
if err := proxy.PostEvent(ctx, rt.Proxy.ListenAddr, rt.Proxy.AuthToken, proxy.EventEnvelope{
241+
EventType: "post_commit",
242+
SessionID: sessionID,
243+
Payload: ev,
244+
}); err == nil {
245+
return nil
246+
}
247+
}
248+
238249
if h == nil || h.uploader == nil {
239250
// No uploader wired; behave like upload failure (queue best-effort).
240251
q, err := NewLocalQueue(sessionID)
@@ -355,6 +366,16 @@ func (h *Handler) PostRewrite(ctx context.Context, cwd string, rewriteType strin
355366
CapturedAt: time.Now().UTC().Format(time.RFC3339),
356367
}
357368

369+
if rt, err := session.ReadRuntimeBundle(sessionID); err == nil && rt != nil && rt.Proxy != nil {
370+
if err := proxy.PostEvent(ctx, rt.Proxy.ListenAddr, rt.Proxy.AuthToken, proxy.EventEnvelope{
371+
EventType: "post_rewrite",
372+
SessionID: sessionID,
373+
Payload: ev,
374+
}); err == nil {
375+
continue
376+
}
377+
}
378+
358379
if h == nil || h.uploader == nil {
359380
if q != nil {
360381
_ = q.Enqueue(ev)

ae-cli/internal/hooks/handler_test.go

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package hooks
22

33
import (
4+
"bufio"
45
"bytes"
56
"context"
67
"encoding/json"
@@ -12,6 +13,7 @@ import (
1213
"testing"
1314

1415
"github.com/ai-efficiency/ae-cli/internal/collector"
16+
"github.com/ai-efficiency/ae-cli/internal/proxy"
1517
"github.com/ai-efficiency/ae-cli/internal/session"
1618
)
1719

@@ -29,13 +31,81 @@ func (f *fakeUploader) UploadHookEvent(ctx context.Context, ev HookEvent) error
2931
return f.err
3032
}
3133

34+
func startRealProxy(t *testing.T, sessionID string) (listenAddr, authToken string) {
35+
t.Helper()
36+
37+
cfg := proxy.RuntimeConfig{
38+
SessionID: sessionID,
39+
ListenAddr: "127.0.0.1:0",
40+
AuthToken: "proxy-token",
41+
}
42+
result, err := proxy.Spawn(cfg)
43+
if err != nil {
44+
t.Fatalf("proxy.Spawn: %v", err)
45+
}
46+
t.Cleanup(func() {
47+
if err := proxy.Stop(proxy.StopRequest{
48+
PID: result.PID,
49+
ListenAddr: result.ListenAddr,
50+
AuthToken: cfg.AuthToken,
51+
ConfigPath: result.ConfigPath,
52+
}); err != nil {
53+
t.Fatalf("proxy.Stop: %v", err)
54+
}
55+
})
56+
57+
return result.ListenAddr, cfg.AuthToken
58+
}
59+
60+
func writeRuntimeWithProxy(t *testing.T, sessionID, listenAddr, authToken string) {
61+
t.Helper()
62+
if err := session.WriteRuntimeBundle(&session.RuntimeBundle{
63+
SessionID: sessionID,
64+
Proxy: &session.ProxyRuntime{
65+
ListenAddr: listenAddr,
66+
AuthToken: authToken,
67+
},
68+
}); err != nil {
69+
t.Fatalf("WriteRuntimeBundle: %v", err)
70+
}
71+
}
72+
3273
func writeMarker(t *testing.T, repo, sessionID string) {
3374
t.Helper()
3475
if err := session.WriteMarker(repo, &session.Marker{SessionID: sessionID, RepoFullName: "origin"}); err != nil {
3576
t.Fatalf("WriteMarker: %v", err)
3677
}
3778
}
3879

80+
func readProxySpoolEvents(t *testing.T, sessionID string) []proxy.EventEnvelope {
81+
t.Helper()
82+
83+
path := proxy.EventSpoolPath(sessionID)
84+
f, err := os.Open(path)
85+
if err != nil {
86+
t.Fatalf("open proxy spool: %v", err)
87+
}
88+
defer f.Close()
89+
90+
var out []proxy.EventEnvelope
91+
sc := bufio.NewScanner(f)
92+
for sc.Scan() {
93+
line := strings.TrimSpace(sc.Text())
94+
if line == "" {
95+
continue
96+
}
97+
var ev proxy.EventEnvelope
98+
if err := json.Unmarshal([]byte(line), &ev); err != nil {
99+
t.Fatalf("unmarshal proxy spool line: %v", err)
100+
}
101+
out = append(out, ev)
102+
}
103+
if err := sc.Err(); err != nil {
104+
t.Fatalf("scan proxy spool: %v", err)
105+
}
106+
return out
107+
}
108+
39109
func git2(t *testing.T, dir string, args ...string) string {
40110
t.Helper()
41111
cmd := exec.Command("git", args...)
@@ -133,6 +203,128 @@ func TestPostCommitBootstrapsMarkerFromEnv(t *testing.T) {
133203
}
134204
}
135205

206+
func TestPostCommitSendsEventToLocalProxyBeforeQueueFallback(t *testing.T) {
207+
repo := initRepoWithCommit2(t)
208+
home := t.TempDir()
209+
t.Setenv("HOME", home)
210+
211+
listenAddr, authToken := startRealProxy(t, "sess-1")
212+
writeRuntimeWithProxy(t, "sess-1", listenAddr, authToken)
213+
writeMarker(t, repo, "sess-1")
214+
215+
h := NewHandler(nil)
216+
if err := h.PostCommit(context.Background(), repo); err != nil {
217+
t.Fatalf("PostCommit: %v", err)
218+
}
219+
220+
q, err := NewLocalQueue("sess-1")
221+
if err != nil {
222+
t.Fatalf("NewLocalQueue: %v", err)
223+
}
224+
items, err := q.List()
225+
if err != nil {
226+
t.Fatalf("List: %v", err)
227+
}
228+
if len(items) != 0 {
229+
t.Fatalf("queued items = %d, want 0 (expected local proxy ingress to accept event)", len(items))
230+
}
231+
232+
events := readProxySpoolEvents(t, "sess-1")
233+
if len(events) != 1 {
234+
t.Fatalf("proxy spool events = %d, want 1", len(events))
235+
}
236+
if got := events[0].EventType; got != "post_commit" {
237+
t.Fatalf("event_type = %q, want %q", got, "post_commit")
238+
}
239+
if got := events[0].SessionID; got != "sess-1" {
240+
t.Fatalf("session_id = %q, want %q", got, "sess-1")
241+
}
242+
}
243+
244+
func TestPostCommitFallsBackToQueueWhenProxyTokenInvalid(t *testing.T) {
245+
repo := initRepoWithCommit2(t)
246+
home := t.TempDir()
247+
t.Setenv("HOME", home)
248+
249+
listenAddr, _ := startRealProxy(t, "sess-1")
250+
writeRuntimeWithProxy(t, "sess-1", listenAddr, "wrong-token")
251+
writeMarker(t, repo, "sess-1")
252+
253+
h := NewHandler(nil)
254+
if err := h.PostCommit(context.Background(), repo); err != nil {
255+
t.Fatalf("PostCommit: %v", err)
256+
}
257+
258+
q, err := NewLocalQueue("sess-1")
259+
if err != nil {
260+
t.Fatalf("NewLocalQueue: %v", err)
261+
}
262+
items, err := q.List()
263+
if err != nil {
264+
t.Fatalf("List: %v", err)
265+
}
266+
if len(items) != 1 {
267+
t.Fatalf("queued items = %d, want 1", len(items))
268+
}
269+
}
270+
271+
func TestPostRewriteSendsEventToLocalProxyBeforeQueueFallback(t *testing.T) {
272+
repo := initRepoWithCommit2(t)
273+
home := t.TempDir()
274+
t.Setenv("HOME", home)
275+
276+
listenAddr, authToken := startRealProxy(t, "sess-1")
277+
writeRuntimeWithProxy(t, "sess-1", listenAddr, authToken)
278+
writeMarker(t, repo, "sess-1")
279+
280+
h := NewHandler(nil)
281+
if err := h.PostRewrite(context.Background(), repo, "amend", strings.NewReader("oldsha1 newsha1\n")); err != nil {
282+
t.Fatalf("PostRewrite: %v", err)
283+
}
284+
285+
q, err := NewLocalQueue("sess-1")
286+
if err != nil {
287+
t.Fatalf("NewLocalQueue: %v", err)
288+
}
289+
items, err := q.List()
290+
if err != nil {
291+
t.Fatalf("List: %v", err)
292+
}
293+
if len(items) != 0 {
294+
t.Fatalf("queued items = %d, want 0 (expected local proxy ingress to accept event)", len(items))
295+
}
296+
297+
events := readProxySpoolEvents(t, "sess-1")
298+
if len(events) != 1 {
299+
t.Fatalf("proxy spool events = %d, want 1", len(events))
300+
}
301+
if got := events[0].EventType; got != "post_rewrite" {
302+
t.Fatalf("event_type = %q, want %q", got, "post_rewrite")
303+
}
304+
}
305+
306+
func TestProxyIngressStampsSessionID(t *testing.T) {
307+
home := t.TempDir()
308+
t.Setenv("HOME", home)
309+
310+
listenAddr, authToken := startRealProxy(t, "sess-1")
311+
if err := proxy.PostEvent(context.Background(), listenAddr, authToken, proxy.EventEnvelope{
312+
EventType: "post_commit",
313+
SessionID: "different-session",
314+
Payload: map[string]any{"x": "y"},
315+
}); err != nil {
316+
t.Fatalf("PostEvent: %v", err)
317+
}
318+
319+
events := readProxySpoolEvents(t, "sess-1")
320+
if len(events) != 1 {
321+
t.Fatalf("proxy spool events = %d, want 1", len(events))
322+
}
323+
if got := events[0].SessionID; got != "sess-1" {
324+
t.Fatalf("session_id = %q, want proxy session %q", got, "sess-1")
325+
}
326+
}
327+
136328
func TestPostCommitQueuesEventWhenUploadFails(t *testing.T) {
137329
repo := initRepoWithCommit2(t)
138330

backend/internal/handler/router.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,9 @@ func SetupRouter(
162162
// Sessions (ae-cli)
163163
sessionGroup := protected.Group("/sessions")
164164
{
165+
sessionGroup.GET("", sessionHandler.List)
166+
sessionGroup.GET("/:id", sessionHandler.Get)
167+
sessionGroup.GET("/:id/provider-credentials", sessionHandler.ProviderCredential)
165168
sessionGroup.POST("/bootstrap", sessionHandler.Bootstrap)
166169
sessionGroup.POST("", sessionHandler.Create)
167170
sessionGroup.PUT("/:id", sessionHandler.Update)

0 commit comments

Comments
 (0)