@@ -27,6 +27,7 @@ const {
2727 NumberIsNaN,
2828 ObjectDefineProperties,
2929 ObjectSetPrototypeOf,
30+ Promise,
3031 Set,
3132 SymbolAsyncIterator,
3233 Symbol
@@ -59,11 +60,11 @@ const kPaused = Symbol('kPaused');
5960
6061// Lazy loaded to improve the startup performance.
6162let StringDecoder ;
62- let createReadableStreamAsyncIterator ;
6363let from ;
6464
6565ObjectSetPrototypeOf ( Readable . prototype , Stream . prototype ) ;
6666ObjectSetPrototypeOf ( Readable , Stream ) ;
67+ function nop ( ) { }
6768
6869const { errorOrDestroy } = destroyImpl ;
6970
@@ -1075,13 +1076,68 @@ Readable.prototype.wrap = function(stream) {
10751076} ;
10761077
10771078Readable . prototype [ SymbolAsyncIterator ] = function ( ) {
1078- if ( createReadableStreamAsyncIterator === undefined ) {
1079- createReadableStreamAsyncIterator =
1080- require ( 'internal/streams/async_iterator' ) ;
1079+ let stream = this ;
1080+
1081+ if ( typeof stream . read !== 'function' ) {
1082+ // v1 stream
1083+ const src = stream ;
1084+ stream = new Readable ( {
1085+ objectMode : true ,
1086+ destroy ( err , callback ) {
1087+ destroyImpl . destroyer ( src , err ) ;
1088+ callback ( ) ;
1089+ }
1090+ } ) . wrap ( src ) ;
10811091 }
1082- return createReadableStreamAsyncIterator ( this ) ;
1092+
1093+ const iter = createAsyncIterator ( stream ) ;
1094+ iter . stream = stream ;
1095+ return iter ;
10831096} ;
10841097
1098+ async function * createAsyncIterator ( stream ) {
1099+ let callback = nop ;
1100+
1101+ function next ( resolve ) {
1102+ if ( this === stream ) {
1103+ callback ( ) ;
1104+ callback = nop ;
1105+ } else {
1106+ callback = resolve ;
1107+ }
1108+ }
1109+
1110+ stream
1111+ . on ( 'readable' , next )
1112+ . on ( 'error' , next )
1113+ . on ( 'end' , next )
1114+ . on ( 'close' , next ) ;
1115+
1116+ try {
1117+ const state = stream . _readableState ;
1118+ while ( true ) {
1119+ const chunk = stream . read ( ) ;
1120+ if ( chunk !== null ) {
1121+ yield chunk ;
1122+ } else if ( state . errored ) {
1123+ throw state . errored ;
1124+ } else if ( state . ended ) {
1125+ break ;
1126+ } else if ( state . closed ) {
1127+ // TODO(ronag): ERR_PREMATURE_CLOSE?
1128+ break ;
1129+ } else {
1130+ await new Promise ( next ) ;
1131+ }
1132+ }
1133+ } catch ( err ) {
1134+ destroyImpl . destroyer ( stream , err ) ;
1135+ throw err ;
1136+ } finally {
1137+ destroyImpl . destroyer ( stream , null ) ;
1138+ }
1139+ }
1140+
10851141// Making it explicit these properties are not enumerable
10861142// because otherwise some prototype manipulation in
10871143// userland will fail.
0 commit comments