@@ -418,6 +418,7 @@ function newStreamWritableFromWritableStream(writableStream, options = kEmptyObj
418418 * @param {Readable } streamReadable
419419 * @param {{
420420 * strategy : QueuingStrategy
421+ * type : string,
421422 * }} [options]
422423 * @returns {ReadableStream }
423424 */
@@ -432,13 +433,15 @@ function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObj
432433 'stream.Readable' ,
433434 streamReadable ) ;
434435 }
436+ const isBYOB = options ?. type === 'bytes' ;
435437
436438 if ( isDestroyed ( streamReadable ) || ! isReadable ( streamReadable ) ) {
437439 const readable = new ReadableStream ( ) ;
438440 readable . cancel ( ) ;
439441 return readable ;
440442 }
441443
444+
442445 const objectMode = streamReadable . readableObjectMode ;
443446 const highWaterMark = streamReadable . readableHighWaterMark ;
444447
@@ -491,15 +494,27 @@ function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObj
491494 streamReadable . on ( 'data' , onData ) ;
492495
493496 return new ReadableStream ( {
494- start ( c ) { controller = c ; } ,
497+ type : isBYOB ? 'bytes' : undefined ,
498+ start ( c ) {
499+ controller = c ;
500+ if ( isBYOB ) {
501+ streamReadable . once ( 'end' , ( ) => {
502+ // close the controller
503+ controller . close ( ) ;
504+ // And unlock the last BYOB read request
505+ controller . byobRequest ?. respond ( 0 ) ;
506+ wasCanceled = true ;
507+ } ) ;
508+ }
509+ } ,
495510
496511 pull ( ) { streamReadable . resume ( ) ; } ,
497512
498513 cancel ( reason ) {
499514 wasCanceled = true ;
500515 destroy ( streamReadable , reason ) ;
501516 } ,
502- } , strategy ) ;
517+ } , isBYOB ? undefined : strategy ) ;
503518}
504519
505520/**
@@ -601,9 +616,10 @@ function newStreamReadableFromReadableStream(readableStream, options = kEmptyObj
601616
602617/**
603618 * @param {Duplex } duplex
619+ * @param {{ type?:string} } [options]
604620 * @returns {ReadableWritablePair }
605621 */
606- function newReadableWritablePairFromDuplex ( duplex ) {
622+ function newReadableWritablePairFromDuplex ( duplex , options = kEmptyObject ) {
607623 // Not using the internal/streams/utils isWritableNodeStream and
608624 // isReadableNodeStream utilities here because they will return false
609625 // if the duplex was created with writable or readable options set to
@@ -615,9 +631,11 @@ function newReadableWritablePairFromDuplex(duplex) {
615631 throw new ERR_INVALID_ARG_TYPE ( 'duplex' , 'stream.Duplex' , duplex ) ;
616632 }
617633
634+ validateObject ( options , 'options' ) ;
635+
618636 if ( isDestroyed ( duplex ) ) {
619637 const writable = new WritableStream ( ) ;
620- const readable = new ReadableStream ( ) ;
638+ const readable = new ReadableStream ( options ) ;
621639 writable . close ( ) ;
622640 readable . cancel ( ) ;
623641 return { readable, writable } ;
@@ -633,8 +651,8 @@ function newReadableWritablePairFromDuplex(duplex) {
633651
634652 const readable =
635653 isReadable ( duplex ) ?
636- newReadableStreamFromStreamReadable ( duplex ) :
637- new ReadableStream ( ) ;
654+ newReadableStreamFromStreamReadable ( duplex , options ) :
655+ new ReadableStream ( options ) ;
638656
639657 if ( ! isReadable ( duplex ) )
640658 readable . cancel ( ) ;
0 commit comments