-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathintegration_checkpoint_test.go
More file actions
439 lines (361 loc) · 14.3 KB
/
Copy pathintegration_checkpoint_test.go
File metadata and controls
439 lines (361 loc) · 14.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
package nara
import (
"fmt"
"testing"
"time"
"github.com/eljojo/nara/identity"
"github.com/eljojo/nara/types"
)
// TestIntegration_CheckpointConsensus tests the two-round checkpoint consensus mechanism
// via MQTT. It verifies:
// - A nara can propose a checkpoint about itself
// - Other naras receive the proposal and vote
// - Signatures are verified
// - Consensus is reached and checkpoint is finalized
// - The checkpoint is stored in all ledgers
func TestIntegration_CheckpointConsensus(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
t.Parallel()
// Start embedded MQTT broker on dynamic port
_, port := startTestMQTTBrokerDynamic(t)
t.Log("๐งช Testing checkpoint consensus mechanism")
// Need MinVotersRequired + 1 naras (1 proposer + MinVotersRequired voters)
const numNaras = MinVotersRequired + 1
// Create nara names
names := make([]string, numNaras)
for i := 0; i < numNaras; i++ {
names[i] = fmt.Sprintf("checkpoint-test-%d", i)
}
// Start all naras with full discovery
// Note: cleanup is automatically registered by startTestNaras()
naras := startTestNaras(t, port, names, true)
// Configure checkpoint services with longer vote window for CI stability
for _, ln := range naras {
if ln.Network.checkpointService != nil {
ln.Network.checkpointService.voteWindow = 2 * time.Second
// Ensure checkpoint service has MQTT client reference
if ln.Network.Mqtt != nil {
ln.Network.checkpointService.SetMQTTClient(ln.Network.Mqtt)
}
}
}
t.Log("โ
All naras started and discovered")
// Add some observation events so naras have data about each other
// This ensures DeriveRestartCount and DeriveTotalUptime return meaningful values
t.Log("๐ Adding observation events...")
for i, observer := range naras {
for j, subject := range naras {
if i == j {
continue
}
// Add first-seen observation
firstSeenEvent := NewFirstSeenObservationEvent(
observer.Me.Name,
subject.Me.Name,
time.Now().Unix()-86400, // first seen 1 day ago
)
observer.SyncLedger.AddEvent(firstSeenEvent)
// Add restart observation
restartEvent := NewRestartObservationEvent(
observer.Me.Name,
subject.Me.Name,
time.Now().Unix()-3600, // started 1 hour ago
5, // 5 restarts
)
observer.SyncLedger.AddEvent(restartEvent)
}
}
// Give events time to propagate via MQTT
time.Sleep(1 * time.Second)
// Now have the first nara propose a checkpoint about itself
proposer := naras[0]
t.Logf("๐ค %s proposing checkpoint...", proposer.Me.Name)
if proposer.Network.checkpointService == nil {
t.Fatal("โ Checkpoint service not initialized")
}
// Verify MQTT is connected
waitForMQTTConnected(t, proposer, 3*time.Second)
t.Logf("โ
MQTT connected for proposer")
// Verify checkpoint service has ledger
if proposer.Network.checkpointService.ledger == nil {
t.Fatal("โ Checkpoint service ledger is nil - initialization order bug")
}
t.Logf("โ
Checkpoint service has ledger with %d events", len(proposer.SyncLedger.Events))
// Trigger the checkpoint proposal
proposer.Network.checkpointService.ProposeCheckpoint()
// Wait for checkpoint to be finalized (vote window + processing)
t.Log("โณ Waiting for checkpoint finalization...")
checkpoint := waitForCheckpoint(t, proposer.SyncLedger, proposer.Me.Name, 10*time.Second)
// Check if checkpoint was created - this MUST succeed
t.Log("๐ Checking for finalized checkpoint...")
if checkpoint == nil {
t.Fatal("โ No checkpoint found in proposer's ledger - consensus failed")
}
t.Logf("โ
Checkpoint found in proposer's ledger:")
t.Logf(" Subject: %s", checkpoint.Subject)
t.Logf(" SubjectID: %s", checkpoint.SubjectID)
t.Logf(" Restarts: %d", checkpoint.Observation.Restarts)
t.Logf(" TotalUptime: %d", checkpoint.Observation.TotalUptime)
t.Logf(" VoterIDs: %v", checkpoint.VoterIDs)
t.Logf(" Signatures: %d", len(checkpoint.Signatures))
// Verify we got enough voters
if len(checkpoint.VoterIDs) < MinVotersRequired {
t.Fatalf("โ Insufficient voters: got %d, need %d", len(checkpoint.VoterIDs), MinVotersRequired)
}
// Wait for checkpoint to propagate to other naras
waitForCheckpointPropagation(t, naras[1:], proposer.Me.Name, 3*time.Second)
checkpointsPropagated := 0
for _, ln := range naras[1:] {
if ln.SyncLedger.GetCheckpoint(proposer.Me.Name) != nil {
checkpointsPropagated++
}
}
t.Logf("โ
Checkpoint propagated to %d/%d other naras", checkpointsPropagated, numNaras-1)
t.Log("๐ CHECKPOINT CONSENSUS TEST PASSED")
}
// TestIntegration_CheckpointRound2 tests the round 2 fallback when round 1 fails consensus
func TestIntegration_CheckpointRound2(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
t.Parallel()
// Start embedded MQTT broker on dynamic port
_, port := startTestMQTTBrokerDynamic(t)
t.Log("๐งช Testing checkpoint round 2 fallback")
const numNaras = 4
mqttAddr := fmt.Sprintf("tcp://127.0.0.1:%d", port)
naras := make([]*LocalNara, numNaras)
for i := 0; i < numNaras; i++ {
name := fmt.Sprintf("round2-test-%d", i)
hwFingerprint := []byte(fmt.Sprintf("round2-hw-%d", i))
identity := identity.DetermineIdentity(types.NaraName(""), "", name, hwFingerprint)
profile := DefaultMemoryProfile()
profile.Mode = MemoryModeCustom
profile.MaxEvents = 1000
ln, err := NewLocalNara(
identity,
mqttAddr,
"", "",
-1,
profile,
)
if err != nil {
t.Fatalf("Failed to create LocalNara: %v", err)
}
ln.Network.testSkipJitter = true
ln.Network.testSkipBootRecovery = true
ln.Network.testSkipCoordinateWait = true
ln.Network.testSkipHeyThereSleep = true
ln.Network.testSkipHeyThereRateLimit = true
delay := time.Duration(0)
ln.Network.testObservationDelay = &delay
// Register cleanup for this nara
t.Cleanup(func() {
ln.Network.Shutdown()
ln.Network.disconnectMQTT()
})
naras[i] = ln
}
// Start all naras
for i, ln := range naras {
go ln.Start(false, false, "", nil, TransportMQTT)
time.Sleep(100 * time.Millisecond) // Small delay between starts
if ln.Network.checkpointService != nil {
ln.Network.checkpointService.voteWindow = 2 * time.Second // Longer for CI stability
}
t.Logf("โ
Started %s (nara %d)", ln.Me.Name, i)
}
// Wait for MQTT connection
waitForAllMQTTConnected(t, naras, 10*time.Second)
// Trigger hey-there for discovery
for _, ln := range naras {
ln.Network.heyThere()
}
time.Sleep(200 * time.Millisecond)
// Wait for full discovery with public keys
waitForFullDiscovery(t, naras, 10*time.Second)
// Add DIFFERENT observation data to each nara to force disagreement in round 1
// This should trigger the trimmed mean calculation in round 2
t.Log("๐ Adding divergent observation data to force round 2...")
proposer := naras[0]
for i, observer := range naras {
// Each observer reports different restart counts
restartEvent := NewRestartObservationEvent(
observer.Me.Name,
proposer.Me.Name,
time.Now().Unix()-3600,
int64(10+i*5), // 10, 15, 20, 25 - different values
)
observer.SyncLedger.AddEvent(restartEvent)
firstSeenEvent := NewFirstSeenObservationEvent(
observer.Me.Name,
proposer.Me.Name,
time.Now().Unix()-86400-int64(i*3600), // Different first-seen times
)
observer.SyncLedger.AddEvent(firstSeenEvent)
}
time.Sleep(1 * time.Second)
// Trigger checkpoint proposal
t.Logf("๐ค %s proposing checkpoint (expecting round 2 due to disagreement)...", proposer.Me.Name)
proposer.Network.checkpointService.ProposeCheckpoint()
// Wait for checkpoint to be finalized
// Vote window is 2 seconds for CI stability, so 2 rounds = ~4-5 seconds max
// Add extra buffer for message propagation
t.Log("โณ Waiting for checkpoint finalization...")
checkpoint := waitForCheckpoint(t, proposer.SyncLedger, proposer.Me.Name, 8*time.Second)
t.Log("")
t.Log("โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ")
if checkpoint != nil {
t.Log("๐ ROUND 2 FALLBACK TEST COMPLETED")
t.Logf(" โข Final restarts value: %d (should be trimmed mean)", checkpoint.Observation.Restarts)
t.Logf(" โข Voters: %d", len(checkpoint.VoterIDs))
} else {
t.Log("โ ๏ธ No checkpoint created after round 2")
t.Log(" โข This may be expected if voters still disagreed")
}
t.Log("โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ")
}
// TestIntegration_CheckpointSignatureVerification runs a two-nara checkpoint
// proposal (alice proposes, bob holds observations about alice) and reports
// whether the resulting checkpoint carries signatures.
//
// Only the valid-signature path is exercised here; no invalid signature is
// injected. The outcome is logged rather than asserted: a missing checkpoint
// or an empty signature list produces a warning, not a test failure.
func TestIntegration_CheckpointSignatureVerification(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test in short mode")
	}
	t.Parallel()

	// Start embedded MQTT broker on dynamic port.
	_, port := startTestMQTTBrokerDynamic(t)

	t.Log("🧪 Testing checkpoint signature verification")

	// Create 2 naras using the standard test helper.
	names := []string{"alice-sig", "bob-sig"}
	naras := startTestNaras(t, port, names, true)

	alice := naras[0]
	bob := naras[1]

	// Configure vote window for CI stability.
	for _, ln := range naras {
		if ln.Network.checkpointService != nil {
			ln.Network.checkpointService.voteWindow = 2 * time.Second
		}
	}

	// Test 1: Valid signature should be accepted.
	// Give bob observation data about alice.
	restartEvent := NewRestartObservationEvent(bob.Me.Name, alice.Me.Name, time.Now().Unix()-3600, 5)
	bob.SyncLedger.AddEvent(restartEvent)

	firstSeenEvent := NewFirstSeenObservationEvent(bob.Me.Name, alice.Me.Name, time.Now().Unix()-86400)
	bob.SyncLedger.AddEvent(firstSeenEvent)

	time.Sleep(500 * time.Millisecond)

	// Alice proposes a checkpoint (skipped when she has no checkpoint service).
	t.Log("📤 Alice proposing checkpoint...")
	if alice.Network.checkpointService != nil {
		alice.Network.checkpointService.ProposeCheckpoint()
	}

	// Wait for checkpoint to be finalized (condition-based polling instead of a fixed sleep).
	checkpoint := waitForCheckpoint(t, alice.SyncLedger, alice.Me.Name, 10*time.Second)

	t.Log("")
	t.Log("═══════════════════════════════════════")
	if checkpoint != nil && len(checkpoint.Signatures) > 0 {
		t.Log("🎉 SIGNATURE VERIFICATION TEST PASSED")
		t.Logf(" • Checkpoint created with %d signatures", len(checkpoint.Signatures))
		t.Log(" • Valid signatures were accepted")
	} else {
		t.Log("⚠️ Checkpoint not created or no signatures")
		t.Log(" • This may indicate signature verification working (rejected invalid)")
		t.Log(" • Or voters didn't have proposer's public key yet")
	}
	t.Log("═══════════════════════════════════════")
}
// TestIntegration_CheckpointTop10Voters tests that only top 10 voters by uptime are kept
func TestIntegration_CheckpointTop10Voters(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
t.Parallel()
// Start embedded MQTT broker on dynamic port
_, port := startTestMQTTBrokerDynamic(t)
t.Log("๐งช Testing checkpoint top 10 voters limit")
// Create 15 naras (more than the 10 signature limit)
const numNaras = 15
names := make([]string, numNaras)
for i := 0; i < numNaras; i++ {
names[i] = fmt.Sprintf("top10-test-%d", i)
}
naras := startTestNaras(t, port, names, true)
// Configure checkpoint vote windows for CI stability
for _, ln := range naras {
if ln.Network.checkpointService != nil {
ln.Network.checkpointService.voteWindow = 2 * time.Second
// Ensure checkpoint service has MQTT client reference
if ln.Network.Mqtt != nil {
ln.Network.checkpointService.SetMQTTClient(ln.Network.Mqtt)
}
}
}
t.Log("โ
All naras started and discovered")
// Add observation data - all naras observe the proposer with consistent values
proposer := naras[0]
t.Log("๐ Adding observation data...")
proposerStartTime := time.Now().Unix() - 86400 // Proposer started 1 day ago
proposerRestarts := int64(5)
// Every nara observes the proposer with the same values
for _, observer := range naras {
if observer.Me.Name == proposer.Me.Name {
continue // Skip self-observation
}
// Add restart observation
restartEvent := NewRestartObservationEvent(
observer.Me.Name,
proposer.Me.Name,
proposerStartTime,
proposerRestarts,
)
observer.SyncLedger.AddEvent(restartEvent)
// Add first-seen observation
firstSeenEvent := NewFirstSeenObservationEvent(
observer.Me.Name,
proposer.Me.Name,
proposerStartTime-86400, // First seen 2 days ago
)
observer.SyncLedger.AddEvent(firstSeenEvent)
}
// Give proposer uptime data about each voter (for sorting by uptime when selecting top 10)
for i, voter := range naras {
if voter.Me.Name == proposer.Me.Name {
continue
}
// Give each nara different uptime (higher index = more uptime)
uptimeSeconds := int64((i + 1) * 86400) // 1 day, 2 days, ... 15 days
startTime := time.Now().Unix() - uptimeSeconds
statusEvent := NewStatusChangeObservationEvent(
proposer.Me.Name,
voter.Me.Name,
"ONLINE",
)
statusEvent.Timestamp = startTime * 1e9
proposer.SyncLedger.AddEvent(statusEvent)
}
time.Sleep(1 * time.Second)
// Trigger checkpoint
t.Logf("๐ค %s proposing checkpoint...", proposer.Me.Name)
proposer.Network.checkpointService.ProposeCheckpoint()
// Wait for checkpoint to be finalized
t.Log("โณ Waiting for checkpoint finalization...")
checkpoint := waitForCheckpoint(t, proposer.SyncLedger, proposer.Me.Name, 10*time.Second)
// Check if checkpoint was created - this MUST succeed
if checkpoint == nil {
t.Fatal("โ No checkpoint found in proposer's ledger - consensus failed")
}
t.Logf("โ
Checkpoint found in proposer's ledger")
t.Logf(" Subject: %s", checkpoint.Subject)
t.Logf(" Voters: %d", len(checkpoint.VoterIDs))
// Verify we got enough voters but not more than max
if len(checkpoint.VoterIDs) < MinVotersRequired {
t.Fatalf("โ Insufficient voters: got %d, need %d", len(checkpoint.VoterIDs), MinVotersRequired)
}
if len(checkpoint.VoterIDs) > MaxCheckpointSignatures {
t.Fatalf("โ Too many voters: got %d, max should be %d", len(checkpoint.VoterIDs), MaxCheckpointSignatures)
}
t.Log("๐ TOP 10 VOTERS TEST PASSED")
t.Logf(" โข Voters limited to %d (max: %d)", len(checkpoint.VoterIDs), MaxCheckpointSignatures)
t.Logf(" โข VoterIDs: %v", checkpoint.VoterIDs)
}