Skip to content

Commit 8e69d43

Browse files
committed
net: add Buffers type, do writev on unix
No fast path currently for solaris, windows, nacl, plan9. Fixes #13451 Change-Id: I24b3233a2e3a57fc6445e276a5c0d7b097884007 Reviewed-on: https://go-review.googlesource.com/29951 Reviewed-by: Ian Lance Taylor <[email protected]> Run-TryBot: Brad Fitzpatrick <[email protected]> TryBot-Result: Gobot Gobot <[email protected]>
1 parent ffd1c78 commit 8e69d43

File tree

6 files changed

+365
-0
lines changed

6 files changed

+365
-0
lines changed

src/net/fd_unix.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ type netFD struct {
2929
laddr Addr
3030
raddr Addr
3131

32+
// writev cache.
33+
iovecs *[]syscall.Iovec
34+
3235
// wait server
3336
pd pollDesc
3437
}

src/net/net.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -604,3 +604,66 @@ func acquireThread() {
604604
func releaseThread() {
605605
<-threadLimit
606606
}
607+
608+
// buffersWriter is the interface implemented by Conns that support a
609+
// "writev"-like batch write optimization.
610+
// writeBuffers should fully consume and write all chunks from the
611+
// provided Buffers, else it should report a non-nil error.
612+
type buffersWriter interface {
613+
writeBuffers(*Buffers) (int64, error)
614+
}
615+
616+
var testHookDidWritev = func(wrote int) {}
617+
618+
// Buffers contains zero or more runs of bytes to write.
619+
//
620+
// On certain machines, for certain types of connections, this is
621+
// optimized into an OS-specific batch write operation (such as
622+
// "writev").
623+
type Buffers [][]byte
624+
625+
var (
626+
_ io.WriterTo = (*Buffers)(nil)
627+
_ io.Reader = (*Buffers)(nil)
628+
)
629+
630+
func (v *Buffers) WriteTo(w io.Writer) (n int64, err error) {
631+
if wv, ok := w.(buffersWriter); ok {
632+
return wv.writeBuffers(v)
633+
}
634+
for _, b := range *v {
635+
nb, err := w.Write(b)
636+
n += int64(nb)
637+
if err != nil {
638+
v.consume(n)
639+
return n, err
640+
}
641+
}
642+
v.consume(n)
643+
return n, nil
644+
}
645+
646+
func (v *Buffers) Read(p []byte) (n int, err error) {
647+
for len(p) > 0 && len(*v) > 0 {
648+
n0 := copy(p, (*v)[0])
649+
v.consume(int64(n0))
650+
p = p[n0:]
651+
n += n0
652+
}
653+
if len(*v) == 0 {
654+
err = io.EOF
655+
}
656+
return
657+
}
658+
659+
func (v *Buffers) consume(n int64) {
660+
for len(*v) > 0 {
661+
ln0 := int64(len((*v)[0]))
662+
if ln0 > n {
663+
(*v)[0] = (*v)[0][n:]
664+
return
665+
}
666+
n -= ln0
667+
*v = (*v)[1:]
668+
}
669+
}

src/net/net_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,3 +414,38 @@ func TestZeroByteRead(t *testing.T) {
414414
}
415415
}
416416
}
417+
418+
// withTCPConnPair sets up a TCP connection between two peers, then
419+
// runs peer1 and peer2 concurrently. withTCPConnPair returns when
420+
// both have completed.
421+
func withTCPConnPair(t *testing.T, peer1, peer2 func(c *TCPConn) error) {
422+
ln, err := newLocalListener("tcp")
423+
if err != nil {
424+
t.Fatal(err)
425+
}
426+
defer ln.Close()
427+
errc := make(chan error, 2)
428+
go func() {
429+
c1, err := ln.Accept()
430+
if err != nil {
431+
errc <- err
432+
return
433+
}
434+
defer c1.Close()
435+
errc <- peer1(c1.(*TCPConn))
436+
}()
437+
go func() {
438+
c2, err := Dial("tcp", ln.Addr().String())
439+
if err != nil {
440+
errc <- err
441+
return
442+
}
443+
defer c2.Close()
444+
errc <- peer2(c2.(*TCPConn))
445+
}()
446+
for i := 0; i < 2; i++ {
447+
if err := <-errc; err != nil {
448+
t.Fatal(err)
449+
}
450+
}
451+
}

