Skip to content

Commit 9ba3889

Browse files
chrisradeksilesky
andauthored
Adds batching support to AnalyticsNode (#640)
* add buffered segmentio node plugin * adds batching node plugin * integrate node batching plugin with AnalyticsNode * update tests for new plugin * move PublishingProps alongside Publisher and alias ConfigureNodePluginProps * treat fetch !response.ok as error when setting delivery failure reason * remove retry attempt count from ContextBatch * remove unused import from http-integration.test.ts * remove extract-promise-parts from analytics-next * delete no longer needed file * simplify errors * add optional data object * fix test * change `it` tests back to `test` Co-authored-by: Seth Silesky <5115498+silesky@users.noreply.github.com>
1 parent fcbab94 commit 9ba3889

File tree

18 files changed

+946
-134
lines changed

18 files changed

+946
-134
lines changed

packages/browser/src/browser/__tests__/analytics-pre-init.integration.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import unfetch from 'unfetch'
33
import { Analytics } from '../../core/analytics'
44
import { Context } from '../../core/context'
55
import * as Factory from '../../test-helpers/factories'
6-
import { sleep } from '../../test-helpers/sleep'
6+
import { sleep } from '../../lib/sleep'
77
import { setGlobalCDNUrl } from '../../lib/parse-cdn'
88
import { User } from '../../core/user'
99

packages/browser/src/browser/__tests__/standalone-analytics.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { snippet } from '../../tester/__fixtures__/segment-snippet'
55
import { install, AnalyticsStandalone } from '../standalone-analytics'
66
import unfetch from 'unfetch'
77
import { PersistedPriorityQueue } from '../../lib/priority-queue/persisted'
8-
import { sleep } from '../../test-helpers/sleep'
8+
import { sleep } from '../../lib/sleep'
99
import * as Factory from '../../test-helpers/factories'
1010

1111
const track = jest.fn()

packages/browser/src/core/buffer/__tests__/index.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import {
77
} from '..'
88
import { Analytics } from '../../analytics'
99
import { Context } from '../../context'
10-
import { sleep } from '@/test-helpers/sleep'
10+
import { sleep } from '@/lib/sleep'
1111
import { User } from '../../user'
1212

1313
describe('PreInitMethodCallBuffer', () => {
File renamed without changes.

packages/core/src/emitter/interface.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ export type EmittedDeliveryFailureError<Ctx extends CoreContext> = {
1111
code: 'delivery_failure'
1212
message: string
1313
ctx: Ctx
14+
data?: any
1415
}
1516

1617
/**

packages/core/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ export * from './events/interfaces'
66
export * from './events'
77
export * from './callback'
88
export * from './priority-queue'
9+
export { backoff } from './priority-queue/backoff'
910
export * from './context'
1011
export * from './queue/event-queue'
1112
export * from './analytics'

packages/node/src/__tests__/graceful-shutdown-integration.test.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { createSuccess } from './test-helpers/factories'
22

3-
const fetcher = jest.fn().mockReturnValue(createSuccess())
3+
const fetcher = jest.fn()
44
jest.mock('node-fetch', () => fetcher)
55

66
import { AnalyticsNode, NodeSegmentEvent } from '../app/analytics-node'
@@ -19,8 +19,12 @@ describe('Ability for users to exit without losing events', () => {
1919
let ajs!: AnalyticsNode
2020
beforeEach(async () => {
2121
jest.resetAllMocks()
22+
fetcher.mockReturnValue(createSuccess())
2223
ajs = new AnalyticsNode({
2324
writeKey: 'abc123',
25+
batchSettings: {
26+
maxEventsInBatch: 1,
27+
},
2428
})
2529
})
2630
const _helpers = {

packages/node/src/__tests__/http-integration.test.ts

Lines changed: 39 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ jest.mock('node-fetch', () => fetcher)
33

44
import { createSuccess } from './test-helpers/factories'
55
import { AnalyticsNode } from '..'
6-
import { NodeSegmentEvent } from '../app/analytics-node'
76
import { resolveCtx } from './test-helpers/resolve-ctx'
87

98
const myDate = new Date('2016')
@@ -35,7 +34,7 @@ describe('Analytics Node', () => {
3534
const call1 = calls[0]
3635
const [url, httpRes] = call1
3736
expect(httpRes.method).toBe('POST')
38-
expect(url).toBe('https://api.segment.io/v1/identify')
37+
expect(url).toBe('https://api.segment.io/v1/batch')
3938
expect(httpRes.headers).toMatchInlineSnapshot(
4039
{
4140
Authorization: expect.stringContaining('Basic'),
@@ -49,39 +48,47 @@ describe('Analytics Node', () => {
4948
}
5049
`
5150
)
52-
const body: NodeSegmentEvent = JSON.parse(httpRes.body)
51+
const body = JSON.parse(httpRes.body)
5352
expect(body).toMatchInlineSnapshot(
5453
{
55-
messageId: expect.any(String),
56-
_metadata: {
57-
nodeVersion: expect.any(String),
58-
},
54+
batch: [
55+
{
56+
messageId: expect.any(String),
57+
_metadata: {
58+
nodeVersion: expect.any(String),
59+
},
5960

60-
context: {
61-
library: {
62-
version: expect.any(String),
61+
context: {
62+
library: {
63+
version: expect.any(String),
64+
},
65+
},
6366
},
64-
},
67+
],
6568
},
6669
`
6770
Object {
68-
"_metadata": Object {
69-
"nodeVersion": Any<String>,
70-
},
71-
"context": Object {
72-
"library": Object {
73-
"name": "analytics-node-next",
74-
"version": Any<String>,
71+
"batch": Array [
72+
Object {
73+
"_metadata": Object {
74+
"nodeVersion": Any<String>,
75+
},
76+
"context": Object {
77+
"library": Object {
78+
"name": "AnalyticsNode",
79+
"version": Any<String>,
80+
},
81+
},
82+
"integrations": Object {},
83+
"messageId": Any<String>,
84+
"timestamp": "2016-01-01T00:00:00.000Z",
85+
"traits": Object {
86+
"foo": "bar",
87+
},
88+
"type": "identify",
89+
"userId": "my_user_id",
7590
},
76-
},
77-
"integrations": Object {},
78-
"messageId": Any<String>,
79-
"timestamp": "2016-01-01T00:00:00.000Z",
80-
"traits": Object {
81-
"foo": "bar",
82-
},
83-
"type": "identify",
84-
"userId": "my_user_id",
91+
],
8592
}
8693
`
8794
)
@@ -92,7 +99,7 @@ describe('Analytics Node', () => {
9299
ajs.track({ event: 'bar', userId: 'foo', properties: { foo: 'bar' } })
93100
await resolveCtx(ajs, 'track')
94101
expect(fetcher).toHaveBeenCalledWith(
95-
'https://api.segment.io/v1/track',
102+
'https://api.segment.io/v1/batch',
96103
expect.anything()
97104
)
98105
})
@@ -101,7 +108,7 @@ describe('Analytics Node', () => {
101108
ajs.page({ name: 'page', anonymousId: 'foo' })
102109
await resolveCtx(ajs, 'page')
103110
expect(fetcher).toHaveBeenCalledWith(
104-
'https://api.segment.io/v1/page',
111+
'https://api.segment.io/v1/batch',
105112
expect.anything()
106113
)
107114
})
@@ -110,7 +117,7 @@ describe('Analytics Node', () => {
110117
ajs.group({ groupId: 'group', anonymousId: 'foo' })
111118
await resolveCtx(ajs, 'group')
112119
expect(fetcher).toHaveBeenCalledWith(
113-
'https://api.segment.io/v1/group',
120+
'https://api.segment.io/v1/batch',
114121
expect.anything()
115122
)
116123
})
@@ -119,7 +126,7 @@ describe('Analytics Node', () => {
119126
ajs.alias({ userId: 'alias', previousId: 'previous' })
120127
await resolveCtx(ajs, 'alias')
121128
expect(fetcher).toHaveBeenCalledWith(
122-
'https://api.segment.io/v1/alias',
129+
'https://api.segment.io/v1/batch',
123130
expect.anything()
124131
)
125132
})
@@ -128,7 +135,7 @@ describe('Analytics Node', () => {
128135
ajs.screen({ name: 'screen', anonymousId: 'foo' })
129136
await resolveCtx(ajs, 'screen')
130137
expect(fetcher).toHaveBeenCalledWith(
131-
'https://api.segment.io/v1/screen',
138+
'https://api.segment.io/v1/batch',
132139
expect.anything()
133140
)
134141
})

packages/node/src/__tests__/integration.test.ts

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { testPlugin } from './test-helpers/test-plugin'
88
import { createSuccess, createError } from './test-helpers/factories'
99

1010
const writeKey = 'foo'
11+
jest.setTimeout(10000)
1112

1213
beforeEach(() => {
1314
jest.resetAllMocks()
@@ -22,7 +23,7 @@ describe('Initialization', () => {
2223
await analytics.ready
2324

2425
const ajsNodeXt = analytics.queue.plugins.find(
25-
(xt) => xt.name === 'analytics-node-next'
26+
(xt) => xt.name === 'Segment.io'
2627
)
2728
expect(ajsNodeXt).toBeDefined()
2829
expect(ajsNodeXt?.isLoaded()).toBeTruthy()
@@ -46,25 +47,25 @@ describe('Error handling', () => {
4647
expect(() => analytics.track({} as any)).toThrowError(/event/i)
4748
})
4849

49-
test('http delivery errors are accessed through the emitter', (done) => {
50+
it('should emit on an error', async () => {
5051
const analytics = new AnalyticsNode({
5152
writeKey,
53+
batchSettings: {
54+
maxAttempts: 1,
55+
},
5256
})
5357
fetcher.mockReturnValue(
5458
createError({ statusText: 'Service Unavailable', status: 503 })
5559
)
56-
57-
analytics.track({ event: 'foo', userId: 'sup' })
58-
analytics.on('error', (emittedErr) => {
59-
if (emittedErr.code !== 'http_delivery') {
60-
return done.fail('error code incorrect')
61-
}
62-
expect(emittedErr.message).toMatch(/segment/)
63-
expect(emittedErr.code).toMatch(/http/)
64-
expect(emittedErr.response.status).toBe(503)
65-
done()
66-
})
67-
expect.assertions(3)
60+
try {
61+
const promise = resolveCtx(analytics, 'track')
62+
analytics.track({ event: 'foo', userId: 'sup' })
63+
await promise
64+
throw new Error('fail')
65+
} catch (err: any) {
66+
expect(err.message).toMatch(/fail/)
67+
expect(err.code).toMatch(/delivery_failure/)
68+
}
6869
})
6970
})
7071

packages/node/src/app/analytics-node.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,8 @@ import {
1717
pTimeout,
1818
} from '@segment/analytics-core'
1919
import { AnalyticsNodeSettings, validateSettings } from './settings'
20-
import { analyticsNode } from './plugin'
21-
2220
import { version } from '../../package.json'
23-
import { NodeEmittedError } from './emitted-errors'
21+
import { configureNodePlugin } from '../plugins/segmentio'
2422

2523
// create a derived class since we may want to add node specific things to Context later
2624
export class NodeContext extends CoreContext {}
@@ -41,7 +39,7 @@ export interface NodeSegmentEventOptions {
4139
/**
4240
* Map of emitter event names to method args.
4341
*/
44-
type NodeEmitterEvents = CoreEmitterContract<NodeContext, NodeEmittedError> & {
42+
type NodeEmitterEvents = CoreEmitterContract<NodeContext> & {
4543
initialize: [AnalyticsNodeSettings]
4644
call_after_close: [NodeSegmentEvent] // any event that did not get dispatched due to close
4745
drained: []
@@ -85,9 +83,15 @@ export class AnalyticsNode
8583
validateSettings(settings)
8684
this._eventFactory = new EventFactory()
8785
this.queue = new EventQueue(new NodePriorityQueue(3))
86+
const batchSettings = settings.batchSettings || {}
8887

8988
this.ready = this.register(
90-
analyticsNode({ writeKey: settings.writeKey }, this)
89+
configureNodePlugin({
90+
writeKey: settings.writeKey,
91+
maxAttempts: batchSettings.maxAttempts ?? 4,
92+
maxEventsInBatch: batchSettings.maxEventsInBatch ?? 15,
93+
maxWaitTimeInMs: batchSettings.maxWaitTimeInMs ?? 1000,
94+
})
9195
)
9296
.then(() => undefined)
9397
.catch((err) => {

0 commit comments

Comments
 (0)