Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 127 additions & 84 deletions http2/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ type Transport struct {

// AllowHTTP, if true, permits HTTP/2 requests using the insecure,
// plain-text "http" scheme. Note that this does not enable h2c support.
AllowHTTP bool

ConnectionFlow uint32
AllowHTTP bool
InitialStreamID uint32
ConnectionFlow uint32

// ConnPool optionally specifies an alternate connection pool to use.
// If nil, the default is used.
Expand Down Expand Up @@ -316,7 +316,6 @@ type ClientConn struct {
dialedAddr string // addr dialed to create tconn; not set with NewClientConn
flow flow // our conn-level flow control quota (cs.flow is per stream)
fr *Framer
freeBuf [][]byte

goAway *GoAwayFrame // if non-nil, the GoAwayFrame we received
goAwayDebug string // goAway frame's debug data, retained as a string
Expand Down Expand Up @@ -357,7 +356,9 @@ type ClientConn struct {
wantSettingsAck bool // we sent a SETTINGS frame and haven't heard back
werr error // first write error that has occurred

wmu sync.Mutex // held while writing; acquire AFTER mu if holding both
wmu sync.Mutex // held while writing; acquire AFTER mu if holding both
connFlow uint32
streamFlow uint32
}

// clientStream is the state for a single HTTP/2 stream. One of these
Expand Down Expand Up @@ -769,6 +770,34 @@ func (t *Transport) newClientConn(c net.Conn, addr string, singleUse bool) (*Cli
wantSettingsAck: true,
pings: make(map[[8]byte]chan struct{}),
}

// ------------------------------------------------------------------
// FIX: Initialize dynamic flow control variables correctly
// ------------------------------------------------------------------

// 1. Determine Stream Flow (default to 4MB if not set in profile)
cc.streamFlow = transportDefaultStreamFlow
if v, ok := t.Settings[SettingInitialWindowSize]; ok && v != 0 {
cc.streamFlow = v
}

// 2. Determine Connection Flow (default to ~15MB if not set)
cc.connFlow = transportDefaultConnFlow
if t.ConnectionFlow != 0 {
cc.connFlow = t.ConnectionFlow
}

// ------------------------------------------------------------------
// SAFETY: Validate flow control values don't exceed int32 max
// RFC 7540 Section 6.9.1: Max window size is 2^31-1
// ------------------------------------------------------------------
if cc.connFlow > math.MaxInt32 {
return nil, fmt.Errorf("http2: connection flow control window too large: %d (max: %d)", cc.connFlow, math.MaxInt32)
}
if cc.streamFlow > math.MaxInt32 {
return nil, fmt.Errorf("http2: stream flow control window too large: %d (max: %d)", cc.streamFlow, math.MaxInt32)
}

if d := t.idleConnTimeout(); d != 0 {
cc.idleTimeout = d
cc.idleTimer = time.AfterFunc(d, cc.onIdleTimeout)
Expand Down Expand Up @@ -804,6 +833,10 @@ func (t *Transport) newClientConn(c net.Conn, addr string, singleUse bool) (*Cli
cc.nextStreamID = 3
}

if t.InitialStreamID != 0 {
cc.nextStreamID = t.InitialStreamID
}

if cs, ok := c.(connectionStater); ok {
state := cs.ConnectionState()
cc.tlsState = &state
Expand All @@ -816,44 +849,38 @@ func (t *Transport) newClientConn(c net.Conn, addr string, singleUse bool) (*Cli
pushEnabled = 1
}

//setMaxHeader := false
if t.Settings != nil {
// we need to iterate over the slice here not the map because of the random range over a map
for _, settingId := range t.SettingsOrder {
settingValue := t.Settings[settingId]

/*
if settingId == SettingMaxHeaderListSize && settingValue != 0 {
// setMaxHeader = true
if settingValue != 0 {
initialSettings = append(initialSettings, Setting{ID: SettingMaxHeaderListSize, Val: settingValue})
continue
}

}*/

initialSettings = append(initialSettings, Setting{ID: settingId, Val: settingValue})
}
} else {
// when we dont define a custom map on the transport we add Enable Push per default
initialSettings = append(initialSettings, Setting{ID: SettingEnablePush, Val: pushEnabled})
}

cc.bw.Write(clientPreface)
cc.fr.WriteSettings(initialSettings...)

cc.fr.WriteWindowUpdate(0, t.ConnectionFlow)
// ------------------------------------------------------------------
// CRITICAL FIX: Use the sanitized cc.connFlow.
// t.ConnectionFlow might be 0, which would send an illegal Window Update of 0.
// cc.connFlow is guaranteed to be non-zero (defaults to transportDefaultConnFlow).
// ------------------------------------------------------------------
if cc.connFlow > 0 {
cc.fr.WriteWindowUpdate(0, cc.connFlow)
}

for _, priority := range t.Priorities {
cc.fr.WritePriority(priority.StreamID, priority.PriorityParam)
cc.nextStreamID = priority.StreamID + 2
}

cc.inflow.add(transportDefaultConnFlow + initialWindowSize)
// Use the dynamic connection flow value we calculated earlier
cc.inflow.add(int32(cc.connFlow) + int32(initialWindowSize))

cc.bw.Flush()
if cc.werr != nil {
cc.Close()

return nil, cc.werr
}

Expand Down Expand Up @@ -1090,50 +1117,6 @@ func (cc *ClientConn) closeForLostPing() error {
return cc.closeForError(err)
}

const maxAllocFrameSize = 512 << 10

// frameBuffer returns a scratch buffer suitable for writing DATA frames.
// They're capped at the min of the peer's max frame size or 512KB
// (kinda arbitrarily), but definitely capped so we don't allocate 4GB
// bufers.
func (cc *ClientConn) frameScratchBuffer() []byte {
cc.mu.Lock()
size := cc.maxFrameSize
if size > maxAllocFrameSize {
size = maxAllocFrameSize
}
for i, buf := range cc.freeBuf {
if len(buf) >= int(size) {
cc.freeBuf[i] = nil
cc.mu.Unlock()

return buf[:size]
}
}
cc.mu.Unlock()

return make([]byte, size)
}

func (cc *ClientConn) putFrameScratchBuffer(buf []byte) {
cc.mu.Lock()
defer cc.mu.Unlock()
const maxBufs = 4 // arbitrary; 4 concurrent requests per conn? investigate.
if len(cc.freeBuf) < maxBufs {
cc.freeBuf = append(cc.freeBuf, buf)

return
}
for i, old := range cc.freeBuf {
if old == nil {
cc.freeBuf[i] = buf

return
}
}
// forget about it.
}

// errRequestCanceled is a copy of net/http's errRequestCanceled because it's not
// exported. At least they'll be DeepEqual for h1-vs-h2 comparisons tests.
var errRequestCanceled = errors.New("net/http: request canceled")
Expand Down Expand Up @@ -1506,11 +1489,35 @@ var (
errStopReqBodyWriteAndCancel = errors.New("http2: canceling request")
)

// frameScratchBufferLen returns the length of a buffer to use for
// outgoing request bodies to read/write to/from.
//
// It returns max(1, min(peer's advertised max frame size,
// Request.ContentLength+1, 512KB)).
func (cs *clientStream) frameScratchBufferLen(maxFrameSize int) int {
const max = 512 << 10
n := int64(maxFrameSize)
if n > max {
n = max
}
if cl := actualContentLength(cs.req); cl != -1 && cl+1 < n {
// Add an extra byte past the declared content-length to
// give the caller's Request.Body io.Reader a chance to
// give us more bytes than they declared, so we can catch it
// early.
n = cl + 1
}
if n < 1 {
return 1
}
return int(n) // doesn't truncate; max is 512K
}

var bufPool sync.Pool // of *[]byte

func (cs *clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (err error) {
cc := cs.cc
sentEnd := false // whether we sent the final DATA frame w/ END_STREAM
buf := cc.frameScratchBuffer()
defer cc.putFrameScratchBuffer(buf)

defer func() {
traceWroteRequest(cs.trace, err)
Expand All @@ -1529,9 +1536,24 @@ func (cs *clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (
remainLen := actualContentLength(req)
hasContentLen := remainLen != -1

cc.mu.Lock()
maxFrameSize := int(cc.maxFrameSize)
cc.mu.Unlock()

// Scratch buffer for reading into & writing from.
scratchLen := cs.frameScratchBufferLen(maxFrameSize)
var buf []byte
if bp, ok := bufPool.Get().(*[]byte); ok && len(*bp) >= scratchLen {
defer bufPool.Put(bp)
buf = *bp
} else {
buf = make([]byte, scratchLen)
defer bufPool.Put(&buf)
}

var sawEOF bool
for !sawEOF {
n, err := body.Read(buf[:len(buf)-1])
n, err := body.Read(buf[:len(buf)])
if hasContentLen {
remainLen -= int64(n)
if remainLen == 0 && err == nil {
Expand All @@ -1542,8 +1564,9 @@ func (cs *clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (
// to send the END_STREAM bit early, double-check that we're actually
// at EOF. Subsequent reads should return (0, EOF) at this point.
// If either value is different, we return an error in one of two ways below.
var scratch [1]byte
var n1 int
n1, err = body.Read(buf[n:])
n1, err = body.Read(scratch[:])
remainLen -= int64(n1)
}
if remainLen < 0 {
Expand Down Expand Up @@ -1616,10 +1639,6 @@ func (cs *clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (
}
}

cc.mu.Lock()
maxFrameSize := int(cc.maxFrameSize)
cc.mu.Unlock()

cc.wmu.Lock()
defer cc.wmu.Unlock()

Expand Down Expand Up @@ -1978,7 +1997,7 @@ func (cc *ClientConn) newStreamWithID(streamID uint32, incNext bool) *clientStre
}
cs.flow.add(int32(cc.initialWindowSize))
cs.flow.setConnFlow(&cc.flow)
cs.inflow.add(transportDefaultStreamFlow)
cs.inflow.add(int32(cc.streamFlow))
cs.inflow.setConnFlow(&cc.inflow)
cc.streams[cs.ID] = cs

Expand Down Expand Up @@ -2405,14 +2424,12 @@ func (b transportResponseBody) Read(p []byte) (n int, err error) {
cc.writeStreamReset(cs.ID, ErrCodeProtocol, err)
}
cs.readErr = err

return int(cs.bytesRemain), err
}
cs.bytesRemain -= int64(n)
if err == io.EOF && cs.bytesRemain > 0 {
err = io.ErrUnexpectedEOF
cs.readErr = err

return n, err
}
}
Expand All @@ -2425,21 +2442,47 @@ func (b transportResponseBody) Read(p []byte) (n int, err error) {
defer cc.mu.Unlock()

var connAdd, streamAdd int32

// Check the conn-level first, before the stream-level.
if v := cc.inflow.available(); v < transportDefaultConnFlow/2 {
connAdd = transportDefaultConnFlow - v
// Use dynamic connFlow logic
if v := cc.inflow.available(); v < int32(cc.connFlow/2) {
connAdd = int32(cc.connFlow) - v
cc.inflow.add(connAdd)
}
if err == nil { // No need to refresh if the stream is over or failed.

if err == nil {
// Consider any buffered body data (read from the conn but not
// consumed by the client) when computing flow control for this
// stream.
unsent := transportDefaultStreamFlow - int(cs.inflow.available()) + cs.bufPipe.Len()
if unsent > transportDefaultStreamMinRefresh && unsent > transportDefaultStreamFlow/2 {
streamAdd = int32(unsent)
cs.inflow.add(streamAdd)

// Use dynamic streamFlow logic
unsent := int(cc.streamFlow) - int(cs.inflow.available()) + cs.bufPipe.Len()

// ------------------------------------------------------------------
// FIX: Adaptive Logic
// ------------------------------------------------------------------
const aggressiveThreshold = 16384 // 16KB

// Check if the configured initial window is small (e.g. Firefox's 128KB or 65KB).
// If so, we need to be aggressive with updates.
isSmallWindow := cc.initialWindowSize < 1048576 // < 1MB

if isSmallWindow {
if unsent > aggressiveThreshold {
streamAdd = int32(unsent)
cs.inflow.add(streamAdd)
}
} else {
// Fallback to standard behavior for large windows (Chrome/Default).
// FIX: Replaced transportDefaultStreamFlow constant with cc.streamFlow.
// This ensures correct behavior if a user sets a custom Large window (e.g. 6MB).
if unsent > transportDefaultStreamMinRefresh && unsent > int(cc.streamFlow)/2 {
streamAdd = int32(unsent)
cs.inflow.add(streamAdd)
}
}
}

if connAdd != 0 || streamAdd != 0 {
cc.wmu.Lock()
defer cc.wmu.Unlock()
Expand Down