Skip to content
This repository was archived by the owner on Mar 3, 2026. It is now read-only.

Commit 15825ff

Browse files
committed
fix(gcs-resumable-upload): Stop Duplicate Response Handlers on Retries
Porting fix from googleapis/gcs-resumable-upload#502
1 parent a6b78f5 commit 15825ff

2 files changed

Lines changed: 34 additions & 29 deletions

File tree

src/gcs-resumable-upload/index.ts

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -767,12 +767,6 @@ export class Upload extends Pumpify {
767767
},
768768
});
769769

770-
// This should be 'once' as `startUploading` can be called again for
771-
// multi chunk uploads and each request would have its own response.
772-
this.once('response', resp => {
773-
responseReceived = true;
774-
this.responseHandler(resp);
775-
});
776770
let headers: GaxiosOptions['headers'] = {};
777771

778772
// If using multiple chunk upload, set appropriate header
@@ -797,7 +791,11 @@ export class Upload extends Pumpify {
797791
};
798792

799793
try {
800-
await this.makeRequestStream(reqOpts);
794+
const resp = await this.makeRequestStream(reqOpts);
795+
if (resp) {
796+
responseReceived = true;
797+
this.responseHandler(resp);
798+
}
801799
} catch (err) {
802800
const e = err as Error;
803801
this.destroy(e);
@@ -989,7 +987,7 @@ export class Upload extends Pumpify {
989987
return res;
990988
}
991989

992-
private async makeRequestStream(reqOpts: GaxiosOptions): GaxiosPromise {
990+
private async makeRequestStream(reqOpts: GaxiosOptions) {
993991
const controller = new AbortController();
994992
const errorCallback = () => controller.abort();
995993
this.once('error', errorCallback);
@@ -1008,10 +1006,10 @@ export class Upload extends Pumpify {
10081006
reqOpts
10091007
);
10101008
const res = await this.authClient.request(combinedReqOpts);
1011-
this.onResponse(res);
1009+
const successfulRequest = this.onResponse(res);
10121010
this.removeListener('error', errorCallback);
10131011

1014-
return res;
1012+
return successfulRequest ? res : null;
10151013
}
10161014

10171015
private restart() {

test/gcs-resumable-upload.ts

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import * as mockery from 'mockery';
1919
import * as nock from 'nock';
2020
import * as path from 'path';
2121
import * as sinon from 'sinon';
22-
import {PassThrough, Readable} from 'stream';
22+
import {Readable} from 'stream';
2323

2424
import {
2525
ApiError,
@@ -906,7 +906,7 @@ describe('gcs-resumable-upload', () => {
906906

907907
describe('#startUploading', () => {
908908
beforeEach(() => {
909-
up.makeRequestStream = async () => new PassThrough();
909+
up.makeRequestStream = async () => null;
910910
up.upstreamChunkBuffer = Buffer.alloc(16);
911911
});
912912

@@ -978,14 +978,6 @@ describe('gcs-resumable-upload', () => {
978978
up.startUploading();
979979
});
980980

981-
it("should setup a 'response' listener", async () => {
982-
assert.equal(up.eventNames().includes('response'), false);
983-
984-
await up.startUploading();
985-
986-
assert.equal(up.eventNames().includes('response'), true);
987-
});
988-
989981
it('should destroy the stream if the request failed', done => {
990982
const error = new Error('Error.');
991983
up.on('error', (e: Error) => {
@@ -1697,14 +1689,27 @@ describe('gcs-resumable-upload', () => {
16971689
up.makeRequestStream(REQ_OPTS);
16981690
});
16991691

1700-
it('should return the response', async () => {
1701-
const response = {};
1692+
it('should return the response if successful', async () => {
1693+
const response = {some: 'response'};
17021694
up.authClient = {
17031695
request: async () => response,
17041696
};
1697+
up.onResponse = () => true;
1698+
17051699
const stream = await up.makeRequestStream(REQ_OPTS);
17061700
assert.strictEqual(stream, response);
17071701
});
1702+
1703+
it('should return `null` if the response is unsuccessful', async () => {
1704+
const response = {some: 'response'};
1705+
up.authClient = {
1706+
request: async () => response,
1707+
};
1708+
up.onResponse = () => false;
1709+
1710+
const stream = await up.makeRequestStream(REQ_OPTS);
1711+
assert.strictEqual(stream, null);
1712+
});
17081713
});
17091714

17101715
describe('#restart', () => {
@@ -2261,7 +2266,7 @@ describe('gcs-resumable-upload', () => {
22612266
let dataReceived = 0;
22622267
let chunkWritesInRequest = 0;
22632268

2264-
await new Promise(resolve => {
2269+
const res = await new Promise(resolve => {
22652270
opts.body.on('data', (data: Buffer) => {
22662271
dataReceived += data.byteLength;
22672272
overallDataReceived += data.byteLength;
@@ -2271,14 +2276,16 @@ describe('gcs-resumable-upload', () => {
22712276
opts.body.on('end', () => {
22722277
requests.push({dataReceived, opts, chunkWritesInRequest});
22732278

2274-
up.emit('response', {
2279+
resolve({
22752280
status: 200,
22762281
data: {},
22772282
});
22782283

22792284
resolve(null);
22802285
});
22812286
});
2287+
2288+
return res;
22822289
};
22832290

22842291
up.on('error', done);
@@ -2400,7 +2407,7 @@ describe('gcs-resumable-upload', () => {
24002407
let dataReceived = 0;
24012408
let chunkWritesInRequest = 0;
24022409

2403-
await new Promise(resolve => {
2410+
const res = await new Promise(resolve => {
24042411
opts.body.on('data', (data: Buffer) => {
24052412
dataReceived += data.byteLength;
24062413
overallDataReceived += data.byteLength;
@@ -2415,23 +2422,23 @@ describe('gcs-resumable-upload', () => {
24152422
? overallDataReceived - 1
24162423
: 0;
24172424

2418-
up.emit('response', {
2425+
resolve({
24192426
status: RESUMABLE_INCOMPLETE_STATUS_CODE,
24202427
headers: {
24212428
range: `bytes=0-${lastByteReceived}`,
24222429
},
24232430
data: {},
24242431
});
24252432
} else {
2426-
up.emit('response', {
2433+
resolve({
24272434
status: 200,
24282435
data: {},
24292436
});
24302437
}
2431-
2432-
resolve(null);
24332438
});
24342439
});
2440+
2441+
return res;
24352442
};
24362443

24372444
up.on('error', done);

0 commit comments

Comments
 (0)