7 changes: 7 additions & 0 deletions src/file.ts
@@ -2100,6 +2100,10 @@ class File extends ServiceObject<File, FileMetadata> {

const emitStream = new PassThroughShim();

// If `writeStream` is destroyed before the `writing` event, `emitStream` will not have any listeners. This prevents an unhandled error.
const noop = () => {};
emitStream.on('error', noop);

let hashCalculatingStream: HashStreamValidator | null = null;

if (crc32c || md5) {
@@ -2138,6 +2142,9 @@ class File extends ServiceObject<File, FileMetadata> {
this.startResumableUpload_(fileWriteStream, options);
}

// Remove the temporary noop listener now that the pipeline below handles errors.
emitStream.removeListener('error', noop);

pipeline(
emitStream,
...(transformStreams as [Transform]),
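The noop listener matters because a Node.js stream that emits 'error' with no 'error' listener attached turns the event into an uncaught exception. Below is a minimal sketch of the pattern used above, with hypothetical placeholder streams standing in for the library's real upload streams:

```ts
import {PassThrough, Transform, pipeline} from 'stream';

// Hypothetical stand-ins for the streams built inside createWriteStream().
const emitStream = new PassThrough();

// Until the pipeline below exists, nothing listens for 'error'; destroying
// emitStream with an error at this point would otherwise crash the process.
const noop = () => {};
emitStream.on('error', noop);

// ...later, once the rest of the streams have been created...
const transformStream = new Transform({
  transform(chunk, _enc, callback) {
    callback(null, chunk);
  },
});
const fileWriteStream = new PassThrough();
fileWriteStream.resume(); // drain the sink so the pipeline can finish

// pipeline() attaches its own error handling to every stream it manages,
// so the temporary listener can be removed again.
emitStream.removeListener('error', noop);
pipeline(emitStream, transformStream, fileWriteStream, err => {
  console.log(err ? `failed: ${err.message}` : 'done');
});

emitStream.end('data');
```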
70 changes: 69 additions & 1 deletion test/file.ts
@@ -23,7 +23,14 @@ import {
} from '../src/nodejs-common/index.js';
import {describe, it, before, beforeEach, afterEach} from 'mocha';
import {PromisifyAllOptions} from '@google-cloud/promisify';
import {Readable, PassThrough, Stream, Duplex, Transform} from 'stream';
import {
Readable,
PassThrough,
Stream,
Duplex,
Transform,
pipeline,
} from 'stream';
import assert from 'assert';
import * as crypto from 'crypto';
import duplexify from 'duplexify';
@@ -2281,6 +2288,67 @@ describe('File', () => {
writable.end('data');
});

it('should close upstream when pipeline fails', done => {
const writable: Stream.Writable = file.createWriteStream();
const error = new Error('My error');
const uploadStream = new PassThrough();

let receivedBytes = 0;
const validateStream = new PassThrough();
validateStream.on('data', (chunk: Buffer) => {
receivedBytes += chunk.length;
if (receivedBytes > 5) {
// This aborts the outer pipeline, which should also close the internal pipeline created by createWriteStream()
pLine.destroy(error);
}
});

file.startResumableUpload_ = (dup: duplexify.Duplexify) => {
dup.setWritable(uploadStream);
// Emit an error so the pipeline's error-handling logic is triggered
uploadStream.emit('error', error);
// Explicitly destroy the stream so that the 'close' event is guaranteed to fire,
// even in Node v14 where autoDestroy defaults may prevent automatic closing
uploadStream.destroy();
};

let closed = false;
uploadStream.on('close', () => {
closed = true;
});

const pLine = pipeline(
(function* () {
yield 'foo'; // write some data
yield 'foo'; // write some data
yield 'foo'; // write some data
})(),
validateStream,
writable,
(e: Error | null) => {
assert.strictEqual(e, error);
assert.strictEqual(closed, true);
done();
}
);
});

it('should error pipeline if source stream emits error before any data', done => {
const writable = file.createWriteStream();
const error = new Error('Error before first chunk');
pipeline(
// eslint-disable-next-line require-yield
(function* () {
throw error;
})(),
writable,
(e: Error | null) => {
assert.strictEqual(e, error);
done();
}
);
});

describe('validation', () => {
const data = 'test';

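The new tests lean on documented `stream.pipeline` behavior: it accepts an iterable (here a generator) as the source, and when any stage fails it destroys the remaining streams and passes the first error to the callback. A small sketch of that propagation, independent of the Storage streams above:

```ts
import {PassThrough, pipeline} from 'stream';

const error = new Error('boom');
const sink = new PassThrough();
sink.on('close', () => console.log('sink was torn down'));

pipeline(
  // A generator is a valid pipeline source; throwing here fails the pipeline.
  (function* () {
    yield 'first chunk';
    throw error;
  })(),
  sink,
  err => {
    // The source error is surfaced to the pipeline callback.
    console.log(err === error); // true
  }
);
```

The first test exercises the same mechanism in the opposite direction: destroying the outer pipeline should also close the upload stream that `createWriteStream` wires up internally.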