Skip to content

Commit e4217f9

Browse files
committed
fix(workers): preserve global refIndex across shards
FuseWorker now tracks a per-shard mapping from shard-local to global indices, so search results report refIndex against the full collection rather than the shard. add() appends the new document's global index to the chosen shard's mapping; setCollection rebuilds every mapping in lockstep with the chunked reassignment. Each worker and its index mapping are colocated in a Shard object so that the pairing is enforced structurally rather than by convention. Adds test/fuse-worker.test.js with a MockWorker that runs a real Fuse instance per shard, exercising search, interleaved add, and setCollection with exact refIndex assertions.
1 parent dbc115d commit e4217f9

2 files changed

Lines changed: 279 additions & 52 deletions

File tree

src/workers/FuseWorker.ts

Lines changed: 88 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ interface PendingCall {
1919
reject: (reason: any) => void
2020
}
2121

22+
/**
 * One worker paired with the index mapping for the documents assigned to it.
 */
interface Shard {
23+
// Worker that receives this shard's init / search / add / setCollection calls.
worker: Worker
24+
// globalIndices[k] is the full-collection index of the shard's k-th document.
globalIndices: number[]
25+
}
26+
2227
const DEFAULT_MAX_WORKERS = 8
2328

2429
function getDefaultWorkerCount(): number {
@@ -31,8 +36,9 @@ function getDefaultWorkerCount(): number {
3136
export default class FuseWorker<T> {
3237
private _options: IFuseOptions<T>
3338
private _workerOptions: FuseWorkerOptions
34-
private _docs: ReadonlyArray<T>
35-
private _workers: Worker[] | null = null
39+
private _docs: T[]
40+
private _shards: Shard[] | null = null
41+
private _addCursor = 0
3642
private _initPromise: Promise<void> | null = null
3743
private _pending: Map<number, PendingCall> = new Map()
3844
private _nextId = 0
@@ -43,7 +49,7 @@ export default class FuseWorker<T> {
4349
options?: IFuseOptions<T>,
4450
workerOptions?: FuseWorkerOptions
4551
) {
46-
this._docs = docs
52+
this._docs = docs.slice()
4753
this._options = options || {} as IFuseOptions<T>
4854
this._workerOptions = workerOptions || {}
4955
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
@@ -63,37 +69,48 @@ export default class FuseWorker<T> {
6369
return this._initPromise
6470
}
6571

72+
/**
 * Creates a module worker and wires its message and error handlers to the
 * pending-call table.
 *
 * @returns The worker with `onmessage` and `onerror` installed.
 */
private _spawnWorker(): Worker {
  const worker = new Worker(this._workerUrl, { type: 'module' })

  worker.onmessage = (e: MessageEvent) => {
    const { id, result, error } = e.data
    const handler = this._pending.get(id)
    // No entry for this id (unknown, or already settled): nothing to deliver.
    if (!handler) return
    this._pending.delete(id)
    if (error) {
      handler.reject(new Error(error))
    } else {
      handler.resolve(result)
    }
  }

  worker.onerror = (e: ErrorEvent) => {
    // The table is keyed by call id, not by worker, so an error on any worker
    // fails every outstanding call.
    const failed = Array.from(this._pending.values())
    // Remove the entries before rejecting so settled handlers are not retained
    // and are not rejected again by a later error event.
    this._pending.clear()
    // ErrorEvent.message can be empty; keep the rejection reason informative.
    const message = e.message || 'Worker error'
    for (const handler of failed) {
      handler.reject(new Error(message))
    }
  }

  return worker
}
95+
6696
private async _init(): Promise<void> {
6797
const numWorkers = this._getNumWorkers()
6898
const chunkSize = Math.ceil(this._docs.length / numWorkers)
69-
const initPromises: Promise<void>[] = []
7099

71-
this._workers = []
100+
this._shards = []
101+
this._addCursor = 0
72102

103+
const initPromises: Promise<void>[] = []
73104
for (let i = 0; i < numWorkers; i++) {
74-
const chunk = this._docs.slice(i * chunkSize, (i + 1) * chunkSize)
75-
const worker = new Worker(this._workerUrl, { type: 'module' })
76-
77-
worker.onmessage = (e: MessageEvent) => {
78-
const { id, result, error } = e.data
79-
const handler = this._pending.get(id)
80-
if (!handler) return
81-
this._pending.delete(id)
82-
if (error) {
83-
handler.reject(new Error(error))
84-
} else {
85-
handler.resolve(result)
86-
}
87-
}
88-
89-
worker.onerror = (e: ErrorEvent) => {
90-
for (const [, handler] of this._pending) {
91-
handler.reject(new Error(e.message))
92-
}
93-
}
94-
95-
this._workers.push(worker)
96-
initPromises.push(this._call(worker, 'init', [chunk, this._options]))
105+
const start = i * chunkSize
106+
const end = Math.min(start + chunkSize, this._docs.length)
107+
const chunk = this._docs.slice(start, end)
108+
const globalIndices: number[] = []
109+
for (let j = start; j < end; j += 1) globalIndices.push(j)
110+
111+
const shard: Shard = { worker: this._spawnWorker(), globalIndices }
112+
this._shards.push(shard)
113+
initPromises.push(this._call(shard.worker, 'init', [chunk, this._options]))
97114
}
98115

99116
await Promise.all(initPromises)
@@ -113,14 +130,18 @@ export default class FuseWorker<T> {
113130
): Promise<FuseResult<T>[]> {
114131
await this._ensureInit()
115132

116-
const results = await Promise.all(
117-
this._workers!.map((worker) => this._call(worker, 'search', [query, options]))
133+
const shards = this._shards!
134+
const results: FuseResult<T>[][] = await Promise.all(
135+
shards.map((s) => this._call(s.worker, 'search', [query, options]))
118136
)
119137

120-
// Merge results from all shards
138+
// Merge results from all shards, rewriting refIndex from shard-local to global
121139
const merged: FuseResult<T>[] = []
122-
for (const shardResults of results) {
123-
merged.push(...shardResults)
140+
for (let i = 0, len = results.length; i < len; i += 1) {
141+
const { globalIndices } = shards[i]
142+
for (const r of results[i]) {
143+
merged.push({ ...r, refIndex: globalIndices[r.refIndex] })
144+
}
124145
}
125146

126147
// Sort by score (lower is better)
@@ -141,35 +162,50 @@ export default class FuseWorker<T> {
141162
/**
 * Appends one document to the collection and indexes it on a single shard.
 *
 * The target shard is chosen by rotating `_addCursor` over the shards. The
 * document's position in `_docs` is recorded in that shard's `globalIndices`
 * before the worker call is awaited.
 *
 * @param doc Document to append.
 * @returns Resolves once the chosen worker's `add` call has resolved.
 */
async add(doc: T): Promise<void> {
142163
await this._ensureInit()
143164

144-
// Round-robin across workers
145-
const idx = this._nextId % this._workers!.length
146-
await this._call(this._workers![idx], 'add', [doc])
165+
const shards = this._shards!
166+
const shard = shards[this._addCursor % shards.length]
167+
this._addCursor += 1
168+
169+
// The new document's global index is its position at the end of _docs.
const globalIdx = this._docs.length
170+
this._docs.push(doc)
171+
shard.globalIndices.push(globalIdx)
172+
173+
// NOTE(review): if this call rejects, the two pushes above are not rolled
// back — confirm whether that can leave the shard mapping out of step.
await this._call(shard.worker, 'add', [doc])
147174
}
148175

149176
/**
 * Replaces the whole collection and redistributes it across the shards.
 *
 * A copy of `docs` is stored. When no shards exist yet, only `_initPromise`
 * is cleared and the method returns. Otherwise the documents are split into
 * contiguous chunks, one per shard; each shard's `globalIndices` is rebuilt
 * to match its chunk and `_addCursor` is reset to 0.
 *
 * @param docs The new collection.
 * @returns Resolves once every worker's `setCollection` call has resolved.
 */
async setCollection(docs: ReadonlyArray<T>): Promise<void> {
150-
this._docs = docs
151-
152-
if (this._workers) {
153-
const numWorkers = this._workers.length
154-
const chunkSize = Math.ceil(docs.length / numWorkers)
155-
156-
await Promise.all(
157-
this._workers.map((worker, i) => {
158-
const chunk = docs.slice(i * chunkSize, (i + 1) * chunkSize)
159-
return this._call(worker, 'setCollection', [chunk])
160-
})
161-
)
162-
} else {
177+
// Store a copy so later pushes in this class do not touch the caller's array.
this._docs = docs.slice()
178+
179+
if (!this._shards) {
163180
this._initPromise = null
181+
return
182+
}
183+
184+
const shards = this._shards
185+
const chunkSize = Math.ceil(this._docs.length / shards.length)
186+
this._addCursor = 0
187+
188+
const tasks: Promise<void>[] = []
189+
for (let i = 0, len = shards.length; i < len; i += 1) {
190+
const start = i * chunkSize
191+
// Clamp so trailing shards get a short or empty chunk.
const end = Math.min(start + chunkSize, this._docs.length)
192+
const chunk = this._docs.slice(start, end)
193+
const globalIndices: number[] = []
194+
for (let j = start; j < end; j += 1) globalIndices.push(j)
195+
196+
shards[i].globalIndices = globalIndices
197+
tasks.push(this._call(shards[i].worker, 'setCollection', [chunk]))
164198
}
199+
200+
await Promise.all(tasks)
165201
}
166202

167203
terminate(): void {
168-
if (this._workers) {
169-
for (const worker of this._workers) {
204+
if (this._shards) {
205+
for (const { worker } of this._shards) {
170206
worker.terminate()
171207
}
172-
this._workers = null
208+
this._shards = null
173209
}
174210
this._initPromise = null
175211

test/fuse-worker.test.js

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
// Unit tests for FuseWorker — mocks the Worker global so we can verify
2+
// the sharding/refIndex rewrite logic without spawning real workers.
3+
4+
import { describe, test, expect, beforeAll, afterAll } from 'vitest'
5+
6+
process.env.EXTENDED_SEARCH_ENABLED = 'true'
7+
process.env.TOKEN_SEARCH_ENABLED = 'true'
8+
9+
const { default: Fuse } = await import('../src/entry')
10+
const { default: FuseWorker } = await import('../src/workers/FuseWorker')
11+
12+
// Mock Worker: each instance owns a real Fuse over its assigned chunk and
13+
// responds to init / search / add / setCollection messages on a microtask.
14+
/**
 * In-process stand-in for the Worker global. Each instance owns a Fuse index
 * built from the chunk it receives in `init` and answers the same
 * { id, method, args } messages by calling its own `onmessage`.
 */
class MockWorker {
  constructor() {
    this._fuse = null
    this.onmessage = null
    this.onerror = null
    // Record every instance so a test can inspect the workers that were created.
    MockWorker.instances.push(this)
  }

  /** Handles the message on a microtask rather than inline. */
  postMessage(msg) {
    queueMicrotask(() => this._handle(msg))
  }

  /**
   * Runs one request against the Fuse index and replies exactly once with
   * either `{ id, result }` or `{ id, error }`.
   */
  _handle({ id, method, args }) {
    let reply
    try {
      let result
      switch (method) {
        case 'init':
          this._fuse = new Fuse(args[0], args[1])
          result = true
          break
        case 'search':
          result = this._fuse.search(args[0], args[1])
          break
        case 'add':
          this._fuse.add(args[0])
          result = true
          break
        case 'setCollection':
          this._fuse.setCollection(args[0])
          result = true
          break
        default:
          // An unrecognised method must fail loudly instead of being
          // acknowledged as a success with an undefined result.
          throw new Error(`MockWorker: unknown method "${method}"`)
      }
      reply = { id, result }
    } catch (err) {
      // The receiver tests `error` for truthiness, so never send an empty string.
      const message = (err && err.message) || String(err) || 'MockWorker error'
      reply = { id, error: message }
    }
    // Deliver outside the try block so a throwing onmessage handler does not
    // trigger a second reply for the same id.
    this.onmessage?.({ data: reply })
  }

  terminate() {}
}

MockWorker.instances = []
57+
58+
// Fixture collection: nine book records, each with a `title` and an `author`.
const Books = [
59+
{ title: 'Old Man\'s War', author: 'Scalzi' },
60+
{ title: 'The Lock Artist', author: 'Hamilton' },
61+
{ title: 'HTML5', author: 'Sharp' },
62+
{ title: 'A Brief History of Time', author: 'Hawking' },
63+
{ title: 'The Shock of the Fall', author: 'Filer' },
64+
{ title: 'The Great Gatsby', author: 'Fitzgerald' },
65+
{ title: 'The DaVinci Code', author: 'Brown' },
66+
{ title: 'Angels & Demons', author: 'Brown' },
67+
{ title: 'The Rosie Project', author: 'Simsion' }
68+
]
69+
70+
describe('FuseWorker sharding', () => {
71+
let originalWorker
72+
73+
// Swap the global Worker for the mock for the duration of the suite.
beforeAll(() => {
74+
originalWorker = globalThis.Worker
75+
globalThis.Worker = MockWorker
76+
})
77+
78+
// Put back whatever Worker global was present before the suite ran.
afterAll(() => {
79+
globalThis.Worker = originalWorker
80+
})
81+
82+
// Checks that every result's refIndex addresses the original Books array.
// Assuming `numWorkers: 3` yields three shards, the nine books split into
// chunks of three, so the two "Brown" entries (global indices 6 and 7) sit at
// shard-local positions 0 and 1 — an unmapped refIndex would fail the loop.
test('search results carry global refIndex, not shard-local', async () => {
83+
MockWorker.instances = []
84+
const fw = new FuseWorker(Books, { keys: ['title', 'author'], includeScore: true }, { numWorkers: 3 })
85+
86+
const results = await fw.search('brown')
87+
88+
// Both "Brown" author entries should be found
89+
const titles = results.map(r => r.item.title).sort()
90+
expect(titles).toEqual(['Angels & Demons', 'The DaVinci Code'])
91+
92+
// refIndex must point back to Books (global)
93+
for (const r of results) {
94+
expect(Books[r.refIndex]).toBe(r.item)
95+
}
96+
97+
fw.terminate()
98+
})
99+
100+
// Checks that a document added after initialisation is reported with the
// index it would have at the end of the full collection (Books.length).
test('add() appends globally and subsequent search returns correct refIndex', async () => {
101+
MockWorker.instances = []
102+
const fw = new FuseWorker(Books.slice(), { keys: ['title', 'author'] }, { numWorkers: 3 })
103+
104+
// Prime workers
105+
await fw.search('xyz')
106+
107+
const newDoc = { title: 'Brown Bear', author: 'Someone' }
108+
await fw.add(newDoc)
109+
110+
const results = await fw.search('brown')
111+
const added = results.find(r => r.item.title === 'Brown Bear')
112+
expect(added).toBeDefined()
113+
// Added doc's refIndex should equal its global append position
114+
expect(added.refIndex).toBe(Books.length)
115+
116+
fw.terminate()
117+
})
118+
119+
// Drives search, add and setCollection in sequence while mirroring the
// expected full collection in a local array (`globalDocs`, then `globalDocs2`)
// and checks after each round that refIndex addresses that mirror.
test('interleaved search/add/setCollection keeps refIndex consistent', async () => {
120+
MockWorker.instances = []
121+
const initial = Books.slice(0, 4)
122+
const fw = new FuseWorker(initial, { keys: ['title', 'author'] }, { numWorkers: 2 })
123+
124+
const globalDocs = initial.slice()
125+
126+
// Round 1: search before any add
127+
let results = await fw.search('war')
128+
for (const r of results) {
129+
expect(globalDocs[r.refIndex]).toBe(r.item)
130+
}
131+
132+
// Round 2: add two docs, then search
133+
const added1 = { title: 'War Stories', author: 'Writer A' }
134+
const added2 = { title: 'Peace and War', author: 'Writer B' }
135+
await fw.add(added1)
136+
globalDocs.push(added1)
137+
await fw.add(added2)
138+
globalDocs.push(added2)
139+
140+
results = await fw.search('war')
141+
for (const r of results) {
142+
expect(globalDocs[r.refIndex]).toBe(r.item)
143+
}
144+
// Specifically, the added docs must map to their append positions
145+
const a1 = results.find(r => r.item === added1)
146+
const a2 = results.find(r => r.item === added2)
147+
expect(a1?.refIndex).toBe(initial.length)
148+
expect(a2?.refIndex).toBe(initial.length + 1)
149+
150+
// Round 3: setCollection resets, then add and search again
151+
const reset = Books.slice(4, 7)
152+
await fw.setCollection(reset)
153+
const globalDocs2 = reset.slice()
154+
155+
// With two shards the three reset docs split 2 + 1; the add below then lands
// on a shard whose mapping is no longer a contiguous range.
const added3 = { title: 'Brown Ale', author: 'Brewer' }
156+
await fw.add(added3)
157+
globalDocs2.push(added3)
158+
159+
results = await fw.search('brown')
160+
for (const r of results) {
161+
expect(globalDocs2[r.refIndex]).toBe(r.item)
162+
}
163+
const a3 = results.find(r => r.item === added3)
164+
expect(a3?.refIndex).toBe(reset.length)
165+
166+
fw.terminate()
167+
})
168+
169+
// Checks that after setCollection on an already-initialised instance, every
// refIndex addresses the replacement array rather than the original Books.
test('setCollection rebuilds mapping so refIndex matches new collection', async () => {
170+
MockWorker.instances = []
171+
const fw = new FuseWorker(Books, { keys: ['title', 'author'] }, { numWorkers: 2 })
172+
await fw.search('xyz') // force init
173+
174+
const newDocs = [
175+
{ title: 'Alpha', author: 'A' },
176+
{ title: 'Bravo', author: 'B' },
177+
{ title: 'Charlie Brown', author: 'C' },
178+
{ title: 'Delta', author: 'D' },
179+
{ title: 'Brown Fox', author: 'E' }
180+
]
181+
182+
await fw.setCollection(newDocs)
183+
184+
const results = await fw.search('brown')
185+
for (const r of results) {
186+
expect(newDocs[r.refIndex]).toBe(r.item)
187+
}
188+
189+
fw.terminate()
190+
})
191+
})

0 commit comments

Comments
 (0)