-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathmodel.go
More file actions
274 lines (218 loc) · 6.63 KB
/
model.go
File metadata and controls
274 lines (218 loc) · 6.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
package rita
import (
"errors"
"strings"
"sync"
"time"
)
// Sentinel errors for interface implementations missing from the value
// wrapped by a Model.
var (
// ErrEvolverNotImplemented is returned by Model.Evolve when the wrapped
// value does not implement Evolver.
ErrEvolverNotImplemented = errors.New("evolver not implemented")
// ErrDeciderNotImplemented is returned by Model.Decide when the wrapped
// value does not implement Decider.
ErrDeciderNotImplemented = errors.New("decider not implemented")
// ErrViewerNotImplemented signals a missing Viewer implementation.
// NOTE(review): no code visible in this file returns it — confirm usage
// elsewhere in the package.
ErrViewerNotImplemented = errors.New("viewer not implemented")
)
// Expect describes an optimistic concurrency control expectation for an
// append operation.
type Expect struct {
	// Sequence is the expected last sequence number. If the current last
	// sequence number does not match this value, the append will fail.
	Sequence uint64
	// Pattern is an alternative pattern for the check, such as the
	// top-level type. When empty, the subject defaults to the entity's
	// pattern.
	Pattern string
}

// ExpectSequence returns an Expect carrying only the expected last sequence
// number seq. Pattern is left empty, so the subject defaults to the entity's
// pattern.
func ExpectSequence(seq uint64) *Expect {
	expect := new(Expect)
	expect.Sequence = seq
	return expect
}

// ExpectSequenceSubject returns an Expect carrying the expected last sequence
// number seq together with an alternative pattern, such as the top-level
// type.
func ExpectSequenceSubject(seq uint64, pattern string) *Expect {
	expect := new(Expect)
	expect.Sequence = seq
	expect.Pattern = pattern
	return expect
}
// Event is a wrapper for application-defined events.
type Event struct {
// ID of the event. This will be used as the NATS msg ID
// for de-duplication.
ID string
// Entity identifies the specific entity the event relates to, e.g. a
// node, endpoint, or agent. The format must be two tokens, e.g. "node.1234".
Entity string
// Time is the time of when the event occurred which may be different
// from the time the event is appended to the store. If no time is provided,
// the current local time will be used.
Time time.Time
// Type is a unique name for the event itself. This can be omitted
// if a type registry is being used, otherwise it must be set explicitly
// to identify the encoded data.
Type string
// Data is the event data. This must be a byte slice (pre-encoded) or a value
// of a type registered in the type registry.
Data any
// Meta is application-defined metadata about the event.
Meta map[string]string
// Expect can be set to specify optimistic concurrency control
// expectations for an append operation.
Expect *Expect
// sequence is the sequence number of the event within the stream.
// Read-only; exposed through Sequence.
sequence uint64
// subject is the subject the event is associated with.
// Read-only; exposed through Subject.
subject string
}
// Sequence returns the stream sequence number of the event.
func (e *Event) Sequence() uint64 {
return e.sequence
}
// Subject returns the full NATS subject the event was published to.
func (e *Event) Subject() string {
return e.subject
}
// Evolver is an interface that application-defined models can implement
// to evolve their state based on events.
type Evolver interface {
// Evolve applies a single event to the model's state. A non-nil error
// reports that the event could not be applied.
Evolve(*Event) error
}
// Command is a wrapper for application-defined commands.
type Command struct {
// ID is a unique identifier for the command.
ID string
// Time is the time of when the command was received.
Time time.Time
// Type is a unique name for the command. This can be omitted
// if a type registry is being used, otherwise it must be set explicitly
// to identify the encoded data.
Type string
// Data is the command data. This must be a byte slice (pre-encoded) or a value
// of a type registered in the type registry.
Data any
// Meta is application-defined metadata about the command.
Meta map[string]string
}
// Decider is an interface that application-defined models can implement
// to decide on state transitions. Zero or more events can be returned
// that represent the state transitions to be stored.
type Decider interface {
// Decide evaluates the command and returns the resulting events, or an
// error if the command is rejected or cannot be handled.
Decide(*Command) ([]*Event, error)
}
// Viewer represents a read-only view of the state of an entity.
type Viewer[T any] interface {
// View hands the state to the supplied function and reports an error.
// Model.View implements this by calling the function with the wrapped
// value under a read lock and returning the function's error.
View(func(T) error) error
}
// DeciderEvolver combines Decider and Evolver for use with DecideAndEvolve.
// NOTE(review): DecideAndEvolve is not defined in this file — presumably it
// lives elsewhere in the package; confirm.
type DeciderEvolver interface {
Decider
Evolver
}
// entityMap stores sequence numbers per entity. Both tables are keyed first
// by the entity's pattern (the text before the first '.') and then by its ID
// (everything after that '.').
type entityMap struct {
	// sseq backs getStart/setStart.
	sseq map[string]map[string]uint64
	// lseq backs getLast/setLast.
	lseq map[string]map[string]uint64
}

// newEntityMap returns an entityMap with both tables allocated and empty.
func newEntityMap() *entityMap {
	em := new(entityMap)
	em.sseq = map[string]map[string]uint64{}
	em.lseq = map[string]map[string]uint64{}
	return em
}

