Skip to content

Commit 6741775

Browse files
authored
Suspend event deliveries until middlewares are ready (#552)
1 parent aa3c901 commit 6741775

File tree

7 files changed

+95
-12
lines changed

7 files changed

+95
-12
lines changed

.changeset/ten-lions-look.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@segment/analytics-next': patch
3+
---
4+
5+
Suspend event deliveries until middlewares are ready

packages/browser/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
"size-limit": [
4242
{
4343
"path": "dist/umd/index.js",
44-
"limit": "25.7 KB"
44+
"limit": "25.85 KB"
4545
}
4646
],
4747
"dependencies": {

packages/browser/src/core/analytics/index.ts

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -354,19 +354,22 @@ export class Analytics extends Emitter {
354354
}
355355

356356
async addSourceMiddleware(fn: MiddlewareFunction): Promise<Analytics> {
357-
const { sourceMiddlewarePlugin } = await import(
358-
/* webpackChunkName: "middleware" */ '../../plugins/middleware'
359-
)
357+
await this.queue.criticalTasks.run(async () => {
358+
const { sourceMiddlewarePlugin } = await import(
359+
/* webpackChunkName: "middleware" */ '../../plugins/middleware'
360+
)
360361

361-
const integrations: Record<string, boolean> = {}
362-
this.queue.plugins.forEach((plugin) => {
363-
if (plugin.type === 'destination') {
364-
return (integrations[plugin.name] = true)
365-
}
362+
const integrations: Record<string, boolean> = {}
363+
this.queue.plugins.forEach((plugin) => {
364+
if (plugin.type === 'destination') {
365+
return (integrations[plugin.name] = true)
366+
}
367+
})
368+
369+
const plugin = sourceMiddlewarePlugin(fn, integrations)
370+
await this.register(plugin)
366371
})
367372

368-
const plugin = sourceMiddlewarePlugin(fn, integrations)
369-
await this.register(plugin)
370373
return this
371374
}
372375

packages/browser/src/core/inspector/__tests__/index.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ describe('Inspector interface', () => {
1919
const deliveryPromise = analytics.track('Test event').catch(() => {})
2020

2121
expect(window.__SEGMENT_INSPECTOR__?.triggered).toHaveBeenCalledTimes(1)
22-
expect(window.__SEGMENT_INSPECTOR__?.enriched).toHaveBeenCalledTimes(1)
2322

2423
expect(window.__SEGMENT_INSPECTOR__?.triggered).toHaveBeenCalledWith(
2524
expect.objectContaining({
@@ -33,6 +32,7 @@ describe('Inspector interface', () => {
3332

3433
await deliveryPromise
3534

35+
expect(window.__SEGMENT_INSPECTOR__?.enriched).toHaveBeenCalledTimes(1)
3636
expect(window.__SEGMENT_INSPECTOR__?.delivered).toHaveBeenCalledTimes(1)
3737
})
3838
})

packages/browser/src/core/queue/__tests__/event-queue.test.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/* eslint-disable @typescript-eslint/no-floating-promises */
2+
import { noop } from 'lodash'
23
import { Analytics } from '../../analytics'
34
import { pWhile } from '../../../lib/p-while'
45
import * as timer from '../../../lib/priority-queue/backoff'
@@ -9,6 +10,7 @@ import {
910
import { Context, ContextCancelation } from '../../context'
1011
import { Plugin } from '../../plugin'
1112
import { EventQueue } from '../event-queue'
13+
import { pTimeout } from '../../callback'
1214

1315
async function flushAll(eq: EventQueue): Promise<Context[]> {
1416
const flushSpy = jest.spyOn(eq, 'flush')
@@ -164,6 +166,37 @@ describe('Flushing', () => {
164166
expect(eq.queue.getAttempts(fruitBasket)).toEqual(2)
165167
})
166168

169+
test('waits for critical tasks to finish before performing event deliveries', async () => {
170+
jest.useRealTimers()
171+
172+
const eq = new EventQueue()
173+
174+
let finishCriticalTask: () => void = noop
175+
const startTask = () =>
176+
new Promise<void>((res) => (finishCriticalTask = res))
177+
178+
// some preceding events that've been scheduled
179+
const p1 = eq.dispatch(fruitBasket)
180+
const p2 = eq.dispatch(basketView)
181+
// a critical task has been kicked off
182+
eq.criticalTasks.run(startTask)
183+
// a succeeding event
184+
const p3 = eq.dispatch(shopper)
185+
186+
// even after a good amount of time, none of the events should be delivered
187+
await expect(pTimeout(Promise.race([p1, p2, p3]), 1000)).rejects.toThrow()
188+
189+
// give the green light
190+
finishCriticalTask()
191+
192+
// now that the task is complete, the delivery should resume
193+
expect(await Promise.all([p1, p2, p3])).toMatchObject([
194+
fruitBasket,
195+
basketView,
196+
shopper,
197+
])
198+
})
199+
167200
test('delivers events on retry', async () => {
168201
jest.useRealTimers()
169202

packages/browser/src/core/queue/event-queue.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { Context, ContextCancelation } from '../context'
77
import { Emitter } from '@segment/analytics-core'
88
import { Integrations } from '../events'
99
import { Plugin } from '../plugin'
10+
import { createTaskGroup, TaskGroup } from '../task/task-group'
1011
import { attempt, ensure } from './delivery'
1112
import { inspectorHost } from '../inspector'
1213

@@ -18,6 +19,14 @@ type PluginsByType = {
1819
}
1920

2021
export class EventQueue extends Emitter {
22+
/**
23+
* All event deliveries get suspended until all the tasks in this task group are complete.
24+
* For example: a middleware that augments the event object should be loaded safely as a
25+
* critical task, this way, event queue will wait for it to be ready before sending events.
26+
*
27+
* This applies to all the events already in the queue, and the upcoming ones
28+
*/
29+
criticalTasks: TaskGroup = createTaskGroup()
2130
queue: PriorityQueue<Context>
2231
plugins: Plugin[] = []
2332
failedInitializations: string[] = []
@@ -152,6 +161,8 @@ export class EventQueue extends Emitter {
152161
}
153162

154163
private async deliver(ctx: Context): Promise<Context> {
164+
await this.criticalTasks.done()
165+
155166
const start = Date.now()
156167
try {
157168
ctx = await this.flushOne(ctx)
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import { isThenable } from '../../lib/is-thenable'
2+
3+
export type TaskGroup = {
4+
done: () => Promise<void>
5+
run: <Operation extends (...args: any[]) => any>(
6+
op: Operation
7+
) => ReturnType<Operation>
8+
}
9+
10+
export const createTaskGroup = (): TaskGroup => {
11+
let taskCompletionPromise: Promise<void>
12+
let resolvePromise: () => void
13+
let count = 0
14+
15+
return {
16+
done: () => taskCompletionPromise,
17+
run: (op) => {
18+
const returnValue = op()
19+
20+
if (isThenable(returnValue)) {
21+
if (++count === 1) {
22+
taskCompletionPromise = new Promise((res) => (resolvePromise = res))
23+
}
24+
25+
returnValue.finally(() => --count === 0 && resolvePromise())
26+
}
27+
28+
return returnValue
29+
},
30+
}
31+
}

0 commit comments

Comments
 (0)