@@ -165,6 +165,7 @@ type ClientConn struct {
165165 goAwayDebug string // goAway frame's debug data, retained as a string
166166 streams map [uint32 ]* clientStream // client-initiated
167167 nextStreamID uint32
168+ pendingRequests int // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams
168169 pings map [[8 ]byte ]chan struct {} // in flight ping data to notification channel
169170 bw * bufio.Writer
170171 br * bufio.Reader
@@ -217,35 +218,45 @@ type clientStream struct {
217218 resTrailer * http.Header // client's Response.Trailer
218219}
219220
220- // awaitRequestCancel runs in its own goroutine and waits for the user
221- // to cancel a RoundTrip request, its context to expire, or for the
222- // request to be done (any way it might be removed from the cc.streams
223- // map: peer reset, successful completion, TCP connection breakage,
224- // etc)
225- func (cs * clientStream ) awaitRequestCancel (req * http.Request ) {
221+ // awaitRequestCancel waits for the user to cancel a request or for the done
222+ // channel to be signaled. A non-nil error is returned only if the request was
223+ // canceled.
224+ func awaitRequestCancel (req * http.Request , done <- chan struct {}) error {
226225 ctx := reqContext (req )
227226 if req .Cancel == nil && ctx .Done () == nil {
228- return
227+ return nil
229228 }
230229 select {
231230 case <- req .Cancel :
232- cs .cancelStream ()
233- cs .bufPipe .CloseWithError (errRequestCanceled )
231+ return errRequestCanceled
234232 case <- ctx .Done ():
233+ return ctx .Err ()
234+ case <- done :
235+ return nil
236+ }
237+ }
238+
239+ // awaitRequestCancel waits for the user to cancel a request, its context to
240+ // expire, or for the request to be done (any way it might be removed from the
241+ // cc.streams map: peer reset, successful completion, TCP connection breakage,
242+ // etc). If the request is canceled, then cs will be canceled and closed.
243+ func (cs * clientStream ) awaitRequestCancel (req * http.Request ) {
244+ if err := awaitRequestCancel (req , cs .done ); err != nil {
235245 cs .cancelStream ()
236- cs .bufPipe .CloseWithError (ctx .Err ())
237- case <- cs .done :
246+ cs .bufPipe .CloseWithError (err )
238247 }
239248}
240249
241250func (cs * clientStream ) cancelStream () {
242- cs .cc .mu .Lock ()
251+ cc := cs .cc
252+ cc .mu .Lock ()
243253 didReset := cs .didReset
244254 cs .didReset = true
245- cs . cc .mu .Unlock ()
255+ cc .mu .Unlock ()
246256
247257 if ! didReset {
248- cs .cc .writeStreamReset (cs .ID , ErrCodeCancel , nil )
258+ cc .writeStreamReset (cs .ID , ErrCodeCancel , nil )
259+ cc .forgetStreamID (cs .ID )
249260 }
250261}
251262
@@ -594,6 +605,8 @@ func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
594605 }
595606}
596607
608+ // CanTakeNewRequest reports whether the connection can take a new request,
609+ // meaning it has not been closed or received or sent a GOAWAY.
597610func (cc * ClientConn ) CanTakeNewRequest () bool {
598611 cc .mu .Lock ()
599612 defer cc .mu .Unlock ()
@@ -605,8 +618,7 @@ func (cc *ClientConn) canTakeNewRequestLocked() bool {
605618 return false
606619 }
607620 return cc .goAway == nil && ! cc .closed &&
608- int64 (len (cc .streams )+ 1 ) < int64 (cc .maxConcurrentStreams ) &&
609- cc .nextStreamID < math .MaxInt32
621+ int64 (cc .nextStreamID )+ int64 (cc .pendingRequests ) < math .MaxInt32
610622}
611623
612624// onIdleTimeout is called from a time.AfterFunc goroutine. It will
@@ -752,10 +764,9 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
752764 hasTrailers := trailers != ""
753765
754766 cc .mu .Lock ()
755- cc .lastActive = time .Now ()
756- if cc .closed || ! cc .canTakeNewRequestLocked () {
767+ if err := cc .awaitOpenSlotForRequest (req ); err != nil {
757768 cc .mu .Unlock ()
758- return nil , errClientConnUnusable
769+ return nil , err
759770 }
760771
761772 body := req .Body
@@ -869,31 +880,31 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
869880 case re := <- readLoopResCh :
870881 return handleReadLoopResponse (re )
871882 case <- respHeaderTimer :
872- cc .forgetStreamID (cs .ID )
873883 if ! hasBody || bodyWritten {
874884 cc .writeStreamReset (cs .ID , ErrCodeCancel , nil )
875885 } else {
876886 bodyWriter .cancel ()
877887 cs .abortRequestBodyWrite (errStopReqBodyWriteAndCancel )
878888 }
889+ cc .forgetStreamID (cs .ID )
879890 return nil , errTimeout
880891 case <- ctx .Done ():
881- cc .forgetStreamID (cs .ID )
882892 if ! hasBody || bodyWritten {
883893 cc .writeStreamReset (cs .ID , ErrCodeCancel , nil )
884894 } else {
885895 bodyWriter .cancel ()
886896 cs .abortRequestBodyWrite (errStopReqBodyWriteAndCancel )
887897 }
898+ cc .forgetStreamID (cs .ID )
888899 return nil , ctx .Err ()
889900 case <- req .Cancel :
890- cc .forgetStreamID (cs .ID )
891901 if ! hasBody || bodyWritten {
892902 cc .writeStreamReset (cs .ID , ErrCodeCancel , nil )
893903 } else {
894904 bodyWriter .cancel ()
895905 cs .abortRequestBodyWrite (errStopReqBodyWriteAndCancel )
896906 }
907+ cc .forgetStreamID (cs .ID )
897908 return nil , errRequestCanceled
898909 case <- cs .peerReset :
899910 // processResetStream already removed the
@@ -920,6 +931,45 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
920931 }
921932}
922933
934+ // awaitOpenSlotForRequest waits until len(streams) < maxConcurrentStreams.
935+ // Must hold cc.mu.
936+ func (cc * ClientConn ) awaitOpenSlotForRequest (req * http.Request ) error {
937+ var waitingForConn chan struct {}
938+ var waitingForConnErr error // guarded by cc.mu
939+ for {
940+ cc .lastActive = time .Now ()
941+ if cc .closed || ! cc .canTakeNewRequestLocked () {
942+ return errClientConnUnusable
943+ }
944+ if int64 (len (cc .streams ))+ 1 <= int64 (cc .maxConcurrentStreams ) {
945+ if waitingForConn != nil {
946+ close (waitingForConn )
947+ }
948+ return nil
949+ }
950+ // Unfortunately, we cannot wait on a condition variable and channel at
951+ // the same time, so instead, we spin up a goroutine to check if the
952+ // request is canceled while we wait for a slot to open in the connection.
953+ if waitingForConn == nil {
954+ waitingForConn = make (chan struct {})
955+ go func () {
956+ if err := awaitRequestCancel (req , waitingForConn ); err != nil {
957+ cc .mu .Lock ()
958+ waitingForConnErr = err
959+ cc .cond .Broadcast ()
960+ cc .mu .Unlock ()
961+ }
962+ }()
963+ }
964+ cc .pendingRequests ++
965+ cc .cond .Wait ()
966+ cc .pendingRequests --
967+ if waitingForConnErr != nil {
968+ return waitingForConnErr
969+ }
970+ }
971+ }
972+
923973// requires cc.wmu be held
924974func (cc * ClientConn ) writeHeaders (streamID uint32 , endStream bool , hdrs []byte ) error {
925975 first := true // first frame written (HEADERS is first, then CONTINUATION)
@@ -1279,7 +1329,9 @@ func (cc *ClientConn) streamByID(id uint32, andRemove bool) *clientStream {
12791329 cc .idleTimer .Reset (cc .idleTimeout )
12801330 }
12811331 close (cs .done )
1282- cc .cond .Broadcast () // wake up checkResetOrDone via clientStream.awaitFlowControl
1332+ // Wake up checkResetOrDone via clientStream.awaitFlowControl and
1333+ // wake up RoundTrip if there is a pending request.
1334+ cc .cond .Broadcast ()
12831335 }
12841336 return cs
12851337}
@@ -1378,8 +1430,9 @@ func (rl *clientConnReadLoop) run() error {
13781430 cc .vlogf ("http2: Transport readFrame error on conn %p: (%T) %v" , cc , err , err )
13791431 }
13801432 if se , ok := err .(StreamError ); ok {
1381- if cs := cc .streamByID (se .StreamID , true /*ended; remove it*/ ); cs != nil {
1433+ if cs := cc .streamByID (se .StreamID , false ); cs != nil {
13821434 cs .cc .writeStreamReset (cs .ID , se .Code , err )
1435+ cs .cc .forgetStreamID (cs .ID )
13831436 if se .Cause == nil {
13841437 se .Cause = cc .fr .errDetail
13851438 }
@@ -1701,6 +1754,7 @@ func (b transportResponseBody) Close() error {
17011754 }
17021755
17031756 cs .bufPipe .BreakWithError (errClosedResponseBody )
1757+ cc .forgetStreamID (cs .ID )
17041758 return nil
17051759}
17061760
0 commit comments