Skip to content

Commit bb80766

Browse files
apolcynbradfitz
authored andcommitted
http2: Add opt-in option to Framer to allow DataFrame struct reuse
The existing Framer in net/http2 allocates a new DataFrame struct for each DataFrame read on calls to ReadFrame. The SetReuseFrame option introduced here, if set on a Framer, allows the Framer to reuse Frame objects and changes the ReadFrame API so that returned Frame objects are only valid until the next call to ReadFrame. This opt-in API now only implements reuse of DataFrames, but it allows the Framer to reuse of any type of Frame. The footprint caused by creation of new DataFrame structs per data frame was noticed in micro benchmarks of "gRPC" server "streaming throuhgput", which uses the Framer in this package. This benchmark happened to use long lived http2 streams that do client-server "ping-pong" requests with small data frames, and DataFrames were seen to be a significant source of allocations. Running local benchmarks with: (from x/net/http2 directory) $ go test -run=^$ -bench=BenchmarkServerToClientStream example output: * expect an alloc reduction of at least 1 and a small memory reduction between "BenchmarkServerToClientStreamDefaultOptions" and "BenchmarkServerToClientStreamReuseFrames" BenchmarkServerToClientStreamDefaultOptions-12 30000 46216 ns/op 971 B/op 17 allocs/op BenchmarkServerToClientStreamReuseFrames-12 30000 44952 ns/op 924 B/op 16 allocs/op Fixes golang/go#18502 Change-Id: Iad93420ef6c3918f54249d867098f1dadfa324d8 Reviewed-on: https://go-review.googlesource.com/34812 Run-TryBot: Brad Fitzpatrick <[email protected]> TryBot-Result: Gobot Gobot <[email protected]> Reviewed-by: Brad Fitzpatrick <[email protected]>
1 parent 10c134e commit bb80766

File tree

3 files changed

+218
-17
lines changed

3 files changed

+218
-17
lines changed

