Skip to content

Commit 64f514d

Browse files
committed
Pool buffers in wspb and wsjson
Closes #71
1 parent 631c152 commit 64f514d

File tree

4 files changed

+114
-6
lines changed

4 files changed

+114
-6
lines changed

internal/bpool/bpool.go

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package bpool
2+
3+
import (
4+
"bytes"
5+
"sync"
6+
)
7+
8+
var bpool sync.Pool
9+
10+
// Get returns a buffer from the pool or creates a new one if
11+
// the pool is empty.
12+
func Get() *bytes.Buffer {
13+
b, ok := bpool.Get().(*bytes.Buffer)
14+
if !ok {
15+
return bytes.NewBuffer(make([]byte, 0, 4096))
16+
}
17+
return b
18+
}
19+
20+
// Put returns a buffer into the pool.
21+
func Put(b *bytes.Buffer) {
22+
b.Reset()
23+
bpool.Put(b)
24+
}

internal/bpool/bpool_test.go

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package bpool
2+
3+
import (
4+
"strconv"
5+
"sync"
6+
"testing"
7+
)
8+
9+
func BenchmarkSyncPool(b *testing.B) {
10+
sizes := []int{
11+
2,
12+
16,
13+
32,
14+
64,
15+
128,
16+
256,
17+
512,
18+
4096,
19+
16384,
20+
}
21+
for _, size := range sizes {
22+
b.Run(strconv.Itoa(size), func(b *testing.B) {
23+
b.Run("allocate", func(b *testing.B) {
24+
b.ReportAllocs()
25+
for i := 0; i < b.N; i++ {
26+
buf := make([]byte, size)
27+
_ = buf
28+
}
29+
})
30+
b.Run("pool", func(b *testing.B) {
31+
b.ReportAllocs()
32+
33+
p := sync.Pool{}
34+
35+
b.ResetTimer()
36+
for i := 0; i < b.N; i++ {
37+
buf := p.Get()
38+
if buf == nil {
39+
buf = make([]byte, size)
40+
}
41+
42+
p.Put(buf)
43+
}
44+
})
45+
})
46+
}
47+
}

wsjson/wsjson.go

+16-2
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ package wsjson
44
import (
55
"context"
66
"encoding/json"
7+
"sync"
78

89
"golang.org/x/xerrors"
910

1011
"nhooyr.io/websocket"
12+
"nhooyr.io/websocket/internal/bpool"
1113
)
1214

1315
// Read reads a json message from c into v.
@@ -21,8 +23,10 @@ func Read(ctx context.Context, c *websocket.Conn, v interface{}) error {
2123
return nil
2224
}
2325

26+
var bufPool sync.Pool
27+
2428
func read(ctx context.Context, c *websocket.Conn, v interface{}) error {
25-
typ, b, err := c.Read(ctx)
29+
typ, r, err := c.Reader(ctx)
2630
if err != nil {
2731
return err
2832
}
@@ -32,7 +36,17 @@ func read(ctx context.Context, c *websocket.Conn, v interface{}) error {
3236
return xerrors.Errorf("unexpected frame type for json (expected %v): %v", websocket.MessageText, typ)
3337
}
3438

35-
err = json.Unmarshal(b, v)
39+
b := bpool.Get()
40+
defer func() {
41+
bpool.Put(b)
42+
}()
43+
44+
_, err = b.ReadFrom(r)
45+
if err != nil {
46+
return err
47+
}
48+
49+
err = json.Unmarshal(b.Bytes(), v)
3650
if err != nil {
3751
return xerrors.Errorf("failed to unmarshal json: %w", err)
3852
}

wspb/wspb.go

+27-4
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@ package wspb
33

44
import (
55
"context"
6+
"sync"
67

78
"github.com/golang/protobuf/proto"
89
"golang.org/x/xerrors"
910

1011
"nhooyr.io/websocket"
12+
"nhooyr.io/websocket/internal/bpool"
1113
)
1214

1315
// Read reads a protobuf message from c into v.
@@ -21,7 +23,7 @@ func Read(ctx context.Context, c *websocket.Conn, v proto.Message) error {
2123
}
2224

2325
func read(ctx context.Context, c *websocket.Conn, v proto.Message) error {
24-
typ, b, err := c.Read(ctx)
26+
typ, r, err := c.Reader(ctx)
2527
if err != nil {
2628
return err
2729
}
@@ -31,7 +33,17 @@ func read(ctx context.Context, c *websocket.Conn, v proto.Message) error {
3133
return xerrors.Errorf("unexpected frame type for protobuf (expected %v): %v", websocket.MessageBinary, typ)
3234
}
3335

34-
err = proto.Unmarshal(b, v)
36+
b := bpool.Get()
37+
defer func() {
38+
bpool.Put(b)
39+
}()
40+
41+
_, err = b.ReadFrom(r)
42+
if err != nil {
43+
return err
44+
}
45+
46+
err = proto.Unmarshal(b.Bytes(), v)
3547
if err != nil {
3648
return xerrors.Errorf("failed to unmarshal protobuf: %w", err)
3749
}
@@ -49,11 +61,22 @@ func Write(ctx context.Context, c *websocket.Conn, v proto.Message) error {
4961
return nil
5062
}
5163

64+
var writeBufPool sync.Pool
65+
5266
func write(ctx context.Context, c *websocket.Conn, v proto.Message) error {
53-
b, err := proto.Marshal(v)
67+
b, ok := writeBufPool.Get().(*proto.Buffer)
68+
if !ok {
69+
b = proto.NewBuffer(make([]byte, 0, 4096))
70+
}
71+
defer func() {
72+
b.Reset()
73+
writeBufPool.Put(b)
74+
}()
75+
76+
err := b.Marshal(v)
5477
if err != nil {
5578
return xerrors.Errorf("failed to marshal protobuf: %w", err)
5679
}
5780

58-
return c.Write(ctx, websocket.MessageBinary, b)
81+
return c.Write(ctx, websocket.MessageBinary, b.Bytes())
5982
}

0 commit comments

Comments
 (0)