src/net/writev_test.go

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
// Copyright 2016 The Go Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
package net
6+
7+
import (
8+
"bytes"
9+
"fmt"
10+
"io"
11+
"io/ioutil"
12+
"reflect"
13+
"runtime"
14+
"sync"
15+
"testing"
16+
)
17+
18+
func TestBuffers_read(t *testing.T) {
19+
const story = "once upon a time in Gopherland ... "
20+
buffers := Buffers{
21+
[]byte("once "),
22+
[]byte("upon "),
23+
[]byte("a "),
24+
[]byte("time "),
25+
[]byte("in "),
26+
[]byte("Gopherland ... "),
27+
}
28+
got, err := ioutil.ReadAll(&buffers)
29+
if err != nil {
30+
t.Fatal(err)
31+
}
32+
if string(got) != story {
33+
t.Errorf("read %q; want %q", got, story)
34+
}
35+
if len(buffers) != 0 {
36+
t.Errorf("len(buffers) = %d; want 0", len(buffers))
37+
}
38+
}
39+
40+
func TestBuffers_consume(t *testing.T) {
41+
tests := []struct {
42+
in Buffers
43+
consume int64
44+
want Buffers
45+
}{
46+
{
47+
in: Buffers{[]byte("foo"), []byte("bar")},
48+
consume: 0,
49+
want: Buffers{[]byte("foo"), []byte("bar")},
50+
},
51+
{
52+
in: Buffers{[]byte("foo"), []byte("bar")},
53+
consume: 2,
54+
want: Buffers{[]byte("o"), []byte("bar")},
55+
},
56+
{
57+
in: Buffers{[]byte("foo"), []byte("bar")},
58+
consume: 3,
59+
want: Buffers{[]byte("bar")},
60+
},
61+
{
62+
in: Buffers{[]byte("foo"), []byte("bar")},
63+
consume: 4,
64+
want: Buffers{[]byte("ar")},
65+
},
66+
{
67+
in: Buffers{nil, nil, nil, []byte("bar")},
68+
consume: 1,
69+
want: Buffers{[]byte("ar")},
70+
},
71+
{
72+
in: Buffers{nil, nil, nil, []byte("foo")},
73+
consume: 0,
74+
want: Buffers{[]byte("foo")},
75+
},
76+
{
77+
in: Buffers{nil, nil, nil},
78+
consume: 0,
79+
want: Buffers{},
80+
},
81+
}
82+
for i, tt := range tests {
83+
in := tt.in
84+
in.consume(tt.consume)
85+
if !reflect.DeepEqual(in, tt.want) {
86+
t.Errorf("%d. after consume(%d) = %+v, want %+v", i, tt.consume, in, tt.want)
87+
}
88+
}
89+
}
90+
91+
func TestBuffers_WriteTo(t *testing.T) {
92+
for _, name := range []string{"WriteTo", "Copy"} {
93+
for _, size := range []int{0, 10, 1023, 1024, 1025} {
94+
t.Run(fmt.Sprintf("%s/%d", name, size), func(t *testing.T) {
95+
testBuffer_writeTo(t, size, name == "Copy")
96+
})
97+
}
98+
}
99+
}
100+
101+
func testBuffer_writeTo(t *testing.T, chunks int, useCopy bool) {
102+
oldHook := testHookDidWritev
103+
defer func() { testHookDidWritev = oldHook }()
104+
var writeLog struct {
105+
sync.Mutex
106+
log []int
107+
}
108+
testHookDidWritev = func(size int) {
109+
writeLog.Lock()
110+
writeLog.log = append(writeLog.log, size)
111+
writeLog.Unlock()
112+
}
113+
var want bytes.Buffer
114+
for i := 0; i < chunks; i++ {
115+
want.WriteByte(byte(i))
116+
}
117+
118+
withTCPConnPair(t, func(c *TCPConn) error {
119+
buffers := make(Buffers, chunks)
120+
for i := range buffers {
121+
buffers[i] = want.Bytes()[i : i+1]
122+
}
123+
var n int64
124+
var err error
125+
if useCopy {
126+
n, err = io.Copy(c, &buffers)
127+
} else {
128+
n, err = buffers.WriteTo(c)
129+
}
130+
if err != nil {
131+
return err
132+
}
133+
if len(buffers) != 0 {
134+
return fmt.Errorf("len(buffers) = %d; want 0", len(buffers))
135+
}
136+
if n != int64(want.Len()) {
137+
return fmt.Errorf("Buffers.WriteTo returned %d; want %d", n, want.Len())
138+
}
139+
return nil
140+
}, func(c *TCPConn) error {
141+
all, err := ioutil.ReadAll(c)
142+
if !bytes.Equal(all, want.Bytes()) || err != nil {
143+
return fmt.Errorf("client read %q, %v; want %q, nil", all, err, want.Bytes())
144+
}
145+
146+
writeLog.Lock() // no need to unlock
147+
var gotSum int
148+
for _, v := range writeLog.log {
149+
gotSum += v
150+
}
151+
152+
var wantSum int
153+
var wantMinCalls int
154+
switch runtime.GOOS {
155+
case "darwin", "dragonfly", "freebsd", "linux", "netbsd", "openbsd":
156+
wantSum = want.Len()
157+
v := chunks
158+
for v > 0 {
159+
wantMinCalls++
160+
v -= 1024
161+
}
162+
}
163+
if len(writeLog.log) < wantMinCalls {
164+
t.Errorf("write calls = %v < wanted min %v", len(writeLog.log), wantMinCalls)
165+
}
166+
if gotSum != wantSum {
167+
t.Errorf("writev call sum = %v; want %v", gotSum, wantSum)
168+
}
169+
return nil
170+
})
171+
}

