@@ -381,6 +381,12 @@ type ClientConn struct {
381381 // Lock reqHeaderMu BEFORE mu or wmu.
382382 reqHeaderMu chan struct {}
383383
384+ // internalStateHook reports state changes back to the net/http.ClientConn.
385+ // Note that this is different from the user state hook registered by
386+ // net/http.ClientConn.SetStateHook: The internal hook calls ClientConn,
387+ // which calls the user hook.
388+ internalStateHook func ()
389+
384390 // wmu is held while writing.
385391 // Acquire BEFORE mu when holding both, to avoid blocking mu on network writes.
386392 // Only acquire both at the same time when changing peer settings.
@@ -710,7 +716,7 @@ func canRetryError(err error) bool {
710716
711717func (t * Transport ) dialClientConn (ctx context.Context , addr string , singleUse bool ) (* ClientConn , error ) {
712718 if t .transportTestHooks != nil {
713- return t .newClientConn (nil , singleUse )
719+ return t .newClientConn (nil , singleUse , nil )
714720 }
715721 host , _ , err := net .SplitHostPort (addr )
716722 if err != nil {
@@ -720,7 +726,7 @@ func (t *Transport) dialClientConn(ctx context.Context, addr string, singleUse b
720726 if err != nil {
721727 return nil , err
722728 }
723- return t .newClientConn (tconn , singleUse )
729+ return t .newClientConn (tconn , singleUse , nil )
724730}
725731
726732func (t * Transport ) newTLSConfig (host string ) * tls.Config {
@@ -772,10 +778,10 @@ func (t *Transport) expectContinueTimeout() time.Duration {
772778}
773779
774780func (t * Transport ) NewClientConn (c net.Conn ) (* ClientConn , error ) {
775- return t .newClientConn (c , t .disableKeepAlives ())
781+ return t .newClientConn (c , t .disableKeepAlives (), nil )
776782}
777783
778- func (t * Transport ) newClientConn (c net.Conn , singleUse bool ) (* ClientConn , error ) {
784+ func (t * Transport ) newClientConn (c net.Conn , singleUse bool , internalStateHook func () ) (* ClientConn , error ) {
779785 conf := configFromTransport (t )
780786 cc := & ClientConn {
781787 t : t ,
@@ -797,6 +803,7 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
797803 pings : make (map [[8 ]byte ]chan struct {}),
798804 reqHeaderMu : make (chan struct {}, 1 ),
799805 lastActive : time .Now (),
806+ internalStateHook : internalStateHook ,
800807 }
801808 if t .transportTestHooks != nil {
802809 t .transportTestHooks .newclientconn (cc )
@@ -1037,10 +1044,7 @@ func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
10371044 maxConcurrentOkay = cc .currentRequestCountLocked () < int (cc .maxConcurrentStreams )
10381045 }
10391046
1040- st .canTakeNewRequest = cc .goAway == nil && ! cc .closed && ! cc .closing && maxConcurrentOkay &&
1041- ! cc .doNotReuse &&
1042- int64 (cc .nextStreamID )+ 2 * int64 (cc .pendingRequests ) < math .MaxInt32 &&
1043- ! cc .tooIdleLocked ()
1047+ st .canTakeNewRequest = maxConcurrentOkay && cc .isUsableLocked ()
10441048
10451049 // If this connection has never been used for a request and is closed,
10461050 // then let it take a request (which will fail).
@@ -1056,6 +1060,31 @@ func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
10561060 return
10571061}
10581062
// isUsableLocked reports whether the conn can still be used for new work:
// the peer has not sent a GOAWAY, the conn is not closed or closing,
// reuse has not been disabled, the 31-bit stream ID space (advancing by 2
// per stream, plus pending requests) is not about to be exhausted, and the
// conn has not been idle for too long.
//
// cc.mu must be held.
func (cc *ClientConn) isUsableLocked() bool {
	return cc.goAway == nil &&
		!cc.closed &&
		!cc.closing &&
		!cc.doNotReuse &&
		int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&
		!cc.tooIdleLocked()
}
1071+
1072+ // canReserveLocked reports whether a net/http.ClientConn can reserve a slot on this conn.
1073+ //
1074+ // This follows slightly different rules than clientConnIdleState.canTakeNewRequest.
1075+ // We only permit reservations up to the conn's concurrency limit.
1076+ // This differs from ClientConn.ReserveNewRequest, which permits reservations
1077+ // past the limit when StrictMaxConcurrentStreams is set.
1078+ func (cc * ClientConn ) canReserveLocked () bool {
1079+ if cc .currentRequestCountLocked () >= int (cc .maxConcurrentStreams ) {
1080+ return false
1081+ }
1082+ if ! cc .isUsableLocked () {
1083+ return false
1084+ }
1085+ return true
1086+ }
1087+
10591088// currentRequestCountLocked reports the number of concurrency slots currently in use,
10601089// including active streams, reserved slots, and reset streams waiting for acknowledgement.
10611090func (cc * ClientConn ) currentRequestCountLocked () int {
@@ -1067,6 +1096,14 @@ func (cc *ClientConn) canTakeNewRequestLocked() bool {
10671096 return st .canTakeNewRequest
10681097}
10691098
1099+ // availableLocked reports the number of concurrency slots available.
1100+ func (cc * ClientConn ) availableLocked () int {
1101+ if ! cc .canTakeNewRequestLocked () {
1102+ return 0
1103+ }
1104+ return max (0 , int (cc .maxConcurrentStreams )- cc .currentRequestCountLocked ())
1105+ }
1106+
10701107// tooIdleLocked reports whether this connection has been sitting idle
10711108// for too much wall time.
10721109func (cc * ClientConn ) tooIdleLocked () bool {
@@ -1091,6 +1128,7 @@ func (cc *ClientConn) closeConn() {
10911128 t := time .AfterFunc (250 * time .Millisecond , cc .forceCloseConn )
10921129 defer t .Stop ()
10931130 cc .tconn .Close ()
1131+ cc .maybeCallStateHook ()
10941132}
10951133
10961134// A tls.Conn.Close can hang for a long time if the peer is unresponsive.
@@ -1693,6 +1731,7 @@ func (cs *clientStream) cleanupWriteRequest(err error) {
16931731 }
16941732
16951733 close (cs .donec )
1734+ cc .maybeCallStateHook ()
16961735}
16971736
16981737// awaitOpenSlotForStreamLocked waits until len(streams) < maxConcurrentStreams.
@@ -2795,6 +2834,7 @@ func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
27952834
27962835func (rl * clientConnReadLoop ) processSettingsNoWrite (f * SettingsFrame ) error {
27972836 cc := rl .cc
2837+ defer cc .maybeCallStateHook ()
27982838 cc .mu .Lock ()
27992839 defer cc .mu .Unlock ()
28002840
@@ -2975,6 +3015,7 @@ func (cc *ClientConn) Ping(ctx context.Context) error {
29753015func (rl * clientConnReadLoop ) processPing (f * PingFrame ) error {
29763016 if f .IsAck () {
29773017 cc := rl .cc
3018+ defer cc .maybeCallStateHook ()
29783019 cc .mu .Lock ()
29793020 defer cc .mu .Unlock ()
29803021 // If ack, notify listener if any
@@ -3198,9 +3239,13 @@ func registerHTTPSProtocol(t *http.Transport, rt noDialH2RoundTripper) (err erro
31983239}
31993240
// noDialH2RoundTripper is a RoundTripper which only tries to complete the request
// if there's already a cached connection to the host.
// (The field is exported so it can be accessed via reflect from net/http; tested
// by TestNoDialH2RoundTripperType)
//
// A noDialH2RoundTripper is registered with http1.Transport.RegisterProtocol,
// and the http1.Transport can use type assertions to call non-RoundTrip methods on it.
// This lets us expose, for example, NewClientConn to net/http.
type noDialH2RoundTripper struct{ *Transport }
32053250
32063251func (rt noDialH2RoundTripper ) RoundTrip (req * http.Request ) (* http.Response , error ) {
@@ -3211,6 +3256,85 @@ func (rt noDialH2RoundTripper) RoundTrip(req *http.Request) (*http.Response, err
32113256 return res , err
32123257}
32133258
// NewClientConn creates a ClientConn on conn and returns it wrapped in the
// RoundTripper interface net/http expects from a per-connection transport.
// internalStateHook, which may be nil, reports state changes back to the
// net/http.ClientConn (see ClientConn.internalStateHook).
func (rt noDialH2RoundTripper) NewClientConn(conn net.Conn, internalStateHook func()) (http.RoundTripper, error) {
	tr := rt.Transport
	cc, err := tr.newClientConn(conn, tr.disableKeepAlives(), internalStateHook)
	if err != nil {
		return nil, err
	}

	// RoundTrip should block when the conn is at its concurrency limit,
	// not return an error. Setting strictMaxConcurrentStreams enables this.
	cc.strictMaxConcurrentStreams = true

	return netHTTPClientConn{cc}, nil
}
3272+
// netHTTPClientConn wraps ClientConn and implements the interface net/http expects from
// the RoundTripper returned by NewClientConn.
type netHTTPClientConn struct {
	cc *ClientConn
}
3278+
// RoundTrip sends the request on the underlying ClientConn.
func (cc netHTTPClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
	return cc.cc.RoundTrip(req)
}
3282+
// Close closes the underlying ClientConn.
func (cc netHTTPClientConn) Close() error {
	return cc.cc.Close()
}
3286+
3287+ func (cc netHTTPClientConn ) Err () error {
3288+ cc .cc .mu .Lock ()
3289+ defer cc .cc .mu .Unlock ()
3290+ if cc .cc .closed {
3291+ return errors .New ("connection closed" )
3292+ }
3293+ return nil
3294+ }
3295+
3296+ func (cc netHTTPClientConn ) Reserve () error {
3297+ defer cc .cc .maybeCallStateHook ()
3298+ cc .cc .mu .Lock ()
3299+ defer cc .cc .mu .Unlock ()
3300+ if ! cc .cc .canReserveLocked () {
3301+ return errors .New ("connection is unavailable" )
3302+ }
3303+ cc .cc .streamsReserved ++
3304+ return nil
3305+ }
3306+
3307+ func (cc netHTTPClientConn ) Release () {
3308+ defer cc .cc .maybeCallStateHook ()
3309+ cc .cc .mu .Lock ()
3310+ defer cc .cc .mu .Unlock ()
3311+ // We don't complain if streamsReserved is 0.
3312+ //
3313+ // This is consistent with RoundTrip: both Release and RoundTrip will
3314+ // consume a reservation iff one exists.
3315+ if cc .cc .streamsReserved > 0 {
3316+ cc .cc .streamsReserved --
3317+ }
3318+ }
3319+
// Available reports the number of concurrency slots currently available on the conn.
func (cc netHTTPClientConn) Available() int {
	cc.cc.mu.Lock()
	defer cc.cc.mu.Unlock()
	return cc.cc.availableLocked()
}
3325+
// InFlight reports the number of concurrency slots currently in use,
// including active streams, reserved slots, and reset streams awaiting acknowledgement.
func (cc netHTTPClientConn) InFlight() int {
	cc.cc.mu.Lock()
	defer cc.cc.mu.Unlock()
	return cc.cc.currentRequestCountLocked()
}
3331+
// maybeCallStateHook invokes the internal state hook, if one was registered,
// to report a possible state change back to net/http.
//
// NOTE(review): call sites arrange (via defer ordering) to invoke this after
// releasing cc.mu — presumably because the hook may call back into the conn;
// confirm before adding a call site that holds the lock.
func (cc *ClientConn) maybeCallStateHook() {
	if cc.internalStateHook != nil {
		cc.internalStateHook()
	}
}
3337+
32143338func (t * Transport ) idleConnTimeout () time.Duration {
32153339 // to keep things backwards compatible, we use non-zero values of
32163340 // IdleConnTimeout, followed by using the IdleConnTimeout on the underlying
0 commit comments