@@ -42,6 +42,7 @@ const {
4242 validateQuicEndpointOptions,
4343 validateCreateSecureContextOptions,
4444 validateQuicSocketConnectOptions,
45+ QuicStreamSharedState,
4546 QuicSocketSharedState,
4647 QuicSessionSharedState,
4748 QLogStream,
@@ -1792,8 +1793,7 @@ class QuicSession extends EventEmitter {
17921793 const stream = this [ kInternalState ] . streams . get ( id ) ;
17931794 if ( stream === undefined )
17941795 return ;
1795-
1796- stream . destroy ( ) ;
1796+ stream [ kDestroy ] ( code ) ;
17971797 }
17981798
17991799 [ kStreamReset ] ( id , code ) {
@@ -1968,6 +1968,8 @@ class QuicSession extends EventEmitter {
19681968 return ;
19691969 state . destroyed = true ;
19701970
1971+ state . idleTimeout = Boolean ( this [ kInternalState ] . state ?. idleTimeout ) ;
1972+
19711973 // Destroy any remaining streams immediately.
19721974 for ( const stream of state . streams . values ( ) )
19731975 stream . destroy ( error ) ;
@@ -1982,7 +1984,6 @@ class QuicSession extends EventEmitter {
19821984 handle . stats [ IDX_QUIC_SESSION_STATS_DESTROYED_AT ] =
19831985 process . hrtime . bigint ( ) ;
19841986 state . stats = new BigInt64Array ( handle . stats ) ;
1985- state . idleTimeout = this [ kInternalState ] . state . idleTimeout ;
19861987
19871988 // Destroy the underlying QuicSession handle
19881989 handle . destroy ( state . closeCode , state . closeFamily ) ;
@@ -2530,10 +2531,12 @@ function streamOnPause() {
25302531 if ( ! this . destroyed )
25312532 this [ kHandle ] . readStop ( ) ;
25322533}
2533-
25342534class QuicStream extends Duplex {
25352535 [ kInternalState ] = {
25362536 closed : false ,
2537+ closePromise : undefined ,
2538+ closePromiseReject : undefined ,
2539+ closePromiseResolve : undefined ,
25372540 defaultEncoding : undefined ,
25382541 didRead : false ,
25392542 id : undefined ,
@@ -2544,6 +2547,7 @@ class QuicStream extends Duplex {
25442547 dataRateHistogram : undefined ,
25452548 dataSizeHistogram : undefined ,
25462549 dataAckHistogram : undefined ,
2550+ sharedState : undefined ,
25472551 stats : undefined ,
25482552 } ;
25492553
@@ -2563,7 +2567,7 @@ class QuicStream extends Duplex {
25632567 allowHalfOpen : true ,
25642568 decodeStrings : true ,
25652569 emitClose : true ,
2566- autoDestroy : false ,
2570+ autoDestroy : true ,
25672571 captureRejections : true ,
25682572 } ) ;
25692573 const state = this [ kInternalState ] ;
@@ -2584,7 +2588,6 @@ class QuicStream extends Duplex {
25842588 // is still minimally usable before this but any data
25852589 // written will be buffered until kSetHandle is called.
25862590 [ kSetHandle ] ( handle ) {
2587- this [ kHandle ] = handle ;
25882591 const state = this [ kInternalState ] ;
25892592 if ( handle !== undefined ) {
25902593 handle . onread = onStreamRead ;
@@ -2594,23 +2597,84 @@ class QuicStream extends Duplex {
25942597 state . dataRateHistogram = new Histogram ( handle . rate ) ;
25952598 state . dataSizeHistogram = new Histogram ( handle . size ) ;
25962599 state . dataAckHistogram = new Histogram ( handle . ack ) ;
2600+ state . sharedState = new QuicStreamSharedState ( handle . state ) ;
25972601 state . session [ kAddStream ] ( state . id , this ) ;
25982602 } else {
2603+ if ( this [ kHandle ] !== undefined ) {
2604+ this [ kHandle ] . stats [ IDX_QUIC_STREAM_STATS_DESTROYED_AT ] =
2605+ process . hrtime . bigint ( ) ;
2606+ state . stats = new BigInt64Array ( this [ kHandle ] . stats ) ;
2607+ }
2608+ state . sharedState = undefined ;
25992609 if ( state . dataRateHistogram )
26002610 state . dataRateHistogram [ kDestroyHistogram ] ( ) ;
26012611 if ( state . dataSizeHistogram )
26022612 state . dataSizeHistogram [ kDestroyHistogram ] ( ) ;
26032613 if ( state . dataAckHistogram )
26042614 state . dataAckHistogram [ kDestroyHistogram ] ( ) ;
26052615 }
2616+ this [ kHandle ] = handle ;
26062617 }
26072618
26082619 [ kStreamReset ] ( code ) {
2609- this [ kInternalState ] . resetCode = code | 0 ;
2620+ // Receiving a reset from the peer indicates that it is no
2621+ // longer sending any data, we can safely close the readable
2622+ // side of the Duplex here.
2623+ this [ kInternalState ] . resetCode = code ;
26102624 this . push ( null ) ;
26112625 this . read ( ) ;
26122626 }
26132627
2628+ [ kClose ] ( ) {
2629+ const state = this [ kInternalState ] ;
2630+
2631+ if ( this . destroyed ) {
2632+ return PromiseReject (
2633+ new ERR_INVALID_STATE ( 'QuicStream is already destroyed' ) ) ;
2634+ }
2635+
2636+ const promise = deferredClosePromise ( state ) ;
2637+ if ( this . readable ) {
2638+ this . push ( null ) ;
2639+ this . read ( ) ;
2640+ }
2641+
2642+ if ( this . writable ) {
2643+ this . end ( ) ;
2644+ }
2645+
2646+ return promise ;
2647+ }
2648+
2649+ close ( ) {
2650+ return this [ kInternalState ] . closePromise || this [ kClose ] ( ) ;
2651+ }
2652+
2653+ _destroy ( error , callback ) {
2654+ const state = this [ kInternalState ] ;
2655+ const handle = this [ kHandle ] ;
2656+ this [ kSetHandle ] ( ) ;
2657+ if ( handle !== undefined )
2658+ handle . destroy ( ) ;
2659+ state . session [ kRemoveStream ] ( this ) ;
2660+
2661+ if ( error && typeof state . closePromiseReject === 'function' )
2662+ state . closePromiseReject ( error ) ;
2663+ else if ( typeof state . closePromiseResolve === 'function' )
2664+ state . closePromiseResolve ( ) ;
2665+
2666+ process . nextTick ( ( ) => callback ( error ) ) ;
2667+ }
2668+
2669+ [ kDestroy ] ( code ) {
2670+ // TODO(@jasnell): If code is non-zero, and stream is not otherwise
2671+ // naturally shutdown, then we should destroy with an error.
2672+
2673+ // Put the QuicStream into detached mode before calling destroy
2674+ this [ kSetHandle ] ( ) ;
2675+ this . destroy ( ) ;
2676+ }
2677+
26142678 [ kHeaders ] ( headers , kind , push_id ) {
26152679 // TODO(@jasnell): Convert the headers into a proper object
26162680 let name ;
@@ -2635,42 +2699,6 @@ class QuicStream extends Duplex {
26352699 process . nextTick ( emit . bind ( this , name , headers , push_id ) ) ;
26362700 }
26372701
2638- [ kClose ] ( family , code ) {
2639- const state = this [ kInternalState ] ;
2640- // Trigger the abrupt shutdown of the stream. If the stream is
2641- // already no-longer readable or writable, this does nothing. If
2642- // the stream is readable or writable, then the abort event will
2643- // be emitted immediately after triggering the send of the
2644- // RESET_STREAM and STOP_SENDING frames. The stream will no longer
2645- // be readable or writable, but will not be immediately destroyed
2646- // as we need to wait until ngtcp2 recognizes the stream as
2647- // having been closed to be destroyed.
2648-
2649- // Do nothing if we've already been destroyed
2650- if ( this . destroyed || state . closed )
2651- return ;
2652-
2653- state . closed = true ;
2654-
2655- // Trigger scheduling of the RESET_STREAM and STOP_SENDING frames
2656- // as appropriate. Notify ngtcp2 that the stream is to be shutdown.
2657- // Once sent, the stream will be closed and destroyed as soon as
2658- // the shutdown is acknowledged by the peer.
2659- this [ kHandle ] . resetStream ( code , family ) ;
2660-
2661- // Close down the readable side of the stream
2662- if ( this . readable ) {
2663- this . push ( null ) ;
2664- this . read ( ) ;
2665- }
2666-
2667- // It is important to call shutdown on the handle before shutting
2668- // down the writable side of the stream in order to prevent an
2669- // empty STREAM frame with fin set to be sent to the peer.
2670- if ( this . writable )
2671- this . end ( ) ;
2672- }
2673-
26742702 [ kAfterAsyncWrite ] ( { bytes } ) {
26752703 // TODO(@jasnell): Implement this
26762704 }
@@ -2681,6 +2709,7 @@ class QuicStream extends Duplex {
26812709 const initiated = this . serverInitiated ? 'server' : 'client' ;
26822710 return customInspect ( this , {
26832711 id : this [ kInternalState ] . id ,
2712+ detached : this . detached ,
26842713 direction,
26852714 initiated,
26862715 writableState : this . _writableState ,
@@ -2699,6 +2728,15 @@ class QuicStream extends Duplex {
26992728 // TODO(@jasnell): Implement this later
27002729 }
27012730
2731+ get detached ( ) {
2732+ // The QuicStream is detached if it is yet destroyed
2733+ // but the underlying handle is undefined. While in
2734+ // detached mode, the QuicStream may still have
2735+ // data pending in the read queue, but writes will
2736+ // not be permitted.
2737+ return this [ kHandle ] === undefined ;
2738+ }
2739+
27022740 get serverInitiated ( ) {
27032741 return ! ! ( this [ kInternalState ] . id & 0b01 ) ;
27042742 }
@@ -2740,20 +2778,40 @@ class QuicStream extends Duplex {
27402778 // called. By calling shutdown, we're telling
27412779 // the native side that no more data will be
27422780 // coming so that a fin stream packet can be
2743- // sent.
2781+ // sent, allowing any remaining final stream
2782+ // frames to be sent if necessary.
2783+ //
2784+ // When end() is called, we set the writeEnded
2785+ // flag so that we can know earlier when there
2786+ // is not going to be any more data being written
2787+ // but that is only used when end() is called
2788+ // with a final chunk to write.
27442789 _final ( cb ) {
2745- const handle = this [ kHandle ] ;
2746- if ( handle === undefined ) {
2747- cb ( ) ;
2790+ if ( ! this . detached ) {
2791+ const state = this [ kInternalState ] ;
2792+ if ( state . sharedState ?. finSent )
2793+ return cb ( ) ;
2794+ const handle = this [ kHandle ] ;
2795+ const req = new ShutdownWrap ( ) ;
2796+ req . oncomplete = ( ) => {
2797+ req . handle = undefined ;
2798+ cb ( ) ;
2799+ } ;
2800+ req . handle = handle ;
2801+ if ( handle . shutdown ( req ) === 1 )
2802+ return req . oncomplete ( ) ;
27482803 return ;
27492804 }
2805+ return cb ( ) ;
2806+ }
27502807
2751- const req = new ShutdownWrap ( ) ;
2752- req . oncomplete = ( ) => cb ( ) ;
2753- req . handle = handle ;
2754- const err = handle . shutdown ( req ) ;
2755- if ( err === 1 )
2756- return cb ( ) ;
2808+ end ( ...args ) {
2809+ if ( ! this . destroyed ) {
2810+ if ( ! this . detached )
2811+ this [ kInternalState ] . sharedState . writeEnded = true ;
2812+ super . end . apply ( this , args ) ;
2813+ }
2814+ return this ;
27572815 }
27582816
27592817 _read ( nread ) {
@@ -2809,11 +2867,6 @@ class QuicStream extends Duplex {
28092867 this [ kUpdateTimer ] ( ) ;
28102868 this . ownsFd = ownsFd ;
28112869
2812- // Close the writable side of the stream, but only as far as the writable
2813- // stream implementation is concerned.
2814- this . _final = null ;
2815- this . end ( ) ;
2816-
28172870 defaultTriggerAsyncIdScope ( this [ async_id_symbol ] ,
28182871 QuicStream [ kStartFilePipe ] ,
28192872 this , fd , offset , length ) ;
@@ -2840,6 +2893,7 @@ class QuicStream extends Duplex {
28402893 this . source . close ( ) . catch ( stream . destroy . bind ( stream ) ) ;
28412894 else
28422895 this . source . releaseFD ( ) ;
2896+ stream . end ( ) ;
28432897 }
28442898
28452899 static [ kOnPipedFileHandleRead ] ( ) {
@@ -2869,35 +2923,14 @@ class QuicStream extends Duplex {
28692923 return this [ kInternalState ] . push_id ;
28702924 }
28712925
2872- close ( code ) {
2873- this [ kClose ] ( QUIC_ERROR_APPLICATION , code ) ;
2926+ _onTimeout ( ) {
2927+ // TODO(@jasnell): Implement this
28742928 }
28752929
28762930 get session ( ) {
28772931 return this [ kInternalState ] . session ;
28782932 }
28792933
2880- _destroy ( error , callback ) {
2881- const state = this [ kInternalState ] ;
2882- const handle = this [ kHandle ] ;
2883- // Do not use handle after this point as the underlying C++
2884- // object has been destroyed. Any attempt to use the object
2885- // will segfault and crash the process.
2886- if ( handle !== undefined ) {
2887- handle . stats [ IDX_QUIC_STREAM_STATS_DESTROYED_AT ] =
2888- process . hrtime . bigint ( ) ;
2889- state . stats = new BigInt64Array ( handle . stats ) ;
2890- handle . destroy ( ) ;
2891- }
2892- state . session [ kRemoveStream ] ( this ) ;
2893- // The destroy callback must be invoked in a nextTick
2894- process . nextTick ( ( ) => callback ( error ) ) ;
2895- }
2896-
2897- _onTimeout ( ) {
2898- // TODO(@jasnell): Implement this
2899- }
2900-
29012934 get dataRateHistogram ( ) {
29022935 return this [ kInternalState ] . dataRateHistogram ;
29032936 }
0 commit comments