@@ -30,10 +30,8 @@ import * as crypto from 'crypto';
3030import * as extend from 'extend' ;
3131import * as fs from 'fs' ;
3232import * as mime from 'mime' ;
33- // eslint-disable-next-line @typescript-eslint/no-var-requires
34- const pumpify = require ( 'pumpify' ) ;
3533import * as resumableUpload from './resumable-upload' ;
36- import { Writable , Readable , PassThrough } from 'stream' ;
34+ import { Writable , Readable , pipeline , Transform , PassThrough } from 'stream' ;
3735import * as zlib from 'zlib' ;
3836import * as http from 'http' ;
3937
@@ -1495,7 +1493,7 @@ class File extends ServiceObject<File> {
14951493
14961494 const headers = rawResponseStream . toJSON ( ) . headers ;
14971495 isServedCompressed = headers [ 'content-encoding' ] === 'gzip' ;
1498- const throughStreams : Writable [ ] = [ ] ;
1496+ const transformStreams : Transform [ ] = [ ] ;
14991497
15001498 if ( shouldRunValidation ) {
15011499 // The x-goog-hash header should be set with a crc32c and md5 hash.
@@ -1517,28 +1515,32 @@ class File extends ServiceObject<File> {
15171515 crc32cGenerator : this . crc32cGenerator ,
15181516 } ) ;
15191517
1520- throughStreams . push ( validateStream ) ;
1518+ transformStreams . push ( validateStream ) ;
15211519 }
15221520
15231521 if ( isServedCompressed && options . decompress ) {
1524- throughStreams . push ( zlib . createGunzip ( ) ) ;
1522+ transformStreams . push ( zlib . createGunzip ( ) ) ;
15251523 }
15261524
1527- if ( throughStreams . length === 1 ) {
1528- rawResponseStream =
1529- // eslint-disable-next-line @typescript-eslint/no-explicit-any
1530- rawResponseStream . pipe ( throughStreams [ 0 ] ) as any ;
1531- } else if ( throughStreams . length > 1 ) {
1532- rawResponseStream = rawResponseStream . pipe (
1533- pumpify . obj ( throughStreams )
1534- ) ;
1535- }
1525+ const handoffStream = new PassThrough ( {
1526+ final : async cb => {
1527+ // Preserving `onComplete`'s ability to
1528+ // close `throughStream` before pipeline
1529+ // attempts to.
1530+ await onComplete ( null ) ;
1531+ cb ( ) ;
1532+ } ,
1533+ } ) ;
15361534
1537- rawResponseStream
1538- . on ( 'error' , onComplete )
1539- . on ( 'end' , onComplete )
1540- . pipe ( throughStream , { end : false } ) ;
1535+ pipeline (
1536+ rawResponseStream ,
1537+ ...( transformStreams as [ Transform ] ) ,
1538+ handoffStream ,
1539+ throughStream ,
1540+ onComplete
1541+ ) ;
15411542 } ;
1543+
15421544 // This is hooked to the `complete` event from the request stream. This is
15431545 // our chance to validate the data and let the user know if anything went
15441546 // wrong.
@@ -1948,101 +1950,92 @@ class File extends ServiceObject<File> {
19481950 crc32c = false ;
19491951 }
19501952
1951- // Collect data as it comes in to store in a hash. This is compared to the
1952- // checksum value on the returned metadata from the API.
1953- const validateStream = new HashStreamValidator ( {
1953+ /**
1954+ * A callback for determining when the underlying pipeline is complete.
1955+ * It's possible the pipeline callback could error before the write stream
1956+ * calls `final` so by default this will destroy the write stream unless the
1957+ * write stream sets this callback via its `final` handler.
1958+ * @param error An optional error
1959+ */
1960+ let pipelineCallback : ( error ?: Error | null ) => void = error => {
1961+ writeStream . destroy ( error || undefined ) ;
1962+ } ;
1963+
1964+ // A stream for consumer to write to
1965+ const writeStream = new Writable ( {
1966+ final ( cb ) {
1967+ // Set the pipeline callback to this callback so the pipeline's results
1968+ // can be populated to the consumer
1969+ pipelineCallback = cb ;
1970+
1971+ emitStream . end ( ) ;
1972+ } ,
1973+ write ( chunk , encoding , cb ) {
1974+ emitStream . write ( chunk , encoding , cb ) ;
1975+ } ,
1976+ } ) ;
1977+
1978+ const emitStream = new PassThroughShim ( ) ;
1979+ const hashCalculatingStream = new HashStreamValidator ( {
19541980 crc32c,
19551981 md5,
19561982 crc32cGenerator : this . crc32cGenerator ,
19571983 } ) ;
19581984
19591985 const fileWriteStream = duplexify ( ) ;
1960-
1961- fileWriteStream . on ( 'progress' , evt => {
1962- stream . emit ( 'progress' , evt ) ;
1963- } ) ;
1964-
1965- const passThroughShim = new PassThroughShim ( ) ;
1966-
1967- passThroughShim . on ( 'writing ', ( ) => {
1968- stream . emit ( 'writing' ) ;
1986+ let fileWriteStreamMetadataReceived = false ;
1987+
1988+ // Handing off emitted events to users
1989+ emitStream . on ( 'reading' , ( ) => writeStream . emit ( 'reading' ) ) ;
1990+ emitStream . on ( 'writing' , ( ) => writeStream . emit ( 'writing' ) ) ;
1991+ fileWriteStream . on ( 'progress' , evt => writeStream . emit ( 'progress' , evt ) ) ;
1992+ fileWriteStream . on ( 'response' , resp => writeStream . emit ( 'response' , resp ) ) ;
1993+ fileWriteStream . once ( 'metadata ', ( ) => {
1994+ fileWriteStreamMetadataReceived = true ;
19691995 } ) ;
19701996
1971- const stream = pumpify ( [
1972- passThroughShim ,
1973- gzip ? zlib . createGzip ( ) : new PassThrough ( ) ,
1974- validateStream ,
1975- fileWriteStream ,
1976- ] ) ;
1977-
1978- // Wait until we've received data to determine what upload technique to use.
1979- stream . on ( 'writing' , ( ) => {
1997+ writeStream . on ( 'writing' , ( ) => {
19801998 if ( options . resumable === false ) {
19811999 this . startSimpleUpload_ ( fileWriteStream , options ) ;
1982- return ;
1983- }
1984- this . startResumableUpload_ ( fileWriteStream , options ) ;
1985- } ) ;
1986-
1987- fileWriteStream . on ( 'response' , stream . emit . bind ( stream , 'response' ) ) ;
1988-
1989- // This is to preserve the `finish` event. We wait until the request stream
1990- // emits "complete", as that is when we do validation of the data. After
1991- // that is successful, we can allow the stream to naturally finish.
1992- //
1993- // Reference for tracking when we can use a non-hack solution:
1994- // https://github.com/nodejs/node/pull/2314
1995- fileWriteStream . on ( 'prefinish' , ( ) => {
1996- stream . cork ( ) ;
1997- } ) ;
1998-
1999- // Compare our hashed version vs the completed upload's version.
2000- fileWriteStream . on ( 'complete' , ( ) => {
2001- const metadata = this . metadata ;
2002-
2003- // If we're doing validation, assume the worst-- a data integrity
2004- // mismatch. If not, these tests won't be performed, and we can assume the
2005- // best.
2006- let failed = crc32c || md5 ;
2007-
2008- if ( crc32c && metadata . crc32c ) {
2009- failed = ! validateStream . test ( 'crc32c' , metadata . crc32c ) ;
2010- }
2011-
2012- if ( md5 && metadata . md5Hash ) {
2013- failed = ! validateStream . test ( 'md5' , metadata . md5Hash ) ;
2000+ } else {
2001+ this . startResumableUpload_ ( fileWriteStream , options ) ;
20142002 }
20152003
2016- if ( failed ) {
2017- this . delete ( ( err : ApiError ) => {
2018- let code ;
2019- let message ;
2020-
2021- if ( err ) {
2022- code = 'FILE_NO_UPLOAD_DELETE' ;
2023- message = `${ FileExceptionMessages . UPLOAD_MISMATCH_DELETE_FAIL } ${ err . message } ` ;
2024- } else if ( md5 && ! metadata . md5Hash ) {
2025- code = 'MD5_NOT_AVAILABLE' ;
2026- message = FileExceptionMessages . MD5_NOT_AVAILABLE ;
2027- } else {
2028- code = 'FILE_NO_UPLOAD' ;
2029- message = FileExceptionMessages . UPLOAD_MISMATCH ;
2004+ pipeline (
2005+ emitStream ,
2006+ gzip ? zlib . createGzip ( ) : new PassThrough ( ) ,
2007+ hashCalculatingStream ,
2008+ fileWriteStream ,
2009+ async e => {
2010+ if ( e ) {
2011+ return pipelineCallback ( e ) ;
20302012 }
20312013
2032- const error = new RequestError ( message ) ;
2033- error . code = code ;
2034- error . errors = [ err ! ] ;
2035-
2036- fileWriteStream . destroy ( error ) ;
2037- } ) ;
2038-
2039- return ;
2040- }
2014+ // We want to make sure we've received the metadata from the server in order
2015+ // to properly validate the object's integrity. Depending on the type of upload,
2016+ // the stream could close before the response is returned.
2017+ if ( ! fileWriteStreamMetadataReceived ) {
2018+ try {
2019+ await new Promise ( ( resolve , reject ) => {
2020+ fileWriteStream . once ( 'metadata' , resolve ) ;
2021+ fileWriteStream . once ( 'error' , reject ) ;
2022+ } ) ;
2023+ } catch ( e ) {
2024+ return pipelineCallback ( e as Error ) ;
2025+ }
2026+ }
20412027
2042- stream . uncork ( ) ;
2028+ try {
2029+ await this . #validateIntegrity( hashCalculatingStream , { crc32c, md5} ) ;
2030+ pipelineCallback ( ) ;
2031+ } catch ( e ) {
2032+ pipelineCallback ( e as Error ) ;
2033+ }
2034+ }
2035+ ) ;
20432036 } ) ;
20442037
2045- return stream as Writable ;
2038+ return writeStream ;
20462039 }
20472040
20482041 /**
@@ -3932,6 +3925,7 @@ class File extends ServiceObject<File> {
39323925 } )
39333926 . on ( 'metadata' , metadata => {
39343927 this . metadata = metadata ;
3928+ dup . emit ( 'metadata' ) ;
39353929 } )
39363930 . on ( 'finish' , ( ) => {
39373931 dup . emit ( 'complete' ) ;
@@ -4011,6 +4005,7 @@ class File extends ServiceObject<File> {
40114005 }
40124006
40134007 this . metadata = body ;
4008+ dup . emit ( 'metadata' , body ) ;
40144009 dup . emit ( 'response' , resp ) ;
40154010 dup . emit ( 'complete' ) ;
40164011 } ) ;
@@ -4049,6 +4044,63 @@ class File extends ServiceObject<File> {
40494044
40504045 return Buffer . concat ( buf ) ;
40514046 }
4047+
4048+ /**
4049+ *
4050+ * @param hashCalculatingStream
4051+ * @param verify
4052+ * @returns {boolean } Returns `true` if valid, throws with error otherwise
4053+ */
4054+ async #validateIntegrity(
4055+ hashCalculatingStream : HashStreamValidator ,
4056+ verify : { crc32c ?: boolean ; md5 ?: boolean } = { }
4057+ ) {
4058+ const metadata = this . metadata ;
4059+
4060+ // If we're doing validation, assume the worst
4061+ let dataMismatch = ! ! ( verify . crc32c || verify . md5 ) ;
4062+
4063+ if ( verify . crc32c && metadata . crc32c ) {
4064+ dataMismatch = ! hashCalculatingStream . test ( 'crc32c' , metadata . crc32c ) ;
4065+ }
4066+
4067+ if ( verify . md5 && metadata . md5Hash ) {
4068+ dataMismatch = ! hashCalculatingStream . test ( 'md5' , metadata . md5Hash ) ;
4069+ }
4070+
4071+ if ( dataMismatch ) {
4072+ const errors : Error [ ] = [ ] ;
4073+ let code = '' ;
4074+ let message = '' ;
4075+
4076+ try {
4077+ await this . delete ( ) ;
4078+
4079+ if ( verify . md5 && ! metadata . md5Hash ) {
4080+ code = 'MD5_NOT_AVAILABLE' ;
4081+ message = FileExceptionMessages . MD5_NOT_AVAILABLE ;
4082+ } else {
4083+ code = 'FILE_NO_UPLOAD' ;
4084+ message = FileExceptionMessages . UPLOAD_MISMATCH ;
4085+ }
4086+ } catch ( e ) {
4087+ const error = e as Error ;
4088+
4089+ code = 'FILE_NO_UPLOAD_DELETE' ;
4090+ message = `${ FileExceptionMessages . UPLOAD_MISMATCH_DELETE_FAIL } ${ error . message } ` ;
4091+
4092+ errors . push ( error ) ;
4093+ }
4094+
4095+ const error = new RequestError ( message ) ;
4096+ error . code = code ;
4097+ error . errors = errors ;
4098+
4099+ throw error ;
4100+ }
4101+
4102+ return true ;
4103+ }
40524104}
40534105
40544106/*! Developer Documentation
0 commit comments