// get looks up the sequence stored for entity in table m. It returns 0 when
// entity contains no '.' or when nothing has been stored for it.
func (em *entityMap) get(entity string, m map[string]map[string]uint64) uint64 {
	pattern, id, found := strings.Cut(entity, ".")
	if !found {
		return 0
	}
	// Reading through a missing (nil) inner map yields the zero value, which
	// is exactly the "not stored" result.
	return m[pattern][id]
}

// set stores seq for entity in table m, creating the per-pattern table on
// first use. An entity without a '.' is ignored.
func (em *entityMap) set(entity string, seq uint64, m map[string]map[string]uint64) {
	pattern, id, found := strings.Cut(entity, ".")
	if !found {
		return
	}
	ids, exists := m[pattern]
	if !exists {
		ids = make(map[string]uint64)
		m[pattern] = ids
	}
	ids[id] = seq
}

// getStart returns the start sequence stored for entity, or 0 if none.
func (em *entityMap) getStart(entity string) uint64 {
	return em.get(entity, em.sseq)
}

// setStart stores seq as the start sequence for entity.
func (em *entityMap) setStart(entity string, seq uint64) {
	em.set(entity, seq, em.sseq)
}

// getLast returns the last sequence stored for entity, or 0 if none.
func (em *entityMap) getLast(entity string) uint64 {
	return em.get(entity, em.lseq)
}

// setLast stores seq as the last sequence for entity.
func (em *entityMap) setLast(entity string, seq uint64) {
	em.set(entity, seq, em.lseq)
}
// Model combines an Evolver, Decider, and Viewer for a specific type T.
// It provides thread-safe access to the underlying interfaces and keeps track
// of the last sequence number of events applied to the model.
type Model[T any] struct {
// t is the wrapped value; View passes it to the caller's function.
t T
// e is t as an Evolver, or nil when t does not implement Evolver
// (set by NewModel via a type assertion).
e Evolver
// d is t as a Decider, or nil when t does not implement Decider
// (set by NewModel via a type assertion).
d Decider
// seqs records, per entity, the start and last sequence numbers of the
// events applied through Evolve.
seqs *entityMap
// mu is write-locked by Evolve and read-locked by Decide and View.
mu sync.RWMutex
}
// Evolve applies event to the wrapped Evolver while holding the write lock
// and, on success, records the event's sequence as the last sequence applied
// for its entity (and as the start sequence if none was recorded yet).
//
// It returns ErrEvolverNotImplemented when the wrapped value does not
// implement Evolver. An event whose sequence is not greater than the last
// sequence recorded for its entity is treated as already applied and skipped
// with a nil error. An error from the wrapped Evolve is returned unchanged;
// in that case the recorded sequences are left untouched, so the failed event
// is not reported as applied: a redelivery is evolved again, and Decide does
// not expect a sequence the state never reached.
func (m *Model[T]) Evolve(event *Event) error {
	if m.e == nil {
		return ErrEvolverNotImplemented
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	// Already applied.
	if m.seqs.getLast(event.Entity) >= event.sequence {
		return nil
	}

	// Evolve before touching the bookkeeping: sequences must only advance
	// for events that were actually applied to the state.
	if err := m.e.Evolve(event); err != nil {
		return err
	}

	// A start sequence of 0 means no event has been recorded for this entity.
	if m.seqs.getStart(event.Entity) == 0 {
		m.seqs.setStart(event.Entity, event.sequence)
	}
	m.seqs.setLast(event.Entity, event.sequence)

	return nil
}
// Decide passes cmd to the wrapped Decider while holding the read lock and
// returns the events it produces. It returns ErrDeciderNotImplemented when
// the wrapped value does not implement Decider; an error from the wrapped
// Decide is returned unchanged together with nil events.
//
// Only the first event of each distinct entity in the result is considered
// for an expectation: if it carries no explicit Expect, it is given
// ExpectSequence with the last sequence known for that entity. Later events
// of the same entity are left as they are.
func (m *Model[T]) Decide(cmd *Command) ([]*Event, error) {
	if m.d == nil {
		return nil, ErrDeciderNotImplemented
	}

	m.mu.RLock()
	defer m.mu.RUnlock()

	decided, err := m.d.Decide(cmd)
	if err != nil {
		return nil, err
	}

	// The expected sequence only needs to be set once per entity in a batch.
	handled := make(map[string]struct{})
	for _, ev := range decided {
		if _, dup := handled[ev.Entity]; dup {
			continue
		}
		handled[ev.Entity] = struct{}{}

		// An explicitly set expectation always wins.
		if ev.Expect != nil {
			continue
		}
		ev.Expect = ExpectSequence(m.seqs.getLast(ev.Entity))
	}

	return decided, nil
}
// View calls fn with the wrapped value while holding the read lock and
// returns the error fn returns.
func (m *Model[T]) View(fn func(T) error) error {
	m.mu.RLock()
	defer m.mu.RUnlock()

	state := m.t
	return fn(state)
}
// NewModel wraps t in a Model with empty sequence tracking.
//
// t may implement neither, one, or both of Evolver and Decider. A missing
// implementation is not an error here; it is reported at runtime by the
// corresponding Model method.
func NewModel[T any](t T) *Model[T] {
	// A failed assertion leaves the variable as a nil interface, which the
	// Model methods check for.
	evolver, _ := any(t).(Evolver)
	decider, _ := any(t).(Decider)

	return &Model[T]{
		t:    t,
		e:    evolver,
		d:    decider,
		seqs: newEntityMap(),
	}
}