http2/frame.go

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ var flagName = map[FrameType]map[Flags]string{
122122
// a frameParser parses a frame given its FrameHeader and payload
123123
// bytes. The length of payload will always equal fh.Length (which
124124
// might be 0).
125-
type frameParser func(fh FrameHeader, payload []byte) (Frame, error)
125+
type frameParser func(fc *frameCache, fh FrameHeader, payload []byte) (Frame, error)
126126

127127
var frameParsers = map[FrameType]frameParser{
128128
FrameData: parseDataFrame,
@@ -323,6 +323,8 @@ type Framer struct {
323323
debugFramerBuf *bytes.Buffer
324324
debugReadLoggerf func(string, ...interface{})
325325
debugWriteLoggerf func(string, ...interface{})
326+
327+
frameCache *frameCache // nil if frames aren't reused (default)
326328
}
327329

328330
func (fr *Framer) maxHeaderListSize() uint32 {
@@ -398,6 +400,27 @@ const (
398400
maxFrameSize = 1<<24 - 1
399401
)
400402

403+
// SetReuseFrames allows the Framer to reuse Frames.
404+
// If called on a Framer, Frames returned by calls to ReadFrame are only
405+
// valid until the next call to ReadFrame.
406+
func (fr *Framer) SetReuseFrames() {
407+
if fr.frameCache != nil {
408+
return
409+
}
410+
fr.frameCache = &frameCache{}
411+
}
412+
413+
type frameCache struct {
414+
dataFrame DataFrame
415+
}
416+
417+
func (fc *frameCache) getDataFrame() *DataFrame {
418+
if fc == nil {
419+
return &DataFrame{}
420+
}
421+
return &fc.dataFrame
422+
}
423+
401424
// NewFramer returns a Framer that writes frames to w and reads them from r.
402425
func NewFramer(w io.Writer, r io.Reader) *Framer {
403426
fr := &Framer{
@@ -477,7 +500,7 @@ func (fr *Framer) ReadFrame() (Frame, error) {
477500
if _, err := io.ReadFull(fr.r, payload); err != nil {
478501
return nil, err
479502
}
480-
f, err := typeFrameParser(fh.Type)(fh, payload)
503+
f, err := typeFrameParser(fh.Type)(fr.frameCache, fh, payload)
481504
if err != nil {
482505
if ce, ok := err.(connError); ok {
483506
return nil, fr.connError(ce.Code, ce.Reason)
@@ -565,7 +588,7 @@ func (f *DataFrame) Data() []byte {
565588
return f.data
566589
}
567590

568-
func parseDataFrame(fh FrameHeader, payload []byte) (Frame, error) {
591+
func parseDataFrame(fc *frameCache, fh FrameHeader, payload []byte) (Frame, error) {
569592
if fh.StreamID == 0 {
570593
// DATA frames MUST be associated with a stream. If a
571594
// DATA frame is received whose stream identifier
@@ -574,9 +597,9 @@ func parseDataFrame(fh FrameHeader, payload []byte) (Frame, error) {
574597
// PROTOCOL_ERROR.
575598
return nil, connError{ErrCodeProtocol, "DATA frame with stream ID 0"}
576599
}
577-
f := &DataFrame{
578-
FrameHeader: fh,
579-
}
600+
f := fc.getDataFrame()
601+
f.FrameHeader = fh
602+
580603
var padSize byte
581604
if fh.Flags.Has(FlagDataPadded) {
582605
var err error
@@ -672,7 +695,7 @@ type SettingsFrame struct {
672695
p []byte
673696
}
674697

675-
func parseSettingsFrame(fh FrameHeader, p []byte) (Frame, error) {
698+
func parseSettingsFrame(_ *frameCache, fh FrameHeader, p []byte) (Frame, error) {
676699
if fh.Flags.Has(FlagSettingsAck) && fh.Length > 0 {
677700
// When this (ACK 0x1) bit is set, the payload of the
678701
// SETTINGS frame MUST be empty. Receipt of a
@@ -774,7 +797,7 @@ type PingFrame struct {
774797

775798
func (f *PingFrame) IsAck() bool { return f.Flags.Has(FlagPingAck) }
776799

777-
func parsePingFrame(fh FrameHeader, payload []byte) (Frame, error) {
800+
func parsePingFrame(_ *frameCache, fh FrameHeader, payload []byte) (Frame, error) {
778801
if len(payload) != 8 {
779802
return nil, ConnectionError(ErrCodeFrameSize)
780803
}
@@ -814,7 +837,7 @@ func (f *GoAwayFrame) DebugData() []byte {
814837
return f.debugData
815838
}
816839

817-
func parseGoAwayFrame(fh FrameHeader, p []byte) (Frame, error) {
840+
func parseGoAwayFrame(_ *frameCache, fh FrameHeader, p []byte) (Frame, error) {
818841
if fh.StreamID != 0 {
819842
return nil, ConnectionError(ErrCodeProtocol)
820843
}
@@ -854,7 +877,7 @@ func (f *UnknownFrame) Payload() []byte {
854877
return f.p
855878
}
856879

857-
func parseUnknownFrame(fh FrameHeader, p []byte) (Frame, error) {
880+
func parseUnknownFrame(_ *frameCache, fh FrameHeader, p []byte) (Frame, error) {
858881
return &UnknownFrame{fh, p}, nil
859882
}
860883

@@ -865,7 +888,7 @@ type WindowUpdateFrame struct {
865888
Increment uint32 // never read with high bit set
866889
}
867890

868-
func parseWindowUpdateFrame(fh FrameHeader, p []byte) (Frame, error) {
891+
func parseWindowUpdateFrame(_ *frameCache, fh FrameHeader, p []byte) (Frame, error) {
869892
if len(p) != 4 {
870893
return nil, ConnectionError(ErrCodeFrameSize)
871894
}
@@ -930,7 +953,7 @@ func (f *HeadersFrame) HasPriority() bool {
930953
return f.FrameHeader.Flags.Has(FlagHeadersPriority)
931954
}
932955

933-
func parseHeadersFrame(fh FrameHeader, p []byte) (_ Frame, err error) {
956+
func parseHeadersFrame(_ *frameCache, fh FrameHeader, p []byte) (_ Frame, err error) {
934957
hf := &HeadersFrame{
935958
FrameHeader: fh,
936959
}
@@ -1067,7 +1090,7 @@ func (p PriorityParam) IsZero() bool {
10671090
return p == PriorityParam{}
10681091
}
10691092

1070-
func parsePriorityFrame(fh FrameHeader, payload []byte) (Frame, error) {
1093+
func parsePriorityFrame(_ *frameCache, fh FrameHeader, payload []byte) (Frame, error) {
10711094
if fh.StreamID == 0 {
10721095
return nil, connError{ErrCodeProtocol, "PRIORITY frame with stream ID 0"}
10731096
}
@@ -1114,7 +1137,7 @@ type RSTStreamFrame struct {
11141137
ErrCode ErrCode
11151138
}
11161139

1117-
func parseRSTStreamFrame(fh FrameHeader, p []byte) (Frame, error) {
1140+
func parseRSTStreamFrame(_ *frameCache, fh FrameHeader, p []byte) (Frame, error) {
11181141
if len(p) != 4 {
11191142
return nil, ConnectionError(ErrCodeFrameSize)
11201143
}
@@ -1144,7 +1167,7 @@ type ContinuationFrame struct {
11441167
headerFragBuf []byte
11451168
}
11461169

1147-
func parseContinuationFrame(fh FrameHeader, p []byte) (Frame, error) {
1170+
func parseContinuationFrame(_ *frameCache, fh FrameHeader, p []byte) (Frame, error) {
11481171
if fh.StreamID == 0 {
11491172
return nil, connError{ErrCodeProtocol, "CONTINUATION frame with stream ID 0"}
11501173
}
@@ -1194,7 +1217,7 @@ func (f *PushPromiseFrame) HeadersEnded() bool {
11941217
return f.FrameHeader.Flags.Has(FlagPushPromiseEndHeaders)
11951218
}
11961219

1197-
func parsePushPromise(fh FrameHeader, p []byte) (_ Frame, err error) {
1220+
func parsePushPromise(_ *frameCache, fh FrameHeader, p []byte) (_ Frame, err error) {
11981221
pp := &PushPromiseFrame{
11991222
FrameHeader: fh,
12001223
}

http2/frame_test.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1096,6 +1096,95 @@ func TestMetaFrameHeader(t *testing.T) {
10961096
}
10971097
}
10981098

1099+
func TestSetReuseFrames(t *testing.T) {
1100+
fr, buf := testFramer()
1101+
fr.SetReuseFrames()
1102+
1103+
// Check that DataFrames are reused. Note that
1104+
// SetReuseFrames only currently implements reuse of DataFrames.
1105+
firstDf := readAndVerifyDataFrame("ABC", 3, fr, buf, t)
1106+
1107+
for i := 0; i < 10; i++ {
1108+
df := readAndVerifyDataFrame("XYZ", 3, fr, buf, t)
1109+
if df != firstDf {
1110+
t.Errorf("Expected Framer to return references to the same DataFrame. Have %v and %v", &df, &firstDf)
1111+
}
1112+
}
1113+
1114+
for i := 0; i < 10; i++ {
1115+
df := readAndVerifyDataFrame("", 0, fr, buf, t)
1116+
if df != firstDf {
1117+
t.Errorf("Expected Framer to return references to the same DataFrame. Have %v and %v", &df, &firstDf)
1118+
}
1119+
}
1120+
1121+
for i := 0; i < 10; i++ {
1122+
df := readAndVerifyDataFrame("HHH", 3, fr, buf, t)
1123+
if df != firstDf {
1124+
t.Errorf("Expected Framer to return references to the same DataFrame. Have %v and %v", &df, &firstDf)
1125+
}
1126+
}
1127+
}
1128+
1129+
func TestSetReuseFramesMoreThanOnce(t *testing.T) {
1130+
fr, buf := testFramer()
1131+
fr.SetReuseFrames()
1132+
1133+
firstDf := readAndVerifyDataFrame("ABC", 3, fr, buf, t)
1134+
fr.SetReuseFrames()
1135+
1136+
for i := 0; i < 10; i++ {
1137+
df := readAndVerifyDataFrame("XYZ", 3, fr, buf, t)
1138+
// SetReuseFrames should be idempotent
1139+
fr.SetReuseFrames()
1140+
if df != firstDf {
1141+
t.Errorf("Expected Framer to return references to the same DataFrame. Have %v and %v", &df, &firstDf)
1142+
}
1143+
}
1144+
}
1145+
1146+
func TestNoSetReuseFrames(t *testing.T) {
1147+
fr, buf := testFramer()
1148+
const numNewDataFrames = 10
1149+
dfSoFar := make([]interface{}, numNewDataFrames)
1150+
1151+
// Check that DataFrames are not reused if SetReuseFrames wasn't called.
1152+
// SetReuseFrames only currently implements reuse of DataFrames.
1153+
for i := 0; i < numNewDataFrames; i++ {
1154+
df := readAndVerifyDataFrame("XYZ", 3, fr, buf, t)
1155+
for _, item := range dfSoFar {
1156+
if df == item {
1157+
t.Errorf("Expected Framer to return new DataFrames since SetNoReuseFrames not set.")
1158+
}
1159+
}
1160+
dfSoFar[i] = df
1161+
}
1162+
}
1163+
1164+
func readAndVerifyDataFrame(data string, length byte, fr *Framer, buf *bytes.Buffer, t *testing.T) *DataFrame {
1165+
var streamID uint32 = 1<<24 + 2<<16 + 3<<8 + 4
1166+
fr.WriteData(streamID, true, []byte(data))
1167+
wantEnc := "\x00\x00" + string(length) + "\x00\x01\x01\x02\x03\x04" + data
1168+
if buf.String() != wantEnc {
1169+
t.Errorf("encoded as %q; want %q", buf.Bytes(), wantEnc)
1170+
}
1171+
f, err := fr.ReadFrame()
1172+
if err != nil {
1173+
t.Fatal(err)
1174+
}
1175+
df, ok := f.(*DataFrame)
1176+
if !ok {
1177+
t.Fatalf("got %T; want *DataFrame", f)
1178+
}
1179+
if !bytes.Equal(df.Data(), []byte(data)) {
1180+
t.Errorf("got %q; want %q", df.Data(), []byte(data))
1181+
}
1182+
if f.Header().Flags&1 == 0 {
1183+
t.Errorf("didn't see END_STREAM flag")
1184+
}
1185+
return df
1186+
}
1187+
10991188
func encodeHeaderRaw(t *testing.T, pairs ...string) []byte {
11001189
var he hpackEncoder
11011190
return he.encodeHeaderRaw(t, pairs...)

http2/server_test.go

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ type serverTesterOpt string
8080

8181
var optOnlyServer = serverTesterOpt("only_server")
8282
var optQuiet = serverTesterOpt("quiet_logging")
83+
var optFramerReuseFrames = serverTesterOpt("frame_reuse_frames")
8384

8485
func newServerTester(t testing.TB, handler http.HandlerFunc, opts ...interface{}) *serverTester {
8586
resetHooks()
@@ -91,7 +92,7 @@ func newServerTester(t testing.TB, handler http.HandlerFunc, opts ...interface{}
9192
NextProtos: []string{NextProtoTLS},
9293
}
9394

94-
var onlyServer, quiet bool
95+
var onlyServer, quiet, framerReuseFrames bool
9596
h2server := new(Server)
9697
for _, opt := range opts {
9798
switch v := opt.(type) {
@@ -107,6 +108,8 @@ func newServerTester(t testing.TB, handler http.HandlerFunc, opts ...interface{}
107108
onlyServer = true
108109
case optQuiet:
109110
quiet = true
111+
case optFramerReuseFrames:
112+
framerReuseFrames = true
110113
}
111114
case func(net.Conn, http.ConnState):
112115
ts.Config.ConnState = v
@@ -149,6 +152,9 @@ func newServerTester(t testing.TB, handler http.HandlerFunc, opts ...interface{}
149152
}
150153
st.cc = cc
151154
st.fr = NewFramer(cc, cc)
155+
if framerReuseFrames {
156+
st.fr.SetReuseFrames()
157+
}
152158
if !logFrameReads && !logFrameWrites {
153159
st.fr.debugReadLoggerf = func(m string, v ...interface{}) {
154160
m = time.Now().Format("2006-01-02 15:04:05.999999999 ") + strings.TrimPrefix(m, "http2: ") + "\n"
@@ -2994,6 +3000,89 @@ func BenchmarkServerPosts(b *testing.B) {
29943000
}
29953001
}
29963002

3003+
// Send a stream of messages from server to client in separate data frames.
3004+
// Brings up performance issues seen in long streams.
3005+
// Created to show problem in go issue #18502
3006+
func BenchmarkServerToClientStreamDefaultOptions(b *testing.B) {
3007+
benchmarkServerToClientStream(b)
3008+
}
3009+
3010+
// Justification for Change-Id: Iad93420ef6c3918f54249d867098f1dadfa324d8
3011+
// Expect to see memory/alloc reduction by opting in to Frame reuse with the Framer.
3012+
func BenchmarkServerToClientStreamReuseFrames(b *testing.B) {
3013+
benchmarkServerToClientStream(b, optFramerReuseFrames)
3014+
}
3015+
3016+
func benchmarkServerToClientStream(b *testing.B, newServerOpts ...interface{}) {
3017+
defer disableGoroutineTracking()()
3018+
b.ReportAllocs()
3019+
const msgLen = 1
3020+
// default window size
3021+
const windowSize = 1<<16 - 1
3022+
3023+
// next message to send from the server and for the client to expect
3024+
nextMsg := func(i int) []byte {
3025+
msg := make([]byte, msgLen)
3026+
msg[0] = byte(i)
3027+
if len(msg) != msgLen {
3028+
panic("invalid test setup msg length")
3029+
}
3030+
return msg
3031+
}
3032+
3033+
st := newServerTester(b, func(w http.ResponseWriter, r *http.Request) {
3034+
// Consume the (empty) body from th peer before replying, otherwise
3035+
// the server will sometimes (depending on scheduling) send the peer a
3036+
// a RST_STREAM with the CANCEL error code.
3037+
if n, err := io.Copy(ioutil.Discard, r.Body); n != 0 || err != nil {
3038+
b.Errorf("Copy error; got %v, %v; want 0, nil", n, err)
3039+
}
3040+
for i := 0; i < b.N; i += 1 {
3041+
w.Write(nextMsg(i))
3042+
w.(http.Flusher).Flush()
3043+
}
3044+
}, newServerOpts...)
3045+
defer st.Close()
3046+
st.greet()
3047+
3048+
const id = uint32(1)
3049+
3050+
st.writeHeaders(HeadersFrameParam{
3051+
StreamID: id,
3052+
BlockFragment: st.encodeHeader(":method", "POST"),
3053+
EndStream: false,
3054+
EndHeaders: true,
3055+
})
3056+
3057+
st.writeData(id, true, nil)
3058+
st.wantHeaders()
3059+
3060+
var pendingWindowUpdate = uint32(0)
3061+
3062+
for i := 0; i < b.N; i += 1 {
3063+
expected := nextMsg(i)
3064+
df := st.wantData()
3065+
if bytes.Compare(expected, df.data) != 0 {
3066+
b.Fatalf("Bad message received; want %v; got %v", expected, df.data)
3067+
}
3068+
// try to send infrequent but large window updates so they don't overwhelm the test
3069+
pendingWindowUpdate += uint32(len(df.data))
3070+
if pendingWindowUpdate >= windowSize/2 {
3071+
if err := st.fr.WriteWindowUpdate(0, pendingWindowUpdate); err != nil {
3072+
b.Fatal(err)
3073+
}
3074+
if err := st.fr.WriteWindowUpdate(id, pendingWindowUpdate); err != nil {
3075+
b.Fatal(err)
3076+
}
3077+
pendingWindowUpdate = 0
3078+
}
3079+
}
3080+
df := st.wantData()
3081+
if !df.StreamEnded() {
3082+
b.Fatalf("DATA didn't have END_STREAM; got %v", df)
3083+
}
3084+
}
3085+
29973086
// go-fuzz bug, originally reported at https://github.com/bradfitz/http2/issues/53
29983087
// Verify we don't hang.
29993088
func TestIssue53(t *testing.T) {

0 commit comments

Comments
 (0)