src/net/writev_unix.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
// Copyright 2016 The Go Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
// +build darwin dragonfly freebsd linux netbsd openbsd
6+
7+
package net
8+
9+
import (
10+
"io"
11+
"os"
12+
"syscall"
13+
"unsafe"
14+
)
15+
16+
func (c *conn) writeBuffers(v *Buffers) (int64, error) {
17+
if !c.ok() {
18+
return 0, syscall.EINVAL
19+
}
20+
n, err := c.fd.writeBuffers(v)
21+
if err != nil {
22+
return n, &OpError{Op: "writev", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
23+
}
24+
return n, nil
25+
}
26+
27+
func (fd *netFD) writeBuffers(v *Buffers) (n int64, err error) {
28+
if err := fd.writeLock(); err != nil {
29+
return 0, err
30+
}
31+
defer fd.writeUnlock()
32+
if err := fd.pd.prepareWrite(); err != nil {
33+
return 0, err
34+
}
35+
36+
var iovecs []syscall.Iovec
37+
if fd.iovecs != nil {
38+
iovecs = *fd.iovecs
39+
}
40+
// TODO: read from sysconf(_SC_IOV_MAX)? The Linux default is
41+
// 1024 and this seems conservative enough for now. Darwin's
42+
// UIO_MAXIOV also seems to be 1024.
43+
maxVec := 1024
44+
45+
for len(*v) > 0 {
46+
iovecs = iovecs[:0]
47+
for _, chunk := range *v {
48+
if len(chunk) == 0 {
49+
continue
50+
}
51+
iovecs = append(iovecs, syscall.Iovec{Base: &chunk[0]})
52+
iovecs[len(iovecs)-1].SetLen(len(chunk))
53+
if len(iovecs) == maxVec {
54+
break
55+
}
56+
}
57+
if len(iovecs) == 0 {
58+
break
59+
}
60+
fd.iovecs = &iovecs // cache
61+
62+
wrote, _, e0 := syscall.Syscall(syscall.SYS_WRITEV,
63+
uintptr(fd.sysfd),
64+
uintptr(unsafe.Pointer(&iovecs[0])),
65+
uintptr(len(iovecs)))
66+
if wrote < 0 {
67+
wrote = 0
68+
}
69+
testHookDidWritev(int(wrote))
70+
n += int64(wrote)
71+
v.consume(int64(wrote))
72+
if e0 == syscall.EAGAIN {
73+
if err = fd.pd.waitWrite(); err == nil {
74+
continue
75+
}
76+
} else if e0 != 0 {
77+
err = syscall.Errno(e0)
78+
}
79+
if err != nil {
80+
break
81+
}
82+
if n == 0 {
83+
err = io.ErrUnexpectedEOF
84+
break
85+
}
86+
}
87+
if _, ok := err.(syscall.Errno); ok {
88+
err = os.NewSyscallError("writev", err)
89+
}
90+
return n, err
91+
}

src/syscall/syscall_nacl.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,3 +296,5 @@ func RouteRIB(facility, param int) ([]byte, error) { return nil,
296296
func ParseRoutingMessage(b []byte) ([]RoutingMessage, error) { return nil, ENOSYS }
297297
func ParseRoutingSockaddr(msg RoutingMessage) ([]Sockaddr, error) { return nil, ENOSYS }
298298
func SysctlUint32(name string) (value uint32, err error) { return 0, ENOSYS }
299+
300+
type Iovec struct{} // dummy

0 commit comments

Comments
 (0)