Skip to content

Commit 77cea04

Browse files
authored
feat: asyncify (#105)
* feat: asyncify * support subscription broadcast * fix require of Packet * fix import/export of Packet * move trie to asyncPersistence * create broadcastPersistence * use broadcast from asyncPersistence * add broker to setup() in typeDef * toValue in subsByClient * enable direct testing of AsyncPersistence * align promisified * lint fix * split tests over multiple files * add timeout waiting for "ready" event * make trie and broker private class members
1 parent 2d69505 commit 77cea04

15 files changed

Lines changed: 1238 additions & 438 deletions

abstract.js

Lines changed: 138 additions & 67 deletions
Large diffs are not rendered by default.

asyncPersistence.js

Lines changed: 343 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,343 @@
1+
const { Readable } = require('node:stream')
2+
const QlobberSub = require('qlobber/aedes/qlobber-sub')
3+
const { QlobberTrue } = require('qlobber')
4+
const Packet = require('aedes-packet')
5+
const BroadcastPersistence = require('./broadcastPersistence.js')
6+
7+
const QLOBBER_OPTIONS = {
8+
wildcard_one: '+',
9+
wildcard_some: '#',
10+
separator: '/'
11+
}
12+
const CREATE_ON_EMPTY = true
13+
14+
function * multiIterables (iterables) {
15+
for (const iter of iterables) {
16+
yield * iter
17+
}
18+
}
19+
20+
function * retainedMessagesByPattern (retained, pattern) {
21+
const qlobber = new QlobberTrue(QLOBBER_OPTIONS)
22+
qlobber.add(pattern)
23+
24+
for (const [topic, packet] of retained) {
25+
if (qlobber.test(topic)) {
26+
yield packet
27+
}
28+
}
29+
}
30+
31+
function * willsByBrokers (wills, brokers) {
32+
for (const will of wills.values()) {
33+
if (!brokers[will.brokerId]) {
34+
yield will
35+
}
36+
}
37+
}
38+
39+
function * clientListbyTopic (subscriptions, topic) {
40+
for (const [clientId, topicMap] of subscriptions) {
41+
if (topicMap.has(topic)) {
42+
yield clientId
43+
}
44+
}
45+
}
46+
47+
class MemoryPersistence {
48+
// private class members start with #
49+
#retained
50+
#subscriptions
51+
#outgoing
52+
#incoming
53+
#wills
54+
#clientsCount
55+
#destroyed
56+
#broadcastSubscriptions
57+
#trie
58+
#broker
59+
60+
constructor (opts = {}) {
61+
// using Maps for convenience and security (risk on prototype polution)
62+
// Map ( topic -> packet )
63+
this.#retained = new Map()
64+
// Map ( clientId -> Map( topic -> { qos, rh, rap, nl } ))
65+
this.#subscriptions = new Map()
66+
// Map ( clientId > [ packet ] }
67+
this.#outgoing = new Map()
68+
// Map ( clientId -> { packetId -> Packet } )
69+
this.#incoming = new Map()
70+
// Map( clientId -> will )
71+
this.#wills = new Map()
72+
this.#clientsCount = 0
73+
this.#destroyed = false
74+
this.#broadcastSubscriptions = opts.broadcastSubscriptions
75+
this.#trie = new QlobberSub(QLOBBER_OPTIONS)
76+
}
77+
78+
// for testing we need access to the broker
79+
get broker () {
80+
return this.#broker
81+
}
82+
83+
async setup (broker) {
84+
this.#broker = broker
85+
if (this.#broadcastSubscriptions) {
86+
this.broadcast = new BroadcastPersistence(broker, this.#trie)
87+
await this.broadcast.brokerSubscribe()
88+
}
89+
}
90+
91+
async storeRetained (pkt) {
92+
const packet = Object.assign({}, pkt)
93+
if (packet.payload.length === 0) {
94+
this.#retained.delete(packet.topic)
95+
} else {
96+
this.#retained.set(packet.topic, packet)
97+
}
98+
}
99+
100+
createRetainedStreamCombi (patterns) {
101+
const iterables = patterns.map((p) => {
102+
return retainedMessagesByPattern(this.#retained, p)
103+
})
104+
return Readable.from(multiIterables(iterables))
105+
}
106+
107+
createRetainedStream (pattern) {
108+
return Readable.from(retainedMessagesByPattern(this.#retained, pattern))
109+
}
110+
111+
async addSubscriptions (client, subs) {
112+
let stored = this.#subscriptions.get(client.id)
113+
const trie = this.#trie
114+
115+
if (!stored) {
116+
stored = new Map()
117+
this.#subscriptions.set(client.id, stored)
118+
this.#clientsCount++
119+
}
120+
121+
for (const sub of subs) {
122+
const storedSub = stored.get(sub.topic)
123+
if (sub.qos > 0) {
124+
trie.add(sub.topic, {
125+
clientId: client.id,
126+
topic: sub.topic,
127+
qos: sub.qos,
128+
rh: sub.rh,
129+
rap: sub.rap,
130+
nl: sub.nl
131+
})
132+
} else if (storedSub?.qos > 0) {
133+
trie.remove(sub.topic, {
134+
clientId: client.id,
135+
topic: sub.topic
136+
})
137+
}
138+
stored.set(sub.topic, { qos: sub.qos, rh: sub.rh, rap: sub.rap, nl: sub.nl })
139+
}
140+
if (this.#broadcastSubscriptions) {
141+
await this.broadcast.addedSubscriptions(client, subs)
142+
}
143+
}
144+
145+
async removeSubscriptions (client, subs) {
146+
const stored = this.#subscriptions.get(client.id)
147+
const trie = this.#trie
148+
149+
if (stored) {
150+
for (const topic of subs) {
151+
const storedSub = stored.get(topic)
152+
if (storedSub !== undefined) {
153+
if (storedSub.qos > 0) {
154+
trie.remove(topic, { clientId: client.id, topic })
155+
}
156+
stored.delete(topic)
157+
}
158+
}
159+
160+
if (stored.size === 0) {
161+
this.#clientsCount--
162+
this.#subscriptions.delete(client.id)
163+
}
164+
}
165+
if (this.#broadcastSubscriptions) {
166+
await this.broadcast.removedSubscriptions(client, subs)
167+
}
168+
}
169+
170+
async subscriptionsByClient (client) {
171+
const subs = []
172+
const stored = this.#subscriptions.get(client.id)
173+
if (stored) {
174+
for (const [topic, storedSub] of stored) {
175+
subs.push({ topic, ...storedSub })
176+
}
177+
}
178+
return subs
179+
}
180+
181+
async countOffline () {
182+
return { subsCount: this.#trie.subscriptionsCount, clientsCount: this.#clientsCount }
183+
}
184+
185+
async subscriptionsByTopic (pattern) {
186+
return this.#trie.match(pattern)
187+
}
188+
189+
async cleanSubscriptions (client) {
190+
const trie = this.#trie
191+
const stored = this.#subscriptions.get(client.id)
192+
193+
if (stored) {
194+
for (const [topic, storedSub] of stored) {
195+
if (storedSub.qos > 0) {
196+
trie.remove(topic, { clientId: client.id, topic })
197+
}
198+
}
199+
200+
this.#clientsCount--
201+
this.#subscriptions.delete(client.id)
202+
}
203+
}
204+
205+
#outgoingEnqueuePerSub (sub, packet) {
206+
const id = sub.clientId
207+
const queue = getMapRef(this.#outgoing, id, [], CREATE_ON_EMPTY)
208+
queue[queue.length] = new Packet(packet)
209+
}
210+
211+
async outgoingEnqueue (sub, packet) {
212+
this.#outgoingEnqueuePerSub(sub, packet)
213+
}
214+
215+
async outgoingEnqueueCombi (subs, packet) {
216+
for (let i = 0; i < subs.length; i++) {
217+
this.#outgoingEnqueuePerSub(subs[i], packet)
218+
}
219+
}
220+
221+
async outgoingUpdate (client, packet) {
222+
const outgoing = getMapRef(this.#outgoing, client.id, [], CREATE_ON_EMPTY)
223+
224+
let temp
225+
for (let i = 0; i < outgoing.length; i++) {
226+
temp = outgoing[i]
227+
if (temp.brokerId === packet.brokerId) {
228+
if (temp.brokerCounter === packet.brokerCounter) {
229+
temp.messageId = packet.messageId
230+
return
231+
}
232+
/*
233+
Maximum of messageId (packet identifier) is 65535 and will be rotated,
234+
brokerCounter is to ensure the packet identifier be unique.
235+
The for loop is going to search which packet messageId should be updated
236+
in the #outgoing queue.
237+
If there is a case that brokerCounter is different but messageId is same,
238+
we need to let the loop keep searching
239+
*/
240+
} else if (temp.messageId === packet.messageId) {
241+
outgoing[i] = packet
242+
return
243+
}
244+
}
245+
throw new Error('no such packet')
246+
}
247+
248+
async outgoingClearMessageId (client, packet, cb) {
249+
const outgoing = getMapRef(this.#outgoing, client.id, [], CREATE_ON_EMPTY)
250+
251+
let temp
252+
for (let i = 0; i < outgoing.length; i++) {
253+
temp = outgoing[i]
254+
if (temp.messageId === packet.messageId) {
255+
outgoing.splice(i, 1)
256+
return temp
257+
}
258+
}
259+
}
260+
261+
outgoingStream (client) {
262+
// shallow clone the outgoing queue for this client to avoid race conditions
263+
const outgoing = [].concat(getMapRef(this.#outgoing, client.id, []))
264+
return Readable.from(outgoing)
265+
}
266+
267+
async incomingStorePacket (client, packet) {
268+
const id = client.id
269+
const store = getMapRef(this.#incoming, id, {}, CREATE_ON_EMPTY)
270+
271+
store[packet.messageId] = new Packet(packet)
272+
store[packet.messageId].messageId = packet.messageId
273+
}
274+
275+
async incomingGetPacket (client, packet) {
276+
const id = client.id
277+
const store = getMapRef(this.#incoming, id, {})
278+
279+
this.#incoming.set(id, store)
280+
281+
if (!store[packet.messageId]) {
282+
throw new Error('no such packet')
283+
}
284+
return store[packet.messageId]
285+
}
286+
287+
async incomingDelPacket (client, packet) {
288+
const id = client.id
289+
const store = getMapRef(this.#incoming, id, {})
290+
const toDelete = store[packet.messageId]
291+
292+
if (!toDelete) {
293+
throw new Error('no such packet')
294+
}
295+
delete store[packet.messageId]
296+
}
297+
298+
async putWill (client, packet) {
299+
packet.brokerId = this.#broker.id
300+
packet.clientId = client.id
301+
this.#wills.set(client.id, packet)
302+
}
303+
304+
async getWill (client) {
305+
return this.#wills.get(client.id)
306+
}
307+
308+
async delWill (client) {
309+
const will = this.#wills.get(client.id)
310+
this.#wills.delete(client.id)
311+
return will
312+
}
313+
314+
streamWill (brokers = {}) {
315+
return Readable.from(willsByBrokers(this.#wills, brokers))
316+
}
317+
318+
getClientList (topic) {
319+
return Readable.from(clientListbyTopic(this.#subscriptions, topic))
320+
}
321+
322+
async destroy () {
323+
if (this.#destroyed) {
324+
throw new Error('destroyed called twice!')
325+
}
326+
this.#destroyed = true
327+
if (this.#broadcastSubscriptions) {
328+
await this.broadcast.brokerUnsubscribe()
329+
}
330+
this.#retained = null
331+
}
332+
}
333+
334+
function getMapRef (map, key, ifEmpty, createOnEmpty = false) {
335+
const value = map.get(key)
336+
if (value === undefined && createOnEmpty) {
337+
map.set(key, ifEmpty)
338+
}
339+
return value || ifEmpty
340+
}
341+
342+
module.exports = MemoryPersistence
343+
module.exports.Packet = Packet

0 commit comments

Comments
 (0)