@@ -23,7 +23,14 @@ import {
2323} from '../src/nodejs-common/index.js' ;
2424import { describe , it , before , beforeEach , afterEach } from 'mocha' ;
2525import { PromisifyAllOptions } from '@google-cloud/promisify' ;
26- import { Readable , PassThrough , Stream , Duplex , Transform } from 'stream' ;
26+ import {
27+ Readable ,
28+ PassThrough ,
29+ Stream ,
30+ Duplex ,
31+ Transform ,
32+ pipeline ,
33+ } from 'stream' ;
2734import assert from 'assert' ;
2835import * as crypto from 'crypto' ;
2936import duplexify from 'duplexify' ;
@@ -2281,6 +2288,67 @@ describe('File', () => {
22812288 writable . end ( 'data' ) ;
22822289 } ) ;
22832290
2291+ it ( 'should close upstream when pipeline fails' , done => {
2292+ const writable : Stream . Writable = file . createWriteStream ( ) ;
2293+ const error = new Error ( 'My error' ) ;
2294+ const uploadStream = new PassThrough ( ) ;
2295+
2296+ let receivedBytes = 0 ;
2297+ const validateStream = new PassThrough ( ) ;
2298+ validateStream . on ( 'data' , ( chunk : Buffer ) => {
2299+ receivedBytes += chunk . length ;
2300+ if ( receivedBytes > 5 ) {
2301+ // this aborts the pipeline which should also close the internal pipeline within createWriteStream
2302+ pLine . destroy ( error ) ;
2303+ }
2304+ } ) ;
2305+
2306+ file . startResumableUpload_ = ( dup : duplexify . Duplexify ) => {
2307+ dup . setWritable ( uploadStream ) ;
2308+ // Emit an error so the pipeline's error-handling logic is triggered
2309+ uploadStream . emit ( 'error' , error ) ;
2310+ // Explicitly destroy the stream so that the 'close' event is guaranteed to fire,
2311+ // even in Node v14 where autoDestroy defaults may prevent automatic closing
2312+ uploadStream . destroy ( ) ;
2313+ } ;
2314+
2315+ let closed = false ;
2316+ uploadStream . on ( 'close' , ( ) => {
2317+ closed = true ;
2318+ } ) ;
2319+
2320+ const pLine = pipeline (
2321+ ( function * ( ) {
2322+ yield 'foo' ; // write some data
2323+ yield 'foo' ; // write some data
2324+ yield 'foo' ; // write some data
2325+ } ) ( ) ,
2326+ validateStream ,
2327+ writable ,
2328+ ( e : Error | null ) => {
2329+ assert . strictEqual ( e , error ) ;
2330+ assert . strictEqual ( closed , true ) ;
2331+ done ( ) ;
2332+ }
2333+ ) ;
2334+ } ) ;
2335+
2336+ it ( 'should error pipeline if source stream emits error before any data' , done => {
2337+ const writable = file . createWriteStream ( ) ;
2338+ const error = new Error ( 'Error before first chunk' ) ;
2339+ pipeline (
2340+ // eslint-disable-next-line require-yield
2341+ ( function * ( ) {
2342+ throw error ;
2343+ } ) ( ) ,
2344+ writable ,
2345+ ( e : Error | null ) => {
2346+ assert . strictEqual ( e , error ) ;
2347+ done ( ) ;
2348+ }
2349+ ) ;
2350+ } ) ;
2351+
22842352 describe ( 'validation' , ( ) => {
22852353 const data = 'test' ;
22862354
0 commit comments