@@ -307,10 +307,9 @@ func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
307
307
308
308
// The net/http package sets the write deadline from the
309
309
// http.Server.WriteTimeout during the TLS handshake, but then
310
- // passes the connection off to us with the deadline already
311
- // set. Disarm it here so that it is not applied to additional
312
- // streams opened on this connection.
313
- // TODO: implement WriteTimeout fully. See Issue 18437.
310
+ // passes the connection off to us with the deadline already set.
311
+ // Write deadlines are set per stream in serverConn.newStream.
312
+ // Disarm the net.Conn write deadline here.
314
313
if sc .hs .WriteTimeout != 0 {
315
314
sc .conn .SetWriteDeadline (time.Time {})
316
315
}
@@ -493,9 +492,10 @@ type stream struct {
493
492
numTrailerValues int64
494
493
weight uint8
495
494
state streamState
496
- resetQueued bool // RST_STREAM queued for write; set by sc.resetStream
497
- gotTrailerHeader bool // HEADER frame for trailers was seen
498
- wroteHeaders bool // whether we wrote headers (not status 100)
495
+ resetQueued bool // RST_STREAM queued for write; set by sc.resetStream
496
+ gotTrailerHeader bool // HEADER frame for trailers was seen
497
+ wroteHeaders bool // whether we wrote headers (not status 100)
498
+ writeDeadline * time.Timer // nil if unused
499
499
500
500
trailer http.Header // accumulated trailers
501
501
reqTrailer http.Header // handler's Request.Trailer
@@ -766,6 +766,10 @@ func (sc *serverConn) serve() {
766
766
loopNum ++
767
767
select {
768
768
case wr := <- sc .wantWriteFrameCh :
769
+ if se , ok := wr .write .(StreamError ); ok {
770
+ sc .resetStream (se )
771
+ break
772
+ }
769
773
sc .writeFrame (wr )
770
774
case spr := <- sc .wantStartPushCh :
771
775
sc .startPush (spr )
@@ -1336,6 +1340,9 @@ func (sc *serverConn) closeStream(st *stream, err error) {
1336
1340
panic (fmt .Sprintf ("invariant; can't close stream in state %v" , st .state ))
1337
1341
}
1338
1342
st .state = stateClosed
1343
+ if st .writeDeadline != nil {
1344
+ st .writeDeadline .Stop ()
1345
+ }
1339
1346
if st .isPushed () {
1340
1347
sc .curPushedStreams --
1341
1348
} else {
@@ -1574,6 +1581,12 @@ func (st *stream) copyTrailersToHandlerRequest() {
1574
1581
}
1575
1582
}
1576
1583
1584
+ // onWriteTimeout is run on its own goroutine (from time.AfterFunc)
1585
+ // when the stream's WriteTimeout has fired.
1586
+ func (st * stream ) onWriteTimeout () {
1587
+ st .sc .writeFrameFromHandler (FrameWriteRequest {write : streamError (st .id , ErrCodeInternal )})
1588
+ }
1589
+
1577
1590
func (sc * serverConn ) processHeaders (f * MetaHeadersFrame ) error {
1578
1591
sc .serveG .check ()
1579
1592
id := f .StreamID
@@ -1753,6 +1766,9 @@ func (sc *serverConn) newStream(id, pusherID uint32, state streamState) *stream
1753
1766
st .flow .add (sc .initialStreamSendWindowSize )
1754
1767
st .inflow .conn = & sc .inflow // link to conn-level counter
1755
1768
st .inflow .add (sc .srv .initialStreamRecvWindowSize ())
1769
+ if sc .hs .WriteTimeout != 0 {
1770
+ st .writeDeadline = time .AfterFunc (sc .hs .WriteTimeout , st .onWriteTimeout )
1771
+ }
1756
1772
1757
1773
sc .streams [id ] = st
1758
1774
sc .writeSched .OpenStream (st .id , OpenStreamOptions {PusherID : pusherID })
0 commit comments