-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathhttp_mesh.go
More file actions
309 lines (268 loc) · 9.77 KB
/
Copy pathhttp_mesh.go
File metadata and controls
309 lines (268 loc) · 9.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
package nara
import (
"encoding/json"
"net/http"
"time"
"github.com/sirupsen/logrus"
)
// Mesh Event Sync HTTP handlers
// httpEventsSyncHandler serves POST /events/sync, the unified mesh sync
// backbone. Booting naras use it to recover event history from neighbors; it
// covers both social events and ping observations.
//
// The JSON body is decoded into a SyncRequest whose Mode picks the retrieval
// strategy:
//   - "sample": decay-weighted sampling sized by SampleSize
//   - "page": cursor-based pagination sized by PageSize; the response also
//     carries the next cursor
//   - "recent": the most recent Limit events
//   - anything else: legacy slice-based sync (SliceIndex, SliceTotal, MaxEvents)
//
// Non-POST requests get 405; undecodable JSON or an empty From get 400. When
// no SyncLedger is configured the event list is empty. The reply is built by
// NewSignedSyncResponse and written as JSON.
func (network *Network) httpEventsSyncHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	// Decode and validate the request up front; both failures are client errors.
	var req SyncRequest
	decodeErr := json.NewDecoder(r.Body).Decode(&req)
	switch {
	case decodeErr != nil:
		http.Error(w, "Invalid JSON", http.StatusBadRequest)
		return
	case req.From == "":
		http.Error(w, "from is required", http.StatusBadRequest)
		return
	}

	// Upper bound applied to caller-supplied batch sizes.
	const maxBatch = 5000

	// Without a ledger there is nothing to serve: reply with an empty list.
	events := []SyncEvent{}
	var nextCursor string

	if ledger := network.local.SyncLedger; ledger != nil {
		switch req.Mode {
		case "sample":
			// Decay-weighted sampling for boot recovery (organic hazy memory).
			count := req.SampleSize
			if count <= 0 || count > maxBatch {
				count = maxBatch
			}
			events = ledger.SampleEvents(count, network.meName(), req.Services, req.Subjects)
			logrus.Printf("📤 mesh sync to %s: sampled %d events (mode: sample)", req.From, len(events))

		case "page":
			// Cursor-based pagination for backup/checkpoint sync (complete retrieval).
			count := req.PageSize
			if count <= 0 || count > maxBatch {
				count = maxBatch
			}
			events, nextCursor = ledger.GetEventsPage(req.Cursor, count, req.Services, req.Subjects)
			logrus.Printf("📤 mesh sync to %s: page %d events (mode: page, cursor: %s, next: %s)", req.From, len(events), req.Cursor, nextCursor)

		case "recent":
			// Most recent N events for the web UI.
			// NOTE(review): out-of-range limits fall back to 100 rather than the
			// 5000 cap used by the other modes - confirm this is intended.
			count := req.Limit
			if count <= 0 || count > maxBatch {
				count = 100
			}
			events = ledger.GetRecentEvents(count, req.Services, req.Subjects)
			logrus.Printf("📤 mesh sync to %s: recent %d events (mode: recent)", req.From, len(events))

		default:
			// Legacy mode, kept for backward compatibility. Nonsensical slice
			// parameters collapse to "slice 0 of 1".
			total := req.SliceTotal
			if total < 1 {
				total = 1
			}
			index := req.SliceIndex
			if index < 0 || index >= total {
				index = 0
			}
			count := req.MaxEvents
			if count <= 0 || count > maxBatch {
				count = maxBatch
			}
			events = ledger.GetEventsForSync(req.Services, req.Subjects, req.SinceTime, index, total, count)
			logrus.Printf("📤 mesh sync to %s: sent %d events (legacy mode, slice %d/%d)", req.From, len(events), index+1, total)
		}
	}

	// Wrap the events in a signed response; NextCursor is only non-empty in page mode.
	response := NewSignedSyncResponse(network.meName(), events, network.local.Keypair)
	response.NextCursor = nextCursor

	w.Header().Set("Content-Type", "application/json; charset=utf-8")
	if err := json.NewEncoder(w).Encode(response); err != nil {
		logrus.WithError(err).Warn("Failed to encode response")
	}
}
// httpGossipZineHandler serves POST /gossip/zine, a bidirectional zine
// exchange for P2P event gossip: it receives a peer's zine, merges its events
// into the local ledger, and answers with our own zine.
//
// Responses:
//   - 405 for non-POST requests
//   - 400 for undecodable JSON or an empty From
//   - 403 when a public key resolves for the sender and the signature is invalid
//   - otherwise our zine as JSON (an empty one when createZine returns nil)
func (network *Network) httpGossipZineHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	// Decode incoming zine
	var theirZine Zine
	if err := json.NewDecoder(r.Body).Decode(&theirZine); err != nil {
		http.Error(w, "Invalid JSON", http.StatusBadRequest)
		return
	}

	// Validate basic fields
	if theirZine.From == "" {
		http.Error(w, "from is required", http.StatusBadRequest)
		return
	}

	// Verify the zine signature only if we know the sender's public key. Zines
	// from senders without a resolvable key skip this check and are handed to
	// MergeSyncEventsWithVerification below.
	pubKey := network.resolvePublicKeyForNara(theirZine.From)
	if len(pubKey) > 0 && !VerifyZine(&theirZine, pubKey) {
		logrus.Warnf("📰 Invalid zine signature from %s, rejecting", theirZine.From)
		http.Error(w, "Invalid signature", http.StatusForbidden)
		return
	}

	// Merge their events into our ledger.
	// NOTE(review): the second return value is discarded; its meaning is not
	// visible from this handler - confirm nothing in it needs handling.
	added, _ := network.MergeSyncEventsWithVerification(theirZine.Events)
	if added > 0 && network.logService != nil {
		network.logService.BatchGossipMerge(theirZine.From, added)
	}

	// Mark sender as online - receiving a zine proves they're reachable
	// UNLESS they sent a chau event (graceful shutdown announcement) about
	// themselves.
	senderIsShuttingDown := false
	for _, e := range theirZine.Events {
		if e.Service == ServiceChau && e.Chau != nil && e.Chau.From == theirZine.From {
			senderIsShuttingDown = true
			break
		}
	}
	if !senderIsShuttingDown {
		// Their events in the zine already prove they're active, no need for seen event
		network.recordObservationOnlineNara(theirZine.From, theirZine.CreatedAt)
	}

	// Create our zine to send back (bidirectional exchange)
	myZine := network.createZine()
	if myZine == nil {
		// Even if we have no events, send an empty signed zine
		myZine = &Zine{
			From:      network.meName(),
			CreatedAt: time.Now().Unix(),
			Events:    []SyncEvent{},
		}
		// Sign the empty zine for consistency. A signing failure must not break
		// the exchange, so the zine is still returned (unsigned), but the
		// failure is logged instead of being silently dropped.
		sig, err := SignZine(myZine, network.local.Keypair)
		if err != nil {
			logrus.WithError(err).Warn("Failed to sign empty zine")
		} else {
			myZine.Signature = sig
		}
	}

	// Return our zine
	w.Header().Set("Content-Type", "application/json; charset=utf-8")
	if err := json.NewEncoder(w).Encode(myZine); err != nil {
		logrus.WithError(err).Warn("Failed to encode response")
	}
}
// httpDMHandler serves POST /dm, a generic endpoint that lets naras send an
// arbitrary signed SyncEvent directly to this node. Accepted events are added
// to the local ledger and spread via gossip.
//
// Responses:
//   - 405 for non-POST requests
//   - 400 for undecodable JSON or an unsigned event
//   - 403 when the emitter's public key is unknown or the signature is invalid
//   - otherwise a JSON object {"success": <added>, "from": <our name>}, where
//     success reports whether the event was added to the ledger
func (network *Network) httpDMHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	var event SyncEvent
	if err := json.NewDecoder(r.Body).Decode(&event); err != nil {
		http.Error(w, "Invalid JSON", http.StatusBadRequest)
		return
	}

	// Event must be signed
	if !event.IsSigned() {
		http.Error(w, "Unsigned event rejected", http.StatusBadRequest)
		return
	}

	// Verify signature against emitter's public key. An empty key is treated
	// the same as a missing one (matching the len(pubKey) check used by the
	// zine handler) so verification never runs against a zero-length key.
	pubKey := network.resolvePublicKeyForNara(event.Emitter)
	if len(pubKey) == 0 {
		http.Error(w, "Unknown emitter", http.StatusForbidden)
		return
	}
	if !event.VerifyWithKey(pubKey) {
		logrus.Warnf("📬 Invalid DM signature from %s, rejecting", event.Emitter)
		http.Error(w, "Invalid signature", http.StatusForbidden)
		return
	}

	// Add to local ledger (with personality filtering for social events).
	// The ledger may be absent (the events sync handler handles that case
	// too); then nothing is stored and the response reports success=false
	// instead of the handler panicking on a nil ledger.
	added := false
	if ledger := network.local.SyncLedger; ledger != nil {
		if event.Service == ServiceSocial {
			added = ledger.AddSocialEventFiltered(event, network.local.Me.Status.Personality)
		} else {
			added = ledger.AddEvent(event)
		}
	}

	// Trigger projection updates
	if added && network.local.Projections != nil {
		network.local.Projections.Trigger()
	}

	// Broadcast to local SSE clients (all event types, not just social)
	if added {
		network.broadcastSSE(event)
	}

	// Mark sender as online (unless this is a chau event - those mean "I'm shutting down")
	if event.Service != ServiceChau {
		// The DM itself is an event they emitted - they prove themselves.
		// NOTE(review): Timestamp/1e9 presumably converts nanoseconds to
		// seconds - confirm against SyncEvent.
		network.recordObservationOnlineNara(event.Emitter, event.Timestamp/1e9)
	}

	// Log via LogService (batched) - skip for social events since the ledger listener handles teases
	if network.logService != nil && event.Service != ServiceSocial {
		network.logService.BatchDMReceived(event.Emitter)
	}

	w.Header().Set("Content-Type", "application/json; charset=utf-8")
	if err := json.NewEncoder(w).Encode(map[string]interface{}{
		"success": added,
		"from":    network.meName(),
	}); err != nil {
		logrus.WithError(err).Warn("Failed to encode response")
	}
}
// Network Coordinate HTTP handlers
// httpPingHandler serves /ping, a lightweight latency probe for Vivaldi
// coordinates. The JSON reply contains the server time in Unix nanoseconds
// ("t"), this nara's name ("from"), and its public key and mesh IP, so the
// caller can measure RTT.
//
// The request method is not checked. The caller is recorded with the log
// service (when one is configured) under the X-Nara-From header value, or the
// remote address if that header is absent.
func (network *Network) httpPingHandler(w http.ResponseWriter, r *http.Request) {
	// Identify who is pinging us: prefer the self-reported name, else the socket address.
	caller := r.RemoteAddr
	if name := r.Header.Get("X-Nara-From"); name != "" {
		caller = name
	}

	// Log via LogService (batched)
	if logger := network.logService; logger != nil {
		logger.BatchPingsReceived(caller)
	}

	w.Header().Set("Content-Type", "application/json; charset=utf-8")

	// Take the timestamp as late as possible, right before the reply is written.
	reply := map[string]interface{}{
		"t":          time.Now().UnixNano(),
		"from":       network.meName(),
		"public_key": network.local.Me.Status.PublicKey,
		"mesh_ip":    network.local.Me.Status.MeshIP,
	}
	err := json.NewEncoder(w).Encode(reply)
	if err != nil {
		logrus.WithError(err).Warn("Failed to encode response")
	}
}
// httpCoordinatesHandler serves /coordinates and replies with a JSON object
// holding this nara's name ("name") and its Vivaldi coordinates
// ("coordinates"). The request method is not checked.
func (network *Network) httpCoordinatesHandler(w http.ResponseWriter, r *http.Request) {
	// Read the coordinates field while holding Me.mu; the lock is released
	// before anything else (including meName) is called.
	network.local.Me.mu.Lock()
	current := network.local.Me.Status.Coordinates
	network.local.Me.mu.Unlock()

	w.Header().Set("Content-Type", "application/json; charset=utf-8")
	err := json.NewEncoder(w).Encode(map[string]interface{}{
		"name":        network.meName(),
		"coordinates": current,
	})
	if err != nil {
		logrus.WithError(err).Warn("Failed to encode response")
	}
}
// Mesh Authentication for HTTP Handlers
//
// All mesh endpoints are protected by meshAuthMiddleware, which:
// 1. Verifies X-Nara-Name, X-Nara-Timestamp, X-Nara-Signature headers
// 2. Rejects requests with invalid/expired signatures (30s window)
// 3. Sets X-Nara-Verified header with the authenticated sender name
//
// Handlers should:
// - Use getVerifiedSender(r) to get the authenticated sender (never trust req.From)
// - NOT implement their own signature verification (mesh auth handles it)
// - NOT include Timestamp/Signature fields in request structs (redundant)
//
// Request structs can still include a "From" field for logging/debugging,
// but handlers should verify it matches getVerifiedSender() if used.