Skip to content

Commit f5f8089

Browse files
committed
WIP cleanup
1 parent dff4af3 commit f5f8089

File tree

10 files changed

+215
-147
lines changed

10 files changed

+215
-147
lines changed

compress_notjs.go

+32-12
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@
33
package websocket
44

55
import (
6-
"compress/flate"
76
"io"
87
"net/http"
98
"sync"
9+
10+
"github.com/klauspost/compress/flate"
1011
)
1112

1213
func (m CompressionMode) opts() *compressionOptions {
@@ -45,10 +46,16 @@ type trimLastFourBytesWriter struct {
4546
}
4647

4748
func (tw *trimLastFourBytesWriter) reset() {
48-
tw.tail = tw.tail[:0]
49+
if tw != nil && tw.tail != nil {
50+
tw.tail = tw.tail[:0]
51+
}
4952
}
5053

5154
func (tw *trimLastFourBytesWriter) Write(p []byte) (int, error) {
55+
if tw.tail == nil {
56+
tw.tail = make([]byte, 0, 4)
57+
}
58+
5259
extra := len(tw.tail) + len(p) - 4
5360

5461
if extra <= 0 {
@@ -65,7 +72,10 @@ func (tw *trimLastFourBytesWriter) Write(p []byte) (int, error) {
6572
if err != nil {
6673
return 0, err
6774
}
68-
tw.tail = tw.tail[extra:]
75+
76+
// Shift remaining bytes in tail over.
77+
n := copy(tw.tail, tw.tail[extra:])
78+
tw.tail = tw.tail[:n]
6979
}
7080

7181
// If p is less than or equal to 4 bytes,
@@ -118,22 +128,32 @@ type slidingWindow struct {
118128
buf []byte
119129
}
120130

121-
var swPoolMu sync.Mutex
131+
var swPoolMu sync.RWMutex
122132
var swPool = map[int]*sync.Pool{}
123133

124-
func (sw *slidingWindow) init(n int) {
125-
if sw.buf != nil {
126-
return
134+
func slidingWindowPool(n int) *sync.Pool {
135+
swPoolMu.RLock()
136+
p, ok := swPool[n]
137+
swPoolMu.RUnlock()
138+
if ok {
139+
return p
127140
}
128141

142+
p = &sync.Pool{}
143+
129144
swPoolMu.Lock()
130-
defer swPoolMu.Unlock()
145+
swPool[n] = p
146+
swPoolMu.Unlock()
131147

132-
p, ok := swPool[n]
133-
if !ok {
134-
p = &sync.Pool{}
135-
swPool[n] = p
148+
return p
149+
}
150+
151+
func (sw *slidingWindow) init(n int) {
152+
if sw.buf != nil {
153+
return
136154
}
155+
156+
p := slidingWindowPool(n)
137157
buf, ok := p.Get().([]byte)
138158
if ok {
139159
sw.buf = buf[:0]

conn_notjs.go

+9-8
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,17 @@ type Conn struct {
3939

4040
// Read state.
4141
readMu *mu
42-
readHeader header
42+
readHeaderBuf [8]byte
4343
readControlBuf [maxControlPayload]byte
4444
msgReader *msgReader
4545
readCloseFrameErr error
4646

4747
// Write state.
48-
msgWriter *msgWriter
49-
writeFrameMu *mu
50-
writeBuf []byte
51-
writeHeader header
48+
msgWriterState *msgWriterState
49+
writeFrameMu *mu
50+
writeBuf []byte
51+
writeHeaderBuf [8]byte
52+
writeHeader header
5253

5354
closed chan struct{}
5455
closeMu sync.Mutex
@@ -94,14 +95,14 @@ func newConn(cfg connConfig) *Conn {
9495

9596
c.msgReader = newMsgReader(c)
9697

97-
c.msgWriter = newMsgWriter(c)
98+
c.msgWriterState = newMsgWriterState(c)
9899
if c.client {
99100
c.writeBuf = extractBufioWriterBuf(c.bw, c.rwc)
100101
}
101102

102103
if c.flate() && c.flateThreshold == 0 {
103104
c.flateThreshold = 256
104-
if !c.msgWriter.flateContextTakeover() {
105+
if !c.msgWriterState.flateContextTakeover() {
105106
c.flateThreshold = 512
106107
}
107108
}
@@ -142,7 +143,7 @@ func (c *Conn) close(err error) {
142143
c.writeFrameMu.Lock(context.Background())
143144
putBufioWriter(c.bw)
144145
}
145-
c.msgWriter.close()
146+
c.msgWriterState.close()
146147

147148
c.msgReader.close()
148149
if c.client {

conn_test.go

+23-26
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@ package websocket_test
55
import (
66
"bytes"
77
"context"
8-
"crypto/rand"
98
"fmt"
109
"io"
1110
"io/ioutil"
1211
"net/http"
1312
"net/http/httptest"
1413
"os"
1514
"os/exec"
15+
"strings"
1616
"sync"
1717
"testing"
1818
"time"
@@ -379,60 +379,52 @@ func BenchmarkConn(b *testing.B) {
379379
mode websocket.CompressionMode
380380
}{
381381
{
382-
name: "compressionDisabled",
382+
name: "disabledCompress",
383383
mode: websocket.CompressionDisabled,
384384
},
385385
{
386-
name: "compression",
386+
name: "compress",
387387
mode: websocket.CompressionContextTakeover,
388388
},
389389
{
390-
name: "noContextCompression",
390+
name: "compressNoContext",
391391
mode: websocket.CompressionNoContextTakeover,
392392
},
393393
}
394394
for _, bc := range benchCases {
395395
b.Run(bc.name, func(b *testing.B) {
396396
bb, c1, c2 := newConnTest(b, &websocket.DialOptions{
397397
CompressionOptions: &websocket.CompressionOptions{Mode: bc.mode},
398-
}, nil)
398+
}, &websocket.AcceptOptions{
399+
CompressionOptions: &websocket.CompressionOptions{Mode: bc.mode},
400+
})
399401
defer bb.cleanup()
400402

401403
bb.goEchoLoop(c2)
402404

403-
const n = 32768
404-
writeBuf := make([]byte, n)
405-
readBuf := make([]byte, n)
406-
writes := make(chan websocket.MessageType)
405+
msg := []byte(strings.Repeat("1234", 128))
406+
readBuf := make([]byte, len(msg))
407+
writes := make(chan struct{})
407408
defer close(writes)
408409
werrs := make(chan error)
409410

410411
go func() {
411-
for typ := range writes {
412-
werrs <- c1.Write(bb.ctx, typ, writeBuf)
412+
for range writes {
413+
werrs <- c1.Write(bb.ctx, websocket.MessageText, msg)
413414
}
414415
}()
415-
b.SetBytes(n)
416+
b.SetBytes(int64(len(msg)))
416417
b.ReportAllocs()
417418
b.ResetTimer()
418419
for i := 0; i < b.N; i++ {
419-
_, err := rand.Reader.Read(writeBuf)
420-
if err != nil {
421-
b.Fatal(err)
422-
}
423-
424-
expType := websocket.MessageBinary
425-
if writeBuf[0]%2 == 1 {
426-
expType = websocket.MessageText
427-
}
428-
writes <- expType
420+
writes <- struct{}{}
429421

430422
typ, r, err := c1.Reader(bb.ctx)
431423
if err != nil {
432424
b.Fatal(err)
433425
}
434-
if expType != typ {
435-
assert.Equal(b, "data type", expType, typ)
426+
if websocket.MessageText != typ {
427+
assert.Equal(b, "data type", websocket.MessageText, typ)
436428
}
437429

438430
_, err = io.ReadFull(r, readBuf)
@@ -448,8 +440,8 @@ func BenchmarkConn(b *testing.B) {
448440
assert.Equal(b, "n2", 0, n2)
449441
}
450442

451-
if !bytes.Equal(writeBuf, readBuf) {
452-
assert.Equal(b, "msg", writeBuf, readBuf)
443+
if !bytes.Equal(msg, readBuf) {
444+
assert.Equal(b, "msg", msg, readBuf)
453445
}
454446

455447
err = <-werrs
@@ -464,3 +456,8 @@ func BenchmarkConn(b *testing.B) {
464456
})
465457
}
466458
}
459+
460+
func TestCompression(t *testing.T) {
461+
t.Parallel()
462+
463+
}

frame.go

+26-15
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@ package websocket
33
import (
44
"bufio"
55
"encoding/binary"
6+
"io"
67
"math"
78
"math/bits"
89

10+
"golang.org/x/xerrors"
11+
912
"nhooyr.io/websocket/internal/errd"
1013
)
1114

@@ -46,12 +49,12 @@ type header struct {
4649

4750
// readFrameHeader reads a header from the reader.
4851
// See https://tools.ietf.org/html/rfc6455#section-5.2.
49-
func readFrameHeader(h *header, r *bufio.Reader) (err error) {
52+
func readFrameHeader(r *bufio.Reader, readBuf []byte) (h header, err error) {
5053
defer errd.Wrap(&err, "failed to read frame header")
5154

5255
b, err := r.ReadByte()
5356
if err != nil {
54-
return err
57+
return header{}, err
5558
}
5659

5760
h.fin = b&(1<<7) != 0
@@ -63,7 +66,7 @@ func readFrameHeader(h *header, r *bufio.Reader) (err error) {
6366

6467
b, err = r.ReadByte()
6568
if err != nil {
66-
return err
69+
return header{}, err
6770
}
6871

6972
h.masked = b&(1<<7) != 0
@@ -73,24 +76,29 @@ func readFrameHeader(h *header, r *bufio.Reader) (err error) {
7376
case payloadLength < 126:
7477
h.payloadLength = int64(payloadLength)
7578
case payloadLength == 126:
76-
var pl uint16
77-
err = binary.Read(r, binary.BigEndian, &pl)
78-
h.payloadLength = int64(pl)
79+
_, err = io.ReadFull(r, readBuf[:2])
80+
h.payloadLength = int64(binary.BigEndian.Uint16(readBuf))
7981
case payloadLength == 127:
80-
err = binary.Read(r, binary.BigEndian, &h.payloadLength)
82+
_, err = io.ReadFull(r, readBuf)
83+
h.payloadLength = int64(binary.BigEndian.Uint64(readBuf))
8184
}
8285
if err != nil {
83-
return err
86+
return header{}, err
87+
}
88+
89+
if h.payloadLength < 0 {
90+
return header{}, xerrors.Errorf("received negative payload length: %v", h.payloadLength)
8491
}
8592

8693
if h.masked {
87-
err = binary.Read(r, binary.LittleEndian, &h.maskKey)
94+
_, err = io.ReadFull(r, readBuf[:4])
8895
if err != nil {
89-
return err
96+
return header{}, err
9097
}
98+
h.maskKey = binary.LittleEndian.Uint32(readBuf)
9199
}
92100

93-
return nil
101+
return h, nil
94102
}
95103

96104
// maxControlPayload is the maximum length of a control frame payload.
@@ -99,7 +107,7 @@ const maxControlPayload = 125
99107

100108
// writeFrameHeader writes the bytes of the header to w.
101109
// See https://tools.ietf.org/html/rfc6455#section-5.2
102-
func writeFrameHeader(h header, w *bufio.Writer) (err error) {
110+
func writeFrameHeader(h header, w *bufio.Writer, buf []byte) (err error) {
103111
defer errd.Wrap(&err, "failed to write frame header")
104112

105113
var b byte
@@ -143,16 +151,19 @@ func writeFrameHeader(h header, w *bufio.Writer) (err error) {
143151

144152
switch {
145153
case h.payloadLength > math.MaxUint16:
146-
err = binary.Write(w, binary.BigEndian, h.payloadLength)
154+
binary.BigEndian.PutUint64(buf, uint64(h.payloadLength))
155+
_, err = w.Write(buf)
147156
case h.payloadLength > 125:
148-
err = binary.Write(w, binary.BigEndian, uint16(h.payloadLength))
157+
binary.BigEndian.PutUint16(buf, uint16(h.payloadLength))
158+
_, err = w.Write(buf[:2])
149159
}
150160
if err != nil {
151161
return err
152162
}
153163

154164
if h.masked {
155-
err = binary.Write(w, binary.LittleEndian, h.maskKey)
165+
binary.LittleEndian.PutUint32(buf, h.maskKey)
166+
_, err = w.Write(buf[:4])
156167
if err != nil {
157168
return err
158169
}

frame_test.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -80,14 +80,13 @@ func testHeader(t *testing.T, h header) {
8080
w := bufio.NewWriter(b)
8181
r := bufio.NewReader(b)
8282

83-
err := writeFrameHeader(h, w)
83+
err := writeFrameHeader(h, w, make([]byte, 8))
8484
assert.Success(t, err)
8585

8686
err = w.Flush()
8787
assert.Success(t, err)
8888

89-
var h2 header
90-
err = readFrameHeader(&h2, r)
89+
h2, err := readFrameHeader(r, make([]byte, 8))
9190
assert.Success(t, err)
9291

9392
assert.Equal(t, "read header", h, h2)

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ require (
99
github.com/golang/protobuf v1.3.3
1010
github.com/google/go-cmp v0.4.0
1111
github.com/gorilla/websocket v1.4.1
12+
github.com/klauspost/compress v1.10.0
1213
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
1314
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543
1415
)

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
1010
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
1111
github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM=
1212
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
13+
github.com/klauspost/compress v1.10.0 h1:92XGj1AcYzA6UrVdd4qIIBrT8OroryvRvdmg/IfmC7Y=
14+
github.com/klauspost/compress v1.10.0/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
1315
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 h1:/5xXl8Y5W96D+TtHSlonuFqGHIWVuyCkGJLwGh9JJFs=
1416
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
1517
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=

internal/xsync/go.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66

77
// Go allows running a function in another goroutine
88
// and waiting for its error.
9-
func Go(fn func() error) <- chan error {
9+
func Go(fn func() error) <-chan error {
1010
errs := make(chan error, 1)
1111
go func() {
1212
defer func() {

0 commit comments

Comments
 (0)