Skip to content

Commit 2960014

Browse files
committed
Handle recvPacket in a single goroutine.
Previously recvPacket would be invoked in several goroutines. This meant that when multiple concurrent requests were in flight there were N goroutines each waiting on recvPacket. For optimal throughput the goal is to send a new request as quickly as possible once a response is received. The previous mechanism worked counter to this because the goroutine sending new requests would be competing against N recvPacket goroutines that may become runnable as data streams in. Having a single goroutine responsible for recvPacket means that the recv and send goroutines will ping-pong back and forth optimizing throughput. This changes shows a ~10-25% increase in throughput in the the *Delay* benchmark tests. $ go test -bench=. -integration PASS BenchmarkRead1k 2 840068631 ns/op 12.48 MB/s BenchmarkRead16k 20 72968548 ns/op 143.70 MB/s BenchmarkRead32k 30 56871347 ns/op 184.38 MB/s BenchmarkRead128k 100 34150953 ns/op 307.05 MB/s BenchmarkRead512k 100 15730685 ns/op 666.59 MB/s BenchmarkRead1MiB 200 10462421 ns/op 1002.24 MB/s BenchmarkRead4MiB 200 7325236 ns/op 1431.47 MB/s BenchmarkRead4MiBDelay10Msec 10 186893765 ns/op 56.11 MB/s BenchmarkRead4MiBDelay50Msec 2 907127114 ns/op 11.56 MB/s BenchmarkRead4MiBDelay150Msec 1 2708025060 ns/op 3.87 MB/s BenchmarkWrite1k 1 1623940932 ns/op 6.46 MB/s BenchmarkWrite16k 10 174293843 ns/op 60.16 MB/s BenchmarkWrite32k 10 120377272 ns/op 87.11 MB/s BenchmarkWrite128k 20 54592205 ns/op 192.08 MB/s BenchmarkWrite512k 50 66449591 ns/op 157.80 MB/s BenchmarkWrite1MiB 50 70965660 ns/op 147.76 MB/s BenchmarkWrite4MiB 50 69234861 ns/op 151.45 MB/s BenchmarkWrite4MiBDelay10Msec 5 276624260 ns/op 37.91 MB/s BenchmarkWrite4MiBDelay50Msec 1 1318396552 ns/op 7.95 MB/s BenchmarkWrite4MiBDelay150Msec 1 3918416658 ns/op 2.68 MB/s BenchmarkCopyDown10MiBDelay10Msec 10 152240808 ns/op 68.88 MB/s BenchmarkCopyDown10MiBDelay50Msec 2 715003188 ns/op 14.67 MB/s BenchmarkCopyDown10MiBDelay150Msec 1 2116878801 ns/op 4.95 MB/s BenchmarkCopyUp10MiBDelay10Msec 10 192748258 ns/op 54.40 MB/s BenchmarkCopyUp10MiBDelay50Msec 2 691486538 ns/op 15.16 MB/s BenchmarkCopyUp10MiBDelay150Msec 1 1997162991 ns/op 5.25 MB/s BenchmarkMarshalInit 2000000 644 ns/op BenchmarkMarshalOpen 3000000 562 ns/op BenchmarkMarshalWriteWorstCase 20000 75166 ns/op BenchmarkMarshalWrite1k 500000 3862 ns/op ok github.com/pkg/sftp 71.174s
1 parent de236e8 commit 2960014

File tree

2 files changed

+42
-34
lines changed

2 files changed

+42
-34
lines changed

client.go

Lines changed: 42 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,12 @@ func NewClientPipe(rd io.Reader, wr io.WriteCloser, opts ...func(*Client) error)
6868
wr.Close()
6969
return nil, err
7070
}
71-
return sftp, sftp.recvVersion()
71+
if err := sftp.recvVersion(); err != nil {
72+
wr.Close()
73+
return nil, err
74+
}
75+
go sftp.recv()
76+
return sftp, nil
7277
}
7378

7479
// Client represents an SFTP session on a *ssh.ClientConn SSH connection.
@@ -127,6 +132,41 @@ func (c *Client) recvVersion() error {
127132
return nil
128133
}
129134

135+
func (c *Client) recv() {
136+
sendAll := func(res result) {
137+
c.mu.Lock()
138+
listeners := make([]chan<- result, len(c.inflight))
139+
for _, ch := range c.inflight {
140+
listeners = append(listeners, ch)
141+
}
142+
c.mu.Unlock()
143+
for _, ch := range listeners {
144+
ch <- res
145+
}
146+
}
147+
for {
148+
typ, data, err := recvPacket(c.r)
149+
if err != nil {
150+
// Return the error to all listeners.
151+
sendAll(result{err: err})
152+
return
153+
}
154+
sid, _ := unmarshalUint32(data)
155+
c.mu.Lock()
156+
ch, ok := c.inflight[sid]
157+
delete(c.inflight, sid)
158+
c.mu.Unlock()
159+
if !ok {
160+
// This is an unexpected occurrence. Send the error
161+
// back to all listeners so that they terminate
162+
// gracefully.
163+
sendAll(result{err: fmt.Errorf("sid: %v not fond", sid)})
164+
return
165+
}
166+
ch <- result{typ: typ, data: data}
167+
}
168+
}
169+
130170
// Walk returns a new Walker rooted at root.
131171
func (c *Client) Walk(root string) *fs.Walker {
132172
return fs.WalkFS(root, c)
@@ -526,33 +566,9 @@ func (c *Client) dispatchRequest(ch chan<- result, p idmarshaler) {
526566
err := sendPacket(c.w, p)
527567
if err != nil {
528568
go func() { ch <- result{err: err} }()
569+
delete(c.inflight, p.id())
529570
return
530571
}
531-
go func() {
532-
c.mu.Lock()
533-
typ, data, err := recvPacket(c.r)
534-
if err != nil {
535-
c.mu.Unlock()
536-
ch <- result{err: err}
537-
return
538-
}
539-
540-
// the packet we received may not be the one we sent
541-
// look up the channel of the owner and dispatch it to
542-
// the sender.
543-
sid, _ := unmarshalUint32(data)
544-
ch1, ok := c.inflight[sid]
545-
delete(c.inflight, sid)
546-
c.mu.Unlock()
547-
if !ok {
548-
// send error back to the caller which started this goroutine
549-
// this may not be the caller who issued the request that we
550-
// have a response for.
551-
ch <- result{err: fmt.Errorf("sid: %v not found", sid)}
552-
return
553-
}
554-
ch1 <- result{typ: typ, data: data}
555-
}()
556572
}
557573

558574
// Creates the specified directory. An error will be returned if a file or

client_integration_test.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -112,14 +112,6 @@ func testClient(t testing.TB, readonly bool, delay time.Duration) (*Client, *exe
112112
t.Fatal(err)
113113
}
114114

115-
if err := sftp.sendInit(); err != nil {
116-
defer cmd.Wait()
117-
t.Fatal(err)
118-
}
119-
if err := sftp.recvVersion(); err != nil {
120-
defer cmd.Wait()
121-
t.Fatal(err)
122-
}
123115
return sftp, cmd
124116
}
125117

0 commit comments

Comments
 (0)