Skip to content

Commit 429e05c

Browse files
authored
fix(sse): respect request.signal for the underlying stream (#2718)
1 parent 41f725d commit 429e05c

2 files changed

Lines changed: 249 additions & 56 deletions

File tree

src/core/sse.ts

Lines changed: 157 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { invariant } from 'outvariant'
2+
import { DeferredPromise } from '@open-draft/deferred-promise'
23
import { Emitter } from 'strict-event-emitter'
34
import type { ResponseResolver } from './handlers/RequestHandler'
45
import {
@@ -89,26 +90,19 @@ class ServerSentEventHandler<
8990
)
9091

9192
super('GET', path, async (info) => {
92-
const stream = new ReadableStream({
93-
start: async (controller) => {
94-
const client = new ServerSentEventClient<EventMap>({
95-
controller,
96-
emitter: this.#emitter,
97-
})
98-
const server = new ServerSentEventServer({
99-
request: info.request,
100-
client,
101-
})
102-
103-
await resolver({
104-
...info,
105-
client,
106-
server,
107-
})
108-
},
93+
const { client, server, response } = createEventStream<EventMap>(
94+
info.request,
95+
)
96+
97+
client[kClientEmitter] = this.#emitter
98+
99+
await resolver({
100+
...info,
101+
client,
102+
server,
109103
})
110104

111-
return new Response(stream, SSE_RESPONSE_INIT)
105+
return response
112106
})
113107

114108
this.#emitter = new Emitter<ServerSentEventClientEventMap>()
@@ -222,32 +216,24 @@ type ToEventDiscriminatedUnion<T> = Values<{
222216
}>
223217

224218
type ServerSentEventClientEventMap = {
225-
message: [
226-
payload: {
227-
id?: string
228-
event: string
229-
data?: unknown
230-
frames: Array<string>
231-
},
232-
]
219+
message: [payload: EventStreamMessage]
233220
error: []
234221
close: []
235222
}
236223

224+
const kClientEmitter = Symbol.for('kClientEmitter')
225+
237226
class ServerSentEventClient<
238227
EventMap extends EventMapConstraint = { message: unknown },
239228
> {
229+
private [kClientEmitter]?: Emitter<ServerSentEventClientEventMap>
230+
240231
#encoder: TextEncoder
241-
#controller: ReadableStreamDefaultController
242-
#emitter: Emitter<ServerSentEventClientEventMap>
232+
#writer: WritableStreamDefaultWriter
243233

244-
constructor(args: {
245-
controller: ReadableStreamDefaultController
246-
emitter: Emitter<ServerSentEventClientEventMap>
247-
}) {
234+
constructor(writable: WritableStream) {
248235
this.#encoder = new TextEncoder()
249-
this.#controller = args.controller
250-
this.#emitter = args.emitter
236+
this.#writer = writable.getWriter()
251237
}
252238

253239
/**
@@ -303,22 +289,37 @@ class ServerSentEventClient<
303289
* error.
304290
*/
305291
public error(): void {
306-
this.#controller.error()
307-
this.#emitter.emit('error')
292+
this.#writer.abort().catch((error) => {
293+
console.error(error)
294+
devUtils.error(
295+
'Failed to abort server-side EventSource. Please see the original error above.',
296+
)
297+
})
298+
this[kClientEmitter]?.emit('error')
308299
}
309300

310301
/**
311302
* Closes the underlying `EventSource`, closing the connection.
312303
*/
313304
public close(): void {
314-
this.#controller.close()
315-
this.#emitter.emit('close')
305+
this.#writer.close().catch((error) => {
306+
console.error(error)
307+
devUtils.error(
308+
'Failed to close server-side EventSource. Please see the original error above.',
309+
)
310+
})
311+
this[kClientEmitter]?.emit('close')
316312
}
317313

318314
#sendRetry(retry: number): void {
319-
if (typeof retry === 'number') {
320-
this.#controller.enqueue(this.#encoder.encode(`retry:${retry}\n\n`))
321-
}
315+
this.#writer
316+
.write(this.#encoder.encode(`retry:${retry}\n\n`))
317+
.catch((error) => {
318+
console.error(error)
319+
devUtils.error(
320+
'Failed to send a retry packet to server-side EventSource. Please see the original error above.',
321+
)
322+
})
322323
}
323324

324325
#sendMessage(message: {
@@ -349,9 +350,16 @@ class ServerSentEventClient<
349350

350351
frames.push('', '')
351352

352-
this.#controller.enqueue(this.#encoder.encode(frames.join('\n')))
353+
this.#writer
354+
.write(this.#encoder.encode(frames.join('\n')))
355+
.catch((error) => {
356+
console.error(error)
357+
devUtils.error(
358+
'Failed to send a message to server-side EventSource. Please see the original error above.',
359+
)
360+
})
353361

354-
this.#emitter.emit('message', {
362+
this[kClientEmitter]?.emit('message', {
355363
id: message.id,
356364
event: message.event?.toString() || 'message',
357365
data: message.data,
@@ -383,6 +391,7 @@ class ServerSentEventServer {
383391
*/
384392
accept: 'msw/passthrough',
385393
},
394+
signal: this.#request.signal,
386395
})
387396

388397
source[kOnAnyMessage] = (event) => {
@@ -429,6 +438,7 @@ class ServerSentEventServer {
429438

430439
interface ObservableEventSourceInit extends EventSourceInit {
431440
headers?: HeadersInit
441+
signal?: AbortSignal
432442
}
433443

434444
type EventHandler<EventType extends Event> = (
@@ -489,6 +499,18 @@ class ObservableEventSource extends EventTarget implements EventSource {
489499
signal: this[kAbortController].signal,
490500
})
491501

502+
if (init?.signal) {
503+
if (init.signal.aborted) {
504+
this.close()
505+
return
506+
}
507+
508+
init.signal.addEventListener('abort', () => this.close(), {
509+
once: true,
510+
signal: this[kAbortController].signal,
511+
})
512+
}
513+
492514
this.connect()
493515
}
494516

@@ -518,7 +540,7 @@ class ObservableEventSource extends EventTarget implements EventSource {
518540
get onerror(): EventHandler<Event> | null {
519541
return this[kOnError]
520542
}
521-
set oneerror(handler: EventHandler<Event>) {
543+
set onerror(handler: EventHandler<Event>) {
522544
if (this[kOnError]) {
523545
this.removeEventListener('error', { handleEvent: this[kOnError] })
524546
}
@@ -642,10 +664,6 @@ class ObservableEventSource extends EventTarget implements EventSource {
642664
this[kLastEventId] = message.id
643665
}
644666

645-
if (message.retry) {
646-
this[kReconnectionTime] = message.retry
647-
}
648-
649667
const messageEvent = new MessageEvent(
650668
message.event ? message.event : 'message',
651669
{
@@ -659,6 +677,9 @@ class ObservableEventSource extends EventTarget implements EventSource {
659677
this[kOnAnyMessage]?.(messageEvent)
660678
this.dispatchEvent(messageEvent)
661679
},
680+
retry: (reconnectionTime) => {
681+
this[kReconnectionTime] = reconnectionTime
682+
},
662683
abort: () => {
663684
throw new Error('Stream abort is not implemented')
664685
},
@@ -693,7 +714,25 @@ class ObservableEventSource extends EventTarget implements EventSource {
693714
this.dispatchEvent(new Event('error'))
694715
})
695716

696-
await delay(this[kReconnectionTime])
717+
const signal = this[kAbortController].signal
718+
719+
if (signal.aborted) {
720+
return
721+
}
722+
723+
const aborted = new DeferredPromise<void>()
724+
const onAbort = () => aborted.resolve()
725+
signal.addEventListener('abort', onAbort, { once: true })
726+
727+
await Promise.race([delay(this[kReconnectionTime]), aborted]).finally(
728+
() => {
729+
signal.removeEventListener('abort', onAbort)
730+
},
731+
)
732+
733+
if (signal.aborted) {
734+
return
735+
}
697736

698737
queueMicrotask(async () => {
699738
if (this.readyState !== this.CONNECTING) {
@@ -743,7 +782,6 @@ interface EventSourceMessage {
743782
id?: string
744783
event?: string
745784
data?: string
746-
retry?: number
747785
}
748786

749787
class EventSourceParsingStream extends WritableStream {
@@ -758,12 +796,12 @@ class EventSourceParsingStream extends WritableStream {
758796
id: undefined,
759797
event: undefined,
760798
data: undefined,
761-
retry: undefined,
762799
}
763800

764801
constructor(
765802
private underlyingSink: {
766803
message: (message: EventSourceMessage) => void
804+
retry?: (reconnectionTime: number) => void
767805
abort?: (reason: any) => void
768806
close?: () => void
769807
},
@@ -789,7 +827,6 @@ class EventSourceParsingStream extends WritableStream {
789827
id: undefined,
790828
event: undefined,
791829
data: undefined,
792-
retry: undefined,
793830
}
794831
}
795832

@@ -903,14 +940,78 @@ class EventSourceParsingStream extends WritableStream {
903940
}
904941

905942
case 'retry': {
906-
const retry = parseInt(value, 10)
907-
908-
if (!isNaN(retry)) {
909-
this.message.retry = retry
943+
/**
944+
* Apply the retry immediately. Don't buffer onto the current message.
945+
* @see https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation
946+
*/
947+
if (/^\d+$/.test(value)) {
948+
this.underlyingSink.retry?.(parseInt(value, 10))
910949
}
911950
break
912951
}
913952
}
914953
}
915954
}
916955
}
956+
957+
interface EventStreamMessage {
958+
id?: string
959+
event: string
960+
data?: unknown
961+
frames: Array<string>
962+
}
963+
964+
interface EventStream<EventMap extends EventMapConstraint> {
965+
client: ServerSentEventClient<EventMap>
966+
server: ServerSentEventServer
967+
response: Response
968+
}
969+
970+
/**
971+
* Create an event stream out of the given Fetch API `Request`.
972+
* Returns the following properties:
973+
* - `client`, to operate on the intercepted request;
974+
* - `server`, to establish and manage the actual server connection;
975+
* - `response`, a `Response` to use as the mock response's body.
976+
*
977+
* @example
978+
* http.post('/resource', ({ request }) => {
979+
* const { client, server, response } = createEventStream(request)
980+
* client.send({ data: 'hello world' })
981+
* return response
982+
* })
983+
*/
984+
function createEventStream<EventMap extends EventMapConstraint>(
985+
request: Request,
986+
): EventStream<EventMap> {
987+
invariant(
988+
!request.signal.aborted,
989+
'Failed to call "createEventStream" on the "%s %s" request: request aborted',
990+
request.method,
991+
request.url,
992+
)
993+
994+
const { readable, writable } = new TransformStream()
995+
996+
const client = new ServerSentEventClient<EventMap>(writable)
997+
const server = new ServerSentEventServer({
998+
request,
999+
client,
1000+
})
1001+
1002+
const response = new Response(readable, SSE_RESPONSE_INIT)
1003+
1004+
request.signal.addEventListener(
1005+
'abort',
1006+
() => {
1007+
client.close()
1008+
},
1009+
{ once: true },
1010+
)
1011+
1012+
return {
1013+
client,
1014+
server,
1015+
response,
1016+
}
1017+
}

0 commit comments

Comments
 (0)