-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathboot_sync.go
More file actions
182 lines (157 loc) · 5.57 KB
/
Copy pathboot_sync.go
File metadata and controls
182 lines (157 loc) · 5.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
package nara
import (
"math/rand"
"time"
"github.com/sirupsen/logrus"
"github.com/eljojo/nara/types"
)
// backgroundSync performs lightweight periodic syncing to strengthen collective memory.
//
// It first waits a random initial delay of 0-4 whole minutes to spread startup
// load, then calls performBackgroundSync roughly every 30 minutes. A fresh
// jitter of -5..+5 whole minutes (inclusive on both ends) is drawn for every
// cycle so that peers do not synchronise their requests.
//
// The method blocks until network.ctx is cancelled, so callers presumably run
// it in its own goroutine.
func (network *Network) backgroundSync() {
	const (
		baseInterval     = 30 * time.Minute
		maxJitterMinutes = 5
	)

	// A single timer is reused for every wait and stopped on return, so no
	// long-lived timer is left pending once the context is cancelled.
	timer := time.NewTimer(time.Duration(rand.Intn(5)) * time.Minute)
	defer timer.Stop()

	select {
	case <-timer.C:
		// Initial delay elapsed; fall through to the main loop.
	case <-network.ctx.Done():
		logrus.Debugf("backgroundSync: shutting down before initial delay")
		return
	}

	for {
		// Intn(2*max+1) yields 0..10, so the shifted value covers the full
		// symmetric range -5..+5 minutes.
		jitter := time.Duration(rand.Intn(2*maxJitterMinutes+1)-maxJitterMinutes) * time.Minute

		// Reset is safe here: the previous expiry was always received from
		// timer.C before reaching this point.
		timer.Reset(baseInterval + jitter)

		select {
		case <-timer.C:
			network.performBackgroundSync()
		case <-network.ctx.Done():
			logrus.Debugf("backgroundSync: shutting down gracefully")
			return
		}
	}
}
// performBackgroundSync runs one background sync cycle.
//
// It calls recoverSelfStartTimeFromMesh, collects the online neighbors that
// pass neighborSupportsBackgroundSync, picks one or two of them at random and
// syncs with each over the mesh. Neighbors without mesh info are skipped, and
// nothing is synced when the mesh itself is unavailable.
func (network *Network) performBackgroundSync() {
	network.recoverSelfStartTimeFromMesh()

	onlineNames := network.NeighbourhoodOnlineNames()
	if len(onlineNames) == 0 {
		logrus.Debug("๐ Background sync: no neighbors online")
		return
	}

	// Keep only neighbors that neighborSupportsBackgroundSync accepts.
	candidates := make([]types.NaraName, 0, len(onlineNames))
	for _, candidate := range onlineNames {
		if !network.neighborSupportsBackgroundSync(candidate) {
			continue
		}
		candidates = append(candidates, candidate)
	}
	if len(candidates) == 0 {
		logrus.Debug("๐ Background sync: no eligible neighbors (memory-limited)")
		return
	}

	// Query one neighbor, or two on a coin flip when more than one is available.
	pickCount := 1
	if len(candidates) > 1 && rand.Float64() > 0.5 {
		pickCount = 2
	}

	// Randomise the order, then take the leading pickCount entries.
	rand.Shuffle(len(candidates), func(a, b int) {
		candidates[a], candidates[b] = candidates[b], candidates[a]
	})

	for _, target := range candidates[:pickCount] {
		if network.tsnetMesh == nil {
			// Could add MQTT fallback here if needed
			logrus.Debug("๐ Background sync: mesh not available, skipping")
			continue
		}

		ip, naraID := network.getMeshInfoForNara(target)
		if ip == "" || naraID == "" {
			logrus.Debugf("๐ Background sync: neighbor %s not mesh-enabled, skipping", target)
			continue
		}

		// Register the peer's IP before syncing so the mesh client can resolve it.
		network.meshClient.RegisterPeerIP(naraID, ip)
		network.performBackgroundSyncViaMesh(target, naraID)
	}
}
// neighborSupportsBackgroundSync reports whether the named neighbor may be
// used as a background sync source.
//
// A neighbor is rejected only when its advertised memory mode equals
// MemoryModeShort. Neighbors that cannot be looked up, or whose record has an
// empty name, are treated as eligible.
func (network *Network) neighborSupportsBackgroundSync(name types.NaraName) bool {
	peer := network.getNara(name)

	// Defensive: the record may be missing or may have been removed.
	if peer == nil || peer.Name == "" {
		return true
	}

	return peer.Status.MemoryMode != string(MemoryModeShort)
}
// performBackgroundSyncViaMesh pulls recent events from one neighbor over the
// mesh and merges them into the local sync ledger.
//
// IMPORTANT: This function determines which event types are continuously synced across the network.
// When adding new service types (in sync.go), consider whether they should be synced here.
// Services currently merged:
//   - ServiceObservation: only events from the last 24h, added via AddEventFiltered
//     with the local personality
//   - ServicePing: no time filter, added via AddEvent
//   - ServiceSocial: only events from the last 24h, added via AddEventFiltered
//     with the local personality
//
// Events of any other service are ignored.
//
// Boot recovery (bootRecoveryViaMesh) syncs ALL events without filtering.
// This background sync maintains eventual consistency for recent events.
func (network *Network) performBackgroundSyncViaMesh(neighbor types.NaraName, naraID types.NaraID) {
	logrus.Infof("๐ background sync: requesting events from %s (%s)", neighbor, naraID)

	// "recent" mode with a small limit keeps the query lightweight.
	request := SyncRequest{
		Mode:  "recent",
		Limit: 100,
	}
	events, err := network.meshClient.FetchSyncEvents(network.ctx, naraID, request)
	if err != nil {
		logrus.Infof("๐ Background sync with %s failed: %v", neighbor, err)
		return
	}

	// Observation and social events older than this instant are dropped.
	oldestAccepted := time.Now().Add(-24 * time.Hour).UnixNano()

	var merged int
	sawPing := false
	for _, ev := range events {
		switch ev.Service {
		case ServicePing:
			// Ping events are accepted regardless of age.
			if network.local.SyncLedger.AddEvent(ev) {
				merged++
				sawPing = true
			}
		case ServiceObservation, ServiceSocial:
			// Both services share the same 24h window and personality filter.
			if ev.Timestamp < oldestAccepted {
				continue
			}
			if network.local.SyncLedger.AddEventFiltered(ev, network.local.Me.Status.Personality) {
				merged++
			}
		}
	}

	switch {
	case merged > 0:
		logrus.Printf("๐ background sync from %s: received %d events, merged %d", neighbor, len(events), merged)

		// Newly merged ping events: refresh AvgPingRTT from history.
		if sawPing {
			network.seedAvgPingRTTFromHistory()
		}

		// Let projections pick up the newly merged events.
		if network.local.Projections != nil {
			network.local.Projections.Trigger()
		}
	case len(events) > 0:
		logrus.Debugf("๐ background sync from %s: received %d events (all duplicates)", neighbor, len(events))
	}
}