Skip to content

Commit c3f267f

Browse files
committed
http2: add I/O timeouts
Addresses hanging transport when on blocking I/O. There are many scenario where the roundtrip hangs on write or read and won't be unlocked by current cancelation systems (context, Request.Cancel, ...). This adds read and write deadlines support. The writer disables the read deadline and enables the write deadline, then after the write is successful, it disables the write deadline and re-enables the read deadline. The read loop also sets its read deadline after a successful read since the next frame is not predictable. It guarantees that an I/O will not timeout before IOTimeout and will timeout after a complete block before at least IOTimeout. See issue: golang/go#23559 Change-Id: If618a63857cc32d8c3175c0d9bef1f8bf83c89df
1 parent d0aafc7 commit c3f267f

File tree

2 files changed

+342
-4
lines changed

2 files changed

+342
-4
lines changed

http2/transport.go

Lines changed: 65 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,10 @@ type Transport struct {
9494
// to mean no limit.
9595
MaxHeaderListSize uint32
9696

97+
// IOTimeout, if non-zero, enables timeouts on read and write to the
98+
// connection.
99+
IOTimeout time.Duration
100+
97101
// t1, if non-nil, is the standard library Transport using
98102
// this transport. Its settings are used (but not its
99103
// RoundTrip method, etc).
@@ -321,7 +325,9 @@ func (noCachedConnError) Error() string { return "http2: no cached c
321325
// or its equivalent renamed type in net/http2's h2_bundle.go. Both types
322326
// may coexist in the same running program.
323327
func isNoCachedConnError(err error) bool {
324-
_, ok := err.(interface{ IsHTTP2NoCachedConnError() })
328+
_, ok := err.(interface {
329+
IsHTTP2NoCachedConnError()
330+
})
325331
return ok
326332
}
327333

@@ -529,6 +535,38 @@ func (t *Transport) NewClientConn(c net.Conn) (*ClientConn, error) {
529535
return t.newClientConn(c, false)
530536
}
531537

538+
type timeoutWriter struct {
539+
c net.Conn
540+
timeout time.Duration
541+
}
542+
543+
// Write writes to the underlying connection and manages both read and write
544+
// deadlines.
545+
func (w timeoutWriter) Write(p []byte) (int, error) {
546+
// The read deadline is disabled to allow so the reader doesn't timeout
547+
// while there are no pending requests.
548+
// The write deadline is set.
549+
// The write occurs and the connection is closed on timeout interrupting the
550+
// read with an error.
551+
// If the write was successful it sets the deadline for the current read.
552+
now := time.Now()
553+
w.c.SetReadDeadline(time.Time{})
554+
w.c.SetWriteDeadline(now.Add(w.timeout))
555+
n, err := w.c.Write(p)
556+
if err != nil {
557+
if ne, ok := err.(net.Error); ok && ne.Timeout() {
558+
// The write end of the connection is no longer in a known
559+
// consistent state, unlock the read loop by closing the connection
560+
// and force a cleanup.
561+
w.c.Close()
562+
return n, err
563+
}
564+
}
565+
w.c.SetWriteDeadline(time.Time{})
566+
w.c.SetReadDeadline(time.Now().Add(w.timeout))
567+
return n, err
568+
}
569+
532570
func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, error) {
533571
cc := &ClientConn{
534572
t: t,
@@ -555,9 +593,14 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
555593
cc.cond = sync.NewCond(&cc.mu)
556594
cc.flow.add(int32(initialWindowSize))
557595

596+
var w io.Writer = cc.tconn
597+
if d := cc.ioTimeout(); d > 0 {
598+
w = timeoutWriter{c: cc.tconn, timeout: d}
599+
}
600+
558601
// TODO: adjust this writer size to account for frame size +
559602
// MTU + crypto/tls record padding.
560-
cc.bw = bufio.NewWriter(stickyErrWriter{c, &cc.werr})
603+
cc.bw = bufio.NewWriter(stickyErrWriter{w, &cc.werr})
561604
cc.br = bufio.NewReader(c)
562605
cc.fr = NewFramer(cc.bw, cc.br)
563606
cc.fr.ReadMetaHeaders = hpack.NewDecoder(initialHeaderTableSize, nil)
@@ -733,6 +776,10 @@ func (cc *ClientConn) responseHeaderTimeout() time.Duration {
733776
return 0
734777
}
735778

779+
func (cc *ClientConn) ioTimeout() time.Duration {
780+
return cc.t.IOTimeout
781+
}
782+
736783
// checkConnHeaders checks whether req has any invalid connection-level headers.
737784
// per RFC 7540 section 8.1.2.2: Connection-Specific Header Fields.
738785
// Certain headers are special-cased as okay but not transmitted later.
@@ -1473,12 +1520,23 @@ func (rl *clientConnReadLoop) run() error {
14731520
rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse
14741521
gotReply := false // ever saw a HEADERS reply
14751522
gotSettings := false
1523+
timeout := cc.ioTimeout()
14761524
for {
14771525
f, err := cc.fr.ReadFrame()
14781526
if err != nil {
14791527
cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
14801528
}
1481-
if se, ok := err.(StreamError); ok {
1529+
if ne, ok := err.(net.Error); ok && ne.Timeout() {
1530+
cc.mu.Lock()
1531+
idle := len(cc.streams) == 0
1532+
cc.mu.Unlock()
1533+
if idle {
1534+
// let the idle timer handle the timeout.
1535+
cc.tconn.SetReadDeadline(time.Time{})
1536+
continue
1537+
}
1538+
return err
1539+
} else if se, ok := err.(StreamError); ok {
14821540
if cs := cc.streamByID(se.StreamID, false); cs != nil {
14831541
cs.cc.writeStreamReset(cs.ID, se.Code, err)
14841542
cs.cc.forgetStreamID(cs.ID)
@@ -1537,6 +1595,10 @@ func (rl *clientConnReadLoop) run() error {
15371595
if rl.closeWhenIdle && gotReply && maybeIdle {
15381596
cc.closeIfIdle()
15391597
}
1598+
if timeout > 0 {
1599+
// Upon successful read, set timeout on next read.
1600+
cc.tconn.SetReadDeadline(time.Now().Add(timeout))
1601+
}
15401602
}
15411603
}
15421604

0 commit comments

Comments
 (0)