Skip to content

Commit c846919

Browse files
authored
fix(instrumentation-fetch): do not modify the returned type of fetch (#6521)
1 parent 99fb3fb commit c846919

File tree

3 files changed

+114
-123
lines changed

3 files changed

+114
-123
lines changed

experimental/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ For notes on migrating to 2.x / 0.200.x see [the upgrade guide](doc/upgrade-to-2
2323

2424
* fix(opentelemetry-instrumentation): access `require` via `globalThis` to avoid webpack analysis [#6481](https://github.com/open-telemetry/opentelemetry-js/pull/6481) @overbalance
2525
* fix(sdk-logs): fix inflated `droppedAttributesCount` when updating existing attribute keys [#6479](https://github.com/open-telemetry/opentelemetry-js/pull/6479) @overbalance
26+
* fix(instrumentation-fetch): do not modify the returned type of fetch [#6521](https://github.com/open-telemetry/opentelemetry-js/pull/6521) @dyladan
2627
* fix(opentelemetry-sdk-node): add missing `@opentelemetry/otlp-exporter-base` dependency [#6520](https://github.com/open-telemetry/opentelemetry-js/pull/6520) @gotgenes
2728

2829
### :books: Documentation

experimental/packages/opentelemetry-instrumentation-fetch/src/fetch.ts

Lines changed: 47 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -377,27 +377,6 @@ export class FetchInstrumentation extends InstrumentationBase<FetchInstrumentati
377377
private _patchConstructor(): (original: typeof fetch) => typeof fetch {
378378
return original => {
379379
const plugin = this;
380-
const readOnlyProps = new Set(['url', 'type', 'redirected']);
381-
function createResponseProxy(
382-
target: Response,
383-
originalResponse: Response
384-
): Response {
385-
return new Proxy(target, {
386-
get(t, prop, _receiver) {
387-
if (typeof prop === 'string' && readOnlyProps.has(prop)) {
388-
return Reflect.get(originalResponse, prop);
389-
}
390-
if (prop === 'clone') {
391-
return function clone() {
392-
return createResponseProxy(t.clone(), originalResponse);
393-
};
394-
}
395-
// Use target as receiver so getters (e.g. headers) run with correct this and avoid "Illegal invocation"
396-
const value = Reflect.get(t, prop, t);
397-
return typeof value === 'function' ? value.bind(t) : value;
398-
},
399-
}) as Response;
400-
}
401380

402381
return function patchConstructor(
403382
this: typeof globalThis,
@@ -459,81 +438,20 @@ export class FetchInstrumentation extends InstrumentationBase<FetchInstrumentati
459438
}
460439
}
461440

462-
function withCancelPropagation(
463-
body: ReadableStream<Uint8Array> | null,
464-
readerClone: ReadableStreamDefaultReader<Uint8Array>
465-
): ReadableStream<Uint8Array> | null {
466-
if (!body) return null;
467-
468-
const reader = body.getReader();
469-
470-
return new ReadableStream({
471-
async pull(controller) {
472-
try {
473-
const { value, done } = await reader.read();
474-
if (done) {
475-
reader.releaseLock();
476-
controller.close();
477-
} else {
478-
controller.enqueue(value);
479-
}
480-
} catch (err) {
481-
controller.error(err);
482-
reader.cancel(err).catch(_ => {});
483-
484-
try {
485-
reader.releaseLock();
486-
} catch {
487-
// Spec reference:
488-
// https://streams.spec.whatwg.org/#default-reader-release-lock
489-
//
490-
// releaseLock() only throws if called on an invalid reader
491-
// (i.e. reader.[[stream]] is undefined, meaning the lock is already released
492-
// or the reader was never associated). In normal use this cannot happen.
493-
// This catch is defensive only.
494-
}
495-
}
496-
},
497-
cancel(reason) {
498-
readerClone.cancel(reason).catch(_ => {});
499-
return reader.cancel(reason);
500-
},
501-
});
502-
}
503-
504-
function onSuccess(
505-
span: Span,
506-
resolve: (value: Response | PromiseLike<Response>) => void,
507-
response: Response
508-
): void {
509-
let proxiedResponse: Response | null = null;
510-
441+
function onSuccess(span: Span, response: Response): Response {
511442
try {
443+
// Clone the response and eagerly consume the clone to detect
444+
// when the body transfer completes. The original response is
445+
// returned to the caller untouched so that it passes internal
446+
// brand-checks required by APIs such as
447+
// WebAssembly.compileStreaming.
512448
// TODO: Switch to a consumer-driven model and drop `resClone`.
513449
// Keeping eager consumption here to preserve current behavior and avoid breaking existing tests.
514450
// Context: discussion in PR #5894 → https://github.com/open-telemetry/opentelemetry-js/pull/5894
515451
const resClone = response.clone();
516452
const body = resClone.body;
517453
if (body) {
518454
const reader = body.getReader();
519-
const isNullBodyStatus =
520-
// 101 responses and protocol upgrading is handled internally by the browser
521-
response.status === 204 ||
522-
response.status === 205 ||
523-
response.status === 304;
524-
const wrappedBody = isNullBodyStatus
525-
? null
526-
: withCancelPropagation(response.body, reader);
527-
528-
const newResponse = new Response(wrappedBody, {
529-
status: response.status,
530-
statusText: response.statusText,
531-
headers: response.headers,
532-
});
533-
534-
// Response url, type, and redirected are read-only properties that can't be set via constructor
535-
// Use a Proxy to forward them from the original response and maintain the wrapped body
536-
proxiedResponse = createResponseProxy(newResponse, response);
537455

538456
const read = (): void => {
539457
reader.read().then(
@@ -554,46 +472,55 @@ export class FetchInstrumentation extends InstrumentationBase<FetchInstrumentati
554472
// some older browsers don't have .body implemented
555473
endSpanOnSuccess(span, response);
556474
}
557-
} finally {
558-
resolve(proxiedResponse ?? response);
475+
} catch (error) {
476+
// Setup failed (e.g. clone() or getReader() threw).
477+
// End the span and clean up so _tasksCount doesn't leak.
478+
plugin._diag.error('Failed to read fetch response body', error);
479+
plugin._endSpan(span, spanData, {
480+
status: 0,
481+
url,
482+
});
559483
}
484+
return response;
560485
}
561486

562-
function onError(
563-
span: Span,
564-
reject: (reason?: unknown) => void,
565-
error: FetchError
566-
) {
487+
function onError(span: Span, error: FetchError): never {
567488
try {
568489
endSpanOnError(span, error);
569-
} finally {
570-
reject(error);
490+
} catch (e: unknown) {
491+
// endSpanOnError failed — fall back to ending the span
492+
// directly so _tasksCount doesn't leak.
493+
plugin._diag.error('Failed to end span on fetch error', e);
494+
plugin._endSpan(span, spanData, {
495+
status: error.status || 0,
496+
url,
497+
});
571498
}
499+
// eslint-disable-next-line @typescript-eslint/only-throw-error
500+
throw error;
572501
}
573502

574-
return new Promise((resolve, reject) => {
575-
return context.with(
576-
trace.setSpan(context.active(), createdSpan),
577-
() => {
578-
// Call request hook before injection so hooks cannot tamper with propagation headers.
579-
// Also, this means the hook will see `options.headers` in the same type as passed in,
580-
// rather than as a `Headers` instance set by `_addHeaders()`.
581-
plugin._callRequestHook(createdSpan, options);
582-
plugin._addHeaders(options, url);
583-
plugin._tasksCount++;
584-
585-
return original
586-
.apply(
587-
self,
588-
options instanceof Request ? [options] : [url, options]
589-
)
590-
.then(
591-
onSuccess.bind(self, createdSpan, resolve),
592-
onError.bind(self, createdSpan, reject)
593-
);
594-
}
595-
);
596-
});
503+
return context.with(
504+
trace.setSpan(context.active(), createdSpan),
505+
() => {
506+
// Call request hook before injection so hooks cannot tamper with propagation headers.
507+
// Also, this means the hook will see `options.headers` in the same type as passed in,
508+
// rather than as a `Headers` instance set by `_addHeaders()`.
509+
plugin._callRequestHook(createdSpan, options);
510+
plugin._addHeaders(options, url);
511+
plugin._tasksCount++;
512+
513+
return original
514+
.apply(
515+
self,
516+
options instanceof Request ? [options] : [url, options]
517+
)
518+
.then(
519+
onSuccess.bind(self, createdSpan),
520+
onError.bind(self, createdSpan)
521+
);
522+
}
523+
);
597524
};
598525
};
599526
}

experimental/packages/opentelemetry-instrumentation-fetch/test/fetch.test.ts

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,35 @@ describe('fetch', () => {
236236
// Avoids ERROR in the logs when calling `disable()` again during cleanup
237237
fetchInstrumentation = undefined;
238238
});
239+
240+
it('should return a Promise<Response> compatible with WebAssembly.compileStreaming', async () => {
241+
// Some web APIs do brand checks to ensure they are working with native objects.
242+
// compileStreaming checks that the argument is a native Response, and will throw if it isn't.
243+
244+
// Minimal valid WASM binary: magic number + version only.
245+
const wasmBytes = new Uint8Array([
246+
0x00, 0x61, 0x73, 0x6d, 0x01, 0x00, 0x00, 0x00,
247+
]);
248+
249+
await startWorker(
250+
msw.http.get(`${ORIGIN}/test.wasm`, () => {
251+
return new msw.HttpResponse(wasmBytes, {
252+
headers: { 'Content-Type': 'application/wasm' },
253+
});
254+
})
255+
);
256+
257+
fetchInstrumentation = new FetchInstrumentation();
258+
assert.ok(isWrapped(window.fetch));
259+
260+
// WebAssembly.compileStreaming requires a native Response from fetch.
261+
const module = await WebAssembly.compileStreaming(
262+
fetch(`${ORIGIN}/test.wasm`)
263+
);
264+
assert.ok(module instanceof WebAssembly.Module);
265+
266+
await waitFor(OBSERVER_WAIT_TIME_MS + 50);
267+
});
239268
});
240269

241270
describe('instrumentation', () => {
@@ -2764,13 +2793,47 @@ describe('fetch', () => {
27642793
});
27652794

27662795
describe('when client cancels the reader', () => {
2767-
it('should cancel stream and release the connection', async () => {
2768-
const { response } = await tracedFetch();
2796+
it('should end the span when the stream completes', async () => {
2797+
// Use a short stream (3 chunks) so the clone finishes quickly
2798+
// after the consumer cancels. The instrumentation tracks
2799+
// completion via an eagerly-consumed clone; consumer-side
2800+
// cancellation does not propagate to the clone.
2801+
const shortStreamHandler = () => {
2802+
const encoder = new TextEncoder();
2803+
return msw.http.get('/api/stream', () => {
2804+
let count = 0;
2805+
const stream = new ReadableStream<Uint8Array>({
2806+
start(controller) {
2807+
timer = setInterval(() => {
2808+
if (count >= 3) {
2809+
clearInterval(timer);
2810+
controller.close();
2811+
return;
2812+
}
2813+
count += 1;
2814+
controller.enqueue(encoder.encode(`data: ${count}\n`));
2815+
}, 20);
2816+
},
2817+
});
2818+
return new msw.HttpResponse(stream, {
2819+
status: 200,
2820+
headers: {
2821+
'Content-Type': 'text/plain; charset=utf-8',
2822+
'Cache-Control': 'no-cache',
2823+
Connection: 'keep-alive',
2824+
},
2825+
});
2826+
});
2827+
};
2828+
2829+
const { response } = await tracedFetch({
2830+
handlers: [shortStreamHandler()],
2831+
});
27692832

27702833
const timeoutPromise = new Promise((_, reject) => {
27712834
setTimeout(() => {
27722835
reject(new Error('trace should finish before timeout'));
2773-
}, 1000);
2836+
}, 2000);
27742837
});
27752838

27762839
// Read the first chunk to confirm the stream is live

0 commit comments

Comments
 (0)